streaming profile: allow configuratible timeout and restart, fixes #2368

Note: If restart is enabled, tvh will keep trying to subscribe any service
from the channel's list until unsubscribed.
This commit is contained in:
Jaroslav Kysela 2014-10-14 11:08:23 +02:00
parent cc276263a3
commit 6b9d0d1c45
15 changed files with 186 additions and 114 deletions

View file

@ -1081,6 +1081,24 @@ config_migrate_v14 ( void )
}
}
static void
config_migrate_v15 ( void )
{
htsmsg_t *c, *e;
htsmsg_field_t *f;
int i;
if ((c = hts_settings_load("profile")) != NULL) {
HTSMSG_FOREACH(f, c) {
if (!(e = htsmsg_field_get_map(f))) continue;
if (htsmsg_get_s32(e, "timeout", &i)) {
htsmsg_set_s32(e, "timeout", 5);
hts_settings_save(e, "profile/%s", f->hmf_name);
}
}
}
}
/*
* Perform backup
*/
@ -1183,7 +1201,8 @@ static const config_migrate_t config_migrate_table[] = {
config_migrate_v11,
config_migrate_v12,
config_migrate_v13,
config_migrate_v14
config_migrate_v14,
config_migrate_v15
};
/*

View file

@ -84,7 +84,7 @@ dvr_rec_subscribe(dvr_entry_t *de)
return;
}
de->de_s = subscription_create_from_channel(de->de_channel, weight,
de->de_s = subscription_create_from_channel(de->de_channel, pro, weight,
buf, prch->prch_st,
prch->prch_flags,
NULL, NULL, NULL);

View file

@ -192,31 +192,6 @@ htsmsg_add_bool(htsmsg_t *msg, const char *name, int b)
f->hmf_bool = !!b;
}
/*
*
*/
void
htsmsg_add_u32(htsmsg_t *msg, const char *name, uint32_t u32)
{
htsmsg_field_t *f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED);
f->hmf_s64 = u32;
}
/*
*
*/
int
htsmsg_set_u32(htsmsg_t *msg, const char *name, uint32_t u32)
{
htsmsg_field_t *f = htsmsg_field_find(msg, name);
if (!f)
f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED);
if (f->hmf_type != HMF_S64)
return 1;
f->hmf_s64 = u32;
return 0;
}
/*
*
*/
@ -230,11 +205,16 @@ htsmsg_add_s64(htsmsg_t *msg, const char *name, int64_t s64)
/*
*
*/
void
htsmsg_add_s32(htsmsg_t *msg, const char *name, int32_t s32)
int
htsmsg_set_s64(htsmsg_t *msg, const char *name, int64_t s64)
{
htsmsg_field_t *f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED);
f->hmf_s64 = s32;
htsmsg_field_t *f = htsmsg_field_find(msg, name);
if (!f)
f = htsmsg_field_add(msg, name, HMF_S64, HMF_NAME_ALLOCED);
if (f->hmf_type != HMF_S64)
return 1;
f->hmf_s64 = s64;
return 0;
}

View file

