Rewrite internal streaming pipeline so it is fully message driven.

No ugly callbacks and weird locking scenarios no more.

Addresses ticket #15
This commit is contained in:
Andreas Öman 2009-06-03 19:06:33 +00:00
parent e655ffc61a
commit 206767ac3a
19 changed files with 665 additions and 732 deletions

View file

@ -900,10 +900,9 @@ cwc_transport_destroy(th_descrambler_t *td)
static inline th_stream_t *
cwc_find_stream_by_caid(th_transport_t *t, int caid)
{
streaming_pad_t *sp = &t->tht_streaming_pad;
th_stream_t *st;
LIST_FOREACH(st, &sp->sp_components, st_link) {
LIST_FOREACH(st, &t->tht_components, st_link) {
if(st->st_caid == caid)
return st;
}

View file

@ -444,7 +444,7 @@ dvb_adapter_clone(th_dvb_adapter_t *dst, th_dvb_adapter_t *src)
pthread_mutex_lock(&t_src->tht_stream_mutex);
LIST_FOREACH(st_src, &t_src->tht_streaming_pad.sp_components, st_link) {
LIST_FOREACH(st_src, &t_src->tht_components, st_link) {
st_dst = transport_add_stream(t_dst,
st_src->st_pid,

View file

@ -57,7 +57,6 @@ static int
dvb_transport_start(th_transport_t *t, unsigned int weight, int status,
int force_start)
{
streaming_pad_t *sp = &t->tht_streaming_pad;
struct dmx_pes_filter_params dmx_param;
th_stream_t *st;
int w, fd, pid;
@ -81,7 +80,7 @@ dvb_transport_start(th_transport_t *t, unsigned int weight, int status,
}
tdmi = t->tht_dvb_mux_instance;
LIST_FOREACH(st, &sp->sp_components, st_link) {
LIST_FOREACH(st, &t->tht_components, st_link) {
fd = open(tda->tda_demux_path, O_RDWR);
pid = st->st_pid;
@ -116,7 +115,7 @@ dvb_transport_start(th_transport_t *t, unsigned int weight, int status,
pthread_mutex_lock(&tda->tda_delivery_mutex);
LIST_INSERT_HEAD(&tda->tda_transports, t, tht_active_link);
t->tht_runstatus = status;
t->tht_status = status;
dvb_fe_tune(tdmi);
@ -133,7 +132,6 @@ dvb_transport_start(th_transport_t *t, unsigned int weight, int status,
static void
dvb_transport_stop(th_transport_t *t)
{
streaming_pad_t *sp = &t->tht_streaming_pad;
th_dvb_adapter_t *tda = t->tht_dvb_mux_instance->tdmi_adapter;
th_stream_t *st;
@ -143,11 +141,11 @@ dvb_transport_stop(th_transport_t *t)
LIST_REMOVE(t, tht_active_link);
pthread_mutex_unlock(&tda->tda_delivery_mutex);
LIST_FOREACH(st, &sp->sp_components, st_link) {
LIST_FOREACH(st, &t->tht_components, st_link) {
close(st->st_demuxer_fd);
st->st_demuxer_fd = -1;
}
t->tht_runstatus = TRANSPORT_IDLE;
t->tht_status = TRANSPORT_IDLE;
}

View file

@ -92,7 +92,10 @@ typedef struct dvr_entry {
/**
* Fields for recording
*/
pthread_t de_thread;
th_subscription_t *de_s;
streaming_queue_t de_sq;
/**
@ -106,7 +109,6 @@ typedef struct dvr_entry {
all commercial breaks so far */
struct dvr_rec_stream_list de_streams;
streaming_queue_t de_sq;
AVFormatContext *de_fctx;
enum {

View file

@ -46,85 +46,25 @@ typedef struct dvr_rec_stream {
/**
*
*/
static void dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp);
static void dvr_rec_stop(dvr_entry_t *de);
static void *dvr_thread(void *aux);
static void dvr_thread_new_pkt(dvr_entry_t *de, th_pkt_t *pkt);
static void dvr_spawn_postproc(dvr_entry_t *de);
static void dvr_thread_epilog(dvr_entry_t *de);
/**
*
*/
static void
dvr_subscription_callback(struct th_subscription *s,
subscription_event_t event, void *opaque)
{
dvr_entry_t *de = opaque;
const char *notifymsg = NULL;
th_transport_t *t;
switch(event) {
case SUBSCRIPTION_EVENT_INVALID:
abort();
case SUBSCRIPTION_TRANSPORT_RUN:
t = s->ths_transport;
dvr_rec_start(de, &t->tht_streaming_pad);
return;
case SUBSCRIPTION_NO_INPUT:
notifymsg = "No input detected";
break;
case SUBSCRIPTION_NO_DESCRAMBLER:
notifymsg = "No descrambler available";
break;
case SUBSCRIPTION_NO_ACCESS:
notifymsg = "Access denied";
break;
case SUBSCRIPTION_RAW_INPUT:
notifymsg = "Unable to reassemble packets from input";
break;
case SUBSCRIPTION_VALID_PACKETS:
return;
case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE:
notifymsg = "No transport available at the moment, automatic retry";
break;
case SUBSCRIPTION_TRANSPORT_LOST:
dvr_rec_stop(de);
notifymsg = "Lost transport";
break;
case SUBSCRIPTION_DESTROYED:
dvr_rec_stop(de); /* Recording completed */
return;
}
if(notifymsg != NULL)
tvhlog(LOG_WARNING, "dvr", "\"%s\" on \"%s\": %s",
de->de_title, de->de_channel->ch_name, notifymsg);
}
/**
*
*/
void
dvr_rec_subscribe(dvr_entry_t *de)
{
if(de->de_s != NULL)
return;
assert(de->de_s == NULL);
streaming_queue_init(&de->de_sq);
pthread_create(&de->de_thread, NULL, dvr_thread, de);
de->de_s = subscription_create_from_channel(de->de_channel, 1000, "pvr",
dvr_subscription_callback, de,
0);
&de->de_sq.sq_st);
}
/**
@ -133,10 +73,13 @@ dvr_rec_subscribe(dvr_entry_t *de)
void
dvr_rec_unsubscribe(dvr_entry_t *de)
{
if(de->de_s == NULL)
return;
assert(de->de_s != NULL);
subscription_unsubscribe(de->de_s);
streaming_target_deliver(&de->de_sq.sq_st, streaming_msg_create(SMT_EXIT));
pthread_join(de->de_thread, NULL);
de->de_s = NULL;
}
@ -299,9 +242,8 @@ dvr_rec_fatal_error(dvr_entry_t *de, const char *fmt, ...)
*
*/
static void
dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp)
dvr_rec_start(dvr_entry_t *de, htsmsg_t *streams)
{
th_stream_t *st;
dvr_rec_stream_t *drs;
AVOutputFormat *fmt;
AVFormatContext *fctx;
@ -312,8 +254,10 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp)
const char *codec_name;
char urlname[512];
int err;
pthread_t ptid;
pthread_attr_t attr;
htsmsg_field_t *f;
htsmsg_t *sub;
const char *type, *lang;
uint32_t idx;
if(pvr_generate_filename(de) != 0) {
dvr_rec_fatal_error(de, "Unable to create directories");
@ -357,40 +301,38 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp)
av_set_parameters(fctx, NULL);
pthread_mutex_lock(sp->sp_mutex);
/**
* Setup each stream
*/
LIST_FOREACH(st, &sp->sp_components, st_link) {
switch(st->st_type) {
default:
HTSMSG_FOREACH(f, streams) {
if(f->hmf_type != HMF_MAP)
continue;
case SCT_MPEG2VIDEO:
codec_id = CODEC_ID_MPEG2VIDEO;
codec_type = CODEC_TYPE_VIDEO;
codec_name = "mpeg2 video";
break;
sub = &f->hmf_msg;
case SCT_MPEG2AUDIO:
codec_id = CODEC_ID_MP2;
if((type = htsmsg_get_str(sub, "type")) == NULL)
continue;
if(htsmsg_get_u32(sub, "index", &idx))
continue;
if(!strcmp(type, "AC3")) {
codec_id = CODEC_ID_AC3;
codec_type = CODEC_TYPE_AUDIO;
codec_name = "mpeg2 audio";
break;
case SCT_AC3:
codec_id = CODEC_ID_AC3;
codec_name = "AC-3";
} else if(!strcmp(type, "MPEG2AUDIO")) {
codec_id = CODEC_ID_MP2;
codec_type = CODEC_TYPE_AUDIO;
codec_name = "AC3 audio";
break;
case SCT_H264:
codec_id = CODEC_ID_H264;
codec_name = "MPEG";
} else if(!strcmp(type, "MPEG2VIDEO")) {
codec_id = CODEC_ID_MPEG2VIDEO;
codec_type = CODEC_TYPE_VIDEO;
codec_name = "h.264 video";
break;
codec_name = "MPEG-2";
} else if(!strcmp(type, "H264")) {
codec_id = CODEC_ID_H264;
codec_type = CODEC_TYPE_VIDEO;
codec_name = "H264";
} else {
continue;
}
codec = avcodec_find_decoder(codec_id);
@ -402,7 +344,7 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp)
}
drs = calloc(1, sizeof(dvr_rec_stream_t));
drs->drs_source_index = st->st_index;
drs->drs_source_index = idx;
drs->drs_lavf_stream = av_new_stream(fctx, fctx->nb_streams);
@ -418,56 +360,14 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp)
continue;
}
memcpy(drs->drs_lavf_stream->language, st->st_lang, 4);
if((lang = htsmsg_get_str(sub, "language")) != NULL)
memcpy(drs->drs_lavf_stream->language, lang, 4);
LIST_INSERT_HEAD(&de->de_streams, drs, drs_link);
}
/* Link to the pad */
streaming_queue_init(&de->de_sq);
streaming_target_connect(sp, &de->de_sq.sq_st);
de->de_sq.sq_status = SQ_RUNNING;
de->de_fctx = fctx;
de->de_ts_offset = AV_NOPTS_VALUE;
pthread_mutex_unlock(sp->sp_mutex);
de->de_refcnt++;
/* Start the recorder thread */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&ptid, &attr, dvr_thread, de);
}
/**
* Called from subscription callback when we no longer have
* access to stream
*/
static void
dvr_rec_stop(dvr_entry_t *de)
{
streaming_queue_t *sq = &de->de_sq;
streaming_target_disconnect(&sq->sq_st);
pthread_mutex_lock(&sq->sq_mutex);
if(sq->sq_status == SQ_RUNNING) {
sq->sq_status = SQ_STOP_REQ;
pthread_cond_signal(&sq->sq_cond);
while(sq->sq_status != SQ_ZOMBIE)
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
}
streaming_queue_clear(&sq->sq_queue);
pthread_mutex_unlock(&sq->sq_mutex);
}
@ -480,14 +380,11 @@ dvr_thread(void *aux)
dvr_entry_t *de = aux;
streaming_queue_t *sq = &de->de_sq;
streaming_message_t *sm;
int run = 1;
pthread_mutex_lock(&sq->sq_mutex);
de->de_header_written = 0;
de->de_rec_state = DE_RS_WAIT_AUDIO_LOCK;
while(sq->sq_status == SQ_RUNNING) {
while(run) {
sm = TAILQ_FIRST(&sq->sq_queue);
if(sm == NULL) {
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
@ -504,26 +401,38 @@ dvr_thread(void *aux)
dvr_thread_new_pkt(de, sm->sm_data);
pkt_ref_dec(sm->sm_data);
break;
case SMT_START:
assert(de->de_fctx == NULL);
pthread_mutex_lock(&global_lock);
dvr_rec_start(de, sm->sm_data);
de->de_header_written = 0;
de->de_rec_state = DE_RS_WAIT_AUDIO_LOCK;
pthread_mutex_unlock(&global_lock);
break;
case SMT_STOP:
dvr_thread_epilog(de);
break;
case SMT_TRANSPORT_STATUS:
break;
case SMT_NOSOURCE:
dvr_rec_fatal_error(de,
"No source transport available, automatic retry");
break;
case SMT_EXIT:
run = 0;
break;
}
free(sm);
pthread_mutex_lock(&sq->sq_mutex);
}
/* Signal back that we no longer is running */
sq->sq_status = SQ_ZOMBIE;
pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&sq->sq_mutex);
dvr_thread_epilog(de);
pthread_mutex_lock(&global_lock);
dvr_entry_dec_ref(de); /* Past this we may no longer
dereference de */
pthread_mutex_unlock(&global_lock);
/* Fade out ... */
return NULL;
}
@ -772,7 +681,8 @@ dvr_thread_epilog(dvr_entry_t *de)
AVStream *st;
int i;
assert(fctx != NULL);
if(fctx == NULL)
return;
/* Write trailer if we've written anything at all */

View file

@ -47,12 +47,16 @@
extern const char *htsversion;
LIST_HEAD(htsp_connection_list, htsp_connection);
LIST_HEAD(htsp_subscription_list, htsp_subscription);
TAILQ_HEAD(htsp_msg_queue, htsp_msg);
TAILQ_HEAD(htsp_msg_q_queue, htsp_msg_q);
static struct htsp_connection_list htsp_async_connections;
static void htsp_streaming_input(void *opaque, streaming_message_t *sm);
/**
*
*/
@ -122,7 +126,7 @@ typedef struct htsp_connection {
/**
*
*/
struct th_subscription_list htsp_subscriptions;
struct htsp_subscription_list htsp_subscriptions;
uint32_t htsp_granted_access;
@ -130,23 +134,28 @@ typedef struct htsp_connection {
} htsp_connection_t;
/**
*
*/
typedef struct htsp_stream {
streaming_target_t hs_st;
int hs_sid; /* Subscription ID */
typedef struct htsp_subscription {
htsp_connection_t *hs_htsp;
LIST_ENTRY(htsp_subscription) hs_link;
int hs_sid; /* Subscription ID (set by client) */
th_subscription_t *hs_s; // Temporary
streaming_target_t hs_input;
htsp_msg_q_t hs_q;
time_t hs_last_report; /* Last queue status report sent */
int hs_dropstats[PKT_NTYPES];
} htsp_stream_t;
} htsp_subscription_t;
@ -170,16 +179,6 @@ htsp_update_logname(htsp_connection_t *htsp)
}
/**
*
*/
static void htsp_subscription_callback(struct th_subscription *s,
subscription_event_t event,
void *opaque);
/**
*
*/
@ -209,7 +208,7 @@ htsp_init_queue(htsp_msg_q_t *hmq, int strict_prio)
*
*/
static void
htsp_destroy_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq)
htsp_flush_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq)
{
htsp_msg_t *hm;
@ -223,6 +222,19 @@ htsp_destroy_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq)
}
/**
*
*/
static void
htsp_subscription_destroy(htsp_connection_t *htsp, htsp_subscription_t *hs)
{
LIST_REMOVE(hs, hs_link);
subscription_unsubscribe(hs->hs_s);
htsp_flush_queue(htsp, &hs->hs_q);
free(hs);
}
/**
*
*/
@ -435,9 +447,9 @@ htsp_method_getEvent(htsp_connection_t *htsp, htsmsg_t *in)
static htsmsg_t *
htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
{
th_subscription_t *s;
uint32_t chid, sid;
channel_t *ch;
htsp_subscription_t *hs;
if(htsmsg_get_u32(in, "channelId", &chid))
return htsp_error("Missing argument 'channeId'");
@ -450,15 +462,24 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
/*
* We send the reply here or the user will get the 'subscriptionStart'
* We send the reply now to avoid the user getting the 'subscriptionStart'
* async message before the reply to 'subscribe'.
*/
htsp_reply(htsp, in, htsmsg_create_map());
s = subscription_create_from_channel(ch, 500, "htsp",
htsp_subscription_callback, htsp, sid);
/* Initialize the HTSP subscription structure */
hs = calloc(1, sizeof(htsp_subscription_t));
hs->hs_htsp = htsp;
htsp_init_queue(&hs->hs_q, 0);
hs->hs_sid = sid;
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);
streaming_target_init(&hs->hs_input, htsp_streaming_input, hs);
hs->hs_s = subscription_create_from_channel(ch, 500, "htsp", &hs->hs_input);
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, s, ths_subscriber_link);
return NULL;
}
@ -469,27 +490,26 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
static htsmsg_t *
htsp_method_unsubscribe(htsp_connection_t *htsp, htsmsg_t *in)
{
th_subscription_t *s;
htsp_subscription_t *s;
uint32_t sid;
if(htsmsg_get_u32(in, "subscriptionId", &sid))
return htsp_error("Missing argument 'subscriptionId'");
LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link)
if(s->ths_u32 == sid)
LIST_FOREACH(s, &htsp->htsp_subscriptions, hs_link)
if(s->hs_sid == sid)
break;
/*
* We send the reply here or the user will get the 'subscriptionStart'
* async message before the reply to 'subscribe'.
* We send the reply here or the user will get the 'subscriptionStop'
* async message before the reply to 'unsubscribe'.
*/
htsp_reply(htsp, in, htsmsg_create_map());
if(s == NULL)
return NULL; /* Subscription did not exist, but we don't really care */
LIST_REMOVE(s, ths_subscriber_link);
subscription_unsubscribe(s);
htsp_subscription_destroy(htsp, s);
return NULL;
}
@ -810,7 +830,7 @@ htsp_serve(int fd, void *opaque, struct sockaddr_in *source)
{
htsp_connection_t htsp;
char buf[30];
th_subscription_t *s;
htsp_subscription_t *s;
snprintf(buf, sizeof(buf), "%s", inet_ntoa(source->sin_addr));
@ -849,8 +869,7 @@ htsp_serve(int fd, void *opaque, struct sockaddr_in *source)
down in the streaming code. So we do this as early as possible
to avoid any weird lockups */
while((s = LIST_FIRST(&htsp.htsp_subscriptions)) != NULL) {
LIST_REMOVE(s, ths_subscriber_link);
subscription_unsubscribe(s);
htsp_subscription_destroy(&htsp, s);
}
if(htsp.htsp_async_mode)
@ -984,7 +1003,7 @@ htsp_tag_delete(channel_tag_t *ct)
htsp_async_send(m);
}
#if 0
/**
*
*/
@ -996,13 +1015,14 @@ htsp_send_subscription_status(htsp_connection_t *htsp, th_subscription_t *ths,
m = htsmsg_create_map();
htsmsg_add_str(m, "method", "subscriptionStatus");
htsmsg_add_u32(m, "subscriptionId", ths->ths_u32);
abort(); //htsmsg_add_u32(m, "subscriptionId", ths->ths_u32);
if(txt != NULL)
htsmsg_add_str(m, "status", txt);
htsp_send_message(htsp, m, NULL);
}
#endif
const static char frametypearray[PKT_NTYPES] = {
[PKT_I_FRAME] = 'I',
@ -1014,18 +1034,14 @@ const static char frametypearray[PKT_NTYPES] = {
* Build a htsmsg from a th_pkt and enqueue it on our HTSP transport
*/
static void
htsp_stream_deliver(void *opaque, streaming_message_t *sm)
htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
{
htsp_stream_t *hs = opaque;
th_pkt_t *pkt = sm->sm_data;
htsmsg_t *m = htsmsg_create_map(), *n;
htsp_msg_t *hm;
htsp_connection_t *htsp = hs->hs_htsp;
int64_t ts;
int qlen = hs->hs_q.hmq_payload;
free(sm);
if((qlen > 500000 && pkt->pkt_frametype == PKT_B_FRAME) ||
(qlen > 750000 && pkt->pkt_frametype == PKT_P_FRAME) ||
(qlen > 1500000)) {
@ -1095,154 +1111,60 @@ htsp_stream_deliver(void *opaque, streaming_message_t *sm)
/**
* Send a 'subscriptionStart' message to client informing about
* delivery start and all components.
*
* Setup a streaming target that will deliver packets to the HTSP transport.
*/
static void
htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s,
streaming_pad_t *sp)
htsp_subscription_start(htsp_subscription_t *hs, htsmsg_t *streams)
{
htsp_stream_t *hs;
htsmsg_t *m, *streams, *c;
th_stream_t *st;
assert(s->ths_st == NULL);
hs = calloc(1, sizeof(htsp_stream_t));
hs->hs_htsp = htsp;
hs->hs_sid = s->ths_u32;
htsp_init_queue(&hs->hs_q, 0);
m = htsmsg_create_map();
htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "method", "subscriptionStart");
htsmsg_add_u32(m, "subscriptionId", s->ths_u32);
streaming_target_init2(&hs->hs_st, htsp_stream_deliver, hs);
/**
* Lock streming pad delivery so we can hook us up.
*/
pthread_mutex_lock(sp->sp_mutex);
/* Setup each stream */
streams = htsmsg_create_list();
LIST_FOREACH(st, &sp->sp_components, st_link) {
c = htsmsg_create_map();
htsmsg_add_u32(c, "index", st->st_index);
htsmsg_add_str(c, "type", streaming_component_type2txt(st->st_type));
if(st->st_lang[0])
htsmsg_add_str(c, "language", st->st_lang);
htsmsg_add_msg(streams, NULL, c);
}
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsmsg_add_msg(m, "streams", streams);
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
/* Link to the pad */
streaming_target_connect(sp, &hs->hs_st);
s->ths_st = &hs->hs_st;
/* Once we unlock here we will start getting the callback */
pthread_mutex_unlock(sp->sp_mutex);
}
/**
* Send a 'subscriptionStart' stop
*
* Tear down all streaming related stuff.
*/
static void
htsp_subscription_stop(htsp_connection_t *htsp, th_subscription_t *s,
const char *reason)
htsp_subscription_stop(htsp_subscription_t *hs, htsmsg_t *m)
{
htsp_stream_t *hs;
htsmsg_t *m;
assert(s->ths_st != NULL);
hs = (htsp_stream_t *)s->ths_st;
/* Unlink from pad */
streaming_target_disconnect(&hs->hs_st);
/* Send a stop message back */
m = htsmsg_create_map();
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsmsg_add_str(m, "method", "subscriptionStop");
if(reason)
htsmsg_add_str(m, "reason", reason);
/* Send on normal control queue cause we are about do destroy
the per-subscription queue */
htsp_send_message(hs->hs_htsp, m, NULL);
htsp_destroy_queue(htsp, &hs->hs_q);
free(hs);
s->ths_st = NULL;
htsmsg_add_u32(m, "subscriptionId", hs->hs_sid);
htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0);
}
/**
*
*/
static void
htsp_subscription_callback(struct th_subscription *s,
subscription_event_t event, void *opaque)
htsp_streaming_input(void *opaque, streaming_message_t *sm)
{
htsp_connection_t *htsp = opaque;
th_transport_t *t;
htsp_subscription_t *hs = opaque;
switch(event) {
case SUBSCRIPTION_EVENT_INVALID:
switch(sm->sm_type) {
case SMT_PACKET:
htsp_stream_deliver(hs, sm->sm_data); // reference is transfered
break;
case SMT_START:
htsp_subscription_start(hs, sm->sm_data);
break;
case SMT_STOP:
htsp_subscription_stop(hs, sm->sm_data);
break;
case SMT_TRANSPORT_STATUS:
break;
case SMT_NOSOURCE:
break;
case SMT_EXIT:
abort();
case SUBSCRIPTION_TRANSPORT_RUN:
htsp_send_subscription_status(htsp, s, NULL);
t = s->ths_transport;
htsp_subscription_start(htsp, s, &t->tht_streaming_pad);
return;
case SUBSCRIPTION_NO_INPUT:
htsp_send_subscription_status(htsp, s, "No input detected");
break;
case SUBSCRIPTION_NO_DESCRAMBLER:
htsp_send_subscription_status(htsp, s, "No descrambler available");
break;
case SUBSCRIPTION_NO_ACCESS:
htsp_send_subscription_status(htsp, s, "Access denied");
break;
case SUBSCRIPTION_RAW_INPUT:
htsp_send_subscription_status(htsp, s,
"Unable to reassemble packets from input");
break;
case SUBSCRIPTION_VALID_PACKETS:
htsp_send_subscription_status(htsp, s, NULL);
break;
case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE:
htsp_send_subscription_status(htsp, s,
"No transport available, retrying...");
break;
case SUBSCRIPTION_TRANSPORT_LOST:
htsp_subscription_stop(htsp, s, "Transport destroyed");
break;
case SUBSCRIPTION_DESTROYED:
htsp_subscription_stop(htsp, s, NULL);
return;
}
free(sm);
}

View file

@ -315,8 +315,6 @@ main(int argc, char **argv)
webui_init(contentpath);
subscriptions_init();
serviceprobe_init();
cwc_init();

View file

@ -877,11 +877,12 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
/**
* Input is ok
*/
transport_signal_status(t, SUBSCRIPTION_VALID_PACKETS);
transport_set_feed_status(t, TRANSPORT_FEED_VALID_PACKETS);
/* Forward packet */
pkt->pkt_componentindex = st->st_index;
streaming_pad_deliver_packet(&t->tht_streaming_pad, pkt);
streaming_pad_deliver(&t->tht_streaming_pad,
streaming_msg_create_pkt(pkt));
/* Decrease our own reference to the packet */
pkt_ref_dec(pkt);

View file

@ -555,7 +555,6 @@ streaming_component_type2txt(streaming_component_type_t s)
void
psi_save_transport_settings(htsmsg_t *m, th_transport_t *t)
{
streaming_pad_t *sp = &t->tht_streaming_pad;
th_stream_t *st;
htsmsg_t *sub;
@ -565,7 +564,7 @@ psi_save_transport_settings(htsmsg_t *m, th_transport_t *t)
lock_assert(&t->tht_stream_mutex);
LIST_FOREACH(st, &sp->sp_components, st_link) {
LIST_FOREACH(st, &t->tht_components, st_link) {
sub = htsmsg_create_map();
htsmsg_add_u32(sub, "pid", st->st_pid);

View file

@ -32,6 +32,7 @@
#include "subscriptions.h"
#include "serviceprobe.h"
#include "transports.h"
#include "streaming.h"
/* List of transports to be probed, protected with global_lock */
@ -58,77 +59,6 @@ serviceprobe_enqueue(th_transport_t *t)
pthread_cond_signal(&serviceprobe_cond);
}
/**
*
*/
static void
serviceprobe_callback(struct th_subscription *s, subscription_event_t event,
void *opaque)
{
th_transport_t *t = opaque;
channel_t *ch;
const char *errmsg;
switch(event) {
case SUBSCRIPTION_TRANSPORT_RUN:
return;
case SUBSCRIPTION_NO_INPUT:
errmsg = "No input detected";
break;
case SUBSCRIPTION_NO_DESCRAMBLER:
errmsg = "No descrambler available";
break;
case SUBSCRIPTION_NO_ACCESS:
errmsg = "Access denied";
break;
case SUBSCRIPTION_RAW_INPUT:
errmsg = "Unable to reassemble packets from input";
break;
case SUBSCRIPTION_VALID_PACKETS:
errmsg = NULL; /* All OK */
break;
case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE:
case SUBSCRIPTION_TRANSPORT_LOST:
errmsg = "Unable to probe";
break;
case SUBSCRIPTION_DESTROYED:
return; /* All done */
default:
abort();
}
assert(t == TAILQ_FIRST(&serviceprobe_queue));
if(errmsg != NULL) {
tvhlog(LOG_INFO, "serviceprobe", "%20s: skipped: %s",
t->tht_svcname, errmsg);
} else if(t->tht_ch == NULL) {
ch = channel_find_by_name(t->tht_svcname, 1);
transport_map_channel(t, ch);
pthread_mutex_lock(&t->tht_stream_mutex);
t->tht_config_change(t);
pthread_mutex_unlock(&t->tht_stream_mutex);
tvhlog(LOG_INFO, "serviceprobe", "\"%s\" mapped to channel \"%s\"",
t->tht_svcname, t->tht_svcname);
}
t->tht_sp_onqueue = 0;
TAILQ_REMOVE(&serviceprobe_queue, t, tht_sp_link);
pthread_cond_signal(&serviceprobe_cond);
}
/**
*
*/
@ -138,9 +68,17 @@ serviceprobe_thread(void *aux)
th_transport_t *t;
th_subscription_t *s;
int was_doing_work = 0;
streaming_queue_t sq;
streaming_message_t *sm;
transport_feed_status_t status;
int run;
const char *err;
channel_t *ch;
pthread_mutex_lock(&global_lock);
streaming_queue_init(&sq);
while(1) {
while((t = TAILQ_FIRST(&serviceprobe_queue)) == NULL) {
@ -154,18 +92,96 @@ serviceprobe_thread(void *aux)
if(!was_doing_work) {
tvhlog(LOG_INFO, "serviceprobe", "Starting");
was_doing_work = 1;
}
s = subscription_create_from_transport(t, "serviceprobe",
serviceprobe_callback, t);
s->ths_force_demux = 1;
s = subscription_create_from_transport(t, "serviceprobe", &sq.sq_st);
/* Wait for something to happen */
while(TAILQ_FIRST(&serviceprobe_queue) == t)
pthread_cond_wait(&serviceprobe_cond, &global_lock);
transport_ref(t);
pthread_mutex_unlock(&global_lock);
run = 1;
pthread_mutex_lock(&sq.sq_mutex);
while(run) {
while((sm = TAILQ_FIRST(&sq.sq_queue)) == NULL)
pthread_cond_wait(&sq.sq_cond, &sq.sq_mutex);
TAILQ_REMOVE(&sq.sq_queue, sm, sm_link);
pthread_mutex_unlock(&sq.sq_mutex);
if(sm->sm_type == SMT_TRANSPORT_STATUS) {
status = sm->sm_code;
switch(status) {
case TRANSPORT_FEED_UNKNOWN:
break;
case TRANSPORT_FEED_NO_INPUT:
err = "No data input from adapter detected";
run = 0;
break;
case TRANSPORT_FEED_NO_DEMUXED_INPUT:
err = "No mux packets for this service";
run = 0;
break;
case TRANSPORT_FEED_RAW_INPUT:
err = "Data received for service, "
"but no packets could be reassembled";
run = 0;
break;
case TRANSPORT_FEED_NO_DESCRAMBLER:
err = "No descrambler available for service";
run = 0;
break;
case TRANSPORT_FEED_NO_ACCESS:
err = "Access denied";
run = 0;
break;
case TRANSPORT_FEED_VALID_PACKETS:
err = NULL;
run = 0;
break;
}
}
streaming_msg_free(sm);
pthread_mutex_lock(&sq.sq_mutex);
}
streaming_queue_clear(&sq.sq_queue);
pthread_mutex_unlock(&sq.sq_mutex);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
was_doing_work = 1;
if(t->tht_status != TRANSPORT_ZOMBIE) {
if(err != NULL) {
tvhlog(LOG_INFO, "serviceprobe", "%20s: skipped: %s",
t->tht_svcname, err);
} else if(t->tht_ch == NULL) {
ch = channel_find_by_name(t->tht_svcname, 1);
transport_map_channel(t, ch);
pthread_mutex_lock(&t->tht_stream_mutex);
t->tht_config_change(t);
pthread_mutex_unlock(&t->tht_stream_mutex);
tvhlog(LOG_INFO, "serviceprobe", "\"%s\" mapped to channel \"%s\"",
t->tht_svcname, t->tht_svcname);
}
t->tht_sp_onqueue = 0;
TAILQ_REMOVE(&serviceprobe_queue, t, tht_sp_link);
}
transport_unref(t);
}
return NULL;
}

View file

@ -23,18 +23,16 @@
#include "packet.h"
void
streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex)
streaming_pad_init(streaming_pad_t *sp)
{
LIST_INIT(&sp->sp_targets);
LIST_INIT(&sp->sp_components);
sp->sp_mutex = mutex;
}
/**
*
*/
void
streaming_target_init2(streaming_target_t *st, st_callback_t *cb, void *opaque)
streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque)
{
st->st_cb = cb;
st->st_opaque = opaque;
@ -62,9 +60,7 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
void
streaming_queue_init(streaming_queue_t *sq)
{
sq->sq_status = SQ_IDLE;
streaming_target_init2(&sq->sq_st, streaming_queue_deliver, sq);
streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq);
pthread_mutex_init(&sq->sq_mutex, NULL);
pthread_cond_init(&sq->sq_cond, NULL);
@ -78,56 +74,104 @@ streaming_queue_init(streaming_queue_t *sq)
void
streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st)
{
lock_assert(sp->sp_mutex);
sp->sp_ntargets++;
st->st_pad = sp;
LIST_INSERT_HEAD(&sp->sp_targets, st, st_link);
}
/**
*
*/
void
streaming_target_disconnect(streaming_target_t *st)
streaming_target_disconnect(streaming_pad_t *sp, streaming_target_t *st)
{
streaming_pad_t *sp = st->st_pad;
if(sp == NULL)
return; /* Already disconnected */
pthread_mutex_lock(sp->sp_mutex);
sp->sp_ntargets--;
st->st_pad = NULL;
LIST_REMOVE(st, st_link);
pthread_mutex_unlock(sp->sp_mutex);
}
/**
*
*/
void
streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt)
streaming_message_t *
streaming_msg_create(streaming_message_type_t type)
{
streaming_target_t *st;
streaming_message_t *sm;
streaming_message_t *sm = malloc(sizeof(streaming_message_t));
sm->sm_type = type;
return sm;
}
lock_assert(sp->sp_mutex);
if(sp->sp_ntargets == 0)
return;
/**
*
*/
streaming_message_t *
streaming_msg_create_pkt(th_pkt_t *pkt)
{
streaming_message_t *sm = streaming_msg_create(SMT_PACKET);
sm->sm_data = pkt;
pkt_ref_inc(pkt);
return sm;
}
/* Increase multiple refcounts at once */
pkt_ref_inc_poly(pkt, sp->sp_ntargets);
LIST_FOREACH(st, &sp->sp_targets, st_link) {
/**
*
*/
streaming_message_t *
streaming_msg_create_msg(streaming_message_type_t type, htsmsg_t *msg)
{
streaming_message_t *sm = streaming_msg_create(type);
sm->sm_data = msg;
return sm;
}
sm = malloc(sizeof(streaming_message_t));
sm->sm_type = SMT_PACKET;
sm->sm_data = pkt;
st->st_cb(st->st_opaque, sm);
/**
*
*/
streaming_message_t *
streaming_msg_create_code(streaming_message_type_t type, int code)
{
streaming_message_t *sm = streaming_msg_create(type);
sm->sm_code = code;
return sm;
}
/**
*
*/
streaming_message_t *
streaming_msg_clone(streaming_message_t *src)
{
streaming_message_t *dst = malloc(sizeof(streaming_message_t));
dst->sm_type = src->sm_type;
switch(src->sm_type) {
case SMT_PACKET:
pkt_ref_inc(src->sm_data);
dst->sm_data = src->sm_data;
break;
case SMT_START:
case SMT_STOP:
dst->sm_data = htsmsg_copy(src->sm_data);
break;
case SMT_EXIT:
break;
default:
abort();
}
return dst;
}
@ -135,13 +179,24 @@ streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt)
*
*/
void
streaming_message_free(streaming_message_t *sm)
streaming_msg_free(streaming_message_t *sm)
{
switch(sm->sm_type) {
case SMT_PACKET:
pkt_ref_dec(sm->sm_data);
break;
case SMT_START:
case SMT_STOP:
htsmsg_destroy(sm->sm_data);
break;
case SMT_EXIT:
break;
case SMT_TRANSPORT_STATUS:
break;
default:
abort();
}
@ -149,6 +204,27 @@ streaming_message_free(streaming_message_t *sm)
}
/**
*
*/
void
streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm)
{
streaming_target_t *st, *next;
if(sp->sp_ntargets == 0)
return;
for(st = LIST_FIRST(&sp->sp_targets);; st = next) {
if((next = LIST_NEXT(st, st_link)) == NULL)
break;
st->st_cb(st->st_opaque, streaming_msg_clone(sm));
}
st->st_cb(st->st_opaque, sm);
}
/**
*
@ -160,6 +236,6 @@ streaming_queue_clear(struct streaming_message_queue *q)
while((sm = TAILQ_FIRST(q)) != NULL) {
TAILQ_REMOVE(q, sm, sm_link);
streaming_message_free(sm);
streaming_msg_free(sm);
}
}

View file

@ -21,25 +21,40 @@
#include "tvhead.h"
#include "packet.h"
#include "htsmsg.h"
/**
*
*/
void streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex);
void streaming_pad_init(streaming_pad_t *sp);
void streaming_target_init2(streaming_target_t *st,
st_callback_t *cb, void *opaque);
void streaming_target_init(streaming_target_t *st,
st_callback_t *cb, void *opaque);
void streaming_queue_init(streaming_queue_t *st);
void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st);
void streaming_target_disconnect(streaming_target_t *st);
void streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt);
void streaming_message_free(streaming_message_t *sm);
void streaming_queue_init(streaming_queue_t *sq);
void streaming_queue_clear(struct streaming_message_queue *q);
void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st);
void streaming_target_disconnect(streaming_pad_t *sp, streaming_target_t *st);
void streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm);
void streaming_msg_free(streaming_message_t *sm);
streaming_message_t *streaming_msg_clone(streaming_message_t *src);
streaming_message_t *streaming_msg_create(streaming_message_type_t type);
streaming_message_t *streaming_msg_create_msg(streaming_message_type_t type,
htsmsg_t *msg);
streaming_message_t *streaming_msg_create_code(streaming_message_type_t type,
int code);
streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt);
#define streaming_target_deliver(st, sm) ((st)->st_cb((st)->st_opaque, (sm)))
#endif /* STREAMING_H_ */

