From 2dfc2710b77dc3c17d8ba0480991f54b09dc0850 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Thu, 5 Mar 2015 15:13:57 +0100 Subject: [PATCH] service/mpegts: add raw service type and raw PID handling --- src/dvr/dvr_rec.c | 2 +- src/htsp_server.c | 2 +- src/input/mpegts.h | 30 +++- src/input/mpegts/iptv/iptv.c | 7 - src/input/mpegts/mpegts_input.c | 241 ++++++++++++++++++---------- src/input/mpegts/mpegts_mux.c | 108 ++++++------- src/input/mpegts/mpegts_mux_sched.c | 2 +- src/input/mpegts/mpegts_pid.c | 7 + src/input/mpegts/mpegts_service.c | 134 +++++++++++++++- src/input/mpegts/tsdemux.c | 22 +++ src/input/mpegts/tsdemux.h | 2 + src/satip/rtsp.c | 8 +- src/service.c | 43 ++++- src/service.h | 32 +++- src/service_mapper.c | 2 +- src/subscriptions.c | 226 ++++++-------------------- src/subscriptions.h | 34 ++-- src/tvheadend.h | 3 +- src/webui/webui.c | 49 ++++-- 19 files changed, 567 insertions(+), 387 deletions(-) diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index 54ad9080..832a78a2 100644 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -85,7 +85,7 @@ dvr_rec_subscribe(dvr_entry_t *de) return; } - de->de_s = subscription_create_from_channel(prch, weight, + de->de_s = subscription_create_from_channel(prch, NULL, weight, buf, prch->prch_flags, NULL, NULL, NULL); if (de->de_s == NULL) { diff --git a/src/htsp_server.c b/src/htsp_server.c index a4ce59fd..e19cb2dc 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -2023,7 +2023,7 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) tvhdebug("htsp", "%s - subscribe to %s using profile %s\n", htsp->htsp_logname, channel_get_name(ch), pro->pro_name ?: ""); - hs->hs_s = subscription_create_from_channel(&hs->hs_prch, weight, + hs->hs_s = subscription_create_from_channel(&hs->hs_prch, NULL, weight, htsp->htsp_logname, SUBSCRIPTION_STREAMING, htsp->htsp_peername, diff --git a/src/input/mpegts.h b/src/input/mpegts.h index 59e762e5..c6e9325e 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -146,12 +146,14 @@ typedef struct mpegts_table_state typedef struct mpegts_pid_sub { RB_ENTRY(mpegts_pid_sub) mps_link; - LIST_ENTRY(mpegts_pid_sub) mps_svc_link; -#define MPS_NONE 0x0 -#define MPS_STREAM 0x1 -#define MPS_TABLE 0x2 -#define MPS_FTABLE 0x4 -#define MPS_SERVICE 0x8 + LIST_ENTRY(mpegts_pid_sub) mps_svcraw_link; +#define MPS_NONE 0x00 +#define MPS_ALL 0x01 +#define MPS_RAW 0x02 +#define MPS_STREAM 0x04 +#define MPS_SERVICE 0x08 +#define MPS_TABLE 0x10 +#define MPS_FTABLE 0x20 int mps_type; void *mps_owner; } mpegts_pid_sub_t; @@ -429,11 +431,18 @@ struct mpegts_mux LIST_HEAD(, mpegts_mux_instance) mm_instances; mpegts_mux_instance_t *mm_active; + /* + * Raw subscriptions + */ + + LIST_HEAD(, th_subscription) mm_raw_subs; + /* * Data processing */ RB_HEAD(, mpegts_pid) mm_pids; + LIST_HEAD(, mpegts_pid_sub) mm_all_subs; int mm_last_pid; mpegts_pid_t *mm_last_mp; @@ -578,8 +587,6 @@ struct mpegts_mux_instance mpegts_mux_t *mmi_mux; mpegts_input_t *mmi_input; - LIST_HEAD(,th_subscription) mmi_subs; - tvh_input_stream_stats_t mmi_stats; int mmi_tune_failed; @@ -745,6 +752,8 @@ int mpegts_input_class_network_set ( void *o, const void *p ); htsmsg_t *mpegts_input_class_network_enum ( void *o ); char *mpegts_input_class_network_rend ( void *o ); +int mpegts_mps_cmp( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b ); + void mpegts_network_register_builder ( const idclass_t *idc, mpegts_network_t *(*build)(const idclass_t *idc, htsmsg_t *conf) ); @@ -877,6 +886,9 @@ mpegts_pid_t * mpegts_input_open_pid void mpegts_input_close_pid ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner ); +void mpegts_input_close_pids + ( mpegts_input_t *mi, mpegts_mux_t *mm, int type, void *owner ); + static inline void tsdebug_write(mpegts_mux_t *mm, uint8_t *buf, size_t len) { @@ -943,6 +955,8 @@ mpegts_service_t *mpegts_service_create0 mpegts_service_create0(calloc(1, sizeof(mpegts_service_t)),\ &mpegts_service_class, u, m, s, p, c) +mpegts_service_t *mpegts_service_create_raw(mpegts_mux_t *mm); + mpegts_service_t *mpegts_service_find ( mpegts_mux_t *mm, uint16_t sid, uint16_t pmt_pid, int create, int *save ); diff --git a/src/input/mpegts/iptv/iptv.c b/src/input/mpegts/iptv/iptv.c index 9aac168d..b1dda026 100644 --- a/src/input/mpegts/iptv/iptv.c +++ b/src/input/mpegts/iptv/iptv.c @@ -140,13 +140,6 @@ iptv_input_get_weight ( mpegts_input_t *mi, int flags ) if (!iptv_input_is_free(mi)) { w = 1000000; - /* Direct subs */ - LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) { - LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) { - w = MIN(w, ths->ths_weight); - } - } - /* Service subs */ pthread_mutex_lock(&mi->mi_output_lock); LIST_FOREACH(s, &mi->mi_transports, s_active_link) { diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index 9849da50..89012021 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -336,19 +336,10 @@ mpegts_input_is_free ( mpegts_input_t *mi ) int mpegts_input_get_weight ( mpegts_input_t *mi, int flags ) { - const mpegts_mux_instance_t *mmi; const service_t *s; const th_subscription_t *ths; int w = 0, count = 0; - /* Direct subs */ - LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) { - LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) { - w = MAX(w, ths->ths_weight); - count++; - } - } - /* Service subs */ pthread_mutex_lock(&mi->mi_output_lock); LIST_FOREACH(s, &mi->mi_transports, s_active_link) { @@ -401,8 +392,8 @@ mpegts_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) { } -static int -mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b ) +int +mpegts_mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b ) { if (a->mps_type != b->mps_type) { if (a->mps_type & MPS_SERVICE) @@ -415,25 +406,65 @@ mps_cmp ( mpegts_pid_sub_t *a, mpegts_pid_sub_t *b ) return 0; } +void +mpegts_input_close_pids + ( mpegts_input_t *mi, mpegts_mux_t *mm, int type, void *owner ) +{ + mpegts_pid_t *mp, *mp_next; + mpegts_pid_sub_t *mps, *mps_next; + + for (mp = RB_FIRST(&mm->mm_pids); mp; mp = mp_next) { + mp_next = RB_NEXT(mp, mp_link); + if ((mp->mp_type & MPS_RAW) == 0) continue; + for (mps = RB_FIRST(&mp->mp_subs); mps; mps = mps_next) { + mps_next = RB_NEXT(mps, mps_link); + if (mps->mps_owner != owner) continue; + LIST_REMOVE(mps, mps_svcraw_link); + RB_REMOVE(&mp->mp_subs, mps, mps_link); + free(mps); + if (!RB_FIRST(&mp->mp_subs)) { + RB_REMOVE(&mm->mm_pids, mp, mp_link); + free(mp); + } + } + } +} + mpegts_pid_t * mpegts_input_open_pid ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner ) { char buf[512]; mpegts_pid_t *mp; - mpegts_pid_sub_t *mps; + mpegts_pid_sub_t *mps, *mps2; assert(owner != NULL); - assert((type & (MPS_STREAM|MPS_SERVICE)) == 0 || - ((type & MPS_STREAM) ? 1 : 0) != ((type & MPS_SERVICE) ? 1 : 0)); + assert((type & (MPS_STREAM|MPS_SERVICE|MPS_RAW)) == 0 || + (((type & MPS_STREAM) ? 1 : 0) + + ((type & MPS_SERVICE) ? 1 : 0) + + ((type & MPS_RAW) ? 1 : 0)) == 1); lock_assert(&mi->mi_output_lock); + + if (pid == MPEGTS_FULLMUX_PID) + mpegts_input_close_pids(mi, mm, MPS_RAW, owner); + if ((mp = mpegts_mux_find_pid(mm, pid, 1))) { mps = calloc(1, sizeof(*mps)); mps->mps_type = type; mps->mps_owner = owner; - if (!RB_INSERT_SORTED(&mp->mp_subs, mps, mps_link, mps_cmp)) { + if (pid == MPEGTS_FULLMUX_PID) { + mp->mp_type |= type | MPS_ALL; + LIST_FOREACH(mps2, &mm->mm_all_subs, mps_svcraw_link) + if (mps2->mps_owner == owner) break; + if (mps2 == NULL) { + LIST_INSERT_HEAD(&mm->mm_all_subs, mps, mps_svcraw_link); + mpegts_mux_nice_name(mm, buf, sizeof(buf)); + tvhdebug("mpegts", "%s - open PID fullmux subscription [%d/%p]", + buf, type, owner); + } + } else if (!RB_INSERT_SORTED(&mp->mp_subs, mps, mps_link, mpegts_mps_cmp)) { mp->mp_type |= type; - if (type & MPS_SERVICE) - LIST_INSERT_HEAD(&mp->mp_svc_subs, mps, mps_svc_link); + if (type & (MPS_SERVICE|MPS_RAW)) + LIST_INSERT_HEAD(&mp->mp_svc_subs, mps, mps_svcraw_link); mpegts_mux_nice_name(mm, buf, sizeof(buf)); tvhdebug("mpegts", "%s - open PID %04X (%d) [%d/%p]", buf, mp->mp_pid, mp->mp_pid, type, owner); @@ -456,32 +487,44 @@ mpegts_input_close_pid lock_assert(&mi->mi_output_lock); if (!(mp = mpegts_mux_find_pid(mm, pid, 0))) return; - skel.mps_type = type; - skel.mps_owner = owner; - mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mps_cmp); - if (pid == mm->mm_last_pid) { - mm->mm_last_pid = -1; - mm->mm_last_mp = NULL; - } - if (mps) { + if (pid == MPEGTS_FULLMUX_PID) { mpegts_mux_nice_name(mm, buf, sizeof(buf)); - tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]", - buf, mp->mp_pid, mp->mp_pid, type, owner); - if (type & MPS_SERVICE) - LIST_REMOVE(mps, mps_svc_link); - RB_REMOVE(&mp->mp_subs, mps, mps_link); + tvhdebug("mpegts", "%s - close PID fullmux subscription [%d/%p]", + buf, type, owner); + mpegts_input_close_pids(mi, mm, MPS_RAW, owner); + LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link) + if (mps->mps_owner == owner) break; + if (mps == NULL) return; + LIST_REMOVE(mps, mps_svcraw_link); free(mps); - if (!RB_FIRST(&mp->mp_subs)) { - RB_REMOVE(&mm->mm_pids, mp, mp_link); - if (mp->mp_fd != -1) - linuxdvb_filter_close(mp->mp_fd); - free(mp); - } else { - type = 0; - RB_FOREACH(mps, &mp->mp_subs, mps_link) - type |= mps->mps_type; - mp->mp_type = type; + } else { + skel.mps_type = type; + skel.mps_owner = owner; + mps = RB_FIND(&mp->mp_subs, &skel, mps_link, mpegts_mps_cmp); + if (pid == mm->mm_last_pid) { + mm->mm_last_pid = -1; + mm->mm_last_mp = NULL; } + if (mps) { + mpegts_mux_nice_name(mm, buf, sizeof(buf)); + tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]", + buf, mp->mp_pid, mp->mp_pid, type, owner); + if (type & (MPS_SERVICE|MPS_RAW)) + LIST_REMOVE(mps, mps_svcraw_link); + RB_REMOVE(&mp->mp_subs, mps, mps_link); + free(mps); + } + } + if (!RB_FIRST(&mp->mp_subs)) { + RB_REMOVE(&mm->mm_pids, mp, mp_link); + if (mp->mp_fd != -1) + linuxdvb_filter_close(mp->mp_fd); + free(mp); + } else { + type = 0; + RB_FOREACH(mps, &mp->mp_subs, mps_link) + type |= mps->mps_type; + mp->mp_type = type; } } @@ -489,6 +532,8 @@ void mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init ) { elementary_stream_t *st; + mpegts_apids_t *pids; + int i; /* Add to list */ pthread_mutex_lock(&mi->mi_output_lock); @@ -499,24 +544,37 @@ mpegts_input_open_service ( mpegts_input_t *mi, mpegts_service_t *s, int init ) /* Register PIDs */ pthread_mutex_lock(&s->s_stream_mutex); - mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s); - mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s); - /* Open only filtered components here */ - TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link) { - if (st->es_type != SCT_CA) { - st->es_pid_opened = 1; - mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s); + if (s->s_type == STYPE_STD) { + mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s); + mi->mi_open_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s); + /* Open only filtered components here */ + TAILQ_FOREACH(st, &s->s_filt_components, es_filt_link) { + if (st->es_type != SCT_CA) { + st->es_pid_opened = 1; + mi->mi_open_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s); + } + } + } else { + if ((pids = s->s_pids) != NULL) { + if (pids->all) { + mi->mi_open_pid(mi, s->s_dvb_mux, MPEGTS_FULLMUX_PID, MPS_RAW, s); + } else { + for (i = 0; i < pids->count; i++) + mi->mi_open_pid(mi, s->s_dvb_mux, pids->pids[i], MPS_RAW, s); + } } } pthread_mutex_unlock(&s->s_stream_mutex); pthread_mutex_unlock(&mi->mi_output_lock); - /* Add PMT monitor */ - s->s_pmt_mon = - mpegts_table_add(s->s_dvb_mux, DVB_PMT_BASE, DVB_PMT_MASK, - dvb_pmt_callback, s, "pmt", - MT_CRC, s->s_pmt_pid); + /* Add PMT monitor */ + if(s->s_type == STYPE_STD) { + s->s_pmt_mon = + mpegts_table_add(s->s_dvb_mux, DVB_PMT_BASE, DVB_PMT_MASK, + dvb_pmt_callback, s, "pmt", + MT_CRC, s->s_pmt_pid); + } } void @@ -525,7 +583,7 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s ) elementary_stream_t *st; /* Close PMT table */ - if (s->s_pmt_mon) + if (s->s_type == STYPE_STD && s->s_pmt_mon) mpegts_table_destroy(s->s_pmt_mon); s->s_pmt_mon = NULL; @@ -538,14 +596,18 @@ mpegts_input_close_service ( mpegts_input_t *mi, mpegts_service_t *s ) /* Close PID */ pthread_mutex_lock(&s->s_stream_mutex); - mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s); - mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s); - /* Close all opened PIDs (the component filter may be changed at runtime) */ - TAILQ_FOREACH(st, &s->s_components, es_link) { - if (st->es_pid_opened) { - st->es_pid_opened = 0; - mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s); + if (s->s_type == STYPE_STD) { + mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pmt_pid, MPS_SERVICE, s); + mi->mi_close_pid(mi, s->s_dvb_mux, s->s_pcr_pid, MPS_SERVICE, s); + /* Close all opened PIDs (the component filter may be changed at runtime) */ + TAILQ_FOREACH(st, &s->s_components, es_link) { + if (st->es_pid_opened) { + st->es_pid_opened = 0; + mi->mi_close_pid(mi, s->s_dvb_mux, st->es_pid, MPS_SERVICE, s); + } } + } else { + mpegts_input_close_pids(mi, s->s_dvb_mux, MPS_RAW, s); } @@ -676,10 +738,16 @@ static int mpegts_input_has_subscription ( mpegts_input_t *mi, mpegts_mux_t *mm ) { int ret = 0; - service_t *t; + const service_t *t; + const th_subscription_t *ths; pthread_mutex_lock(&mi->mi_output_lock); LIST_FOREACH(t, &mi->mi_transports, s_active_link) { if (((mpegts_service_t*)t)->s_dvb_mux == mm) { + if (t->s_type == STYPE_RAW) { + LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link) + if (!strcmp(ths->ths_title, "keep")) break; + if (ths) continue; + } ret = 1; break; } @@ -940,7 +1008,6 @@ mpegts_input_process uint8_t *end = mpkt->mp_data + len; mpegts_mux_t *mm = mpkt->mp_mux; mpegts_mux_instance_t *mmi; - th_subscription_t *ths; #if ENABLE_TSDEBUG off_t tsdebug_pos; #endif @@ -948,9 +1015,6 @@ mpegts_input_process if (mm == NULL || (mmi = mm->mm_active) == NULL) return; - LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) - ths->ths_live = 1; - assert(mm == mmi->mmi_mux); #if ENABLE_TSDEBUG @@ -959,7 +1023,7 @@ mpegts_input_process /* Process */ assert((len % 188) == 0); - while ( tsb < end ) { + while (tsb < end) { pid = (tsb[1] << 8) | tsb[2]; cc = tsb[3]; @@ -993,10 +1057,18 @@ mpegts_input_process } type = mp->mp_type; + + /* Stream raw PIDs */ + if (type & MPS_RAW) { + LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link) + ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb); + LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svcraw_link) + ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb); + } /* Stream service data */ if (type & MPS_SERVICE) { - LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svc_link) { + LIST_FOREACH(mps, &mp->mp_svc_subs, mps_svcraw_link) { s = mps->mps_owner; f = (type & (MPS_TABLE|MPS_FTABLE)) || (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid); @@ -1006,6 +1078,7 @@ mpegts_input_process /* Stream table data */ if (type & MPS_STREAM) { LIST_FOREACH(s, &mi->mi_transports, s_active_link) { + if (s->s_type != STYPE_STD) continue; f = (type & (MPS_TABLE|MPS_FTABLE)) || (pid == s->s_pmt_pid) || (pid == s->s_pcr_pid); ts_recv_packet1((mpegts_service_t*)s, tsb, NULL, f); @@ -1031,6 +1104,13 @@ mpegts_input_process //tvhdebug("tsdemux", "%s - SI packet had errors", name); } } + + } else { + + /* Stream to all fullmux subscribers */ + LIST_FOREACH(mps, &mm->mm_all_subs, mps_svcraw_link) + ts_recv_raw((mpegts_service_t *)mps->mps_owner, tsb); + } done: @@ -1205,28 +1285,17 @@ mpegts_input_stream_status { int s = 0, w = 0; char buf[512]; - th_subscription_t *sub; + th_subscription_t *ths; + const service_t *t; mpegts_mux_t *mm = mmi->mmi_mux; mpegts_input_t *mi = mmi->mmi_input; - /* Get number of subs */ - // Note: this is a bit of a mess - LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) { - s++; - w = MAX(w, sub->ths_weight); - } - // Note: due to satconf acting as proxy for input we can't always - // use mi_transports, so we can in via a convoluted route - LIST_FOREACH(sub, &subscriptions, ths_global_link) { - if (!sub->ths_service) continue; - if (!idnode_is_instance(&sub->ths_service->s_id, - &mpegts_service_class)) continue; - mpegts_service_t *ms = (mpegts_service_t*)sub->ths_service; - if (ms->s_dvb_mux == mm) { - s++; - w = MAX(w, sub->ths_weight); - } - } + LIST_FOREACH(t, &mi->mi_transports, s_active_link) + if (((mpegts_service_t *)t)->s_dvb_mux == mm) + LIST_FOREACH(ths, &t->s_subscriptions, ths_service_link) { + s++; + w = MAX(w, ths->ths_weight); + } st->uuid = strdup(idnode_uuid_as_str(&mmi->mmi_id)); mi->mi_display_name(mi, buf, sizeof(buf)); diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index fad436a6..59eddcc7 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -99,20 +99,26 @@ static int mpegts_mux_keep_exists ( mpegts_input_t *mi ) { - mpegts_mux_instance_t *mmi; - th_subscription_t *s; + const mpegts_mux_instance_t *mmi; + const service_t *s; + const th_subscription_t *ths; + int ret; + + lock_assert(&global_lock); if (!mi) return 0; - mmi = LIST_FIRST(&mi->mi_mux_active); - if (mmi) - LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) - LIST_FOREACH(s, &mmi->mmi_subs, ths_mmi_link) - if (!strcmp(s->ths_title, "keep")) - return 1; - - return 0; + ret = 0; + LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) + LIST_FOREACH(ths, &mmi->mmi_mux->mm_raw_subs, ths_mux_link) { + s = ths->ths_service; + if (s && s->s_type == STYPE_RAW && !strcmp(ths->ths_title, "keep")) { + ret = 1; + break; + } + } + return ret; } static int @@ -255,11 +261,6 @@ mpegts_mux_instance_weight ( mpegts_mux_instance_t *mmi ) mpegts_input_t *mi = mmi->mmi_input; lock_assert(&mi->mi_output_lock); - /* Direct subs */ - LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) { - w = MAX(w, ths->ths_weight); - } - /* Service subs */ LIST_FOREACH(s, &mi->mi_transports, s_active_link) { mpegts_service_t *ms = (mpegts_service_t*)s; @@ -806,15 +807,9 @@ static int mpegts_mux_has_subscribers ( mpegts_mux_t *mm, const char *name ) { mpegts_mux_instance_t *mmi = mm->mm_active; - th_subscription_t *sub; if (mmi) { - if ((sub = LIST_FIRST(&mmi->mmi_subs)) != NULL) - if (strcmp(sub->ths_title, "keep") || LIST_NEXT(sub, ths_mmi_link)) { - tvhtrace("mpegts", "%s - keeping mux (direct subscription)", name); - return 1; - } if (mmi->mmi_input->mi_has_subscription(mmi->mmi_input, mm)) { - tvhtrace("mpegts", "%s - keeping mux (service)", name); + tvhtrace("mpegts", "%s - keeping mux", name); return 1; } } @@ -827,7 +822,6 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force, int reason ) char buf[256], buf2[256], *s; mpegts_mux_instance_t *mmi = mm->mm_active, *mmi2; mpegts_input_t *mi = NULL, *mi2; - th_subscription_t *sub; mpegts_pid_t *mp; mpegts_pid_sub_t *mps; @@ -864,8 +858,6 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force, int reason ) } mi->mi_stopping_mux(mi, mmi); - LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) - subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN); mi->mi_stop_mux(mi, mmi); mi->mi_stopped_mux(mi, mmi); @@ -885,13 +877,22 @@ mpegts_mux_stop ( mpegts_mux_t *mm, int force, int reason ) mm->mm_last_mp = NULL; while ((mp = RB_FIRST(&mm->mm_pids))) { assert(mi); - while ((mps = RB_FIRST(&mp->mp_subs))) { - tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]", buf, - mp->mp_pid, mp->mp_pid, mps->mps_type, mps->mps_owner); - RB_REMOVE(&mp->mp_subs, mps, mps_link); - if (mps->mps_type & MPS_SERVICE) - LIST_REMOVE(mps, mps_svc_link); - free(mps); + if (mp->mp_pid == MPEGTS_FULLMUX_PID) { + while ((mps = LIST_FIRST(&mm->mm_all_subs))) { + tvhdebug("mpegts", "%s - close PID fullmux subscription [%d/%p]", + buf, mps->mps_type, mps->mps_owner); + LIST_REMOVE(mps, mps_svcraw_link); + free(mps); + } + } else { + while ((mps = RB_FIRST(&mp->mp_subs))) { + tvhdebug("mpegts", "%s - close PID %04X (%d) [%d/%p]", buf, + mp->mp_pid, mp->mp_pid, mps->mps_type, mps->mps_owner); + RB_REMOVE(&mp->mp_subs, mps, mps_link); + if (mps->mps_type & (MPS_SERVICE|MPS_RAW|MPS_ALL)) + LIST_REMOVE(mps, mps_svcraw_link); + free(mps); + } } RB_REMOVE(&mm->mm_pids, mp, mp_link); if (mp->mp_fd != -1) @@ -1285,7 +1286,6 @@ mpegts_mux_remove_subscriber mpegts_mux_nice_name(mm, buf, sizeof(buf)); tvhtrace("mpegts", "%s - remove subscriber (reason %i)", buf, reason); #endif - subscription_unlink_mux(s, reason); mm->mm_stop(mm, 0, reason); } @@ -1294,7 +1294,6 @@ mpegts_mux_subscribe ( mpegts_mux_t *mm, mpegts_input_t *mi, const char *name, int weight, int flags ) { - int err = 0; profile_chain_t prch; th_subscription_t *s; memset(&prch, 0, sizeof(prch)); @@ -1302,8 +1301,8 @@ mpegts_mux_subscribe s = subscription_create_from_mux(&prch, (tvh_input_t *)mi, weight, name, SUBSCRIPTION_NONE | flags, - NULL, NULL, NULL, &err); - return s ? 0 : err; + NULL, NULL, NULL); + return s ? 0 : -EIO; } void @@ -1311,13 +1310,15 @@ mpegts_mux_unsubscribe_by_name ( mpegts_mux_t *mm, const char *name ) { mpegts_mux_instance_t *mmi; + const service_t *t; th_subscription_t *s, *n; LIST_FOREACH(mmi, &mm->mm_instances, mmi_mux_link) { - s = LIST_FIRST(&mmi->mmi_subs); + s = LIST_FIRST(&subscriptions); while (s) { - n = LIST_NEXT(s, ths_mmi_link); - if (!strcmp(s->ths_title, name)) + n = LIST_NEXT(s, ths_global_link); + t = s->ths_service; + if (t && t->s_type == STYPE_RAW && !strcmp(s->ths_title, name)) subscription_unsubscribe(s); s = n; } @@ -1328,9 +1329,7 @@ void mpegts_mux_tuning_error ( const char *mux_uuid, mpegts_mux_instance_t *mmi_match ) { mpegts_mux_t *mm; - th_subscription_t *sub; mpegts_mux_instance_t *mmi; - streaming_message_t *sm; struct timespec timeout; timeout.tv_sec = 2; @@ -1339,14 +1338,9 @@ mpegts_mux_tuning_error ( const char *mux_uuid, mpegts_mux_instance_t *mmi_match if (!pthread_mutex_timedlock(&global_lock, &timeout)) { mm = mpegts_mux_find(mux_uuid); if (mm) { - if ((mmi = mm->mm_active) != NULL && mmi == mmi_match) { - LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) { - sm = streaming_msg_create_code(SMT_SERVICE_STATUS, TSS_TUNING); - streaming_target_deliver(sub->ths_output, sm); - } + if ((mmi = mm->mm_active) != NULL && mmi == mmi_match) if (mmi->mmi_input) mmi->mmi_input->mi_tuning_error(mmi->mmi_input, mm); - } } pthread_mutex_unlock(&global_lock); } @@ -1380,15 +1374,17 @@ mpegts_mux_find_pid_ ( mpegts_mux_t *mm, int pid, int create ) skel.mp_pid = pid; mp = RB_FIND(&mm->mm_pids, &skel, mp_link, mp_cmp); - if (mp == NULL && create) { - mp = calloc(1, sizeof(*mp)); - mp->mp_pid = pid; - if (!RB_INSERT_SORTED(&mm->mm_pids, mp, mp_link, mp_cmp)) { - mp->mp_fd = -1; - mp->mp_cc = -1; - } else { - free(mp); - mp = NULL; + if (mp == NULL) { + if (create) { + mp = calloc(1, sizeof(*mp)); + mp->mp_pid = pid; + if (!RB_INSERT_SORTED(&mm->mm_pids, mp, mp_link, mp_cmp)) { + mp->mp_fd = -1; + mp->mp_cc = -1; + } else { + free(mp); + mp = NULL; + } } } if (mp) { diff --git a/src/input/mpegts/mpegts_mux_sched.c b/src/input/mpegts/mpegts_mux_sched.c index 3794fad0..06689f14 100644 --- a/src/input/mpegts/mpegts_mux_sched.c +++ b/src/input/mpegts/mpegts_mux_sched.c @@ -214,7 +214,7 @@ mpegts_mux_sched_timer ( void *p ) = subscription_create_from_mux(mms->mms_prch, NULL, mms->mms_weight, mms->mms_creator ?: "", SUBSCRIPTION_NONE, - NULL, NULL, NULL, NULL); + NULL, NULL, NULL); /* Failed (try-again soon) */ if (!mms->mms_sub) { diff --git a/src/input/mpegts/mpegts_pid.c b/src/input/mpegts/mpegts_pid.c index ede0a80b..1d645173 100644 --- a/src/input/mpegts/mpegts_pid.c +++ b/src/input/mpegts/mpegts_pid.c @@ -159,9 +159,16 @@ mpegts_pid_compare(mpegts_apids_t *dst, mpegts_apids_t *src, { int i; + assert(dst); + assert(add); + assert(del); if (mpegts_pid_init(add, NULL, 0) || mpegts_pid_init(del, NULL, 0)) return -1; + if (src == NULL) { + mpegts_pid_copy(add, dst); + return add->count > 0; + } for (i = 0; i < src->count; i++) if (mpegts_pid_find_index(dst, src->pids[i]) < 0) mpegts_pid_add(del, src->pids[i]); diff --git a/src/input/mpegts/mpegts_service.c b/src/input/mpegts/mpegts_service.c index 49b165f2..48e52de7 100644 --- a/src/input/mpegts/mpegts_service.c +++ b/src/input/mpegts/mpegts_service.c @@ -236,7 +236,8 @@ mpegts_service_config_save ( service_t *t ) * Service instance list */ static void -mpegts_service_enlist(service_t *t, struct service_instance_list *sil, int flags) +mpegts_service_enlist(service_t *t, tvh_input_t *ti, + struct service_instance_list *sil, int flags) { int p = 0, w; mpegts_service_t *s = (mpegts_service_t*)t; @@ -256,6 +257,9 @@ mpegts_service_enlist(service_t *t, struct service_instance_list *sil, int flags mi = mmi->mmi_input; + if (ti && (tvh_input_t *)mi != ti) + continue; + if (!mi->mi_is_enabled(mi, mmi->mmi_mux, flags)) continue; /* Set weight to -1 (forced) for already active mux */ @@ -495,7 +499,7 @@ mpegts_service_delete ( service_t *t, int delconf ) mpegts_mux_t *mm = ms->s_dvb_mux; /* Remove config */ - if (delconf) + if (delconf && t->s_type == STYPE_STD) hts_settings_remove("input/dvb/networks/%s/muxes/%s/services/%s", idnode_uuid_as_str(&mm->mm_network->mn_id), idnode_uuid_as_str(&mm->mm_id), @@ -531,7 +535,8 @@ mpegts_service_create0 /* defaults for older version */ s->s_dvb_created = dispatch_clock; - if (service_create0((service_t*)s, class, uuid, S_MPEG_TS, conf) == NULL) + if (service_create0((service_t*)s, STYPE_STD, class, uuid, + S_MPEG_TS, conf) == NULL) return NULL; /* Create */ @@ -547,7 +552,6 @@ mpegts_service_create0 s->s_delete = mpegts_service_delete; s->s_is_enabled = mpegts_service_is_enabled; - s->s_config_save = mpegts_service_config_save; s->s_enlist = mpegts_service_enlist; s->s_start_feed = mpegts_service_start; s->s_stop_feed = mpegts_service_stop; @@ -618,6 +622,128 @@ mpegts_service_find return s; } +/* + * Raw MPEGTS Service + */ + +const idclass_t mpegts_service_raw_class = +{ + .ic_super = &service_raw_class, + .ic_class = "mpegts_raw_service", + .ic_caption = "MPEGTS Raw Service", + .ic_properties = NULL +}; + +static void +mpegts_service_raw_setsourceinfo(service_t *t, source_info_t *si) +{ + mpegts_service_setsourceinfo(t, si); + + free(si->si_service); + si->si_service = strdup("Raw Service"); +} + +static int +mpegts_service_raw_update_pids(service_t *t, mpegts_apids_t *pids) +{ + mpegts_service_t *ms = (mpegts_service_t *)t; + mpegts_input_t *mi = ms->s_dvb_active_input; + mpegts_mux_t *mm = ms->s_dvb_mux; + mpegts_apids_t *p, *x; + mpegts_apids_t add, del; + int i; + + lock_assert(&global_lock); + if (pids) { + p = calloc(1, sizeof(*p)); + mpegts_pid_init(p, NULL, 0); + mpegts_pid_copy(p, pids); + } else + p = NULL; + if (mi && mm) { + pthread_mutex_lock(&mi->mi_output_lock); + pthread_mutex_lock(&t->s_stream_mutex); + x = t->s_pids; + t->s_pids = p; + if (!pids->all && x && x->all) { + mi->mi_close_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_RAW, t); + mpegts_input_close_pids(mi, mm, MPS_RAW, t); + for (i = 0; i < x->count; i++) + mi->mi_open_pid(mi, mm, x->pids[i], MPS_RAW, t); + } else { + if (pids->all) { + mpegts_input_close_pids(mi, mm, MPS_RAW, t); + mi->mi_open_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_RAW, t); + } else { + mpegts_pid_compare(p, x, &add, &del); + for (i = 0; i < del.count; i++) + mi->mi_close_pid(mi, mm, del.pids[i], MPS_RAW, t); + for (i = 0; i < add.count; i++) + mi->mi_open_pid(mi, mm, add.pids[i], MPS_RAW, t); + mpegts_pid_done(&add); + mpegts_pid_done(&del); + } + } + pthread_mutex_unlock(&t->s_stream_mutex); + pthread_mutex_unlock(&mi->mi_output_lock); + } else { + pthread_mutex_lock(&t->s_stream_mutex); + x = t->s_pids; + t->s_pids = p; + pthread_mutex_unlock(&t->s_stream_mutex); + } + if (x) { + mpegts_pid_done(x); + free(x); + } + return 0; +} + +mpegts_service_t * +mpegts_service_create_raw ( mpegts_mux_t *mm ) +{ + mpegts_service_t *s = calloc(1, sizeof(*s)); + char buf[256]; + + mpegts_mux_nice_name(mm, buf, sizeof(buf)); + + if (service_create0((service_t*)s, STYPE_RAW, + &mpegts_service_raw_class, NULL, + S_MPEG_TS, NULL) == NULL) { + free(s); + return NULL; + } + + sbuf_init(&s->s_tsbuf); + + s->s_dvb_mux = mm; + + s->s_delete = mpegts_service_delete; + s->s_is_enabled = mpegts_service_is_enabled; + s->s_config_save = mpegts_service_config_save; + s->s_enlist = mpegts_service_enlist; + s->s_start_feed = mpegts_service_start; + s->s_stop_feed = mpegts_service_stop; + s->s_refresh_feed = mpegts_service_refresh; + s->s_setsourceinfo = mpegts_service_raw_setsourceinfo; + s->s_grace_period = mpegts_service_grace_period; + s->s_channel_number = mpegts_service_channel_number; + s->s_channel_name = mpegts_service_channel_name; + s->s_provider_name = mpegts_service_provider_name; + s->s_channel_icon = mpegts_service_channel_icon; + s->s_mapped = mpegts_service_mapped; + s->s_update_pids = mpegts_service_raw_update_pids; + + pthread_mutex_lock(&s->s_stream_mutex); + free(s->s_nicename); + s->s_nicename = strdup(buf); + pthread_mutex_unlock(&s->s_stream_mutex); + + tvhlog(LOG_DEBUG, "mpegts", "%s - add raw service", buf); + + return s; +} + /****************************************************************************** * Editor Configuration * diff --git a/src/input/mpegts/tsdemux.c b/src/input/mpegts/tsdemux.c index 93ab1c51..9c8be7e3 100644 --- a/src/input/mpegts/tsdemux.c +++ b/src/input/mpegts/tsdemux.c @@ -214,6 +214,28 @@ ts_recv_packet2(mpegts_service_t *t, const uint8_t *tsb) ts_recv_packet0(t, st, tsb); } +/* + * + */ +void +ts_recv_raw(mpegts_service_t *t, const uint8_t *tsb) +{ + elementary_stream_t *st = NULL; + int pid; + + pthread_mutex_lock(&t->s_stream_mutex); + if (t->s_parent) { + /* If PID is owned by parent, let parent service to + * deliver this PID (decrambling) + */ + pid = (tsb[1] & 0x1f) << 8 | tsb[2]; + st = service_stream_find(t->s_parent, pid); + } + if(st == NULL) + if (streaming_pad_probe_type(&t->s_streaming_pad, SMT_MPEGTS)) + ts_remux(t, tsb, 0); + pthread_mutex_unlock(&t->s_stream_mutex); +} /** * diff --git a/src/input/mpegts/tsdemux.h b/src/input/mpegts/tsdemux.h index 756f9acb..74d19eca 100644 --- a/src/input/mpegts/tsdemux.h +++ b/src/input/mpegts/tsdemux.h @@ -26,4 +26,6 @@ int ts_recv_packet1 void ts_recv_packet2(struct mpegts_service *t, const uint8_t *tsb); +void ts_recv_raw(struct mpegts_service *t, const uint8_t *tsb); + #endif /* TSDEMUX_H */ diff --git a/src/satip/rtsp.c b/src/satip/rtsp.c index 04a987ea..c41fa4e4 100644 --- a/src/satip/rtsp.c +++ b/src/satip/rtsp.c @@ -247,6 +247,7 @@ rtsp_start mpegts_network_t *mn, *mn2; dvb_network_t *ln; dvb_mux_t *mux; + service_t *svc; char buf[384]; int res = HTTP_STATUS_SERVICE, qsize = 3000000, created = 0; @@ -290,10 +291,9 @@ rtsp_start config_get_int("satip_weight", 100), "SAT>IP", rs->prch.prch_flags | - SUBSCRIPTION_FULLMUX | SUBSCRIPTION_STREAMING, addrbuf, hc->hc_username, - http_arg_get(&hc->hc_args, "User-Agent"), NULL); + http_arg_get(&hc->hc_args, "User-Agent")); if (!rs->subs) goto endrtp; if (rs->run) { @@ -303,6 +303,8 @@ rtsp_start } } else { pids: + svc = rs->subs->ths_service; + svc->s_update_pids(svc, &rs->pids); satip_rtp_update_pids((void *)(intptr_t)rs->stream, &rs->pids); } if (!setup && !rs->run) { @@ -314,6 +316,8 @@ pids: rs->udp_rtp->fd, rs->udp_rtcp->fd, rs->frontend, rs->findex, &rs->mux->lm_tuning, &rs->pids); + svc = rs->subs->ths_service; + svc->s_update_pids(svc, &rs->pids); rs->run = 1; } pthread_mutex_unlock(&global_lock); diff --git a/src/service.c b/src/service.c index c78b4aa7..bf2b0e4b 100644 --- a/src/service.c +++ b/src/service.c @@ -53,6 +53,7 @@ static void service_class_delete(struct idnode *self); static void service_class_save(struct idnode *self); struct service_queue service_all; +struct service_queue service_raw_all; static void service_class_notify_enabled ( void *obj ) @@ -246,6 +247,17 @@ const idclass_t service_class = { } }; +const idclass_t service_raw_class = { + .ic_class = "service_raw", + .ic_caption = "Service Raw", + .ic_event = "service_raw", + .ic_perm_def = ACCESS_ADMIN, + .ic_delete = service_class_delete, + .ic_save = NULL, + .ic_get_title = service_class_get_title, + .ic_properties = NULL +}; + /** * */ @@ -682,7 +694,8 @@ service_start(service_t *t, int instance, int timeout, int postpone) */ service_instance_t * service_find_instance - (service_t *s, channel_t *ch, service_instance_list_t *sil, + (service_t *s, channel_t *ch, tvh_input_t *ti, + service_instance_list_t *sil, int *error, int weight, int flags, int timeout, int postpone) { channel_service_mapping_t *csm; @@ -703,10 +716,10 @@ service_find_instance LIST_FOREACH(csm, &ch->ch_services, csm_chn_link) { s = csm->csm_svc; if (s->s_is_enabled(s, flags)) - s->s_enlist(s, sil, flags); + s->s_enlist(s, ti, sil, flags); } } else { - s->s_enlist(s, sil, flags); + s->s_enlist(s, ti, sil, flags); } /* Clean */ @@ -842,7 +855,10 @@ service_destroy(service_t *t, int delconf) avgstat_flush(&t->s_rate); - TAILQ_REMOVE(&service_all, t, s_all_link); + if (t->s_type == STYPE_RAW) + TAILQ_REMOVE(&service_raw_all, t, s_all_link); + else + TAILQ_REMOVE(&service_all, t, s_all_link); service_unref(t); } @@ -882,7 +898,8 @@ service_provider_name ( service_t *s ) */ service_t * service_create0 - ( service_t *t, const idclass_t *class, const char *uuid, + ( service_t *t, int service_type, + const idclass_t *class, const char *uuid, int source_type, htsmsg_t *conf ) { if (idnode_insert(&t->s_id, uuid, class, 0)) { @@ -894,10 +911,14 @@ service_create0 lock_assert(&global_lock); - TAILQ_INSERT_TAIL(&service_all, t, s_all_link); + if (service_type == STYPE_RAW) + TAILQ_INSERT_TAIL(&service_raw_all, t, s_all_link); + else + TAILQ_INSERT_TAIL(&service_all, t, s_all_link); pthread_mutex_init(&t->s_stream_mutex, NULL); pthread_cond_init(&t->s_tss_cond, NULL); + t->s_type = service_type; t->s_source_type = source_type; t->s_refcount = 1; t->s_enabled = 1; @@ -1201,6 +1222,9 @@ service_restart(service_t *t) { int had_components; + if(t->s_type != STYPE_STD) + goto refresh; + pthread_mutex_lock(&t->s_stream_mutex); had_components = TAILQ_FIRST(&t->s_filt_components) != NULL && @@ -1227,6 +1251,7 @@ service_restart(service_t *t) pthread_mutex_unlock(&t->s_stream_mutex); +refresh: if(t->s_refresh_feed != NULL) t->s_refresh_feed(t); @@ -1298,6 +1323,9 @@ static struct service_queue pending_save_queue; void service_request_save(service_t *t, int restart) { + if (t->s_type != STYPE_STD && !restart) + return; + pthread_mutex_lock(&pending_save_mutex); if(!t->s_ps_onqueue) { @@ -1359,7 +1387,7 @@ service_saver(void *aux) pthread_mutex_unlock(&pending_save_mutex); pthread_mutex_lock(&global_lock); - if(t->s_status != SERVICE_ZOMBIE) + if(t->s_status != SERVICE_ZOMBIE && t->s_config_save) t->s_config_save(t); if(t->s_status == SERVICE_RUNNING && restart) service_restart(t); @@ -1384,6 +1412,7 @@ service_init(void) { TAILQ_INIT(&pending_save_queue); TAILQ_INIT(&service_all); + TAILQ_INIT(&service_raw_all); pthread_mutex_init(&pending_save_mutex, NULL); pthread_cond_init(&pending_save_cond, NULL); tvhthread_create(&service_saver_tid, NULL, service_saver, NULL); diff --git a/src/service.h b/src/service.h index dd660fec..e9e1b525 100644 --- a/src/service.h +++ b/src/service.h @@ -24,10 +24,14 @@ #include "descrambler.h" extern const idclass_t service_class; +extern const idclass_t service_raw_class; extern struct service_queue service_all; +extern struct service_queue service_raw_all; struct channel; +struct tvh_input; +struct mpegts_apids; /** * Stream, one media component for a service. @@ -232,6 +236,14 @@ typedef struct service { */ int s_refcount; + /** + * Service type, standard or raw (for mux or partial mux streaming) + */ + enum { + STYPE_STD, + STYPE_RAW + } s_type; + /** * Source type is used to determine if an output requesting * MPEG-TS can shortcut all the parsing and remuxing. @@ -282,7 +294,8 @@ typedef struct service { int (*s_is_enabled)(struct service *t, int flags); - void (*s_enlist)(struct service *s, service_instance_list_t *sil, int flags); + void (*s_enlist)(struct service *s, struct tvh_input *ti, + service_instance_list_t *sil, int flags); int (*s_start_feed)(struct service *s, int instance); @@ -298,6 +311,10 @@ typedef struct service { void (*s_delete)(struct service *t, int delconf); +#if ENABLE_MPEGTS + int (*s_update_pids)(struct service *t, struct mpegts_apids *pids); +#endif + /** * Channel info */ @@ -441,7 +458,10 @@ typedef struct service { struct elementary_stream_queue s_filt_components; int s_last_pid; elementary_stream_t *s_last_es; - +#if ENABLE_MPEGTS + struct service *s_parent; + struct mpegts_apids *s_pids; +#endif /** * Delivery pad, this is were we finally deliver all streaming output @@ -471,10 +491,11 @@ void service_stop(service_t *t); void service_build_filter(service_t *t); -service_t *service_create0(service_t *t, const idclass_t *idc, const char *uuid, int source_type, htsmsg_t *conf); +service_t *service_create0(service_t *t, int service_type, const idclass_t *idc, + const char *uuid, int source_type, htsmsg_t *conf); -#define service_create(t, c, u, s, m)\ - (struct t*)service_create0(calloc(1, sizeof(struct t), &t##_class, c, u, s, m) +#define service_create(t, y, c, u, s, m)\ + (struct t*)service_create0(calloc(1, sizeof(struct t), y, &t##_class, c, u, s, m) void service_unref(service_t *t); @@ -486,6 +507,7 @@ static inline service_t *service_find(const char *identifier) service_instance_t *service_find_instance(struct service *s, struct channel *ch, + struct tvh_input *source, service_instance_list_t *sil, int *error, int weight, int flags, int timeout, diff --git a/src/service_mapper.c b/src/service_mapper.c index 777da9ec..b7d98bf3 100644 --- a/src/service_mapper.c +++ b/src/service_mapper.c @@ -375,7 +375,7 @@ service_mapper_thread ( void *aux ) /* Subscribe */ tvhinfo("service_mapper", "checking %s", s->s_nicename); prch.prch_id = s; - sub = subscription_create_from_service(&prch, SUBSCRIPTION_PRIO_MAPPER, + sub = subscription_create_from_service(&prch, NULL, SUBSCRIPTION_PRIO_MAPPER, "service_mapper", 0, NULL, NULL, "service_mapper"); diff --git a/src/subscriptions.c b/src/subscriptions.c index d6c9ca68..28abf623 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -73,11 +73,13 @@ subscription_link_service(th_subscription_t *s, service_t *t) s->ths_service = t; LIST_INSERT_HEAD(&t->s_subscriptions, s, ths_service_link); - tvhtrace("subscription", "%04X: linking sub %p to svc %p", shortid(s), s, t); + tvhtrace("subscription", "%04X: linking sub %p to svc %p type %i", + shortid(s), s, t, t->s_type); pthread_mutex_lock(&t->s_stream_mutex); - if(TAILQ_FIRST(&t->s_filt_components) != NULL) { + if(TAILQ_FIRST(&t->s_filt_components) != NULL || + t->s_type != STYPE_STD) { streaming_msg_free(s->ths_start_message); @@ -148,37 +150,6 @@ subscription_unlink_service(th_subscription_t *s, int reason) subscription_unlink_service0(s, reason, 1); } -/* - * Called from mpegts code - */ -void -subscription_unlink_mux(th_subscription_t *s, int reason) -{ - streaming_message_t *sm; - mpegts_mux_instance_t *mmi = s->ths_mmi; - mpegts_mux_t *mm = mmi->mmi_mux; - mpegts_input_t *mi = mmi->mmi_input; - - gtimer_disarm(&s->ths_receive_timer); - - assert(mi); - - pthread_mutex_lock(&mi->mi_output_lock); - s->ths_mmi = NULL; - - if (!(s->ths_flags & SUBSCRIPTION_NONE)) - streaming_target_disconnect(&mmi->mmi_streaming_pad, &s->ths_input); - - sm = streaming_msg_create_code(SMT_STOP, reason); - streaming_target_deliver(s->ths_output, sm); - - if (s->ths_flags & SUBSCRIPTION_FULLMUX) - mi->mi_close_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_NONE, s); - LIST_REMOVE(s, ths_mmi_link); - - pthread_mutex_unlock(&mi->mi_output_lock); -} - /* ************************************************************************** * Scheduling * *************************************************************************/ @@ -212,10 +183,9 @@ subscription_show_info(th_subscription_t *s) size_t buflen; s->ths_service->s_setsourceinfo(s->ths_service, &si); - snprintf(buf, sizeof(buf), + buflen = snprintf(buf, sizeof(buf), "\"%s\" subscribing on \"%s\", weight: %d, adapter: \"%s\", " - "network: \"%s\", mux: \"%s\", provider: \"%s\", " - "service: \"%s\"", + "network: \"%s\", mux: \"%s\", provider: \"%s\", service: \"%s\"", s->ths_title, ch ? channel_get_name(ch) : "none", s->ths_weight, si.si_adapter ?: "", si.si_network ?: "", @@ -224,8 +194,12 @@ subscription_show_info(th_subscription_t *s) si.si_service ?: ""); service_source_info_free(&si); + if (s->ths_prch && s->ths_prch->prch_pro) + buflen += snprintf(buf + buflen, sizeof(buf) - buflen, + ", profile=\"%s\"", + s->ths_prch->prch_pro->pro_name ?: ""); + if (s->ths_hostname) { - buflen = strlen(buf); snprintf(buf + buflen, sizeof(buf) - buflen, ", hostname=\"%s\", username=\"%s\", client=\"%s\"", s->ths_hostname ?: "", @@ -264,7 +238,6 @@ subscription_reschedule(void) lock_assert(&global_lock); LIST_FOREACH(s, &subscriptions, ths_global_link) { - if (s->ths_mmi) continue; if (!s->ths_service && !s->ths_channel) continue; /* Postpone the tuner decision */ @@ -312,6 +285,7 @@ subscription_reschedule(void) tvhtrace("subscription", "%04X: find instance for %s weight %d", shortid(s), s->ths_service->s_nicename, s->ths_weight); si = service_find_instance(s->ths_service, s->ths_channel, + s->ths_source, &s->ths_instances, &error, s->ths_weight, s->ths_flags, s->ths_timeout, dispatch_clock > s->ths_postpone_end ? @@ -527,7 +501,7 @@ subscription_unsubscribe(th_subscription_t *s) LIST_REMOVE(s, ths_global_link); - if(s->ths_channel != NULL) { + if (s->ths_channel != NULL) { LIST_REMOVE(s, ths_channel_link); snprintf(buf, sizeof(buf), "\"%s\" unsubscribing from \"%s\"", s->ths_title, channel_get_name(s->ths_channel)); @@ -545,13 +519,13 @@ subscription_unsubscribe(th_subscription_t *s) } tvhlog(LOG_INFO, "subscription", "%04X: %s", shortid(s), buf); - if(t) + if (t) { service_remove_subscriber(t, s, SM_CODE_OK); - #if ENABLE_MPEGTS - if(s->ths_mmi) - mpegts_mux_remove_subscriber(s->ths_mmi->mmi_mux, s, SM_CODE_OK); + if (t->s_type == STYPE_RAW) + LIST_REMOVE(s, ths_mux_link); #endif + } streaming_msg_free(s->ths_start_message); @@ -640,26 +614,22 @@ subscription_create */ static th_subscription_t * subscription_create_from_channel_or_service(profile_chain_t *prch, + tvh_input_t *ti, unsigned int weight, const char *name, int flags, const char *hostname, const char *username, const char *client, - int service) + service_t *service) { th_subscription_t *s; channel_t *ch = NULL; - service_t *t = NULL; assert(prch); assert(prch->prch_id); - assert(prch->prch_st); - - if (service) - t = prch->prch_id; - else + if (!service) ch = prch->prch_id; s = subscription_create(prch, weight, name, flags, subscription_input, @@ -671,26 +641,36 @@ subscription_create_from_channel_or_service(profile_chain_t *prch, shortid(s), channel_get_name(ch), weight, pro_name); else tvhtrace("subscription", "%04X: creating subscription for service %s weight %d sing profile %s", - shortid(s), t->s_nicename, weight, pro_name); + shortid(s), service->s_nicename, weight, pro_name); #endif s->ths_channel = ch; - s->ths_service = t; + s->ths_service = service; + s->ths_source = ti; if (ch) LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link); +#if ENABLE_MPEGTS + if (service && service->s_type == STYPE_RAW) { + mpegts_mux_t *mm = prch->prch_id; + LIST_INSERT_HEAD(&mm->mm_raw_subs, s, ths_mux_link); + } +#endif + subscription_reschedule(); return s; } th_subscription_t * subscription_create_from_channel(profile_chain_t *prch, + tvh_input_t *ti, unsigned int weight, const char *name, int flags, const char *hostname, const char *username, const char *client) { + assert(prch->prch_st); return subscription_create_from_channel_or_service - (prch, weight, name, flags, hostname, username, client, 0); + (prch, ti, weight, name, flags, hostname, username, client, NULL); } /** @@ -698,65 +678,24 @@ subscription_create_from_channel(profile_chain_t *prch, */ th_subscription_t * subscription_create_from_service(profile_chain_t *prch, + tvh_input_t *ti, unsigned int weight, const char *name, int flags, const char *hostname, const char *username, const char *client) { + assert(prch->prch_st); return subscription_create_from_channel_or_service - (prch, weight, name, flags, hostname, username, client, 1); + (prch, ti, weight, name, flags, hostname, username, client, + prch->prch_id); } -/** - * - */ /** * */ #if ENABLE_MPEGTS -// TODO: move this -static void -mpegts_mux_setsourceinfo ( mpegts_mux_t *mm, source_info_t *si ) -{ - char buf[256]; - - /* Validate */ - lock_assert(&global_lock); - - /* Update */ - if(mm->mm_network->mn_network_name != NULL) - si->si_network = strdup(mm->mm_network->mn_network_name); - - mm->mm_display_name(mm, buf, sizeof(buf)); - si->si_mux = strdup(buf); - - if(mm->mm_active && mm->mm_active->mmi_input) { - mpegts_input_t *mi = mm->mm_active->mmi_input; - mi->mi_display_name(mi, buf, sizeof(buf)); - si->si_adapter = strdup(buf); - } -} - -static void -mux_data_timeout ( void *aux ) -{ - th_subscription_t *s = aux; - - if (!s->ths_mmi) - return; - - if (!s->ths_live) { - tvhwarn("subscription", "%04X: mux data timeout for %s", shortid(s), s->ths_title); - mpegts_mux_remove_subscriber(s->ths_mmi->mmi_mux, s, SM_CODE_NO_INPUT); - return; - } - s->ths_live = 0; - - if (s->ths_timeout > 0) - gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, s->ths_timeout); -} - +#include "input/mpegts.h" th_subscription_t * subscription_create_from_mux(profile_chain_t *prch, tvh_input_t *ti, @@ -765,81 +704,17 @@ subscription_create_from_mux(profile_chain_t *prch, int flags, const char *hostname, const char *username, - const char *client, - int *err) + const char *client) { mpegts_mux_t *mm = prch->prch_id; - th_subscription_t *s; - streaming_message_t *sm; - streaming_start_t *ss; - mpegts_input_t *mi; - int r; + mpegts_service_t *s = mpegts_service_create_raw(mm); - /* Tune */ - r = mm->mm_start(mm, (mpegts_input_t *)ti, name, weight, flags); - if (r) { - if (err) *err = r; + if (!s) return NULL; - } - /* Create subscription */ - if (!prch->prch_st) - flags |= SUBSCRIPTION_NONE; - s = subscription_create(prch, weight, name, flags, NULL, - hostname, username, client); - s->ths_mmi = mm->mm_active; - - /* Install full mux handler */ - mi = s->ths_mmi->mmi_input; - assert(mi); - - pthread_mutex_lock(&mi->mi_output_lock); - - if (s->ths_flags & SUBSCRIPTION_FULLMUX) - mi->mi_open_pid(mi, mm, MPEGTS_FULLMUX_PID, MPS_NONE, s); - - /* Store */ - LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link); - - /* Connect */ - if (prch->prch_st) - streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input); - - /* Deliver a start message */ - ss = calloc(1, sizeof(streaming_start_t)); - ss->ss_num_components = 0; - ss->ss_refcount = 1; - - mpegts_mux_setsourceinfo(mm, &ss->ss_si); - ss->ss_si.si_service = strdup("rawmux"); - - tvhinfo("subscription", - "%04X: \"%s\" subscribing to mux, weight: %d, adapter: \"%s\", " - "network: \"%s\", mux: \"%s\", hostname: \"%s\", username: \"%s\", " - "client: \"%s\"", - shortid(s), - s->ths_title, - s->ths_weight, - ss->ss_si.si_adapter ?: "", - ss->ss_si.si_network ?: "", - ss->ss_si.si_mux ?: "", - hostname ?: "", - username ?: "", - client ?: ""); - - sm = streaming_msg_create_data(SMT_START, ss); - streaming_target_deliver(s->ths_output, sm); - - r = (mi->mi_get_grace ? mi->mi_get_grace(mi, mm) : 0) + 20; - sm = streaming_msg_create_code(SMT_GRACE, r); - streaming_target_deliver(s->ths_output, sm); - - pthread_mutex_unlock(&mi->mi_output_lock); - - if (r > 0) - gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, r); - - return s; + return subscription_create_from_channel_or_service + (prch, ti, weight, name, flags, hostname, username, client, + (service_t *)s); } #endif @@ -903,13 +778,6 @@ subscription_create_msg(th_subscription_t *s) else if(s->ths_dvrfile != NULL) htsmsg_add_str(m, "service", s->ths_dvrfile ?: ""); - else if (s->ths_mmi != NULL && s->ths_mmi->mmi_mux != NULL) { - char buf[512]; - mpegts_mux_t *mm = s->ths_mmi->mmi_mux; - mpegts_mux_nice_name(mm, buf, sizeof(buf)); - htsmsg_add_str(m, "service", buf); - } - return m; } @@ -961,7 +829,11 @@ subscription_init(void) void subscription_done(void) { + th_subscription_t *s; + pthread_mutex_lock(&global_lock); + LIST_FOREACH(s, &subscriptions, ths_global_link) + subscription_unsubscribe(s); /* clear remaining subscriptions */ subscription_reschedule(); pthread_mutex_unlock(&global_lock); @@ -1100,7 +972,7 @@ subscription_dummy_join(const char *id, int first) st = calloc(1, sizeof(*st)); streaming_target_init(st, dummy_callback, NULL, 0); prch->prch_st = st; - s = subscription_create_from_service(prch, 1, "dummy", 0, NULL, NULL, "dummy"); + s = subscription_create_from_service(prch, NULL, 1, "dummy", 0, NULL, NULL, "dummy"); tvhlog(LOG_NOTICE, "subscription", "%04X: Dummy join %s ok", shortid(s), id); diff --git a/src/subscriptions.h b/src/subscriptions.h index 7ef65dcf..229bcc9b 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -27,13 +27,12 @@ extern struct th_subscription_list subscriptions; #define SUBSCRIPTION_RAW_MPEGTS 0x001 #define SUBSCRIPTION_NONE 0x002 -#define SUBSCRIPTION_FULLMUX 0x004 -#define SUBSCRIPTION_STREAMING 0x008 -#define SUBSCRIPTION_RESTART 0x010 -#define SUBSCRIPTION_INITSCAN 0x020 ///< for mux subscriptions -#define SUBSCRIPTION_IDLESCAN 0x040 ///< for mux subscriptions -#define SUBSCRIPTION_USERSCAN 0x080 ///< for mux subscriptions -#define SUBSCRIPTION_EPG 0x100 ///< for mux subscriptions +#define SUBSCRIPTION_STREAMING 0x004 +#define SUBSCRIPTION_RESTART 0x008 +#define SUBSCRIPTION_INITSCAN 0x010 ///< for mux subscriptions +#define SUBSCRIPTION_IDLESCAN 0x020 ///< for mux subscriptions +#define SUBSCRIPTION_USERSCAN 0x040 ///< for mux subscriptions +#define SUBSCRIPTION_EPG 0x080 ///< for mux subscriptions /* Some internal priorities */ #define SUBSCRIPTION_PRIO_KEEP 1 ///< Keep input rolling @@ -75,6 +74,8 @@ typedef struct th_subscription { struct service *ths_service; /* if NULL, ths_service_link is not linked */ + struct tvh_input *ths_source; /* if NULL, all sources are allowed */ + char *ths_title; /* display title */ time_t ths_start; /* time when subscription started */ int ths_total_err; /* total errors during entire subscription */ @@ -110,15 +111,10 @@ typedef struct th_subscription { int ths_postpone; time_t ths_postpone_end; -#if ENABLE_MPEGTS - // Note: its a bit ugly linking MPEG-TS code directly here, but to do - // otherwise would probably require adding lots of additional - // (repeated) logic elsewhere - LIST_ENTRY(th_subscription) ths_mmi_link; - struct mpegts_mux_instance *ths_mmi; - gtimer_t ths_receive_timer; - uint8_t ths_live; -#endif + /* + * MPEG-TS mux chain + */ + LIST_ENTRY(th_subscription) ths_mux_link; } th_subscription_t; @@ -138,6 +134,7 @@ void subscription_reschedule(void); th_subscription_t * subscription_create_from_channel(struct profile_chain *prch, + struct tvh_input *ti, unsigned int weight, const char *name, int flags, @@ -148,6 +145,7 @@ subscription_create_from_channel(struct profile_chain *prch, th_subscription_t * subscription_create_from_service(struct profile_chain *prch, + struct tvh_input *ti, unsigned int weight, const char *name, int flags, @@ -165,7 +163,7 @@ subscription_create_from_mux(struct profile_chain *prch, int flags, const char *hostname, const char *username, - const char *client, int *err); + const char *client); #endif th_subscription_t *subscription_create(struct profile_chain *prch, @@ -188,8 +186,6 @@ void subscription_stop(th_subscription_t *s); void subscription_unlink_service(th_subscription_t *s, int reason); -void subscription_unlink_mux(th_subscription_t *s, int reason); - void subscription_dummy_join(const char *id, int first); diff --git a/src/tvheadend.h b/src/tvheadend.h index 5c77108f..04a18c4a 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -215,7 +215,8 @@ int get_device_connection(const char *dev); typedef enum { SCT_NONE = -1, SCT_UNKNOWN = 0, - SCT_MPEG2VIDEO = 1, + SCT_RAW = 1, + SCT_MPEG2VIDEO, SCT_MPEG2AUDIO, SCT_H264, SCT_AC3, diff --git a/src/webui/webui.c b/src/webui/webui.c index d04cd821..66494e9c 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -259,7 +259,7 @@ http_stream_run(http_connection_t *hc, profile_chain_t *prch, while(!hc->hc_shutdown && run && tvheadend_running) { pthread_mutex_lock(&sq->sq_mutex); sm = TAILQ_FIRST(&sq->sq_queue); - if(sm == NULL) { + if(sm == NULL) { gettimeofday(&tp, NULL); ts.tv_sec = tp.tv_sec + 1; ts.tv_nsec = tp.tv_usec * 1000; @@ -769,7 +769,7 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight) tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50); - s = subscription_create_from_service(&prch, weight ?: 100, "HTTP", + s = subscription_create_from_service(&prch, NULL, weight ?: 100, "HTTP", prch.prch_flags | SUBSCRIPTION_STREAMING, addrbuf, hc->hc_username, @@ -801,11 +801,12 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight) th_subscription_t *s; profile_chain_t prch; size_t qsize; - const char *name; + const char *name, *str; char addrbuf[50]; void *tcp_id; - const char *str; - int res = HTTP_STATUS_SERVICE; + char *p, *saveptr; + mpegts_apids_t pids; + int res = HTTP_STATUS_SERVICE, i; if(http_access_verify(hc, ACCESS_ADVANCED_STREAMING)) return HTTP_STATUS_UNAUTHORIZED; @@ -818,21 +819,46 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight) else qsize = 10000000; + mpegts_pid_init(&pids, NULL, 0); + if ((str = http_arg_get(&hc->hc_req_args, "pids"))) { + p = tvh_strdupa(str); + p = strtok_r(p, ",", &saveptr); + while (p) { + if (strcmp(p, "all") == 0) { + pids.all = 1; + } else { + i = atoi(p); + if (i < 0 || i > 8192) + return HTTP_STATUS_BAD_REQUEST; + if (i == 8192) + pids.all = 1; + else + mpegts_pid_add(&pids, i); + } + p = strtok_r(NULL, ",", &saveptr); + } + if (!pids.all && pids.count <= 0) + return HTTP_STATUS_BAD_REQUEST; + } else { + pids.all = 1; + } + if (!profile_chain_raw_open(&prch, mm, qsize)) { tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50); s = subscription_create_from_mux(&prch, NULL, weight ?: 10, "HTTP", prch.prch_flags | - SUBSCRIPTION_FULLMUX | SUBSCRIPTION_STREAMING, addrbuf, hc->hc_username, - http_arg_get(&hc->hc_args, "User-Agent"), NULL); + http_arg_get(&hc->hc_args, "User-Agent")); if (s) { name = tvh_strdupa(s->ths_title); - pthread_mutex_unlock(&global_lock); - http_stream_run(hc, &prch, name, s); - pthread_mutex_lock(&global_lock); + if (s->ths_service->s_update_pids(s->ths_service, &pids) == 0) { + pthread_mutex_unlock(&global_lock); + http_stream_run(hc, &prch, name, s); + pthread_mutex_lock(&global_lock); + } subscription_unsubscribe(s); res = 0; } @@ -882,7 +908,8 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight) tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50); - s = subscription_create_from_channel(&prch, weight ?: 100, "HTTP", + s = subscription_create_from_channel(&prch, + NULL, weight ?: 100, "HTTP", prch.prch_flags | SUBSCRIPTION_STREAMING, addrbuf, hc->hc_username, http_arg_get(&hc->hc_args, "User-Agent"));