subscription: Created a proper mux subscription mechanism

It's a bit ugly to link subs and mpegts directly, but its not the end
of the world (and probably unavoidable without lots of duplication).

I'm still not convinced its robust to mux deletions if subs exist on
that mux. Probably needs mmi to be ref counted.

There is also a special kind of sub that expects to receive no stream
data, i.e. all data will come from the SI tables only. This special
sub is automatically unsubscribed (but that might need changing).
This commit is contained in:
Adam Sutton 2013-09-02 10:58:22 +01:00
parent 5564cdce6e
commit 4eb97a20f3
8 changed files with 218 additions and 85 deletions

View file

@ -45,6 +45,12 @@ typedef TAILQ_HEAD(mpegts_mux_queue,mpegts_mux) mpegts_mux_queue_t;
typedef LIST_HEAD (mpegts_mux_list,mpegts_mux) mpegts_mux_list_t;
TAILQ_HEAD(mpegts_table_feed_queue, mpegts_table_feed);
/* Classes */
extern const idclass_t mpegts_network_class;
extern const idclass_t mpegts_mux_class;
extern const idclass_t mpegts_service_class;
extern const idclass_t mpegts_input_class;
/* **************************************************************************
* SI processing
* *************************************************************************/
@ -253,9 +259,8 @@ struct mpegts_mux
void (*mm_config_save) (mpegts_mux_t *mm);
void (*mm_display_name) (mpegts_mux_t*, char *buf, size_t len);
int (*mm_is_enabled) (mpegts_mux_t *mm);
int (*mm_start) (mpegts_mux_t *mm,
void *src, const char *r, int w);
void (*mm_stop) (mpegts_mux_t *mm, void *src, int force);
int (*mm_start) (mpegts_mux_t *mm, const char *r, int w);
void (*mm_stop) (mpegts_mux_t *mm, int force);
void (*mm_open_table) (mpegts_mux_t*,mpegts_table_t*);
void (*mm_close_table) (mpegts_mux_t*,mpegts_table_t*);
void (*mm_create_instances) (mpegts_mux_t*);
@ -352,7 +357,7 @@ struct mpegts_mux_instance
mpegts_mux_t *mmi_mux;
mpegts_input_t *mmi_input;
RB_HEAD(,mpegts_mux_sub) mmi_subs;
LIST_HEAD(,th_subscription) mmi_subs;
// TODO: remove this
int mmi_tune_failed; // this is really DVB
@ -530,7 +535,7 @@ mpegts_service_t *mpegts_mux_find_service(mpegts_mux_t *ms, uint16_t sid);
&type##_class, uuid,\
mi, mm);
int mpegts_mux_instance_start
( mpegts_mux_instance_t **mmiptr, void *src, int weight );
( mpegts_mux_instance_t **mmiptr );
int mpegts_mux_set_tsid ( mpegts_mux_t *mm, uint16_t tsid );
int mpegts_mux_set_onid ( mpegts_mux_t *mm, uint16_t onid );

View file

@ -682,7 +682,7 @@ linuxdvb_frontend_tune0
return 0;
/* Stop current */
cur->mmi_mux->mm_stop(cur->mmi_mux, NULL, 1);
cur->mmi_mux->mm_stop(cur->mmi_mux, 1);
}
assert(LIST_FIRST(&lfe->mi_mux_active) == NULL);

View file

@ -93,24 +93,23 @@ int
mpegts_input_current_weight ( mpegts_input_t *mi )
{
const mpegts_mux_instance_t *mmi;
const mpegts_mux_sub_t *mms;
const service_t *s;
const th_subscription_t *ths;
int w = 0;
/* Check for scan (weight 1) */
/* Direct subs */
LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
RB_FOREACH(mms, &mmi->mmi_subs, mms_link)
w = MAX(w, mms->mms_weight);
LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) {
w = MAX(w, ths->ths_weight);
}
}
/* Check for mux subs */
/* Check subscriptions */
/* Service subs */
pthread_mutex_lock(&mi->mi_delivery_mutex);
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link)
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
w = MAX(w, ths->ths_weight);
}
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
return w;

View file

