diff --git a/src/cwc.c b/src/cwc.c index f1d0a803..a2d2afc7 100644 --- a/src/cwc.c +++ b/src/cwc.c @@ -900,10 +900,9 @@ cwc_transport_destroy(th_descrambler_t *td) static inline th_stream_t * cwc_find_stream_by_caid(th_transport_t *t, int caid) { - streaming_pad_t *sp = &t->tht_streaming_pad; th_stream_t *st; - LIST_FOREACH(st, &sp->sp_components, st_link) { + LIST_FOREACH(st, &t->tht_components, st_link) { if(st->st_caid == caid) return st; } diff --git a/src/dvb/dvb_adapter.c b/src/dvb/dvb_adapter.c index a0b841a6..243f6e26 100644 --- a/src/dvb/dvb_adapter.c +++ b/src/dvb/dvb_adapter.c @@ -444,7 +444,7 @@ dvb_adapter_clone(th_dvb_adapter_t *dst, th_dvb_adapter_t *src) pthread_mutex_lock(&t_src->tht_stream_mutex); - LIST_FOREACH(st_src, &t_src->tht_streaming_pad.sp_components, st_link) { + LIST_FOREACH(st_src, &t_src->tht_components, st_link) { st_dst = transport_add_stream(t_dst, st_src->st_pid, diff --git a/src/dvb/dvb_transport.c b/src/dvb/dvb_transport.c index 68a0b0ef..903b1517 100644 --- a/src/dvb/dvb_transport.c +++ b/src/dvb/dvb_transport.c @@ -57,7 +57,6 @@ static int dvb_transport_start(th_transport_t *t, unsigned int weight, int status, int force_start) { - streaming_pad_t *sp = &t->tht_streaming_pad; struct dmx_pes_filter_params dmx_param; th_stream_t *st; int w, fd, pid; @@ -81,7 +80,7 @@ dvb_transport_start(th_transport_t *t, unsigned int weight, int status, } tdmi = t->tht_dvb_mux_instance; - LIST_FOREACH(st, &sp->sp_components, st_link) { + LIST_FOREACH(st, &t->tht_components, st_link) { fd = open(tda->tda_demux_path, O_RDWR); pid = st->st_pid; @@ -116,7 +115,7 @@ dvb_transport_start(th_transport_t *t, unsigned int weight, int status, pthread_mutex_lock(&tda->tda_delivery_mutex); LIST_INSERT_HEAD(&tda->tda_transports, t, tht_active_link); - t->tht_runstatus = status; + t->tht_status = status; dvb_fe_tune(tdmi); @@ -133,7 +132,6 @@ dvb_transport_start(th_transport_t *t, unsigned int weight, int status, static void dvb_transport_stop(th_transport_t *t) { - streaming_pad_t *sp = &t->tht_streaming_pad; th_dvb_adapter_t *tda = t->tht_dvb_mux_instance->tdmi_adapter; th_stream_t *st; @@ -143,11 +141,11 @@ dvb_transport_stop(th_transport_t *t) LIST_REMOVE(t, tht_active_link); pthread_mutex_unlock(&tda->tda_delivery_mutex); - LIST_FOREACH(st, &sp->sp_components, st_link) { + LIST_FOREACH(st, &t->tht_components, st_link) { close(st->st_demuxer_fd); st->st_demuxer_fd = -1; } - t->tht_runstatus = TRANSPORT_IDLE; + t->tht_status = TRANSPORT_IDLE; } diff --git a/src/dvr/dvr.h b/src/dvr/dvr.h index 9158a5ef..06e73856 100644 --- a/src/dvr/dvr.h +++ b/src/dvr/dvr.h @@ -92,7 +92,10 @@ typedef struct dvr_entry { /** * Fields for recording */ + pthread_t de_thread; + th_subscription_t *de_s; + streaming_queue_t de_sq; /** @@ -106,7 +109,6 @@ typedef struct dvr_entry { all commercial breaks so far */ struct dvr_rec_stream_list de_streams; - streaming_queue_t de_sq; AVFormatContext *de_fctx; enum { diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index 0bed592b..904d7613 100644 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -46,85 +46,25 @@ typedef struct dvr_rec_stream { /** * */ -static void dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp); -static void dvr_rec_stop(dvr_entry_t *de); static void *dvr_thread(void *aux); static void dvr_thread_new_pkt(dvr_entry_t *de, th_pkt_t *pkt); static void dvr_spawn_postproc(dvr_entry_t *de); static void dvr_thread_epilog(dvr_entry_t *de); -/** - * - */ -static void -dvr_subscription_callback(struct th_subscription *s, - subscription_event_t event, void *opaque) -{ - dvr_entry_t *de = opaque; - const char *notifymsg = NULL; - th_transport_t *t; - - switch(event) { - case SUBSCRIPTION_EVENT_INVALID: - abort(); - - case SUBSCRIPTION_TRANSPORT_RUN: - t = s->ths_transport; - dvr_rec_start(de, &t->tht_streaming_pad); - return; - - case SUBSCRIPTION_NO_INPUT: - notifymsg = "No input detected"; - break; - - case SUBSCRIPTION_NO_DESCRAMBLER: - notifymsg = "No descrambler available"; - break; - - case SUBSCRIPTION_NO_ACCESS: - notifymsg = "Access denied"; - break; - - case SUBSCRIPTION_RAW_INPUT: - notifymsg = "Unable to reassemble packets from input"; - break; - - case SUBSCRIPTION_VALID_PACKETS: - return; - - case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE: - notifymsg = "No transport available at the moment, automatic retry"; - break; - - case SUBSCRIPTION_TRANSPORT_LOST: - dvr_rec_stop(de); - notifymsg = "Lost transport"; - break; - - case SUBSCRIPTION_DESTROYED: - dvr_rec_stop(de); /* Recording completed */ - return; - } - if(notifymsg != NULL) - tvhlog(LOG_WARNING, "dvr", "\"%s\" on \"%s\": %s", - de->de_title, de->de_channel->ch_name, notifymsg); -} - - /** * */ void dvr_rec_subscribe(dvr_entry_t *de) { - if(de->de_s != NULL) - return; + assert(de->de_s == NULL); + + streaming_queue_init(&de->de_sq); + + pthread_create(&de->de_thread, NULL, dvr_thread, de); de->de_s = subscription_create_from_channel(de->de_channel, 1000, "pvr", - dvr_subscription_callback, de, - 0); - - + &de->de_sq.sq_st); } /** @@ -133,10 +73,13 @@ dvr_rec_subscribe(dvr_entry_t *de) void dvr_rec_unsubscribe(dvr_entry_t *de) { - if(de->de_s == NULL) - return; + assert(de->de_s != NULL); subscription_unsubscribe(de->de_s); + + streaming_target_deliver(&de->de_sq.sq_st, streaming_msg_create(SMT_EXIT)); + + pthread_join(de->de_thread, NULL); de->de_s = NULL; } @@ -299,9 +242,8 @@ dvr_rec_fatal_error(dvr_entry_t *de, const char *fmt, ...) * */ static void -dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp) +dvr_rec_start(dvr_entry_t *de, htsmsg_t *streams) { - th_stream_t *st; dvr_rec_stream_t *drs; AVOutputFormat *fmt; AVFormatContext *fctx; @@ -312,8 +254,10 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp) const char *codec_name; char urlname[512]; int err; - pthread_t ptid; - pthread_attr_t attr; + htsmsg_field_t *f; + htsmsg_t *sub; + const char *type, *lang; + uint32_t idx; if(pvr_generate_filename(de) != 0) { dvr_rec_fatal_error(de, "Unable to create directories"); @@ -357,40 +301,38 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp) av_set_parameters(fctx, NULL); - - pthread_mutex_lock(sp->sp_mutex); - /** * Setup each stream */ - LIST_FOREACH(st, &sp->sp_components, st_link) { - - switch(st->st_type) { - default: + HTSMSG_FOREACH(f, streams) { + if(f->hmf_type != HMF_MAP) continue; - case SCT_MPEG2VIDEO: - codec_id = CODEC_ID_MPEG2VIDEO; - codec_type = CODEC_TYPE_VIDEO; - codec_name = "mpeg2 video"; - break; + sub = &f->hmf_msg; - case SCT_MPEG2AUDIO: - codec_id = CODEC_ID_MP2; + if((type = htsmsg_get_str(sub, "type")) == NULL) + continue; + + if(htsmsg_get_u32(sub, "index", &idx)) + continue; + + if(!strcmp(type, "AC3")) { + codec_id = CODEC_ID_AC3; codec_type = CODEC_TYPE_AUDIO; - codec_name = "mpeg2 audio"; - break; - - case SCT_AC3: - codec_id = CODEC_ID_AC3; + codec_name = "AC-3"; + } else if(!strcmp(type, "MPEG2AUDIO")) { + codec_id = CODEC_ID_MP2; codec_type = CODEC_TYPE_AUDIO; - codec_name = "AC3 audio"; - break; - - case SCT_H264: - codec_id = CODEC_ID_H264; + codec_name = "MPEG"; + } else if(!strcmp(type, "MPEG2VIDEO")) { + codec_id = CODEC_ID_MPEG2VIDEO; codec_type = CODEC_TYPE_VIDEO; - codec_name = "h.264 video"; - break; + codec_name = "MPEG-2"; + } else if(!strcmp(type, "H264")) { + codec_id = CODEC_ID_H264; + codec_type = CODEC_TYPE_VIDEO; + codec_name = "H264"; + } else { + continue; } codec = avcodec_find_decoder(codec_id); @@ -402,7 +344,7 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp) } drs = calloc(1, sizeof(dvr_rec_stream_t)); - drs->drs_source_index = st->st_index; + drs->drs_source_index = idx; drs->drs_lavf_stream = av_new_stream(fctx, fctx->nb_streams); @@ -418,56 +360,14 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp) continue; } - memcpy(drs->drs_lavf_stream->language, st->st_lang, 4); + if((lang = htsmsg_get_str(sub, "language")) != NULL) + memcpy(drs->drs_lavf_stream->language, lang, 4); + LIST_INSERT_HEAD(&de->de_streams, drs, drs_link); } - /* Link to the pad */ - - streaming_queue_init(&de->de_sq); - streaming_target_connect(sp, &de->de_sq.sq_st); - de->de_sq.sq_status = SQ_RUNNING; de->de_fctx = fctx; de->de_ts_offset = AV_NOPTS_VALUE; - - pthread_mutex_unlock(sp->sp_mutex); - - de->de_refcnt++; - - /* Start the recorder thread */ - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - - pthread_create(&ptid, &attr, dvr_thread, de); - -} - - -/** - * Called from subscription callback when we no longer have - * access to stream - */ -static void -dvr_rec_stop(dvr_entry_t *de) -{ - streaming_queue_t *sq = &de->de_sq; - - streaming_target_disconnect(&sq->sq_st); - - pthread_mutex_lock(&sq->sq_mutex); - - if(sq->sq_status == SQ_RUNNING) { - sq->sq_status = SQ_STOP_REQ; - - pthread_cond_signal(&sq->sq_cond); - - while(sq->sq_status != SQ_ZOMBIE) - pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); - } - - streaming_queue_clear(&sq->sq_queue); - pthread_mutex_unlock(&sq->sq_mutex); } @@ -480,14 +380,11 @@ dvr_thread(void *aux) dvr_entry_t *de = aux; streaming_queue_t *sq = &de->de_sq; streaming_message_t *sm; + int run = 1; pthread_mutex_lock(&sq->sq_mutex); - de->de_header_written = 0; - de->de_rec_state = DE_RS_WAIT_AUDIO_LOCK; - - while(sq->sq_status == SQ_RUNNING) { - + while(run) { sm = TAILQ_FIRST(&sq->sq_queue); if(sm == NULL) { pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); @@ -504,26 +401,38 @@ dvr_thread(void *aux) dvr_thread_new_pkt(de, sm->sm_data); pkt_ref_dec(sm->sm_data); break; + + case SMT_START: + assert(de->de_fctx == NULL); + + pthread_mutex_lock(&global_lock); + dvr_rec_start(de, sm->sm_data); + de->de_header_written = 0; + de->de_rec_state = DE_RS_WAIT_AUDIO_LOCK; + pthread_mutex_unlock(&global_lock); + break; + + case SMT_STOP: + dvr_thread_epilog(de); + break; + + case SMT_TRANSPORT_STATUS: + break; + + case SMT_NOSOURCE: + dvr_rec_fatal_error(de, + "No source transport available, automatic retry"); + break; + + case SMT_EXIT: + run = 0; + break; } free(sm); pthread_mutex_lock(&sq->sq_mutex); } - - /* Signal back that we no longer is running */ - sq->sq_status = SQ_ZOMBIE; - pthread_cond_signal(&sq->sq_cond); - pthread_mutex_unlock(&sq->sq_mutex); - - dvr_thread_epilog(de); - - pthread_mutex_lock(&global_lock); - dvr_entry_dec_ref(de); /* Past this we may no longer - dereference de */ - pthread_mutex_unlock(&global_lock); - - /* Fade out ... */ return NULL; } @@ -772,7 +681,8 @@ dvr_thread_epilog(dvr_entry_t *de) AVStream *st; int i; - assert(fctx != NULL); + if(fctx == NULL) + return; /* Write trailer if we've written anything at all */ diff --git a/src/htsp.c b/src/htsp.c index 737ae387..29aae7d0 100644 --- a/src/htsp.c +++ b/src/htsp.c @@ -47,12 +47,16 @@ extern const char *htsversion; LIST_HEAD(htsp_connection_list, htsp_connection); +LIST_HEAD(htsp_subscription_list, htsp_subscription); TAILQ_HEAD(htsp_msg_queue, htsp_msg); TAILQ_HEAD(htsp_msg_q_queue, htsp_msg_q); static struct htsp_connection_list htsp_async_connections; +static void htsp_streaming_input(void *opaque, streaming_message_t *sm); + + /** * */ @@ -122,7 +126,7 @@ typedef struct htsp_connection { /** * */ - struct th_subscription_list htsp_subscriptions; + struct htsp_subscription_list htsp_subscriptions; uint32_t htsp_granted_access; @@ -130,23 +134,28 @@ typedef struct htsp_connection { } htsp_connection_t; + /** * */ -typedef struct htsp_stream { - streaming_target_t hs_st; - - int hs_sid; /* Subscription ID */ - +typedef struct htsp_subscription { htsp_connection_t *hs_htsp; + LIST_ENTRY(htsp_subscription) hs_link; + + int hs_sid; /* Subscription ID (set by client) */ + + th_subscription_t *hs_s; // Temporary + + streaming_target_t hs_input; + htsp_msg_q_t hs_q; time_t hs_last_report; /* Last queue status report sent */ int hs_dropstats[PKT_NTYPES]; -} htsp_stream_t; +} htsp_subscription_t; @@ -170,16 +179,6 @@ htsp_update_logname(htsp_connection_t *htsp) } -/** - * - */ -static void htsp_subscription_callback(struct th_subscription *s, - subscription_event_t event, - void *opaque); - - - - /** * */ @@ -209,7 +208,7 @@ htsp_init_queue(htsp_msg_q_t *hmq, int strict_prio) * */ static void -htsp_destroy_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq) +htsp_flush_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq) { htsp_msg_t *hm; @@ -223,6 +222,19 @@ htsp_destroy_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq) } +/** + * + */ +static void +htsp_subscription_destroy(htsp_connection_t *htsp, htsp_subscription_t *hs) +{ + LIST_REMOVE(hs, hs_link); + subscription_unsubscribe(hs->hs_s); + htsp_flush_queue(htsp, &hs->hs_q); + free(hs); +} + + /** * */ @@ -435,9 +447,9 @@ htsp_method_getEvent(htsp_connection_t *htsp, htsmsg_t *in) static htsmsg_t * htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) { - th_subscription_t *s; uint32_t chid, sid; channel_t *ch; + htsp_subscription_t *hs; if(htsmsg_get_u32(in, "channelId", &chid)) return htsp_error("Missing argument 'channeId'"); @@ -450,15 +462,24 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) /* - * We send the reply here or the user will get the 'subscriptionStart' + * We send the reply now to avoid the user getting the 'subscriptionStart' * async message before the reply to 'subscribe'. */ htsp_reply(htsp, in, htsmsg_create_map()); - s = subscription_create_from_channel(ch, 500, "htsp", - htsp_subscription_callback, htsp, sid); + /* Initialize the HTSP subscription structure */ + + hs = calloc(1, sizeof(htsp_subscription_t)); + + hs->hs_htsp = htsp; + htsp_init_queue(&hs->hs_q, 0); + + hs->hs_sid = sid; + LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link); + streaming_target_init(&hs->hs_input, htsp_streaming_input, hs); + + hs->hs_s = subscription_create_from_channel(ch, 500, "htsp", &hs->hs_input); - LIST_INSERT_HEAD(&htsp->htsp_subscriptions, s, ths_subscriber_link); return NULL; } @@ -469,27 +490,26 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in) static htsmsg_t * htsp_method_unsubscribe(htsp_connection_t *htsp, htsmsg_t *in) { - th_subscription_t *s; + htsp_subscription_t *s; uint32_t sid; if(htsmsg_get_u32(in, "subscriptionId", &sid)) return htsp_error("Missing argument 'subscriptionId'"); - LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link) - if(s->ths_u32 == sid) + LIST_FOREACH(s, &htsp->htsp_subscriptions, hs_link) + if(s->hs_sid == sid) break; - + /* - * We send the reply here or the user will get the 'subscriptionStart' - * async message before the reply to 'subscribe'. + * We send the reply here or the user will get the 'subscriptionStop' + * async message before the reply to 'unsubscribe'. */ htsp_reply(htsp, in, htsmsg_create_map()); if(s == NULL) return NULL; /* Subscription did not exist, but we don't really care */ - LIST_REMOVE(s, ths_subscriber_link); - subscription_unsubscribe(s); + htsp_subscription_destroy(htsp, s); return NULL; } @@ -810,7 +830,7 @@ htsp_serve(int fd, void *opaque, struct sockaddr_in *source) { htsp_connection_t htsp; char buf[30]; - th_subscription_t *s; + htsp_subscription_t *s; snprintf(buf, sizeof(buf), "%s", inet_ntoa(source->sin_addr)); @@ -849,8 +869,7 @@ htsp_serve(int fd, void *opaque, struct sockaddr_in *source) down in the streaming code. So we do this as early as possible to avoid any weird lockups */ while((s = LIST_FIRST(&htsp.htsp_subscriptions)) != NULL) { - LIST_REMOVE(s, ths_subscriber_link); - subscription_unsubscribe(s); + htsp_subscription_destroy(&htsp, s); } if(htsp.htsp_async_mode) @@ -984,7 +1003,7 @@ htsp_tag_delete(channel_tag_t *ct) htsp_async_send(m); } - +#if 0 /** * */ @@ -996,13 +1015,14 @@ htsp_send_subscription_status(htsp_connection_t *htsp, th_subscription_t *ths, m = htsmsg_create_map(); htsmsg_add_str(m, "method", "subscriptionStatus"); - htsmsg_add_u32(m, "subscriptionId", ths->ths_u32); + abort(); //htsmsg_add_u32(m, "subscriptionId", ths->ths_u32); if(txt != NULL) htsmsg_add_str(m, "status", txt); htsp_send_message(htsp, m, NULL); } +#endif const static char frametypearray[PKT_NTYPES] = { [PKT_I_FRAME] = 'I', @@ -1014,18 +1034,14 @@ const static char frametypearray[PKT_NTYPES] = { * Build a htsmsg from a th_pkt and enqueue it on our HTSP transport */ static void -htsp_stream_deliver(void *opaque, streaming_message_t *sm) +htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt) { - htsp_stream_t *hs = opaque; - th_pkt_t *pkt = sm->sm_data; htsmsg_t *m = htsmsg_create_map(), *n; htsp_msg_t *hm; htsp_connection_t *htsp = hs->hs_htsp; int64_t ts; int qlen = hs->hs_q.hmq_payload; - free(sm); - if((qlen > 500000 && pkt->pkt_frametype == PKT_B_FRAME) || (qlen > 750000 && pkt->pkt_frametype == PKT_P_FRAME) || (qlen > 1500000)) { @@ -1095,154 +1111,60 @@ htsp_stream_deliver(void *opaque, streaming_message_t *sm) /** * Send a 'subscriptionStart' message to client informing about * delivery start and all components. - * - * Setup a streaming target that will deliver packets to the HTSP transport. */ static void -htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s, - streaming_pad_t *sp) +htsp_subscription_start(htsp_subscription_t *hs, htsmsg_t *streams) { - htsp_stream_t *hs; - htsmsg_t *m, *streams, *c; - th_stream_t *st; - - assert(s->ths_st == NULL); - - hs = calloc(1, sizeof(htsp_stream_t)); - hs->hs_htsp = htsp; - hs->hs_sid = s->ths_u32; - htsp_init_queue(&hs->hs_q, 0); - - m = htsmsg_create_map(); + htsmsg_t *m = htsmsg_create_map(); htsmsg_add_str(m, "method", "subscriptionStart"); - htsmsg_add_u32(m, "subscriptionId", s->ths_u32); - - streaming_target_init2(&hs->hs_st, htsp_stream_deliver, hs); - - /** - * Lock streming pad delivery so we can hook us up. - */ - pthread_mutex_lock(sp->sp_mutex); - - /* Setup each stream */ - streams = htsmsg_create_list(); - LIST_FOREACH(st, &sp->sp_components, st_link) { - c = htsmsg_create_map(); - htsmsg_add_u32(c, "index", st->st_index); - htsmsg_add_str(c, "type", streaming_component_type2txt(st->st_type)); - if(st->st_lang[0]) - htsmsg_add_str(c, "language", st->st_lang); - htsmsg_add_msg(streams, NULL, c); - } - + htsmsg_add_u32(m, "subscriptionId", hs->hs_sid); htsmsg_add_msg(m, "streams", streams); - htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0); - - /* Link to the pad */ - streaming_target_connect(sp, &hs->hs_st); - - s->ths_st = &hs->hs_st; - - /* Once we unlock here we will start getting the callback */ - pthread_mutex_unlock(sp->sp_mutex); } /** * Send a 'subscriptionStart' stop - * - * Tear down all streaming related stuff. */ static void -htsp_subscription_stop(htsp_connection_t *htsp, th_subscription_t *s, - const char *reason) +htsp_subscription_stop(htsp_subscription_t *hs, htsmsg_t *m) { - htsp_stream_t *hs; - htsmsg_t *m; - - assert(s->ths_st != NULL); - - hs = (htsp_stream_t *)s->ths_st; - - /* Unlink from pad */ - streaming_target_disconnect(&hs->hs_st); - - /* Send a stop message back */ - m = htsmsg_create_map(); - htsmsg_add_u32(m, "subscriptionId", hs->hs_sid); htsmsg_add_str(m, "method", "subscriptionStop"); - - if(reason) - htsmsg_add_str(m, "reason", reason); - - /* Send on normal control queue cause we are about do destroy - the per-subscription queue */ - htsp_send_message(hs->hs_htsp, m, NULL); - - htsp_destroy_queue(htsp, &hs->hs_q); - - free(hs); - - s->ths_st = NULL; + htsmsg_add_u32(m, "subscriptionId", hs->hs_sid); + htsp_send(hs->hs_htsp, m, NULL, &hs->hs_q, 0); } - - /** * */ static void -htsp_subscription_callback(struct th_subscription *s, - subscription_event_t event, void *opaque) +htsp_streaming_input(void *opaque, streaming_message_t *sm) { - htsp_connection_t *htsp = opaque; - th_transport_t *t; + htsp_subscription_t *hs = opaque; - switch(event) { - case SUBSCRIPTION_EVENT_INVALID: + switch(sm->sm_type) { + case SMT_PACKET: + htsp_stream_deliver(hs, sm->sm_data); // reference is transfered + break; + + case SMT_START: + htsp_subscription_start(hs, sm->sm_data); + break; + + case SMT_STOP: + htsp_subscription_stop(hs, sm->sm_data); + break; + + case SMT_TRANSPORT_STATUS: + break; + + case SMT_NOSOURCE: + break; + + case SMT_EXIT: abort(); - - case SUBSCRIPTION_TRANSPORT_RUN: - htsp_send_subscription_status(htsp, s, NULL); - - t = s->ths_transport; - htsp_subscription_start(htsp, s, &t->tht_streaming_pad); - return; - - case SUBSCRIPTION_NO_INPUT: - htsp_send_subscription_status(htsp, s, "No input detected"); - break; - - case SUBSCRIPTION_NO_DESCRAMBLER: - htsp_send_subscription_status(htsp, s, "No descrambler available"); - break; - - case SUBSCRIPTION_NO_ACCESS: - htsp_send_subscription_status(htsp, s, "Access denied"); - break; - - case SUBSCRIPTION_RAW_INPUT: - htsp_send_subscription_status(htsp, s, - "Unable to reassemble packets from input"); - break; - - case SUBSCRIPTION_VALID_PACKETS: - htsp_send_subscription_status(htsp, s, NULL); - break; - - case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE: - htsp_send_subscription_status(htsp, s, - "No transport available, retrying..."); - break; - - case SUBSCRIPTION_TRANSPORT_LOST: - htsp_subscription_stop(htsp, s, "Transport destroyed"); - break; - - case SUBSCRIPTION_DESTROYED: - htsp_subscription_stop(htsp, s, NULL); - return; } + + free(sm); } diff --git a/src/main.c b/src/main.c index 08ae577c..3ab2347b 100644 --- a/src/main.c +++ b/src/main.c @@ -315,8 +315,6 @@ main(int argc, char **argv) webui_init(contentpath); - subscriptions_init(); - serviceprobe_init(); cwc_init(); diff --git a/src/parsers.c b/src/parsers.c index 75ae49dc..e08df3c1 100644 --- a/src/parsers.c +++ b/src/parsers.c @@ -877,11 +877,12 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) /** * Input is ok */ - transport_signal_status(t, SUBSCRIPTION_VALID_PACKETS); + transport_set_feed_status(t, TRANSPORT_FEED_VALID_PACKETS); /* Forward packet */ pkt->pkt_componentindex = st->st_index; - streaming_pad_deliver_packet(&t->tht_streaming_pad, pkt); + streaming_pad_deliver(&t->tht_streaming_pad, + streaming_msg_create_pkt(pkt)); /* Decrease our own reference to the packet */ pkt_ref_dec(pkt); diff --git a/src/psi.c b/src/psi.c index 41343143..d26691a4 100644 --- a/src/psi.c +++ b/src/psi.c @@ -555,7 +555,6 @@ streaming_component_type2txt(streaming_component_type_t s) void psi_save_transport_settings(htsmsg_t *m, th_transport_t *t) { - streaming_pad_t *sp = &t->tht_streaming_pad; th_stream_t *st; htsmsg_t *sub; @@ -565,7 +564,7 @@ psi_save_transport_settings(htsmsg_t *m, th_transport_t *t) lock_assert(&t->tht_stream_mutex); - LIST_FOREACH(st, &sp->sp_components, st_link) { + LIST_FOREACH(st, &t->tht_components, st_link) { sub = htsmsg_create_map(); htsmsg_add_u32(sub, "pid", st->st_pid); diff --git a/src/serviceprobe.c b/src/serviceprobe.c index 5680532b..00674aab 100644 --- a/src/serviceprobe.c +++ b/src/serviceprobe.c @@ -32,6 +32,7 @@ #include "subscriptions.h" #include "serviceprobe.h" #include "transports.h" +#include "streaming.h" /* List of transports to be probed, protected with global_lock */ @@ -58,77 +59,6 @@ serviceprobe_enqueue(th_transport_t *t) pthread_cond_signal(&serviceprobe_cond); } - -/** - * - */ -static void -serviceprobe_callback(struct th_subscription *s, subscription_event_t event, - void *opaque) -{ - th_transport_t *t = opaque; - channel_t *ch; - const char *errmsg; - - switch(event) { - case SUBSCRIPTION_TRANSPORT_RUN: - return; - - case SUBSCRIPTION_NO_INPUT: - errmsg = "No input detected"; - break; - - case SUBSCRIPTION_NO_DESCRAMBLER: - errmsg = "No descrambler available"; - break; - - case SUBSCRIPTION_NO_ACCESS: - errmsg = "Access denied"; - break; - - case SUBSCRIPTION_RAW_INPUT: - errmsg = "Unable to reassemble packets from input"; - break; - - case SUBSCRIPTION_VALID_PACKETS: - errmsg = NULL; /* All OK */ - break; - - case SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE: - case SUBSCRIPTION_TRANSPORT_LOST: - errmsg = "Unable to probe"; - break; - - case SUBSCRIPTION_DESTROYED: - return; /* All done */ - - default: - abort(); - } - - assert(t == TAILQ_FIRST(&serviceprobe_queue)); - - - if(errmsg != NULL) { - tvhlog(LOG_INFO, "serviceprobe", "%20s: skipped: %s", - t->tht_svcname, errmsg); - } else if(t->tht_ch == NULL) { - ch = channel_find_by_name(t->tht_svcname, 1); - transport_map_channel(t, ch); - - pthread_mutex_lock(&t->tht_stream_mutex); - t->tht_config_change(t); - pthread_mutex_unlock(&t->tht_stream_mutex); - - tvhlog(LOG_INFO, "serviceprobe", "\"%s\" mapped to channel \"%s\"", - t->tht_svcname, t->tht_svcname); - } - - t->tht_sp_onqueue = 0; - TAILQ_REMOVE(&serviceprobe_queue, t, tht_sp_link); - pthread_cond_signal(&serviceprobe_cond); -} - /** * */ @@ -138,9 +68,17 @@ serviceprobe_thread(void *aux) th_transport_t *t; th_subscription_t *s; int was_doing_work = 0; + streaming_queue_t sq; + streaming_message_t *sm; + transport_feed_status_t status; + int run; + const char *err; + channel_t *ch; pthread_mutex_lock(&global_lock); + streaming_queue_init(&sq); + while(1) { while((t = TAILQ_FIRST(&serviceprobe_queue)) == NULL) { @@ -154,18 +92,96 @@ serviceprobe_thread(void *aux) if(!was_doing_work) { tvhlog(LOG_INFO, "serviceprobe", "Starting"); + was_doing_work = 1; } - s = subscription_create_from_transport(t, "serviceprobe", - serviceprobe_callback, t); - s->ths_force_demux = 1; + s = subscription_create_from_transport(t, "serviceprobe", &sq.sq_st); - /* Wait for something to happen */ - while(TAILQ_FIRST(&serviceprobe_queue) == t) - pthread_cond_wait(&serviceprobe_cond, &global_lock); + transport_ref(t); + pthread_mutex_unlock(&global_lock); + run = 1; + pthread_mutex_lock(&sq.sq_mutex); + + while(run) { + + while((sm = TAILQ_FIRST(&sq.sq_queue)) == NULL) + pthread_cond_wait(&sq.sq_cond, &sq.sq_mutex); + TAILQ_REMOVE(&sq.sq_queue, sm, sm_link); + + pthread_mutex_unlock(&sq.sq_mutex); + + if(sm->sm_type == SMT_TRANSPORT_STATUS) { + status = sm->sm_code; + + switch(status) { + case TRANSPORT_FEED_UNKNOWN: + break; + + case TRANSPORT_FEED_NO_INPUT: + err = "No data input from adapter detected"; + run = 0; + break; + + case TRANSPORT_FEED_NO_DEMUXED_INPUT: + err = "No mux packets for this service"; + run = 0; + break; + + case TRANSPORT_FEED_RAW_INPUT: + err = "Data received for service, " + "but no packets could be reassembled"; + run = 0; + break; + + case TRANSPORT_FEED_NO_DESCRAMBLER: + err = "No descrambler available for service"; + run = 0; + break; + + case TRANSPORT_FEED_NO_ACCESS: + err = "Access denied"; + run = 0; + break; + + case TRANSPORT_FEED_VALID_PACKETS: + err = NULL; + run = 0; + break; + } + } + + streaming_msg_free(sm); + pthread_mutex_lock(&sq.sq_mutex); + } + + streaming_queue_clear(&sq.sq_queue); + pthread_mutex_unlock(&sq.sq_mutex); + + pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); - was_doing_work = 1; + + if(t->tht_status != TRANSPORT_ZOMBIE) { + + if(err != NULL) { + tvhlog(LOG_INFO, "serviceprobe", "%20s: skipped: %s", + t->tht_svcname, err); + } else if(t->tht_ch == NULL) { + ch = channel_find_by_name(t->tht_svcname, 1); + transport_map_channel(t, ch); + + pthread_mutex_lock(&t->tht_stream_mutex); + t->tht_config_change(t); + pthread_mutex_unlock(&t->tht_stream_mutex); + + tvhlog(LOG_INFO, "serviceprobe", "\"%s\" mapped to channel \"%s\"", + t->tht_svcname, t->tht_svcname); + } + + t->tht_sp_onqueue = 0; + TAILQ_REMOVE(&serviceprobe_queue, t, tht_sp_link); + } + transport_unref(t); } return NULL; } diff --git a/src/streaming.c b/src/streaming.c index 7868b7d8..e25a56a4 100644 --- a/src/streaming.c +++ b/src/streaming.c @@ -23,18 +23,16 @@ #include "packet.h" void -streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex) +streaming_pad_init(streaming_pad_t *sp) { LIST_INIT(&sp->sp_targets); - LIST_INIT(&sp->sp_components); - sp->sp_mutex = mutex; } /** * */ void -streaming_target_init2(streaming_target_t *st, st_callback_t *cb, void *opaque) +streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque) { st->st_cb = cb; st->st_opaque = opaque; @@ -62,9 +60,7 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm) void streaming_queue_init(streaming_queue_t *sq) { - sq->sq_status = SQ_IDLE; - - streaming_target_init2(&sq->sq_st, streaming_queue_deliver, sq); + streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq); pthread_mutex_init(&sq->sq_mutex, NULL); pthread_cond_init(&sq->sq_cond, NULL); @@ -78,56 +74,104 @@ streaming_queue_init(streaming_queue_t *sq) void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st) { - lock_assert(sp->sp_mutex); - sp->sp_ntargets++; st->st_pad = sp; LIST_INSERT_HEAD(&sp->sp_targets, st, st_link); } +/** + * + */ void -streaming_target_disconnect(streaming_target_t *st) +streaming_target_disconnect(streaming_pad_t *sp, streaming_target_t *st) { - streaming_pad_t *sp = st->st_pad; - - if(sp == NULL) - return; /* Already disconnected */ - - pthread_mutex_lock(sp->sp_mutex); - sp->sp_ntargets--; st->st_pad = NULL; LIST_REMOVE(st, st_link); - - pthread_mutex_unlock(sp->sp_mutex); } + /** * */ -void -streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt) +streaming_message_t * +streaming_msg_create(streaming_message_type_t type) { - streaming_target_t *st; - streaming_message_t *sm; + streaming_message_t *sm = malloc(sizeof(streaming_message_t)); + sm->sm_type = type; + return sm; +} - lock_assert(sp->sp_mutex); - if(sp->sp_ntargets == 0) - return; +/** + * + */ +streaming_message_t * +streaming_msg_create_pkt(th_pkt_t *pkt) +{ + streaming_message_t *sm = streaming_msg_create(SMT_PACKET); + sm->sm_data = pkt; + pkt_ref_inc(pkt); + return sm; +} - /* Increase multiple refcounts at once */ - pkt_ref_inc_poly(pkt, sp->sp_ntargets); - LIST_FOREACH(st, &sp->sp_targets, st_link) { +/** + * + */ +streaming_message_t * +streaming_msg_create_msg(streaming_message_type_t type, htsmsg_t *msg) +{ + streaming_message_t *sm = streaming_msg_create(type); + sm->sm_data = msg; + return sm; +} - sm = malloc(sizeof(streaming_message_t)); - sm->sm_type = SMT_PACKET; - sm->sm_data = pkt; - st->st_cb(st->st_opaque, sm); + +/** + * + */ +streaming_message_t * +streaming_msg_create_code(streaming_message_type_t type, int code) +{ + streaming_message_t *sm = streaming_msg_create(type); + sm->sm_code = code; + return sm; +} + + + +/** + * + */ +streaming_message_t * +streaming_msg_clone(streaming_message_t *src) +{ + streaming_message_t *dst = malloc(sizeof(streaming_message_t)); + + dst->sm_type = src->sm_type; + + switch(src->sm_type) { + + case SMT_PACKET: + pkt_ref_inc(src->sm_data); + dst->sm_data = src->sm_data; + break; + + case SMT_START: + case SMT_STOP: + dst->sm_data = htsmsg_copy(src->sm_data); + break; + + case SMT_EXIT: + break; + + default: + abort(); } + return dst; } @@ -135,13 +179,24 @@ streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt) * */ void -streaming_message_free(streaming_message_t *sm) +streaming_msg_free(streaming_message_t *sm) { switch(sm->sm_type) { case SMT_PACKET: pkt_ref_dec(sm->sm_data); break; + case SMT_START: + case SMT_STOP: + htsmsg_destroy(sm->sm_data); + break; + + case SMT_EXIT: + break; + + case SMT_TRANSPORT_STATUS: + break; + default: abort(); } @@ -149,6 +204,27 @@ streaming_message_free(streaming_message_t *sm) } +/** + * + */ +void +streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm) +{ + streaming_target_t *st, *next; + + if(sp->sp_ntargets == 0) + return; + + for(st = LIST_FIRST(&sp->sp_targets);; st = next) { + + if((next = LIST_NEXT(st, st_link)) == NULL) + break; + + st->st_cb(st->st_opaque, streaming_msg_clone(sm)); + } + st->st_cb(st->st_opaque, sm); +} + /** * @@ -160,6 +236,6 @@ streaming_queue_clear(struct streaming_message_queue *q) while((sm = TAILQ_FIRST(q)) != NULL) { TAILQ_REMOVE(q, sm, sm_link); - streaming_message_free(sm); + streaming_msg_free(sm); } } diff --git a/src/streaming.h b/src/streaming.h index f57d51b2..3d5b1ecc 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -21,25 +21,40 @@ #include "tvhead.h" #include "packet.h" +#include "htsmsg.h" /** * */ -void streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex); +void streaming_pad_init(streaming_pad_t *sp); -void streaming_target_init2(streaming_target_t *st, - st_callback_t *cb, void *opaque); +void streaming_target_init(streaming_target_t *st, + st_callback_t *cb, void *opaque); -void streaming_queue_init(streaming_queue_t *st); - -void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st); - -void streaming_target_disconnect(streaming_target_t *st); - -void streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt); - -void streaming_message_free(streaming_message_t *sm); +void streaming_queue_init(streaming_queue_t *sq); void streaming_queue_clear(struct streaming_message_queue *q); +void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st); + +void streaming_target_disconnect(streaming_pad_t *sp, streaming_target_t *st); + +void streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm); + +void streaming_msg_free(streaming_message_t *sm); + +streaming_message_t *streaming_msg_clone(streaming_message_t *src); + +streaming_message_t *streaming_msg_create(streaming_message_type_t type); + +streaming_message_t *streaming_msg_create_msg(streaming_message_type_t type, + htsmsg_t *msg); + +streaming_message_t *streaming_msg_create_code(streaming_message_type_t type, + int code); + +streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt); + +#define streaming_target_deliver(st, sm) ((st)->st_cb((st)->st_opaque, (sm))) + #endif /* STREAMING_H_ */ diff --git a/src/subscriptions.c b/src/subscriptions.c index 3c189d40..e1b29075 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -34,11 +34,9 @@ #include "tvhead.h" #include "transports.h" #include "subscriptions.h" +#include "streaming.h" struct th_subscription_list subscriptions; -static pthread_cond_t subscription_janitor_cond; -static pthread_mutex_t subscription_janitor_mutex; -static int subscription_janitor_work; static gtimer_t subscription_reschedule_timer; @@ -48,21 +46,62 @@ subscription_sort(th_subscription_t *a, th_subscription_t *b) return b->ths_weight - a->ths_weight; } + /** - * + * The transport is producing output. Thus, we may link our subscription + * to it. */ static void subscription_link_transport(th_subscription_t *s, th_transport_t *t) { + streaming_message_t *sm; + s->ths_transport = t; LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link); - s->ths_event_callback(s, SUBSCRIPTION_TRANSPORT_RUN, s->ths_opaque); - - s->ths_last_status = t->tht_last_status; - if(s->ths_last_status != SUBSCRIPTION_EVENT_INVALID) - s->ths_event_callback(s, s->ths_last_status, s->ths_opaque); + + + pthread_mutex_lock(&t->tht_stream_mutex); + + // Link to transport output + streaming_target_connect(&t->tht_streaming_pad, &s->ths_input); + + // Send a START message to the subscription client + sm = streaming_msg_create_msg(SMT_START, transport_build_stream_msg(t)); + streaming_target_deliver(s->ths_output, sm); + + // Send a TRANSPORT_STATUS message to the subscription client + sm = streaming_msg_create_code(SMT_TRANSPORT_STATUS, t->tht_feed_status); + streaming_target_deliver(s->ths_output, sm); + + pthread_mutex_unlock(&t->tht_stream_mutex); } + +/** + * Called from transport code + */ +void +subscription_unlink_transport(th_subscription_t *s) +{ + streaming_message_t *sm; + th_transport_t *t = s->ths_transport; + + pthread_mutex_lock(&t->tht_stream_mutex); + + // Unlink from transport output + streaming_target_disconnect(&t->tht_streaming_pad, &s->ths_input); + + // Send a STOP message to the subscription client + sm = streaming_msg_create_msg(SMT_STOP, htsmsg_create_map()); + streaming_target_deliver(s->ths_output, sm); + + pthread_mutex_unlock(&t->tht_stream_mutex); + + LIST_REMOVE(s, ths_transport_link); + s->ths_transport = NULL; +} + + /** * */ @@ -71,6 +110,7 @@ subscription_reschedule(void *aux) { th_subscription_t *s; th_transport_t *t; + streaming_message_t *sm; lock_assert(&global_lock); @@ -87,8 +127,9 @@ subscription_reschedule(void *aux) if(t == NULL) { /* No transport available */ - s->ths_event_callback(s, SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE, - s->ths_opaque); + + sm = streaming_msg_create(SMT_NOSOURCE); + streaming_target_deliver(s->ths_output, sm); continue; } @@ -120,20 +161,35 @@ subscription_unsubscribe(th_subscription_t *s) subscription_reschedule(NULL); } + +/** + * This callback is invoked when we receive data and status updates from + * the currently bound transport + */ +static void +subscription_input(void *opauqe, streaming_message_t *sm) +{ + th_subscription_t *s = opauqe; + + streaming_target_deliver(s->ths_output, sm); +} + + + /** * */ static th_subscription_t * -subscription_create(int weight, const char *name, - ths_event_callback_t *cb, void *opaque) +subscription_create(int weight, const char *name, streaming_target_t *st) { th_subscription_t *s = calloc(1, sizeof(th_subscription_t)); - s->ths_weight = weight; - s->ths_event_callback = cb; - s->ths_opaque = opaque; - s->ths_title = strdup(name); - s->ths_total_err = 0; + streaming_target_init(&s->ths_input, subscription_input, s); + + s->ths_weight = weight; + s->ths_title = strdup(name); + s->ths_total_err = 0; + s->ths_output = st; time(&s->ths_start); LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort); @@ -146,14 +202,10 @@ subscription_create(int weight, const char *name, * */ th_subscription_t * -subscription_create_from_channel(channel_t *ch, - unsigned int weight, const char *name, - ths_event_callback_t *cb, void *opaque, - uint32_t u32) +subscription_create_from_channel(channel_t *ch, unsigned int weight, + const char *name, streaming_target_t *st) { - th_subscription_t *s = subscription_create(weight, name, cb, opaque); - - s->ths_u32 = u32; + th_subscription_t *s = subscription_create(weight, name, st); s->ths_channel = ch; LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link); @@ -176,86 +228,13 @@ subscription_create_from_channel(channel_t *ch, */ th_subscription_t * subscription_create_from_transport(th_transport_t *t, const char *name, - ths_event_callback_t *cb, void *opaque) + streaming_target_t *st) { - th_subscription_t *s = subscription_create(INT32_MAX, name, cb, opaque); + th_subscription_t *s = subscription_create(INT32_MAX, name, st); - if(t->tht_runstatus != TRANSPORT_RUNNING) + if(t->tht_status != TRANSPORT_RUNNING) transport_start(t, INT32_MAX, 1); subscription_link_transport(s, t); return s; } - - -/** - * - */ -void -subscription_janitor_has_duty(void) -{ - pthread_mutex_lock(&subscription_janitor_mutex); - subscription_janitor_work++; - pthread_cond_signal(&subscription_janitor_cond); - pthread_mutex_unlock(&subscription_janitor_mutex); -} - - - -/** - * - */ -static void * -subscription_janitor(void *aux) -{ - int v; - th_subscription_t *s; - th_transport_t *t; - - pthread_mutex_lock(&subscription_janitor_mutex); - - v = subscription_janitor_work; - - while(1) { - - while(v == subscription_janitor_work) - pthread_cond_wait(&subscription_janitor_cond, - &subscription_janitor_mutex); - - v = subscription_janitor_work; - pthread_mutex_unlock(&subscription_janitor_mutex); - - pthread_mutex_lock(&global_lock); - - LIST_FOREACH(s, &subscriptions, ths_global_link) { - if((t = s->ths_transport) == NULL) - continue; - - if(s->ths_last_status != t->tht_last_status) { - s->ths_last_status = t->tht_last_status; - s->ths_event_callback(s, s->ths_last_status, s->ths_opaque); - } - } - - pthread_mutex_unlock(&global_lock); - } - return NULL; -} - - - - -/** - * - */ -void -subscriptions_init(void) -{ - pthread_t ptid; - - pthread_cond_init(&subscription_janitor_cond, NULL); - pthread_mutex_init(&subscription_janitor_mutex, NULL); - - pthread_create(&ptid, NULL, subscription_janitor, NULL); -} - diff --git a/src/subscriptions.h b/src/subscriptions.h index 07a81603..b0ed51e2 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -19,15 +19,6 @@ #ifndef SUBSCRIPTIONS_H #define SUBSCRIPTIONS_H -typedef void (ths_event_callback_t)(struct th_subscription *s, - subscription_event_t event, - void *opaque); - -typedef void (ths_raw_input_t)(struct th_subscription *s, - void *data, int len, - th_stream_t *st, - void *opaque); - typedef struct th_subscription { LIST_ENTRY(th_subscription) ths_global_link; int ths_weight; @@ -41,24 +32,15 @@ typedef struct th_subscription { struct th_transport *ths_transport; /* if NULL, ths_transport_link is not linked */ - LIST_ENTRY(th_subscription) ths_subscriber_link; /* Caller is responsible - for this link */ - - void *ths_opaque; - uint32_t ths_u32; - char *ths_title; /* display title */ time_t ths_start; /* time when subscription started */ int ths_total_err; /* total errors during entire subscription */ - int ths_last_status; - - ths_event_callback_t *ths_event_callback; - ths_raw_input_t *ths_raw_input; - int ths_force_demux; - streaming_target_t *ths_st; + streaming_target_t ths_input; + + streaming_target_t *ths_output; } th_subscription_t; @@ -73,20 +55,15 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight); th_subscription_t *subscription_create_from_channel(channel_t *ch, unsigned int weight, const char *name, - ths_event_callback_t *cb, - void *opaque, - uint32_t u32); + streaming_target_t *st); th_subscription_t *subscription_create_from_transport(th_transport_t *t, const char *name, - ths_event_callback_t *cb, - void *opaque); - -void subscriptions_init(void); + streaming_target_t *st); void subscription_stop(th_subscription_t *s); -void subscription_janitor_has_duty(void); +void subscription_unlink_transport(th_subscription_t *s); #endif /* SUBSCRIPTIONS_H */ diff --git a/src/transports.c b/src/transports.c index 97fd54b3..2257c582 100644 --- a/src/transports.c +++ b/src/transports.c @@ -43,6 +43,8 @@ #include "cwc.h" #include "notify.h" #include "serviceprobe.h" +#include "atomic.h" + #define TRANSPORT_HASH_WIDTH 101 @@ -50,11 +52,6 @@ static struct th_transport_list transporthash[TRANSPORT_HASH_WIDTH]; static void transport_data_timeout(void *aux); -//static dtimer_t transport_monitor_timer; - -//static const char *transport_settings_path(th_transport_t *t); -//static void transport_monitor(void *aux, int64_t now); - /** * Transport lock must be held @@ -62,7 +59,6 @@ static void transport_data_timeout(void *aux); static void transport_stop(th_transport_t *t) { - streaming_pad_t *sp = &t->tht_streaming_pad; th_descrambler_t *td; th_stream_t *st; @@ -85,7 +81,7 @@ transport_stop(th_transport_t *t) /** * Clean up each stream */ - LIST_FOREACH(st, &sp->sp_components, st_link) { + LIST_FOREACH(st, &t->tht_components, st_link) { if(st->st_parser != NULL) av_parser_close(st->st_parser); @@ -127,16 +123,6 @@ transport_stop(th_transport_t *t) pthread_mutex_unlock(&t->tht_stream_mutex); } -/** - * - */ -static void -remove_subscriber(th_subscription_t *s, subscription_event_t reason) -{ - s->ths_event_callback(s, reason, s->ths_opaque); - LIST_REMOVE(s, ths_transport_link); - s->ths_transport = NULL; -} /** * Remove the given subscriber from the transport @@ -151,10 +137,11 @@ transport_remove_subscriber(th_transport_t *t, th_subscription_t *s) lock_assert(&global_lock); if(s == NULL) { - while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL) - remove_subscriber(s, SUBSCRIPTION_TRANSPORT_LOST); + while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL) { + subscription_unlink_transport(s); + } } else { - remove_subscriber(s, SUBSCRIPTION_DESTROYED); + subscription_unlink_transport(s); } if(LIST_FIRST(&t->tht_subscriptions) == NULL) @@ -209,14 +196,13 @@ transport_unlink_muxer(th_muxer_t *tm) int transport_start(th_transport_t *t, unsigned int weight, int force_start) { - streaming_pad_t *sp = &t->tht_streaming_pad; th_stream_t *st; AVCodec *c; enum CodecID id; lock_assert(&global_lock); - assert(t->tht_runstatus != TRANSPORT_RUNNING); + assert(t->tht_status != TRANSPORT_RUNNING); if(t->tht_start_feed(t, weight, TRANSPORT_RUNNING, force_start)) return -1; @@ -227,7 +213,7 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start) /** * Initialize stream */ - LIST_FOREACH(st, &sp->sp_components, st_link) { + LIST_FOREACH(st, &t->tht_components, st_link) { st->st_startcond = 0xffffffff; st->st_curdts = AV_NOPTS_VALUE; st->st_curpts = AV_NOPTS_VALUE; @@ -268,9 +254,9 @@ transport_start(th_transport_t *t, unsigned int weight, int force_start) cwc_transport_start(t); - t->tht_packets = 0; gtimer_arm(&t->tht_receive_timer, transport_data_timeout, t, 4); - t->tht_last_status = SUBSCRIPTION_EVENT_INVALID; + t->tht_feed_status = TRANSPORT_FEED_UNKNOWN; + t->tht_input_status = TRANSPORT_FEED_NO_INPUT; return 0; } @@ -374,7 +360,7 @@ transport_find(channel_t *ch, unsigned int weight) for(i = 0; i < cnt; i++) { t = vec[i]; - if(t->tht_runstatus == TRANSPORT_RUNNING) + if(t->tht_status == TRANSPORT_RUNNING) return t; if(!transport_start(t, 0, 0)) @@ -415,17 +401,27 @@ transport_compute_weight(struct th_transport_list *head) } -#if 0 /** - * Timer that fires if transport is not receiving any data + * */ -static void -transport_data_timeout(void *aux, int64_t now) +void +transport_unref(th_transport_t *t) { - th_transport_t *t = aux; - transport_signal_status(t, TRANSPORT_STATUS_NO_INPUT); + if((atomic_add(&t->tht_refcount, -1)) == 1) + free(t); } -#endif + + +/** + * + */ +void +transport_ref(th_transport_t *t) +{ + atomic_add(&t->tht_refcount, 1); +} + + /** * Destroy a transport @@ -433,14 +429,14 @@ transport_data_timeout(void *aux, int64_t now) void transport_destroy(th_transport_t *t) { - streaming_pad_t *sp = &t->tht_streaming_pad; th_stream_t *st; th_subscription_t *s; lock_assert(&global_lock); - while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL) - remove_subscriber(s, SUBSCRIPTION_TRANSPORT_LOST); + while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL) { + subscription_unlink_transport(s); + } //dtimer_disarm(&t->tht_receive_timer); @@ -454,23 +450,23 @@ transport_destroy(th_transport_t *t) LIST_REMOVE(t, tht_mux_link); LIST_REMOVE(t, tht_hash_link); - if(t->tht_runstatus != TRANSPORT_IDLE) + if(t->tht_status != TRANSPORT_IDLE) transport_stop(t); + t->tht_status = TRANSPORT_ZOMBIE; free(t->tht_identifier); free(t->tht_svcname); free(t->tht_chname); free(t->tht_provider); - while((st = LIST_FIRST(&sp->sp_components)) != NULL) { + while((st = LIST_FIRST(&t->tht_components)) != NULL) { LIST_REMOVE(st, st_link); free(st); } abort();// serviceprobe_delete(t); - - free(t); + transport_unref(t); } @@ -489,8 +485,9 @@ transport_create(const char *identifier, int type, int source_type) t->tht_identifier = strdup(identifier); t->tht_type = type; t->tht_source_type = source_type; + t->tht_refcount = 1; - streaming_pad_init(&t->tht_streaming_pad, &t->tht_stream_mutex); + streaming_pad_init(&t->tht_streaming_pad); LIST_INSERT_HEAD(&transporthash[hash], t, tht_hash_link); return t; @@ -524,13 +521,12 @@ th_stream_t * transport_add_stream(th_transport_t *t, int pid, streaming_component_type_t type) { - streaming_pad_t *sp = &t->tht_streaming_pad; th_stream_t *st; int i = 0; lock_assert(&t->tht_stream_mutex); - LIST_FOREACH(st, &sp->sp_components, st_link) { + LIST_FOREACH(st, &t->tht_components, st_link) { i++; if(pid != -1 && st->st_pid == pid) return st; @@ -539,7 +535,7 @@ transport_add_stream(th_transport_t *t, int pid, st = calloc(1, sizeof(th_stream_t)); st->st_index = i; st->st_type = type; - LIST_INSERT_HEAD(&sp->sp_components, st, st_link); + LIST_INSERT_HEAD(&t->tht_components, st, st_link); st->st_pid = pid; st->st_demuxer_fd = -1; @@ -603,11 +599,12 @@ transport_data_timeout(void *aux) { th_transport_t *t = aux; - if(t->tht_last_status) - return; /* Something has happend so we don't have to update */ - - transport_signal_status(t, t->tht_packets ? SUBSCRIPTION_RAW_INPUT : - SUBSCRIPTION_NO_INPUT); + pthread_mutex_lock(&t->tht_stream_mutex); + + if(t->tht_feed_status == TRANSPORT_FEED_UNKNOWN) + transport_set_feed_status(t, t->tht_input_status); + + pthread_mutex_unlock(&t->tht_stream_mutex); } @@ -647,21 +644,25 @@ transport_is_tv(th_transport_t *t) int transport_is_available(th_transport_t *t) { - return transport_servicetype_txt(t) && - LIST_FIRST(&t->tht_streaming_pad.sp_components); + return transport_servicetype_txt(t) && LIST_FIRST(&t->tht_components); } /** * */ void -transport_signal_status(th_transport_t *t, int newstatus) +transport_set_feed_status(th_transport_t *t, transport_feed_status_t newstatus) { - if(t->tht_last_status == newstatus) + lock_assert(&t->tht_stream_mutex); + + if(t->tht_feed_status == newstatus) return; - t->tht_last_status = newstatus; - subscription_janitor_has_duty(); + t->tht_feed_status = newstatus; + + streaming_pad_deliver(&t->tht_streaming_pad, + streaming_msg_create_code(SMT_TRANSPORT_STATUS, + newstatus)); } @@ -669,10 +670,10 @@ transport_signal_status(th_transport_t *t, int newstatus) * Table for status -> text conversion */ static struct strtab transportstatustab[] = { - { "Ok", SUBSCRIPTION_VALID_PACKETS }, - { "No input", SUBSCRIPTION_NO_INPUT }, - { "No descrambler", SUBSCRIPTION_NO_DESCRAMBLER }, - { "No access", SUBSCRIPTION_NO_ACCESS }, + { "Ok", TRANSPORT_FEED_VALID_PACKETS }, + { "No input", TRANSPORT_FEED_NO_INPUT }, + { "No descrambler", TRANSPORT_FEED_NO_DESCRAMBLER }, + { "No access", TRANSPORT_FEED_NO_ACCESS }, }; @@ -682,3 +683,32 @@ transport_status_to_text(int status) return val2str(status, transportstatustab) ?: "Unknown"; } + +/** + * Generate a message containing info about all components + * + * Note: This is the same as the one in HTSP.subscriptionStart so take + * great care if you change anying. (Just adding is fine) + */ +htsmsg_t * +transport_build_stream_msg(th_transport_t *t) +{ + htsmsg_t *streams, *c; + th_stream_t *st; + + lock_assert(&t->tht_stream_mutex); + + /* Setup each stream */ + streams = htsmsg_create_list(); + LIST_FOREACH(st, &t->tht_components, st_link) { + c = htsmsg_create_map(); + htsmsg_add_u32(c, "index", st->st_index); + htsmsg_add_str(c, "type", streaming_component_type2txt(st->st_type)); + if(st->st_lang[0]) + htsmsg_add_str(c, "language", st->st_lang); + htsmsg_add_msg(streams, NULL, c); + } + return streams; +} + + diff --git a/src/transports.h b/src/transports.h index 1c73479d..9316e9c7 100644 --- a/src/transports.h +++ b/src/transports.h @@ -20,6 +20,7 @@ #define TRANSPORTS_H #include "channels.h" +#include "htsmsg.h" #include "subscriptions.h" unsigned int transport_compute_weight(struct th_transport_list *head); @@ -29,6 +30,10 @@ int transport_start(th_transport_t *t, unsigned int weight, int force_start); th_transport_t *transport_create(const char *identifier, int type, int source_type); +void transport_unref(th_transport_t *t); + +void transport_ref(th_transport_t *t); + th_transport_t *transport_find_by_identifier(const char *identifier); void transport_map_channel(th_transport_t *t, channel_t *ch); @@ -52,7 +57,8 @@ int transport_is_available(th_transport_t *t); void transport_destroy(th_transport_t *t); -void transport_signal_status(th_transport_t *t, int newstatus); +void transport_set_feed_status(th_transport_t *t, + transport_feed_status_t newstatus); const char *transport_status_to_text(int status); @@ -65,16 +71,16 @@ void transport_remove_subscriber(th_transport_t *t, th_subscription_t *s); static inline th_stream_t * transport_find_stream_by_pid(th_transport_t *t, int pid) { - streaming_pad_t *sp = &t->tht_streaming_pad; th_stream_t *st; - LIST_FOREACH(st, &sp->sp_components, st_link) { + LIST_FOREACH(st, &t->tht_components, st_link) { if(st->st_pid == pid) return st; } return NULL; } +htsmsg_t *transport_build_stream_msg(th_transport_t *t); #endif /* TRANSPORTS_H */ diff --git a/src/tsdemux.c b/src/tsdemux.c index 5ae59b42..ac4d5eb0 100644 --- a/src/tsdemux.c +++ b/src/tsdemux.c @@ -71,13 +71,8 @@ got_section(th_transport_t *t, th_stream_t *st) static void ts_recv_packet0(th_transport_t *t, th_stream_t *st, uint8_t *tsb) { - th_subscription_t *s; int off, len, pusi, cc, err = 0; - LIST_FOREACH(s, &t->tht_subscriptions, ths_transport_link) - if(s->ths_raw_input != NULL) - s->ths_raw_input(s, tsb, 188, st, s->ths_opaque); - /* Check CC */ if(tsb[3] & 0x10) { @@ -194,6 +189,8 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb) int pid, n, m, r; th_descrambler_t *td; + t->tht_input_status = TRANSPORT_FEED_NO_DEMUXED_INPUT; + if(tsb[1] & 0x80) return; /* Transport Error Indicator */ @@ -201,7 +198,7 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb) if((st = transport_find_stream_by_pid(t, pid)) == NULL) return; - t->tht_packets = 1; + t->tht_input_status = TRANSPORT_FEED_RAW_INPUT; pthread_mutex_lock(&t->tht_stream_mutex); @@ -230,9 +227,9 @@ ts_recv_packet1(th_transport_t *t, uint8_t *tsb) } if(n == 0) { - transport_signal_status(t, SUBSCRIPTION_NO_DESCRAMBLER); + transport_set_feed_status(t, TRANSPORT_FEED_NO_DESCRAMBLER); } else if(m == n) { - transport_signal_status(t, SUBSCRIPTION_NO_ACCESS); + transport_set_feed_status(t, TRANSPORT_FEED_NO_ACCESS); } } else { ts_recv_packet0(t, st, tsb); diff --git a/src/tvhead.h b/src/tvhead.h index 2a632225..ad21e5e7 100644 --- a/src/tvhead.h +++ b/src/tvhead.h @@ -137,12 +137,6 @@ typedef enum { typedef struct streaming_pad { struct streaming_target_list sp_targets; int sp_ntargets; - struct th_stream_list sp_components; - - pthread_mutex_t *sp_mutex; /* Mutex for protecting modification of - st_targets and delivery. - This needs to be created elsewhere. - The mutex also protect sp_comonents */ } streaming_pad_t; @@ -151,9 +145,13 @@ TAILQ_HEAD(streaming_message_queue, streaming_message); /** * Streaming messages types */ - typedef enum { - SMT_PACKET, + SMT_PACKET, // sm_data is a th_pkt. Unref when destroying msg + SMT_START, // sm_data is a htsmsg, see transport_build_stream_msg() + SMT_STOP, // sm_data is a htsmsg + SMT_TRANSPORT_STATUS, // sm_code is TRANSPORT_STATUS_ + SMT_EXIT, // Used to signal exit to threads + SMT_NOSOURCE, } streaming_message_type_t; @@ -163,8 +161,10 @@ typedef enum { typedef struct streaming_message { TAILQ_ENTRY(streaming_message) sm_link; streaming_message_type_t sm_type; - void *sm_data; - + union { + void *sm_data; + int sm_code; + }; } streaming_message_t; /** @@ -194,13 +194,6 @@ typedef struct streaming_queue { packets */ struct streaming_message_queue sq_queue; - - enum { - SQ_IDLE, - SQ_RUNNING, - SQ_STOP_REQ, - SQ_ZOMBIE, - } sq_status; } streaming_queue_t; @@ -320,46 +313,33 @@ typedef struct th_stream { } th_stream_t; - /** - * Transport events, these are sent to subscribers via - * s->ths_event_callback + * */ typedef enum { - SUBSCRIPTION_EVENT_INVALID = 0, /* mbz */ + /** No status known */ + TRANSPORT_FEED_UNKNOWN, - /** Transport is receiving data from source */ - SUBSCRIPTION_TRANSPORT_RUN, + /** No input is received from source at all */ + TRANSPORT_FEED_NO_INPUT, - /** No input is received from source */ - SUBSCRIPTION_NO_INPUT, - - /** No descrambler is able to decrypt the stream */ - SUBSCRIPTION_NO_DESCRAMBLER, - - /** Potential descrambler is available, but access is denied */ - SUBSCRIPTION_NO_ACCESS, + /** No input is received from source destined for this transport */ + TRANSPORT_FEED_NO_DEMUXED_INPUT, /** Raw input seen but nothing has really been decoded */ - SUBSCRIPTION_RAW_INPUT, + TRANSPORT_FEED_RAW_INPUT, - /** Packet are being parsed. Only signalled if at least one muxer is - registerd */ - SUBSCRIPTION_VALID_PACKETS, + /** No descrambler is able to decrypt the stream */ + TRANSPORT_FEED_NO_DESCRAMBLER, - /** No transport is available for delivering subscription */ - SUBSCRIPTION_TRANSPORT_NOT_AVAILABLE, + /** Potential descrambler is available, but access is denied */ + TRANSPORT_FEED_NO_ACCESS, - /** Transport no longer runs, it was needed by someone with higher - priority */ - SUBSCRIPTION_TRANSPORT_LOST, - - /** Subscription destroyed */ - SUBSCRIPTION_DESTROYED, - -} subscription_event_t; + /** Packet are being parsed. */ + TRANSPORT_FEED_VALID_PACKETS, +} transport_feed_status_t; /** @@ -380,10 +360,37 @@ typedef struct th_transport { } tht_type; enum { + /** + * Transport is idle. + */ TRANSPORT_IDLE, + + /** + * Transport producing output + */ TRANSPORT_RUNNING, - TRANSPORT_PROBING, - } tht_runstatus; + + /** + * Destroyed, but pointer is till valid. + * This would be the case if transport_destroy() did not actually free + * the transport because there are references held to it. + * + * Reference counts can be used so that code can hold a pointer to + * a transport without having the global lock. + * + * Note: No fields in the transport may be accessed without the + * global lock held. Thus, the global_lock must be reaquired and + * then tht_status must be checked. If it is ZOMBIE the code must + * just drop the refcount and pretend that the transport never + * was there in the first place. + */ + TRANSPORT_ZOMBIE, + } tht_status; + + /** + * Refcount, operated using atomic.h ops. + */ + int tht_refcount; /** * Source type is used to determine if an output requesting @@ -485,11 +492,6 @@ typedef struct th_transport { struct channel *tht_ch; char *tht_chname; - /** - * Last known status (or error) - */ - int tht_last_status; - /** * Service probe, see serviceprobe.c for details */ @@ -499,15 +501,10 @@ typedef struct th_transport { /** * Timer which is armed at transport start. Once it fires * it will check if any packets has been parsed. If not the status - * will be set to SUBSCRIPTION_NO_INPUT + * will be set to TRANSPORT_STATUS_NO_INPUT */ gtimer_t tht_receive_timer; - /** - * Set as soon as we get some kind of activity - */ - int tht_packets; - /********************************************************* * * Streaming part of transport @@ -527,6 +524,16 @@ typedef struct th_transport { */ pthread_mutex_t tht_stream_mutex; + /** + * Last known data status (or error) + */ + transport_feed_status_t tht_feed_status; + + /** + * Set as soon as we get some kind of activity + */ + transport_feed_status_t tht_input_status; + /** * For simple streaming sources (such as video4linux) keeping @@ -572,10 +579,12 @@ typedef struct th_transport { int tht_pmt_seen; + struct th_stream_list tht_components; + + /** * Delivery pad, this is were we finally deliver all streaming output */ - streaming_pad_t tht_streaming_pad; diff --git a/src/webui/extjs.c b/src/webui/extjs.c index 9129ad9b..cd41eec5 100644 --- a/src/webui/extjs.c +++ b/src/webui/extjs.c @@ -565,7 +565,6 @@ extjs_dvbadapter(http_connection_t *hc, const char *remain, void *opaque) static htsmsg_t * build_transport_msg(th_transport_t *t) { - streaming_pad_t *sp = &t->tht_streaming_pad; htsmsg_t *r = htsmsg_create_map(); th_stream_t *st; @@ -581,14 +580,14 @@ build_transport_msg(th_transport_t *t) htsmsg_add_str(r, "network", t->tht_networkname(t)); htsmsg_add_str(r, "source", t->tht_sourcename(t)); - htsmsg_add_str(r, "status", transport_status_to_text(t->tht_last_status)); + htsmsg_add_str(r, "status", transport_status_to_text(t->tht_feed_status)); video[0] = 0; audio[0] = 0; subtitles[0] = 0; scrambling[0] = 0; - LIST_FOREACH(st, &sp->sp_components, st_link) { + LIST_FOREACH(st, &t->tht_components, st_link) { switch(st->st_type) { case SCT_TELETEXT: