subscription/dbus: implement subscription postpone function

This commit is contained in:
Jaroslav Kysela 2014-08-07 17:30:47 +02:00
parent 54164baa11
commit 4b5da7c1f5
5 changed files with 143 additions and 43 deletions

View file

@ -27,6 +27,7 @@
#include "tvheadend.h"
#include "tvhpoll.h"
#include "subscriptions.h"
#include "dbus.h"
@ -202,6 +203,32 @@ dbus_reply_to_ping(DBusMessage *msg, DBusConnection *conn)
dbus_message_unref(reply);
}
/**
* Set the subscription postpone delay
*/
static void
dbus_reply_to_postpone(DBusMessage *msg, DBusConnection *conn)
{
DBusMessageIter args;
DBusMessage *reply;
int64_t param;
if (!dbus_message_iter_init(msg, &args))
return;
if (DBUS_TYPE_INT64 != dbus_message_iter_get_arg_type(&args))
return;
dbus_message_iter_get_basic(&args, &param);
param = subscription_set_postpone(param);
reply = dbus_message_new_method_return(msg);
dbus_message_iter_init_append(reply, &args);
dbus_message_iter_append_basic(&args, DBUS_TYPE_INT64, &param);
dbus_connection_send(conn, reply, NULL);
dbus_connection_flush(conn);
dbus_message_unref(reply);
}
/**
*
*/
@ -313,6 +340,8 @@ dbus_server_thread(void *aux)
if (dbus_message_is_method_call(msg, "org.tvheadend", "ping"))
dbus_reply_to_ping(msg, conn);
else if (dbus_message_is_method_call(msg, "org.tvheadend", "postpone"))
dbus_reply_to_postpone(msg, conn);
dbus_message_unref(msg);
}

View file

