diff --git a/src/subscriptions.c b/src/subscriptions.c index a3980e82..cd326e1f 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -61,38 +61,41 @@ subscription_sort(th_subscription_t *a, th_subscription_t *b) /** - * The transport is producing output. Thus, we may link our subscription - * to it. + * The transport is producing output. */ static void subscription_link_transport(th_subscription_t *s, th_transport_t *t) { streaming_message_t *sm; + s->ths_state = SUBSCRIPTION_TESTING_TRANSPORT; s->ths_transport = t; LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link); pthread_mutex_lock(&t->tht_stream_mutex); + if(LIST_FIRST(&t->tht_components) != NULL) + s->ths_start_message = + streaming_msg_create_data(SMT_START, transport_build_stream_start(t)); + // Link to transport output streaming_target_connect(&t->tht_streaming_pad, &s->ths_input); - if(LIST_FIRST(&t->tht_components) != NULL) { + + if(s->ths_start_message != NULL && t->tht_streaming_status & TSS_PACKETS) { + + s->ths_state = SUBSCRIPTION_GOT_TRANSPORT; // Send a START message to the subscription client - sm = streaming_msg_create_data(SMT_START, - transport_build_stream_start(t)); + streaming_target_deliver(s->ths_output, s->ths_start_message); + s->ths_start_message = NULL; + // Send status report + sm = streaming_msg_create_code(SMT_TRANSPORT_STATUS, + t->tht_streaming_status); streaming_target_deliver(s->ths_output, sm); - - // Send a TRANSPORT_STATUS message to the subscription client - if(t->tht_streaming_status) { - sm = streaming_msg_create_code(SMT_TRANSPORT_STATUS, - t->tht_streaming_status); - streaming_target_deliver(s->ths_output, sm); - } - } + pthread_mutex_unlock(&t->tht_stream_mutex); } @@ -131,7 +134,7 @@ static void subscription_reschedule(void *aux) { th_subscription_t *s; - th_transport_t *t; + th_transport_t *t, *skip; streaming_message_t *sm; char buf[128]; int errorcode; @@ -140,14 +143,22 @@ subscription_reschedule(void *aux) gtimer_arm(&subscription_reschedule_timer, subscription_reschedule, NULL, 2); LIST_FOREACH(s, &subscriptions, ths_global_link) { - if(s->ths_transport != NULL) - continue; /* Got a transport, we're happy */ - if(s->ths_channel == NULL) continue; /* stale entry, channel has been destroyed */ + if(s->ths_transport != NULL) { + /* Already got a transport */ + + if(s->ths_state != SUBSCRIPTION_BAD_TRANSPORT) + continue; /* And it seems to work ok, so we're happy */ + skip = s->ths_transport; + transport_remove_subscriber(s->ths_transport, s); + } else { + skip = NULL; + } + snprintf(buf, sizeof(buf), "Subscription \"%s\"", s->ths_title); - t = transport_find(s->ths_channel, s->ths_weight, buf, &errorcode); + t = transport_find(s->ths_channel, s->ths_weight, buf, &errorcode, skip); if(t == NULL) { /* No transport available */ @@ -185,6 +196,9 @@ subscription_unsubscribe(th_subscription_t *s) if(t != NULL) transport_remove_subscriber(t, s); + if(s->ths_start_message != NULL) + streaming_msg_free(s->ths_start_message); + free(s->ths_title); free(s); @@ -200,7 +214,52 @@ static void subscription_input(void *opauqe, streaming_message_t *sm) { th_subscription_t *s = opauqe; - streaming_target_deliver(s->ths_output, sm); + + if(s->ths_state == SUBSCRIPTION_TESTING_TRANSPORT) { + // We are just testing if this transport is good + + if(sm->sm_type == SMT_START) { + if(s->ths_start_message != NULL) + streaming_msg_free(s->ths_start_message); + s->ths_start_message = sm; + return; + } + + if(sm->sm_type == SMT_TRANSPORT_STATUS && + sm->sm_code & (TSS_GRACEPERIOD | TSS_ERRORS)) { + // No, mark our subscription as bad_transport + // the scheduler will take care of things + s->ths_state = SUBSCRIPTION_BAD_TRANSPORT; + streaming_msg_free(sm); + return; + } + + if(sm->sm_type == SMT_TRANSPORT_STATUS && + sm->sm_code & TSS_PACKETS) { + if(s->ths_start_message != NULL) { + streaming_target_deliver(s->ths_output, s->ths_start_message); + s->ths_start_message = NULL; + } + s->ths_state = SUBSCRIPTION_GOT_TRANSPORT; + } + } + + if(s->ths_state != SUBSCRIPTION_GOT_TRANSPORT) { + streaming_msg_free(sm); + return; + } + streaming_target_deliver(s->ths_output, sm); +} + + +/** + * + */ +static void +subscription_input_direct(void *opauqe, streaming_message_t *sm) +{ + th_subscription_t *s = opauqe; + streaming_target_deliver(s->ths_output, sm); } @@ -210,7 +269,7 @@ subscription_input(void *opauqe, streaming_message_t *sm) */ static th_subscription_t * subscription_create(int weight, const char *name, streaming_target_t *st, - int flags) + int flags, int direct) { th_subscription_t *s = calloc(1, sizeof(th_subscription_t)); int reject = 0; @@ -221,7 +280,8 @@ subscription_create(int weight, const char *name, streaming_target_t *st, reject |= SMT_TO_MASK(SMT_MPEGTS); // Reject raw mpegts - streaming_target_init(&s->ths_input, subscription_input, s, reject); + streaming_target_init(&s->ths_input, direct ? subscription_input_direct : + subscription_input, s, reject); s->ths_weight = weight; s->ths_title = strdup(name); @@ -244,7 +304,7 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight, const char *name, streaming_target_t *st, int flags) { - th_subscription_t *s = subscription_create(weight, name, st, flags); + th_subscription_t *s = subscription_create(weight, name, st, flags, 0); s->ths_channel = ch; LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link); @@ -287,12 +347,12 @@ th_subscription_t * subscription_create_from_transport(th_transport_t *t, const char *name, streaming_target_t *st, int flags) { - th_subscription_t *s = subscription_create(INT32_MAX, name, st, flags); + th_subscription_t *s = subscription_create(INT32_MAX, name, st, flags, 1); source_info_t si; int r; if(t->tht_status != TRANSPORT_RUNNING) { - if((r = transport_start(t, INT32_MAX, 1, 0)) != 0) { + if((r = transport_start(t, INT32_MAX, 1)) != 0) { subscription_unsubscribe(s); tvhlog(LOG_INFO, "subscription", diff --git a/src/subscriptions.h b/src/subscriptions.h index 8abf1dce..34b32120 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -25,6 +25,13 @@ typedef struct th_subscription { LIST_ENTRY(th_subscription) ths_global_link; int ths_weight; + enum { + SUBSCRIPTION_IDLE, + SUBSCRIPTION_TESTING_TRANSPORT, + SUBSCRIPTION_GOT_TRANSPORT, + SUBSCRIPTION_BAD_TRANSPORT, + } ths_state; + LIST_ENTRY(th_subscription) ths_channel_link; struct channel *ths_channel; /* May be NULL if channel has been destroyed during the @@ -44,6 +51,8 @@ typedef struct th_subscription { int ths_flags; + streaming_message_t *ths_start_message; + } th_subscription_t; diff --git a/src/transports.c b/src/transports.c index 55a45da2..71876a9a 100644 --- a/src/transports.c +++ b/src/transports.c @@ -45,7 +45,7 @@ #include "notify.h" #include "serviceprobe.h" #include "atomic.h" - +#include "dvb/dvb.h" #define TRANSPORT_HASH_WIDTH 101 @@ -61,16 +61,11 @@ transport_nostart2txt(int code) { switch(code) { case TRANSPORT_NOSTART_NO_HARDWARE: return "No hardware present"; - case TRANSPORT_NOSTART_MUX_NOT_ENABLED: return "No mux enabled"; + case TRANSPORT_NOSTART_MUX_NOT_ENABLED: return "Mux not enabled"; case TRANSPORT_NOSTART_NOT_FREE: return "Adapter in use by other subscription"; case TRANSPORT_NOSTART_TUNING_FAILED: return "Tuning failed"; case TRANSPORT_NOSTART_SVC_NOT_ENABLED: return "No service enabled"; case TRANSPORT_NOSTART_BAD_SIGNAL: return "Too bad signal quality"; - case TRANSPORT_NOSTART_NO_ACCESS: return "No access"; - case TRANSPORT_NOSTART_NO_DESCRAMBLER: return "No descrambler available"; - case TRANSPORT_NOSTART_NO_INPUT: return "No input detected"; - case TRANSPORT_NOSTART_NO_SIGNAL: return "No signal"; - } return "Unknown error"; } @@ -262,13 +257,10 @@ transport_remove_subscriber(th_transport_t *t, th_subscription_t *s) * */ int -transport_start(th_transport_t *t, unsigned int weight, int force_start, - int wait_for_ok) +transport_start(th_transport_t *t, unsigned int weight, int force_start) { th_stream_t *st; - int r, err; - int timeout = 2; - struct timespec to; + int r, timeout = 2; lock_assert(&global_lock); @@ -280,6 +272,9 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start, if((r = t->tht_start_feed(t, weight, force_start))) return r; + cwc_transport_start(t); + capmt_transport_start(t); + pthread_mutex_lock(&t->tht_stream_mutex); t->tht_status = TRANSPORT_RUNNING; @@ -292,61 +287,21 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start, pthread_mutex_unlock(&t->tht_stream_mutex); - cwc_transport_start(t); - capmt_transport_start(t); - if(t->tht_grace_period != NULL) timeout = t->tht_grace_period(t); - gtimer_arm(&t->tht_receive_timer, transport_data_timeout, t, timeout); - - if(!wait_for_ok) - return 0; - - tvhlog(LOG_DEBUG, "Transport", "%s: Waiting for input before start", - transport_nicename(t)); - - pthread_mutex_lock(&t->tht_stream_mutex); - - to.tv_sec = time(NULL) + timeout; - to.tv_nsec = 0; - - do { - if(t->tht_streaming_status & TSS_MUX_PACKETS) { - tvhlog(LOG_DEBUG, "Transport", "%s: Got demultiplexable packets", - transport_nicename(t)); - pthread_mutex_unlock(&t->tht_stream_mutex); - return 0; - } - - } while(pthread_cond_timedwait(&t->tht_tss_cond, &t->tht_stream_mutex, - &to) != ETIMEDOUT); - - err = t->tht_streaming_status; - - // Startup timed out - - pthread_mutex_unlock(&t->tht_stream_mutex); - - transport_stop(t); - - // Translate streaming status flags to NOSTART errorcode - if(err & TSS_NO_DESCRAMBLER) - return TRANSPORT_NOSTART_NO_DESCRAMBLER; - - if(err & TSS_NO_ACCESS) - return TRANSPORT_NOSTART_NO_ACCESS; - - if(err & TSS_NO_ACCESS) - return TRANSPORT_NOSTART_NO_ACCESS; - - if(err & TSS_INPUT_SERVICE) - return TRANSPORT_NOSTART_NO_INPUT; - - return TRANSPORT_NOSTART_NO_SIGNAL; + return 0; } +/** + * + */ +static int +dvb_extra_prio(th_dvb_adapter_t *tda) +{ + return tda->tda_hostconnection * 10; +} /** * Return prio for the given transport @@ -356,18 +311,17 @@ transport_get_prio(th_transport_t *t) { switch(t->tht_type) { case TRANSPORT_DVB: - if(t->tht_scrambled) - return 3; - return 1; + return (t->tht_scrambled ? 300 : 100) + + dvb_extra_prio(t->tht_dvb_mux_instance->tdmi_adapter); case TRANSPORT_IPTV: - return 2; + return 200; case TRANSPORT_V4L: - return 4; + return 400; default: - return 5; + return 500; } } @@ -414,10 +368,10 @@ transportcmp(const void *A, const void *B) */ th_transport_t * transport_find(channel_t *ch, unsigned int weight, const char *loginfo, - int *errorp) + int *errorp, th_transport_t *skip) { th_transport_t *t, **vec; - int cnt = 0, i, r; + int cnt = 0, i, r, off; int error = 0; lock_assert(&global_lock); @@ -457,23 +411,38 @@ transport_find(channel_t *ch, unsigned int weight, const char *loginfo, qsort(vec, cnt, sizeof(th_transport_t *), transportcmp); - /* First, try all transports without stealing */ + // Skip up to the transport that the caller didn't want + // If the sorting above is not stable that might mess up things + // temporary. But it should resolve itself eventually + if(skip != NULL) { + for(i = 0; i < cnt; i++) { + if(skip == t) + break; + } + off = i + 1; + } else { + off = 0; + } - for(i = 0; i < cnt; i++) { + error = TRANSPORT_NOSTART_NO_HARDWARE; + + /* First, try all transports without stealing */ + for(i = off; i < cnt; i++) { t = vec[i]; if(t->tht_status == TRANSPORT_RUNNING) return t; - - if(!transport_start(t, 0, 0, 1)) + if((r = transport_start(t, 0, 0)) == 0) return t; + tvhlog(LOG_DEBUG, "Transport", "%s: Unable to use \"%s\" -- %s", + loginfo, transport_nicename(t), transport_nostart2txt(r)); } /* Ok, nothing, try again, but supply our weight and thus, try to steal transponders */ - for(i = 0; i < cnt; i++) { + for(i = off; i < cnt; i++) { t = vec[i]; - if((r = transport_start(t, weight, 0, 1)) == 0) + if((r = transport_start(t, weight, 0)) == 0) return t; error = r; if(loginfo != NULL) @@ -779,7 +748,8 @@ transport_data_timeout(void *aux) pthread_mutex_lock(&t->tht_stream_mutex); - transport_set_streaming_status_flags(t, TSS_GRACEPERIOD); + if(!t->tht_streaming_status & TSS_PACKETS) + transport_set_streaming_status_flags(t, TSS_GRACEPERIOD); pthread_mutex_unlock(&t->tht_stream_mutex); } diff --git a/src/transports.h b/src/transports.h index 8e27b7c8..0876d3fd 100644 --- a/src/transports.h +++ b/src/transports.h @@ -29,17 +29,12 @@ #define TRANSPORT_NOSTART_TUNING_FAILED 4 #define TRANSPORT_NOSTART_SVC_NOT_ENABLED 5 #define TRANSPORT_NOSTART_BAD_SIGNAL 6 -#define TRANSPORT_NOSTART_NO_ACCESS 7 -#define TRANSPORT_NOSTART_NO_DESCRAMBLER 8 -#define TRANSPORT_NOSTART_NO_INPUT 9 -#define TRANSPORT_NOSTART_NO_SIGNAL 10 void transport_init(void); unsigned int transport_compute_weight(struct th_transport_list *head); -int transport_start(th_transport_t *t, unsigned int weight, int force_start, - int wait_for_ok); +int transport_start(th_transport_t *t, unsigned int weight, int force_start); th_transport_t *transport_create(const char *identifier, int type, int source_type); @@ -53,7 +48,8 @@ th_transport_t *transport_find_by_identifier(const char *identifier); void transport_map_channel(th_transport_t *t, channel_t *ch, int save); th_transport_t *transport_find(channel_t *ch, unsigned int weight, - const char *loginfo, int *errorp); + const char *loginfo, int *errorp, + th_transport_t *skip); th_stream_t *transport_stream_find(th_transport_t *t, int pid);