Make serviceprobing work again.

This commit is contained in:
Andreas Öman 2008-09-05 22:02:22 +00:00
parent 88310ee267
commit e560a859ff
14 changed files with 415 additions and 296 deletions

View file

@ -10,6 +10,8 @@ VPATH += dvb
SRCS += dvb.c dvb_support.c dvb_fe.c dvb_tables.c \
diseqc.c dvb_adapter.c dvb_multiplex.c dvb_transport.c dvb_preconf.c
SRCS += serviceprobe.c
SRCS += cwc.c krypt.c
VPATH += ffdecsa
SRCS += FFdecsa.c

View file

@ -338,7 +338,7 @@ channel_delete(channel_t *ch)
tvhlog(LOG_NOTICE, "channels", "Channel \"%s\" deleted",
ch->ch_name);
abort();//pvr_destroy_by_channel(ch);
fprintf(stderr, "!!!!!//pvr_destroy_by_channel(ch);\n");
while((t = LIST_FIRST(&ch->ch_transports)) != NULL) {
transport_unmap_channel(t);
@ -352,7 +352,7 @@ channel_delete(channel_t *ch)
epg_destroy_by_channel(ch);
abort();//autorec_destroy_by_channel(ch);
fprintf(stderr, "!!!!!//autorec_destroy_by_channel(ch);\n");
hts_settings_remove("channels/%d", ch->ch_id);
@ -388,7 +388,9 @@ channel_merge(channel_t *dst, channel_t *src)
while((t = LIST_FIRST(&src->ch_transports)) != NULL) {
transport_unmap_channel(t);
transport_map_channel(t, dst);
pthread_mutex_lock(&t->tht_stream_mutex);
t->tht_config_change(t);
pthread_mutex_unlock(&t->tht_stream_mutex);
}
channel_delete(src);

View file

@ -508,7 +508,9 @@ dvb_sdt_callback(th_dvb_mux_instance_t *tdmi, uint8_t *ptr, int len,
free((void *)t->tht_svcname);
t->tht_svcname = strdup(chname);
pthread_mutex_lock(&t->tht_stream_mutex);
t->tht_config_change(t);
pthread_mutex_unlock(&t->tht_stream_mutex);
}
if(t->tht_chname == NULL)
@ -790,6 +792,7 @@ dvb_pmt_callback(th_dvb_mux_instance_t *tdmi, uint8_t *ptr, int len,
uint8_t tableid, void *opaque)
{
th_transport_t *t = opaque;
pthread_mutex_lock(&t->tht_stream_mutex);
psi_parse_pmt(t, ptr, len, 1);
pthread_mutex_unlock(&t->tht_stream_mutex);

8
main.c
View file

@ -44,6 +44,8 @@
#include "dvb/dvb.h"
#include "xmltv.h"
#include "spawn.h"
#include "subscriptions.h"
#include "serviceprobe.h"
#include <libhts/htsparachute.h>
#include <libhts/htssettings.h>
@ -247,6 +249,8 @@ main(int argc, char **argv)
/**
* Initialize subsystems
*/
av_register_all();
xmltv_init(); /* Must be initialized before channels */
channels_init();
@ -261,6 +265,10 @@ main(int argc, char **argv)
webui_init();
subscriptions_init();
serviceprobe_init();
pthread_mutex_unlock(&global_lock);

View file

@ -103,9 +103,19 @@ void
parse_raw_mpeg(th_transport_t *t, th_stream_t *st, uint8_t *data,
int len, int start, int err)
{
if(LIST_FIRST(&t->tht_muxers) == NULL)
return; /* No muxers will take packet, so drop here */
th_subscription_t *s;
if(LIST_FIRST(&t->tht_muxers) == NULL) {
/* No muxers. However, subscriptions may force demultiplex
for other reasons (serviceprobe does this) */
LIST_FOREACH(s, &t->tht_subscriptions, ths_transport_link)
if(s->ths_force_demux)
break;
if(s == NULL)
return; /* No-one is interested, so drop here */
}
switch(st->st_type) {
case HTSTV_MPEG2VIDEO:
@ -840,7 +850,7 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
/**
* Input is ok
*/
transport_signal_status(t, TRANSPORT_STATUS_OK);
transport_signal_status(t, SUBSCRIPTION_VALID_PACKETS);
/* Alert all muxers tied to us that a new packet has arrived */

3
psi.c
View file

@ -562,8 +562,8 @@ psi_save_transport_settings(htsmsg_t *m, th_transport_t *t)
htsmsg_add_u32(m, "disabled", !!t->tht_disabled);
lock_assert(&t->tht_stream_mutex);
pthread_mutex_lock(&t->tht_stream_mutex);
LIST_FOREACH(st, &t->tht_streams, st_link) {
sub = htsmsg_create();
@ -581,7 +581,6 @@ psi_save_transport_settings(htsmsg_t *m, th_transport_t *t)
htsmsg_add_msg(m, "stream", sub);
}
pthread_mutex_unlock(&t->tht_stream_mutex);
}

View file

@ -22,230 +22,159 @@
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <fcntl.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include "tvhead.h"
#include "iptv_output.h"
#include "dispatch.h"
#include "channels.h"
#include "subscriptions.h"
#include "serviceprobe.h"
#include "transports.h"
#include "mux.h"
static void serviceprobe_engage(void);
TAILQ_HEAD(sp_queue, sp);
static struct sp_queue probequeue;
static struct sp *sp_current;
static dtimer_t sp_engage_timer;
typedef struct sp {
TAILQ_ENTRY(sp) sp_link;
th_muxer_t *sp_muxer;
th_transport_t *sp_t;
dtimer_t sp_timer;
th_subscription_t *sp_s;
} sp_t;
static void
sp_done_callback(void *aux, int64_t now)
{
sp_t *sp = aux;
th_subscription_t *s = sp->sp_s;
if(s != NULL) {
assert(sp == sp_current);
sp_current = NULL;
subscription_unsubscribe(s);
}
serviceprobe_engage();
sp->sp_t->tht_sp = NULL;
TAILQ_REMOVE(&probequeue, sp, sp_link);
free(sp);
}
/* List of transports to be probed, protected with global_lock */
static struct th_transport_queue serviceprobe_queue;
static pthread_cond_t serviceprobe_cond;
/**
* Got a packet, map it
*
*/
static void
sp_packet_input(void *opaque, th_muxstream_t *tms, th_pkt_t *pkt)
void
serviceprobe_enqueue(th_transport_t *t)
{
sp_t *sp = opaque;
th_transport_t *t = sp->sp_t;
channel_t *ch;
if(!transport_is_tv(t))
return; /* Don't even consider non-tv channels */
tvhlog(LOG_INFO, "serviceprobe", "Probed \"%s\" -- Ok", t->tht_svcname);
if(t->tht_ch == NULL && t->tht_svcname != NULL) {
ch = channel_find_by_name(t->tht_svcname, 1);
transport_map_channel(t, ch);
t->tht_config_change(t);
}
dtimer_arm(&sp->sp_timer, sp_done_callback, sp, 0);
}
/**
* Callback when transport changes status
*/
static void
sp_status_callback(struct th_subscription *s, int status, void *opaque)
{
sp_t *sp = opaque;
th_transport_t *t = sp->sp_t;
char *errtxt;
s->ths_status_callback = NULL;
switch(status) {
case TRANSPORT_STATUS_OK:
if(t->tht_sp_onqueue)
return;
case TRANSPORT_STATUS_NO_DESCRAMBLER:
errtxt = "No descrambler for stream";
break;
case TRANSPORT_STATUS_NO_ACCESS:
errtxt = "Access denied";
break;
case TRANSPORT_STATUS_NO_INPUT:
errtxt = "No video detected";
break;
default:
errtxt = "Other error";
break;
}
tvhlog(LOG_INFO, "serviceprobe",
"Probed \"%s\" -- %s", t->tht_svcname, errtxt);
dtimer_arm(&sp->sp_timer, sp_done_callback, sp, 0);
t->tht_sp_onqueue = 1;
TAILQ_INSERT_TAIL(&serviceprobe_queue, t, tht_sp_link);
pthread_cond_signal(&serviceprobe_cond);
}
/**
* Called when a subscription gets/loses access to a transport
*
*/
static void
sp_subscription_callback(struct th_subscription *s,
subscription_event_t event, void *opaque)
serviceprobe_callback(struct th_subscription *s, subscription_event_t event,
void *opaque)
{
sp_t *sp = opaque;
th_transport_t *t = opaque;
channel_t *ch;
const char *errmsg;
switch(event) {
case TRANSPORT_AVAILABLE:
case SUBSCRIPTION_TRANSPORT_RUN:
return;
case SUBSCRIPTION_NO_INPUT:
errmsg = "No input detected";
break;
case TRANSPORT_UNAVAILABLE:
muxer_deinit(sp->sp_muxer, s);
case SUBSCRIPTION_NO_DESCRAMBLER:
errmsg = "No descrambler available";
break;
case SUBSCRIPTION_NO_ACCESS:
errmsg = "Access denied";
break;
case SUBSCRIPTION_RAW_INPUT:
errmsg = "Unable to reassemble packets from input";
break;
case SUBSCRIPTION_VALID_PACKETS:
errmsg = NULL; /* All OK */
break;
case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE:
case SUBSCRIPTION_TRANSPORT_LOST:
errmsg = "Unable to probe";
break;
case SUBSCRIPTION_DESTROYED:
return; /* All done */
default:
abort();
}
assert(t == TAILQ_FIRST(&serviceprobe_queue));
if(errmsg != NULL) {
tvhlog(LOG_INFO, "serviceprobe", "%20s: skipped: %s",
t->tht_svcname, errmsg);
} else {
ch = channel_find_by_name(t->tht_svcname, 1);
transport_map_channel(t, ch);
pthread_mutex_lock(&t->tht_stream_mutex);
t->tht_config_change(t);
pthread_mutex_unlock(&t->tht_stream_mutex);
tvhlog(LOG_INFO, "serviceprobe", "\"%s\" mapped to channel \"%s\"",
t->tht_svcname, t->tht_svcname);
}
t->tht_sp_onqueue = 0;
TAILQ_REMOVE(&serviceprobe_queue, t, tht_sp_link);
pthread_cond_signal(&serviceprobe_cond);
}
/**
* Setup IPTV (TS over UDP) output
*
*/
static void
serviceprobe_start(void *aux, int64_t now)
static void *
serviceprobe_thread(void *aux)
{
th_subscription_t *s;
th_muxer_t *tm;
th_transport_t *t;
sp_t *sp;
th_subscription_t *s;
int was_doing_work = 0;
assert(sp_current == NULL);
pthread_mutex_lock(&global_lock);
if((sp = TAILQ_FIRST(&probequeue)) == NULL) {
tvhlog(LOG_NOTICE, "serviceprobe", "Nothing more to probe");
return;
while(1) {
while((t = TAILQ_FIRST(&serviceprobe_queue)) == NULL) {
if(was_doing_work) {
tvhlog(LOG_INFO, "serviceprobe", "Now idle");
was_doing_work = 0;
}
pthread_cond_wait(&serviceprobe_cond, &global_lock);
}
if(!was_doing_work) {
tvhlog(LOG_INFO, "serviceprobe", "Starting");
}
s = subscription_create_from_transport(t, "serviceprobe",
serviceprobe_callback, t);
s->ths_force_demux = 1;
/* Wait for something to happen */
while(TAILQ_FIRST(&serviceprobe_queue) == t)
pthread_cond_wait(&serviceprobe_cond, &global_lock);
subscription_unsubscribe(s);
was_doing_work = 1;
}
s = sp->sp_s = calloc(1, sizeof(th_subscription_t));
t = sp->sp_t;
sp_current = sp;
s->ths_title = strdup("probe");
s->ths_weight = INT32_MAX;
s->ths_opaque = sp;
s->ths_callback = sp_subscription_callback;
LIST_INSERT_HEAD(&subscriptions, s, ths_global_link);
if(t->tht_runstatus != TRANSPORT_RUNNING)
transport_start(t, INT32_MAX, 1);
s->ths_transport = t;
LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link);
sp->sp_muxer = tm = muxer_init(s, sp_packet_input, sp);
muxer_play(tm, AV_NOPTS_VALUE);
s->ths_status_callback = sp_status_callback;
}
/**
*
*/
static void
serviceprobe_engage(void)
{
dtimer_arm(&sp_engage_timer, serviceprobe_start, NULL, 0);
}
/**
*
*/
void
serviceprobe_add(th_transport_t *t)
{
sp_t *sp;
if(!transport_is_tv(t))
return;
if(t->tht_sp != NULL)
return;
sp = calloc(1, sizeof(sp_t));
TAILQ_INSERT_TAIL(&probequeue, sp, sp_link);
t->tht_sp = sp;
sp->sp_t = t;
if(sp_current == NULL)
serviceprobe_engage();
}
/**
*
*/
void
serviceprobe_delete(th_transport_t *t)
{
if(t->tht_sp == NULL)
return;
sp_done_callback(t->tht_sp, 0);
}
void
serviceprobe_setup(void)
serviceprobe_init(void)
{
TAILQ_INIT(&probequeue);
pthread_t ptid;
pthread_cond_init(&serviceprobe_cond, NULL);
TAILQ_INIT(&serviceprobe_queue);
pthread_create(&ptid, NULL, serviceprobe_thread, NULL);
}

View file

@ -16,13 +16,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef SERVICE_PROBE_H_
#define SERVICE_PROBE_H_
#ifndef SERVICEPROBE_H_
#define SERVICEPROBE_H_
void serviceprobe_setup(void);
void serviceprobe_init(void);
void serviceprobe_add(th_transport_t *t);
void serviceprobe_enqueue(th_transport_t *t);
void serviceprobe_delete(th_transport_t *t);
#endif /* SERVICE_PROBE_H_ */
#endif /* SERVICEPROBE_H_ */

View file

@ -37,6 +37,10 @@
#include "subscriptions.h"
struct th_subscription_list subscriptions;
static pthread_cond_t subscription_janitor_cond;
static pthread_mutex_t subscription_janitor_mutex;
static int subscription_janitor_work;
static gtimer_t subscription_reschedule_timer;
static int
@ -48,14 +52,31 @@ subscription_sort(th_subscription_t *a, th_subscription_t *b)
/**
*
*/
void
subscription_reschedule(void)
static void
subscription_link_transport(th_subscription_t *s, th_transport_t *t)
{
s->ths_transport = t;
LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link);
s->ths_event_callback(s, SUBSCRIPTION_TRANSPORT_RUN, s->ths_opaque);
s->ths_last_status = t->tht_last_status;
if(s->ths_last_status != SUBSCRIPTION_EVENT_INVALID)
s->ths_event_callback(s, s->ths_last_status, s->ths_opaque);
}
/**
*
*/
static void
subscription_reschedule(void *aux)
{
th_subscription_t *s;
th_transport_t *t;
lock_assert(&global_lock);
gtimer_arm(&subscription_reschedule_timer, subscription_reschedule, NULL, 2);
LIST_FOREACH(s, &subscriptions, ths_global_link) {
if(s->ths_transport != NULL)
continue; /* Got a transport, we're happy */
@ -65,12 +86,14 @@ subscription_reschedule(void)
t = transport_find(s->ths_channel, s->ths_weight);
if(t == NULL)
if(t == NULL) {
/* No transport available */
s->ths_event_callback(s, SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE,
s->ths_opaque);
continue;
}
s->ths_transport = t;
LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link);
s->ths_callback(s, TRANSPORT_AVAILABLE, s->ths_opaque);
subscription_link_transport(s, t);
}
}
@ -95,7 +118,28 @@ subscription_unsubscribe(th_subscription_t *s)
free(s->ths_title);
free(s);
subscription_reschedule();
subscription_reschedule(NULL);
}
/**
*
*/
static th_subscription_t *
subscription_create(int weight, const char *name,
ths_event_callback_t *cb, void *opaque)
{
th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
s->ths_weight = weight;
s->ths_event_callback = cb;
s->ths_opaque = opaque;
s->ths_title = strdup(name);
s->ths_total_err = 0;
time(&s->ths_start);
LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort);
return s;
}
@ -103,28 +147,17 @@ subscription_unsubscribe(th_subscription_t *s)
*
*/
th_subscription_t *
subscription_create(channel_t *ch, unsigned int weight, const char *name,
subscription_callback_t *cb, void *opaque, uint32_t u32)
subscription_create_from_channel(channel_t *ch,
unsigned int weight, const char *name,
ths_event_callback_t *cb, void *opaque)
{
th_subscription_t *s;
s = calloc(1, sizeof(th_subscription_t));
s->ths_callback = cb;
s->ths_opaque = opaque;
s->ths_title = strdup(name);
s->ths_total_err = 0;
s->ths_weight = weight;
s->ths_u32 = u32;
time(&s->ths_start);
LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort);
th_subscription_t *s = subscription_create(weight, name, cb, opaque);
s->ths_channel = ch;
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
s->ths_transport = NULL;
subscription_reschedule();
subscription_reschedule(NULL);
if(s->ths_transport == NULL)
tvhlog(LOG_NOTICE, "subscription",
@ -136,12 +169,90 @@ subscription_create(channel_t *ch, unsigned int weight, const char *name,
}
/**
*
*/
th_subscription_t *
subscription_create_from_transport(th_transport_t *t, const char *name,
ths_event_callback_t *cb, void *opaque)
{
th_subscription_t *s = subscription_create(INT32_MAX, name, cb, opaque);
if(t->tht_runstatus != TRANSPORT_RUNNING)
transport_start(t, INT32_MAX, 1);
subscription_link_transport(s, t);
return s;
}
/**
*
*/
void
subscription_janitor_has_duty(void)
{
pthread_mutex_lock(&subscription_janitor_mutex);
subscription_janitor_work++;
pthread_cond_signal(&subscription_janitor_cond);
pthread_mutex_unlock(&subscription_janitor_mutex);
}
/**
*
*/
static void *
subscription_janitor(void *aux)
{
int v;
th_subscription_t *s;
th_transport_t *t;
pthread_mutex_lock(&subscription_janitor_mutex);
v = subscription_janitor_work;
while(1) {
while(v == subscription_janitor_work)
pthread_cond_wait(&subscription_janitor_cond,
&subscription_janitor_mutex);
v = subscription_janitor_work;
pthread_mutex_unlock(&subscription_janitor_mutex);
pthread_mutex_lock(&global_lock);
LIST_FOREACH(s, &subscriptions, ths_global_link) {
if((t = s->ths_transport) == NULL)
continue;
if(s->ths_last_status != t->tht_last_status) {
s->ths_last_status = t->tht_last_status;
s->ths_event_callback(s, s->ths_last_status, s->ths_opaque);
}
}
pthread_mutex_unlock(&global_lock);
}
}
/**
*
*/
void
subscriptions_init(void)
{
pthread_t ptid;
pthread_cond_init(&subscription_janitor_cond, NULL);
pthread_mutex_init(&subscription_janitor_mutex, NULL);
pthread_create(&ptid, NULL, subscription_janitor, NULL);
}

View file

@ -19,31 +19,14 @@
#ifndef SUBSCRIPTIONS_H
#define SUBSCRIPTIONS_H
typedef void (ths_event_callback_t)(struct th_subscription *s,
subscription_event_t event,
void *opaque);
/*
* Subscription
*/
typedef enum {
TRANSPORT_AVAILABLE,
TRANSPORT_UNAVAILABLE,
} subscription_event_t;
typedef void (subscription_callback_t)(struct th_subscription *s,
subscription_event_t event,
void *opaque);
typedef void (subscription_raw_input_t)(struct th_subscription *s,
void *data, int len,
th_stream_t *st,
void *opaque);
typedef void (subscription_status_callback_t)(struct th_subscription *s,
int status,
void *opaque);
typedef void (ths_raw_input_t)(struct th_subscription *s,
void *data, int len,
th_stream_t *st,
void *opaque);
typedef struct th_subscription {
LIST_ENTRY(th_subscription) ths_global_link;
@ -61,19 +44,17 @@ typedef struct th_subscription {
LIST_ENTRY(th_subscription) ths_subscriber_link; /* Caller is responsible
for this link */
void *ths_opaque;
char *ths_title; /* display title */
time_t ths_start; /* time when subscription started */
int ths_total_err; /* total errors during entire subscription */
subscription_callback_t *ths_callback;
void *ths_opaque;
uint32_t ths_u32;
int ths_last_status;
subscription_raw_input_t *ths_raw_input;
ths_event_callback_t *ths_event_callback;
ths_raw_input_t *ths_raw_input;
th_muxer_t *ths_muxer;
subscription_status_callback_t *ths_status_callback;
int ths_force_demux;
} th_subscription_t;
@ -85,16 +66,22 @@ void subscription_unsubscribe(th_subscription_t *s);
void subscription_set_weight(th_subscription_t *s, unsigned int weight);
th_subscription_t *subscription_create(channel_t *ch, unsigned int weight,
const char *name,
subscription_callback_t *cb,
void *opaque,
uint32_t u32);
th_subscription_t *subscription_create_from_channel(channel_t *ch,
unsigned int weight,
const char *name,
ths_event_callback_t *cb,
void *opaque);
th_subscription_t *subscription_create_from_transport(th_transport_t *t,
const char *name,
ths_event_callback_t *cb,
void *opaque);
void subscriptions_init(void);
void subscription_stop(th_subscription_t *s);
void subscription_reschedule(void);
void subscription_janitor_has_duty(void);
#endif /* SUBSCRIPTIONS_H */

View file

@ -49,7 +49,7 @@
static struct th_transport_list transporthash[TRANSPORT_HASH_WIDTH];
//static void transport_data_timeout(void *aux, int64_t now);
static void transport_data_timeout(void *aux);
//static dtimer_t transport_monitor_timer;
@ -67,7 +67,7 @@ transport_stop(th_transport_t *t)
th_stream_t *st;
th_pkt_t *pkt;
// dtimer_disarm(&t->tht_receive_timer);
gtimer_disarm(&t->tht_receive_timer);
// dtimer_disarm(&transport_monitor_timer, transport_monitor, t, 1);
@ -79,6 +79,7 @@ transport_stop(th_transport_t *t)
t->tht_tt_commercial_advice = COMMERCIAL_UNKNOWN;
assert(LIST_FIRST(&t->tht_muxers) == NULL);
assert(LIST_FIRST(&t->tht_subscriptions) == NULL);
pthread_mutex_lock(&t->tht_stream_mutex);
@ -145,9 +146,9 @@ transport_stop(th_transport_t *t)
*
*/
static void
remove_subscriber(th_subscription_t *s)
remove_subscriber(th_subscription_t *s, subscription_event_t reason)
{
s->ths_callback(s, TRANSPORT_UNAVAILABLE, s->ths_opaque);
s->ths_event_callback(s, reason, s->ths_opaque);
LIST_REMOVE(s, ths_transport_link);
s->ths_transport = NULL;
}
@ -166,9 +167,9 @@ transport_remove_subscriber(th_transport_t *t, th_subscription_t *s)
if(s == NULL) {
while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL)
remove_subscriber(s);
remove_subscriber(s, SUBSCRIPTION_TRANSPORT_LOST);
} else {
remove_subscriber(s);
remove_subscriber(s, SUBSCRIPTION_DESTROYED);
}
if(LIST_FIRST(&t->tht_subscriptions) == NULL)
@ -262,6 +263,7 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start)
assert(st->st_ctx == NULL);
assert(st->st_parser == NULL);
if(id != CODEC_ID_NONE) {
c = avcodec_find_decoder(id);
if(c != NULL) {
@ -273,8 +275,10 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start)
}
// cwc_transport_start(t);
// dtimer_arm(&t->tht_receive_timer, transport_data_timeout, t, 4);
transport_signal_status(t, TRANSPORT_STATUS_STARTING);
t->tht_packets = 0;
gtimer_arm(&t->tht_receive_timer, transport_data_timeout, t, 4);
t->tht_last_status = SUBSCRIPTION_EVENT_INVALID;
return 0;
}
@ -441,7 +445,7 @@ transport_destroy(th_transport_t *t)
lock_assert(&global_lock);
while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL)
remove_subscriber(s);
remove_subscriber(s, SUBSCRIPTION_TRANSPORT_LOST);
//dtimer_disarm(&t->tht_receive_timer);
@ -593,11 +597,25 @@ transport_unmap_channel(th_transport_t *t)
}
/**
*
*/
static void
transport_data_timeout(void *aux)
{
th_transport_t *t = aux;
if(t->tht_last_status)
return; /* Something has happend so we don't have to update */
transport_signal_status(t, t->tht_packets ? SUBSCRIPTION_RAW_INPUT :
SUBSCRIPTION_NO_INPUT);
}
/**
*
*/
static struct strtab stypetab[] = {
{ "SDTV", ST_SDTV },
{ "Radio", ST_RADIO },
@ -644,7 +662,7 @@ transport_signal_status(th_transport_t *t, int newstatus)
return;
t->tht_last_status = newstatus;
//notify_transprot_status_change(t);
subscription_janitor_has_duty();
}
@ -652,18 +670,16 @@ transport_signal_status(th_transport_t *t, int newstatus)
* Table for status -> text conversion
*/
static struct strtab transportstatustab[] = {
{ "Unknown", TRANSPORT_STATUS_UNKNOWN },
{ "Starting", TRANSPORT_STATUS_STARTING },
{ "Ok", TRANSPORT_STATUS_OK },
{ "No input", TRANSPORT_STATUS_NO_INPUT },
{ "No descrambler", TRANSPORT_STATUS_NO_DESCRAMBLER },
{ "No access", TRANSPORT_STATUS_NO_ACCESS },
{ "Mux error", TRANSPORT_STATUS_MUX_ERROR },
{ "Ok", SUBSCRIPTION_VALID_PACKETS },
{ "No input", SUBSCRIPTION_NO_INPUT },
{ "No descrambler", SUBSCRIPTION_NO_DESCRAMBLER },
{ "No access", SUBSCRIPTION_NO_ACCESS },
};
const char *
transport_status_to_text(int status)
{
return val2str(status, transportstatustab) ?: "Invalid";
return val2str(status, transportstatustab) ?: "Unknown";
}