@ -20,6 +20,7 @@
#include "idnode.h"
#include "queue.h"
#include "input/mpegts.h"
#include "subscriptions.h"
#include <assert.h>
@ -68,17 +69,9 @@ mpegts_mux_instance_create0
return mmi;
}
static int
mms_cmp ( mpegts_mux_sub_t *a, mpegts_mux_sub_t *b )
{
if (a->mms_src < b->mms_src) return -1;
if (a->mms_src > b->mms_src) return 1;
return 0;
}
int
mpegts_mux_instance_start
( mpegts_mux_instance_t **mmiptr, void *src, int weight )
( mpegts_mux_instance_t **mmiptr )
{
int r;
char buf[256], buf2[256];;
@ -100,21 +93,6 @@ mpegts_mux_instance_start
r = mmi->mmi_input->mi_start_mux(mmi->mmi_input, mmi);
if (r) return r;
/* Add sub */
if (src) {
mpegts_mux_sub_t *sub;
static mpegts_mux_sub_t *skel = NULL;
if (!skel)
skel = calloc(1, sizeof(mpegts_mux_sub_t));
skel->mms_src = src;
sub = RB_INSERT_SORTED(&mmi->mmi_subs, skel, mms_link, mms_cmp);
if (!sub) {
sub = skel;
skel = NULL;
sub->mms_weight = weight;
}
}
/* Start */
tvhdebug("mpegts", "%s - started", buf);
mmi->mmi_input->mi_started_mux(mmi->mmi_input, mmi);
@ -275,7 +253,7 @@ mpegts_mux_delete ( mpegts_mux_t *mm )
tvhinfo("mpegts", "%s - deleting", buf);
/* Stop */
mm->mm_stop(mm, NULL, 1);
mm->mm_stop(mm, 1);
/* Remove from lists */
LIST_REMOVE(mm, mm_network_link);
@ -315,7 +293,7 @@ mpegts_mux_create_instances ( mpegts_mux_t *mm )
static int
mpegts_mux_start
( mpegts_mux_t *mm, void *src, const char *reason, int weight )
( mpegts_mux_t *mm, const char *reason, int weight )
{
int pass, fail;
char buf[256];
@ -378,7 +356,7 @@ mpegts_mux_start
if (tune) {
tvhinfo("mpegts", "%s - starting for '%s' (weight %d)",
buf, reason, weight);
if (!(fail = mpegts_mux_instance_start(&tune, src, weight))) break;
if (!(fail = mpegts_mux_instance_start(&tune))) break;
tune = NULL;
tvhwarn("mpegts", "%s - failed to start, try another", buf);
}
@ -403,7 +381,7 @@ mpegts_mux_has_subscribers ( mpegts_mux_t *mm )
{
mpegts_mux_instance_t *mmi = mm->mm_active;
if (mmi) {
if (RB_FIRST(&mmi->mmi_subs))
if (LIST_FIRST(&mmi->mmi_subs))
return 1;
return mmi->mmi_input->mi_has_subscription(mmi->mmi_input, mm);
}
@ -411,28 +389,12 @@ mpegts_mux_has_subscribers ( mpegts_mux_t *mm )
}
static void
mpegts_mux_stop ( mpegts_mux_t *mm, void *src, int force )
mpegts_mux_stop ( mpegts_mux_t *mm, int force )
{
char buf[256];
mpegts_mux_instance_t *mmi = mm->mm_active;
mpegts_input_t *mi = NULL;
mpegts_mux_sub_t *sub, skel;
/* Remove subs */
if (mmi) {
if (force) {
while ((sub = RB_FIRST(&mmi->mmi_subs))) {
RB_REMOVE(&mmi->mmi_subs, sub, mms_link);
free(sub);
}
} else if (src) {
skel.mms_src = src;
if ((sub = RB_FIND(&mmi->mmi_subs, &skel, mms_link, mms_cmp))) {
RB_REMOVE(&mmi->mmi_subs, sub, mms_link);
free(sub);
}
}
}
th_subscription_t *sub;
if (!force && mpegts_mux_has_subscribers(mm))
return;
@ -441,6 +403,8 @@ mpegts_mux_stop ( mpegts_mux_t *mm, void *src, int force )
tvhdebug("mpegts", "%s - stopping mux", buf);
if (mmi) {
LIST_FOREACH(sub, &mmi->mmi_subs, ths_mmi_link)
subscription_unlink_mux(sub, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
mi = mmi->mmi_input;
mi->mi_stop_mux(mi, mmi);
mi->mi_stopped_mux(mi, mmi);
@ -530,7 +494,7 @@ mpegts_mux_initial_scan_timeout ( void *aux )
char buf[256];
mpegts_mux_t *mm = aux;
mm->mm_display_name(mm, buf, sizeof(buf));
tvhdebug("mpegts", "%s - initial scan timed out", buf);
tvhinfo("mpegts", "%s - initial scan timed out", buf);
mpegts_mux_initial_scan_done(mm);
}
@ -539,6 +503,10 @@ mpegts_mux_initial_scan_done ( mpegts_mux_t *mm )
{
char buf[256];
mpegts_network_t *mn = mm->mm_network;
mpegts_mux_instance_t *mmi;
th_subscription_t *s;
/* Stop */
mm->mm_display_name(mm, buf, sizeof(buf));
tvhinfo("mpegts", "%s - initial scan complete", buf);
gtimer_disarm(&mm->mm_initial_scan_timeout);
@ -548,8 +516,12 @@ mpegts_mux_initial_scan_done ( mpegts_mux_t *mm )
TAILQ_REMOVE(&mn->mn_initial_scan_current_queue, mm, mm_initial_scan_link);
mpegts_network_schedule_initial_scan(mn);
/* Stop */
mm->mm_stop(mm, mn, 0);
/* Unsubscribe */
// TODO: might be better to make a slightly more concrate link here
LIST_FOREACH(mmi, &mm->mm_instances, mmi_mux_link)
LIST_FOREACH(s, &mmi->mmi_subs, ths_mmi_link)
if (!strcmp(s->ths_title, "initscan"))
subscription_unsubscribe(s);
/* Save */
mm->mm_initial_scan_done = 1;

View file

@ -17,6 +17,7 @@
*/
#include "input/mpegts.h"
#include "subscriptions.h"
#include <assert.h>
@ -235,14 +236,16 @@ mpegts_network_delete
static void
mpegts_network_initial_scan(void *aux)
{
mpegts_network_t *mn = aux;
mpegts_mux_t *mm;
mpegts_network_t *mn = aux;
mpegts_mux_t *mm;
th_subscription_t *s;
tvhtrace("mpegts", "setup initial scan for %p", mn);
while((mm = TAILQ_FIRST(&mn->mn_initial_scan_pending_queue)) != NULL) {
assert(mm->mm_initial_scan_status == MM_SCAN_PENDING);
if (mm->mm_start(mm, mn, "initial scan", 1))
break;
s = subscription_create_from_mux(mm, 1, "initscan", NULL,
SUBSCRIPTION_NONE, NULL, NULL, NULL);
if (!s) break;
assert(mm->mm_initial_scan_status == MM_SCAN_CURRENT);
}
gtimer_arm(&mn->mn_initial_scan_timer, mpegts_network_initial_scan, mn, 10);

View file

@ -182,7 +182,7 @@ mpegts_service_start(service_t *t, int instance)
return SM_CODE_UNDEFINED_ERROR;
/* Start Mux */
r = mpegts_mux_instance_start(&mmi, NULL, 0);
r = mpegts_mux_instance_start(&mmi);
/* Start */
if (!r) {
@ -210,7 +210,7 @@ mpegts_service_stop(service_t *t)
lock_assert(&global_lock);
/* Stop */
mm->mm_stop(mm, NULL, 0);
mm->mm_stop(mm, 0);
i->mi_close_service(i, s);
s->s_status = SERVICE_IDLE;
}

View file

@ -39,6 +39,9 @@
#include "htsmsg.h"
#include "notify.h"
#include "atomic.h"
#if ENABLE_MPEGTS
#include "input/mpegts.h"
#endif
struct th_subscription_list subscriptions;
static gtimer_t subscription_reschedule_timer;
@ -136,6 +139,32 @@ subscription_unlink_service(th_subscription_t *s, int reason)
s->ths_service = NULL;
}
/*
* Called from mpegts code
*/
void
subscription_unlink_mux(th_subscription_t *s, int reason)
{
streaming_message_t *sm;
mpegts_mux_instance_t *mmi = s->ths_mmi;
pthread_mutex_lock(&mmi->mmi_input->mi_delivery_mutex);
streaming_target_disconnect(&mmi->mmi_streaming_pad, &s->ths_input);
sm = streaming_msg_create_code(SMT_STOP, reason);
streaming_target_deliver(s->ths_output, sm);
pthread_mutex_unlock(&mmi->mmi_input->mi_delivery_mutex);
LIST_REMOVE(s, ths_mmi_link);
s->ths_mmi = NULL;
/* Free memory */
if (s->ths_flags & SUBSCRIPTION_NONE)
subscription_unsubscribe(s);
}
/**
*
@ -153,10 +182,12 @@ subscription_reschedule_cb(void *aux)
void
subscription_reschedule(void)
{
static int reenter = 0;
th_subscription_t *s;
service_instance_t *si;
streaming_message_t *sm;
int error;
if (reenter) return;
lock_assert(&global_lock);
@ -164,6 +195,8 @@ subscription_reschedule(void)
subscription_reschedule_cb, NULL, 2);
LIST_FOREACH(s, &subscriptions, ths_global_link) {
if (s->ths_mmi) continue;
if (!s->ths_channel && !s->ths_mmi) continue;
#if 0
if(s->ths_channel == NULL)
continue; /* stale entry, channel has been destroyed */
@ -173,7 +206,7 @@ subscription_reschedule(void)
/* Already got a service */
if(s->ths_state != SUBSCRIPTION_BAD_SERVICE)
continue; /* And it not bad, so we're happy */
continue; /* And it not bad, so we're happy */
si = s->ths_current_instance;
@ -200,6 +233,8 @@ subscription_reschedule(void)
subscription_link_service(s, si->si_s);
}
reenter = 0;
}
/**
@ -228,13 +263,11 @@ subscription_unsubscribe(th_subscription_t *s)
if(t != NULL)
service_remove_subscriber(t, s, SM_CODE_OK);
#ifdef TODO_NEED_A_BETTER_SOLUTION
if(s->ths_tdmi != NULL) {
LIST_REMOVE(s, ths_tdmi_link);
th_dvb_adapter_t *tda = s->ths_tdmi->tdmi_adapter;
pthread_mutex_lock(&tda->tda_delivery_mutex);
streaming_target_disconnect(&tda->tda_streaming_pad, &s->ths_input);
pthread_mutex_unlock(&tda->tda_delivery_mutex);
#if ENABLE_MPEGTS
if(s->ths_mmi) {
subscription_unlink_mux(s, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
if (s->ths_mmi->mmi_mux)
s->ths_mmi->mmi_mux->mm_stop(s->ths_mmi->mmi_mux, 0);
}
#endif
@ -345,11 +378,14 @@ subscription_create(int weight, const char *name, streaming_target_t *st,
int reject = 0;
static int tally;
if(flags & SUBSCRIPTION_RAW_MPEGTS)
if (flags & SUBSCRIPTION_NONE)
reject |= -1;
else if(flags & SUBSCRIPTION_RAW_MPEGTS)
reject |= SMT_TO_MASK(SMT_PACKET); // Reject parsed frames
else
reject |= SMT_TO_MASK(SMT_MPEGTS); // Reject raw mpegts
// TODO: possibly we don't connect anything for SUB_NONE
streaming_target_init(&s->ths_input,
cb ?: subscription_input_direct, s, reject);
@ -449,6 +485,106 @@ subscription_create_from_service(service_t *t, unsigned int weight,
(NULL, t, weight, name, st, flags, hostname, username, client);
}
/**
*
*/
/**
*
*/
#if ENABLE_MPEGTS
// TODO: move this
static void
mpegts_mux_setsourceinfo ( mpegts_mux_t *mm, source_info_t *si )
{
char buf[128];
/* Validate */
lock_assert(&global_lock);
/* Update */
if(mm->mm_network->mn_network_name != NULL)
si->si_network = strdup(mm->mm_network->mn_network_name);
mm->mm_display_name(mm, buf, sizeof(buf));
si->si_mux = strdup(buf);
if(mm->mm_active && mm->mm_active->mmi_input) {
mpegts_input_t *mi = mm->mm_active->mmi_input;
mi->mi_display_name(mi, buf, sizeof(buf));
si->si_adapter = strdup(buf);
}
}
th_subscription_t *
subscription_create_from_mux
(mpegts_mux_t *mm,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client)
{
th_subscription_t *s;
streaming_message_t *sm;
streaming_start_t *ss;
int r;
if (!flags)
flags = SUBSCRIPTION_RAW_MPEGTS;
s = subscription_create(weight, name, st, flags,
NULL, hostname, username, client);
/* Tune */
r = mm->mm_start(mm, s->ths_title, weight);
/* Failed */
if (r) {
subscription_unsubscribe(s);
return NULL;
}
s->ths_mmi = mm->mm_active;
pthread_mutex_lock(&s->ths_mmi->mmi_input->mi_delivery_mutex);
/* Store */
LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link);
/* Connect (not for NONE streams) */
if (!(flags & SUBSCRIPTION_NONE)) {
streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input);
/* Deliver a start message */
ss = calloc(1, sizeof(streaming_start_t));
ss->ss_num_components = 0;
ss->ss_refcount = 1;
mpegts_mux_setsourceinfo(mm, &ss->ss_si);
ss->ss_si.si_service = strdup("rawmux");
sm = streaming_msg_create_data(SMT_START, ss);
streaming_target_deliver(s->ths_output, sm);
tvhinfo("subscription",
"'%s' subscribing to mux, weight: %d, adapter: '%s', "
"network: '%s', mux: '%s'",
s->ths_title,
s->ths_weight,
ss->ss_si.si_adapter ?: "<N/A>",
ss->ss_si.si_network ?: "<N/A>",
ss->ss_si.si_mux ?: "<N/A>");
}
pthread_mutex_unlock(&s->ths_mmi->mmi_input->mi_delivery_mutex);
notify_reload("subscriptions");
return s;
}
#endif
/**
*
*/
@ -579,10 +715,10 @@ subscription_create_msg(th_subscription_t *s)
htsmsg_add_str(m, "title", s->ths_title);
if(s->ths_channel != NULL)
htsmsg_add_str(m, "channel", s->ths_channel->ch_name);
htsmsg_add_str(m, "channel", s->ths_channel->ch_name ?: "");
if(s->ths_service != NULL)
htsmsg_add_str(m, "service", s->ths_service->s_nicename);
htsmsg_add_str(m, "service", s->ths_service->s_nicename ?: "");
return m;
}

