Skip to content

Commit 920840f

Browse files
vrocheVltnrch
authored andcommitted
Redis protocol
1 parent 7e9f916 commit 920840f

File tree

6 files changed

+258
-0
lines changed

6 files changed

+258
-0
lines changed

Changelog

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ releases are sorted from youngest to oldest.
33

44
version 7.1.1-mtv:
55
- Molotov patches
6+
- Redis protocol via libhiredis
67

78
version 7.1.1:
89
avformat/hls: Partially revert "reduce default max reload to 3"

configure

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ External library support:
239239
--enable-libglslang enable GLSL->SPIRV compilation via libglslang [no]
240240
--enable-libgme enable Game Music Emu via libgme [no]
241241
--enable-libgsm enable GSM de/encoding via libgsm [no]
242+
--enable-libhiredis enable Redis protocol via libhiredis [no]
242243
--enable-libiec61883 enable iec61883 via libiec61883 [no]
243244
--enable-libilbc enable iLBC de/encoding via libilbc [no]
244245
--enable-libjack enable JACK audio sound server [no]
@@ -1924,6 +1925,7 @@ EXTERNAL_LIBRARY_LIST="
19241925
libglslang
19251926
libgme
19261927
libgsm
1928+
libhiredis
19271929
libiec61883
19281930
libilbc
19291931
libjack
@@ -3808,6 +3810,8 @@ ipns_gateway_protocol_select="https_protocol"
38083810
# external library protocols
38093811
libamqp_protocol_deps="librabbitmq"
38103812
libamqp_protocol_select="network"
3813+
libhiredis_protocol_deps="libhiredis"
3814+
libhiredis_protocol_select="network"
38113815
librist_protocol_deps="librist"
38123816
librist_protocol_select="network"
38133817
librtmp_protocol_deps="librtmp"
@@ -6907,6 +6911,7 @@ enabled libgme && { check_pkg_config libgme libgme gme/gme.h gme_new_
69076911
enabled libgsm && { for gsm_hdr in "gsm.h" "gsm/gsm.h"; do
69086912
check_lib libgsm "${gsm_hdr}" gsm_create -lgsm && break;
69096913
done || die "ERROR: libgsm not found"; }
6914+
enabled libhiredis && require_pkg_config libhiredis "hiredis >= 1.0.2" hiredis/hiredis.h redisConnect
69106915
enabled libilbc && require libilbc ilbc.h WebRtcIlbcfix_InitDecode -lilbc $pthreads_extralibs
69116916
enabled libjxl && require_pkg_config libjxl "libjxl >= 0.7.0" jxl/decode.h JxlDecoderVersion &&
69126917
require_pkg_config libjxl_threads "libjxl_threads >= 0.7.0" jxl/thread_parallel_runner.h JxlThreadParallelRunner

doc/protocols.texi

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,48 @@ Example usage:
816816
-f rtp_mpegts -fec prompeg=l=8:d=4 rtp://@var{hostname}:@var{port}
817817
@end example
818818

819+
@section redis
820+
821+
Redis protocol using hiredis C library.
822+
823+
FFmpeg must be compiled with --enable-libhiredis to support Redis.
824+
825+
Only writing is supported at the moment.
826+
Authentication is not supported at the moment.
827+
828+
Following syntax is required.
829+
830+
@example
831+
redis://[hostname][:port]/key
832+
@end example
833+
834+
Where @var{hostname} (default is 127.0.0.1) and @var{port} (default is 6379) is the address of Redis server,
835+
where @var{key} is the Redis key that will be requested.
836+
837+
The Redis key gets overwritten at each first write:
838+
the SET command is used for the first call to write, the APPEND command is used for all subsequent writes.
839+
840+
The following options are supported:
841+
842+
@table @option
843+
844+
@item timeout
845+
The timeout in microseconds during the initial connection to the Redis server
846+
and all commands executed afterwards.
847+
The default value is 0.5 seconds.
848+
849+
@item ttl
850+
The time to live in seconds of the Redis key.
851+
The default value is 0 second.
852+
When ttl is greater than 0 second, the EXPIRE command is executed is on the Redis key at closing.
853+
854+
@end table
855+
856+
Example: Generate JPEG thumbnails every second from an UDP stream and push it to @var{stream1} key on hostname:port Redis server.
857+
@example
858+
ffmpeg -i udp://<ip>:<port> -vf fps=1 -f image2 -update 1 redis://hostname:port/stream1
859+
@end example
860+
819861
@section rist
820862

821863
Reliable Internet Streaming Transport protocol

libavformat/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,7 @@ OBJS-$(CONFIG_UNIX_PROTOCOL) += unix.o
714714

715715
# external library protocols
716716
OBJS-$(CONFIG_LIBAMQP_PROTOCOL) += libamqp.o urldecode.o
717+
OBJS-$(CONFIG_LIBHIREDIS_PROTOCOL) += libhiredis.o urldecode.o
717718
OBJS-$(CONFIG_LIBRIST_PROTOCOL) += librist.o
718719
OBJS-$(CONFIG_LIBRTMP_PROTOCOL) += librtmp.o
719720
OBJS-$(CONFIG_LIBRTMPE_PROTOCOL) += librtmp.o

libavformat/libhiredis.c

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Copyright (c) 2022 MolotovTv
3+
*
4+
* This file is part of FFmpeg.
5+
*
6+
* FFmpeg is free software; you can redistribute it and/or
7+
* modify it under the terms of the GNU Lesser General Public
8+
* License as published by the Free Software Foundation; either
9+
* version 2.1 of the License, or (at your option) any later version.
10+
*
11+
* FFmpeg is distributed in the hope that it will be useful,
12+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14+
* Lesser General Public License for more details.
15+
*
16+
* You should have received a copy of the GNU Lesser General Public
17+
* License along with FFmpeg; if not, write to the Free Software
18+
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19+
*/
20+
21+
#include <hiredis/hiredis.h>
22+
#include <sys/time.h>
23+
#include "libavutil/mem.h"
24+
#include "libavutil/opt.h"
25+
#include "libavutil/parseutils.h"
26+
#include "avformat.h"
27+
#include "url.h"
28+
#include "urldecode.h"
29+
30+
typedef struct {
31+
const AVClass *class;
32+
redisContext *ctx;
33+
char *key;
34+
int append;
35+
int64_t timeout;
36+
int64_t ttl;
37+
} LIBHIREDISContext;
38+
39+
#define STR_LEN 1024
40+
41+
#define DEFAULT_IP "127.0.0.1"
42+
#define DEFAULT_PORT 6379
43+
#define DEFAULT_TIMEOUT 500000
44+
45+
#define COMMAND_APPEND "APPEND"
46+
#define COMMAND_SET "SET"
47+
#define COMMAND_EXPIRE "EXPIRE"
48+
49+
#define OFFSET(x) offsetof(LIBHIREDISContext, x)
50+
#define D AV_OPT_FLAG_DECODING_PARAM
51+
#define E AV_OPT_FLAG_ENCODING_PARAM
52+
static const AVOption options[] = {
53+
{ "timeout", "Set timeout (in microseconds) of socket I/O operations", OFFSET(timeout), AV_OPT_TYPE_INT64, { .i64 = -1 }, -1, INT64_MAX, .flags = D | E },
54+
{ "ttl", "Time to live (in seconds) of redis key", OFFSET(ttl), AV_OPT_TYPE_INT64, { .i64 = 0 }, 0, INT64_MAX, .flags = D | E },
55+
{ NULL }
56+
};
57+
58+
static int libhiredis_open(URLContext *h, const char *uri, int flags)
59+
{
60+
char hostname[STR_LEN], path[STR_LEN], buf[STR_LEN];
61+
int port;
62+
const char *ip;
63+
char *p;
64+
struct timeval tval = { 0 };
65+
LIBHIREDISContext *r = h->priv_data;
66+
67+
if (flags & AVIO_FLAG_READ) {
68+
av_log(h, AV_LOG_ERROR, "Redis read not supported yet.\n");
69+
return AVERROR(EINVAL);
70+
}
71+
/* Init */
72+
r->append = 0;
73+
/* Parse URI */
74+
av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname),
75+
&port, path, sizeof(path), uri);
76+
/* IP */
77+
if (*hostname == '\0')
78+
ip = DEFAULT_IP;
79+
else
80+
ip = hostname;
81+
/* Port */
82+
if (port < 0)
83+
port = DEFAULT_PORT;
84+
if (port <= 0 || port > 65535 ) {
85+
av_log(h, AV_LOG_ERROR, "Invalid port\n");
86+
return AVERROR(EINVAL);
87+
}
88+
/* Key */
89+
if (*path == '\0' || *(path + 1) == '\0') {
90+
av_log(h, AV_LOG_ERROR, "No key\n");
91+
return AVERROR(EINVAL);
92+
}
93+
p = strchr(path, '?');
94+
if (p)
95+
*p = '\0';
96+
r->key = ff_urldecode(path + 1, 0); /* skip leading '/' */
97+
if (!r->key)
98+
return AVERROR(ENOMEM);
99+
/* Options */
100+
p = strchr(uri, '?');
101+
if (p) {
102+
if (av_find_info_tag(buf, sizeof(buf), "timeout", p))
103+
r->timeout = strtol(buf, NULL, 10);
104+
if (av_find_info_tag(buf, sizeof(buf), "ttl", p))
105+
r->ttl = strtol(buf, NULL, 10);
106+
}
107+
if (r->timeout < 0)
108+
r->timeout = DEFAULT_TIMEOUT;
109+
/* Redis connect */
110+
tval.tv_sec = r->timeout / 1000000;
111+
tval.tv_usec = r->timeout % 1000000;
112+
r->ctx = redisConnectWithTimeout(ip, port, tval);
113+
if (r->ctx == NULL || r->ctx->err) {
114+
av_free(r->key);
115+
if (r->ctx) {
116+
av_log(h, AV_LOG_ERROR, "Error connect: %s\n", r->ctx->errstr);
117+
redisFree(r->ctx);
118+
return AVERROR_EXTERNAL;
119+
}
120+
return AVERROR(ENOMEM);
121+
}
122+
/* Command timeout */
123+
if (redisSetTimeout(r->ctx, tval) != REDIS_OK) {
124+
if (r->ctx->err)
125+
av_log(h, AV_LOG_ERROR, "Error set timeout: %s\n", r->ctx->errstr);
126+
av_free(r->key);
127+
redisFree(r->ctx);
128+
return AVERROR_EXTERNAL;
129+
}
130+
131+
return 0;
132+
}
133+
134+
static int libhiredis_write(URLContext *h, const uint8_t *buf, int size)
135+
{
136+
LIBHIREDISContext *r = h->priv_data;
137+
const char *command;
138+
redisReply *reply;
139+
140+
if (r->append)
141+
command = COMMAND_APPEND;
142+
else
143+
command = COMMAND_SET;
144+
/* Redis command */
145+
reply = redisCommand(r->ctx, "%s %s %b", command, r->key, buf, (size_t)size);
146+
if (!reply) {
147+
if (r->ctx->err == REDIS_ERR_IO)
148+
return AVERROR(EIO);
149+
if (r->ctx->err == REDIS_ERR_TIMEOUT)
150+
return AVERROR(EAGAIN);
151+
return AVERROR(ENOMEM);
152+
} else if (reply->type == REDIS_REPLY_ERROR) {
153+
av_log(h, AV_LOG_ERROR, "Error command (%s): %s\n", command, reply->str);
154+
freeReplyObject(reply);
155+
return AVERROR_EXTERNAL;
156+
}
157+
/* Ok */
158+
freeReplyObject(reply);
159+
r->append = 1;
160+
161+
return size;
162+
}
163+
164+
static int libhiredis_close(URLContext *h)
165+
{
166+
LIBHIREDISContext *r = h->priv_data;
167+
redisReply *reply;
168+
169+
/* TTL / Redis EXPIRE command */
170+
if (r->ttl > 0) {
171+
reply = redisCommand(r->ctx, "%s %s %d", COMMAND_EXPIRE, r->key, r->ttl);
172+
if (!reply) {
173+
if (r->ctx->err == REDIS_ERR_IO)
174+
return AVERROR(EIO);
175+
if (r->ctx->err == REDIS_ERR_TIMEOUT)
176+
return AVERROR(EAGAIN);
177+
return AVERROR(ENOMEM);
178+
} else if (reply->type == REDIS_REPLY_ERROR) {
179+
av_log(h, AV_LOG_ERROR, "Error command (%s): %s\n", COMMAND_EXPIRE, reply->str);
180+
freeReplyObject(reply);
181+
return AVERROR_EXTERNAL;
182+
}
183+
/* Ok */
184+
freeReplyObject(reply);
185+
}
186+
187+
av_free(r->key);
188+
redisFree(r->ctx);
189+
190+
return 0;
191+
}
192+
193+
static const AVClass libhiredis_context_class = {
194+
.class_name = "libhiredis",
195+
.item_name = av_default_item_name,
196+
.option = options,
197+
.version = LIBAVUTIL_VERSION_INT,
198+
};
199+
200+
const URLProtocol ff_libhiredis_protocol = {
201+
.name = "redis",
202+
.url_open = libhiredis_open,
203+
.url_write = libhiredis_write,
204+
.url_close = libhiredis_close,
205+
.priv_data_size = sizeof(LIBHIREDISContext),
206+
.priv_data_class = &libhiredis_context_class,
207+
.flags = URL_PROTOCOL_FLAG_NETWORK,
208+
};

libavformat/protocols.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ extern const URLProtocol ff_udp_protocol;
6666
extern const URLProtocol ff_udplite_protocol;
6767
extern const URLProtocol ff_unix_protocol;
6868
extern const URLProtocol ff_libamqp_protocol;
69+
extern const URLProtocol ff_libhiredis_protocol;
6970
extern const URLProtocol ff_librist_protocol;
7071
extern const URLProtocol ff_librtmp_protocol;
7172
extern const URLProtocol ff_librtmpe_protocol;

0 commit comments

Comments
 (0)