From 4eb97a20f3e2971cc0e533633ca98a66e7933a50 Mon Sep 17 00:00:00 2001 From: Adam Sutton Date: Mon, 2 Sep 2013 10:58:22 +0100 Subject: [PATCH] subscription: Created a proper mux subscription mechanism It's a bit ugly to link subs and mpegts directly, but its not the end of the world (and probably unavoidable without lots of duplication). I'm still not convinced its robust to mux deletions if subs exist on that mux. Probably needs mmi to be ref counted. There is also a special kind of sub that expects to receive no stream data, i.e. all data will come from the SI tables only. This special sub is automatically unsubscribed (but that might need changing). --- src/input/mpegts.h | 15 +- src/input/mpegts/linuxdvb/linuxdvb_frontend.c | 2 +- src/input/mpegts/mpegts_input.c | 15 +- src/input/mpegts/mpegts_mux.c | 70 +++----- src/input/mpegts/mpegts_network.c | 11 +- src/input/mpegts/mpegts_service.c | 4 +- src/subscriptions.c | 158 ++++++++++++++++-- src/subscriptions.h | 28 +++- 8 files changed, 218 insertions(+), 85 deletions(-) diff --git a/src/input/mpegts.h b/src/input/mpegts.h index d9dd5ee6..dd388a0a 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -45,6 +45,12 @@ typedef TAILQ_HEAD(mpegts_mux_queue,mpegts_mux) mpegts_mux_queue_t; typedef LIST_HEAD (mpegts_mux_list,mpegts_mux) mpegts_mux_list_t; TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed); +/* Classes */ +extern const idclass_t mpegts_network_class; +extern const idclass_t mpegts_mux_class; +extern const idclass_t mpegts_service_class; +extern const idclass_t mpegts_input_class; + /* ************************************************************************** * SI processing * *************************************************************************/ @@ -253,9 +259,8 @@ struct mpegts_mux void (*mm_config_save) (mpegts_mux_t *mm); void (*mm_display_name) (mpegts_mux_t*, char *buf, size_t len); int (*mm_is_enabled) (mpegts_mux_t *mm); - int (*mm_start) (mpegts_mux_t *mm, - void *src, const char *r, int w); - void (*mm_stop) (mpegts_mux_t *mm, void *src, int force); + int (*mm_start) (mpegts_mux_t *mm, const char *r, int w); + void (*mm_stop) (mpegts_mux_t *mm, int force); void (*mm_open_table) (mpegts_mux_t*,mpegts_table_t*); void (*mm_close_table) (mpegts_mux_t*,mpegts_table_t*); void (*mm_create_instances) (mpegts_mux_t*); @@ -352,7 +357,7 @@ struct mpegts_mux_instance mpegts_mux_t *mmi_mux; mpegts_input_t *mmi_input; - RB_HEAD(,mpegts_mux_sub) mmi_subs; + LIST_HEAD(,th_subscription) mmi_subs; // TODO: remove this int mmi_tune_failed; // this is really DVB @@ -530,7 +535,7 @@ mpegts_service_t *mpegts_mux_find_service(mpegts_mux_t *ms, uint16_t sid); &type##_class, uuid,\ mi, mm); int mpegts_mux_instance_start - ( mpegts_mux_instance_t **mmiptr, void *src, int weight ); + ( mpegts_mux_instance_t **mmiptr ); int mpegts_mux_set_tsid ( mpegts_mux_t *mm, uint16_t tsid ); int mpegts_mux_set_onid ( mpegts_mux_t *mm, uint16_t onid ); diff --git a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c index edaf711b..01670da2 100644 --- a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c +++ b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c @@ -682,7 +682,7 @@ linuxdvb_frontend_tune0 return 0; /* Stop current */ - cur->mmi_mux->mm_stop(cur->mmi_mux, NULL, 1); + cur->mmi_mux->mm_stop(cur->mmi_mux, 1); } assert(LIST_FIRST(&lfe->mi_mux_active) == NULL); diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index 1002a798..fce1d220 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -93,24 +93,23 @@ int mpegts_input_current_weight ( mpegts_input_t *mi ) { const mpegts_mux_instance_t *mmi; - const mpegts_mux_sub_t *mms; const service_t *s; const th_subscription_t *ths; int w = 0; - /* Check for scan (weight 1) */ + /* Direct subs */ LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) { - RB_FOREACH(mms, &mmi->mmi_subs, mms_link) - w = MAX(w, mms->mms_weight); + LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) { + w = MAX(w, ths->ths_weight); + } } - /* Check for mux subs */ - - /* Check subscriptions */ + /* Service subs */ pthread_mutex_lock(&mi->mi_delivery_mutex); LIST_FOREACH(s, &mi->mi_transports, s_active_link) { - LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) + LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) { w = MAX(w, ths->ths_weight); + } } pthread_mutex_unlock(&mi->mi_delivery_mutex); return w; diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index 13e62d65..93dd874a 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -20,6 +20,7 @@ #include "idnode.h" #include "queue.h" #include "input/mpegts.h" +#include "subscriptions.h" #include @@ -68,17 +69,9 @@ mpegts_mux_instance_create0 return mmi; } -static int -mms_cmp ( mpegts_mux_sub_t *a, mpegts_mux_sub_t *b ) -{ - if (a->mms_src < b->mms_src) return -1; - if (a->mms_src > b->mms_src) return 1; - return 0; -} - int mpegts_mux_instance_start - ( mpegts_mux_instance_t **mmiptr, void *src, int weight ) + ( mpegts_mux_instance_t **mmiptr ) { int r; char buf[256], buf2[256];; @@ -100,21 +93,6 @@ mpegts_mux_instance_start r = mmi->mmi_input->mi_start_mux(mmi->mmi_input, mmi); if (r) return r; - /* Add sub */ - if (src) { - mpegts_mux_sub_t *sub; - static mpegts_mux_sub_t *skel = NULL; - if (!skel) - skel = calloc(1, sizeof(mpegts_mux_sub_t)); - skel->mms_src = src; - sub = RB_INSERT_SORTED(&mmi->mmi_subs, skel, mms_link, mms_cmp); - if (!sub) { - sub = skel; - skel = NULL; - sub->mms_weight = weight; - } - } - /* Start */ tvhdebug("mpegts", "%s - started", buf); mmi->mmi_input->mi_started_mux(mmi->mmi_input, mmi); @@ -275,7 +253,7 @@ mpegts_mux_delete ( mpegts_mux_t *mm ) tvhinfo("mpegts", "%s - deleting", buf); /* Stop */ - mm->mm_stop(mm, NULL, 1); + mm->mm_stop(mm, 1); /* Remove from lists */ LIST_REMOVE(mm, mm_network_link); @@ -315,7 +293,7 @@ mpegts_mux_create_instances ( mpegts_mux_t *mm ) static int mpegts_mux_start - ( mpegts_mux_t *mm, void *src, const char *reason, int weight ) + ( mpegts_mux_t *mm, const char *reason, int weight ) { int pass, fail; char buf[256]; @@ -378,7 +356,7 @@ mpegts_mux_start if (tune) { tvhinfo("mpegts", "%s - starting for '%s' (weight %d)", buf, reason, weight); - if (!(fail = mpegts_mux_instance_start(&tune, src, weight))) break; + if (!(fail = mpegts_mux_instance_start(&tune))) break; tune = NULL; tvhwarn("mpegts", "%s - failed to start, try another", buf); } @@ -403,7 +381,7 @@ mpegts_mux_has_subscribers ( mpegts_mux_t *mm ) { mpegts_mux_instance_t *mmi = mm->mm_active; if (mmi) { - if (RB_FIRST(&mmi->mmi_subs)) + if (LIST_FIRST(&mmi->mmi_subs)) return 1; return mmi->mmi_input->mi_has_subscription(mmi->mmi_input, mm); } @@ -411,28 +389,12 @@ mpegts_mux_has_subscribers ( mpegts_mux_t *mm ) } static void -mpegts_mux_stop ( mpegts_mux_t *mm, void *src, int force ) +mpegts_mux_stop ( mpegts_mux_t *mm, int force ) { char buf[256]; mpegts_mux_instance_t *mmi = mm->mm_active; mpegts_input_t *mi = NULL; - mpegts_mux_sub_t *sub, skel; - - /* Remove subs */ - if (mmi) { - if (force) { - while ((sub = RB_FIRST(&mmi->mmi_subs))) { - RB_REMOVE(&mmi->mmi_subs, sub, mms_link); - free(sub); - } - } else if (src) { - skel.mms_src = src; - if ((sub = RB_FIND(&mmi->mmi_subs, &skel, mms_link, mms_cmp))) { - RB_REMOVE(&mmi->mmi_subs, sub, mms_link); - free(sub); - } - } - } + th_subscription_t *sub; if (!force && mpegts_mux_has_subscribers(mm)) return; @@ -441,6 +403,8 @@ mpegts_mux_stop ( mpegts_mux_t *mm, void *src, int force ) tvhdebug("mpegts", "%s - stopping mux", buf); if (mmi) { + LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) + subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN); mi = mmi->mmi_input; mi->mi_stop_mux(mi, mmi); mi->mi_stopped_mux(mi, mmi); @@ -530,7 +494,7 @@ mpegts_mux_initial_scan_timeout ( void *aux ) char buf[256]; mpegts_mux_t *mm = aux; mm->mm_display_name(mm, buf, sizeof(buf)); - tvhdebug("mpegts", "%s - initial scan timed out", buf); + tvhinfo("mpegts", "%s - initial scan timed out", buf); mpegts_mux_initial_scan_done(mm); } @@ -539,6 +503,10 @@ mpegts_mux_initial_scan_done ( mpegts_mux_t *mm ) { char buf[256]; mpegts_network_t *mn = mm->mm_network; + mpegts_mux_instance_t *mmi; + th_subscription_t *s; + + /* Stop */ mm->mm_display_name(mm, buf, sizeof(buf)); tvhinfo("mpegts", "%s - initial scan complete", buf); gtimer_disarm(&mm->mm_initial_scan_timeout); @@ -548,8 +516,12 @@ mpegts_mux_initial_scan_done ( mpegts_mux_t *mm ) TAILQ_REMOVE(&mn->mn_initial_scan_current_queue, mm, mm_initial_scan_link); mpegts_network_schedule_initial_scan(mn); - /* Stop */ - mm->mm_stop(mm, mn, 0); + /* Unsubscribe */ + // TODO: might be better to make a slightly more concrate link here + LIST_FOREACH(mmi, &mm->mm_instances, mmi_mux_link) + LIST_FOREACH(s, &mmi->mmi_subs, ths_mmi_link) + if (!strcmp(s->ths_title, "initscan")) + subscription_unsubscribe(s); /* Save */ mm->mm_initial_scan_done = 1; diff --git a/src/input/mpegts/mpegts_network.c b/src/input/mpegts/mpegts_network.c index 0edd4a66..783cbc92 100644 --- a/src/input/mpegts/mpegts_network.c +++ b/src/input/mpegts/mpegts_network.c @@ -17,6 +17,7 @@ */ #include "input/mpegts.h" +#include "subscriptions.h" #include @@ -235,14 +236,16 @@ mpegts_network_delete static void mpegts_network_initial_scan(void *aux) { - mpegts_network_t *mn = aux; - mpegts_mux_t *mm; + mpegts_network_t *mn = aux; + mpegts_mux_t *mm; + th_subscription_t *s; tvhtrace("mpegts", "setup initial scan for %p", mn); while((mm = TAILQ_FIRST(&mn->mn_initial_scan_pending_queue)) != NULL) { assert(mm->mm_initial_scan_status == MM_SCAN_PENDING); - if (mm->mm_start(mm, mn, "initial scan", 1)) - break; + s = subscription_create_from_mux(mm, 1, "initscan", NULL, + SUBSCRIPTION_NONE, NULL, NULL, NULL); + if (!s) break; assert(mm->mm_initial_scan_status == MM_SCAN_CURRENT); } gtimer_arm(&mn->mn_initial_scan_timer, mpegts_network_initial_scan, mn, 10); diff --git a/src/input/mpegts/mpegts_service.c b/src/input/mpegts/mpegts_service.c index 9f47b9bf..1322cd2d 100644 --- a/src/input/mpegts/mpegts_service.c +++ b/src/input/mpegts/mpegts_service.c @@ -182,7 +182,7 @@ mpegts_service_start(service_t *t, int instance) return SM_CODE_UNDEFINED_ERROR; /* Start Mux */ - r = mpegts_mux_instance_start(&mmi, NULL, 0); + r = mpegts_mux_instance_start(&mmi); /* Start */ if (!r) { @@ -210,7 +210,7 @@ mpegts_service_stop(service_t *t) lock_assert(&global_lock); /* Stop */ - mm->mm_stop(mm, NULL, 0); + mm->mm_stop(mm, 0); i->mi_close_service(i, s); s->s_status = SERVICE_IDLE; } diff --git a/src/subscriptions.c b/src/subscriptions.c index 7de155ba..a641338f 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -39,6 +39,9 @@ #include "htsmsg.h" #include "notify.h" #include "atomic.h" +#if ENABLE_MPEGTS +#include "input/mpegts.h" +#endif struct th_subscription_list subscriptions; static gtimer_t subscription_reschedule_timer; @@ -136,6 +139,32 @@ subscription_unlink_service(th_subscription_t *s, int reason) s->ths_service = NULL; } +/* + * Called from mpegts code + */ +void +subscription_unlink_mux(th_subscription_t *s, int reason) +{ + streaming_message_t *sm; + mpegts_mux_instance_t *mmi = s->ths_mmi; + + pthread_mutex_lock(&mmi->mmi_input->mi_delivery_mutex); + + streaming_target_disconnect(&mmi->mmi_streaming_pad, &s->ths_input); + + sm = streaming_msg_create_code(SMT_STOP, reason); + streaming_target_deliver(s->ths_output, sm); + + pthread_mutex_unlock(&mmi->mmi_input->mi_delivery_mutex); + + LIST_REMOVE(s, ths_mmi_link); + s->ths_mmi = NULL; + + /* Free memory */ + if (s->ths_flags & SUBSCRIPTION_NONE) + subscription_unsubscribe(s); +} + /** * @@ -153,10 +182,12 @@ subscription_reschedule_cb(void *aux) void subscription_reschedule(void) { + static int reenter = 0; th_subscription_t *s; service_instance_t *si; streaming_message_t *sm; int error; + if (reenter) return; lock_assert(&global_lock); @@ -164,6 +195,8 @@ subscription_reschedule(void) subscription_reschedule_cb, NULL, 2); LIST_FOREACH(s, &subscriptions, ths_global_link) { + if (s->ths_mmi) continue; + if (!s->ths_channel && !s->ths_mmi) continue; #if 0 if(s->ths_channel == NULL) continue; /* stale entry, channel has been destroyed */ @@ -173,7 +206,7 @@ subscription_reschedule(void) /* Already got a service */ if(s->ths_state != SUBSCRIPTION_BAD_SERVICE) - continue; /* And it not bad, so we're happy */ + continue; /* And it not bad, so we're happy */ si = s->ths_current_instance; @@ -200,6 +233,8 @@ subscription_reschedule(void) subscription_link_service(s, si->si_s); } + + reenter = 0; } /** @@ -228,13 +263,11 @@ subscription_unsubscribe(th_subscription_t *s) if(t != NULL) service_remove_subscriber(t, s, SM_CODE_OK); -#ifdef TODO_NEED_A_BETTER_SOLUTION - if(s->ths_tdmi != NULL) { - LIST_REMOVE(s, ths_tdmi_link); - th_dvb_adapter_t *tda = s->ths_tdmi->tdmi_adapter; - pthread_mutex_lock(&tda->tda_delivery_mutex); - streaming_target_disconnect(&tda->tda_streaming_pad, &s->ths_input); - pthread_mutex_unlock(&tda->tda_delivery_mutex); +#if ENABLE_MPEGTS + if(s->ths_mmi) { + subscription_unlink_mux(s, SM_CODE_SUBSCRIPTION_OVERRIDDEN); + if (s->ths_mmi->mmi_mux) + s->ths_mmi->mmi_mux->mm_stop(s->ths_mmi->mmi_mux, 0); } #endif @@ -345,11 +378,14 @@ subscription_create(int weight, const char *name, streaming_target_t *st, int reject = 0; static int tally; - if(flags & SUBSCRIPTION_RAW_MPEGTS) + if (flags & SUBSCRIPTION_NONE) + reject |= -1; + else if(flags & SUBSCRIPTION_RAW_MPEGTS) reject |= SMT_TO_MASK(SMT_PACKET); // Reject parsed frames else reject |= SMT_TO_MASK(SMT_MPEGTS); // Reject raw mpegts + // TODO: possibly we don't connect anything for SUB_NONE streaming_target_init(&s->ths_input, cb ?: subscription_input_direct, s, reject); @@ -449,6 +485,106 @@ subscription_create_from_service(service_t *t, unsigned int weight, (NULL, t, weight, name, st, flags, hostname, username, client); } +/** + * + */ +/** + * + */ +#if ENABLE_MPEGTS +// TODO: move this +static void +mpegts_mux_setsourceinfo ( mpegts_mux_t *mm, source_info_t *si ) +{ + char buf[128]; + + /* Validate */ + lock_assert(&global_lock); + + /* Update */ + if(mm->mm_network->mn_network_name != NULL) + si->si_network = strdup(mm->mm_network->mn_network_name); + + mm->mm_display_name(mm, buf, sizeof(buf)); + si->si_mux = strdup(buf); + + if(mm->mm_active && mm->mm_active->mmi_input) { + mpegts_input_t *mi = mm->mm_active->mmi_input; + mi->mi_display_name(mi, buf, sizeof(buf)); + si->si_adapter = strdup(buf); + } +} + + +th_subscription_t * +subscription_create_from_mux + (mpegts_mux_t *mm, + unsigned int weight, + const char *name, + streaming_target_t *st, + int flags, + const char *hostname, + const char *username, + const char *client) +{ + th_subscription_t *s; + streaming_message_t *sm; + streaming_start_t *ss; + int r; + + if (!flags) + flags = SUBSCRIPTION_RAW_MPEGTS; + + s = subscription_create(weight, name, st, flags, + NULL, hostname, username, client); + + /* Tune */ + r = mm->mm_start(mm, s->ths_title, weight); + + /* Failed */ + if (r) { + subscription_unsubscribe(s); + return NULL; + } + s->ths_mmi = mm->mm_active; + + pthread_mutex_lock(&s->ths_mmi->mmi_input->mi_delivery_mutex); + + /* Store */ + LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link); + + /* Connect (not for NONE streams) */ + if (!(flags & SUBSCRIPTION_NONE)) { + streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input); + + /* Deliver a start message */ + ss = calloc(1, sizeof(streaming_start_t)); + ss->ss_num_components = 0; + ss->ss_refcount = 1; + + mpegts_mux_setsourceinfo(mm, &ss->ss_si); + ss->ss_si.si_service = strdup("rawmux"); + + sm = streaming_msg_create_data(SMT_START, ss); + streaming_target_deliver(s->ths_output, sm); + + tvhinfo("subscription", + "'%s' subscribing to mux, weight: %d, adapter: '%s', " + "network: '%s', mux: '%s'", + s->ths_title, + s->ths_weight, + ss->ss_si.si_adapter ?: "", + ss->ss_si.si_network ?: "", + ss->ss_si.si_mux ?: ""); + } + + pthread_mutex_unlock(&s->ths_mmi->mmi_input->mi_delivery_mutex); + + notify_reload("subscriptions"); + return s; +} +#endif + /** * */ @@ -579,10 +715,10 @@ subscription_create_msg(th_subscription_t *s) htsmsg_add_str(m, "title", s->ths_title); if(s->ths_channel != NULL) - htsmsg_add_str(m, "channel", s->ths_channel->ch_name); + htsmsg_add_str(m, "channel", s->ths_channel->ch_name ?: ""); if(s->ths_service != NULL) - htsmsg_add_str(m, "service", s->ths_service->s_nicename); + htsmsg_add_str(m, "service", s->ths_service->s_nicename ?: ""); return m; } diff --git a/src/subscriptions.h b/src/subscriptions.h index f229158a..5364225d 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -24,6 +24,7 @@ extern struct th_subscription_list subscriptions; #define SUBSCRIPTION_RAW_MPEGTS 0x1 +#define SUBSCRIPTION_NONE 0x2 typedef struct th_subscription { @@ -73,11 +74,12 @@ typedef struct th_subscription { struct service_instance_list ths_instances; struct service_instance *ths_current_instance; -#ifdef TODO_NEED_A_BETTER_SOLUTION - // Ugly ugly ugly to refer DVB code here - - LIST_ENTRY(th_subscription) ths_tdmi_link; - struct th_dvb_mux_instance *ths_tdmi; +#if ENABLE_MPEGTS + // Note: its a bit ugly linking MPEG-TS code directly here, but to do + // otherwise would probably require adding lots of additional + // (repeated) logic elsewhere + LIST_ENTRY(th_subscription) ths_mmi_link; + struct mpegts_mux_instance *ths_mmi; #endif } th_subscription_t; @@ -113,6 +115,19 @@ th_subscription_t *subscription_create_from_service(struct service *t, const char *username, const char *client); +#if ENABLE_MPEGTS +struct mpegts_mux; +th_subscription_t *subscription_create_from_mux + (struct mpegts_mux *m, + unsigned int weight, + const char *name, + streaming_target_t *st, + int flags, + const char *hostname, + const char *username, + const char *client); +#endif + th_subscription_t *subscription_create(int weight, const char *name, streaming_target_t *st, int flags, st_callback_t *cb, @@ -120,6 +135,7 @@ th_subscription_t *subscription_create(int weight, const char *name, const char *username, const char *client); + void subscription_change_weight(th_subscription_t *s, int weight); void subscription_set_speed @@ -132,6 +148,8 @@ void subscription_stop(th_subscription_t *s); void subscription_unlink_service(th_subscription_t *s, int reason); +void subscription_unlink_mux(th_subscription_t *s, int reason); + void subscription_dummy_join(const char *id, int first); int subscriptions_active(void);