Improve the SAT>IP RTSP implementation

This commit is contained in:
Jaroslav Kysela 2014-04-09 18:21:51 +02:00
parent 76461dd5c1
commit 42b8b19d4e
3 changed files with 128 additions and 71 deletions

View file

@ -637,7 +637,10 @@ satip_frontend_pid_changed( satip_rtsp_connection_t *rtsp,
lfe->sf_pids_tcount = lfe->sf_pids_count;
pthread_mutex_unlock(&lfe->sf_dvr_lock);
r = satip_rtsp_play(rtsp, NULL, add, del);
if (add[0] != '\0' || del[0] != '\0')
r = satip_rtsp_play(rtsp, NULL, add, del);
else
r = 0;
}
if (r < 0)
@ -663,6 +666,7 @@ satip_frontend_input_thread ( void *aux )
size_t c;
int tc;
tvhpoll_event_t ev[4];
tvhpoll_event_t evr;
tvhpoll_t *efd;
int changing = 0, ms = -1, fatal = 0;
uint32_t seq = -1, nseq;
@ -690,6 +694,7 @@ satip_frontend_input_thread ( void *aux )
ev[2].events = TVHPOLL_IN;
ev[2].fd = rtsp->fd;
ev[2].data.u64 = (uint64_t)rtsp;
evr = ev[2];
ev[3].events = TVHPOLL_IN;
ev[3].fd = lfe->sf_dvr_pipe.rd;
ev[3].data.u64 = 0;
@ -717,6 +722,18 @@ satip_frontend_input_thread ( void *aux )
while (tvheadend_running && !fatal) {
if (rtsp->sending) {
if ((evr.events & TVHPOLL_OUT) == 0) {
evr.events |= TVHPOLL_OUT;
tvhpoll_add(efd, &evr, 1);
}
} else {
if (evr.events & TVHPOLL_OUT) {
evr.events &= ~TVHPOLL_OUT;
tvhpoll_add(efd, &evr, 1);
}
}
nfds = tvhpoll_wait(efd, ev, 1, ms);
if (nfds > 0 && ev[0].data.u64 == 0) {
@ -740,12 +757,12 @@ satip_frontend_input_thread ( void *aux )
if (nfds < 1) continue;
if (ev[0].data.u64 == (uint64_t)rtsp) {
r = satip_rtsp_receive(rtsp);
r = satip_rtsp_run(rtsp);
if (r < 0) {
tvhlog(LOG_ERR, "satip", "%s - RTSP error %d (%s) [%i-%i]",
buf, r, strerror(-r), rtsp->cmd, rtsp->code);
fatal = 1;
} else if (r) {
} else if (r == SATIP_RTSP_READ_DONE) {
switch (rtsp->cmd) {
case SATIP_RTSP_CMD_OPTIONS:
r = satip_rtsp_options_decode(rtsp);
@ -783,6 +800,7 @@ satip_frontend_input_thread ( void *aux )
}
}
/* We need to keep the session alive */
if (rtsp->ping_time + rtsp->timeout / 2 < dispatch_clock &&
rtsp->cmd == SATIP_RTSP_CMD_NONE)
satip_rtsp_options(rtsp);
@ -862,11 +880,20 @@ satip_frontend_input_thread ( void *aux )
if (r < 0) {
tvhtrace("satip", "%s - bad teardown", buf);
} else {
while (1) {
tvhpoll_wait(efd, ev, 1, -1);
r = satip_rtsp_receive(rtsp);
if (r)
if (r == SATIP_RTSP_INCOMPLETE) {
evr.events |= TVHPOLL_OUT;
tvhpoll_add(efd, &evr, 1);
}
r = 0;
while (r == SATIP_RTSP_INCOMPLETE) {
if (!rtsp->sending) {
evr.events &= ~TVHPOLL_OUT;
tvhpoll_add(efd, &evr, 1);
}
nfds = tvhpoll_wait(efd, ev, 1, -1);
if (nfds < 0)
break;
r = satip_rtsp_run(rtsp);
}
}
}

View file

@ -210,6 +210,11 @@ typedef enum {
SATIP_RTSP_CMD_DESCRIBE
} satip_rtsp_cmd_t;
#define SATIP_RTSP_OK 1
#define SATIP_RTSP_READ_DONE 1
#define SATIP_RTSP_SEND_DONE 1
#define SATIP_RTSP_INCOMPLETE 0
typedef struct satip_rtsp_connection {
/* decoded answer */
int cseq;
@ -229,6 +234,8 @@ typedef struct satip_rtsp_connection {
int fd;
char rbuf[4096];
size_t rsize;
size_t hsize; /* header size */
size_t csize; /* contents size (exclude header) */
char *wbuf;
size_t wpos;
size_t wsize;
@ -252,7 +259,7 @@ satip_rtsp_send( satip_rtsp_connection_t *conn, htsbuf_queue_t *q,
satip_rtsp_cmd_t cmd );
int
satip_rtsp_receive( satip_rtsp_connection_t *conn );
satip_rtsp_run( satip_rtsp_connection_t *conn );
int
satip_rtsp_options_decode( satip_rtsp_connection_t *conn );
@ -276,4 +283,10 @@ satip_rtsp_play( satip_rtsp_connection_t *sd, const char *pids,
int
satip_rtsp_teardown( satip_rtsp_connection_t *conn );
int
satip_rtsp_describe_decode( satip_rtsp_connection_t *conn );
int
satip_rtsp_describe( satip_rtsp_connection_t *conn );
#endif /* __TVH_SATIP_PRIVATE_H__ */

View file

@ -74,25 +74,23 @@ satip_rtsp_send_partial( satip_rtsp_connection_t *conn )
r = send(conn->fd, conn->wbuf + conn->wpos, conn->wsize - conn->wpos, MSG_DONTWAIT);
if (r < 0) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
continue;
return SATIP_RTSP_INCOMPLETE;
return -errno;
}
conn->wpos += r;
if (conn->wpos >= conn->wsize) {
conn->sending = 0;
return 1;
return SATIP_RTSP_SEND_DONE;
}
break;
}
return 0;
return SATIP_RTSP_INCOMPLETE;
}
int
satip_rtsp_send( satip_rtsp_connection_t *conn, htsbuf_queue_t *q,
satip_rtsp_cmd_t cmd )
{
int r;
conn->ping_time = dispatch_clock;
conn->cmd = cmd;
free(conn->wbuf);
@ -104,8 +102,7 @@ satip_rtsp_send( satip_rtsp_connection_t *conn, htsbuf_queue_t *q,
tvhtrace("satip", "%s - sending RTSP cmd", conn->device->sd_info.addr);
tvhlog_hexdump("satip", conn->wbuf, conn->wsize);
#endif
while (!(r = satip_rtsp_send_partial(conn))) ;
return r;
return satip_rtsp_send_partial(conn);
}
static int
@ -115,23 +112,37 @@ satip_rtsp_send2( satip_rtsp_connection_t *conn, htsbuf_queue_t *q,
conn->wq2_loaded = 1;
conn->wq2_cmd = cmd;
htsbuf_appendq(&conn->wq2, q);
return 1;
return SATIP_RTSP_SEND_DONE;
}
static char *
satip_rtsp_hstrip(char *h)
{
while (*h && *h <= ' ')
h++;
return h;
}
int
satip_rtsp_receive( satip_rtsp_connection_t *conn )
satip_rtsp_run( satip_rtsp_connection_t *conn )
{
char buf[1024], *saveptr, *argv[3], *d, *p, *p1;
htsbuf_queue_t header, data;
htsbuf_queue_t header;
int cseq_seen;
ssize_t r;
size_t len;
if (conn->sending) {
r = satip_rtsp_send_partial(conn);
if (r < 0 || r == SATIP_RTSP_INCOMPLETE)
return r;
}
r = recv(conn->fd, buf, sizeof(buf), MSG_DONTWAIT);
if (r == 0)
return -ESTRPIPE;
if (r < 0) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
return 0;
return SATIP_RTSP_INCOMPLETE;
return -errno;
}
#if ENABLE_TRACE
@ -145,11 +156,11 @@ satip_rtsp_receive( satip_rtsp_connection_t *conn )
memcpy(conn->rbuf + conn->rsize, buf, r);
conn->rsize += r;
conn->rbuf[conn->rsize] = '\0';
if (conn->rsize > 3 &&
if (!conn->csize && conn->rsize > 3 &&
(d = strstr(conn->rbuf, "\r\n\r\n")) != NULL) {
conn->hsize = d - conn->rbuf + 4;
*d = '\0';
htsbuf_queue_init(&header, 0);
htsbuf_queue_init(&data, 0);
p = strtok_r(conn->rbuf, "\r\n", &saveptr);
if (p == NULL)
goto fail;
@ -162,50 +173,60 @@ satip_rtsp_receive( satip_rtsp_connection_t *conn )
goto fail;
cseq_seen = 0;
while ((p = strtok_r(NULL, "\r\n", &saveptr)) != NULL) {
p1 = strdup(p);
if (http_tokenize(p, argv, 2, ':') != 2)
goto fail;
if (strcmp(argv[0], "CSeq") == 0) {
cseq_seen = conn->cseq == atoi(argv[1]);
if (strncasecmp(p, "CSeq:", 5) == 0) {
p1 = satip_rtsp_hstrip(p + 5);
if (p1)
cseq_seen = conn->cseq == atoi(p1);
} else if (strncasecmp(p, "Content-Length:", 15) == 0) {
conn->csize = atoll(p + 15);
} else {
htsbuf_append(&header, p1, strlen(p1));
htsbuf_append(&header, p, strlen(p));
htsbuf_append(&header, "\n", 1);
}
free(p1);
}
if (!cseq_seen)
goto fail;
free(conn->header);
free(conn->data);
conn->header = htsbuf_to_string(&header);
conn->data = htsbuf_to_string(&data);
htsbuf_queue_flush(&header);
free(conn->data);
conn->data = NULL;
if (!conn->csize)
goto processed;
if (conn->rsize > conn->hsize)
goto data;
} else if (conn->hsize + conn->csize >= conn->rsize) {
data:
conn->data = malloc(conn->csize + 1);
memcpy(conn->data, conn->rbuf + conn->hsize, conn->csize);
conn->data[conn->csize] = '\0';
processed:
len = conn->hsize + conn->csize;
memcpy(conn->rbuf, conn->rbuf + len, conn->rsize - len);
conn->rsize -= len;
#if ENABLE_TRACE
tvhtrace("satip", "%s - received RTSP header", conn->device->sd_info.addr);
tvhlog_hexdump("satip", conn->header, strlen(conn->header));
if (strlen(conn->data)) {
if (conn->csize) {
tvhtrace("satip", "%s - received RTSP data", conn->device->sd_info.addr);
tvhlog_hexdump("satip", conn->data, strlen(conn->data));
tvhlog_hexdump("satip", conn->data, conn->csize);
}
#endif
htsbuf_queue_flush(&header);
htsbuf_queue_flush(&data);
conn->rsize = 0;
conn->hsize = conn->csize = 0;
/* second write */
if (conn->wq2_loaded && conn->code == 200) {
if (conn->wq2_loaded && conn->code == 200 && !conn->rsize) {
r = satip_rtsp_send(conn, &conn->wq2, conn->wq2_cmd);
htsbuf_queue_flush(&conn->wq2);
conn->wq2_loaded = 0;
return r;
}
return 1;
return SATIP_RTSP_READ_DONE;
fail:
htsbuf_queue_flush(&header);
htsbuf_queue_flush(&data);
conn->rsize = 0;
return -EINVAL;
}
/* unfinished */
return 0;
return SATIP_RTSP_INCOMPLETE;
}
/*
@ -221,7 +242,7 @@ satip_rtsp_options_decode( satip_rtsp_connection_t *conn )
s = strtok_r(conn->header, "\n", &saveptr);
while (s) {
n = http_tokenize(s, argv, 32, ',');
if (strcmp(argv[0], "Public:") == 0)
if (strcasecmp(argv[0], "Public:") == 0)
for (i = 1; i < n; i++) {
if (strcmp(argv[i], "DESCRIBE") == 0)
what |= 1;
@ -234,7 +255,7 @@ satip_rtsp_options_decode( satip_rtsp_connection_t *conn )
}
s = strtok_r(NULL, "\n", &saveptr);
}
return (conn->code != 200 && what != 0x0f) ? -1 : 1;
return (conn->code != 200 && what != 0x0f) ? -EIO : SATIP_RTSP_OK;
}
void
@ -255,35 +276,33 @@ satip_rtsp_setup_decode( satip_rtsp_connection_t *conn )
char *argv[32], *s, *saveptr;
int i, n;
if (conn->code >= 400)
return -1;
if (conn->code != 200)
return 0;
return -EIO;
conn->client_port = 0;
s = strtok_r(conn->header, "\n", &saveptr);
while (s) {
n = http_tokenize(s, argv, 32, ';');
if (strcmp(argv[0], "Session:") == 0) {
if (strcasecmp(argv[0], "Session:") == 0) {
conn->session = strdup(argv[1]);
for (i = 2; i < n; i++) {
if (strncmp(argv[i], "timeout=", 8) == 0) {
if (strncasecmp(argv[i], "timeout=", 8) == 0) {
conn->timeout = atoi(argv[i] + 8);
if (conn->timeout <= 20 || conn->timeout > 3600)
return -1;
return -EIO;
}
}
} else if (strcmp(argv[0], "com.ses.streamID:") == 0) {
} else if (strcasecmp(argv[0], "com.ses.streamID:") == 0) {
conn->stream_id = atoll(argv[1]);
/* zero is valid stream id per specification */
if (argv[1][0] == '0' && argv[1][0] == '\0')
conn->stream_id = 0;
else if (conn->stream_id <= 0)
return -1;
} else if (strcmp(argv[0], "Transport:") == 0) {
if (strcmp(argv[1], "RTP/AVP"))
return -1;
if (strcmp(argv[2], "unicast"))
return -1;
return -EIO;
} else if (strcasecmp(argv[0], "Transport:") == 0) {
if (strcasecmp(argv[1], "RTP/AVP"))
return -EIO;
if (strcasecmp(argv[2], "unicast"))
return -EIO;
for (i = 2; i < n; i++) {
if (strncmp(argv[i], "client_port=", 12) == 0)
conn->client_port = atoi(argv[i] + 12);
@ -291,7 +310,7 @@ satip_rtsp_setup_decode( satip_rtsp_connection_t *conn )
}
s = strtok_r(NULL, "\n", &saveptr);
}
return 1;
return SATIP_RTSP_OK;
}
typedef struct tvh2satip {
@ -499,9 +518,9 @@ satip_rtsp_play( satip_rtsp_connection_t *conn, const char *pids,
delpids = satip_rtsp_pids_strip(conn, delpids);
if (pids == NULL && addpids == NULL && delpids == NULL)
return 1;
return -EINVAL;
// printf("pids = '%s' addpids = '%s' delpids = '%s'\n", pids, addpids, delpids);
//printf("pids = '%s' addpids = '%s' delpids = '%s'\n", pids, addpids, delpids);
htsbuf_queue_init(&q, 0);
htsbuf_qprintf(&q, "PLAY rtsp://%s/stream=%li?",
@ -527,7 +546,7 @@ satip_rtsp_play( satip_rtsp_connection_t *conn, const char *pids,
htsbuf_qprintf(&q, " RTSP/1.0\r\nSession: %s\r\n", conn->session);
r = satip_rtsp_send(conn, &q, SATIP_RTSP_CMD_PLAY);
htsbuf_queue_flush(&q);
if (r || !split)
if (r < 0 || !split)
return r;
htsbuf_queue_init(&q, 0);
@ -554,27 +573,25 @@ satip_rtsp_teardown( satip_rtsp_connection_t *conn )
return r;
}
#if 0
static int
satip_rtsp_describe_decode
( satip_connection_t *conn )
int
satip_rtsp_describe_decode( satip_rtsp_connection_t *conn )
{
if (header == NULL)
return 1;
printf("describe: %i\n", conn->code);
printf("header:\n%s\n", conn->header);
printf("data:\n%s\n", conn->data);
return 0;
return SATIP_RTSP_SEND_DONE;
}
static void
satip_rtsp_describe( satip_connection_t *conn )
int
satip_rtsp_describe( satip_rtsp_connection_t *conn )
{
int r;
htsbuf_queue_t q;
htsbuf_queue_init(&q, 0);
htsbuf_qprintf(&q,
"DESCRIBE rtsp://%s/ RTSP/1.0\r\n", sd->sd_info.addr);
satip_rtsp_write(conn, &q);
"DESCRIBE rtsp://%s/ RTSP/1.0\r\n", conn->device->sd_info.addr);
r = satip_rtsp_send(conn, &q, SATIP_RTSP_CMD_DESCRIBE);
htsbuf_queue_flush(&q);
return r;
}
#endif