From dade35a931b5e6f8d9eccf1c73401baf1e7bcc92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Wed, 18 Nov 2009 20:28:35 +0000 Subject: [PATCH] Initial work on TS pass-through --- src/dvr/dvr_rec.c | 5 ++++- src/htsp.c | 7 +++++-- src/parsers.c | 22 ++++++---------------- src/psi.c | 41 ++++++++++++++--------------------------- src/psi.h | 3 ++- src/rtsp.c | 29 +++++++++++++++++++++-------- src/serviceprobe.c | 2 +- src/streaming.c | 43 +++++++++++++++++++++++++++++++++---------- src/streaming.h | 6 +++++- src/subscriptions.c | 29 +++++++++++++++++++---------- src/subscriptions.h | 12 ++++++++---- src/transports.c | 23 ++++++++++++++++++++--- src/transports.h | 2 ++ src/tsdemux.c | 23 +++++++++++++++++++++-- src/tvhead.h | 5 +++++ 15 files changed, 166 insertions(+), 86 deletions(-) diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index 51de470b..b2a0bf3d 100644 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -69,7 +69,7 @@ dvr_rec_subscribe(dvr_entry_t *de) pthread_create(&de->de_thread, NULL, dvr_thread, de); de->de_s = subscription_create_from_channel(de->de_channel, 1000, buf, - &de->de_sq.sq_st); + &de->de_sq.sq_st, 0); } /** @@ -443,6 +443,9 @@ dvr_thread(void *aux) "No source transport available, automatic retry"); break; + case SMT_MPEGTS: + break; + case SMT_EXIT: run = 0; break; diff --git a/src/htsp.c b/src/htsp.c index 08ae243e..134cea91 100644 --- a/src/htsp.c +++ b/src/htsp.c @@ -644,10 +644,10 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) hs->hs_sid = sid; LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link); - streaming_target_init(&hs->hs_input, htsp_streaming_input, hs); + streaming_target_init(&hs->hs_input, htsp_streaming_input, hs, 0); hs->hs_s = subscription_create_from_channel(ch, 500, htsp->htsp_logname, - &hs->hs_input); + &hs->hs_input, 0); return NULL; } @@ -1410,6 +1410,9 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm) htsp_subscription_status(hs, "No available sources"); break; + case SMT_MPEGTS: + break; + case SMT_EXIT: abort(); } diff --git a/src/parsers.c b/src/parsers.c index 6097b099..3b0ff017 100644 --- a/src/parsers.c +++ b/src/parsers.c @@ -114,20 +114,6 @@ void parse_mpeg_ts(th_transport_t *t, th_stream_t *st, uint8_t *data, int len, int start, int err) { - th_subscription_t *s; - - if(LIST_FIRST(&t->tht_streaming_pad.sp_targets) == NULL) { - /* No muxers. However, subscriptions may force demultiplex - for other reasons (serviceprobe does this) */ - LIST_FOREACH(s, &t->tht_subscriptions, ths_transport_link) - if(s->ths_force_demux) - break; - - if(s == NULL) - return; /* No-one is interested, so drop here */ - } - - switch(st->st_type) { case SCT_MPEG2VIDEO: parse_video(t, st, data, len, parse_mpeg2video); @@ -1048,11 +1034,15 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) /* Forward packet */ pkt->pkt_componentindex = st->st_index; - streaming_pad_deliver(&t->tht_streaming_pad, - streaming_msg_create_pkt(pkt)); + + streaming_message_t *sm = streaming_msg_create_pkt(pkt); + + streaming_pad_deliver(&t->tht_streaming_pad, sm); + streaming_msg_free(sm); /* Decrease our own reference to the packet */ pkt_ref_dec(pkt); + } /** diff --git a/src/psi.c b/src/psi.c index 7ec9a2d3..46b99824 100644 --- a/src/psi.c +++ b/src/psi.c @@ -453,21 +453,14 @@ psi_parse_pmt(th_transport_t *t, const uint8_t *ptr, int len, int chksvcid, } - - - /** - * PAT generator + * PMT generator */ -#if 0 int -psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid) +psi_build_pmt(streaming_start_t *ss, uint8_t *buf0, int maxlen, int pcrpid) { - th_stream_t *st; - th_muxstream_t *tms; - int c, tlen, dlen, l; + int c, tlen, dlen, l, i, pid; uint8_t *buf, *buf1; - buf = buf0; @@ -492,13 +485,12 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid) buf += 12; tlen = 12; - LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) { - st = tms->tms_stream; + for(i = 0; i < ss->ss_num_components; i++) { + streaming_start_component_t *ssc = &ss->ss_components[i]; - if(tms->tms_index == 0) - continue; + pid = 200 + i; - switch(st->st_type) { + switch(ssc->ssc_type) { case SCT_MPEG2VIDEO: c = 0x02; break; @@ -521,15 +513,15 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid) buf[0] = c; - buf[1] = 0xe0 | (tms->tms_index >> 8); - buf[2] = tms->tms_index; + buf[1] = 0xe0 | (pid >> 8); + buf[2] = pid; buf1 = &buf[3]; tlen += 5; buf += 5; dlen = 0; - switch(st->st_type) { + switch(ssc->ssc_type) { case SCT_AC3: buf[0] = DVB_DESC_AC3; buf[1] = 1; @@ -554,10 +546,6 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid) return psi_append_crc32(buf0, tlen, maxlen); } -#endif - - - /* * CRC32 @@ -688,14 +676,13 @@ static struct strtab streamtypetab[] = { { "PMT", SCT_PMT }, { "PAT", SCT_PAT }, { "AAC", SCT_AAC }, + { "MPEGTS", SCT_MPEGTS }, }; - - - - - +/** + * + */ const char * streaming_component_type2txt(streaming_component_type_t s) { diff --git a/src/psi.h b/src/psi.h index 9ad64c15..6cb66f50 100644 --- a/src/psi.h +++ b/src/psi.h @@ -20,6 +20,7 @@ #define PSI_H_ #include "htsmsg.h" +#include "streaming.h" #define PSI_SECTION_SIZE 4096 @@ -42,7 +43,7 @@ uint32_t psi_crc32(uint8_t *data, size_t datalen); int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid); -//int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid); +int psi_build_pmt(streaming_start_t *ss, uint8_t *buf, int maxlen, int pcrpid); const char *psi_caid2name(uint16_t caid); diff --git a/src/rtsp.c b/src/rtsp.c index cb3ff70a..e5d06932 100644 --- a/src/rtsp.c +++ b/src/rtsp.c @@ -51,6 +51,8 @@ typedef struct rtsp_resource { RTSP_RESOURCE_SERVICE } rr_type; + int rr_mpegts; + union { struct channel *rr_channel; struct th_transport *rr_service; @@ -464,6 +466,9 @@ rtsp_streaming_input(void *opaque, streaming_message_t *sm) case SMT_NOSOURCE: break; + case SMT_MPEGTS: + break; + default: abort(); } @@ -496,16 +501,18 @@ rtsp_subscribe(rtsp_t *rtsp, rtsp_resource_t *rr) - streaming_target_init(&rtsp->rtsp_input, rtsp_streaming_input, rtsp); + streaming_target_init(&rtsp->rtsp_input, rtsp_streaming_input, rtsp, 0); + + int flags = rr->rr_mpegts ? SUBSCRIPTION_RAW_MPEGTS : 0; switch(rr->rr_type) { case RTSP_RESOURCE_CHANNEL: s = subscription_create_from_channel(rr->rr_channel, 500, "RTSP", - &rtsp->rtsp_input); + &rtsp->rtsp_input, flags); break; case RTSP_RESOURCE_SERVICE: s = subscription_create_from_transport(rr->rr_service, "RTSP", - &rtsp->rtsp_input); + &rtsp->rtsp_input, flags); break; } @@ -517,11 +524,12 @@ rtsp_subscribe(rtsp_t *rtsp, rtsp_resource_t *rr) * Resolve an URL into a resource */ static int -rtsp_resolve_url(char *url, rtsp_resource_t *rr, char **remainp) +rtsp_resolve_url(http_connection_t *hc, rtsp_resource_t *rr, char **remainp) { char *components[5]; void *r; int nc; + char *url = hc->hc_url; if(!strncasecmp(url, "rtsp://", strlen("rtsp://"))) { url += strlen("rtsp://"); @@ -537,7 +545,7 @@ rtsp_resolve_url(char *url, rtsp_resource_t *rr, char **remainp) return -1; http_deescape(components[1]); - + if(!strcmp(components[0], "channel")) { rr->rr_type = RTSP_RESOURCE_CHANNEL; r = channel_find_by_name(components[1], 0); @@ -582,7 +590,7 @@ rtsp_cmd_options(http_connection_t *hc) pthread_mutex_lock(&global_lock); - if(strcmp(hc->hc_url, "*") && rtsp_resolve_url(hc->hc_url, &rr, NULL)) { + if(strcmp(hc->hc_url, "*") && rtsp_resolve_url(hc, &rr, NULL)) { rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); pthread_mutex_unlock(&global_lock); return 0; @@ -635,7 +643,7 @@ rtsp_cmd_describe(http_connection_t *hc, rtsp_t *rtsp) pthread_mutex_lock(&global_lock); - if(rtsp_resolve_url(hc->hc_url, &rr, NULL)) { + if(rtsp_resolve_url(hc, &rr, NULL)) { rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); pthread_mutex_unlock(&global_lock); return 0; @@ -865,6 +873,11 @@ rtsp_setup_udp(http_connection_t *hc, rtsp_t *rtsp, LIST_INSERT_HEAD(&rtsp->rtsp_streams, rs, rs_link); + tvhlog(LOG_DEBUG, "RTSP", "%s: %s -- Stream %s to UDP port %d", + inet_ntoa(hc->hc_peer->sin_addr), hc->hc_url_orig, + streaming_component_type2txt(ssc->ssc_type), + client_ports[0]); + rtsp_printf(hc, "RTSP/1.0 200 OK\r\n" "Session: %s\r\n" @@ -903,7 +916,7 @@ rtsp_cmd_setup(http_connection_t *hc, rtsp_t *rtsp) pthread_mutex_lock(&global_lock); - if(rtsp_resolve_url(hc->hc_url, &rr, &remain)) { + if(rtsp_resolve_url(hc, &rr, &remain)) { rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve"); pthread_mutex_unlock(&global_lock); return 0; diff --git a/src/serviceprobe.c b/src/serviceprobe.c index 5a4d4bf4..ec5e6fd2 100644 --- a/src/serviceprobe.c +++ b/src/serviceprobe.c @@ -112,7 +112,7 @@ serviceprobe_thread(void *aux) tvhlog(LOG_INFO, "serviceprobe", "%20s: checking...", t->tht_svcname); - s = subscription_create_from_transport(t, "serviceprobe", &sq.sq_st); + s = subscription_create_from_transport(t, "serviceprobe", &sq.sq_st, 0); transport_ref(t); pthread_mutex_unlock(&global_lock); diff --git a/src/streaming.c b/src/streaming.c index 1c1c1f0f..b58e21e0 100644 --- a/src/streaming.c +++ b/src/streaming.c @@ -34,10 +34,12 @@ streaming_pad_init(streaming_pad_t *sp) * */ void -streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque) +streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque, + int reject_filter) { st->st_cb = cb; st->st_opaque = opaque; + st->st_reject_filter = reject_filter; } @@ -62,7 +64,7 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm) void streaming_queue_init(streaming_queue_t *sq) { - streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq); + streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, 0); pthread_mutex_init(&sq->sq_mutex, NULL); pthread_cond_init(&sq->sq_cond, NULL); @@ -176,6 +178,11 @@ streaming_msg_clone(streaming_message_t *src) case SMT_EXIT: break; + case SMT_MPEGTS: + dst->sm_data = malloc(188); + memcpy(dst->sm_data, src->sm_data, 188); + break; + default: abort(); } @@ -225,6 +232,10 @@ streaming_msg_free(streaming_message_t *sm) case SMT_NOSOURCE: break; + case SMT_MPEGTS: + free(sm->sm_data); + break; + default: abort(); } @@ -240,17 +251,29 @@ streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm) { streaming_target_t *st, *next; - if(sp->sp_ntargets == 0) - return; + for(st = LIST_FIRST(&sp->sp_targets);st; st = next) { - for(st = LIST_FIRST(&sp->sp_targets);; st = next) { - - if((next = LIST_NEXT(st, st_link)) == NULL) - break; - + next = LIST_NEXT(st, st_link); + if(st->st_reject_filter & SMT_TO_MASK(sm->sm_type)) + continue; st->st_cb(st->st_opaque, streaming_msg_clone(sm)); } - st->st_cb(st->st_opaque, sm); +} + + +/** + * + */ +int +streaming_pad_probe_type(streaming_pad_t *sp, streaming_message_type_t smt) +{ + streaming_target_t *st; + + LIST_FOREACH(st, &sp->sp_targets, st_link) { + if(!(st->st_reject_filter & SMT_TO_MASK(smt))) + return 1; + } + return 0; } diff --git a/src/streaming.h b/src/streaming.h index af8e58c6..7a6010ca 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -51,7 +51,8 @@ typedef struct streaming_start { void streaming_pad_init(streaming_pad_t *sp); void streaming_target_init(streaming_target_t *st, - st_callback_t *cb, void *opaque); + st_callback_t *cb, void *opaque, + int reject_filter); void streaming_queue_init(streaming_queue_t *sq); @@ -80,5 +81,8 @@ streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt); #define streaming_target_deliver(st, sm) ((st)->st_cb((st)->st_opaque, (sm))) void streaming_start_unref(streaming_start_t *ss); + +int streaming_pad_probe_type(streaming_pad_t *sp, + streaming_message_type_t smt); #endif /* STREAMING_H_ */ diff --git a/src/subscriptions.c b/src/subscriptions.c index dc66d4e3..4545ba3f 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -194,8 +194,7 @@ static void subscription_input(void *opauqe, streaming_message_t *sm) { th_subscription_t *s = opauqe; - - streaming_target_deliver(s->ths_output, sm); + streaming_target_deliver(s->ths_output, sm); } @@ -204,16 +203,25 @@ subscription_input(void *opauqe, streaming_message_t *sm) * */ static th_subscription_t * -subscription_create(int weight, const char *name, streaming_target_t *st) +subscription_create(int weight, const char *name, streaming_target_t *st, + int flags) { th_subscription_t *s = calloc(1, sizeof(th_subscription_t)); + int reject = 0; - streaming_target_init(&s->ths_input, subscription_input, s); + if(flags & SUBSCRIPTION_RAW_MPEGTS) + reject |= SMT_TO_MASK(SMT_PACKET); // Reject parsed frames + else + reject |= SMT_TO_MASK(SMT_MPEGTS); // Reject raw mpegts + + + streaming_target_init(&s->ths_input, subscription_input, s, reject); s->ths_weight = weight; s->ths_title = strdup(name); s->ths_total_err = 0; s->ths_output = st; + s->ths_flags = flags; time(&s->ths_start); LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort); @@ -227,9 +235,10 @@ subscription_create(int weight, const char *name, streaming_target_t *st) */ th_subscription_t * subscription_create_from_channel(channel_t *ch, unsigned int weight, - const char *name, streaming_target_t *st) + const char *name, streaming_target_t *st, + int flags) { - th_subscription_t *s = subscription_create(weight, name, st); + th_subscription_t *s = subscription_create(weight, name, st, flags); s->ths_channel = ch; LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link); @@ -270,9 +279,9 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight, */ th_subscription_t * subscription_create_from_transport(th_transport_t *t, const char *name, - streaming_target_t *st) + streaming_target_t *st, int flags) { - th_subscription_t *s = subscription_create(INT32_MAX, name, st); + th_subscription_t *s = subscription_create(INT32_MAX, name, st, flags); if(t->tht_status != TRANSPORT_RUNNING) transport_start(t, INT32_MAX, 1); @@ -336,8 +345,8 @@ subscription_dummy_join(const char *id) } st = calloc(1, sizeof(streaming_target_t)); - streaming_target_init(st, dummy_callback, NULL); - subscription_create_from_transport(t, "dummy", st); + streaming_target_init(st, dummy_callback, NULL, 0); + subscription_create_from_transport(t, "dummy", st, 0); tvhlog(LOG_NOTICE, "subscription", "Dummy join %s ok", id); diff --git a/src/subscriptions.h b/src/subscriptions.h index 6711fde4..6a9bc615 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -19,6 +19,8 @@ #ifndef SUBSCRIPTIONS_H #define SUBSCRIPTIONS_H +#define SUBSCRIPTION_RAW_MPEGTS 0x1 + typedef struct th_subscription { LIST_ENTRY(th_subscription) ths_global_link; int ths_weight; @@ -36,12 +38,12 @@ typedef struct th_subscription { time_t ths_start; /* time when subscription started */ int ths_total_err; /* total errors during entire subscription */ - int ths_force_demux; - streaming_target_t ths_input; streaming_target_t *ths_output; + int ths_flags; + } th_subscription_t; @@ -55,12 +57,14 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight); th_subscription_t *subscription_create_from_channel(channel_t *ch, unsigned int weight, const char *name, - streaming_target_t *st); + streaming_target_t *st, + int flags); th_subscription_t *subscription_create_from_transport(th_transport_t *t, const char *name, - streaming_target_t *st); + streaming_target_t *st, + int flags); void subscription_stop(th_subscription_t *s); diff --git a/src/transports.c b/src/transports.c index 14a43347..59b96a30 100644 --- a/src/transports.c +++ b/src/transports.c @@ -669,9 +669,10 @@ transport_set_feed_status(th_transport_t *t, transport_feed_status_t newstatus) t->tht_feed_status = newstatus; - streaming_pad_deliver(&t->tht_streaming_pad, - streaming_msg_create_code(SMT_TRANSPORT_STATUS, - newstatus)); + streaming_message_t *sm = streaming_msg_create_code(SMT_TRANSPORT_STATUS, + newstatus); + streaming_pad_deliver(&t->tht_streaming_pad, sm); + streaming_msg_free(sm); } @@ -689,6 +690,7 @@ transport_restart(th_transport_t *t, int had_components) if(had_components) { sm = streaming_msg_create_code(SMT_STOP, 0); streaming_pad_deliver(&t->tht_streaming_pad, sm); + streaming_msg_free(sm); } t->tht_refresh_feed(t); @@ -698,6 +700,7 @@ transport_restart(th_transport_t *t, int had_components) sm = streaming_msg_create_data(SMT_START, transport_build_stream_start(t)); streaming_pad_deliver(&t->tht_streaming_pad, sm); + streaming_msg_free(sm); } } @@ -872,3 +875,17 @@ transport_source_info_free(struct source_info *si) free(si->si_provider); free(si->si_service); } + + +void +transport_source_info_copy(source_info_t *dst, source_info_t *src) +{ +#define COPY(x) dst->si_##x = src->si_##x ? strdup(src->si_##x) : NULL + COPY(device); + COPY(adapter); + COPY(network); + COPY(mux); + COPY(provider); + COPY(service); +#undef COPY +} diff --git a/src/transports.h b/src/transports.h index 76c1bb4e..31987411 100644 --- a/src/transports.h +++ b/src/transports.h @@ -89,4 +89,6 @@ void transport_request_save(th_transport_t *t); void transport_source_info_free(source_info_t *si); +void transport_source_info_copy(source_info_t *dst, source_info_t *src); + #endif /* TRANSPORTS_H */ diff --git a/src/tsdemux.c b/src/tsdemux.c index 8f65b7bb..12d32674 100644 --- a/src/tsdemux.c +++ b/src/tsdemux.c @@ -42,12 +42,13 @@ #include "psi.h" #include "tsdemux.h" #include "parsers.h" +#include "streaming.h" +static void ts_remux(th_transport_t *t, const uint8_t *tsb); /** * Code for dealing with a complete section */ - static void got_section(th_transport_t *t, th_stream_t *st) { @@ -64,7 +65,6 @@ got_section(th_transport_t *t, th_stream_t *st) } - /** * Continue processing of transport stream packets */ @@ -73,6 +73,9 @@ ts_recv_packet0(th_transport_t *t, th_stream_t *st, uint8_t *tsb) { int off, len, pusi, cc, err = 0; + if(streaming_pad_probe_type(&t->tht_streaming_pad, SMT_MPEGTS)) + ts_remux(t, tsb); + /* Check CC */ if(tsb[3] & 0x10) { @@ -262,3 +265,19 @@ ts_recv_packet2(th_transport_t *t, uint8_t *tsb) if((st = transport_find_stream_by_pid(t, pid)) != NULL) ts_recv_packet0(t, st, tsb); } + + +/** + * + */ +static void +ts_remux(th_transport_t *t, const uint8_t *src) +{ + uint8_t tsb[188]; + memcpy(tsb, src, 188); + + streaming_message_t sm; + sm.sm_type = SMT_MPEGTS; + sm.sm_data = tsb; + streaming_pad_deliver(&t->tht_streaming_pad, &sm); +} diff --git a/src/tvhead.h b/src/tvhead.h index 6e8138e8..054af314 100644 --- a/src/tvhead.h +++ b/src/tvhead.h @@ -130,6 +130,7 @@ typedef enum { SCT_PAT, SCT_PMT, SCT_AAC, + SCT_MPEGTS, } streaming_component_type_t; @@ -164,8 +165,11 @@ typedef enum { SMT_TRANSPORT_STATUS, // sm_code is TRANSPORT_STATUS_ SMT_EXIT, // Used to signal exit to threads SMT_NOSOURCE, + SMT_MPEGTS, // sm_data is raw MPEG TS } streaming_message_type_t; +#define SMT_TO_MASK(x) (1 << ((unsigned int)x)) + /** * Streaming messages are sent from the pad to its receivers @@ -191,6 +195,7 @@ typedef struct streaming_target { st_callback_t *st_cb; void *st_opaque; + int st_reject_filter; } streaming_target_t;