service_mapper: New service mapper code

This will now allow the mappings between services and channels to be created.
Some basic options have been created for the purpose of allowing certain
level of control of how the mapping is done.
This commit is contained in:
Adam Sutton 2013-08-03 13:39:55 +01:00
parent ca35906be5
commit d8973cba3e
9 changed files with 90 additions and 76 deletions

View file

@ -35,6 +35,26 @@
static int
psi_parse_pmt(mpegts_service_t *t, const uint8_t *ptr, int len);
/* **************************************************************************
* Lookup tables
* *************************************************************************/
static const int dvb_servicetype_map[][2] = {
{ 0x01, ST_SDTV }, /* SDTV (MPEG2) */
{ 0x02, ST_RADIO },
{ 0x11, ST_HDTV }, /* HDTV (MPEG2) */
{ 0x16, ST_SDTV }, /* Advanced codec SDTV */
{ 0x19, ST_HDTV }, /* Advanced codec HDTV */
{ 0x80, ST_SDTV }, /* NET POA - Cabo SDTV */
{ 0x91, ST_HDTV }, /* Bell TV HDTV */
{ 0x96, ST_SDTV }, /* Bell TV SDTV */
{ 0xA0, ST_HDTV }, /* Bell TV tiered HDTV */
{ 0xA4, ST_HDTV }, /* DN HDTV */
{ 0xA6, ST_HDTV }, /* Bell TV tiered HDTV */
{ 0xA8, ST_SDTV }, /* DN advanced SDTV */
{ 0xD3, ST_SDTV }, /* SKY TV SDTV */
};
/* **************************************************************************
* Descriptors
* *************************************************************************/
@ -843,8 +863,15 @@ dvb_sdt_callback
/* Update service type */
if (stype && s->s_dvb_servicetype != stype) {
int i;
s->s_dvb_servicetype = stype;
save = 1;
/* Set tvh service type */
for (i = 0; i < ARRAY_SIZE(dvb_servicetype_map); i++) {
if (dvb_servicetype_map[i][0] == stype)
s->s_servicetype = dvb_servicetype_map[i][1];
}
}
/* Update scrambled state */

View file

