subscription: reworked the subscription handling to avoid re-entrancy

This is particularly important for null susbcriptions that receive no
stream data and therefore handle the SMT_STOP inline (actually was
direct before). This causes list corruption, therefore these subs
are now stored to a secondary list that are cleaned up within the
standard rescheduler.
This commit is contained in:
Adam Sutton 2013-09-10 13:50:04 +01:00
parent dfa5fc994e
commit e857acca84
7 changed files with 323 additions and 278 deletions

View file

@ -180,8 +180,6 @@ struct epggrab_ota_mux
int om_interval;
time_t om_when; ///< Next event time
struct th_subscription *om_sub;
LIST_ENTRY(epggrab_ota_mux) om_q_link;
RB_ENTRY(epggrab_ota_mux) om_global_link;
};

View file

@ -109,12 +109,6 @@ epggrab_ota_done ( epggrab_ota_mux_t *ota, int timeout )
ota->om_when = dispatch_clock + epggrab_ota_period(ota);
LIST_INSERT_SORTED(&epggrab_ota_pending, ota, om_q_link, om_time_cmp);
/* Remove subscription */
if (ota->om_sub) {
subscription_unsubscribe(ota->om_sub);
ota->om_sub = NULL;
}
/* Re-arm */
if (LIST_FIRST(&epggrab_ota_pending) == ota)
epggrab_ota_pending_timer_cb(NULL);
@ -179,10 +173,6 @@ epggrab_mux_stop ( mpegts_mux_t *mm, void *p )
epggrab_ota_done(ota, 0);
}
/* **************************************************************************
* Completion handling
* *************************************************************************/
/* **************************************************************************
* Module methods
* *************************************************************************/
@ -244,6 +234,7 @@ epggrab_ota_complete
{
int done = 1;
epggrab_ota_map_t *map;
mpegts_mux_t *mm;
tvhinfo(mod->id, "grab complete");
/* Test for completion */
@ -256,6 +247,10 @@ epggrab_ota_complete
}
if (!done) return;
/* Remove subscriber */
if ((mm = mpegts_mux_find(ota->om_mux_uuid)))
mpegts_mux_unsubscribe_by_name(mm, "epggrab");
/* Done */
epggrab_ota_done(ota, 0);
}
@ -294,13 +289,11 @@ epggrab_ota_pending_timer_cb ( void *p )
epggrab_ota_map_t *map;
epggrab_ota_mux_t *om = LIST_FIRST(&epggrab_ota_pending);
mpegts_mux_t *mm;
th_subscription_t *s;
gtimer_disarm(&epggrab_ota_pending_timer);
lock_assert(&global_lock);
if (!om)
return;
assert(om->om_sub == NULL);
/* Double check */
if (om->om_when > dispatch_clock)
@ -327,10 +320,7 @@ epggrab_ota_pending_timer_cb ( void *p )
epggrab_ota_start(om);
/* Subscribe to the mux */
// TODO: remove hardcoded weight
s = subscription_create_from_mux(mm, 2, "epggrab", NULL,
SUBSCRIPTION_NONE, NULL, NULL, NULL);
if (!(om->om_sub = s)) {
if (mpegts_mux_subscribe(mm, "epggrab", 2)) {
LIST_REMOVE(om, om_q_link);
om->om_active = 0;
om->om_when = dispatch_clock + epggrab_ota_period(om) / 2;

View file

@ -23,6 +23,7 @@
#include "input.h"
#include "service.h"
#include "mpegts/dvb.h"
#include "subscriptions.h"
#define MPEGTS_ONID_NONE 0xFFFF
#define MPEGTS_TSID_NONE 0xFFFF
@ -535,7 +536,7 @@ mpegts_mux_t *mpegts_mux_create0
mn, onid, tsid, conf)
#define mpegts_mux_find(u)\
idnode_find(u, &mpegts_mux_class);
idnode_find(u, &mpegts_mux_class)
#define mpegts_mux_delete_by_uuid(u)\
{ mpegts_mux_t *mm = mpegts_mux_find(u); if (mm) mm->mm_delete(mm); }
@ -566,6 +567,10 @@ int mpegts_mux_set_crid_authority ( mpegts_mux_t *mm, const char *defauth );
void mpegts_mux_open_table ( mpegts_mux_t *mm, mpegts_table_t *mt );
void mpegts_mux_close_table ( mpegts_mux_t *mm, mpegts_table_t *mt );
void mpegts_mux_remove_subscriber(mpegts_mux_t *mm, th_subscription_t *s, int reason);
int mpegts_mux_subscribe(mpegts_mux_t *mm, const char *name, int weight);
void mpegts_mux_unsubscribe_by_name(mpegts_mux_t *mm, const char *name);
size_t mpegts_input_recv_packets
(mpegts_input_t *mi, mpegts_mux_instance_t *mmi, uint8_t *tsb, size_t len,
int64_t *pcr, uint16_t *pcr_pid, const char *name);

View file

@ -501,8 +501,6 @@ mpegts_mux_initial_scan_done ( mpegts_mux_t *mm )
{
char buf[256];
mpegts_network_t *mn = mm->mm_network;
mpegts_mux_instance_t *mmi;
th_subscription_t *s;
/* Stop */
mm->mm_display_name(mm, buf, sizeof(buf));
@ -515,11 +513,7 @@ mpegts_mux_initial_scan_done ( mpegts_mux_t *mm )
mpegts_network_schedule_initial_scan(mn);
/* Unsubscribe */
// TODO: might be better to make a slightly more concrate link here
LIST_FOREACH(mmi, &mm->mm_instances, mmi_mux_link)
LIST_FOREACH(s, &mmi->mmi_subs, ths_mmi_link)
if (!strcmp(s->ths_title, "initscan"))
subscription_unsubscribe(s);
mpegts_mux_unsubscribe_by_name(mm, "initscan");
/* Save */
mm->mm_initial_scan_done = 1;
@ -630,6 +624,42 @@ mpegts_mux_set_crid_authority ( mpegts_mux_t *mm, const char *defauth )
return 1;
}
/* **************************************************************************
* Subscriptions
* *************************************************************************/
void
mpegts_mux_remove_subscriber
( mpegts_mux_t *mm, th_subscription_t *s, int reason )
{
subscription_unlink_mux(s, reason);
mm->mm_stop(mm, 0);
}
int
mpegts_mux_subscribe
( mpegts_mux_t *mm, const char *name, int weight )
{
th_subscription_t *s;
s = subscription_create_from_mux(mm, weight, name, NULL,
SUBSCRIPTION_NONE,
NULL, NULL, NULL);
return s != NULL ? 0 : 1;
}
void
mpegts_mux_unsubscribe_by_name
( mpegts_mux_t *mm, const char *name )
{
mpegts_mux_instance_t *mmi;
th_subscription_t *s;
LIST_FOREACH(mmi, &mm->mm_instances, mmi_mux_link)
LIST_FOREACH(s, &mmi->mmi_subs, ths_mmi_link)
if (!strcmp(s->ths_title, "initscan"))
subscription_unsubscribe(s);
}
/* **************************************************************************
* Search
* *************************************************************************/

View file

@ -238,14 +238,12 @@ mpegts_network_initial_scan(void *aux)
{
mpegts_network_t *mn = aux;
mpegts_mux_t *mm;
th_subscription_t *s;
tvhtrace("mpegts", "setup initial scan for %p", mn);
while((mm = TAILQ_FIRST(&mn->mn_initial_scan_pending_queue)) != NULL) {
assert(mm->mm_initial_scan_status == MM_SCAN_PENDING);
s = subscription_create_from_mux(mm, 1, "initscan", NULL,
SUBSCRIPTION_NONE, NULL, NULL, NULL);
if (!s) break;
if (mpegts_mux_subscribe(mm, "initscan", 1))
break;
assert(mm->mm_initial_scan_status == MM_SCAN_CURRENT);
}
gtimer_arm(&mn->mn_initial_scan_timer, mpegts_network_initial_scan, mn, 10);

View file

@ -44,7 +44,8 @@
#endif
struct th_subscription_list subscriptions;
static gtimer_t subscription_reschedule_timer;
struct th_subscription_list subscriptions_remove;
static gtimer_t subscription_reschedule_timer;
/**
*
@ -55,17 +56,9 @@ subscriptions_active(void)
return LIST_FIRST(&subscriptions) != NULL;
}
/**
*
*/
static int
subscription_sort(th_subscription_t *a, th_subscription_t *b)
{
return b->ths_weight - a->ths_weight;
}
/* **************************************************************************
* Subscription linking
* *************************************************************************/
/**
* The service is producing output.
@ -111,7 +104,6 @@ subscription_link_service(th_subscription_t *s, service_t *t)
pthread_mutex_unlock(&t->s_stream_mutex);
}
/**
* Called from service code
*/
@ -150,12 +142,10 @@ subscription_unlink_mux(th_subscription_t *s, int reason)
pthread_mutex_lock(&mmi->mmi_input->mi_delivery_mutex);
if (!(s->ths_flags & SUBSCRIPTION_NONE)) {
streaming_target_disconnect(&mmi->mmi_streaming_pad, &s->ths_input);
streaming_target_disconnect(&mmi->mmi_streaming_pad, &s->ths_input);
sm = streaming_msg_create_code(SMT_STOP, reason);
streaming_target_deliver(s->ths_output, sm);
}
sm = streaming_msg_create_code(SMT_STOP, reason);
streaming_target_deliver(s->ths_output, sm);
s->ths_mmi = NULL;
LIST_REMOVE(s, ths_mmi_link);
@ -163,6 +153,20 @@ subscription_unlink_mux(th_subscription_t *s, int reason)
pthread_mutex_unlock(&mmi->mmi_input->mi_delivery_mutex);
}
/* **************************************************************************
* Scheduling
* *************************************************************************/
/**
*
*/
static int
subscription_sort(th_subscription_t *a, th_subscription_t *b)
{
return b->ths_weight - a->ths_weight;
}
/**
*
*/
@ -184,21 +188,17 @@ subscription_reschedule(void)
service_instance_t *si;
streaming_message_t *sm;
int error;
if (reenter) return;
assert(reenter == 0);
reenter = 1;
lock_assert(&global_lock);
gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, 2);
subscription_reschedule_cb, NULL, 2);
LIST_FOREACH(s, &subscriptions, ths_global_link) {
if (s->ths_mmi) continue;
if (!s->ths_service && !s->ths_channel) continue;
#if 0
if(s->ths_channel == NULL)
continue; /* stale entry, channel has been destroyed */
#endif
if(s->ths_service != NULL && s->ths_current_instance != NULL) {
/* Already got a service */
@ -231,67 +231,58 @@ subscription_reschedule(void)
subscription_link_service(s, si->si_s);
}
while ((s = LIST_FIRST(&subscriptions_remove))) {
LIST_REMOVE(s, ths_remove_link);
subscription_unsubscribe(s);
}
reenter = 0;
}
/* **************************************************************************
* Streaming handlers
* *************************************************************************/
/**
* NULL handlers for fake subs
*
* These require no data, though expect to receive the stop command
*/
static void
subscription_input_null(void *opaque, streaming_message_t *sm)
{
if (sm->sm_type == SMT_STOP) {
th_subscription_t *s = opaque;
LIST_INSERT_HEAD(&subscriptions_remove, s, ths_remove_link);
gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, 0);
}
streaming_msg_free(sm);
}
/**
*
*/
static void
subscription_unsubscribe0(th_subscription_t *s, int silent)
static void
subscription_input_direct(void *opauqe, streaming_message_t *sm)
{
service_t *t = s->ths_service;
service_instance_t *si = s->ths_current_instance;
th_subscription_t *s = opauqe;
lock_assert(&global_lock);
service_instance_list_clear(&s->ths_instances);
LIST_REMOVE(s, ths_global_link);
if (!silent) {
if(s->ths_channel != NULL) {
LIST_REMOVE(s, ths_channel_link);
tvhlog(LOG_INFO, "subscription", "\"%s\" unsubscribing from \"%s\"",
s->ths_title, s->ths_channel->ch_name);
} else {
tvhlog(LOG_INFO, "subscription", "\"%s\" unsubscribing",
s->ths_title);
}
/* Log data and errors */
if(sm->sm_type == SMT_PACKET) {
th_pkt_t *pkt = sm->sm_data;
if(pkt->pkt_err)
s->ths_total_err++;
s->ths_bytes += pkt->pkt_payload->pb_size;
} else if(sm->sm_type == SMT_MPEGTS) {
pktbuf_t *pb = sm->sm_data;
s->ths_bytes += pb->pb_size;
}
if(si != NULL)
service_remove_subscriber(t, s, SM_CODE_OK);
#if ENABLE_MPEGTS
if(s->ths_mmi) {
mpegts_mux_t *mm = s->ths_mmi->mmi_mux;
subscription_unlink_mux(s, SM_CODE_SUBSCRIPTION_OVERRIDDEN);
if (mm)
mm->mm_stop(mm, 0);
}
#endif
if(s->ths_start_message != NULL)
streaming_msg_free(s->ths_start_message);
free(s->ths_title);
free(s->ths_hostname);
free(s->ths_username);
free(s->ths_client);
free(s);
if (!silent) {
subscription_reschedule();
notify_reload("subscriptions");
}
}
void
subscription_unsubscribe(th_subscription_t *s)
{
subscription_unsubscribe0(s, 0);
/* Pass to output */
streaming_target_deliver(s->ths_output, sm);
}
/**
@ -308,7 +299,7 @@ subscription_input(void *opauqe, streaming_message_t *sm)
if(sm->sm_type == SMT_START) {
if(s->ths_start_message != NULL)
streaming_msg_free(s->ths_start_message);
streaming_msg_free(s->ths_start_message);
s->ths_start_message = sm;
return;
}
@ -326,8 +317,8 @@ subscription_input(void *opauqe, streaming_message_t *sm)
if(sm->sm_type == SMT_SERVICE_STATUS &&
sm->sm_code & TSS_PACKETS) {
if(s->ths_start_message != NULL) {
streaming_target_deliver(s->ths_output, s->ths_start_message);
s->ths_start_message = NULL;
streaming_target_deliver(s->ths_output, s->ths_start_message);
s->ths_start_message = NULL;
}
s->ths_state = SUBSCRIPTION_GOT_SERVICE;
}
@ -338,65 +329,91 @@ subscription_input(void *opauqe, streaming_message_t *sm)
return;
}
if(sm->sm_type == SMT_PACKET) {
th_pkt_t *pkt = sm->sm_data;
if(pkt->pkt_err)
s->ths_total_err++;
s->ths_bytes += pkt->pkt_payload->pb_size;
} else if(sm->sm_type == SMT_MPEGTS) {
pktbuf_t *pb = sm->sm_data;
s->ths_bytes += pb->pb_size;
}
streaming_target_deliver(s->ths_output, sm);
/* Pass to direct handler to log traffic */
subscription_input_direct(s, sm);
}
/* **************************************************************************
* Destroy subscriptions
* *************************************************************************/
/**
*
* Delete
*/
static void
subscription_input_direct(void *opauqe, streaming_message_t *sm)
void
subscription_unsubscribe(th_subscription_t *s)
{
th_subscription_t *s = opauqe;
service_t *t = s->ths_service;
service_instance_t *si = s->ths_current_instance;
if(sm->sm_type == SMT_PACKET) {
th_pkt_t *pkt = sm->sm_data;
if(pkt->pkt_err)
s->ths_total_err++;
s->ths_bytes += pkt->pkt_payload->pb_size;
} else if(sm->sm_type == SMT_MPEGTS) {
pktbuf_t *pb = sm->sm_data;
s->ths_bytes += pb->pb_size;
lock_assert(&global_lock);
service_instance_list_clear(&s->ths_instances);
LIST_REMOVE(s, ths_global_link);
if(s->ths_channel != NULL) {
LIST_REMOVE(s, ths_channel_link);
tvhlog(LOG_INFO, "subscription", "\"%s\" unsubscribing from \"%s\"",
s->ths_title, s->ths_channel->ch_name);
} else {
tvhlog(LOG_INFO, "subscription", "\"%s\" unsubscribing",
s->ths_title);
}
streaming_target_deliver(s->ths_output, sm);
if(si != NULL)
service_remove_subscriber(t, s, SM_CODE_OK);
#if ENABLE_MPEGTS
if(s->ths_mmi)
mpegts_mux_remove_subscriber(s->ths_mmi->mmi_mux, s, SM_CODE_OK);
#endif
if(s->ths_start_message != NULL)
streaming_msg_free(s->ths_start_message);
free(s->ths_title);
free(s->ths_hostname);
free(s->ths_username);
free(s->ths_client);
free(s);
gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, 0);
notify_reload("subscriptions");
}
/* **************************************************************************
* Create subscriptions
* *************************************************************************/
/**
*
/*
* Generic handler for all susbcription creation
*/
th_subscription_t *
subscription_create(int weight, const char *name, streaming_target_t *st,
int flags, st_callback_t *cb, const char *hostname,
const char *username, const char *client)
subscription_create
(int weight, const char *name, streaming_target_t *st,
int flags, st_callback_t *cb, const char *hostname,
const char *username, const char *client)
{
th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
int reject = 0;
static int tally;
if (flags & SUBSCRIPTION_NONE)
reject |= -1;
if(flags & SUBSCRIPTION_NONE)
reject |= (SMT_TO_MASK(SMT_PACKET) | SMT_TO_MASK(SMT_MPEGTS));
else if(flags & SUBSCRIPTION_RAW_MPEGTS)
reject |= SMT_TO_MASK(SMT_PACKET); // Reject parsed frames
else
reject |= SMT_TO_MASK(SMT_MPEGTS); // Reject raw mpegts
// TODO: possibly we don't connect anything for SUB_NONE
streaming_target_init(&s->ths_input,
cb ?: subscription_input_direct, s, reject);
if (!cb) cb = subscription_input_direct;
if (!st) {
st = calloc(1, sizeof(streaming_target_t));
streaming_target_init(st, subscription_input_null, s, 0);
}
streaming_target_init(&s->ths_input, cb, s, reject);
s->ths_weight = weight;
s->ths_title = strdup(name);
@ -413,10 +430,13 @@ subscription_create(int weight, const char *name, streaming_target_t *st,
LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort);
gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, 0);
notify_reload("subscriptions");
return s;
}
/**
*
*/
@ -429,18 +449,19 @@ subscription_create_from_channel_or_service
{
th_subscription_t *s;
assert(!ch || !t);
assert(st);
if (ch)
tvhtrace("subscription", "creating subscription for %s weight %d",
ch->ch_name, weight);
s = subscription_create(weight, name, st, flags, subscription_input,
hostname, username, client);
hostname, username, client);
s->ths_channel = ch;
s->ths_service = t;
if (ch)
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
s->ths_service = t;
// TODO: do we really need this here?
subscription_reschedule();
if(s->ths_service == NULL) {
@ -466,7 +487,7 @@ subscription_create_from_channel_or_service
service_source_info_free(&si);
}
notify_reload("subscriptions");
return s;
}
@ -541,145 +562,58 @@ subscription_create_from_mux
streaming_start_t *ss;
int r;
if (!flags)
flags = SUBSCRIPTION_RAW_MPEGTS;
s = subscription_create(weight, name, st, flags,
NULL, hostname, username, client);
/* Tune */
r = mm->mm_start(mm, s->ths_title, weight);
/* Failed */
if (r) {
subscription_unsubscribe0(s, 1);
r = mm->mm_start(mm, name, weight);
if (r)
return NULL;
}
/* Create subscription */
s = subscription_create(weight, name, st, flags, NULL,
hostname, username, client);
s->ths_mmi = mm->mm_active;
pthread_mutex_lock(&s->ths_mmi->mmi_input->mi_delivery_mutex);
/* Store */
LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link);
/* Connect (not for NONE streams) */
if (!(flags & SUBSCRIPTION_NONE)) {
streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input);
/* Deliver a start message */
ss = calloc(1, sizeof(streaming_start_t));
ss->ss_num_components = 0;
ss->ss_refcount = 1;
/* Connect */
streaming_target_connect(&s->ths_mmi->mmi_streaming_pad, &s->ths_input);
/* Deliver a start message */
ss = calloc(1, sizeof(streaming_start_t));
ss->ss_num_components = 0;
ss->ss_refcount = 1;
mpegts_mux_setsourceinfo(mm, &ss->ss_si);
ss->ss_si.si_service = strdup("rawmux");
sm = streaming_msg_create_data(SMT_START, ss);
streaming_target_deliver(s->ths_output, sm);
mpegts_mux_setsourceinfo(mm, &ss->ss_si);
ss->ss_si.si_service = strdup("rawmux");
tvhinfo("subscription",
"'%s' subscribing to mux, weight: %d, adapter: '%s', "
"network: '%s', mux: '%s'",
s->ths_title,
s->ths_weight,
ss->ss_si.si_adapter ?: "<N/A>",
ss->ss_si.si_network ?: "<N/A>",
ss->ss_si.si_mux ?: "<N/A>");
}
tvhinfo("subscription",
"'%s' subscribing to mux, weight: %d, adapter: '%s', "
"network: '%s', mux: '%s'",
s->ths_title,
s->ths_weight,
ss->ss_si.si_adapter ?: "<N/A>",
ss->ss_si.si_network ?: "<N/A>",
ss->ss_si.si_mux ?: "<N/A>");
sm = streaming_msg_create_data(SMT_START, ss);
streaming_target_deliver(s->ths_output, sm);
pthread_mutex_unlock(&s->ths_mmi->mmi_input->mi_delivery_mutex);
notify_reload("subscriptions");
return s;
}
#endif
/**
*
*/
void
subscription_change_weight(th_subscription_t *s, int weight)
{
if(s->ths_weight == weight)
return;
/* **************************************************************************
* Status monitoring
* *************************************************************************/
LIST_REMOVE(s, ths_global_link);
static gtimer_t subscription_status_timer;
s->ths_weight = weight;
LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort);
subscription_reschedule();
}
/**
*
*/
static void
dummy_callback(void *opauqe, streaming_message_t *sm)
{
switch(sm->sm_type) {
case SMT_START:
fprintf(stderr, "dummysubscription START\n");
break;
case SMT_STOP:
fprintf(stderr, "dummysubscription STOP\n");
break;
case SMT_SERVICE_STATUS:
fprintf(stderr, "dummsubscription: %x\n", sm->sm_code);
break;
default:
break;
}
streaming_msg_free(sm);
}
static gtimer_t dummy_sub_timer;
/**
*
*/
static void
dummy_retry(void *opaque)
{
subscription_dummy_join(opaque, 0);
free(opaque);
}
/**
*
*/
void
subscription_dummy_join(const char *id, int first)
{
service_t *t = service_find_by_identifier(id);
streaming_target_t *st;
if(first) {
gtimer_arm(&dummy_sub_timer, dummy_retry, strdup(id), 2);
return;
}
if(t == NULL) {
tvhlog(LOG_ERR, "subscription",
"Unable to dummy join %s, service not found, retrying...", id);
gtimer_arm(&dummy_sub_timer, dummy_retry, strdup(id), 1);
return;
}
st = calloc(1, sizeof(streaming_target_t));
streaming_target_init(st, dummy_callback, NULL, 0);
subscription_create_from_service(t, 1, "dummy", st, 0, NULL, NULL, "dummy");
tvhlog(LOG_NOTICE, "subscription",
"Dummy join %s ok", id);
}
/**
*
/*
* Serialize info about subscription
*/
htsmsg_t *
subscription_create_msg(th_subscription_t *s)
@ -732,22 +666,19 @@ subscription_create_msg(th_subscription_t *s)
return m;
}
static gtimer_t every_sec;
/**
*
* Check status (bandwidth, errors, etc.)
*/
static void
every_sec_cb(void *aux)
subscription_status_callback ( void *p )
{
th_subscription_t *s;
gtimer_arm(&every_sec, every_sec_cb, NULL, 1);
gtimer_arm(&subscription_status_timer,
subscription_status_callback, NULL, 1);
LIST_FOREACH(s, &subscriptions, ths_global_link) {
int errors = s->ths_total_err;
int bw = atomic_exchange(&s->ths_bytes, 0);
int errors = s->ths_total_err;
int bw = atomic_exchange(&s->ths_bytes, 0);
htsmsg_t *m = subscription_create_msg(s);
htsmsg_delete_field(m, "errors");
htsmsg_add_u32(m, "errors", errors);
@ -757,14 +688,35 @@ every_sec_cb(void *aux)
}
}
/**
*
* Initialise subsystem
*/
void
subscription_init(void)
{
gtimer_arm(&every_sec, every_sec_cb, NULL, 1);
subscription_status_callback(NULL);
}
/* **************************************************************************
* Subscription control
* *************************************************************************/
/**
* Change weight
*/
void
subscription_change_weight(th_subscription_t *s, int weight)
{
if(s->ths_weight == weight)
return;
LIST_REMOVE(s, ths_global_link);
s->ths_weight = weight;
LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort);
gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, 0);
}
/**
@ -808,3 +760,74 @@ subscription_set_skip ( th_subscription_t *s, const streaming_skip_t *skip )
pthread_mutex_unlock(&t->s_stream_mutex);
}
/* **************************************************************************
* Dummy subscription - testing
* *************************************************************************/
/**
*
*/
static void
dummy_callback(void *opauqe, streaming_message_t *sm)
{
switch(sm->sm_type) {
case SMT_START:
fprintf(stderr, "dummysubscription START\n");
break;
case SMT_STOP:
fprintf(stderr, "dummysubscription STOP\n");
break;
case SMT_SERVICE_STATUS:
fprintf(stderr, "dummsubscription: %x\n", sm->sm_code);
break;
default:
break;
}
streaming_msg_free(sm);
}
static gtimer_t dummy_sub_timer;
/**
*
*/
static void
dummy_retry(void *opaque)
{
subscription_dummy_join(opaque, 0);
free(opaque);
}
/**
*
*/
void
subscription_dummy_join(const char *id, int first)
{
service_t *t = service_find_by_identifier(id);
streaming_target_t *st;
if(first) {
gtimer_arm(&dummy_sub_timer, dummy_retry, strdup(id), 2);
return;
}
if(t == NULL) {
tvhlog(LOG_ERR, "subscription",
"Unable to dummy join %s, service not found, retrying...", id);
gtimer_arm(&dummy_sub_timer, dummy_retry, strdup(id), 1);
return;
}
st = calloc(1, sizeof(streaming_target_t));
streaming_target_init(st, dummy_callback, NULL, 0);
subscription_create_from_service(t, 1, "dummy", st, 0, NULL, NULL, "dummy");
tvhlog(LOG_NOTICE, "subscription",
"Dummy join %s ok", id);
}

View file

@ -31,6 +31,7 @@ typedef struct th_subscription {
int ths_id;
LIST_ENTRY(th_subscription) ths_global_link;
LIST_ENTRY(th_subscription) ths_remove_link;
int ths_weight;
enum {