@ -557,7 +557,7 @@ ignore:
*
*/
int
service_start(service_t *t, int instance)
service_start(service_t *t, int instance, int postpone)
{
elementary_stream_t *st;
int r, timeout = 10;
@ -598,6 +598,7 @@ service_start(service_t *t, int instance)
if(t->s_grace_period != NULL)
timeout = t->s_grace_period(t);
timeout += postpone;
t->s_grace_delay = timeout;
gtimer_arm(&t->s_receive_timer, service_data_timeout, t, timeout);
return 0;
@ -610,7 +611,7 @@ service_start(service_t *t, int instance)
service_instance_t *
service_find_instance
(service_t *s, channel_t *ch, service_instance_list_t *sil,
int *error, int weight)
int *error, int weight, int postpone)
{
channel_service_mapping_t *csm;
service_instance_t *si, *next;
@ -687,7 +688,7 @@ service_find_instance
/* Start */
tvhtrace("service", "will start new instance %d", si->si_instance);
if (service_start(si->si_s, si->si_instance)) {
if (service_start(si->si_s, si->si_instance, postpone)) {
tvhtrace("service", "tuning failed");
si->si_error = SM_CODE_TUNING_FAILED;
if (*error < SM_CODE_TUNING_FAILED)

View file

@ -449,7 +449,7 @@ typedef struct service {
void service_init(void);
void service_done(void);
int service_start(service_t *t, int instance);
int service_start(service_t *t, int instance, int postpone);
void service_stop(service_t *t);
void service_build_filter(service_t *t);
@ -470,7 +470,7 @@ service_instance_t *service_find_instance(struct service *s,
struct channel *ch,
service_instance_list_t *sil,
int *error,
int weight);
int weight, int postpone);
elementary_stream_t *service_stream_find_(service_t *t, int pid);

View file

@ -45,6 +45,7 @@
struct th_subscription_list subscriptions;
struct th_subscription_list subscriptions_remove;
static gtimer_t subscription_reschedule_timer;
static int subscription_postpone;
/**
*
@ -87,7 +88,7 @@ subscription_link_service(th_subscription_t *s, service_t *t)
// Link to service output
streaming_target_connect(&t->s_streaming_pad, &s->ths_input);
sm = streaming_msg_create_code(SMT_GRACE, t->s_grace_delay);
sm = streaming_msg_create_code(SMT_GRACE, s->ths_postpone + t->s_grace_delay);
streaming_pad_deliver(&t->s_streaming_pad, sm);
if(s->ths_start_message != NULL && t->s_streaming_status & TSS_PACKETS) {
@ -188,6 +189,49 @@ subscription_sort(th_subscription_t *a, th_subscription_t *b)
}
static void
subscription_show_none(th_subscription_t *s)
{
channel_t *ch = s->ths_channel;
tvhlog(LOG_NOTICE, "subscription",
"No transponder available for subscription \"%s\" "
"to channel \"%s\"",
s->ths_title, ch ? channel_get_name(ch) : "none");
}
static void
subscription_show_info(th_subscription_t *s)
{
char buf[512];
channel_t *ch = s->ths_channel;
source_info_t si;
size_t buflen;
s->ths_service->s_setsourceinfo(s->ths_service, &si);
snprintf(buf, sizeof(buf),
"\"%s\" subscribing on \"%s\", weight: %d, adapter: \"%s\", "
"network: \"%s\", mux: \"%s\", provider: \"%s\", "
"service: \"%s\"",
s->ths_title, ch ? channel_get_name(ch) : "none", s->ths_weight,
si.si_adapter ?: "<N/A>",
si.si_network ?: "<N/A>",
si.si_mux ?: "<N/A>",
si.si_provider ?: "<N/A>",
si.si_service ?: "<N/A>");
service_source_info_free(&si);
if (s->ths_hostname) {
buflen = strlen(buf);
snprintf(buf + buflen, sizeof(buf) - buflen,
", hostname=\"%s\", username=\"%s\", client=\"%s\"",
s->ths_hostname ?: "<N/A>",
s->ths_username ?: "<N/A>",
s->ths_client ?: "<N/A>");
}
tvhlog(LOG_INFO, "subscription", "%s", buf);
}
/**
*
*/
@ -209,25 +253,33 @@ subscription_reschedule(void)
service_t *t;
service_instance_t *si;
streaming_message_t *sm;
int error;
int error, postpone = INT_MAX;
assert(reenter == 0);
reenter = 1;
lock_assert(&global_lock);
gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, 2);
LIST_FOREACH(s, &subscriptions, ths_global_link) {
if (s->ths_mmi) continue;
if (!s->ths_service && !s->ths_channel) continue;
/* Postpone the tuner decision */
/* Leave some time to wakeup tuners through DBus or so */
if (s->ths_postpone_end > dispatch_clock) {
if (postpone > s->ths_postpone_end - dispatch_clock)
postpone = s->ths_postpone_end - dispatch_clock;
streaming_message_t *sm;
sm = streaming_msg_create_code(SMT_GRACE, (s->ths_postpone_end - dispatch_clock) + 5);
streaming_target_deliver(s->ths_output, sm);
continue;
}
t = s->ths_service;
if(t != NULL && s->ths_current_instance != NULL) {
/* Already got a service */
if(s->ths_state != SUBSCRIPTION_BAD_SERVICE)
continue; /* And it not bad, so we're happy */
continue; /* And it not bad, so we're happy */
t->s_streaming_status = 0;
t->s_status = SERVICE_IDLE;
@ -255,28 +307,65 @@ subscription_reschedule(void)
tvhtrace("subscription", "find instance for %s weight %d",
s->ths_service->s_nicename, s->ths_weight);
si = service_find_instance(s->ths_service, s->ths_channel,
&s->ths_instances, &error, s->ths_weight);
&s->ths_instances, &error, s->ths_weight,
dispatch_clock > s->ths_postpone_end ?
0 : s->ths_postpone_end - dispatch_clock);
s->ths_current_instance = si;
if(si == NULL) {
/* No service available */
sm = streaming_msg_create_code(SMT_NOSTART, error);
streaming_target_deliver(s->ths_output, sm);
subscription_show_none(s);
continue;
}
subscription_link_service(s, si->si_s);
subscription_show_info(s);
}
while ((s = LIST_FIRST(&subscriptions_remove))) {
LIST_REMOVE(s, ths_remove_link);
subscription_unsubscribe(s);
}
if (!postpone)
postpone = 2;
gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, postpone);
reenter = 0;
}
/**
*
*/
int
subscription_set_postpone(int postpone)
{
th_subscription_t *s;
time_t now = time(NULL);
/* some limits that make sense */
if (postpone < 0)
postpone = 0;
if (postpone > 120)
postpone = 120;
pthread_mutex_lock(&global_lock);
if (subscription_postpone != postpone) {
subscription_postpone = postpone;
LIST_FOREACH(s, &subscriptions, ths_global_link) {
s->ths_postpone = postpone;
if (s->ths_postpone_end > now && s->ths_postpone_end - now > postpone)
s->ths_postpone_end = now + postpone;
}
gtimer_arm(&subscription_reschedule_timer,
subscription_reschedule_cb, NULL, 0);
}
pthread_mutex_unlock(&global_lock);
return postpone;
}
/* **************************************************************************
* Streaming handlers
* *************************************************************************/
@ -492,6 +581,8 @@ subscription_create
s->ths_total_err = 0;
s->ths_output = st;
s->ths_flags = flags;
s->ths_postpone = subscription_postpone;
s->ths_postpone_end = dispatch_clock + s->ths_postpone;
time(&s->ths_start);
@ -530,36 +621,7 @@ subscription_create_from_channel_or_service
if (ch)
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
// TODO: do we really need this here?
subscription_reschedule();
if(s->ths_service == NULL) {
tvhlog(LOG_NOTICE, "subscription",
"No transponder available for subscription \"%s\" "
"to channel \"%s\"",
s->ths_title, ch ? channel_get_name(ch) : "none");
} else {
source_info_t si;
s->ths_service->s_setsourceinfo(s->ths_service, &si);
tvhlog(LOG_INFO, "subscription",
"\"%s\" subscribing on \"%s\", weight: %d, adapter: \"%s\", "
"network: \"%s\", mux: \"%s\", provider: \"%s\", "
"service: \"%s\", hostname: \"%s\", username: \"%s\", client: \"%s\"",
s->ths_title, ch ? channel_get_name(ch) : "none", weight,
si.si_adapter ?: "<N/A>",
si.si_network ?: "<N/A>",
si.si_mux ?: "<N/A>",
si.si_provider ?: "<N/A>",
si.si_service ?: "<N/A>",
hostname ?: "<N/A>",
username ?: "<N/A>",
client ?: "<N/A>");
service_source_info_free(&si);
}
return s;
}

View file

@ -87,6 +87,12 @@ typedef struct th_subscription {
service_instance_list_t ths_instances;
struct service_instance *ths_current_instance;
/**
* Postpone
*/
int ths_postpone;
time_t ths_postpone_end;
#if ENABLE_MPEGTS
// Note: its a bit ugly linking MPEG-TS code directly here, but to do
// otherwise would probably require adding lots of additional
@ -112,6 +118,8 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight);
void subscription_reschedule(void);
int subscription_set_postpone(int postpone);
th_subscription_t *subscription_create_from_channel(struct channel *ch,
unsigned int weight,
const char *name,