diff --git a/src/dvb/dvb.c b/src/dvb/dvb.c index 4c764eba..9e3775b3 100644 --- a/src/dvb/dvb.c +++ b/src/dvb/dvb.c @@ -23,8 +23,8 @@ #include "dvb_charset.h" void -dvb_init(uint32_t adapter_mask) +dvb_init(uint32_t adapter_mask, const char *rawfile) { dvb_charset_init(); - dvb_adapter_init(adapter_mask); + dvb_adapter_init(adapter_mask, rawfile); } diff --git a/src/dvb/dvb.h b/src/dvb/dvb.h index 8d7929a9..2b0460d5 100644 --- a/src/dvb/dvb.h +++ b/src/dvb/dvb.h @@ -150,6 +150,8 @@ typedef struct th_dvb_mux_instance { TAILQ_HEAD(, epggrab_ota_mux) tdmi_epg_grab; + struct th_subscription_list tdmi_subscriptions; + } th_dvb_mux_instance_t; @@ -255,6 +257,10 @@ typedef struct th_dvb_adapter { int tda_rawmode; + // Full mux streaming, protected via the delivery mutex + + streaming_pad_t tda_streaming_pad; + struct dvb_table_feed_queue tda_table_feed; pthread_cond_t tda_table_feed_cond; // Bound to tda_delivery_mutex @@ -317,12 +323,12 @@ typedef struct th_dvb_table { extern struct th_dvb_adapter_queue dvb_adapters; extern struct th_dvb_mux_instance_tree dvb_muxes; -void dvb_init(uint32_t adapter_mask); +void dvb_init(uint32_t adapter_mask, const char *rawfile); /** * DVB Adapter */ -void dvb_adapter_init(uint32_t adapter_mask); +void dvb_adapter_init(uint32_t adapter_mask, const char *rawfile); void dvb_adapter_mux_scanner(void *aux); @@ -524,4 +530,14 @@ dvb_satconf_t *dvb_satconf_entry_find(th_dvb_adapter_t *tda, void dvb_lnb_get_frequencies(const char *id, int *f_low, int *f_hi, int *f_switch); + +/** + * Raw demux + */ +struct th_subscription; +struct th_subscription *dvb_subscription_create_from_tdmi(th_dvb_mux_instance_t *tdmi, + const char *name, + streaming_target_t *st); + #endif /* DVB_H_ */ + diff --git a/src/dvb/dvb_adapter.c b/src/dvb/dvb_adapter.c index c3ad82b7..b52df831 100644 --- a/src/dvb/dvb_adapter.c +++ b/src/dvb/dvb_adapter.c @@ -64,6 +64,7 @@ tda_alloc(void) TAILQ_INIT(&tda->tda_scan_queues[i]); TAILQ_INIT(&tda->tda_initial_scan_queue); TAILQ_INIT(&tda->tda_satconfs); + streaming_pad_init(&tda->tda_streaming_pad); return tda; } @@ -459,6 +460,53 @@ tda_add(int adapter_num) gtimer_arm(&tda->tda_mux_scanner_timer, dvb_adapter_mux_scanner, tda, 1); } + +/** + * + */ +static void +tda_add_from_file(const char *filename) +{ + int i, r; + th_dvb_adapter_t *tda; + char buf[400]; + + tda = tda_alloc(); + + tda->tda_adapter_num = -1; + tda->tda_fe_fd = -1; + tda->tda_dvr_pipe[0] = -1; + + tda->tda_type = -1; + + snprintf(buf, sizeof(buf), "%s", filename); + + r = strlen(buf); + for(i = 0; i < r; i++) + if(!isalnum((int)buf[i])) + buf[i] = '_'; + + tda->tda_identifier = strdup(buf); + + tda->tda_autodiscovery = 0; + tda->tda_idlescan = 0; + + tda->tda_sat = 0; + + /* Come up with an initial displayname, user can change it and it will + be overridden by any stored settings later on */ + + tda->tda_displayname = strdup(filename); + + TAILQ_INSERT_TAIL(&dvb_adapters, tda, tda_global_link); + + dvb_input_raw_setup(tda); +} + + +/** + * + */ void dvb_adapter_start ( th_dvb_adapter_t *tda ) { @@ -515,7 +563,7 @@ dvb_adapter_stop ( th_dvb_adapter_t *tda ) * */ void -dvb_adapter_init(uint32_t adapter_mask) +dvb_adapter_init(uint32_t adapter_mask, const char *rawfile) { htsmsg_t *l, *c; htsmsg_field_t *f; @@ -529,6 +577,10 @@ dvb_adapter_init(uint32_t adapter_mask) if ((1 << i) & adapter_mask) tda_add(i); + if(rawfile) + tda_add_from_file(rawfile); + + l = hts_settings_load("dvbadapters"); if(l != NULL) { HTSMSG_FOREACH(f, l) { @@ -605,6 +657,10 @@ dvb_adapter_mux_scanner(void *aux) if(service_compute_weight(&tda->tda_transports) > 0) return; + if(tda->tda_mux_current != NULL && + LIST_FIRST(&tda->tda_mux_current->tdmi_subscriptions) != NULL) + return; // Someone is doing full mux dump + /* Check if we have muxes pending for quickscan, if so, choose them */ if((tdmi = TAILQ_FIRST(&tda->tda_initial_scan_queue)) != NULL) { dvb_fe_tune(tdmi, "Initial autoscan"); @@ -804,7 +860,18 @@ dvb_adapter_input_dvr(void *aux) /* sync */ if (tsb[i] == 0x47) { - + + if(LIST_FIRST(&tda->tda_streaming_pad.sp_targets) != NULL) { + streaming_message_t sm; + pktbuf_t *pb = pktbuf_alloc(tsb, 188); + memset(&sm, 0, sizeof(sm)); + sm.sm_type = SMT_MPEGTS; + sm.sm_data = pb; + streaming_pad_deliver(&tda->tda_streaming_pad, &sm); + pktbuf_ref_dec(pb); + } + + if(!(tsb[i+1] & 0x80)) { // Only dispatch to table parser if not error int pid = (tsb[i+1] & 0x1f) << 8 | tsb[i+2]; if(tda->tda_table_filter[pid]) { diff --git a/src/dvb/dvb_multiplex.c b/src/dvb/dvb_multiplex.c index 46ea57ef..1ac5caf1 100644 --- a/src/dvb/dvb_multiplex.c +++ b/src/dvb/dvb_multiplex.c @@ -1285,3 +1285,32 @@ th_dvb_mux_instance_t *dvb_mux_find } return NULL; } + + +/** + * + */ +th_subscription_t * +dvb_subscription_create_from_tdmi(th_dvb_mux_instance_t *tdmi, + const char *name, + streaming_target_t *st) +{ + th_subscription_t *s; + th_dvb_adapter_t *tda = tdmi->tdmi_adapter; + + s = subscription_create(INT32_MAX, name, st, SUBSCRIPTION_RAW_MPEGTS, + NULL, NULL, NULL, NULL); + + + s->ths_tdmi = tdmi; + LIST_INSERT_HEAD(&tdmi->tdmi_subscriptions, s, ths_tdmi_link); + + dvb_fe_tune(tdmi, "Full mux subscription"); + + pthread_mutex_lock(&tda->tda_delivery_mutex); + streaming_target_connect(&tda->tda_streaming_pad, &s->ths_input); + pthread_mutex_unlock(&tda->tda_delivery_mutex); + + notify_reload("subscriptions"); + return s; +} diff --git a/src/dvb/dvb_service.c b/src/dvb/dvb_service.c index 32877f8e..82621227 100644 --- a/src/dvb/dvb_service.c +++ b/src/dvb/dvb_service.c @@ -79,7 +79,10 @@ dvb_service_start(service_t *t, unsigned int weight, int force_start) if(w && w >= weight && !force_start) /* We are outranked by weight, cant use it */ return SM_CODE_NOT_FREE; - + + if(LIST_FIRST(&tdmi->tdmi_subscriptions) != NULL) + return SM_CODE_NOT_FREE; + dvb_adapter_clean(tda); } diff --git a/src/main.c b/src/main.c index 9cca79fd..263bb6ce 100644 --- a/src/main.c +++ b/src/main.c @@ -262,6 +262,7 @@ main(int argc, char **argv) sigset_t set; const char *homedir; const char *rawts_input = NULL; + const char *dvb_rawts_input = NULL; const char *join_transport = NULL; const char *confpath = NULL; char *p, *endp; @@ -279,7 +280,7 @@ main(int argc, char **argv) // make sure the timezone is set tzset(); - while((c = getopt(argc, argv, "Aa:fp:u:g:c:Chdr:j:sw:e:E:")) != -1) { + while((c = getopt(argc, argv, "Aa:fp:u:g:c:Chdr:j:sw:e:E:R:")) != -1) { switch(c) { case 'a': adapter_mask = 0x0; @@ -340,6 +341,9 @@ main(int argc, char **argv) case 'r': rawts_input = optarg; break; + case 'R': + dvb_rawts_input = optarg; + break; case 'j': join_transport = optarg; break; @@ -421,7 +425,7 @@ main(int argc, char **argv) tcp_server_init(); #if ENABLE_LINUXDVB - dvb_init(adapter_mask); + dvb_init(adapter_mask, dvb_rawts_input); #endif iptv_input_init(); #if ENABLE_V4L diff --git a/src/subscriptions.c b/src/subscriptions.c index 1bd429db..1cae0a4f 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -39,6 +39,7 @@ #include "htsmsg.h" #include "notify.h" #include "atomic.h" +#include "dvb/dvb.h" struct th_subscription_list subscriptions; static gtimer_t subscription_reschedule_timer; @@ -215,6 +216,14 @@ subscription_unsubscribe(th_subscription_t *s) if(t != NULL) service_remove_subscriber(t, s, SM_CODE_OK); + if(s->ths_tdmi != NULL) { + LIST_REMOVE(s, ths_tdmi_link); + th_dvb_adapter_t *tda = s->ths_tdmi->tdmi_adapter; + pthread_mutex_lock(&tda->tda_delivery_mutex); + streaming_target_disconnect(&tda->tda_streaming_pad, &s->ths_input); + pthread_mutex_unlock(&tda->tda_delivery_mutex); + } + if(s->ths_start_message != NULL) streaming_msg_free(s->ths_start_message); @@ -302,9 +311,9 @@ subscription_input_direct(void *opauqe, streaming_message_t *sm) /** * */ -static th_subscription_t * +th_subscription_t * subscription_create(int weight, const char *name, streaming_target_t *st, - int flags, int direct, const char *hostname, + int flags, st_callback_t *cb, const char *hostname, const char *username, const char *client) { th_subscription_t *s = calloc(1, sizeof(th_subscription_t)); @@ -316,9 +325,8 @@ subscription_create(int weight, const char *name, streaming_target_t *st, else reject |= SMT_TO_MASK(SMT_MPEGTS); // Reject raw mpegts - - streaming_target_init(&s->ths_input, direct ? subscription_input_direct : - subscription_input, s, reject); + streaming_target_init(&s->ths_input, + cb ?: subscription_input_direct, s, reject); s->ths_weight = weight; s->ths_title = strdup(name); @@ -348,8 +356,10 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight, int flags, const char *hostname, const char *username, const char *client) { - th_subscription_t *s = subscription_create(weight, name, st, flags, 0, - hostname, username, client); + th_subscription_t *s; + + s = subscription_create(weight, name, st, flags, subscription_input, + hostname, username, client); s->ths_channel = ch; LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link); @@ -391,13 +401,16 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight, */ th_subscription_t * subscription_create_from_service(service_t *t, const char *name, - streaming_target_t *st, int flags) + streaming_target_t *st, int flags) { - th_subscription_t *s = subscription_create(INT32_MAX, name, st, flags, 1, - NULL, NULL, NULL); + th_subscription_t *s; source_info_t si; int r; + s = subscription_create(INT32_MAX, name, st, flags, + subscription_input_direct, + NULL, NULL, NULL); + if(t->s_status != SERVICE_RUNNING) { if((r = service_start(t, INT32_MAX, 1)) != 0) { subscription_unsubscribe(s); diff --git a/src/subscriptions.h b/src/subscriptions.h index ff5d1d1d..0a88e5f5 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -65,6 +65,10 @@ typedef struct th_subscription { char *ths_username; char *ths_client; + // Ugly ugly ugly to refer DVB code here + + LIST_ENTRY(th_subscription) ths_tdmi_link; + struct th_dvb_mux_instance *ths_tdmi; } th_subscription_t; @@ -95,9 +99,14 @@ th_subscription_t *subscription_create_from_service(struct service *t, streaming_target_t *st, int flags); -void subscription_change_weight(th_subscription_t *s, int weight); +th_subscription_t *subscription_create(int weight, const char *name, + streaming_target_t *st, + int flags, st_callback_t *cb, + const char *hostname, + const char *username, + const char *client); -void subscription_stop(th_subscription_t *s); +void subscription_change_weight(th_subscription_t *s, int weight); void subscription_unlink_service(th_subscription_t *s, int reason); diff --git a/src/webui/webui.c b/src/webui/webui.c index 26ac8c63..05e85df9 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -42,6 +42,8 @@ #include "plumbing/globalheaders.h" #include "epg.h" #include "muxer.h" +#include "dvb/dvb.h" +#include "dvb/dvb_support.h" /** * @@ -264,6 +266,76 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq, } + +/** + * HTTP stream loop + */ +static void +http_stream_run2(http_connection_t *hc, streaming_queue_t *sq) +{ + streaming_message_t *sm; + int run = 1; + int timeouts = 0; + struct timespec ts; + struct timeval tp; + int err = 0; + socklen_t errlen = sizeof(err); + + /* reduce timeout on write() for streaming */ + tp.tv_sec = 5; + tp.tv_usec = 0; + setsockopt(hc->hc_fd, SOL_SOCKET, SO_SNDTIMEO, &tp, sizeof(tp)); + http_output_content(hc, "application/octet-stream"); + + while(run) { + pthread_mutex_lock(&sq->sq_mutex); + sm = TAILQ_FIRST(&sq->sq_queue); + if(sm == NULL) { + gettimeofday(&tp, NULL); + ts.tv_sec = tp.tv_sec + 1; + ts.tv_nsec = tp.tv_usec * 1000; + + if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) { + timeouts++; + + //Check socket status + getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); + if(err) { + tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig); + run = 0; + }else if(timeouts >= 20) { + tvhlog(LOG_WARNING, "webui", "Stop streaming %s, timeout waiting for packets", hc->hc_url_orig); + run = 0; + } + } + pthread_mutex_unlock(&sq->sq_mutex); + continue; + } + + timeouts = 0; //Reset timeout counter + TAILQ_REMOVE(&sq->sq_queue, sm, sm_link); + pthread_mutex_unlock(&sq->sq_mutex); + + pktbuf_t *pb; + + switch(sm->sm_type) { + case SMT_MPEGTS: + pb = sm->sm_data; + if(write(hc->hc_fd, pb->pb_data, pb->pb_size) != pb->pb_size) { + tvhlog(LOG_DEBUG, "webui", "Write error %s, stopping", hc->hc_url_orig); + run = 0; + } + break; + default: + break; + } + + streaming_msg_free(sm); + } +} + + + /** * Output a playlist containing a single channel */ @@ -596,6 +668,34 @@ http_stream_service(http_connection_t *hc, service_t *service) } +/** + * Subscribes to a service and starts the streaming loop + */ +static int +http_stream_tdmi(http_connection_t *hc, th_dvb_mux_instance_t *tdmi) +{ + th_subscription_t *s; + streaming_queue_t sq; + + streaming_queue_init(&sq, SMT_PACKET); + + pthread_mutex_lock(&global_lock); + s = dvb_subscription_create_from_tdmi(tdmi, "HTTP", &sq.sq_st); + pthread_mutex_unlock(&global_lock); + + http_stream_run2(hc, &sq); + + + pthread_mutex_lock(&global_lock); + subscription_unsubscribe(s); + pthread_mutex_unlock(&global_lock); + + streaming_queue_deinit(&sq); + + return 0; +} + + /** * Subscribes to a channel and starts the streaming loop */ @@ -668,6 +768,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch) * Handle the http request. http://tvheadend/stream/channelid/ * http://tvheadend/stream/channel/ * http://tvheadend/stream/service/ + * http://tvheadend/stream/mux/ */ static int http_stream(http_connection_t *hc, const char *remain, void *opaque) @@ -675,6 +776,7 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque) char *components[2]; channel_t *ch = NULL; service_t *service = NULL; + th_dvb_mux_instance_t *tdmi = NULL; hc->hc_keep_alive = 0; @@ -698,14 +800,19 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque) ch = channel_find_by_name(components[1], 0, 0); } else if(!strcmp(components[0], "service")) { service = service_find_by_identifier(components[1]); + } else if(!strcmp(components[0], "mux")) { + tdmi = dvb_mux_find_by_identifier(components[1]); } + // bug here: We can't retain pointers to channels etc outside global_lock pthread_mutex_unlock(&global_lock); if(ch != NULL) { return http_stream_channel(hc, ch); } else if(service != NULL) { return http_stream_service(hc, service); + } else if(tdmi != NULL) { + return http_stream_tdmi(hc, tdmi); } else { http_error(hc, HTTP_STATUS_BAD_REQUEST); return HTTP_STATUS_BAD_REQUEST;