Rework the subscription scheduler so it's not blocking during subscription start

Prefer slower DVB devices over faster ones when chasing for an available transport
This commit is contained in:
Andreas Öman 2010-01-24 22:46:11 +00:00
parent 702297ecfe
commit 82a8b932b1
4 changed files with 142 additions and 107 deletions

View file

@ -61,38 +61,41 @@ subscription_sort(th_subscription_t *a, th_subscription_t *b)
/**
* The transport is producing output. Thus, we may link our subscription
* to it.
* The transport is producing output.
*/
static void
subscription_link_transport(th_subscription_t *s, th_transport_t *t)
{
streaming_message_t *sm;
s->ths_state = SUBSCRIPTION_TESTING_TRANSPORT;
s->ths_transport = t;
LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link);
pthread_mutex_lock(&t->tht_stream_mutex);
if(LIST_FIRST(&t->tht_components) != NULL)
s->ths_start_message =
streaming_msg_create_data(SMT_START, transport_build_stream_start(t));
// Link to transport output
streaming_target_connect(&t->tht_streaming_pad, &s->ths_input);
if(LIST_FIRST(&t->tht_components) != NULL) {
if(s->ths_start_message != NULL && t->tht_streaming_status & TSS_PACKETS) {
s->ths_state = SUBSCRIPTION_GOT_TRANSPORT;
// Send a START message to the subscription client
sm = streaming_msg_create_data(SMT_START,
transport_build_stream_start(t));
streaming_target_deliver(s->ths_output, s->ths_start_message);
s->ths_start_message = NULL;
// Send status report
sm = streaming_msg_create_code(SMT_TRANSPORT_STATUS,
t->tht_streaming_status);
streaming_target_deliver(s->ths_output, sm);
// Send a TRANSPORT_STATUS message to the subscription client
if(t->tht_streaming_status) {
sm = streaming_msg_create_code(SMT_TRANSPORT_STATUS,
t->tht_streaming_status);
streaming_target_deliver(s->ths_output, sm);
}
}
pthread_mutex_unlock(&t->tht_stream_mutex);
}
@ -131,7 +134,7 @@ static void
subscription_reschedule(void *aux)
{
th_subscription_t *s;
th_transport_t *t;
th_transport_t *t, *skip;
streaming_message_t *sm;
char buf[128];
int errorcode;
@ -140,14 +143,22 @@ subscription_reschedule(void *aux)
gtimer_arm(&subscription_reschedule_timer, subscription_reschedule, NULL, 2);
LIST_FOREACH(s, &subscriptions, ths_global_link) {
if(s->ths_transport != NULL)
continue; /* Got a transport, we're happy */
if(s->ths_channel == NULL)
continue; /* stale entry, channel has been destroyed */
if(s->ths_transport != NULL) {
/* Already got a transport */
if(s->ths_state != SUBSCRIPTION_BAD_TRANSPORT)
continue; /* And it seems to work ok, so we're happy */
skip = s->ths_transport;
transport_remove_subscriber(s->ths_transport, s);
} else {
skip = NULL;
}
snprintf(buf, sizeof(buf), "Subscription \"%s\"", s->ths_title);
t = transport_find(s->ths_channel, s->ths_weight, buf, &errorcode);
t = transport_find(s->ths_channel, s->ths_weight, buf, &errorcode, skip);
if(t == NULL) {
/* No transport available */
@ -185,6 +196,9 @@ subscription_unsubscribe(th_subscription_t *s)
if(t != NULL)
transport_remove_subscriber(t, s);
if(s->ths_start_message != NULL)
streaming_msg_free(s->ths_start_message);
free(s->ths_title);
free(s);
@ -200,7 +214,52 @@ static void
subscription_input(void *opauqe, streaming_message_t *sm)
{
th_subscription_t *s = opauqe;
streaming_target_deliver(s->ths_output, sm);
if(s->ths_state == SUBSCRIPTION_TESTING_TRANSPORT) {
// We are just testing if this transport is good
if(sm->sm_type == SMT_START) {
if(s->ths_start_message != NULL)
streaming_msg_free(s->ths_start_message);
s->ths_start_message = sm;
return;
}
if(sm->sm_type == SMT_TRANSPORT_STATUS &&
sm->sm_code & (TSS_GRACEPERIOD | TSS_ERRORS)) {
// No, mark our subscription as bad_transport
// the scheduler will take care of things
s->ths_state = SUBSCRIPTION_BAD_TRANSPORT;
streaming_msg_free(sm);
return;
}
if(sm->sm_type == SMT_TRANSPORT_STATUS &&
sm->sm_code & TSS_PACKETS) {
if(s->ths_start_message != NULL) {
streaming_target_deliver(s->ths_output, s->ths_start_message);
s->ths_start_message = NULL;
}
s->ths_state = SUBSCRIPTION_GOT_TRANSPORT;
}
}
if(s->ths_state != SUBSCRIPTION_GOT_TRANSPORT) {
streaming_msg_free(sm);
return;
}
streaming_target_deliver(s->ths_output, sm);
}
/**
*
*/
static void
subscription_input_direct(void *opauqe, streaming_message_t *sm)
{
th_subscription_t *s = opauqe;
streaming_target_deliver(s->ths_output, sm);
}
@ -210,7 +269,7 @@ subscription_input(void *opauqe, streaming_message_t *sm)
*/
static th_subscription_t *
subscription_create(int weight, const char *name, streaming_target_t *st,
int flags)
int flags, int direct)
{
th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
int reject = 0;
@ -221,7 +280,8 @@ subscription_create(int weight, const char *name, streaming_target_t *st,
reject |= SMT_TO_MASK(SMT_MPEGTS); // Reject raw mpegts
streaming_target_init(&s->ths_input, subscription_input, s, reject);
streaming_target_init(&s->ths_input, direct ? subscription_input_direct :
subscription_input, s, reject);
s->ths_weight = weight;
s->ths_title = strdup(name);
@ -244,7 +304,7 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight,
const char *name, streaming_target_t *st,
int flags)
{
th_subscription_t *s = subscription_create(weight, name, st, flags);
th_subscription_t *s = subscription_create(weight, name, st, flags, 0);
s->ths_channel = ch;
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
@ -287,12 +347,12 @@ th_subscription_t *
subscription_create_from_transport(th_transport_t *t, const char *name,
streaming_target_t *st, int flags)
{
th_subscription_t *s = subscription_create(INT32_MAX, name, st, flags);
th_subscription_t *s = subscription_create(INT32_MAX, name, st, flags, 1);
source_info_t si;
int r;
if(t->tht_status != TRANSPORT_RUNNING) {
if((r = transport_start(t, INT32_MAX, 1, 0)) != 0) {
if((r = transport_start(t, INT32_MAX, 1)) != 0) {
subscription_unsubscribe(s);
tvhlog(LOG_INFO, "subscription",

View file

@ -25,6 +25,13 @@ typedef struct th_subscription {
LIST_ENTRY(th_subscription) ths_global_link;
int ths_weight;
enum {
SUBSCRIPTION_IDLE,
SUBSCRIPTION_TESTING_TRANSPORT,
SUBSCRIPTION_GOT_TRANSPORT,
SUBSCRIPTION_BAD_TRANSPORT,
} ths_state;
LIST_ENTRY(th_subscription) ths_channel_link;
struct channel *ths_channel; /* May be NULL if channel has been
destroyed during the
@ -44,6 +51,8 @@ typedef struct th_subscription {
int ths_flags;
streaming_message_t *ths_start_message;
} th_subscription_t;

View file

@ -45,7 +45,7 @@
#include "notify.h"
#include "serviceprobe.h"
#include "atomic.h"
#include "dvb/dvb.h"
#define TRANSPORT_HASH_WIDTH 101
@ -61,16 +61,11 @@ transport_nostart2txt(int code)
{
switch(code) {
case TRANSPORT_NOSTART_NO_HARDWARE: return "No hardware present";
case TRANSPORT_NOSTART_MUX_NOT_ENABLED: return "No mux enabled";
case TRANSPORT_NOSTART_MUX_NOT_ENABLED: return "Mux not enabled";
case TRANSPORT_NOSTART_NOT_FREE: return "Adapter in use by other subscription";
case TRANSPORT_NOSTART_TUNING_FAILED: return "Tuning failed";
case TRANSPORT_NOSTART_SVC_NOT_ENABLED: return "No service enabled";
case TRANSPORT_NOSTART_BAD_SIGNAL: return "Too bad signal quality";
case TRANSPORT_NOSTART_NO_ACCESS: return "No access";
case TRANSPORT_NOSTART_NO_DESCRAMBLER: return "No descrambler available";
case TRANSPORT_NOSTART_NO_INPUT: return "No input detected";
case TRANSPORT_NOSTART_NO_SIGNAL: return "No signal";
}
return "Unknown error";
}
@ -262,13 +257,10 @@ transport_remove_subscriber(th_transport_t *t, th_subscription_t *s)
*
*/
int
transport_start(th_transport_t *t, unsigned int weight, int force_start,
int wait_for_ok)
transport_start(th_transport_t *t, unsigned int weight, int force_start)
{
th_stream_t *st;
int r, err;
int timeout = 2;
struct timespec to;
int r, timeout = 2;
lock_assert(&global_lock);
@ -280,6 +272,9 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start,
if((r = t->tht_start_feed(t, weight, force_start)))
return r;
cwc_transport_start(t);
capmt_transport_start(t);
pthread_mutex_lock(&t->tht_stream_mutex);
t->tht_status = TRANSPORT_RUNNING;
@ -292,61 +287,21 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start,
pthread_mutex_unlock(&t->tht_stream_mutex);
cwc_transport_start(t);
capmt_transport_start(t);
if(t->tht_grace_period != NULL)
timeout = t->tht_grace_period(t);
gtimer_arm(&t->tht_receive_timer, transport_data_timeout, t, timeout);
if(!wait_for_ok)
return 0;
tvhlog(LOG_DEBUG, "Transport", "%s: Waiting for input before start",
transport_nicename(t));
pthread_mutex_lock(&t->tht_stream_mutex);
to.tv_sec = time(NULL) + timeout;
to.tv_nsec = 0;
do {
if(t->tht_streaming_status & TSS_MUX_PACKETS) {
tvhlog(LOG_DEBUG, "Transport", "%s: Got demultiplexable packets",
transport_nicename(t));
pthread_mutex_unlock(&t->tht_stream_mutex);
return 0;
}
} while(pthread_cond_timedwait(&t->tht_tss_cond, &t->tht_stream_mutex,
&to) != ETIMEDOUT);
err = t->tht_streaming_status;
// Startup timed out
pthread_mutex_unlock(&t->tht_stream_mutex);
transport_stop(t);
// Translate streaming status flags to NOSTART errorcode
if(err & TSS_NO_DESCRAMBLER)
return TRANSPORT_NOSTART_NO_DESCRAMBLER;
if(err & TSS_NO_ACCESS)
return TRANSPORT_NOSTART_NO_ACCESS;
if(err & TSS_NO_ACCESS)
return TRANSPORT_NOSTART_NO_ACCESS;
if(err & TSS_INPUT_SERVICE)
return TRANSPORT_NOSTART_NO_INPUT;
return TRANSPORT_NOSTART_NO_SIGNAL;
return 0;
}
/**
*
*/
static int
dvb_extra_prio(th_dvb_adapter_t *tda)
{
return tda->tda_hostconnection * 10;
}
/**
* Return prio for the given transport
@ -356,18 +311,17 @@ transport_get_prio(th_transport_t *t)
{
switch(t->tht_type) {
case TRANSPORT_DVB:
if(t->tht_scrambled)
return 3;
return 1;
return (t->tht_scrambled ? 300 : 100) +
dvb_extra_prio(t->tht_dvb_mux_instance->tdmi_adapter);
case TRANSPORT_IPTV:
return 2;
return 200;
case TRANSPORT_V4L:
return 4;
return 400;
default:
return 5;
return 500;
}
}
@ -414,10 +368,10 @@ transportcmp(const void *A, const void *B)
*/
th_transport_t *
transport_find(channel_t *ch, unsigned int weight, const char *loginfo,
int *errorp)
int *errorp, th_transport_t *skip)
{
th_transport_t *t, **vec;
int cnt = 0, i, r;
int cnt = 0, i, r, off;
int error = 0;
lock_assert(&global_lock);
@ -457,23 +411,38 @@ transport_find(channel_t *ch, unsigned int weight, const char *loginfo,
qsort(vec, cnt, sizeof(th_transport_t *), transportcmp);
/* First, try all transports without stealing */
// Skip up to the transport that the caller didn't want
// If the sorting above is not stable that might mess up things
// temporary. But it should resolve itself eventually
if(skip != NULL) {
for(i = 0; i < cnt; i++) {
if(skip == t)
break;
}
off = i + 1;
} else {
off = 0;
}
for(i = 0; i < cnt; i++) {
error = TRANSPORT_NOSTART_NO_HARDWARE;
/* First, try all transports without stealing */
for(i = off; i < cnt; i++) {
t = vec[i];
if(t->tht_status == TRANSPORT_RUNNING)
return t;
if(!transport_start(t, 0, 0, 1))
if((r = transport_start(t, 0, 0)) == 0)
return t;
tvhlog(LOG_DEBUG, "Transport", "%s: Unable to use \"%s\" -- %s",
loginfo, transport_nicename(t), transport_nostart2txt(r));
}
/* Ok, nothing, try again, but supply our weight and thus, try to steal
transponders */
for(i = 0; i < cnt; i++) {
for(i = off; i < cnt; i++) {
t = vec[i];
if((r = transport_start(t, weight, 0, 1)) == 0)
if((r = transport_start(t, weight, 0)) == 0)
return t;
error = r;
if(loginfo != NULL)
@ -779,7 +748,8 @@ transport_data_timeout(void *aux)
pthread_mutex_lock(&t->tht_stream_mutex);
transport_set_streaming_status_flags(t, TSS_GRACEPERIOD);
if(!t->tht_streaming_status & TSS_PACKETS)
transport_set_streaming_status_flags(t, TSS_GRACEPERIOD);
pthread_mutex_unlock(&t->tht_stream_mutex);
}

View file

@ -29,17 +29,12 @@
#define TRANSPORT_NOSTART_TUNING_FAILED 4
#define TRANSPORT_NOSTART_SVC_NOT_ENABLED 5
#define TRANSPORT_NOSTART_BAD_SIGNAL 6
#define TRANSPORT_NOSTART_NO_ACCESS 7
#define TRANSPORT_NOSTART_NO_DESCRAMBLER 8
#define TRANSPORT_NOSTART_NO_INPUT 9
#define TRANSPORT_NOSTART_NO_SIGNAL 10
void transport_init(void);
unsigned int transport_compute_weight(struct th_transport_list *head);
int transport_start(th_transport_t *t, unsigned int weight, int force_start,
int wait_for_ok);
int transport_start(th_transport_t *t, unsigned int weight, int force_start);
th_transport_t *transport_create(const char *identifier, int type,
int source_type);
@ -53,7 +48,8 @@ th_transport_t *transport_find_by_identifier(const char *identifier);
void transport_map_channel(th_transport_t *t, channel_t *ch, int save);
th_transport_t *transport_find(channel_t *ch, unsigned int weight,
const char *loginfo, int *errorp);
const char *loginfo, int *errorp,
th_transport_t *skip);
th_stream_t *transport_stream_find(th_transport_t *t, int pid);