View file

@ -205,6 +205,10 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb)
if(st == NULL)
return;
t->tht_packets = 1;
pthread_mutex_lock(&t->tht_stream_mutex);
avgstat_add(&t->tht_rate, 188, dispatch_clock);
/* Extract PCR */
@ -220,22 +224,24 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb)
n++;
r = td->td_descramble(td, t, st, tsb);
if(r == 0)
if(r == 0) {
pthread_mutex_unlock(&t->tht_stream_mutex);
return;
}
if(r == 1)
m++;
}
if(n == 0) {
transport_signal_status(t, TRANSPORT_STATUS_NO_DESCRAMBLER);
transport_signal_status(t, SUBSCRIPTION_NO_DESCRAMBLER);
} else if(m == n) {
transport_signal_status(t, TRANSPORT_STATUS_NO_ACCESS);
transport_signal_status(t, SUBSCRIPTION_NO_ACCESS);
}
return;
} else {
ts_recv_packet0(t, st, tsb);
}
ts_recv_packet0(t, st, tsb);
pthread_mutex_unlock(&t->tht_stream_mutex);
}

View file

@ -357,7 +357,48 @@ typedef struct th_stream {
/*
/**
* Transport events, these are sent to subscribers via
* s->ths_event_callback
*/
typedef enum {
SUBSCRIPTION_EVENT_INVALID = 0, /* mbz */
/** Transport is receiving data from source */
SUBSCRIPTION_TRANSPORT_RUN,
/** No input is received from source */
SUBSCRIPTION_NO_INPUT,
/** No descrambler is able to decrypt the stream */
SUBSCRIPTION_NO_DESCRAMBLER,
/** Potential descrambler is available, but access is denied */
SUBSCRIPTION_NO_ACCESS,
/** Raw input seen but nothing has really been decoded */
SUBSCRIPTION_RAW_INPUT,
/** Packet are being parsed. Only signalled if at least one muxer is
registerd */
SUBSCRIPTION_VALID_PACKETS,
/** No transport is available for delivering subscription */
SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE,
/** Transport no longer runs, it was needed by someone with higher
priority */
SUBSCRIPTION_TRANSPORT_LOST,
/** Subscription destroyed */
SUBSCRIPTION_DESTROYED,
} subscription_event_t;
/**
* A Transport (or in MPEG TS terms: a 'service')
*/
typedef struct th_transport {
@ -517,14 +558,23 @@ typedef struct th_transport {
*/
int tht_last_status;
#define TRANSPORT_STATUS_UNKNOWN 0
#define TRANSPORT_STATUS_STARTING 1
#define TRANSPORT_STATUS_OK 2
#define TRANSPORT_STATUS_NO_INPUT 3
#define TRANSPORT_STATUS_NO_DESCRAMBLER 4
#define TRANSPORT_STATUS_NO_ACCESS 5
#define TRANSPORT_STATUS_MUX_ERROR 6
/**
* Service probe, see serviceprobe.c for details
*/
int tht_sp_onqueue;
TAILQ_ENTRY(th_transport) tht_sp_link;
/**
* Timer which is armed at transport start. Once it fires
* it will check if any packets has been parsed. If not the status
* will be set to SUBSCRIPTION_NO_INPUT
*/
gtimer_t tht_receive_timer;
/**
* Set as soon as we get some kind of activity
*/
int tht_packets;
/*********************************************************
*
@ -776,8 +826,6 @@ typedef struct th_muxer {
struct th_muxstream_list tm_streams;
struct th_subscription *tm_subscription;
th_mux_output_t *tm_output;
void *tm_opaque;

View file

@ -392,8 +392,8 @@ extjs_dvbadapter(http_connection_t *hc, const char *remain, void *opaque)
const char *s = http_arg_get(&hc->hc_req_args, "adapterId");
const char *op = http_arg_get(&hc->hc_req_args, "op");
th_dvb_adapter_t *tda = s ? dvb_adapter_find_by_identifier(s) : NULL;
//th_dvb_mux_instance_t *tdmi;
// th_transport_t *t;
th_dvb_mux_instance_t *tdmi;
th_transport_t *t;
htsmsg_t *r, *out;
@ -433,13 +433,13 @@ extjs_dvbadapter(http_connection_t *hc, const char *remain, void *opaque)
tvhlog(LOG_NOTICE, "web interface",
"Service probe started on \"%s\"", tda->tda_displayname);
#if 0
RB_FOREACH(tdmi, &tda->tda_muxes, tdmi_adapter_link) {
LIST_FOREACH(t, &tdmi->tdmi_transports, tht_mux_link) {
serviceprobe_add(t);
serviceprobe_enqueue(t);
}
}
#endif
out = htsmsg_create();
htsmsg_add_u32(out, "success", 1);