subscription: Add watchdog for the data reception

.. for sevices and add the timeout timer for the mux subscriptions
This commit is contained in:
Jaroslav Kysela 2014-07-27 16:18:53 +02:00
parent 65facb3fc4
commit 9b4165296b
8 changed files with 64 additions and 7 deletions

View file

@ -538,7 +538,8 @@ struct mpegts_input
* Input processing
*/
int mi_running;
uint8_t mi_running;
uint8_t mi_live;
time_t mi_last_dispatch;
/* Data input */

View file

@ -399,6 +399,8 @@ mpegts_input_started_mux
{
/* Deliver first TS packets as fast as possible */
mi->mi_last_dispatch = 0;
/* Wait for first TS packet */
mi->mi_live = 0;
/* Arm timer */
if (LIST_FIRST(&mi->mi_mux_active) == NULL)
@ -635,6 +637,8 @@ mpegts_input_process
mpegts_mux_instance_t *mmi = mm->mm_active;
mpegts_pid_t *last_mp = NULL;
mi->mi_live = 1;
/* Process */
assert((len % 188) == 0);
while ( tsb < end ) {

View file

@ -287,6 +287,7 @@ ts_remux(mpegts_service_t *t, const uint8_t *src)
pktbuf_ref_dec(pb);
service_set_streaming_status_flags((service_t*)t, TSS_PACKETS);
t->s_streaming_live |= TSS_LIVE;
sbuf_reset(sb, TS_REMUX_BUFSIZE);
}

View file

@ -1407,6 +1407,7 @@ parser_deliver(service_t *t, elementary_stream_t *st, th_pkt_t *pkt, int error)
* Input is ok
*/
service_set_streaming_status_flags(t, TSS_PACKETS);
t->s_streaming_live |= TSS_LIVE;
/* Forward packet */
pkt->pkt_componentindex = st->es_index;

View file

@ -553,6 +553,7 @@ service_start(service_t *t, int instance)
assert(t->s_status != SERVICE_RUNNING);
t->s_streaming_status = 0;
t->s_streaming_live = 0;
t->s_scrambled_seen = 0;
t->s_start_time = dispatch_clock;
@ -943,13 +944,21 @@ static void
service_data_timeout(void *aux)
{
service_t *t = aux;
int flags = 0;
pthread_mutex_lock(&t->s_stream_mutex);
if(!(t->s_streaming_status & TSS_PACKETS))
service_set_streaming_status_flags(t, TSS_GRACEPERIOD);
flags |= TSS_GRACEPERIOD;
if(!(t->s_streaming_live & TSS_LIVE))
flags |= TSS_TIMEOUT;
if (flags)
service_set_streaming_status_flags(t, flags);
t->s_streaming_live &= ~TSS_LIVE;
pthread_mutex_unlock(&t->s_stream_mutex);
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, 5);
}
/**
@ -1052,7 +1061,7 @@ service_set_streaming_status_flags_(service_t *t, int set)
t->s_streaming_status = n;
tvhlog(LOG_DEBUG, "service", "%s: Status changed to %s%s%s%s%s%s%s",
tvhlog(LOG_DEBUG, "service", "%s: Status changed to %s%s%s%s%s%s%s%s",
service_nicename(t),
n & TSS_INPUT_HARDWARE ? "[Hardware input] " : "",
n & TSS_INPUT_SERVICE ? "[Input on service] " : "",
@ -1060,7 +1069,8 @@ service_set_streaming_status_flags_(service_t *t, int set)
n & TSS_PACKETS ? "[Reassembled packets] " : "",
n & TSS_NO_DESCRAMBLER ? "[No available descrambler] " : "",
n & TSS_NO_ACCESS ? "[No access] " : "",
n & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "");
n & TSS_GRACEPERIOD ? "[Graceperiod expired] " : "",
n & TSS_TIMEOUT ? "[Data timeout] " : "");
sm = streaming_msg_create_code(SMT_SERVICE_STATUS,
t->s_streaming_status);
@ -1331,6 +1341,9 @@ service_tss2text(int flags)
if(flags & TSS_GRACEPERIOD)
return "No input detected";
if(flags & TSS_TIMEOUT)
return "Data timeout";
return "No status";
}
@ -1347,7 +1360,7 @@ tss2errcode(int tss)
if(tss & TSS_NO_DESCRAMBLER)
return SM_CODE_NO_DESCRAMBLER;
if(tss & TSS_GRACEPERIOD)
if(tss & (TSS_GRACEPERIOD|TSS_TIMEOUT))
return SM_CODE_NO_INPUT;
return SM_CODE_OK;

View file

@ -383,10 +383,19 @@ typedef struct service {
// Errors
#define TSS_NO_DESCRAMBLER 0x10000
#define TSS_NO_ACCESS 0x20000
#define TSS_TIMEOUT 0x40000
#define TSS_ERRORS 0xffff0000
/**
*
*/
int s_streaming_live;
// Live status
#define TSS_LIVE 0x01
/**
* For simple streaming sources (such as video4linux) keeping
* track of the video and audio stream is convenient.

View file

@ -152,6 +152,8 @@ subscription_unlink_mux(th_subscription_t *s, int reason)
mpegts_mux_t *mm = mmi->mmi_mux;
mpegts_input_t *mi = mmi->mmi_input;
gtimer_disarm(&s->ths_receive_timer);
pthread_mutex_lock(&mi->mi_output_lock);
s->ths_mmi = NULL;
@ -363,6 +365,14 @@ subscription_input(void *opauqe, streaming_message_t *sm)
return;
}
if (sm->sm_type == SMT_SERVICE_STATUS &&
sm->sm_code & TSS_TIMEOUT) {
error = tss2errcode(sm->sm_code);
if (error > s->ths_testing_error)
s->ths_testing_error = error;
s->ths_state = SUBSCRIPTION_BAD_SERVICE;
}
/* Pass to direct handler to log traffic */
subscription_input_direct(s, sm);
}
@ -598,6 +608,20 @@ mpegts_mux_setsourceinfo ( mpegts_mux_t *mm, source_info_t *si )
}
}
static void
mux_data_timeout ( void *aux )
{
th_subscription_t *s = aux;
mpegts_input_t *mi = s->ths_mmi->mmi_input;
if (!mi->mi_live) {
subscription_unlink_mux(s, SM_CODE_NO_INPUT);
return;
}
mi->mi_live = 0;
gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, 5);
}
th_subscription_t *
subscription_create_from_mux
@ -639,7 +663,7 @@ subscription_create_from_mux
pthread_mutex_unlock(&mi->mi_output_lock);
}
pthread_mutex_lock(&s->ths_mmi->mmi_input->mi_output_lock);
pthread_mutex_lock(&mi->mi_output_lock);
/* Store */
LIST_INSERT_HEAD(&mm->mm_active->mmi_subs, s, ths_mmi_link);
@ -672,7 +696,10 @@ subscription_create_from_mux
sm = streaming_msg_create_data(SMT_START, ss);
streaming_target_deliver(s->ths_output, sm);
pthread_mutex_unlock(&s->ths_mmi->mmi_input->mi_output_lock);
pthread_mutex_unlock(&mi->mi_output_lock);
r = (mi->mi_get_grace ? mi->mi_get_grace(mi, mm) : 0) + 20;
gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, r);
return s;
}

View file

@ -93,6 +93,7 @@ typedef struct th_subscription {
// (repeated) logic elsewhere
LIST_ENTRY(th_subscription) ths_mmi_link;
struct mpegts_mux_instance *ths_mmi;
gtimer_t ths_receive_timer;
#endif
} th_subscription_t;