@ -110,25 +110,43 @@ void htsmsg_destroy(htsmsg_t *msg);
void htsmsg_add_bool(htsmsg_t *msg, const char *name, int b);
/**
* Add an integer field where source is signed 64 bit.
*/
void htsmsg_add_s64(htsmsg_t *msg, const char *name, int64_t s64);
/**
* Add/update an integer field where source is signed 64 bit.
*/
int htsmsg_set_s64(htsmsg_t *msg, const char *name, int64_t s64);
/**
* Add an integer field where source is unsigned 32 bit.
*/
void htsmsg_add_u32(htsmsg_t *msg, const char *name, uint32_t u32);
static inline void
htsmsg_add_u32(htsmsg_t *msg, const char *name, uint32_t u32)
{ htsmsg_add_s64(msg, name, u32); }
/**
* Add/update an integer field
*/
int htsmsg_set_u32(htsmsg_t *msg, const char *name, uint32_t u32);
static inline int
htsmsg_set_u32(htsmsg_t *msg, const char *name, uint32_t u32)
{ return htsmsg_set_s64(msg, name, u32); }
/**
* Add an integer field where source is signed 32 bit.
*/
void htsmsg_add_s32(htsmsg_t *msg, const char *name, int32_t s32);
static inline void
htsmsg_add_s32(htsmsg_t *msg, const char *name, int32_t s32)
{ htsmsg_add_s64(msg, name, s32); }
/**
* Add an integer field where source is signed 64 bit.
* Add/update an integer field
*/
void htsmsg_add_s64(htsmsg_t *msg, const char *name, int64_t s64);
static inline int
htsmsg_set_s32(htsmsg_t *msg, const char *name, int32_t s32)
{ return htsmsg_set_s64(msg, name, s32); }
/**
* Add a string field.

View file

@ -1792,13 +1792,15 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
st = hs->hs_work;
normts = 1;
}
#else
profile_t *pro = NULL;
#endif
if(normts)
st = hs->hs_tsfix = tsfix_create(st);
tvhdebug("htsp", "%s - subscribe to %s\n", htsp->htsp_logname, ch->ch_name ?: "");
hs->hs_s = subscription_create_from_channel(ch, weight,
hs->hs_s = subscription_create_from_channel(ch, pro, weight,
htsp->htsp_logname,
st,
SUBSCRIPTION_STREAMING,

View file

@ -1048,7 +1048,7 @@ mpegts_mux_subscribe
{
int err = 0;
th_subscription_t *s;
s = subscription_create_from_mux(mm, weight, name, NULL,
s = subscription_create_from_mux(mm, NULL, weight, name, NULL,
SUBSCRIPTION_NONE,
NULL, NULL, NULL, &err);
return s ? 0 : err;

View file

@ -205,7 +205,7 @@ mpegts_mux_sched_timer ( void *p )
assert(mms->mms_sub == NULL);
mms->mms_sub
= subscription_create_from_mux(mm, mms->mms_weight,
= subscription_create_from_mux(mm, NULL, mms->mms_weight,
mms->mms_creator ?: "",
&mms->mms_input,
SUBSCRIPTION_NONE,

View file

@ -259,6 +259,20 @@ const idclass_t profile_class =
.name = "Comment",
.off = offsetof(profile_t, pro_comment),
},
{
.type = PT_INT,
.id = "timeout",
.name = "Timeout (sec)",
.off = offsetof(profile_t, pro_timeout),
.def.i = 5,
},
{
.type = PT_BOOL,
.id = "restart",
.name = "Restart On Error",
.off = offsetof(profile_t, pro_restart),
.def.i = 0,
},
{ }
}
};

View file

@ -69,6 +69,8 @@ typedef struct profile {
int pro_shield;
char *pro_name;
char *pro_comment;
int pro_timeout;
int pro_restart;
void (*pro_free)(struct profile *pro);
void (*pro_conf_changed)(struct profile *pro);

View file

@ -592,10 +592,10 @@ ignore:
*
*/
int
service_start(service_t *t, int instance, int postpone)
service_start(service_t *t, int instance, int timeout, int postpone)
{
elementary_stream_t *st;
int r, timeout = 10;
int r, stimeout = 10;
lock_assert(&global_lock);
@ -631,11 +631,13 @@ service_start(service_t *t, int instance, int postpone)
pthread_mutex_unlock(&t->s_stream_mutex);
if(t->s_grace_period != NULL)
timeout = t->s_grace_period(t);
stimeout = t->s_grace_period(t);
timeout += postpone;
t->s_grace_delay = timeout;
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, timeout);
stimeout += postpone;
t->s_timeout = timeout;
t->s_grace_delay = stimeout;
if (stimeout > 0)
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, stimeout);
return 0;
}
@ -646,7 +648,7 @@ service_start(service_t *t, int instance, int postpone)
service_instance_t *
service_find_instance
(service_t *s, channel_t *ch, service_instance_list_t *sil,
int *error, int weight, int flags, int postpone)
int *error, int weight, int flags, int timeout, int postpone)
{
channel_service_mapping_t *csm;
service_instance_t *si, *next;
@ -723,7 +725,7 @@ service_find_instance
/* Start */
tvhtrace("service", "will start new instance %d", si->si_instance);
if (service_start(si->si_s, si->si_instance, postpone)) {
if (service_start(si->si_s, si->si_instance, timeout, postpone)) {
tvhtrace("service", "tuning failed");
si->si_error = SM_CODE_TUNING_FAILED;
if (*error < SM_CODE_TUNING_FAILED)
@ -1020,7 +1022,8 @@ service_data_timeout(void *aux)
pthread_mutex_unlock(&t->s_stream_mutex);
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, 5);
if (t->s_timeout > 0)
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, t->s_timeout);
}
/**
@ -1485,7 +1488,6 @@ service_instance_add(service_instance_list_t *sil,
return si;
}
/**
*
*/
@ -1498,7 +1500,6 @@ service_instance_destroy
free(si);
}
/**
*
*/

View file