View file

@ -34,11 +34,9 @@
#include "tvhead.h"
#include "transports.h"
#include "subscriptions.h"
#include "streaming.h"
struct th_subscription_list subscriptions;
static pthread_cond_t subscription_janitor_cond;
static pthread_mutex_t subscription_janitor_mutex;
static int subscription_janitor_work;
static gtimer_t subscription_reschedule_timer;
@ -48,21 +46,62 @@ subscription_sort(th_subscription_t *a, th_subscription_t *b)
return b->ths_weight - a->ths_weight;
}
/**
*
* The transport is producing output. Thus, we may link our subscription
* to it.
*/
static void
subscription_link_transport(th_subscription_t *s, th_transport_t *t)
{
streaming_message_t *sm;
s->ths_transport = t;
LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link);
s->ths_event_callback(s, SUBSCRIPTION_TRANSPORT_RUN, s->ths_opaque);
s->ths_last_status = t->tht_last_status;
if(s->ths_last_status != SUBSCRIPTION_EVENT_INVALID)
s->ths_event_callback(s, s->ths_last_status, s->ths_opaque);
pthread_mutex_lock(&t->tht_stream_mutex);
// Link to transport output
streaming_target_connect(&t->tht_streaming_pad, &s->ths_input);
// Send a START message to the subscription client
sm = streaming_msg_create_msg(SMT_START, transport_build_stream_msg(t));
streaming_target_deliver(s->ths_output, sm);
// Send a TRANSPORT_STATUS message to the subscription client
sm = streaming_msg_create_code(SMT_TRANSPORT_STATUS, t->tht_feed_status);
streaming_target_deliver(s->ths_output, sm);
pthread_mutex_unlock(&t->tht_stream_mutex);
}
/**
* Called from transport code
*/
void
subscription_unlink_transport(th_subscription_t *s)
{
streaming_message_t *sm;
th_transport_t *t = s->ths_transport;
pthread_mutex_lock(&t->tht_stream_mutex);
// Unlink from transport output
streaming_target_disconnect(&t->tht_streaming_pad, &s->ths_input);
// Send a STOP message to the subscription client
sm = streaming_msg_create_msg(SMT_STOP, htsmsg_create_map());
streaming_target_deliver(s->ths_output, sm);
pthread_mutex_unlock(&t->tht_stream_mutex);
LIST_REMOVE(s, ths_transport_link);
s->ths_transport = NULL;
}
/**
*
*/
@ -71,6 +110,7 @@ subscription_reschedule(void *aux)
{
th_subscription_t *s;
th_transport_t *t;
streaming_message_t *sm;
lock_assert(&global_lock);
@ -87,8 +127,9 @@ subscription_reschedule(void *aux)
if(t == NULL) {
/* No transport available */
s->ths_event_callback(s, SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE,
s->ths_opaque);
sm = streaming_msg_create(SMT_NOSOURCE);
streaming_target_deliver(s->ths_output, sm);
continue;
}
@ -120,20 +161,35 @@ subscription_unsubscribe(th_subscription_t *s)
subscription_reschedule(NULL);
}
/**
* This callback is invoked when we receive data and status updates from
* the currently bound transport
*/
static void
subscription_input(void *opauqe, streaming_message_t *sm)
{
th_subscription_t *s = opauqe;
streaming_target_deliver(s->ths_output, sm);
}
/**
*
*/
static th_subscription_t *
subscription_create(int weight, const char *name,
ths_event_callback_t *cb, void *opaque)
subscription_create(int weight, const char *name, streaming_target_t *st)
{
th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
s->ths_weight = weight;
s->ths_event_callback = cb;
s->ths_opaque = opaque;
s->ths_title = strdup(name);
s->ths_total_err = 0;
streaming_target_init(&s->ths_input, subscription_input, s);
s->ths_weight = weight;
s->ths_title = strdup(name);
s->ths_total_err = 0;
s->ths_output = st;
time(&s->ths_start);
LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort);
@ -146,14 +202,10 @@ subscription_create(int weight, const char *name,
*
*/
th_subscription_t *
subscription_create_from_channel(channel_t *ch,
unsigned int weight, const char *name,
ths_event_callback_t *cb, void *opaque,
uint32_t u32)
subscription_create_from_channel(channel_t *ch, unsigned int weight,
const char *name, streaming_target_t *st)
{
th_subscription_t *s = subscription_create(weight, name, cb, opaque);
s->ths_u32 = u32;
th_subscription_t *s = subscription_create(weight, name, st);
s->ths_channel = ch;
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
@ -176,86 +228,13 @@ subscription_create_from_channel(channel_t *ch,
*/
th_subscription_t *
subscription_create_from_transport(th_transport_t *t, const char *name,
ths_event_callback_t *cb, void *opaque)
streaming_target_t *st)
{
th_subscription_t *s = subscription_create(INT32_MAX, name, cb, opaque);
th_subscription_t *s = subscription_create(INT32_MAX, name, st);
if(t->tht_runstatus != TRANSPORT_RUNNING)
if(t->tht_status != TRANSPORT_RUNNING)
transport_start(t, INT32_MAX, 1);
subscription_link_transport(s, t);
return s;
}
/**
*
*/
void
subscription_janitor_has_duty(void)
{
pthread_mutex_lock(&subscription_janitor_mutex);
subscription_janitor_work++;
pthread_cond_signal(&subscription_janitor_cond);
pthread_mutex_unlock(&subscription_janitor_mutex);
}
/**
*
*/
static void *
subscription_janitor(void *aux)
{
int v;
th_subscription_t *s;
th_transport_t *t;
pthread_mutex_lock(&subscription_janitor_mutex);
v = subscription_janitor_work;
while(1) {
while(v == subscription_janitor_work)
pthread_cond_wait(&subscription_janitor_cond,
&subscription_janitor_mutex);
v = subscription_janitor_work;
pthread_mutex_unlock(&subscription_janitor_mutex);
pthread_mutex_lock(&global_lock);
LIST_FOREACH(s, &subscriptions, ths_global_link) {
if((t = s->ths_transport) == NULL)
continue;
if(s->ths_last_status != t->tht_last_status) {
s->ths_last_status = t->tht_last_status;
s->ths_event_callback(s, s->ths_last_status, s->ths_opaque);
}
}
pthread_mutex_unlock(&global_lock);
}
return NULL;
}
/**
*
*/
void
subscriptions_init(void)
{
pthread_t ptid;
pthread_cond_init(&subscription_janitor_cond, NULL);
pthread_mutex_init(&subscription_janitor_mutex, NULL);
pthread_create(&ptid, NULL, subscription_janitor, NULL);
}

View file

@ -19,15 +19,6 @@
#ifndef SUBSCRIPTIONS_H
#define SUBSCRIPTIONS_H
typedef void (ths_event_callback_t)(struct th_subscription *s,
subscription_event_t event,
void *opaque);
typedef void (ths_raw_input_t)(struct th_subscription *s,
void *data, int len,
th_stream_t *st,
void *opaque);
typedef struct th_subscription {
LIST_ENTRY(th_subscription) ths_global_link;
int ths_weight;
@ -41,24 +32,15 @@ typedef struct th_subscription {
struct th_transport *ths_transport; /* if NULL, ths_transport_link
is not linked */
LIST_ENTRY(th_subscription) ths_subscriber_link; /* Caller is responsible
for this link */
void *ths_opaque;
uint32_t ths_u32;
char *ths_title; /* display title */
time_t ths_start; /* time when subscription started */
int ths_total_err; /* total errors during entire subscription */
int ths_last_status;
ths_event_callback_t *ths_event_callback;
ths_raw_input_t *ths_raw_input;
int ths_force_demux;
streaming_target_t *ths_st;
streaming_target_t ths_input;
streaming_target_t *ths_output;
} th_subscription_t;
@ -73,20 +55,15 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight);
th_subscription_t *subscription_create_from_channel(channel_t *ch,
unsigned int weight,
const char *name,
ths_event_callback_t *cb,
void *opaque,
uint32_t u32);
streaming_target_t *st);
th_subscription_t *subscription_create_from_transport(th_transport_t *t,
const char *name,
ths_event_callback_t *cb,
void *opaque);
void subscriptions_init(void);
streaming_target_t *st);
void subscription_stop(th_subscription_t *s);
void subscription_janitor_has_duty(void);
void subscription_unlink_transport(th_subscription_t *s);
#endif /* SUBSCRIPTIONS_H */

