From 6b9d0d1c45f92a729b68ef80c387b64c14b62112 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Tue, 14 Oct 2014 11:08:23 +0200 Subject: [PATCH] streaming profile: allow configuratible timeout and restart, fixes #2368 Note: If restart is enabled, tvh will keep trying to subscribe any service from the channel's list until unsubscribed. --- src/config.c | 21 +++++++- src/dvr/dvr_rec.c | 2 +- src/htsmsg.c | 38 ++++---------- src/htsmsg.h | 28 ++++++++-- src/htsp_server.c | 4 +- src/input/mpegts/mpegts_mux.c | 2 +- src/input/mpegts/mpegts_mux_sched.c | 2 +- src/profile.c | 14 +++++ src/profile.h | 2 + src/service.c | 23 +++++---- src/service.h | 6 ++- src/service_mapper.c | 3 +- src/subscriptions.c | 79 +++++++++++++++++++---------- src/subscriptions.h | 70 ++++++++++++++----------- src/webui/webui.c | 6 +-- 15 files changed, 186 insertions(+), 114 deletions(-) diff --git a/src/config.c b/src/config.c index 782c42b7..735563ff 100644 --- a/src/config.c +++ b/src/config.c @@ -1081,6 +1081,24 @@ config_migrate_v14 ( void ) } } +static void +config_migrate_v15 ( void ) +{ + htsmsg_t *c, *e; + htsmsg_field_t *f; + int i; + + if ((c = hts_settings_load("profile")) != NULL) { + HTSMSG_FOREACH(f, c) { + if (!(e = htsmsg_field_get_map(f))) continue; + if (htsmsg_get_s32(e, "timeout", &i)) { + htsmsg_set_s32(e, "timeout", 5); + hts_settings_save(e, "profile/%s", f->hmf_name); + } + } + } +} + /* * Perform backup */ @@ -1183,7 +1201,8 @@ static const config_migrate_t config_migrate_table[] = { config_migrate_v11, config_migrate_v12, config_migrate_v13, - config_migrate_v14 + config_migrate_v14, + config_migrate_v15 }; /* diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index 1964ed36..f9e41b12 100644 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -84,7 +84,7 @@ dvr_rec_subscribe(dvr_entry_t *de) return; } - de->de_s = subscription_create_from_channel(de->de_channel, weight, + de->de_s = subscription_create_from_channel(de->de_channel, pro, weight, buf, prch->prch_st, prch->prch_flags, NULL, NULL, NULL); diff --git a/src/htsmsg.c b/src/htsmsg.c index db79c09b..24172d56 100644 --- a/src/htsmsg.c +++ b/src/htsmsg.c @@ -192,31 +192,6 @@ htsmsg_add_bool(htsmsg_t *msg, const char *name, int b) f->hmf_bool = !!b; } -/* - * - */ -void -htsmsg_add_u32(htsmsg_t *msg, const char *name, uint32_t u32) -{ - htsmsg_field_t *f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED); - f->hmf_s64 = u32; -} - -/* - * - */ -int -htsmsg_set_u32(htsmsg_t *msg, const char *name, uint32_t u32) -{ - htsmsg_field_t *f = htsmsg_field_find(msg, name); - if (!f) - f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED); - if (f->hmf_type != HMF_S64) - return 1; - f->hmf_s64 = u32; - return 0; -} - /* * */ @@ -230,11 +205,16 @@ htsmsg_add_s64(htsmsg_t *msg, const char *name, int64_t s64) /* * */ -void -htsmsg_add_s32(htsmsg_t *msg, const char *name, int32_t s32) +int +htsmsg_set_s64(htsmsg_t *msg, const char *name, int64_t s64) { - htsmsg_field_t *f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED); - f->hmf_s64 = s32; + htsmsg_field_t *f = htsmsg_field_find(msg, name); + if (!f) + f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED); + if (f->hmf_type != HMF_S64) + return 1; + f->hmf_s64 = s64; + return 0; } diff --git a/src/htsmsg.h b/src/htsmsg.h index 479a39a3..4d59c6ca 100644 --- a/src/htsmsg.h +++ b/src/htsmsg.h @@ -110,25 +110,43 @@ void htsmsg_destroy(htsmsg_t *msg); void htsmsg_add_bool(htsmsg_t *msg, const char *name, int b); +/** + * Add an integer field where source is signed 64 bit. + */ +void htsmsg_add_s64(htsmsg_t *msg, const char *name, int64_t s64); + +/** + * Add/update an integer field where source is signed 64 bit. + */ +int htsmsg_set_s64(htsmsg_t *msg, const char *name, int64_t s64); + /** * Add an integer field where source is unsigned 32 bit. */ -void htsmsg_add_u32(htsmsg_t *msg, const char *name, uint32_t u32); +static inline void +htsmsg_add_u32(htsmsg_t *msg, const char *name, uint32_t u32) + { htsmsg_add_s64(msg, name, u32); } /** * Add/update an integer field */ -int htsmsg_set_u32(htsmsg_t *msg, const char *name, uint32_t u32); +static inline int +htsmsg_set_u32(htsmsg_t *msg, const char *name, uint32_t u32) + { return htsmsg_set_s64(msg, name, u32); } /** * Add an integer field where source is signed 32 bit. */ -void htsmsg_add_s32(htsmsg_t *msg, const char *name, int32_t s32); +static inline void +htsmsg_add_s32(htsmsg_t *msg, const char *name, int32_t s32) + { htsmsg_add_s64(msg, name, s32); } /** - * Add an integer field where source is signed 64 bit. + * Add/update an integer field */ -void htsmsg_add_s64(htsmsg_t *msg, const char *name, int64_t s64); +static inline int +htsmsg_set_s32(htsmsg_t *msg, const char *name, int32_t s32) + { return htsmsg_set_s64(msg, name, s32); } /** * Add a string field. diff --git a/src/htsp_server.c b/src/htsp_server.c index 874bdbe6..33a27a1e 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -1792,13 +1792,15 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) st = hs->hs_work; normts = 1; } +#else + profile_t *pro = NULL; #endif if(normts) st = hs->hs_tsfix = tsfix_create(st); tvhdebug("htsp", "%s - subscribe to %s\n", htsp->htsp_logname, ch->ch_name ?: ""); - hs->hs_s = subscription_create_from_channel(ch, weight, + hs->hs_s = subscription_create_from_channel(ch, pro, weight, htsp->htsp_logname, st, SUBSCRIPTION_STREAMING, diff --git a/src/input/mpegts/mpegts_mux.c b/src/input/mpegts/mpegts_mux.c index 895a3889..1a9fc9f9 100644 --- a/src/input/mpegts/mpegts_mux.c +++ b/src/input/mpegts/mpegts_mux.c @@ -1048,7 +1048,7 @@ mpegts_mux_subscribe { int err = 0; th_subscription_t *s; - s = subscription_create_from_mux(mm, weight, name, NULL, + s = subscription_create_from_mux(mm, NULL, weight, name, NULL, SUBSCRIPTION_NONE, NULL, NULL, NULL, &err); return s ? 0 : err; diff --git a/src/input/mpegts/mpegts_mux_sched.c b/src/input/mpegts/mpegts_mux_sched.c index 39b3c596..356b48d7 100644 --- a/src/input/mpegts/mpegts_mux_sched.c +++ b/src/input/mpegts/mpegts_mux_sched.c @@ -205,7 +205,7 @@ mpegts_mux_sched_timer ( void *p ) assert(mms->mms_sub == NULL); mms->mms_sub - = subscription_create_from_mux(mm, mms->mms_weight, + = subscription_create_from_mux(mm, NULL, mms->mms_weight, mms->mms_creator ?: "", &mms->mms_input, SUBSCRIPTION_NONE, diff --git a/src/profile.c b/src/profile.c index aaa99313..406473d8 100644 --- a/src/profile.c +++ b/src/profile.c @@ -259,6 +259,20 @@ const idclass_t profile_class = .name = "Comment", .off = offsetof(profile_t, pro_comment), }, + { + .type = PT_INT, + .id = "timeout", + .name = "Timeout (sec)", + .off = offsetof(profile_t, pro_timeout), + .def.i = 5, + }, + { + .type = PT_BOOL, + .id = "restart", + .name = "Restart On Error", + .off = offsetof(profile_t, pro_restart), + .def.i = 0, + }, { } } }; diff --git a/src/profile.h b/src/profile.h index de82a58f..9972ce82 100644 --- a/src/profile.h +++ b/src/profile.h @@ -69,6 +69,8 @@ typedef struct profile { int pro_shield; char *pro_name; char *pro_comment; + int pro_timeout; + int pro_restart; void (*pro_free)(struct profile *pro); void (*pro_conf_changed)(struct profile *pro); diff --git a/src/service.c b/src/service.c index c72b9f77..9157b595 100644 --- a/src/service.c +++ b/src/service.c @@ -592,10 +592,10 @@ ignore: * */ int -service_start(service_t *t, int instance, int postpone) +service_start(service_t *t, int instance, int timeout, int postpone) { elementary_stream_t *st; - int r, timeout = 10; + int r, stimeout = 10; lock_assert(&global_lock); @@ -631,11 +631,13 @@ service_start(service_t *t, int instance, int postpone) pthread_mutex_unlock(&t->s_stream_mutex); if(t->s_grace_period != NULL) - timeout = t->s_grace_period(t); + stimeout = t->s_grace_period(t); - timeout += postpone; - t->s_grace_delay = timeout; - gtimer_arm(&t->s_receive_timer, service_data_timeout, t, timeout); + stimeout += postpone; + t->s_timeout = timeout; + t->s_grace_delay = stimeout; + if (stimeout > 0) + gtimer_arm(&t->s_receive_timer, service_data_timeout, t, stimeout); return 0; } @@ -646,7 +648,7 @@ service_start(service_t *t, int instance, int postpone) service_instance_t * service_find_instance (service_t *s, channel_t *ch, service_instance_list_t *sil, - int *error, int weight, int flags, int postpone) + int *error, int weight, int flags, int timeout, int postpone) { channel_service_mapping_t *csm; service_instance_t *si, *next; @@ -723,7 +725,7 @@ service_find_instance /* Start */ tvhtrace("service", "will start new instance %d", si->si_instance); - if (service_start(si->si_s, si->si_instance, postpone)) { + if (service_start(si->si_s, si->si_instance, timeout, postpone)) { tvhtrace("service", "tuning failed"); si->si_error = SM_CODE_TUNING_FAILED; if (*error < SM_CODE_TUNING_FAILED) @@ -1020,7 +1022,8 @@ service_data_timeout(void *aux) pthread_mutex_unlock(&t->s_stream_mutex); - gtimer_arm(&t->s_receive_timer, service_data_timeout, t, 5); + if (t->s_timeout > 0) + gtimer_arm(&t->s_receive_timer, service_data_timeout, t, t->s_timeout); } /** @@ -1485,7 +1488,6 @@ service_instance_add(service_instance_list_t *sil, return si; } - /** * */ @@ -1498,7 +1500,6 @@ service_instance_destroy free(si); } - /** * */ diff --git a/src/service.h b/src/service.h index 2aee0e3b..fc926ef7 100644 --- a/src/service.h +++ b/src/service.h @@ -341,6 +341,7 @@ typedef struct service { /** * Stream start time */ + int s_timeout; int s_grace_delay; time_t s_start_time; @@ -448,7 +449,7 @@ typedef struct service { void service_init(void); void service_done(void); -int service_start(service_t *t, int instance, int postpone); +int service_start(service_t *t, int instance, int timeout, int postpone); void service_stop(service_t *t); void service_build_filter(service_t *t); @@ -470,7 +471,8 @@ service_instance_t *service_find_instance(struct service *s, struct channel *ch, service_instance_list_t *sil, int *error, int weight, - int flags, int postpone); + int flags, int timeout, + int postpone); elementary_stream_t *service_stream_find_(service_t *t, int pid); diff --git a/src/service_mapper.c b/src/service_mapper.c index 30edd9e3..eb098ce2 100644 --- a/src/service_mapper.c +++ b/src/service_mapper.c @@ -30,6 +30,7 @@ #include "service_mapper.h" #include "streaming.h" #include "service.h" +#include "profile.h" #include "api.h" static service_mapper_status_t service_mapper_stat; @@ -361,7 +362,7 @@ service_mapper_thread ( void *aux ) /* Subscribe */ tvhinfo("service_mapper", "checking %s", s->s_nicename); - sub = subscription_create_from_service(s, SUBSCRIPTION_PRIO_MAPPER, + sub = subscription_create_from_service(s, NULL, SUBSCRIPTION_PRIO_MAPPER, "service_mapper", &sq.sq_st, 0, NULL, NULL, "service_mapper"); diff --git a/src/subscriptions.c b/src/subscriptions.c index 2e853307..2edf4d73 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -36,6 +36,7 @@ #include "streaming.h" #include "channels.h" #include "service.h" +#include "profile.h" #include "htsmsg.h" #include "notify.h" #include "atomic.h" @@ -312,7 +313,7 @@ subscription_reschedule(void) s->ths_service->s_nicename, s->ths_weight); si = service_find_instance(s->ths_service, s->ths_channel, &s->ths_instances, &error, s->ths_weight, - s->ths_flags, + s->ths_flags, s->ths_timeout, dispatch_clock > s->ths_postpone_end ? 0 : s->ths_postpone_end - dispatch_clock); s->ths_current_instance = si; @@ -325,6 +326,18 @@ subscription_reschedule(void) s->ths_last_error = error; continue; } + if (s->ths_flags & SUBSCRIPTION_RESTART) { + if (s->ths_channel) + tvhwarn("subscription", "restarting channel %s", + channel_get_name(s->ths_channel)); + else + tvhwarn("subscription", "restarting service %s", + s->ths_service->s_nicename); + s->ths_testing_error = 0; + s->ths_current_instance = NULL; + service_instance_list_clear(&s->ths_instances); + continue; + } /* No service available */ sm = streaming_msg_create_code(SMT_NOSTART, error); streaming_target_deliver(s->ths_output, sm); @@ -564,7 +577,7 @@ subscription_unsubscribe(th_subscription_t *s) */ th_subscription_t * subscription_create - (int weight, const char *name, streaming_target_t *st, + (profile_t *pro, int weight, const char *name, streaming_target_t *st, int flags, st_callback_t *cb, const char *hostname, const char *username, const char *client) { @@ -596,9 +609,13 @@ subscription_create s->ths_total_err = 0; s->ths_output = st; s->ths_flags = flags; + s->ths_timeout = pro ? pro->pro_timeout : 0; s->ths_postpone = subscription_postpone; s->ths_postpone_end = dispatch_clock + s->ths_postpone; + if (pro && pro->pro_restart) + s->ths_flags |= SUBSCRIPTION_RESTART; + time(&s->ths_start); s->ths_id = ++tally; @@ -616,11 +633,16 @@ subscription_create * */ static th_subscription_t * -subscription_create_from_channel_or_service - (channel_t *ch, service_t *t, unsigned int weight, - const char *name, streaming_target_t *st, - int flags, const char *hostname, - const char *username, const char *client) +subscription_create_from_channel_or_service(channel_t *ch, + service_t *t, + profile_t *pro, + unsigned int weight, + const char *name, + streaming_target_t *st, + int flags, + const char *hostname, + const char *username, + const char *client) { th_subscription_t *s; assert(!ch || !t); @@ -629,7 +651,7 @@ subscription_create_from_channel_or_service if (ch) tvhtrace("subscription", "creating subscription for %s weight %d", channel_get_name(ch), weight); - s = subscription_create(weight, name, st, flags, subscription_input, + s = subscription_create(pro, weight, name, st, flags, subscription_input, hostname, username, client); s->ths_channel = ch; s->ths_service = t; @@ -641,27 +663,29 @@ subscription_create_from_channel_or_service } th_subscription_t * -subscription_create_from_channel(channel_t *ch, unsigned int weight, +subscription_create_from_channel(channel_t *ch, profile_t *pro, + unsigned int weight, const char *name, streaming_target_t *st, int flags, const char *hostname, const char *username, const char *client) { return subscription_create_from_channel_or_service - (ch, NULL, weight, name, st, flags, hostname, username, client); + (ch, NULL, pro, weight, name, st, flags, hostname, username, client); } /** * */ th_subscription_t * -subscription_create_from_service(service_t *t, unsigned int weight, +subscription_create_from_service(service_t *t, profile_t *pro, + unsigned int weight, const char *name, streaming_target_t *st, int flags, const char *hostname, const char *username, const char *client) { return subscription_create_from_channel_or_service - (NULL, t, weight, name, st, flags, hostname, username, client); + (NULL, t, pro, weight, name, st, flags, hostname, username, client); } /** @@ -706,20 +730,20 @@ mux_data_timeout ( void *aux ) } mi->mi_live = 0; - gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, 5); + if (s->ths_timeout > 0) + gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, s->ths_timeout); } th_subscription_t * -subscription_create_from_mux - (mpegts_mux_t *mm, - unsigned int weight, - const char *name, - streaming_target_t *st, - int flags, - const char *hostname, - const char *username, - const char *client, - int *err) +subscription_create_from_mux(mpegts_mux_t *mm, profile_t *pro, + unsigned int weight, + const char *name, + streaming_target_t *st, + int flags, + const char *hostname, + const char *username, + const char *client, + int *err) { th_subscription_t *s; streaming_message_t *sm; @@ -737,7 +761,7 @@ subscription_create_from_mux /* Create subscription */ if (!st) flags |= SUBSCRIPTION_NONE; - s = subscription_create(weight, name, st, flags, NULL, + s = subscription_create(pro, weight, name, st, flags, NULL, hostname, username, client); s->ths_mmi = mm->mm_active; @@ -790,7 +814,8 @@ subscription_create_from_mux pthread_mutex_unlock(&mi->mi_output_lock); - gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, r); + if (r > 0) + gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, r); return s; } @@ -1045,10 +1070,8 @@ subscription_dummy_join(const char *id, int first) st = calloc(1, sizeof(streaming_target_t)); streaming_target_init(st, dummy_callback, NULL, 0); - subscription_create_from_service(t, 1, "dummy", st, 0, NULL, NULL, "dummy"); + subscription_create_from_service(t, NULL, 1, "dummy", st, 0, NULL, NULL, "dummy"); tvhlog(LOG_NOTICE, "subscription", "Dummy join %s ok", id); } - - diff --git a/src/subscriptions.h b/src/subscriptions.h index 6bf4fa8a..e3f07cb5 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -21,12 +21,15 @@ #include "service.h" +struct profile; + extern struct th_subscription_list subscriptions; -#define SUBSCRIPTION_RAW_MPEGTS 0x1 -#define SUBSCRIPTION_NONE 0x2 -#define SUBSCRIPTION_FULLMUX 0x4 -#define SUBSCRIPTION_STREAMING 0x8 +#define SUBSCRIPTION_RAW_MPEGTS 0x01 +#define SUBSCRIPTION_NONE 0x02 +#define SUBSCRIPTION_FULLMUX 0x04 +#define SUBSCRIPTION_STREAMING 0x08 +#define SUBSCRIPTION_RESTART 0x10 /* Some internal prioties */ #define SUBSCRIPTION_PRIO_SCAN_IDLE 1 ///< Idle scanning @@ -75,6 +78,7 @@ typedef struct th_subscription { streaming_target_t *ths_output; int ths_flags; + int ths_timeout; time_t ths_last_find; int ths_last_error; @@ -122,39 +126,45 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight); void subscription_reschedule(void); -th_subscription_t *subscription_create_from_channel(struct channel *ch, - unsigned int weight, - const char *name, - streaming_target_t *st, - int flags, - const char *hostname, - const char *username, - const char *client); +th_subscription_t * +subscription_create_from_channel(struct channel *ch, + struct profile *pro, + unsigned int weight, + const char *name, + streaming_target_t *st, + int flags, + const char *hostname, + const char *username, + const char *client); -th_subscription_t *subscription_create_from_service(struct service *t, - unsigned int weight, - const char *name, - streaming_target_t *st, - int flags, - const char *hostname, - const char *username, - const char *client); +th_subscription_t * +subscription_create_from_service(struct service *t, + struct profile *pro, + unsigned int weight, + const char *name, + streaming_target_t *st, + int flags, + const char *hostname, + const char *username, + const char *client); #if ENABLE_MPEGTS struct mpegts_mux; -th_subscription_t *subscription_create_from_mux - (struct mpegts_mux *m, - unsigned int weight, - const char *name, - streaming_target_t *st, - int flags, - const char *hostname, - const char *username, - const char *client, int *err); +th_subscription_t * +subscription_create_from_mux(struct mpegts_mux *m, + struct profile *pro, + unsigned int weight, + const char *name, + streaming_target_t *st, + int flags, + const char *hostname, + const char *username, + const char *client, int *err); #endif -th_subscription_t *subscription_create(int weight, const char *name, +th_subscription_t *subscription_create(struct profile *pro, + int weight, const char *name, streaming_target_t *st, int flags, st_callback_t *cb, const char *hostname, diff --git a/src/webui/webui.c b/src/webui/webui.c index dffd7d5c..90e47570 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -734,7 +734,7 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight) tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50); - s = subscription_create_from_service(service, weight ?: 100, "HTTP", + s = subscription_create_from_service(service, pro, weight ?: 100, "HTTP", prch.prch_st, prch.prch_flags | SUBSCRIPTION_STREAMING, addrbuf, @@ -788,7 +788,7 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight) tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50); - s = subscription_create_from_mux(mm, weight ?: 10, "HTTP", + s = subscription_create_from_mux(mm, NULL, weight ?: 10, "HTTP", prch.prch_st, prch.prch_flags | SUBSCRIPTION_FULLMUX | @@ -846,7 +846,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight) tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50); - s = subscription_create_from_channel(ch, weight ?: 100, "HTTP", + s = subscription_create_from_channel(ch, pro, weight ?: 100, "HTTP", prch.prch_st, prch.prch_flags | SUBSCRIPTION_STREAMING, addrbuf, hc->hc_username, http_arg_get(&hc->hc_args, "User-Agent"));