diff --git a/Makefile b/Makefile index 45e5c38a..40c242f2 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,8 @@ VPATH += dvb SRCS += dvb.c dvb_support.c dvb_fe.c dvb_tables.c \ diseqc.c dvb_adapter.c dvb_multiplex.c dvb_transport.c dvb_preconf.c +SRCS += serviceprobe.c + SRCS += cwc.c krypt.c VPATH += ffdecsa SRCS += FFdecsa.c diff --git a/channels.c b/channels.c index 5a43d09e..4cdad87d 100644 --- a/channels.c +++ b/channels.c @@ -338,7 +338,7 @@ channel_delete(channel_t *ch) tvhlog(LOG_NOTICE, "channels", "Channel \"%s\" deleted", ch->ch_name); - abort();//pvr_destroy_by_channel(ch); + fprintf(stderr, "!!!!!//pvr_destroy_by_channel(ch);\n"); while((t = LIST_FIRST(&ch->ch_transports)) != NULL) { transport_unmap_channel(t); @@ -352,7 +352,7 @@ channel_delete(channel_t *ch) epg_destroy_by_channel(ch); - abort();//autorec_destroy_by_channel(ch); + fprintf(stderr, "!!!!!//autorec_destroy_by_channel(ch);\n"); hts_settings_remove("channels/%d", ch->ch_id); @@ -388,7 +388,9 @@ channel_merge(channel_t *dst, channel_t *src) while((t = LIST_FIRST(&src->ch_transports)) != NULL) { transport_unmap_channel(t); transport_map_channel(t, dst); + pthread_mutex_lock(&t->tht_stream_mutex); t->tht_config_change(t); + pthread_mutex_unlock(&t->tht_stream_mutex); } channel_delete(src); diff --git a/dvb/dvb_tables.c b/dvb/dvb_tables.c index 1929f2e2..62944734 100644 --- a/dvb/dvb_tables.c +++ b/dvb/dvb_tables.c @@ -508,7 +508,9 @@ dvb_sdt_callback(th_dvb_mux_instance_t *tdmi, uint8_t *ptr, int len, free((void *)t->tht_svcname); t->tht_svcname = strdup(chname); + pthread_mutex_lock(&t->tht_stream_mutex); t->tht_config_change(t); + pthread_mutex_unlock(&t->tht_stream_mutex); } if(t->tht_chname == NULL) @@ -790,6 +792,7 @@ dvb_pmt_callback(th_dvb_mux_instance_t *tdmi, uint8_t *ptr, int len, uint8_t tableid, void *opaque) { th_transport_t *t = opaque; + pthread_mutex_lock(&t->tht_stream_mutex); psi_parse_pmt(t, ptr, len, 1); pthread_mutex_unlock(&t->tht_stream_mutex); diff --git a/main.c b/main.c index fc248305..200cf2d9 100644 --- a/main.c +++ b/main.c @@ -44,6 +44,8 @@ #include "dvb/dvb.h" #include "xmltv.h" #include "spawn.h" +#include "subscriptions.h" +#include "serviceprobe.h" #include #include @@ -247,6 +249,8 @@ main(int argc, char **argv) /** * Initialize subsystems */ + av_register_all(); + xmltv_init(); /* Must be initialized before channels */ channels_init(); @@ -261,6 +265,10 @@ main(int argc, char **argv) webui_init(); + subscriptions_init(); + + serviceprobe_init(); + pthread_mutex_unlock(&global_lock); diff --git a/parsers.c b/parsers.c index dfcf007c..ef0950c3 100644 --- a/parsers.c +++ b/parsers.c @@ -103,9 +103,19 @@ void parse_raw_mpeg(th_transport_t *t, th_stream_t *st, uint8_t *data, int len, int start, int err) { - - if(LIST_FIRST(&t->tht_muxers) == NULL) - return; /* No muxers will take packet, so drop here */ + th_subscription_t *s; + + if(LIST_FIRST(&t->tht_muxers) == NULL) { + /* No muxers. However, subscriptions may force demultiplex + for other reasons (serviceprobe does this) */ + LIST_FOREACH(s, &t->tht_subscriptions, ths_transport_link) + if(s->ths_force_demux) + break; + + if(s == NULL) + return; /* No-one is interested, so drop here */ + } + switch(st->st_type) { case HTSTV_MPEG2VIDEO: @@ -840,7 +850,7 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) /** * Input is ok */ - transport_signal_status(t, TRANSPORT_STATUS_OK); + transport_signal_status(t, SUBSCRIPTION_VALID_PACKETS); /* Alert all muxers tied to us that a new packet has arrived */ diff --git a/psi.c b/psi.c index 332b7a4c..b25046e4 100644 --- a/psi.c +++ b/psi.c @@ -562,8 +562,8 @@ psi_save_transport_settings(htsmsg_t *m, th_transport_t *t) htsmsg_add_u32(m, "disabled", !!t->tht_disabled); + lock_assert(&t->tht_stream_mutex); - pthread_mutex_lock(&t->tht_stream_mutex); LIST_FOREACH(st, &t->tht_streams, st_link) { sub = htsmsg_create(); @@ -581,7 +581,6 @@ psi_save_transport_settings(htsmsg_t *m, th_transport_t *t) htsmsg_add_msg(m, "stream", sub); } - pthread_mutex_unlock(&t->tht_stream_mutex); } diff --git a/serviceprobe.c b/serviceprobe.c index ff86e0b7..d0be3318 100644 --- a/serviceprobe.c +++ b/serviceprobe.c @@ -22,230 +22,159 @@ #include #include #include -#include -#include #include -#include -#include -#include + + + #include "tvhead.h" -#include "iptv_output.h" -#include "dispatch.h" #include "channels.h" #include "subscriptions.h" #include "serviceprobe.h" #include "transports.h" #include "mux.h" -static void serviceprobe_engage(void); -TAILQ_HEAD(sp_queue, sp); - -static struct sp_queue probequeue; - -static struct sp *sp_current; - -static dtimer_t sp_engage_timer; - -typedef struct sp { - TAILQ_ENTRY(sp) sp_link; - - th_muxer_t *sp_muxer; - th_transport_t *sp_t; - dtimer_t sp_timer; - - th_subscription_t *sp_s; - -} sp_t; - - -static void -sp_done_callback(void *aux, int64_t now) -{ - sp_t *sp = aux; - th_subscription_t *s = sp->sp_s; - - if(s != NULL) { - assert(sp == sp_current); - sp_current = NULL; - subscription_unsubscribe(s); - } - - serviceprobe_engage(); - - sp->sp_t->tht_sp = NULL; - - TAILQ_REMOVE(&probequeue, sp, sp_link); - free(sp); -} +/* List of transports to be probed, protected with global_lock */ +static struct th_transport_queue serviceprobe_queue; +static pthread_cond_t serviceprobe_cond; /** - * Got a packet, map it + * */ -static void -sp_packet_input(void *opaque, th_muxstream_t *tms, th_pkt_t *pkt) +void +serviceprobe_enqueue(th_transport_t *t) { - sp_t *sp = opaque; - th_transport_t *t = sp->sp_t; - channel_t *ch; + if(!transport_is_tv(t)) + return; /* Don't even consider non-tv channels */ - tvhlog(LOG_INFO, "serviceprobe", "Probed \"%s\" -- Ok", t->tht_svcname); - - if(t->tht_ch == NULL && t->tht_svcname != NULL) { - ch = channel_find_by_name(t->tht_svcname, 1); - transport_map_channel(t, ch); - - t->tht_config_change(t); - } - dtimer_arm(&sp->sp_timer, sp_done_callback, sp, 0); -} - -/** - * Callback when transport changes status - */ -static void -sp_status_callback(struct th_subscription *s, int status, void *opaque) -{ - sp_t *sp = opaque; - th_transport_t *t = sp->sp_t; - char *errtxt; - - s->ths_status_callback = NULL; - - switch(status) { - case TRANSPORT_STATUS_OK: + if(t->tht_sp_onqueue) return; - case TRANSPORT_STATUS_NO_DESCRAMBLER: - errtxt = "No descrambler for stream"; - break; - case TRANSPORT_STATUS_NO_ACCESS: - errtxt = "Access denied"; - break; - case TRANSPORT_STATUS_NO_INPUT: - errtxt = "No video detected"; - break; - default: - errtxt = "Other error"; - break; - } - - tvhlog(LOG_INFO, "serviceprobe", - "Probed \"%s\" -- %s", t->tht_svcname, errtxt); - dtimer_arm(&sp->sp_timer, sp_done_callback, sp, 0); + t->tht_sp_onqueue = 1; + TAILQ_INSERT_TAIL(&serviceprobe_queue, t, tht_sp_link); + pthread_cond_signal(&serviceprobe_cond); } /** - * Called when a subscription gets/loses access to a transport + * */ static void -sp_subscription_callback(struct th_subscription *s, - subscription_event_t event, void *opaque) +serviceprobe_callback(struct th_subscription *s, subscription_event_t event, + void *opaque) { - sp_t *sp = opaque; + th_transport_t *t = opaque; + channel_t *ch; + const char *errmsg; switch(event) { - case TRANSPORT_AVAILABLE: + case SUBSCRIPTION_TRANSPORT_RUN: + return; + + case SUBSCRIPTION_NO_INPUT: + errmsg = "No input detected"; break; - case TRANSPORT_UNAVAILABLE: - muxer_deinit(sp->sp_muxer, s); + case SUBSCRIPTION_NO_DESCRAMBLER: + errmsg = "No descrambler available"; break; + + case SUBSCRIPTION_NO_ACCESS: + errmsg = "Access denied"; + break; + + case SUBSCRIPTION_RAW_INPUT: + errmsg = "Unable to reassemble packets from input"; + break; + + case SUBSCRIPTION_VALID_PACKETS: + errmsg = NULL; /* All OK */ + break; + + case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE: + case SUBSCRIPTION_TRANSPORT_LOST: + errmsg = "Unable to probe"; + break; + + case SUBSCRIPTION_DESTROYED: + return; /* All done */ + + default: + abort(); } + + assert(t == TAILQ_FIRST(&serviceprobe_queue)); + + + if(errmsg != NULL) { + tvhlog(LOG_INFO, "serviceprobe", "%20s: skipped: %s", + t->tht_svcname, errmsg); + } else { + ch = channel_find_by_name(t->tht_svcname, 1); + transport_map_channel(t, ch); + + pthread_mutex_lock(&t->tht_stream_mutex); + t->tht_config_change(t); + pthread_mutex_unlock(&t->tht_stream_mutex); + + tvhlog(LOG_INFO, "serviceprobe", "\"%s\" mapped to channel \"%s\"", + t->tht_svcname, t->tht_svcname); + } + + t->tht_sp_onqueue = 0; + TAILQ_REMOVE(&serviceprobe_queue, t, tht_sp_link); + pthread_cond_signal(&serviceprobe_cond); } /** - * Setup IPTV (TS over UDP) output + * */ - -static void -serviceprobe_start(void *aux, int64_t now) +static void * +serviceprobe_thread(void *aux) { - th_subscription_t *s; - th_muxer_t *tm; th_transport_t *t; - sp_t *sp; + th_subscription_t *s; + int was_doing_work = 0; - assert(sp_current == NULL); + pthread_mutex_lock(&global_lock); - if((sp = TAILQ_FIRST(&probequeue)) == NULL) { - tvhlog(LOG_NOTICE, "serviceprobe", "Nothing more to probe"); - return; + while(1) { + + while((t = TAILQ_FIRST(&serviceprobe_queue)) == NULL) { + + if(was_doing_work) { + tvhlog(LOG_INFO, "serviceprobe", "Now idle"); + was_doing_work = 0; + } + pthread_cond_wait(&serviceprobe_cond, &global_lock); + } + + if(!was_doing_work) { + tvhlog(LOG_INFO, "serviceprobe", "Starting"); + } + + s = subscription_create_from_transport(t, "serviceprobe", + serviceprobe_callback, t); + s->ths_force_demux = 1; + + /* Wait for something to happen */ + while(TAILQ_FIRST(&serviceprobe_queue) == t) + pthread_cond_wait(&serviceprobe_cond, &global_lock); + + subscription_unsubscribe(s); + was_doing_work = 1; } - s = sp->sp_s = calloc(1, sizeof(th_subscription_t)); - t = sp->sp_t; - - sp_current = sp; - - s->ths_title = strdup("probe"); - s->ths_weight = INT32_MAX; - s->ths_opaque = sp; - s->ths_callback = sp_subscription_callback; - LIST_INSERT_HEAD(&subscriptions, s, ths_global_link); - - - if(t->tht_runstatus != TRANSPORT_RUNNING) - transport_start(t, INT32_MAX, 1); - - s->ths_transport = t; - LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link); - - sp->sp_muxer = tm = muxer_init(s, sp_packet_input, sp); - muxer_play(tm, AV_NOPTS_VALUE); - - s->ths_status_callback = sp_status_callback; -} - -/** - * - */ -static void -serviceprobe_engage(void) -{ - dtimer_arm(&sp_engage_timer, serviceprobe_start, NULL, 0); } /** * */ -void -serviceprobe_add(th_transport_t *t) -{ - sp_t *sp; - - if(!transport_is_tv(t)) - return; - - if(t->tht_sp != NULL) - return; - - sp = calloc(1, sizeof(sp_t)); - - TAILQ_INSERT_TAIL(&probequeue, sp, sp_link); - t->tht_sp = sp; - sp->sp_t = t; - - if(sp_current == NULL) - serviceprobe_engage(); -} - -/** - * - */ -void -serviceprobe_delete(th_transport_t *t) -{ - if(t->tht_sp == NULL) - return; - - sp_done_callback(t->tht_sp, 0); -} - - void -serviceprobe_setup(void) +serviceprobe_init(void) { - TAILQ_INIT(&probequeue); + pthread_t ptid; + pthread_cond_init(&serviceprobe_cond, NULL); + TAILQ_INIT(&serviceprobe_queue); + pthread_create(&ptid, NULL, serviceprobe_thread, NULL); } diff --git a/serviceprobe.h b/serviceprobe.h index 54774ead..bb1fba84 100644 --- a/serviceprobe.h +++ b/serviceprobe.h @@ -16,13 +16,11 @@ * along with this program. If not, see . */ -#ifndef SERVICE_PROBE_H_ -#define SERVICE_PROBE_H_ +#ifndef SERVICEPROBE_H_ +#define SERVICEPROBE_H_ -void serviceprobe_setup(void); +void serviceprobe_init(void); -void serviceprobe_add(th_transport_t *t); +void serviceprobe_enqueue(th_transport_t *t); -void serviceprobe_delete(th_transport_t *t); - -#endif /* SERVICE_PROBE_H_ */ +#endif /* SERVICEPROBE_H_ */ diff --git a/subscriptions.c b/subscriptions.c index b8b1bbe2..1bd06931 100644 --- a/subscriptions.c +++ b/subscriptions.c @@ -37,6 +37,10 @@ #include "subscriptions.h" struct th_subscription_list subscriptions; +static pthread_cond_t subscription_janitor_cond; +static pthread_mutex_t subscription_janitor_mutex; +static int subscription_janitor_work; +static gtimer_t subscription_reschedule_timer; static int @@ -48,14 +52,31 @@ subscription_sort(th_subscription_t *a, th_subscription_t *b) /** * */ -void -subscription_reschedule(void) +static void +subscription_link_transport(th_subscription_t *s, th_transport_t *t) +{ + s->ths_transport = t; + LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link); + s->ths_event_callback(s, SUBSCRIPTION_TRANSPORT_RUN, s->ths_opaque); + + s->ths_last_status = t->tht_last_status; + if(s->ths_last_status != SUBSCRIPTION_EVENT_INVALID) + s->ths_event_callback(s, s->ths_last_status, s->ths_opaque); +} + +/** + * + */ +static void +subscription_reschedule(void *aux) { th_subscription_t *s; th_transport_t *t; lock_assert(&global_lock); + gtimer_arm(&subscription_reschedule_timer, subscription_reschedule, NULL, 2); + LIST_FOREACH(s, &subscriptions, ths_global_link) { if(s->ths_transport != NULL) continue; /* Got a transport, we're happy */ @@ -65,12 +86,14 @@ subscription_reschedule(void) t = transport_find(s->ths_channel, s->ths_weight); - if(t == NULL) + if(t == NULL) { + /* No transport available */ + s->ths_event_callback(s, SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE, + s->ths_opaque); continue; + } - s->ths_transport = t; - LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link); - s->ths_callback(s, TRANSPORT_AVAILABLE, s->ths_opaque); + subscription_link_transport(s, t); } } @@ -95,7 +118,28 @@ subscription_unsubscribe(th_subscription_t *s) free(s->ths_title); free(s); - subscription_reschedule(); + subscription_reschedule(NULL); +} + +/** + * + */ +static th_subscription_t * +subscription_create(int weight, const char *name, + ths_event_callback_t *cb, void *opaque) +{ + th_subscription_t *s = calloc(1, sizeof(th_subscription_t)); + + s->ths_weight = weight; + s->ths_event_callback = cb; + s->ths_opaque = opaque; + s->ths_title = strdup(name); + s->ths_total_err = 0; + + time(&s->ths_start); + LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort); + + return s; } @@ -103,28 +147,17 @@ subscription_unsubscribe(th_subscription_t *s) * */ th_subscription_t * -subscription_create(channel_t *ch, unsigned int weight, const char *name, - subscription_callback_t *cb, void *opaque, uint32_t u32) +subscription_create_from_channel(channel_t *ch, + unsigned int weight, const char *name, + ths_event_callback_t *cb, void *opaque) { - th_subscription_t *s; - - s = calloc(1, sizeof(th_subscription_t)); - s->ths_callback = cb; - s->ths_opaque = opaque; - s->ths_title = strdup(name); - s->ths_total_err = 0; - s->ths_weight = weight; - s->ths_u32 = u32; - - time(&s->ths_start); - LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort); + th_subscription_t *s = subscription_create(weight, name, cb, opaque); s->ths_channel = ch; LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link); - s->ths_transport = NULL; - subscription_reschedule(); + subscription_reschedule(NULL); if(s->ths_transport == NULL) tvhlog(LOG_NOTICE, "subscription", @@ -136,12 +169,90 @@ subscription_create(channel_t *ch, unsigned int weight, const char *name, } +/** + * + */ +th_subscription_t * +subscription_create_from_transport(th_transport_t *t, const char *name, + ths_event_callback_t *cb, void *opaque) +{ + th_subscription_t *s = subscription_create(INT32_MAX, name, cb, opaque); + + if(t->tht_runstatus != TRANSPORT_RUNNING) + transport_start(t, INT32_MAX, 1); + + subscription_link_transport(s, t); + return s; +} + + +/** + * + */ +void +subscription_janitor_has_duty(void) +{ + pthread_mutex_lock(&subscription_janitor_mutex); + subscription_janitor_work++; + pthread_cond_signal(&subscription_janitor_cond); + pthread_mutex_unlock(&subscription_janitor_mutex); +} + + + +/** + * + */ +static void * +subscription_janitor(void *aux) +{ + int v; + th_subscription_t *s; + th_transport_t *t; + + pthread_mutex_lock(&subscription_janitor_mutex); + + v = subscription_janitor_work; + + while(1) { + + while(v == subscription_janitor_work) + pthread_cond_wait(&subscription_janitor_cond, + &subscription_janitor_mutex); + + v = subscription_janitor_work; + pthread_mutex_unlock(&subscription_janitor_mutex); + + pthread_mutex_lock(&global_lock); + + LIST_FOREACH(s, &subscriptions, ths_global_link) { + if((t = s->ths_transport) == NULL) + continue; + + if(s->ths_last_status != t->tht_last_status) { + s->ths_last_status = t->tht_last_status; + s->ths_event_callback(s, s->ths_last_status, s->ths_opaque); + } + } + + pthread_mutex_unlock(&global_lock); + } +} + + + + /** * */ void subscriptions_init(void) { + pthread_t ptid; + pthread_cond_init(&subscription_janitor_cond, NULL); + pthread_mutex_init(&subscription_janitor_mutex, NULL); + + pthread_create(&ptid, NULL, subscription_janitor, NULL); } diff --git a/subscriptions.h b/subscriptions.h index db2f8d65..23937a28 100644 --- a/subscriptions.h +++ b/subscriptions.h @@ -19,31 +19,14 @@ #ifndef SUBSCRIPTIONS_H #define SUBSCRIPTIONS_H +typedef void (ths_event_callback_t)(struct th_subscription *s, + subscription_event_t event, + void *opaque); - -/* - * Subscription - */ - -typedef enum { - TRANSPORT_AVAILABLE, - TRANSPORT_UNAVAILABLE, -} subscription_event_t; - -typedef void (subscription_callback_t)(struct th_subscription *s, - subscription_event_t event, - void *opaque); - -typedef void (subscription_raw_input_t)(struct th_subscription *s, - void *data, int len, - th_stream_t *st, - void *opaque); - - - -typedef void (subscription_status_callback_t)(struct th_subscription *s, - int status, - void *opaque); +typedef void (ths_raw_input_t)(struct th_subscription *s, + void *data, int len, + th_stream_t *st, + void *opaque); typedef struct th_subscription { LIST_ENTRY(th_subscription) ths_global_link; @@ -61,19 +44,17 @@ typedef struct th_subscription { LIST_ENTRY(th_subscription) ths_subscriber_link; /* Caller is responsible for this link */ + void *ths_opaque; char *ths_title; /* display title */ time_t ths_start; /* time when subscription started */ int ths_total_err; /* total errors during entire subscription */ - subscription_callback_t *ths_callback; - void *ths_opaque; - uint32_t ths_u32; + int ths_last_status; - subscription_raw_input_t *ths_raw_input; + ths_event_callback_t *ths_event_callback; + ths_raw_input_t *ths_raw_input; - th_muxer_t *ths_muxer; - - subscription_status_callback_t *ths_status_callback; + int ths_force_demux; } th_subscription_t; @@ -85,16 +66,22 @@ void subscription_unsubscribe(th_subscription_t *s); void subscription_set_weight(th_subscription_t *s, unsigned int weight); -th_subscription_t *subscription_create(channel_t *ch, unsigned int weight, - const char *name, - subscription_callback_t *cb, - void *opaque, - uint32_t u32); +th_subscription_t *subscription_create_from_channel(channel_t *ch, + unsigned int weight, + const char *name, + ths_event_callback_t *cb, + void *opaque); + + +th_subscription_t *subscription_create_from_transport(th_transport_t *t, + const char *name, + ths_event_callback_t *cb, + void *opaque); void subscriptions_init(void); void subscription_stop(th_subscription_t *s); -void subscription_reschedule(void); +void subscription_janitor_has_duty(void); #endif /* SUBSCRIPTIONS_H */ diff --git a/transports.c b/transports.c index 6a596e87..0532babf 100644 --- a/transports.c +++ b/transports.c @@ -49,7 +49,7 @@ static struct th_transport_list transporthash[TRANSPORT_HASH_WIDTH]; -//static void transport_data_timeout(void *aux, int64_t now); +static void transport_data_timeout(void *aux); //static dtimer_t transport_monitor_timer; @@ -67,7 +67,7 @@ transport_stop(th_transport_t *t) th_stream_t *st; th_pkt_t *pkt; - // dtimer_disarm(&t->tht_receive_timer); + gtimer_disarm(&t->tht_receive_timer); // dtimer_disarm(&transport_monitor_timer, transport_monitor, t, 1); @@ -79,6 +79,7 @@ transport_stop(th_transport_t *t) t->tht_tt_commercial_advice = COMMERCIAL_UNKNOWN; assert(LIST_FIRST(&t->tht_muxers) == NULL); + assert(LIST_FIRST(&t->tht_subscriptions) == NULL); pthread_mutex_lock(&t->tht_stream_mutex); @@ -145,9 +146,9 @@ transport_stop(th_transport_t *t) * */ static void -remove_subscriber(th_subscription_t *s) +remove_subscriber(th_subscription_t *s, subscription_event_t reason) { - s->ths_callback(s, TRANSPORT_UNAVAILABLE, s->ths_opaque); + s->ths_event_callback(s, reason, s->ths_opaque); LIST_REMOVE(s, ths_transport_link); s->ths_transport = NULL; } @@ -166,9 +167,9 @@ transport_remove_subscriber(th_transport_t *t, th_subscription_t *s) if(s == NULL) { while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL) - remove_subscriber(s); + remove_subscriber(s, SUBSCRIPTION_TRANSPORT_LOST); } else { - remove_subscriber(s); + remove_subscriber(s, SUBSCRIPTION_DESTROYED); } if(LIST_FIRST(&t->tht_subscriptions) == NULL) @@ -262,6 +263,7 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start) assert(st->st_ctx == NULL); assert(st->st_parser == NULL); + if(id != CODEC_ID_NONE) { c = avcodec_find_decoder(id); if(c != NULL) { @@ -273,8 +275,10 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start) } // cwc_transport_start(t); - // dtimer_arm(&t->tht_receive_timer, transport_data_timeout, t, 4); - transport_signal_status(t, TRANSPORT_STATUS_STARTING); + + t->tht_packets = 0; + gtimer_arm(&t->tht_receive_timer, transport_data_timeout, t, 4); + t->tht_last_status = SUBSCRIPTION_EVENT_INVALID; return 0; } @@ -441,7 +445,7 @@ transport_destroy(th_transport_t *t) lock_assert(&global_lock); while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL) - remove_subscriber(s); + remove_subscriber(s, SUBSCRIPTION_TRANSPORT_LOST); //dtimer_disarm(&t->tht_receive_timer); @@ -593,11 +597,25 @@ transport_unmap_channel(th_transport_t *t) } +/** + * + */ +static void +transport_data_timeout(void *aux) +{ + th_transport_t *t = aux; + + if(t->tht_last_status) + return; /* Something has happend so we don't have to update */ + + transport_signal_status(t, t->tht_packets ? SUBSCRIPTION_RAW_INPUT : + SUBSCRIPTION_NO_INPUT); +} + /** * */ - static struct strtab stypetab[] = { { "SDTV", ST_SDTV }, { "Radio", ST_RADIO }, @@ -644,7 +662,7 @@ transport_signal_status(th_transport_t *t, int newstatus) return; t->tht_last_status = newstatus; - //notify_transprot_status_change(t); + subscription_janitor_has_duty(); } @@ -652,18 +670,16 @@ transport_signal_status(th_transport_t *t, int newstatus) * Table for status -> text conversion */ static struct strtab transportstatustab[] = { - { "Unknown", TRANSPORT_STATUS_UNKNOWN }, - { "Starting", TRANSPORT_STATUS_STARTING }, - { "Ok", TRANSPORT_STATUS_OK }, - { "No input", TRANSPORT_STATUS_NO_INPUT }, - { "No descrambler", TRANSPORT_STATUS_NO_DESCRAMBLER }, - { "No access", TRANSPORT_STATUS_NO_ACCESS }, - { "Mux error", TRANSPORT_STATUS_MUX_ERROR }, + { "Ok", SUBSCRIPTION_VALID_PACKETS }, + { "No input", SUBSCRIPTION_NO_INPUT }, + { "No descrambler", SUBSCRIPTION_NO_DESCRAMBLER }, + { "No access", SUBSCRIPTION_NO_ACCESS }, }; const char * transport_status_to_text(int status) { - return val2str(status, transportstatustab) ?: "Invalid"; + return val2str(status, transportstatustab) ?: "Unknown"; } + diff --git a/tsdemux.c b/tsdemux.c index bc0a7fb7..3c5113a9 100644 --- a/tsdemux.c +++ b/tsdemux.c @@ -205,6 +205,10 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb) if(st == NULL) return; + t->tht_packets = 1; + + pthread_mutex_lock(&t->tht_stream_mutex); + avgstat_add(&t->tht_rate, 188, dispatch_clock); /* Extract PCR */ @@ -220,22 +224,24 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb) n++; r = td->td_descramble(td, t, st, tsb); - if(r == 0) + if(r == 0) { + pthread_mutex_unlock(&t->tht_stream_mutex); return; - + } + if(r == 1) m++; } if(n == 0) { - transport_signal_status(t, TRANSPORT_STATUS_NO_DESCRAMBLER); + transport_signal_status(t, SUBSCRIPTION_NO_DESCRAMBLER); } else if(m == n) { - transport_signal_status(t, TRANSPORT_STATUS_NO_ACCESS); + transport_signal_status(t, SUBSCRIPTION_NO_ACCESS); } - return; + } else { + ts_recv_packet0(t, st, tsb); } - - ts_recv_packet0(t, st, tsb); + pthread_mutex_unlock(&t->tht_stream_mutex); } diff --git a/tvhead.h b/tvhead.h index 8b44408e..d4b6b585 100644 --- a/tvhead.h +++ b/tvhead.h @@ -357,7 +357,48 @@ typedef struct th_stream { -/* +/** + * Transport events, these are sent to subscribers via + * s->ths_event_callback + */ +typedef enum { + + SUBSCRIPTION_EVENT_INVALID = 0, /* mbz */ + + /** Transport is receiving data from source */ + SUBSCRIPTION_TRANSPORT_RUN, + + /** No input is received from source */ + SUBSCRIPTION_NO_INPUT, + + /** No descrambler is able to decrypt the stream */ + SUBSCRIPTION_NO_DESCRAMBLER, + + /** Potential descrambler is available, but access is denied */ + SUBSCRIPTION_NO_ACCESS, + + /** Raw input seen but nothing has really been decoded */ + SUBSCRIPTION_RAW_INPUT, + + /** Packet are being parsed. Only signalled if at least one muxer is + registerd */ + SUBSCRIPTION_VALID_PACKETS, + + /** No transport is available for delivering subscription */ + SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE, + + /** Transport no longer runs, it was needed by someone with higher + priority */ + SUBSCRIPTION_TRANSPORT_LOST, + + /** Subscription destroyed */ + SUBSCRIPTION_DESTROYED, + +} subscription_event_t; + + + +/** * A Transport (or in MPEG TS terms: a 'service') */ typedef struct th_transport { @@ -517,14 +558,23 @@ typedef struct th_transport { */ int tht_last_status; -#define TRANSPORT_STATUS_UNKNOWN 0 -#define TRANSPORT_STATUS_STARTING 1 -#define TRANSPORT_STATUS_OK 2 -#define TRANSPORT_STATUS_NO_INPUT 3 -#define TRANSPORT_STATUS_NO_DESCRAMBLER 4 -#define TRANSPORT_STATUS_NO_ACCESS 5 -#define TRANSPORT_STATUS_MUX_ERROR 6 + /** + * Service probe, see serviceprobe.c for details + */ + int tht_sp_onqueue; + TAILQ_ENTRY(th_transport) tht_sp_link; + /** + * Timer which is armed at transport start. Once it fires + * it will check if any packets has been parsed. If not the status + * will be set to SUBSCRIPTION_NO_INPUT + */ + gtimer_t tht_receive_timer; + + /** + * Set as soon as we get some kind of activity + */ + int tht_packets; /********************************************************* * @@ -776,8 +826,6 @@ typedef struct th_muxer { struct th_muxstream_list tm_streams; - struct th_subscription *tm_subscription; - th_mux_output_t *tm_output; void *tm_opaque; diff --git a/webui/extjs.c b/webui/extjs.c index ef505f7f..4b63ee14 100644 --- a/webui/extjs.c +++ b/webui/extjs.c @@ -392,8 +392,8 @@ extjs_dvbadapter(http_connection_t *hc, const char *remain, void *opaque) const char *s = http_arg_get(&hc->hc_req_args, "adapterId"); const char *op = http_arg_get(&hc->hc_req_args, "op"); th_dvb_adapter_t *tda = s ? dvb_adapter_find_by_identifier(s) : NULL; - //th_dvb_mux_instance_t *tdmi; - // th_transport_t *t; + th_dvb_mux_instance_t *tdmi; + th_transport_t *t; htsmsg_t *r, *out; @@ -433,13 +433,13 @@ extjs_dvbadapter(http_connection_t *hc, const char *remain, void *opaque) tvhlog(LOG_NOTICE, "web interface", "Service probe started on \"%s\"", tda->tda_displayname); -#if 0 + RB_FOREACH(tdmi, &tda->tda_muxes, tdmi_adapter_link) { LIST_FOREACH(t, &tdmi->tdmi_transports, tht_mux_link) { - serviceprobe_add(t); + serviceprobe_enqueue(t); } } -#endif + out = htsmsg_create(); htsmsg_add_u32(out, "success", 1);