View file

@ -43,6 +43,8 @@
#include "cwc.h"
#include "notify.h"
#include "serviceprobe.h"
#include "atomic.h"
#define TRANSPORT_HASH_WIDTH 101
@ -50,11 +52,6 @@ static struct th_transport_list transporthash[TRANSPORT_HASH_WIDTH];
static void transport_data_timeout(void *aux);
//static dtimer_t transport_monitor_timer;
//static const char *transport_settings_path(th_transport_t *t);
//static void transport_monitor(void *aux, int64_t now);
/**
* Transport lock must be held
@ -62,7 +59,6 @@ static void transport_data_timeout(void *aux);
static void
transport_stop(th_transport_t *t)
{
streaming_pad_t *sp = &t->tht_streaming_pad;
th_descrambler_t *td;
th_stream_t *st;
@ -85,7 +81,7 @@ transport_stop(th_transport_t *t)
/**
* Clean up each stream
*/
LIST_FOREACH(st, &sp->sp_components, st_link) {
LIST_FOREACH(st, &t->tht_components, st_link) {
if(st->st_parser != NULL)
av_parser_close(st->st_parser);
@ -127,16 +123,6 @@ transport_stop(th_transport_t *t)
pthread_mutex_unlock(&t->tht_stream_mutex);
}
/**
*
*/
static void
remove_subscriber(th_subscription_t *s, subscription_event_t reason)
{
s->ths_event_callback(s, reason, s->ths_opaque);
LIST_REMOVE(s, ths_transport_link);
s->ths_transport = NULL;
}
/**
* Remove the given subscriber from the transport
@ -151,10 +137,11 @@ transport_remove_subscriber(th_transport_t *t, th_subscription_t *s)
lock_assert(&global_lock);
if(s == NULL) {
while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL)
remove_subscriber(s, SUBSCRIPTION_TRANSPORT_LOST);
while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL) {
subscription_unlink_transport(s);
}
} else {
remove_subscriber(s, SUBSCRIPTION_DESTROYED);
subscription_unlink_transport(s);
}
if(LIST_FIRST(&t->tht_subscriptions) == NULL)
@ -209,14 +196,13 @@ transport_unlink_muxer(th_muxer_t *tm)
int
transport_start(th_transport_t *t, unsigned int weight, int force_start)
{
streaming_pad_t *sp = &t->tht_streaming_pad;
th_stream_t *st;
AVCodec *c;
enum CodecID id;
lock_assert(&global_lock);
assert(t->tht_runstatus != TRANSPORT_RUNNING);
assert(t->tht_status != TRANSPORT_RUNNING);
if(t->tht_start_feed(t, weight, TRANSPORT_RUNNING, force_start))
return -1;
@ -227,7 +213,7 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start)
/**
* Initialize stream
*/
LIST_FOREACH(st, &sp->sp_components, st_link) {
LIST_FOREACH(st, &t->tht_components, st_link) {
st->st_startcond = 0xffffffff;
st->st_curdts = AV_NOPTS_VALUE;
st->st_curpts = AV_NOPTS_VALUE;
@ -268,9 +254,9 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start)
cwc_transport_start(t);
t->tht_packets = 0;
gtimer_arm(&t->tht_receive_timer, transport_data_timeout, t, 4);
t->tht_last_status = SUBSCRIPTION_EVENT_INVALID;
t->tht_feed_status = TRANSPORT_FEED_UNKNOWN;
t->tht_input_status = TRANSPORT_FEED_NO_INPUT;
return 0;
}
@ -374,7 +360,7 @@ transport_find(channel_t *ch, unsigned int weight)
for(i = 0; i < cnt; i++) {
t = vec[i];
if(t->tht_runstatus == TRANSPORT_RUNNING)
if(t->tht_status == TRANSPORT_RUNNING)
return t;
if(!transport_start(t, 0, 0))
@ -415,17 +401,27 @@ transport_compute_weight(struct th_transport_list *head)
}
#if 0
/**
* Timer that fires if transport is not receiving any data
*
*/
static void
transport_data_timeout(void *aux, int64_t now)
void
transport_unref(th_transport_t *t)
{
th_transport_t *t = aux;
transport_signal_status(t, TRANSPORT_STATUS_NO_INPUT);
if((atomic_add(&t->tht_refcount, -1)) == 1)
free(t);
}
#endif
/**
*
*/
void
transport_ref(th_transport_t *t)
{
atomic_add(&t->tht_refcount, 1);
}
/**
* Destroy a transport
@ -433,14 +429,14 @@ transport_data_timeout(void *aux, int64_t now)
void
transport_destroy(th_transport_t *t)
{
streaming_pad_t *sp = &t->tht_streaming_pad;
th_stream_t *st;
th_subscription_t *s;
lock_assert(&global_lock);
while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL)
remove_subscriber(s, SUBSCRIPTION_TRANSPORT_LOST);
while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL) {
subscription_unlink_transport(s);
}
//dtimer_disarm(&t->tht_receive_timer);
@ -454,23 +450,23 @@ transport_destroy(th_transport_t *t)
LIST_REMOVE(t, tht_mux_link);
LIST_REMOVE(t, tht_hash_link);
if(t->tht_runstatus != TRANSPORT_IDLE)
if(t->tht_status != TRANSPORT_IDLE)
transport_stop(t);
t->tht_status = TRANSPORT_ZOMBIE;
free(t->tht_identifier);
free(t->tht_svcname);
free(t->tht_chname);
free(t->tht_provider);
while((st = LIST_FIRST(&sp->sp_components)) != NULL) {
while((st = LIST_FIRST(&t->tht_components)) != NULL) {
LIST_REMOVE(st, st_link);
free(st);
}
abort();// serviceprobe_delete(t);
free(t);
transport_unref(t);
}
@ -489,8 +485,9 @@ transport_create(const char *identifier, int type, int source_type)
t->tht_identifier = strdup(identifier);
t->tht_type = type;
t->tht_source_type = source_type;
t->tht_refcount = 1;
streaming_pad_init(&t->tht_streaming_pad, &t->tht_stream_mutex);
streaming_pad_init(&t->tht_streaming_pad);
LIST_INSERT_HEAD(&transporthash[hash], t, tht_hash_link);
return t;
@ -524,13 +521,12 @@ th_stream_t *
transport_add_stream(th_transport_t *t, int pid,
streaming_component_type_t type)
{
streaming_pad_t *sp = &t->tht_streaming_pad;
th_stream_t *st;
int i = 0;
lock_assert(&t->tht_stream_mutex);
LIST_FOREACH(st, &sp->sp_components, st_link) {
LIST_FOREACH(st, &t->tht_components, st_link) {
i++;
if(pid != -1 && st->st_pid == pid)
return st;
@ -539,7 +535,7 @@ transport_add_stream(th_transport_t *t, int pid,
st = calloc(1, sizeof(th_stream_t));
st->st_index = i;
st->st_type = type;
LIST_INSERT_HEAD(&sp->sp_components, st, st_link);
LIST_INSERT_HEAD(&t->tht_components, st, st_link);
st->st_pid = pid;
st->st_demuxer_fd = -1;
@ -603,11 +599,12 @@ transport_data_timeout(void *aux)
{
th_transport_t *t = aux;
if(t->tht_last_status)
return; /* Something has happend so we don't have to update */
transport_signal_status(t, t->tht_packets ? SUBSCRIPTION_RAW_INPUT :
SUBSCRIPTION_NO_INPUT);
pthread_mutex_lock(&t->tht_stream_mutex);
if(t->tht_feed_status == TRANSPORT_FEED_UNKNOWN)
transport_set_feed_status(t, t->tht_input_status);
pthread_mutex_unlock(&t->tht_stream_mutex);
}
@ -647,21 +644,25 @@ transport_is_tv(th_transport_t *t)
int
transport_is_available(th_transport_t *t)
{
return transport_servicetype_txt(t) &&
LIST_FIRST(&t->tht_streaming_pad.sp_components);
return transport_servicetype_txt(t) && LIST_FIRST(&t->tht_components);
}
/**
*
*/
void
transport_signal_status(th_transport_t *t, int newstatus)
transport_set_feed_status(th_transport_t *t, transport_feed_status_t newstatus)
{
if(t->tht_last_status == newstatus)
lock_assert(&t->tht_stream_mutex);
if(t->tht_feed_status == newstatus)
return;
t->tht_last_status = newstatus;
subscription_janitor_has_duty();
t->tht_feed_status = newstatus;
streaming_pad_deliver(&t->tht_streaming_pad,
streaming_msg_create_code(SMT_TRANSPORT_STATUS,
newstatus));
}
@ -669,10 +670,10 @@ transport_signal_status(th_transport_t *t, int newstatus)
* Table for status -> text conversion
*/
static struct strtab transportstatustab[] = {
{ "Ok", SUBSCRIPTION_VALID_PACKETS },
{ "No input", SUBSCRIPTION_NO_INPUT },
{ "No descrambler", SUBSCRIPTION_NO_DESCRAMBLER },
{ "No access", SUBSCRIPTION_NO_ACCESS },
{ "Ok", TRANSPORT_FEED_VALID_PACKETS },
{ "No input", TRANSPORT_FEED_NO_INPUT },
{ "No descrambler", TRANSPORT_FEED_NO_DESCRAMBLER },
{ "No access", TRANSPORT_FEED_NO_ACCESS },
};
@ -682,3 +683,32 @@ transport_status_to_text(int status)
return val2str(status, transportstatustab) ?: "Unknown";
}
/**
* Generate a message containing info about all components
*
* Note: This is the same as the one in HTSP.subscriptionStart so take
* great care if you change anying. (Just adding is fine)
*/
htsmsg_t *
transport_build_stream_msg(th_transport_t *t)
{
htsmsg_t *streams, *c;
th_stream_t *st;
lock_assert(&t->tht_stream_mutex);
/* Setup each stream */
streams = htsmsg_create_list();
LIST_FOREACH(st, &t->tht_components, st_link) {
c = htsmsg_create_map();
htsmsg_add_u32(c, "index", st->st_index);
htsmsg_add_str(c, "type", streaming_component_type2txt(st->st_type));
if(st->st_lang[0])
htsmsg_add_str(c, "language", st->st_lang);
htsmsg_add_msg(streams, NULL, c);
}
return streams;
}

