From 15536d8296b2caec5e443b61122f57c3548db28e Mon Sep 17 00:00:00 2001 From: Adam Sutton Date: Wed, 1 May 2013 09:46:06 +0100 Subject: [PATCH] mpegts: some further updates to get things working properly Started to get basic IPTV input working, though it still needs lots of work to add in the missing stuff. --- Makefile | 1 - src/input.h | 6 ++ src/input/mpegts.h | 2 + src/input/mpegts/iptv/iptv.c | 116 +++++++++++++++++++-------- src/input/mpegts/iptv/iptv_mux.c | 10 ++- src/input/mpegts/iptv/iptv_private.h | 1 - src/input/mpegts/mpegts_mux.c | 6 +- src/main.c | 5 +- src/service.c | 2 - src/service.h | 25 +++--- 10 files changed, 117 insertions(+), 57 deletions(-) diff --git a/Makefile b/Makefile index 6ad0f5e9..52040f1b 100644 --- a/Makefile +++ b/Makefile @@ -108,7 +108,6 @@ SRCS = src/version.c \ src/tvhtime.c \ src/descrambler/descrambler.c \ src/serviceprobe.c \ - src/input.c \ SRCS += \ src/parsers/parsers.c \ diff --git a/src/input.h b/src/input.h index 4765618a..a377d7bc 100644 --- a/src/input.h +++ b/src/input.h @@ -28,6 +28,12 @@ #if ENABLE_TSFILE #include "input/mpegts/tsfile.h" #endif +#if ENABLE_IPTV +#include "input/mpegts/iptv.h" +#endif +#if ENABLE_LINUXDVB +#include "input/mpegts/linuxdvb.h" +#endif #endif void input_init ( void ); diff --git a/src/input/mpegts.h b/src/input/mpegts.h index 1466d1db..1752c37c 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -255,6 +255,7 @@ struct mpegts_service { service_t; // Parent +#if 0 // TODO: temporarily moved to service_t /** * PID carrying the programs PCR. * XXX: We don't support transports that does not carry @@ -266,6 +267,7 @@ struct mpegts_service * PID for the PMT of this MPEG-TS stream. */ uint16_t s_pmt_pid; +#endif /* * Fields defined by DVB standard EN 300 468 diff --git a/src/input/mpegts/iptv/iptv.c b/src/input/mpegts/iptv/iptv.c index 64d4e14b..c5f4336a 100644 --- a/src/input/mpegts/iptv/iptv.c +++ b/src/input/mpegts/iptv/iptv.c @@ -24,7 +24,7 @@ #include #include #include - +#include /* * Globals @@ -35,6 +35,10 @@ int iptv_poll_fd; pthread_t iptv_thread; pthread_mutex_t iptv_lock; +/* + * URL processing - TODO: move to a library + */ + typedef struct url { char buf[2048]; @@ -67,58 +71,64 @@ url_parse ( url_t *up, const char *urlstr ) /* Port */ up->port = 0; - if (!(t2 = strstr(up->host, "::"))) { + if (!(t2 = strstr(up->host, ":"))) { if (!strcmp(up->scheme, "https")) up->port = 443; else if (!strcmp(up->scheme, "http")) up->port = 80; + } else { + *t2 = 0; + up->port = atoi(t2+1); } return 0; } -static int http_connect (const char *urlstr, htsbuf_queue_t *spill) +/* + * HTTP client + */ + +static int http_connect (const char *urlstr) { - int fd, c; + int fd, c, i; char buf[1024]; url_t url; if (url_parse(&url, urlstr)) return -1; /* Make connection */ + // TODO: move connection to thread + // TODO: this is really only for testing and to allow use of TVH webserver as input + tvhlog(LOG_DEBUG, "iptv", "connecting to http %s %d", url.host, url.port); fd = tcp_connect(url.host, url.port, buf, sizeof(buf), 10); - if (!fd) + if (fd < 0) { + tvhlog(LOG_ERR, "iptv", "tcp_connect() failed %s", buf); return -1; + } /* Send request (VERY basic) */ - c = snprintf(buf, sizeof(buf), "GET %s HTTP/1.1\r\n", urlstr); + c = snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\n", url.path); tvh_write(fd, buf, c); c = snprintf(buf, sizeof(buf), "Hostname: %s\r\n", url.host); tvh_write(fd, buf, c); tvh_write(fd, "\r\n", 2); /* Read back header */ - htsbuf_queue_flush(spill); - while ((c = tcp_read_line(fd, buf, sizeof(buf), spill))) - if (!*buf) break; + // TODO: do this properly + i = 0; + while (1) { + if (!(c = read(fd, buf+i, 1))) + continue; + i++; + if (i == 4 && !strncmp(buf, "\r\n\r\n", 4)) + break; + memmove(buf, buf+1, 3); i = 3; + } + printf("DONE\n"); return fd; } -/* - * HTTP - */ -static int -iptv_input_start_http ( iptv_mux_t *im ) -{ - int ret = SM_CODE_TUNING_FAILED; - - /* Setup connection */ - im->mm_iptv_fd = http_connect(im->mm_iptv_url, &im->mm_iptv_spill); - - return ret; -} - /* * Input definition */ @@ -131,6 +141,18 @@ const idclass_t iptv_input_class = { } }; +/* + * HTTP + */ +static int +iptv_input_start_http ( iptv_mux_t *im ) +{ + /* Setup connection */ + im->mm_iptv_fd = http_connect(im->mm_iptv_url); + + return im->mm_iptv_fd <= 0 ? SM_CODE_TUNING_FAILED : 0; +} + static int iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) { @@ -142,7 +164,8 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) if (!im->mm_active) { /* HTTP */ - if (!strcmp(im->mm_iptv_url, "http")) + // TODO: this needs to happen in a thread + if (!strncmp(im->mm_iptv_url, "http", 4)) ret = iptv_input_start_http(im); /* OK */ @@ -153,6 +176,12 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) ev.data.fd = im->mm_iptv_fd; epoll_ctl(iptv_poll_fd, EPOLL_CTL_ADD, im->mm_iptv_fd, &ev); im->mm_active = mmi; + + /* Install table handlers */ + mpegts_table_add(mmi->mmi_mux, 0x0, 0xff, psi_pat_callback, NULL, "pat", + MT_QUICKREQ| MT_CRC, 0); + + // TODO: need to fire mux start event } } pthread_mutex_unlock(&iptv_lock); @@ -163,6 +192,19 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) static void iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) { + iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux; + assert(mmi == &im->mm_iptv_instance); + + pthread_mutex_lock(&iptv_lock); + if (im->mm_active) { + + // TODO: multicast will require additional work + + close(im->mm_iptv_fd); // removes from epoll + im->mm_iptv_fd = -1; + + } + pthread_mutex_unlock(&iptv_lock); } static int @@ -180,8 +222,7 @@ iptv_input_current_weight ( mpegts_input_t *mi ) static void * iptv_input_thread ( void *aux ) { -#if 0 - int nfds, fd; + int nfds, fd, r, pos = 0; uint8_t tsb[65536]; struct epoll_event ev; iptv_mux_t *im; @@ -200,7 +241,7 @@ iptv_input_thread ( void *aux ) /* Read data */ fd = ev.data.fd; - r = read(fd, tsb+c, sizeof(tsb)-c); + r = read(fd, tsb+pos, sizeof(tsb)-pos); /* Error */ if (r < 0) { @@ -212,9 +253,10 @@ iptv_input_thread ( void *aux ) pthread_mutex_lock(&iptv_lock); /* Find mux */ - LIST_FOREACH(mm, &iptv_network.mn_muxes, mm_network_link) - if (((iptv_mux_t*)mm)->im_fd == fd) + LIST_FOREACH(mm, &iptv_network.mn_muxes, mm_network_link) { + if (((iptv_mux_t*)mm)->mm_iptv_fd == fd) break; + } if (!mm) { pthread_mutex_unlock(&iptv_lock); continue; @@ -223,14 +265,13 @@ iptv_input_thread ( void *aux ) /* Raw TS */ if (!strncmp(im->mm_iptv_url, "http", 4)) { - mpegts_input_recv_packets((mpegts_input_t*)&iptv_input, - &im->mm_mux_instance, - tsb, c, NULL, NULL); + pos = mpegts_input_recv_packets((mpegts_input_t*)&iptv_input, + &im->mm_iptv_instance, + tsb, r, NULL, NULL); } - pthread_mutex_lock(&iptv_unlock); + pthread_mutex_unlock(&iptv_lock); } -#endif return NULL; } @@ -266,6 +307,8 @@ iptv_network_create_service */ void iptv_init ( void ) { + pthread_t tid; + /* Init Input */ mpegts_input_create0((mpegts_input_t*)&iptv_input, &iptv_input_class, NULL); @@ -284,7 +327,10 @@ void iptv_init ( void ) mpegts_network_add_input((mpegts_network_t*)&iptv_network, (mpegts_input_t*)&iptv_input); - /* Setup thread */ + /* Set table thread */ + pthread_create(&tid, NULL, mpegts_input_table_thread, &iptv_input); + + /* Setup TS thread */ // TODO: could set this up only when needed iptv_poll_fd = epoll_create(10); pthread_mutex_init(&iptv_lock, NULL); diff --git a/src/input/mpegts/iptv/iptv_mux.c b/src/input/mpegts/iptv/iptv_mux.c index a9d5f60d..839b1798 100644 --- a/src/input/mpegts/iptv/iptv_mux.c +++ b/src/input/mpegts/iptv/iptv_mux.c @@ -51,10 +51,9 @@ iptv_mux_create ( const char *uuid, const char *url ) iptv_mux_t *im = mpegts_mux_create(iptv_mux, NULL, (mpegts_network_t*)&iptv_network, - MM_ONID_NONE, MM_TSID_NONE); + MPEGTS_ONID_NONE, MPEGTS_TSID_NONE); if (url) im->mm_iptv_url = strdup(url); - htsbuf_queue_init(&im->mm_iptv_spill, 65536); /* Create Instance */ mpegts_mux_instance_create0(&im->mm_iptv_instance, @@ -78,7 +77,7 @@ iptv_mux_load_one ( iptv_mux_t *im, htsmsg_t *c ) mpegts_mux_load_one((mpegts_mux_t*)im, c); /* URL */ - if ((str = htsmsg_get_str(c, "url"))) + if ((str = htsmsg_get_str(c, "iptv_url"))) tvh_str_update(&im->mm_iptv_url, str); } @@ -91,10 +90,13 @@ iptv_mux_load_all ( void ) if ((s = hts_settings_load_r(1, "input/mpegts/iptv/muxes"))) { HTSMSG_FOREACH(f, s) { - if (!(m = htsmsg_get_map_by_field(f))) { + if (!(m = htsmsg_get_map_by_field(f)) || !(m = htsmsg_get_map(m, "config"))) { tvhlog(LOG_ERR, "iptv", "failed to load mux config %s", f->hmf_name); continue; } + printf("UUID: %s\n", f->hmf_name); + printf("CONFIG:\n"); + htsmsg_print(m); /* Create */ im = iptv_mux_create(f->hmf_name, NULL); diff --git a/src/input/mpegts/iptv/iptv_private.h b/src/input/mpegts/iptv/iptv_private.h index 0ac7916b..355ff970 100644 --- a/src/input/mpegts/iptv/iptv_private.h +++ b/src/input/mpegts/iptv/iptv_private.h @@ -44,7 +44,6 @@ struct iptv_mux mpegts_mux_t; int mm_iptv_fd; - htsbuf_queue_t mm_iptv_spill; mpegts_mux_instance_t mm_iptv_instance; char *mm_iptv_url; }; diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index 4cfe3d20..400c4380 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -168,6 +168,7 @@ mpegts_mux_start ( mpegts_mux_t *mm, const char *reason, int weight ) /* Tune */ if (!mmi->mmi_input->mi_start_mux(mmi->mmi_input, mmi)) break; + tvhtrace("mpegts", "failed to run mmi %p", mmi); } /* Initial scanning */ @@ -219,7 +220,7 @@ mpegts_mux_load_one ( mpegts_mux_t *mm, htsmsg_t *c ) mpegts_mux_t * mpegts_mux_create0 ( mpegts_mux_t *mm, const idclass_t *class, const char *uuid, - mpegts_network_t *net, uint16_t onid, uint16_t tsid ) + mpegts_network_t *mn, uint16_t onid, uint16_t tsid ) { idnode_insert(&mm->mm_id, uuid, class); @@ -228,7 +229,8 @@ mpegts_mux_create0 mm->mm_tsid = tsid; /* Add to network */ - mm->mm_network = net; + LIST_INSERT_HEAD(&mn->mn_muxes, mm, mm_network_link); + mm->mm_network = mn; mm->mm_start = mpegts_mux_start; mpegts_mux_initial_scan_link(mm); diff --git a/src/main.c b/src/main.c index 93f9a18c..5c002044 100644 --- a/src/main.c +++ b/src/main.c @@ -698,8 +698,6 @@ main(int argc, char **argv) access_init(opt_firstrun, opt_noacl); - input_init(); - #if ENABLE_TIMESHIFT timeshift_init(); #endif @@ -726,6 +724,9 @@ main(int argc, char **argv) tsfile_add_file(opt_tsfile.str[i]); } #endif +#if ENABLE_IPTV + iptv_init(); +#endif if(opt_subscribe != NULL) subscription_dummy_join(opt_subscribe, 1); diff --git a/src/service.c b/src/service.c index a38d2b06..514b679e 100644 --- a/src/service.c +++ b/src/service.c @@ -916,10 +916,8 @@ service_build_stream_start(service_t *t) t->s_setsourceinfo(t, &ss->ss_si); ss->ss_refcount = 1; -#ifdef MOVE_TO_MPEGTS ss->ss_pcr_pid = t->s_pcr_pid; ss->ss_pmt_pid = t->s_pmt_pid; -#endif return ss; } diff --git a/src/service.h b/src/service.h index 3bfc0398..9958bdc4 100644 --- a/src/service.h +++ b/src/service.h @@ -228,22 +228,27 @@ typedef struct service { S_OTHER, } s_source_type; - - /** +// TODO: should this really be here? + + /** + * PID carrying the programs PCR. + * XXX: We don't support transports that does not carry + * the PCR in one of the content streams. + */ + uint16_t s_pcr_pid; + + /** + * PID for the PMT of this MPEG-TS stream. + */ + uint16_t s_pmt_pid; + + /** * Set if transport is enabled (the default). If disabled it should * not be considered when chasing for available transports during * subscription scheduling. */ int s_enabled; -#ifdef MOVE_TO_RAWTS - /** - * Last PCR seen, we use it for a simple clock for rawtsinput.c - */ - int64_t s_pcr_last; - int64_t s_pcr_last_realtime; -#endif - LIST_ENTRY(service) s_active_link; LIST_HEAD(, th_subscription) s_subscriptions;