From 9b4165296b74102dffc4726a7c563333f52fb63e Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Sun, 27 Jul 2014 16:18:53 +0200 Subject: [PATCH] subscription: Add watchdog for the data reception .. for sevices and add the timeout timer for the mux subscriptions --- src/input/mpegts.h | 3 ++- src/input/mpegts/mpegts_input.c | 4 ++++ src/input/mpegts/tsdemux.c | 1 + src/parsers/parsers.c | 1 + src/service.c | 21 +++++++++++++++++---- src/service.h | 9 +++++++++ src/subscriptions.c | 31 +++++++++++++++++++++++++++++-- src/subscriptions.h | 1 + 8 files changed, 64 insertions(+), 7 deletions(-) diff --git a/src/input/mpegts.h b/src/input/mpegts.h index bcd56c00..292d2527 100644 --- a/src/input/mpegts.h +++ b/src/input/mpegts.h @@ -538,7 +538,8 @@ struct mpegts_input * Input processing */ - int mi_running; + uint8_t mi_running; + uint8_t mi_live; time_t mi_last_dispatch; /* Data input */ diff --git a/src/input/mpegts/mpegts_input.c b/src/input/mpegts/mpegts_input.c index 55caae25..9c131de6 100644 --- a/src/input/mpegts/mpegts_input.c +++ b/src/input/mpegts/mpegts_input.c @@ -399,6 +399,8 @@ mpegts_input_started_mux { /* Deliver first TS packets as fast as possible */ mi->mi_last_dispatch = 0; + /* Wait for first TS packet */ + mi->mi_live = 0; /* Arm timer */ if (LIST_FIRST(&mi->mi_mux_active) == NULL) @@ -635,6 +637,8 @@ mpegts_input_process mpegts_mux_instance_t *mmi = mm->mm_active; mpegts_pid_t *last_mp = NULL; + mi->mi_live = 1; + /* Process */ assert((len % 188) == 0); while ( tsb < end ) { diff --git a/src/input/mpegts/tsdemux.c b/src/input/mpegts/tsdemux.c index ef403c8b..114e5a14 100644 --- a/src/input/mpegts/tsdemux.c +++ b/src/input/mpegts/tsdemux.c @@ -287,6 +287,7 @@ ts_remux(mpegts_service_t *t, const uint8_t *src) pktbuf_ref_dec(pb); service_set_streaming_status_flags((service_t*)t, TSS_PACKETS); + t->s_streaming_live |= TSS_LIVE; sbuf_reset(sb, TS_REMUX_BUFSIZE); } diff --git a/src/parsers/parsers.c b/src/parsers/parsers.c index 8ff6e4e1..277438d2 100644 --- a/src/parsers/parsers.c +++ b/src/parsers/parsers.c @@ -1407,6 +1407,7 @@ parser_deliver(service_t *t, elementary_stream_t *st, th_pkt_t *pkt, int error) * Input is ok */ service_set_streaming_status_flags(t, TSS_PACKETS); + t->s_streaming_live |= TSS_LIVE; /* Forward packet */ pkt->pkt_componentindex = st->es_index; diff --git a/src/service.c b/src/service.c index 62e1a1cd..65be059f 100644 --- a/src/service.c +++ b/src/service.c @@ -553,6 +553,7 @@ service_start(service_t *t, int instance) assert(t->s_status != SERVICE_RUNNING); t->s_streaming_status = 0; + t->s_streaming_live = 0; t->s_scrambled_seen = 0; t->s_start_time = dispatch_clock; @@ -943,13 +944,21 @@ static void service_data_timeout(void *aux) { service_t *t = aux; + int flags = 0; pthread_mutex_lock(&t->s_stream_mutex); if(!(t->s_streaming_status & TSS_PACKETS)) - service_set_streaming_status_flags(t, TSS_GRACEPERIOD); + flags |= TSS_GRACEPERIOD; + if(!(t->s_streaming_live & TSS_LIVE)) + flags |= TSS_TIMEOUT; + if (flags) + service_set_streaming_status_flags(t, flags); + t->s_streaming_live &= ~TSS_LIVE; pthread_mutex_unlock(&t->s_stream_mutex); + + gtimer_arm(&t->s_receive_timer, service_data_timeout, t, 5); } /** @@ -1052,7 +1061,7 @@ service_set_streaming_status_flags_(service_t *t, int set) t->s_streaming_status = n; - tvhlog(LOG_DEBUG, "service", "%s: Status changed to %s%s%s%s%s%s%s", + tvhlog(LOG_DEBUG, "service", "%s: Status changed to %s%s%s%s%s%s%s%s", service_nicename(t), n & TSS_INPUT_HARDWARE ? "[Hardware input] " : "", n & TSS_INPUT_SERVICE ? "[Input on service] " : "", @@ -1060,7 +1069,8 @@ service_set_streaming_status_flags_(service_t *t, int set) n & TSS_PACKETS ? "[Reassembled packets] " : "", n & TSS_NO_DESCRAMBLER ? "[No available descrambler] " : "", n & TSS_NO_ACCESS ? "[No access] " : "", - n & TSS_GRACEPERIOD ? "[Graceperiod expired] " : ""); + n & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "", + n & TSS_TIMEOUT ? "[Data timeout] " : ""); sm = streaming_msg_create_code(SMT_SERVICE_STATUS, t->s_streaming_status); @@ -1331,6 +1341,9 @@ service_tss2text(int flags) if(flags & TSS_GRACEPERIOD) return "No input detected"; + if(flags & TSS_TIMEOUT) + return "Data timeout"; + return "No status"; } @@ -1347,7 +1360,7 @@ tss2errcode(int tss) if(tss & TSS_NO_DESCRAMBLER) return SM_CODE_NO_DESCRAMBLER; - if(tss & TSS_GRACEPERIOD) + if(tss & (TSS_GRACEPERIOD|TSS_TIMEOUT)) return SM_CODE_NO_INPUT; return SM_CODE_OK; diff --git a/src/service.h b/src/service.h index a7738db1..54b7f9ff 100644 --- a/src/service.h +++ b/src/service.h @@ -383,10 +383,19 @@ typedef struct service { // Errors #define TSS_NO_DESCRAMBLER 0x10000 #define TSS_NO_ACCESS 0x20000 +#define TSS_TIMEOUT 0x40000 #define TSS_ERRORS 0xffff0000 + /** + * + */ + int s_streaming_live; + + // Live status +#define TSS_LIVE 0x01 + /** * For simple streaming sources (such as video4linux) keeping * track of the video and audio stream is convenient. diff --git a/src/subscriptions.c b/src/subscriptions.c index db440eda..be55f423 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -152,6 +152,8 @@ subscription_unlink_mux(th_subscription_t *s, int reason) mpegts_mux_t *mm = mmi->mmi_mux; mpegts_input_t *mi = mmi->mmi_input; + gtimer_disarm(&s->ths_receive_timer); + pthread_mutex_lock(&mi->mi_output_lock); s->ths_mmi = NULL; @@ -363,6 +365,14 @@ subscription_input(void *opauqe, streaming_message_t *sm) return; } + if (sm->sm_type == SMT_SERVICE_STATUS && + sm->sm_code & TSS_TIMEOUT) { + error = tss2errcode(sm->sm_code); + if (error > s->ths_testing_error) + s->ths_testing_error = error; + s->ths_state = SUBSCRIPTION_BAD_SERVICE; + } + /* Pass to direct handler to log traffic */ subscription_input_direct(s, sm); } @@ -598,6 +608,20 @@ mpegts_mux_setsourceinfo ( mpegts_mux_t *mm, source_info_t *si ) } } +static void +mux_data_timeout ( void *aux ) +{ + th_subscription_t *s = aux; + mpegts_input_t *mi = s->ths_mmi->mmi_input; + + if (!mi->mi_live) { + subscription_unlink_mux(s, SM_CODE_NO_INPUT); + return; + } + mi->mi_live = 0; + + gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, 5); +} th_subscription_t * subscription_create_from_mux @@ -639,7 +663,7 @@ subscription_create_from_mux pthread_mutex_unlock(&mi->mi_output_lock); } - pthread_mutex_lock(&s->ths_mmi->mmi_input->mi_output_lock); + pthread_mutex_lock(&mi->mi_output_lock); /* Store */ LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link); @@ -672,7 +696,10 @@ subscription_create_from_mux sm = streaming_msg_create_data(SMT_START, ss); streaming_target_deliver(s->ths_output, sm); - pthread_mutex_unlock(&s->ths_mmi->mmi_input->mi_output_lock); + pthread_mutex_unlock(&mi->mi_output_lock); + + r = (mi->mi_get_grace ? mi->mi_get_grace(mi, mm) : 0) + 20; + gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, r); return s; } diff --git a/src/subscriptions.h b/src/subscriptions.h index 51163971..29f16454 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -93,6 +93,7 @@ typedef struct th_subscription { // (repeated) logic elsewhere LIST_ENTRY(th_subscription) ths_mmi_link; struct mpegts_mux_instance *ths_mmi; + gtimer_t ths_receive_timer; #endif } th_subscription_t;