@ -341,6 +341,7 @@ typedef struct service {
/**
* Stream start time
*/
int s_timeout;
int s_grace_delay;
time_t s_start_time;
@ -448,7 +449,7 @@ typedef struct service {
void service_init(void);
void service_done(void);
int service_start(service_t *t, int instance, int postpone);
int service_start(service_t *t, int instance, int timeout, int postpone);
void service_stop(service_t *t);
void service_build_filter(service_t *t);
@ -470,7 +471,8 @@ service_instance_t *service_find_instance(struct service *s,
struct channel *ch,
service_instance_list_t *sil,
int *error, int weight,
int flags, int postpone);
int flags, int timeout,
int postpone);
elementary_stream_t *service_stream_find_(service_t *t, int pid);

View file

@ -30,6 +30,7 @@
#include "service_mapper.h"
#include "streaming.h"
#include "service.h"
#include "profile.h"
#include "api.h"
static service_mapper_status_t service_mapper_stat;
@ -361,7 +362,7 @@ service_mapper_thread ( void *aux )
/* Subscribe */
tvhinfo("service_mapper", "checking %s", s->s_nicename);
sub = subscription_create_from_service(s, SUBSCRIPTION_PRIO_MAPPER,
sub = subscription_create_from_service(s, NULL, SUBSCRIPTION_PRIO_MAPPER,
"service_mapper", &sq.sq_st,
0, NULL, NULL, "service_mapper");

View file

@ -36,6 +36,7 @@
#include "streaming.h"
#include "channels.h"
#include "service.h"
#include "profile.h"
#include "htsmsg.h"
#include "notify.h"
#include "atomic.h"
@ -312,7 +313,7 @@ subscription_reschedule(void)
s->ths_service->s_nicename, s->ths_weight);
si = service_find_instance(s->ths_service, s->ths_channel,
&s->ths_instances, &error, s->ths_weight,
s->ths_flags,
s->ths_flags, s->ths_timeout,
dispatch_clock > s->ths_postpone_end ?
0 : s->ths_postpone_end - dispatch_clock);
s->ths_current_instance = si;
@ -325,6 +326,18 @@ subscription_reschedule(void)
s->ths_last_error = error;
continue;
}
if (s->ths_flags & SUBSCRIPTION_RESTART) {
if (s->ths_channel)
tvhwarn("subscription", "restarting channel %s",
channel_get_name(s->ths_channel));
else
tvhwarn("subscription", "restarting service %s",
s->ths_service->s_nicename);
s->ths_testing_error = 0;
s->ths_current_instance = NULL;
service_instance_list_clear(&s->ths_instances);
continue;
}
/* No service available */
sm = streaming_msg_create_code(SMT_NOSTART, error);
streaming_target_deliver(s->ths_output, sm);
@ -564,7 +577,7 @@ subscription_unsubscribe(th_subscription_t *s)
*/
th_subscription_t *
subscription_create
(int weight, const char *name, streaming_target_t *st,
(profile_t *pro, int weight, const char *name, streaming_target_t *st,
int flags, st_callback_t *cb, const char *hostname,
const char *username, const char *client)
{
@ -596,9 +609,13 @@ subscription_create
s->ths_total_err = 0;
s->ths_output = st;
s->ths_flags = flags;
s->ths_timeout = pro ? pro->pro_timeout : 0;
s->ths_postpone = subscription_postpone;
s->ths_postpone_end = dispatch_clock + s->ths_postpone;
if (pro && pro->pro_restart)
s->ths_flags |= SUBSCRIPTION_RESTART;
time(&s->ths_start);
s->ths_id = ++tally;
@ -616,11 +633,16 @@ subscription_create
*
*/
static th_subscription_t *
subscription_create_from_channel_or_service
(channel_t *ch, service_t *t, unsigned int weight,
const char *name, streaming_target_t *st,
int flags, const char *hostname,
const char *username, const char *client)
subscription_create_from_channel_or_service(channel_t *ch,
service_t *t,
profile_t *pro,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client)
{
th_subscription_t *s;
assert(!ch || !t);
@ -629,7 +651,7 @@ subscription_create_from_channel_or_service
if (ch)
tvhtrace("subscription", "creating subscription for %s weight %d",
channel_get_name(ch), weight);
s = subscription_create(weight, name, st, flags, subscription_input,
s = subscription_create(pro, weight, name, st, flags, subscription_input,
hostname, username, client);
s->ths_channel = ch;
s->ths_service = t;
@ -641,27 +663,29 @@ subscription_create_from_channel_or_service
}
th_subscription_t *
subscription_create_from_channel(channel_t *ch, unsigned int weight,
subscription_create_from_channel(channel_t *ch, profile_t *pro,
unsigned int weight,
const char *name, streaming_target_t *st,
int flags, const char *hostname,
const char *username, const char *client)
{
return subscription_create_from_channel_or_service
(ch, NULL, weight, name, st, flags, hostname, username, client);
(ch, NULL, pro, weight, name, st, flags, hostname, username, client);
}
/**
*
*/
th_subscription_t *
subscription_create_from_service(service_t *t, unsigned int weight,
subscription_create_from_service(service_t *t, profile_t *pro,
unsigned int weight,
const char *name,
streaming_target_t *st, int flags,
const char *hostname, const char *username,
const char *client)
{
return subscription_create_from_channel_or_service
(NULL, t, weight, name, st, flags, hostname, username, client);
(NULL, t, pro, weight, name, st, flags, hostname, username, client);
}
/**
@ -706,20 +730,20 @@ mux_data_timeout ( void *aux )
}
mi->mi_live = 0;
gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, 5);
if (s->ths_timeout > 0)
gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, s->ths_timeout);
}
th_subscription_t *
subscription_create_from_mux
(mpegts_mux_t *mm,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client,
int *err)
subscription_create_from_mux(mpegts_mux_t *mm, profile_t *pro,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client,
int *err)
{
th_subscription_t *s;
streaming_message_t *sm;
@ -737,7 +761,7 @@ subscription_create_from_mux
/* Create subscription */
if (!st)
flags |= SUBSCRIPTION_NONE;
s = subscription_create(weight, name, st, flags, NULL,
s = subscription_create(pro, weight, name, st, flags, NULL,
hostname, username, client);
s->ths_mmi = mm->mm_active;
@ -790,7 +814,8 @@ subscription_create_from_mux
pthread_mutex_unlock(&mi->mi_output_lock);
gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, r);
if (r > 0)
gtimer_arm(&s->ths_receive_timer, mux_data_timeout, s, r);
return s;
}
@ -1045,10 +1070,8 @@ subscription_dummy_join(const char *id, int first)
st = calloc(1, sizeof(streaming_target_t));
streaming_target_init(st, dummy_callback, NULL, 0);
subscription_create_from_service(t, 1, "dummy", st, 0, NULL, NULL, "dummy");
subscription_create_from_service(t, NULL, 1, "dummy", st, 0, NULL, NULL, "dummy");
tvhlog(LOG_NOTICE, "subscription",
"Dummy join %s ok", id);
}

View file

@ -21,12 +21,15 @@
#include "service.h"
struct profile;
extern struct th_subscription_list subscriptions;
#define SUBSCRIPTION_RAW_MPEGTS 0x1
#define SUBSCRIPTION_NONE 0x2
#define SUBSCRIPTION_FULLMUX 0x4
#define SUBSCRIPTION_STREAMING 0x8
#define SUBSCRIPTION_RAW_MPEGTS 0x01
#define SUBSCRIPTION_NONE 0x02
#define SUBSCRIPTION_FULLMUX 0x04
#define SUBSCRIPTION_STREAMING 0x08
#define SUBSCRIPTION_RESTART 0x10
/* Some internal prioties */
#define SUBSCRIPTION_PRIO_SCAN_IDLE 1 ///< Idle scanning
@ -75,6 +78,7 @@ typedef struct th_subscription {
streaming_target_t *ths_output;
int ths_flags;
int ths_timeout;
time_t ths_last_find;
int ths_last_error;
@ -122,39 +126,45 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight);
void subscription_reschedule(void);
th_subscription_t *subscription_create_from_channel(struct channel *ch,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client);
th_subscription_t *
subscription_create_from_channel(struct channel *ch,
struct profile *pro,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client);
th_subscription_t *subscription_create_from_service(struct service *t,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client);
th_subscription_t *
subscription_create_from_service(struct service *t,
struct profile *pro,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client);
#if ENABLE_MPEGTS
struct mpegts_mux;
th_subscription_t *subscription_create_from_mux
(struct mpegts_mux *m,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client, int *err);
th_subscription_t *
subscription_create_from_mux(struct mpegts_mux *m,
struct profile *pro,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,
const char *hostname,
const char *username,
const char *client, int *err);
#endif
th_subscription_t *subscription_create(int weight, const char *name,
th_subscription_t *subscription_create(struct profile *pro,
int weight, const char *name,
streaming_target_t *st,
int flags, st_callback_t *cb,
const char *hostname,

View file

@ -734,7 +734,7 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
s = subscription_create_from_service(service, weight ?: 100, "HTTP",
s = subscription_create_from_service(service, pro, weight ?: 100, "HTTP",
prch.prch_st,
prch.prch_flags | SUBSCRIPTION_STREAMING,
addrbuf,
@ -788,7 +788,7 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight)
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
s = subscription_create_from_mux(mm, weight ?: 10, "HTTP",
s = subscription_create_from_mux(mm, NULL, weight ?: 10, "HTTP",
prch.prch_st,
prch.prch_flags |
SUBSCRIPTION_FULLMUX |
@ -846,7 +846,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
s = subscription_create_from_channel(ch, weight ?: 100, "HTTP",
s = subscription_create_from_channel(ch, pro, weight ?: 100, "HTTP",
prch.prch_st, prch.prch_flags | SUBSCRIPTION_STREAMING,
addrbuf, hc->hc_username,
http_arg_get(&hc->hc_args, "User-Agent"));