diff --git a/src/muxer_pass.c b/src/muxer_pass.c index 73053438..20a986a7 100644 --- a/src/muxer_pass.c +++ b/src/muxer_pass.c @@ -107,7 +107,8 @@ pass_muxer_reconfigure(muxer_t* m, const struct streaming_start *ss) pass_muxer_t *pm = (pass_muxer_t*)m; const source_info_t *si = &ss->ss_si; - if(si->si_type == S_MPEG_TS) { + if(si->si_type == S_MPEG_TS && ss->ss_pmt_pid) { + pm->pm_pat = realloc(pm->pm_pat, 188); memset(pm->pm_pat, 0xff, 188); pm->pm_pat[0] = 0x47; pm->pm_pat[1] = 0x40; @@ -120,6 +121,7 @@ pass_muxer_reconfigure(muxer_t* m, const struct streaming_start *ss) return -1; } + pm->pm_pmt = realloc(pm->pm_pmt, 188); memset(pm->pm_pmt, 0xff, 188); pm->pm_pmt[0] = 0x47; pm->pm_pmt[1] = 0x40 | (ss->ss_pmt_pid >> 8); @@ -216,14 +218,16 @@ pass_muxer_write_ts(muxer_t *m, pktbuf_t *pb) pass_muxer_t *pm = (pass_muxer_t*)m; int rem; - // Inject pmt and pat into the stream - rem = pm->pm_pc % TS_INJECTION_RATE; - if(!rem) { - pm->pm_pat[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f); - pm->pm_pmt[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f); - pass_muxer_write(m, pm->pm_pmt, 188); - pass_muxer_write(m, pm->pm_pat, 188); - pm->pm_ic++; + if(pm->pm_pat != NULL) { + // Inject pmt and pat into the stream + rem = pm->pm_pc % TS_INJECTION_RATE; + if(!rem) { + pm->pm_pat[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f); + pm->pm_pmt[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f); + pass_muxer_write(m, pm->pm_pmt, 188); + pass_muxer_write(m, pm->pm_pat, 188); + pm->pm_ic++; + } } pass_muxer_write(m, pb->pb_data, pb->pb_size); @@ -331,9 +335,6 @@ pass_muxer_create(muxer_container_type_t mc) pm->m_close = pass_muxer_close; pm->m_destroy = pass_muxer_destroy; - pm->pm_pat = malloc(188); - pm->pm_pmt = malloc(188); - return (muxer_t *)pm; } diff --git a/src/webui/webui.c b/src/webui/webui.c index 05e85df9..7919f040 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -142,8 +142,8 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque) * HTTP stream loop */ static void -http_stream_run(http_connection_t *hc, streaming_queue_t *sq, - th_subscription_t *s, muxer_container_type_t mc) +http_stream_run(http_connection_t *hc, streaming_queue_t *sq, + const char *name, muxer_container_type_t mc) { streaming_message_t *sm; int run = 1; @@ -154,17 +154,11 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq, struct timeval tp; int err = 0; socklen_t errlen = sizeof(err); - const char *name; mux = muxer_create(mc); if(muxer_open_stream(mux, hc->hc_fd)) run = 0; - if(s->ths_channel) - name = s->ths_channel->ch_name; - else - name = "Live Stream"; - /* reduce timeout on write() for streaming */ tp.tv_sec = 5; tp.tv_usec = 0; @@ -266,76 +260,6 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq, } - -/** - * HTTP stream loop - */ -static void -http_stream_run2(http_connection_t *hc, streaming_queue_t *sq) -{ - streaming_message_t *sm; - int run = 1; - int timeouts = 0; - struct timespec ts; - struct timeval tp; - int err = 0; - socklen_t errlen = sizeof(err); - - /* reduce timeout on write() for streaming */ - tp.tv_sec = 5; - tp.tv_usec = 0; - setsockopt(hc->hc_fd, SOL_SOCKET, SO_SNDTIMEO, &tp, sizeof(tp)); - http_output_content(hc, "application/octet-stream"); - - while(run) { - pthread_mutex_lock(&sq->sq_mutex); - sm = TAILQ_FIRST(&sq->sq_queue); - if(sm == NULL) { - gettimeofday(&tp, NULL); - ts.tv_sec = tp.tv_sec + 1; - ts.tv_nsec = tp.tv_usec * 1000; - - if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) { - timeouts++; - - //Check socket status - getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); - if(err) { - tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig); - run = 0; - }else if(timeouts >= 20) { - tvhlog(LOG_WARNING, "webui", "Stop streaming %s, timeout waiting for packets", hc->hc_url_orig); - run = 0; - } - } - pthread_mutex_unlock(&sq->sq_mutex); - continue; - } - - timeouts = 0; //Reset timeout counter - TAILQ_REMOVE(&sq->sq_queue, sm, sm_link); - pthread_mutex_unlock(&sq->sq_mutex); - - pktbuf_t *pb; - - switch(sm->sm_type) { - case SMT_MPEGTS: - pb = sm->sm_data; - if(write(hc->hc_fd, pb->pb_data, pb->pb_size) != pb->pb_size) { - tvhlog(LOG_DEBUG, "webui", "Write error %s, stopping", hc->hc_url_orig); - run = 0; - } - break; - default: - break; - } - - streaming_msg_free(sm); - } -} - - - /** * Output a playlist containing a single channel */ @@ -618,7 +542,8 @@ http_stream_service(http_connection_t *hc, service_t *service) muxer_container_type_t mc; int flags; const char *str; - size_t qsize ; + size_t qsize; + const char *name; mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux")); if(mc == MC_UNKNOWN) { @@ -645,15 +570,14 @@ http_stream_service(http_connection_t *hc, service_t *service) flags = 0; } - pthread_mutex_lock(&global_lock); s = subscription_create_from_service(service, "HTTP", st, flags); - pthread_mutex_unlock(&global_lock); - if(s) { - http_stream_run(hc, &sq, s, mc); + name = strdupa(service->s_ch ? + service->s_ch->ch_name : service->s_nicename); + pthread_mutex_unlock(&global_lock); + http_stream_run(hc, &sq, name, mc); pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); - pthread_mutex_unlock(&global_lock); } if(gh) @@ -676,19 +600,15 @@ http_stream_tdmi(http_connection_t *hc, th_dvb_mux_instance_t *tdmi) { th_subscription_t *s; streaming_queue_t sq; - + const char *name; streaming_queue_init(&sq, SMT_PACKET); - pthread_mutex_lock(&global_lock); s = dvb_subscription_create_from_tdmi(tdmi, "HTTP", &sq.sq_st); + name = strdupa(tdmi->tdmi_identifier); pthread_mutex_unlock(&global_lock); - - http_stream_run2(hc, &sq); - - + http_stream_run(hc, &sq, name, MC_PASS); pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); - pthread_mutex_unlock(&global_lock); streaming_queue_deinit(&sq); @@ -713,6 +633,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch) muxer_container_type_t mc; char *str; size_t qsize; + const char *name; mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux")); if(mc == MC_UNKNOWN) { @@ -739,18 +660,17 @@ http_stream_channel(http_connection_t *hc, channel_t *ch) flags = 0; } - pthread_mutex_lock(&global_lock); s = subscription_create_from_channel(ch, priority, "HTTP", st, flags, inet_ntoa(hc->hc_peer->sin_addr), hc->hc_username, http_arg_get(&hc->hc_args, "User-Agent")); - pthread_mutex_unlock(&global_lock); if(s) { - http_stream_run(hc, &sq, s, mc); + name = strdupa(ch->ch_name); + pthread_mutex_unlock(&global_lock); + http_stream_run(hc, &sq, name, mc); pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); - pthread_mutex_unlock(&global_lock); } if(gh) @@ -792,7 +712,7 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque) http_deescape(components[1]); - pthread_mutex_lock(&global_lock); + scopedgloballock(); if(!strcmp(components[0], "channelid")) { ch = channel_find_by_identifier(atoi(components[1])); @@ -804,9 +724,6 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque) tdmi = dvb_mux_find_by_identifier(components[1]); } - // bug here: We can't retain pointers to channels etc outside global_lock - pthread_mutex_unlock(&global_lock); - if(ch != NULL) { return http_stream_channel(hc, ch); } else if(service != NULL) {