View file

@ -20,6 +20,7 @@
#define TRANSPORTS_H
#include "channels.h"
#include "htsmsg.h"
#include "subscriptions.h"
unsigned int transport_compute_weight(struct th_transport_list *head);
@ -29,6 +30,10 @@ int transport_start(th_transport_t *t, unsigned int weight, int force_start);
th_transport_t *transport_create(const char *identifier, int type,
int source_type);
void transport_unref(th_transport_t *t);
void transport_ref(th_transport_t *t);
th_transport_t *transport_find_by_identifier(const char *identifier);
void transport_map_channel(th_transport_t *t, channel_t *ch);
@ -52,7 +57,8 @@ int transport_is_available(th_transport_t *t);
void transport_destroy(th_transport_t *t);
void transport_signal_status(th_transport_t *t, int newstatus);
void transport_set_feed_status(th_transport_t *t,
transport_feed_status_t newstatus);
const char *transport_status_to_text(int status);
@ -65,16 +71,16 @@ void transport_remove_subscriber(th_transport_t *t, th_subscription_t *s);
static inline th_stream_t *
transport_find_stream_by_pid(th_transport_t *t, int pid)
{
streaming_pad_t *sp = &t->tht_streaming_pad;
th_stream_t *st;
LIST_FOREACH(st, &sp->sp_components, st_link) {
LIST_FOREACH(st, &t->tht_components, st_link) {
if(st->st_pid == pid)
return st;
}
return NULL;
}
htsmsg_t *transport_build_stream_msg(th_transport_t *t);
#endif /* TRANSPORTS_H */

View file

@ -71,13 +71,8 @@ got_section(th_transport_t *t, th_stream_t *st)
static void
ts_recv_packet0(th_transport_t *t, th_stream_t *st, uint8_t *tsb)
{
th_subscription_t *s;
int off, len, pusi, cc, err = 0;
LIST_FOREACH(s, &t->tht_subscriptions, ths_transport_link)
if(s->ths_raw_input != NULL)
s->ths_raw_input(s, tsb, 188, st, s->ths_opaque);
/* Check CC */
if(tsb[3] & 0x10) {
@ -194,6 +189,8 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb)
int pid, n, m, r;
th_descrambler_t *td;
t->tht_input_status = TRANSPORT_FEED_NO_DEMUXED_INPUT;
if(tsb[1] & 0x80)
return; /* Transport Error Indicator */
@ -201,7 +198,7 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb)
if((st = transport_find_stream_by_pid(t, pid)) == NULL)
return;
t->tht_packets = 1;
t->tht_input_status = TRANSPORT_FEED_RAW_INPUT;
pthread_mutex_lock(&t->tht_stream_mutex);
@ -230,9 +227,9 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb)
}
if(n == 0) {
transport_signal_status(t, SUBSCRIPTION_NO_DESCRAMBLER);
transport_set_feed_status(t, TRANSPORT_FEED_NO_DESCRAMBLER);
} else if(m == n) {
transport_signal_status(t, SUBSCRIPTION_NO_ACCESS);
transport_set_feed_status(t, TRANSPORT_FEED_NO_ACCESS);
}
} else {
ts_recv_packet0(t, st, tsb);

View file

@ -137,12 +137,6 @@ typedef enum {
typedef struct streaming_pad {
struct streaming_target_list sp_targets;
int sp_ntargets;
struct th_stream_list sp_components;
pthread_mutex_t *sp_mutex; /* Mutex for protecting modification of
st_targets and delivery.
This needs to be created elsewhere.
The mutex also protect sp_comonents */
} streaming_pad_t;
@ -151,9 +145,13 @@ TAILQ_HEAD(streaming_message_queue, streaming_message);
/**
* Streaming messages types
*/
typedef enum {
SMT_PACKET,
SMT_PACKET, // sm_data is a th_pkt. Unref when destroying msg
SMT_START, // sm_data is a htsmsg, see transport_build_stream_msg()
SMT_STOP, // sm_data is a htsmsg
SMT_TRANSPORT_STATUS, // sm_code is TRANSPORT_STATUS_
SMT_EXIT, // Used to signal exit to threads
SMT_NOSOURCE,
} streaming_message_type_t;
@ -163,8 +161,10 @@ typedef enum {
typedef struct streaming_message {
TAILQ_ENTRY(streaming_message) sm_link;
streaming_message_type_t sm_type;
void *sm_data;
union {
void *sm_data;
int sm_code;
};
} streaming_message_t;
/**
@ -194,13 +194,6 @@ typedef struct streaming_queue {
packets */
struct streaming_message_queue sq_queue;
enum {
SQ_IDLE,
SQ_RUNNING,
SQ_STOP_REQ,
SQ_ZOMBIE,
} sq_status;
} streaming_queue_t;
@ -320,46 +313,33 @@ typedef struct th_stream {
} th_stream_t;
/**
* Transport events, these are sent to subscribers via
* s->ths_event_callback
*
*/
typedef enum {
SUBSCRIPTION_EVENT_INVALID = 0, /* mbz */
/** No status known */
TRANSPORT_FEED_UNKNOWN,
/** Transport is receiving data from source */
SUBSCRIPTION_TRANSPORT_RUN,
/** No input is received from source at all */
TRANSPORT_FEED_NO_INPUT,
/** No input is received from source */
SUBSCRIPTION_NO_INPUT,
/** No descrambler is able to decrypt the stream */
SUBSCRIPTION_NO_DESCRAMBLER,
/** Potential descrambler is available, but access is denied */
SUBSCRIPTION_NO_ACCESS,
/** No input is received from source destined for this transport */
TRANSPORT_FEED_NO_DEMUXED_INPUT,
/** Raw input seen but nothing has really been decoded */
SUBSCRIPTION_RAW_INPUT,
TRANSPORT_FEED_RAW_INPUT,
/** Packet are being parsed. Only signalled if at least one muxer is
registerd */
SUBSCRIPTION_VALID_PACKETS,
/** No descrambler is able to decrypt the stream */
TRANSPORT_FEED_NO_DESCRAMBLER,
/** No transport is available for delivering subscription */
SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE,
/** Potential descrambler is available, but access is denied */
TRANSPORT_FEED_NO_ACCESS,
/** Transport no longer runs, it was needed by someone with higher
priority */
SUBSCRIPTION_TRANSPORT_LOST,
/** Subscription destroyed */
SUBSCRIPTION_DESTROYED,
} subscription_event_t;
/** Packet are being parsed. */
TRANSPORT_FEED_VALID_PACKETS,
} transport_feed_status_t;
/**
@ -380,10 +360,37 @@ typedef struct th_transport {
} tht_type;
enum {
/**
* Transport is idle.
*/
TRANSPORT_IDLE,
/**
* Transport producing output
*/
TRANSPORT_RUNNING,
TRANSPORT_PROBING,
} tht_runstatus;
/**
* Destroyed, but pointer is till valid.
* This would be the case if transport_destroy() did not actually free
* the transport because there are references held to it.
*
* Reference counts can be used so that code can hold a pointer to
* a transport without having the global lock.
*
* Note: No fields in the transport may be accessed without the
* global lock held. Thus, the global_lock must be reaquired and
* then tht_status must be checked. If it is ZOMBIE the code must
* just drop the refcount and pretend that the transport never
* was there in the first place.
*/
TRANSPORT_ZOMBIE,
} tht_status;
/**
* Refcount, operated using atomic.h ops.
*/
int tht_refcount;
/**
* Source type is used to determine if an output requesting
@ -485,11 +492,6 @@ typedef struct th_transport {
struct channel *tht_ch;
char *tht_chname;
/**
* Last known status (or error)
*/
int tht_last_status;
/**
* Service probe, see serviceprobe.c for details
*/
@ -499,15 +501,10 @@ typedef struct th_transport {
/**
* Timer which is armed at transport start. Once it fires
* it will check if any packets has been parsed. If not the status
* will be set to SUBSCRIPTION_NO_INPUT
* will be set to TRANSPORT_STATUS_NO_INPUT
*/
gtimer_t tht_receive_timer;
/**
* Set as soon as we get some kind of activity
*/
int tht_packets;
/*********************************************************
*
* Streaming part of transport
@ -527,6 +524,16 @@ typedef struct th_transport {
*/
pthread_mutex_t tht_stream_mutex;
/**
* Last known data status (or error)
*/
transport_feed_status_t tht_feed_status;
/**
* Set as soon as we get some kind of activity
*/
transport_feed_status_t tht_input_status;
/**
* For simple streaming sources (such as video4linux) keeping
@ -572,10 +579,12 @@ typedef struct th_transport {
int tht_pmt_seen;
struct th_stream_list tht_components;
/**
* Delivery pad, this is were we finally deliver all streaming output
*/
streaming_pad_t tht_streaming_pad;

View file

@ -565,7 +565,6 @@ extjs_dvbadapter(http_connection_t *hc, const char *remain, void *opaque)
static htsmsg_t *
build_transport_msg(th_transport_t *t)
{
streaming_pad_t *sp = &t->tht_streaming_pad;
htsmsg_t *r = htsmsg_create_map();
th_stream_t *st;
@ -581,14 +580,14 @@ build_transport_msg(th_transport_t *t)
htsmsg_add_str(r, "network", t->tht_networkname(t));
htsmsg_add_str(r, "source", t->tht_sourcename(t));
htsmsg_add_str(r, "status", transport_status_to_text(t->tht_last_status));
htsmsg_add_str(r, "status", transport_status_to_text(t->tht_feed_status));
video[0] = 0;
audio[0] = 0;
subtitles[0] = 0;
scrambling[0] = 0;
LIST_FOREACH(st, &sp->sp_components, st_link) {
LIST_FOREACH(st, &t->tht_components, st_link) {
switch(st->st_type) {
case SCT_TELETEXT: