From 42b8b19d4e24ce0170591874fa907c94c8f567c4 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Wed, 9 Apr 2014 18:21:51 +0200 Subject: [PATCH] Improve the SAT>IP RTSP implementation --- src/input/mpegts/satip/satip_frontend.c | 41 +++++-- src/input/mpegts/satip/satip_private.h | 15 ++- src/input/mpegts/satip/satip_rtsp.c | 143 +++++++++++++----------- 3 files changed, 128 insertions(+), 71 deletions(-) diff --git a/src/input/mpegts/satip/satip_frontend.c b/src/input/mpegts/satip/satip_frontend.c index d6568e36..987d4ac1 100644 --- a/src/input/mpegts/satip/satip_frontend.c +++ b/src/input/mpegts/satip/satip_frontend.c @@ -637,7 +637,10 @@ satip_frontend_pid_changed( satip_rtsp_connection_t *rtsp, lfe->sf_pids_tcount = lfe->sf_pids_count; pthread_mutex_unlock(&lfe->sf_dvr_lock); - r = satip_rtsp_play(rtsp, NULL, add, del); + if (add[0] != '\0' || del[0] != '\0') + r = satip_rtsp_play(rtsp, NULL, add, del); + else + r = 0; } if (r < 0) @@ -663,6 +666,7 @@ satip_frontend_input_thread ( void *aux ) size_t c; int tc; tvhpoll_event_t ev[4]; + tvhpoll_event_t evr; tvhpoll_t *efd; int changing = 0, ms = -1, fatal = 0; uint32_t seq = -1, nseq; @@ -690,6 +694,7 @@ satip_frontend_input_thread ( void *aux ) ev[2].events = TVHPOLL_IN; ev[2].fd = rtsp->fd; ev[2].data.u64 = (uint64_t)rtsp; + evr = ev[2]; ev[3].events = TVHPOLL_IN; ev[3].fd = lfe->sf_dvr_pipe.rd; ev[3].data.u64 = 0; @@ -717,6 +722,18 @@ satip_frontend_input_thread ( void *aux ) while (tvheadend_running && !fatal) { + if (rtsp->sending) { + if ((evr.events & TVHPOLL_OUT) == 0) { + evr.events |= TVHPOLL_OUT; + tvhpoll_add(efd, &evr, 1); + } + } else { + if (evr.events & TVHPOLL_OUT) { + evr.events &= ~TVHPOLL_OUT; + tvhpoll_add(efd, &evr, 1); + } + } + nfds = tvhpoll_wait(efd, ev, 1, ms); if (nfds > 0 && ev[0].data.u64 == 0) { @@ -740,12 +757,12 @@ satip_frontend_input_thread ( void *aux ) if (nfds < 1) continue; if (ev[0].data.u64 == (uint64_t)rtsp) { - r = satip_rtsp_receive(rtsp); + r = satip_rtsp_run(rtsp); if (r < 0) { tvhlog(LOG_ERR, "satip", "%s - RTSP error %d (%s) [%i-%i]", buf, r, strerror(-r), rtsp->cmd, rtsp->code); fatal = 1; - } else if (r) { + } else if (r == SATIP_RTSP_READ_DONE) { switch (rtsp->cmd) { case SATIP_RTSP_CMD_OPTIONS: r = satip_rtsp_options_decode(rtsp); @@ -783,6 +800,7 @@ satip_frontend_input_thread ( void *aux ) } } + /* We need to keep the session alive */ if (rtsp->ping_time + rtsp->timeout / 2 < dispatch_clock && rtsp->cmd == SATIP_RTSP_CMD_NONE) satip_rtsp_options(rtsp); @@ -862,11 +880,20 @@ satip_frontend_input_thread ( void *aux ) if (r < 0) { tvhtrace("satip", "%s - bad teardown", buf); } else { - while (1) { - tvhpoll_wait(efd, ev, 1, -1); - r = satip_rtsp_receive(rtsp); - if (r) + if (r == SATIP_RTSP_INCOMPLETE) { + evr.events |= TVHPOLL_OUT; + tvhpoll_add(efd, &evr, 1); + } + r = 0; + while (r == SATIP_RTSP_INCOMPLETE) { + if (!rtsp->sending) { + evr.events &= ~TVHPOLL_OUT; + tvhpoll_add(efd, &evr, 1); + } + nfds = tvhpoll_wait(efd, ev, 1, -1); + if (nfds < 0) break; + r = satip_rtsp_run(rtsp); } } } diff --git a/src/input/mpegts/satip/satip_private.h b/src/input/mpegts/satip/satip_private.h index a064b2f6..67554e5b 100644 --- a/src/input/mpegts/satip/satip_private.h +++ b/src/input/mpegts/satip/satip_private.h @@ -210,6 +210,11 @@ typedef enum { SATIP_RTSP_CMD_DESCRIBE } satip_rtsp_cmd_t; +#define SATIP_RTSP_OK 1 +#define SATIP_RTSP_READ_DONE 1 +#define SATIP_RTSP_SEND_DONE 1 +#define SATIP_RTSP_INCOMPLETE 0 + typedef struct satip_rtsp_connection { /* decoded answer */ int cseq; @@ -229,6 +234,8 @@ typedef struct satip_rtsp_connection { int fd; char rbuf[4096]; size_t rsize; + size_t hsize; /* header size */ + size_t csize; /* contents size (exclude header) */ char *wbuf; size_t wpos; size_t wsize; @@ -252,7 +259,7 @@ satip_rtsp_send( satip_rtsp_connection_t *conn, htsbuf_queue_t *q, satip_rtsp_cmd_t cmd ); int -satip_rtsp_receive( satip_rtsp_connection_t *conn ); +satip_rtsp_run( satip_rtsp_connection_t *conn ); int satip_rtsp_options_decode( satip_rtsp_connection_t *conn ); @@ -276,4 +283,10 @@ satip_rtsp_play( satip_rtsp_connection_t *sd, const char *pids, int satip_rtsp_teardown( satip_rtsp_connection_t *conn ); +int +satip_rtsp_describe_decode( satip_rtsp_connection_t *conn ); + +int +satip_rtsp_describe( satip_rtsp_connection_t *conn ); + #endif /* __TVH_SATIP_PRIVATE_H__ */ diff --git a/src/input/mpegts/satip/satip_rtsp.c b/src/input/mpegts/satip/satip_rtsp.c index ab6574bc..a9f2117a 100644 --- a/src/input/mpegts/satip/satip_rtsp.c +++ b/src/input/mpegts/satip/satip_rtsp.c @@ -74,25 +74,23 @@ satip_rtsp_send_partial( satip_rtsp_connection_t *conn ) r = send(conn->fd, conn->wbuf + conn->wpos, conn->wsize - conn->wpos, MSG_DONTWAIT); if (r < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) - continue; + return SATIP_RTSP_INCOMPLETE; return -errno; } conn->wpos += r; if (conn->wpos >= conn->wsize) { conn->sending = 0; - return 1; + return SATIP_RTSP_SEND_DONE; } break; } - return 0; + return SATIP_RTSP_INCOMPLETE; } int satip_rtsp_send( satip_rtsp_connection_t *conn, htsbuf_queue_t *q, satip_rtsp_cmd_t cmd ) { - int r; - conn->ping_time = dispatch_clock; conn->cmd = cmd; free(conn->wbuf); @@ -104,8 +102,7 @@ satip_rtsp_send( satip_rtsp_connection_t *conn, htsbuf_queue_t *q, tvhtrace("satip", "%s - sending RTSP cmd", conn->device->sd_info.addr); tvhlog_hexdump("satip", conn->wbuf, conn->wsize); #endif - while (!(r = satip_rtsp_send_partial(conn))) ; - return r; + return satip_rtsp_send_partial(conn); } static int @@ -115,23 +112,37 @@ satip_rtsp_send2( satip_rtsp_connection_t *conn, htsbuf_queue_t *q, conn->wq2_loaded = 1; conn->wq2_cmd = cmd; htsbuf_appendq(&conn->wq2, q); - return 1; + return SATIP_RTSP_SEND_DONE; +} + +static char * +satip_rtsp_hstrip(char *h) +{ + while (*h && *h <= ' ') + h++; + return h; } int -satip_rtsp_receive( satip_rtsp_connection_t *conn ) +satip_rtsp_run( satip_rtsp_connection_t *conn ) { char buf[1024], *saveptr, *argv[3], *d, *p, *p1; - htsbuf_queue_t header, data; + htsbuf_queue_t header; int cseq_seen; ssize_t r; + size_t len; + if (conn->sending) { + r = satip_rtsp_send_partial(conn); + if (r < 0 || r == SATIP_RTSP_INCOMPLETE) + return r; + } r = recv(conn->fd, buf, sizeof(buf), MSG_DONTWAIT); if (r == 0) return -ESTRPIPE; if (r < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) - return 0; + return SATIP_RTSP_INCOMPLETE; return -errno; } #if ENABLE_TRACE @@ -145,11 +156,11 @@ satip_rtsp_receive( satip_rtsp_connection_t *conn ) memcpy(conn->rbuf + conn->rsize, buf, r); conn->rsize += r; conn->rbuf[conn->rsize] = '\0'; - if (conn->rsize > 3 && + if (!conn->csize && conn->rsize > 3 && (d = strstr(conn->rbuf, "\r\n\r\n")) != NULL) { + conn->hsize = d - conn->rbuf + 4; *d = '\0'; htsbuf_queue_init(&header, 0); - htsbuf_queue_init(&data, 0); p = strtok_r(conn->rbuf, "\r\n", &saveptr); if (p == NULL) goto fail; @@ -162,50 +173,60 @@ satip_rtsp_receive( satip_rtsp_connection_t *conn ) goto fail; cseq_seen = 0; while ((p = strtok_r(NULL, "\r\n", &saveptr)) != NULL) { - p1 = strdup(p); - if (http_tokenize(p, argv, 2, ':') != 2) - goto fail; - if (strcmp(argv[0], "CSeq") == 0) { - cseq_seen = conn->cseq == atoi(argv[1]); + if (strncasecmp(p, "CSeq:", 5) == 0) { + p1 = satip_rtsp_hstrip(p + 5); + if (p1) + cseq_seen = conn->cseq == atoi(p1); + } else if (strncasecmp(p, "Content-Length:", 15) == 0) { + conn->csize = atoll(p + 15); } else { - htsbuf_append(&header, p1, strlen(p1)); + htsbuf_append(&header, p, strlen(p)); htsbuf_append(&header, "\n", 1); } - free(p1); } if (!cseq_seen) goto fail; free(conn->header); - free(conn->data); conn->header = htsbuf_to_string(&header); - conn->data = htsbuf_to_string(&data); + htsbuf_queue_flush(&header); + free(conn->data); + conn->data = NULL; + if (!conn->csize) + goto processed; + if (conn->rsize > conn->hsize) + goto data; + } else if (conn->hsize + conn->csize >= conn->rsize) { +data: + conn->data = malloc(conn->csize + 1); + memcpy(conn->data, conn->rbuf + conn->hsize, conn->csize); + conn->data[conn->csize] = '\0'; +processed: + len = conn->hsize + conn->csize; + memcpy(conn->rbuf, conn->rbuf + len, conn->rsize - len); + conn->rsize -= len; #if ENABLE_TRACE tvhtrace("satip", "%s - received RTSP header", conn->device->sd_info.addr); tvhlog_hexdump("satip", conn->header, strlen(conn->header)); - if (strlen(conn->data)) { + if (conn->csize) { tvhtrace("satip", "%s - received RTSP data", conn->device->sd_info.addr); - tvhlog_hexdump("satip", conn->data, strlen(conn->data)); + tvhlog_hexdump("satip", conn->data, conn->csize); } #endif - htsbuf_queue_flush(&header); - htsbuf_queue_flush(&data); - conn->rsize = 0; + conn->hsize = conn->csize = 0; /* second write */ - if (conn->wq2_loaded && conn->code == 200) { + if (conn->wq2_loaded && conn->code == 200 && !conn->rsize) { r = satip_rtsp_send(conn, &conn->wq2, conn->wq2_cmd); htsbuf_queue_flush(&conn->wq2); conn->wq2_loaded = 0; return r; } - return 1; + return SATIP_RTSP_READ_DONE; fail: htsbuf_queue_flush(&header); - htsbuf_queue_flush(&data); conn->rsize = 0; return -EINVAL; } - /* unfinished */ - return 0; + return SATIP_RTSP_INCOMPLETE; } /* @@ -221,7 +242,7 @@ satip_rtsp_options_decode( satip_rtsp_connection_t *conn ) s = strtok_r(conn->header, "\n", &saveptr); while (s) { n = http_tokenize(s, argv, 32, ','); - if (strcmp(argv[0], "Public:") == 0) + if (strcasecmp(argv[0], "Public:") == 0) for (i = 1; i < n; i++) { if (strcmp(argv[i], "DESCRIBE") == 0) what |= 1; @@ -234,7 +255,7 @@ satip_rtsp_options_decode( satip_rtsp_connection_t *conn ) } s = strtok_r(NULL, "\n", &saveptr); } - return (conn->code != 200 && what != 0x0f) ? -1 : 1; + return (conn->code != 200 && what != 0x0f) ? -EIO : SATIP_RTSP_OK; } void @@ -255,35 +276,33 @@ satip_rtsp_setup_decode( satip_rtsp_connection_t *conn ) char *argv[32], *s, *saveptr; int i, n; - if (conn->code >= 400) - return -1; if (conn->code != 200) - return 0; + return -EIO; conn->client_port = 0; s = strtok_r(conn->header, "\n", &saveptr); while (s) { n = http_tokenize(s, argv, 32, ';'); - if (strcmp(argv[0], "Session:") == 0) { + if (strcasecmp(argv[0], "Session:") == 0) { conn->session = strdup(argv[1]); for (i = 2; i < n; i++) { - if (strncmp(argv[i], "timeout=", 8) == 0) { + if (strncasecmp(argv[i], "timeout=", 8) == 0) { conn->timeout = atoi(argv[i] + 8); if (conn->timeout <= 20 || conn->timeout > 3600) - return -1; + return -EIO; } } - } else if (strcmp(argv[0], "com.ses.streamID:") == 0) { + } else if (strcasecmp(argv[0], "com.ses.streamID:") == 0) { conn->stream_id = atoll(argv[1]); /* zero is valid stream id per specification */ if (argv[1][0] == '0' && argv[1][0] == '\0') conn->stream_id = 0; else if (conn->stream_id <= 0) - return -1; - } else if (strcmp(argv[0], "Transport:") == 0) { - if (strcmp(argv[1], "RTP/AVP")) - return -1; - if (strcmp(argv[2], "unicast")) - return -1; + return -EIO; + } else if (strcasecmp(argv[0], "Transport:") == 0) { + if (strcasecmp(argv[1], "RTP/AVP")) + return -EIO; + if (strcasecmp(argv[2], "unicast")) + return -EIO; for (i = 2; i < n; i++) { if (strncmp(argv[i], "client_port=", 12) == 0) conn->client_port = atoi(argv[i] + 12); @@ -291,7 +310,7 @@ satip_rtsp_setup_decode( satip_rtsp_connection_t *conn ) } s = strtok_r(NULL, "\n", &saveptr); } - return 1; + return SATIP_RTSP_OK; } typedef struct tvh2satip { @@ -499,9 +518,9 @@ satip_rtsp_play( satip_rtsp_connection_t *conn, const char *pids, delpids = satip_rtsp_pids_strip(conn, delpids); if (pids == NULL && addpids == NULL && delpids == NULL) - return 1; + return -EINVAL; - // printf("pids = '%s' addpids = '%s' delpids = '%s'\n", pids, addpids, delpids); + //printf("pids = '%s' addpids = '%s' delpids = '%s'\n", pids, addpids, delpids); htsbuf_queue_init(&q, 0); htsbuf_qprintf(&q, "PLAY rtsp://%s/stream=%li?", @@ -527,7 +546,7 @@ satip_rtsp_play( satip_rtsp_connection_t *conn, const char *pids, htsbuf_qprintf(&q, " RTSP/1.0\r\nSession: %s\r\n", conn->session); r = satip_rtsp_send(conn, &q, SATIP_RTSP_CMD_PLAY); htsbuf_queue_flush(&q); - if (r || !split) + if (r < 0 || !split) return r; htsbuf_queue_init(&q, 0); @@ -554,27 +573,25 @@ satip_rtsp_teardown( satip_rtsp_connection_t *conn ) return r; } -#if 0 -static int -satip_rtsp_describe_decode - ( satip_connection_t *conn ) +int +satip_rtsp_describe_decode( satip_rtsp_connection_t *conn ) { - if (header == NULL) - return 1; printf("describe: %i\n", conn->code); printf("header:\n%s\n", conn->header); printf("data:\n%s\n", conn->data); - return 0; + return SATIP_RTSP_SEND_DONE; } -static void -satip_rtsp_describe( satip_connection_t *conn ) +int +satip_rtsp_describe( satip_rtsp_connection_t *conn ) { + int r; + htsbuf_queue_t q; htsbuf_queue_init(&q, 0); htsbuf_qprintf(&q, - "DESCRIBE rtsp://%s/ RTSP/1.0\r\n", sd->sd_info.addr); - satip_rtsp_write(conn, &q); + "DESCRIBE rtsp://%s/ RTSP/1.0\r\n", conn->device->sd_info.addr); + r = satip_rtsp_send(conn, &q, SATIP_RTSP_CMD_DESCRIBE); htsbuf_queue_flush(&q); + return r; } -#endif