diff --git a/src/input/mpegts.h b/src/input/mpegts.h index d9dd5ee6..dd388a0a 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -45,6 +45,12 @@ typedef TAILQ_HEAD(mpegts_mux_queue,mpegts_mux) mpegts_mux_queue_t; typedef LIST_HEAD (mpegts_mux_list,mpegts_mux) mpegts_mux_list_t; TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed); +/* Classes */ +extern const idclass_t mpegts_network_class; +extern const idclass_t mpegts_mux_class; +extern const idclass_t mpegts_service_class; +extern const idclass_t mpegts_input_class; + /* ************************************************************************** * SI processing * *************************************************************************/ @@ -253,9 +259,8 @@ struct mpegts_mux void (*mm_config_save) (mpegts_mux_t *mm); void (*mm_display_name) (mpegts_mux_t*, char *buf, size_t len); int (*mm_is_enabled) (mpegts_mux_t *mm); - int (*mm_start) (mpegts_mux_t *mm, - void *src, const char *r, int w); - void (*mm_stop) (mpegts_mux_t *mm, void *src, int force); + int (*mm_start) (mpegts_mux_t *mm, const char *r, int w); + void (*mm_stop) (mpegts_mux_t *mm, int force); void (*mm_open_table) (mpegts_mux_t*,mpegts_table_t*); void (*mm_close_table) (mpegts_mux_t*,mpegts_table_t*); void (*mm_create_instances) (mpegts_mux_t*); @@ -352,7 +357,7 @@ struct mpegts_mux_instance mpegts_mux_t *mmi_mux; mpegts_input_t *mmi_input; - RB_HEAD(,mpegts_mux_sub) mmi_subs; + LIST_HEAD(,th_subscription) mmi_subs; // TODO: remove this int mmi_tune_failed; // this is really DVB @@ -530,7 +535,7 @@ mpegts_service_t *mpegts_mux_find_service(mpegts_mux_t *ms, uint16_t sid); &type##_class, uuid,\ mi, mm); int mpegts_mux_instance_start - ( mpegts_mux_instance_t **mmiptr, void *src, int weight ); + ( mpegts_mux_instance_t **mmiptr ); int mpegts_mux_set_tsid ( mpegts_mux_t *mm, uint16_t tsid ); int mpegts_mux_set_onid ( mpegts_mux_t *mm, uint16_t onid ); diff --git a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c index edaf711b..01670da2 100644 --- a/src/input/mpegts/linuxdvb/linuxdvb_frontend.c +++ b/src/input/mpegts/linuxdvb/linuxdvb_frontend.c @@ -682,7 +682,7 @@ linuxdvb_frontend_tune0 return 0; /* Stop current */ - cur->mmi_mux->mm_stop(cur->mmi_mux, NULL, 1); + cur->mmi_mux->mm_stop(cur->mmi_mux, 1); } assert(LIST_FIRST(&lfe->mi_mux_active) == NULL); diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index 1002a798..fce1d220 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -93,24 +93,23 @@ int mpegts_input_current_weight ( mpegts_input_t *mi ) { const mpegts_mux_instance_t *mmi; - const mpegts_mux_sub_t *mms; const service_t *s; const th_subscription_t *ths; int w = 0; - /* Check for scan (weight 1) */ + /* Direct subs */ LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) { - RB_FOREACH(mms, &mmi->mmi_subs, mms_link) - w = MAX(w, mms->mms_weight); + LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) { + w = MAX(w, ths->ths_weight); + } } - /* Check for mux subs */ - - /* Check subscriptions */ + /* Service subs */ pthread_mutex_lock(&mi->mi_delivery_mutex); LIST_FOREACH(s, &mi->mi_transports, s_active_link) { - LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) + LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) { w = MAX(w, ths->ths_weight); + } } pthread_mutex_unlock(&mi->mi_delivery_mutex); return w; diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index 13e62d65..93dd874a 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -20,6 +20,7 @@ #include "idnode.h" #include "queue.h" #include "input/mpegts.h" +#include "subscriptions.h" #include @@ -68,17 +69,9 @@ mpegts_mux_instance_create0 return mmi; } -static int -mms_cmp ( mpegts_mux_sub_t *a, mpegts_mux_sub_t *b ) -{ - if (a->mms_src < b->mms_src) return -1; - if (a->mms_src > b->mms_src) return 1; - return 0; -} - int mpegts_mux_instance_start - ( mpegts_mux_instance_t **mmiptr, void *src, int weight ) + ( mpegts_mux_instance_t **mmiptr ) { int r; char buf[256], buf2[256];; @@ -100,21 +93,6 @@ mpegts_mux_instance_start r = mmi->mmi_input->mi_start_mux(mmi->mmi_input, mmi); if (r) return r; - /* Add sub */ - if (src) { - mpegts_mux_sub_t *sub; - static mpegts_mux_sub_t *skel = NULL; - if (!skel) - skel = calloc(1, sizeof(mpegts_mux_sub_t)); - skel->mms_src = src; - sub = RB_INSERT_SORTED(&mmi->mmi_subs, skel, mms_link, mms_cmp); - if (!sub) { - sub = skel; - skel = NULL; - sub->mms_weight = weight; - } - } - /* Start */ tvhdebug("mpegts", "%s - started", buf); mmi->mmi_input->mi_started_mux(mmi->mmi_input, mmi); @@ -275,7 +253,7 @@ mpegts_mux_delete ( mpegts_mux_t *mm ) tvhinfo("mpegts", "%s - deleting", buf); /* Stop */ - mm->mm_stop(mm, NULL, 1); + mm->mm_stop(mm, 1); /* Remove from lists */ LIST_REMOVE(mm, mm_network_link); @@ -315,7 +293,7 @@ mpegts_mux_create_instances ( mpegts_mux_t *mm ) static int mpegts_mux_start - ( mpegts_mux_t *mm, void *src, const char *reason, int weight ) + ( mpegts_mux_t *mm, const char *reason, int weight ) { int pass, fail; char buf[256]; @@ -378,7 +356,7 @@ mpegts_mux_start if (tune) { tvhinfo("mpegts", "%s - starting for '%s' (weight %d)", buf, reason, weight); - if (!(fail = mpegts_mux_instance_start(&tune, src, weight))) break; + if (!(fail = mpegts_mux_instance_start(&tune))) break; tune = NULL; tvhwarn("mpegts", "%s - failed to start, try another", buf); } @@ -403,7 +381,7 @@ mpegts_mux_has_subscribers ( mpegts_mux_t *mm ) { mpegts_mux_instance_t *mmi = mm->mm_active; if (mmi) { - if (RB_FIRST(&mmi->mmi_subs)) + if (LIST_FIRST(&mmi->mmi_subs)) return 1; return mmi->mmi_input->mi_has_subscription(mmi->mmi_input, mm); } @@ -411,28 +389,12 @@ mpegts_mux_has_subscribers ( mpegts_mux_t *mm ) } static void -mpegts_mux_stop ( mpegts_mux_t *mm, void *src, int force ) +mpegts_mux_stop ( mpegts_mux_t *mm, int force ) { char buf[256]; mpegts_mux_instance_t *mmi = mm->mm_active; mpegts_input_t *mi = NULL; - mpegts_mux_sub_t *sub, skel; - - /* Remove subs */ - if (mmi) { - if (force) { - while ((sub = RB_FIRST(&mmi->mmi_subs))) { - RB_REMOVE(&mmi->mmi_subs, sub, mms_link); - free(sub); - } - } else if (src) { - skel.mms_src = src; - if ((sub = RB_FIND(&mmi->mmi_subs, &skel, mms_link, mms_cmp))) { - RB_REMOVE(&mmi->mmi_subs, sub, mms_link); - free(sub); - } - } - } + th_subscription_t *sub; if (!force && mpegts_mux_has_subscribers(mm)) return; @@ -441,6 +403,8 @@ mpegts_mux_stop ( mpegts_mux_t *mm, void *src, int force ) tvhdebug("mpegts", "%s - stopping mux", buf); if (mmi) { + LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link) + subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN); mi = mmi->mmi_input; mi->mi_stop_mux(mi, mmi); mi->mi_stopped_mux(mi, mmi); @@ -530,7 +494,7 @@ mpegts_mux_initial_scan_timeout ( void *aux ) char buf[256]; mpegts_mux_t *mm = aux; mm->mm_display_name(mm, buf, sizeof(buf)); - tvhdebug("mpegts", "%s - initial scan timed out", buf); + tvhinfo("mpegts", "%s - initial scan timed out", buf); mpegts_mux_initial_scan_done(mm); } @@ -539,6 +503,10 @@ mpegts_mux_initial_scan_done ( mpegts_mux_t *mm ) { char buf[256]; mpegts_network_t *mn = mm->mm_network; + mpegts_mux_instance_t *mmi; + th_subscription_t *s; + + /* Stop */ mm->mm_display_name(mm, buf, sizeof(buf)); tvhinfo("mpegts", "%s - initial scan complete", buf); gtimer_disarm(&mm->mm_initial_scan_timeout); @@ -548,8 +516,12 @@ mpegts_mux_initial_scan_done ( mpegts_mux_t *mm ) TAILQ_REMOVE(&mn->mn_initial_scan_current_queue, mm, mm_initial_scan_link); mpegts_network_schedule_initial_scan(mn); - /* Stop */ - mm->mm_stop(mm, mn, 0); + /* Unsubscribe */ + // TODO: might be better to make a slightly more concrate link here + LIST_FOREACH(mmi, &mm->mm_instances, mmi_mux_link) + LIST_FOREACH(s, &mmi->mmi_subs, ths_mmi_link) + if (!strcmp(s->ths_title, "initscan")) + subscription_unsubscribe(s); /* Save */ mm->mm_initial_scan_done = 1; diff --git a/src/input/mpegts/mpegts_network.c b/src/input/mpegts/mpegts_network.c index 0edd4a66..783cbc92 100644 --- a/src/input/mpegts/mpegts_network.c +++ b/src/input/mpegts/mpegts_network.c @@ -17,6 +17,7 @@ */ #include "input/mpegts.h" +#include "subscriptions.h" #include @@ -235,14 +236,16 @@ mpegts_network_delete static void mpegts_network_initial_scan(void *aux) { - mpegts_network_t *mn = aux; - mpegts_mux_t *mm; + mpegts_network_t *mn = aux; + mpegts_mux_t *mm; + th_subscription_t *s; tvhtrace("mpegts", "setup initial scan for %p", mn); while((mm = TAILQ_FIRST(&mn->mn_initial_scan_pending_queue)) != NULL) { assert(mm->mm_initial_scan_status == MM_SCAN_PENDING); - if (mm->mm_start(mm, mn, "initial scan", 1)) - break; + s = subscription_create_from_mux(mm, 1, "initscan", NULL, + SUBSCRIPTION_NONE, NULL, NULL, NULL); + if (!s) break; assert(mm->mm_initial_scan_status == MM_SCAN_CURRENT); } gtimer_arm(&mn->mn_initial_scan_timer, mpegts_network_initial_scan, mn, 10); diff --git a/src/input/mpegts/mpegts_service.c b/src/input/mpegts/mpegts_service.c index 9f47b9bf..1322cd2d 100644 --- a/src/input/mpegts/mpegts_service.c +++ b/src/input/mpegts/mpegts_service.c @@ -182,7 +182,7 @@ mpegts_service_start(service_t *t, int instance) return SM_CODE_UNDEFINED_ERROR; /* Start Mux */ - r = mpegts_mux_instance_start(&mmi, NULL, 0); + r = mpegts_mux_instance_start(&mmi); /* Start */ if (!r) { @@ -210,7 +210,7 @@ mpegts_service_stop(service_t *t) lock_assert(&global_lock); /* Stop */ - mm->mm_stop(mm, NULL, 0); + mm->mm_stop(mm, 0); i->mi_close_service(i, s); s->s_status = SERVICE_IDLE; } diff --git a/src/subscriptions.c b/src/subscriptions.c index 7de155ba..a641338f 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -39,6 +39,9 @@ #include "htsmsg.h" #include "notify.h" #include "atomic.h" +#if ENABLE_MPEGTS +#include "input/mpegts.h" +#endif struct th_subscription_list subscriptions; static gtimer_t subscription_reschedule_timer; @@ -136,6 +139,32 @@ subscription_unlink_service(th_subscription_t *s, int reason) s->ths_service = NULL; } +/* + * Called from mpegts code + */ +void +subscription_unlink_mux(th_subscription_t *s, int reason) +{ + streaming_message_t *sm; + mpegts_mux_instance_t *mmi = s->ths_mmi; + + pthread_mutex_lock(&mmi->mmi_input->mi_delivery_mutex); + + streaming_target_disconnect(&mmi->mmi_streaming_pad, &s->ths_input); + + sm = streaming_msg_create_code(SMT_STOP, reason); + streaming_target_deliver(s->ths_output, sm); + + pthread_mutex_unlock(&mmi->mmi_input->mi_delivery_mutex); + + LIST_REMOVE(s, ths_mmi_link); + s->ths_mmi = NULL; + + /* Free memory */ + if (s->ths_flags & SUBSCRIPTION_NONE) + subscription_unsubscribe(s); +} + /** * @@ -153,10 +182,12 @@ subscription_reschedule_cb(void *aux) void subscription_reschedule(void) { + static int reenter = 0; th_subscription_t *s; service_instance_t *si; streaming_message_t *sm; int error; + if (reenter) return; lock_assert(&global_lock); @@ -164,6 +195,8 @@ subscription_reschedule(void) subscription_reschedule_cb, NULL, 2); LIST_FOREACH(s, &subscriptions, ths_global_link) { + if (s->ths_mmi) continue; + if (!s->ths_channel && !s->ths_mmi) continue; #if 0 if(s->ths_channel == NULL) continue; /* stale entry, channel has been destroyed */ @@ -173,7 +206,7 @@ subscription_reschedule(void) /* Already got a service */ if(s->ths_state != SUBSCRIPTION_BAD_SERVICE) - continue; /* And it not bad, so we're happy */ + continue; /* And it not bad, so we're happy */ si = s->ths_current_instance; @@ -200,6 +233,8 @@ subscription_reschedule(void) subscription_link_service(s, si->si_s); } + + reenter = 0; } /** @@ -228,13 +263,11 @@ subscription_unsubscribe(th_subscription_t *s) if(t != NULL) service_remove_subscriber(t, s, SM_CODE_OK); -#ifdef TODO_NEED_A_BETTER_SOLUTION - if(s->ths_tdmi != NULL) { - LIST_REMOVE(s, ths_tdmi_link); - th_dvb_adapter_t *tda = s->ths_tdmi->tdmi_adapter; - pthread_mutex_lock(&tda->tda_delivery_mutex); - streaming_target_disconnect(&tda->tda_streaming_pad, &s->ths_input); - pthread_mutex_unlock(&tda->tda_delivery_mutex); +#if ENABLE_MPEGTS + if(s->ths_mmi) { + subscription_unlink_mux(s, SM_CODE_SUBSCRIPTION_OVERRIDDEN); + if (s->ths_mmi->mmi_mux) + s->ths_mmi->mmi_mux->mm_stop(s->ths_mmi->mmi_mux, 0); } #endif @@ -345,11 +378,14 @@ subscription_create(int weight, const char *name, streaming_target_t *st, int reject = 0; static int tally; - if(flags & SUBSCRIPTION_RAW_MPEGTS) + if (flags & SUBSCRIPTION_NONE) + reject |= -1; + else if(flags & SUBSCRIPTION_RAW_MPEGTS) reject |= SMT_TO_MASK(SMT_PACKET); // Reject parsed frames else reject |= SMT_TO_MASK(SMT_MPEGTS); // Reject raw mpegts + // TODO: possibly we don't connect anything for SUB_NONE streaming_target_init(&s->ths_input, cb ?: subscription_input_direct, s, reject); @@ -449,6 +485,106 @@ subscription_create_from_service(service_t *t, unsigned int weight, (NULL, t, weight, name, st, flags, hostname, username, client); } +/** + * + */ +/** + * + */ +#if ENABLE_MPEGTS +// TODO: move this +static void +mpegts_mux_setsourceinfo ( mpegts_mux_t *mm, source_info_t *si ) +{ + char buf[128]; + + /* Validate */ + lock_assert(&global_lock); + + /* Update */ + if(mm->mm_network->mn_network_name != NULL) + si->si_network = strdup(mm->mm_network->mn_network_name); + + mm->mm_display_name(mm, buf, sizeof(buf)); + si->si_mux = strdup(buf); + + if(mm->mm_active && mm->mm_active->mmi_input) { + mpegts_input_t *mi = mm->mm_active->mmi_input; + mi->mi_display_name(mi, buf, sizeof(buf)); + si->si_adapter = strdup(buf); + } +} + + +th_subscription_t * +subscription_create_from_mux + (mpegts_mux_t *mm, + unsigned int weight, + const char *name, + streaming_target_t *st, + int flags, + const char *hostname, + const char *username, + const char *client) +{ + th_subscription_t *s; + streaming_message_t *sm; + streaming_start_t *ss; + int r; + + if (!flags) + flags = SUBSCRIPTION_RAW_MPEGTS; + + s = subscription_create(weight, name, st, flags, + NULL, hostname, username, client); + + /* Tune */ + r = mm->mm_start(mm, s->ths_title, weight); + + /* Failed */ + if (r) { + subscription_unsubscribe(s); + return NULL; + } + s->ths_mmi = mm->mm_active; + + pthread_mutex_lock(&s->ths_mmi->mmi_input->mi_delivery_mutex); + + /* Store */ + LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link); + + /* Connect (not for NONE streams) */ + if (!(flags & SUBSCRIPTION_NONE)) { + streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input); + + /* Deliver a start message */ + ss = calloc(1, sizeof(streaming_start_t)); + ss->ss_num_components = 0; + ss->ss_refcount = 1; + + mpegts_mux_setsourceinfo(mm, &ss->ss_si); + ss->ss_si.si_service = strdup("rawmux"); + + sm = streaming_msg_create_data(SMT_START, ss); + streaming_target_deliver(s->ths_output, sm); + + tvhinfo("subscription", + "'%s' subscribing to mux, weight: %d, adapter: '%s', " + "network: '%s', mux: '%s'", + s->ths_title, + s->ths_weight, + ss->ss_si.si_adapter ?: "", + ss->ss_si.si_network ?: "", + ss->ss_si.si_mux ?: ""); + } + + pthread_mutex_unlock(&s->ths_mmi->mmi_input->mi_delivery_mutex); + + notify_reload("subscriptions"); + return s; +} +#endif + /** * */ @@ -579,10 +715,10 @@ subscription_create_msg(th_subscription_t *s) htsmsg_add_str(m, "title", s->ths_title); if(s->ths_channel != NULL) - htsmsg_add_str(m, "channel", s->ths_channel->ch_name); + htsmsg_add_str(m, "channel", s->ths_channel->ch_name ?: ""); if(s->ths_service != NULL) - htsmsg_add_str(m, "service", s->ths_service->s_nicename); + htsmsg_add_str(m, "service", s->ths_service->s_nicename ?: ""); return m; } diff --git a/src/subscriptions.h b/src/subscriptions.h index f229158a..5364225d 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -24,6 +24,7 @@ extern struct th_subscription_list subscriptions; #define SUBSCRIPTION_RAW_MPEGTS 0x1 +#define SUBSCRIPTION_NONE 0x2 typedef struct th_subscription { @@ -73,11 +74,12 @@ typedef struct th_subscription { struct service_instance_list ths_instances; struct service_instance *ths_current_instance; -#ifdef TODO_NEED_A_BETTER_SOLUTION - // Ugly ugly ugly to refer DVB code here - - LIST_ENTRY(th_subscription) ths_tdmi_link; - struct th_dvb_mux_instance *ths_tdmi; +#if ENABLE_MPEGTS + // Note: its a bit ugly linking MPEG-TS code directly here, but to do + // otherwise would probably require adding lots of additional + // (repeated) logic elsewhere + LIST_ENTRY(th_subscription) ths_mmi_link; + struct mpegts_mux_instance *ths_mmi; #endif } th_subscription_t; @@ -113,6 +115,19 @@ th_subscription_t *subscription_create_from_service(struct service *t, const char *username, const char *client); +#if ENABLE_MPEGTS +struct mpegts_mux; +th_subscription_t *subscription_create_from_mux + (struct mpegts_mux *m, + unsigned int weight, + const char *name, + streaming_target_t *st, + int flags, + const char *hostname, + const char *username, + const char *client); +#endif + th_subscription_t *subscription_create(int weight, const char *name, streaming_target_t *st, int flags, st_callback_t *cb, @@ -120,6 +135,7 @@ th_subscription_t *subscription_create(int weight, const char *name, const char *username, const char *client); + void subscription_change_weight(th_subscription_t *s, int weight); void subscription_set_speed @@ -132,6 +148,8 @@ void subscription_stop(th_subscription_t *s); void subscription_unlink_service(th_subscription_t *s, int reason); +void subscription_unlink_mux(th_subscription_t *s, int reason); + void subscription_dummy_join(const char *id, int first); int subscriptions_active(void);