@ -91,7 +91,7 @@ const idclass_t mpegts_service_class =
},
{
.type = PT_U16,
.id = "servicetype",
.id = "dvb_servicetype",
.name = "Service Type",
.opts = PO_RDONLY,
.off = offsetof(mpegts_service_t, s_dvb_servicetype),

View file

@ -273,10 +273,9 @@ service_start(service_t *t, int instance)
* Main entry point for starting a service based on a channel
*/
service_instance_t *
service_find_instance(channel_t *ch, struct service_instance_list *sil,
service_find_instance(service_t *s, channel_t *ch, struct service_instance_list *sil,
int *error, int weight)
{
service_t *s;
channel_service_mapping_t *csm;
service_instance_t *si, *next;
@ -287,9 +286,13 @@ service_find_instance(channel_t *ch, struct service_instance_list *sil,
LIST_FOREACH(si, sil, si_link)
si->si_mark = 1;
LIST_FOREACH(csm, &ch->ch_services, csm_chn_link) {
s = csm->csm_svc;
if (!s->s_is_enabled(s)) continue;
if (ch) {
LIST_FOREACH(csm, &ch->ch_services, csm_chn_link) {
s = csm->csm_svc;
if (!s->s_is_enabled(s)) continue;
s->s_enlist(s, sil);
}
} else {
s->s_enlist(s, sil);
}

View file

@ -435,7 +435,8 @@ void service_ref(service_t *t);
service_t *service_find_by_identifier(const char *identifier);
service_instance_t *service_find_instance(struct channel *ch,
service_instance_t *service_find_instance(struct service *s,
struct channel *ch,
struct service_instance_list *sil,
int *error,
int weight);

View file

@ -30,6 +30,7 @@
#include "service_mapper.h"
#include "streaming.h"
#include "service.h"
#include "plumbing/tsfix.h"
static pthread_cond_t service_mapper_cond;
static struct service_queue service_mapper_queue;
@ -78,8 +79,6 @@ service_mapper_start ( const service_mapper_conf_t *conf )
/* Check each service */
TAILQ_FOREACH(s, &service_all, s_all_link) {
printf("name = %s\n", s->s_channel_name(s));
/* Disabled */
if (!s->s_is_enabled(s)) continue;
@ -91,7 +90,6 @@ printf("name = %s\n", s->s_channel_name(s));
e = service_is_encrypted(s);
tr = service_is_tv(s) || service_is_radio(s);
pthread_mutex_unlock(&s->s_stream_mutex);
printf(" tr = %d, e = %d\n", tr, e);
/* Skip non-TV / Radio */
if (!tr) continue;
@ -104,11 +102,11 @@ printf(" tr = %d, e = %d\n", tr, e);
if (!s->s_sm_onqueue) {
qd = 1;
TAILQ_INSERT_TAIL(&service_mapper_queue, s, s_sm_link);
s->s_sm_onqueue = 1;
}
/* Process */
} else {
printf(" map\n");
service_mapper_process(s);
}
}
@ -247,6 +245,7 @@ service_mapper_thread ( void *aux )
streaming_queue_t sq;
streaming_message_t *sm;
const char *err;
streaming_target_t *st;
streaming_queue_init(&sq, 0);
@ -262,6 +261,7 @@ service_mapper_thread ( void *aux )
}
pthread_cond_wait(&service_mapper_cond, &global_lock);
}
service_mapper_remove(s);
if (!working) {
working = 1;
@ -270,12 +270,12 @@ service_mapper_thread ( void *aux )
/* Subscribe */
tvhinfo("service_mapper", "%s: checking availability", s->s_nicename);
sub = subscription_create_from_service(s, "service_mapper", &sq.sq_st,
st = tsfix_create(&sq.sq_st);
sub = subscription_create_from_service(s, 2, "service_mapper", st,
0, NULL, NULL, "service_mapper");
/* Failed */
if (!sub) {
service_mapper_remove(s);
tvhinfo("service_mapper", "%s: could not subscribe", s->s_nicename);
continue;
}
@ -294,13 +294,13 @@ service_mapper_thread ( void *aux )
TAILQ_REMOVE(&sq.sq_queue, sm, sm_link);
pthread_mutex_unlock(&sq.sq_mutex);
if(sm->sm_type == SMT_SERVICE_STATUS) {
if(sm->sm_type == SMT_PACKET) {
run = 0;
err = NULL;
} else if (sm->sm_type == SMT_SERVICE_STATUS) {
int status = sm->sm_code;
if(status & TSS_PACKETS) {
run = 0;
err = NULL;
} else if(status & (TSS_GRACEPERIOD | TSS_ERRORS)) {
if(status & (TSS_GRACEPERIOD | TSS_ERRORS)) {
run = 0;
err = service_tss2text(status);
}
@ -315,6 +315,7 @@ service_mapper_thread ( void *aux )
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(sub);
tsfix_destroy(st);
if(err)
tvhinfo("service_mapper", "%s: failed %s", s->s_nicename, err);

View file

@ -164,10 +164,12 @@ subscription_reschedule(void)
subscription_reschedule_cb, NULL, 2);
LIST_FOREACH(s, &subscriptions, ths_global_link) {
#if 0
if(s->ths_channel == NULL)
continue; /* stale entry, channel has been destroyed */
#endif
if(s->ths_service != NULL) {
if(s->ths_service != NULL && s->ths_current_instance != NULL) {
/* Already got a service */
if(s->ths_state != SUBSCRIPTION_BAD_SERVICE)
@ -180,8 +182,11 @@ subscription_reschedule(void)
time(&si->si_error_time);
}
tvhtrace("subscription", "find service for %s weight %d", s->ths_channel->ch_name, s->ths_weight);
si = service_find_instance(s->ths_channel, &s->ths_instances, &error,
if (s->ths_channel)
tvhtrace("subscription", "find service for %s weight %d", s->ths_channel->ch_name, s->ths_weight);
else
tvhtrace("subscription", "find instance for %s weight %d", s->ths_service->s_nicename, s->ths_weight);
si = service_find_instance(s->ths_service, s->ths_channel, &s->ths_instances, &error,
s->ths_weight);
s->ths_current_instance = si;
@ -370,22 +375,26 @@ subscription_create(int weight, const char *name, streaming_target_t *st,
/**
*
*/
th_subscription_t *
subscription_create_from_channel(channel_t *ch, unsigned int weight,
const char *name, streaming_target_t *st,
int flags, const char *hostname,
const char *username, const char *client)
static th_subscription_t *
subscription_create_from_channel_or_service
(channel_t *ch, service_t *t, unsigned int weight,
const char *name, streaming_target_t *st,
int flags, const char *hostname,
const char *username, const char *client)
{
th_subscription_t *s;
assert(!ch || !t);
tvhtrace("subscription", "creating subscription for %s weight %d",
ch->ch_name, weight);
if (ch)
tvhtrace("subscription", "creating subscription for %s weight %d",
ch->ch_name, weight);
s = subscription_create(weight, name, st, flags, subscription_input,
hostname, username, client);
s->ths_channel = ch;
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
s->ths_service = NULL;
if (ch)
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
s->ths_service = t;
subscription_reschedule();
@ -393,7 +402,7 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight,
tvhlog(LOG_NOTICE, "subscription",
"No transponder available for subscription \"%s\" "
"to channel \"%s\"",
s->ths_title, ch->ch_name);
s->ths_title, ch ? ch->ch_name : "none");
} else {
source_info_t si;
@ -403,7 +412,7 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight,
"\"%s\" subscribing on \"%s\", weight: %d, adapter: \"%s\", "
"network: \"%s\", mux: \"%s\", provider: \"%s\", "
"service: \"%s\"",
s->ths_title, ch->ch_name, weight,
s->ths_title, ch ? ch->ch_name : "none", weight,
si.si_adapter ?: "<N/A>",
si.si_network ?: "<N/A>",
si.si_mux ?: "<N/A>",
@ -416,58 +425,30 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight,
return s;
}
th_subscription_t *
subscription_create_from_channel(channel_t *ch, unsigned int weight,
const char *name, streaming_target_t *st,
int flags, const char *hostname,
const char *username, const char *client)
{
return subscription_create_from_channel_or_service
(ch, NULL, weight, name, st, flags, hostname, username, client);
}
/**
*
*/
th_subscription_t *
subscription_create_from_service(service_t *t, const char *name,
subscription_create_from_service(service_t *t, unsigned int weight,
const char *name,
streaming_target_t *st, int flags,
const char *hostname, const char *username,
const char *client)
{
#if 0
th_subscription_t *s;
source_info_t si;
int r;
s = subscription_create(INT32_MAX, name, st, flags,
subscription_input_direct,
hostname, username, client);
if(t->s_status != SERVICE_RUNNING) {
if((r = service_start(t, INT32_MAX, 1)) != 0) {
subscription_unsubscribe(s);
tvhlog(LOG_INFO, "subscription",
"\"%s\" direct subscription failed -- %s", name,
streaming_code2txt(r));
return NULL;
}
}
t->s_setsourceinfo(t, &si);
tvhlog(LOG_INFO, "subscription",
"\"%s\" direct subscription to adapter: \"%s\", "
"network: \"%s\", mux: \"%s\", provider: \"%s\", "
"service: \"%s\"",
s->ths_title,
si.si_adapter ?: "<N/A>",
si.si_network ?: "<N/A>",
si.si_mux ?: "<N/A>",
si.si_provider ?: "<N/A>",
si.si_service ?: "<N/A>");
service_source_info_free(&si);
subscription_link_service(s, t);
notify_reload("subscriptions");
return s;
#endif
abort();
return subscription_create_from_channel_or_service
(NULL, t, weight, name, st, flags, hostname, username, client);
}
/**
*
*/
@ -546,7 +527,7 @@ subscription_dummy_join(const char *id, int first)
st = calloc(1, sizeof(streaming_target_t));
streaming_target_init(st, dummy_callback, NULL, 0);
subscription_create_from_service(t, "dummy", st, 0, NULL, NULL, "dummy");
subscription_create_from_service(t, 1, "dummy", st, 0, NULL, NULL, "dummy");
tvhlog(LOG_NOTICE, "subscription",
"Dummy join %s ok", id);

View file

@ -105,6 +105,7 @@ th_subscription_t *subscription_create_from_channel(struct channel *ch,
th_subscription_t *subscription_create_from_service(struct service *t,
unsigned int weight,
const char *name,
streaming_target_t *st,
int flags,

View file

@ -54,7 +54,7 @@ tvheadend.mapServices = function()
/* Form fields */
var availCheck = new Ext.form.Checkbox({
name : 'check_availbility',
name : 'check_availability',
fieldLabel : 'Check availability',
checked : false
});

View file

@ -591,7 +591,7 @@ http_stream_service(http_connection_t *hc, service_t *service)
}
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
s = subscription_create_from_service(service, "HTTP", st, flags,
s = subscription_create_from_service(service, 100, "HTTP", st, flags,
addrbuf,
hc->hc_username,
http_arg_get(&hc->hc_args, "User-Agent"));