View file

@ -24,6 +24,7 @@
extern struct th_subscription_list subscriptions;
#define SUBSCRIPTION_RAW_MPEGTS 0x1
#define SUBSCRIPTION_NONE 0x2
typedef struct th_subscription {
@ -73,11 +74,12 @@ typedef struct th_subscription {
struct service_instance_list ths_instances;
struct service_instance *ths_current_instance;
#ifdef TODO_NEED_A_BETTER_SOLUTION
// Ugly ugly ugly to refer DVB code here
LIST_ENTRY(th_subscription) ths_tdmi_link;
struct th_dvb_mux_instance *ths_tdmi;
#if ENABLE_MPEGTS
// Note: its a bit ugly linking MPEG-TS code directly here, but to do
// otherwise would probably require adding lots of additional
// (repeated) logic elsewhere
LIST_ENTRY(th_subscription) ths_mmi_link;
struct mpegts_mux_instance *ths_mmi;
#endif
} th_subscription_t;
@ -113,6 +115,19 @@ th_subscription_t *subscription_create_from_service(struct service *t,
const char *username,
const char *client);
#if ENABLE_MPEGTS
struct mpegts_mux;
th_subscription_t *subscription_create_from_mux
(struct mpegts_mux *m,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client);
#endif
th_subscription_t *subscription_create(int weight, const char *name,
streaming_target_t *st,
int flags, st_callback_t *cb,
@ -120,6 +135,7 @@ th_subscription_t *subscription_create(int weight, const char *name,
const char *username,
const char *client);
void subscription_change_weight(th_subscription_t *s, int weight);
void subscription_set_speed
@ -132,6 +148,8 @@ void subscription_stop(th_subscription_t *s);
void subscription_unlink_service(th_subscription_t *s, int reason);
void subscription_unlink_mux(th_subscription_t *s, int reason);
void subscription_dummy_join(const char *id, int first);
int subscriptions_active(void);