http: Let full mux dump use the normal straeming code
Fix some locking issues in the http streaming code
This commit is contained in:
parent
47049eaa21
commit
0bde079fda
2 changed files with 29 additions and 111 deletions
|
@ -107,7 +107,8 @@ pass_muxer_reconfigure(muxer_t* m, const struct streaming_start *ss)
|
|||
pass_muxer_t *pm = (pass_muxer_t*)m;
|
||||
const source_info_t *si = &ss->ss_si;
|
||||
|
||||
if(si->si_type == S_MPEG_TS) {
|
||||
if(si->si_type == S_MPEG_TS && ss->ss_pmt_pid) {
|
||||
pm->pm_pat = realloc(pm->pm_pat, 188);
|
||||
memset(pm->pm_pat, 0xff, 188);
|
||||
pm->pm_pat[0] = 0x47;
|
||||
pm->pm_pat[1] = 0x40;
|
||||
|
@ -120,6 +121,7 @@ pass_muxer_reconfigure(muxer_t* m, const struct streaming_start *ss)
|
|||
return -1;
|
||||
}
|
||||
|
||||
pm->pm_pmt = realloc(pm->pm_pmt, 188);
|
||||
memset(pm->pm_pmt, 0xff, 188);
|
||||
pm->pm_pmt[0] = 0x47;
|
||||
pm->pm_pmt[1] = 0x40 | (ss->ss_pmt_pid >> 8);
|
||||
|
@ -216,14 +218,16 @@ pass_muxer_write_ts(muxer_t *m, pktbuf_t *pb)
|
|||
pass_muxer_t *pm = (pass_muxer_t*)m;
|
||||
int rem;
|
||||
|
||||
// Inject pmt and pat into the stream
|
||||
rem = pm->pm_pc % TS_INJECTION_RATE;
|
||||
if(!rem) {
|
||||
pm->pm_pat[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
|
||||
pm->pm_pmt[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
|
||||
pass_muxer_write(m, pm->pm_pmt, 188);
|
||||
pass_muxer_write(m, pm->pm_pat, 188);
|
||||
pm->pm_ic++;
|
||||
if(pm->pm_pat != NULL) {
|
||||
// Inject pmt and pat into the stream
|
||||
rem = pm->pm_pc % TS_INJECTION_RATE;
|
||||
if(!rem) {
|
||||
pm->pm_pat[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
|
||||
pm->pm_pmt[3] = (pm->pm_pat[3] & 0xf0) | (pm->pm_ic & 0x0f);
|
||||
pass_muxer_write(m, pm->pm_pmt, 188);
|
||||
pass_muxer_write(m, pm->pm_pat, 188);
|
||||
pm->pm_ic++;
|
||||
}
|
||||
}
|
||||
|
||||
pass_muxer_write(m, pb->pb_data, pb->pb_size);
|
||||
|
@ -331,9 +335,6 @@ pass_muxer_create(muxer_container_type_t mc)
|
|||
pm->m_close = pass_muxer_close;
|
||||
pm->m_destroy = pass_muxer_destroy;
|
||||
|
||||
pm->pm_pat = malloc(188);
|
||||
pm->pm_pmt = malloc(188);
|
||||
|
||||
return (muxer_t *)pm;
|
||||
}
|
||||
|
||||
|
|
|
@ -142,8 +142,8 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque)
|
|||
* HTTP stream loop
|
||||
*/
|
||||
static void
|
||||
http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
|
||||
th_subscription_t *s, muxer_container_type_t mc)
|
||||
http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
|
||||
const char *name, muxer_container_type_t mc)
|
||||
{
|
||||
streaming_message_t *sm;
|
||||
int run = 1;
|
||||
|
@ -154,17 +154,11 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
|
|||
struct timeval tp;
|
||||
int err = 0;
|
||||
socklen_t errlen = sizeof(err);
|
||||
const char *name;
|
||||
|
||||
mux = muxer_create(mc);
|
||||
if(muxer_open_stream(mux, hc->hc_fd))
|
||||
run = 0;
|
||||
|
||||
if(s->ths_channel)
|
||||
name = s->ths_channel->ch_name;
|
||||
else
|
||||
name = "Live Stream";
|
||||
|
||||
/* reduce timeout on write() for streaming */
|
||||
tp.tv_sec = 5;
|
||||
tp.tv_usec = 0;
|
||||
|
@ -266,76 +260,6 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
|
|||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* HTTP stream loop
|
||||
*/
|
||||
static void
|
||||
http_stream_run2(http_connection_t *hc, streaming_queue_t *sq)
|
||||
{
|
||||
streaming_message_t *sm;
|
||||
int run = 1;
|
||||
int timeouts = 0;
|
||||
struct timespec ts;
|
||||
struct timeval tp;
|
||||
int err = 0;
|
||||
socklen_t errlen = sizeof(err);
|
||||
|
||||
/* reduce timeout on write() for streaming */
|
||||
tp.tv_sec = 5;
|
||||
tp.tv_usec = 0;
|
||||
setsockopt(hc->hc_fd, SOL_SOCKET, SO_SNDTIMEO, &tp, sizeof(tp));
|
||||
http_output_content(hc, "application/octet-stream");
|
||||
|
||||
while(run) {
|
||||
pthread_mutex_lock(&sq->sq_mutex);
|
||||
sm = TAILQ_FIRST(&sq->sq_queue);
|
||||
if(sm == NULL) {
|
||||
gettimeofday(&tp, NULL);
|
||||
ts.tv_sec = tp.tv_sec + 1;
|
||||
ts.tv_nsec = tp.tv_usec * 1000;
|
||||
|
||||
if(pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts) == ETIMEDOUT) {
|
||||
timeouts++;
|
||||
|
||||
//Check socket status
|
||||
getsockopt(hc->hc_fd, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen);
|
||||
if(err) {
|
||||
tvhlog(LOG_DEBUG, "webui", "Stop streaming %s, client hung up", hc->hc_url_orig);
|
||||
run = 0;
|
||||
}else if(timeouts >= 20) {
|
||||
tvhlog(LOG_WARNING, "webui", "Stop streaming %s, timeout waiting for packets", hc->hc_url_orig);
|
||||
run = 0;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&sq->sq_mutex);
|
||||
continue;
|
||||
}
|
||||
|
||||
timeouts = 0; //Reset timeout counter
|
||||
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
|
||||
pthread_mutex_unlock(&sq->sq_mutex);
|
||||
|
||||
pktbuf_t *pb;
|
||||
|
||||
switch(sm->sm_type) {
|
||||
case SMT_MPEGTS:
|
||||
pb = sm->sm_data;
|
||||
if(write(hc->hc_fd, pb->pb_data, pb->pb_size) != pb->pb_size) {
|
||||
tvhlog(LOG_DEBUG, "webui", "Write error %s, stopping", hc->hc_url_orig);
|
||||
run = 0;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
streaming_msg_free(sm);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Output a playlist containing a single channel
|
||||
*/
|
||||
|
@ -618,7 +542,8 @@ http_stream_service(http_connection_t *hc, service_t *service)
|
|||
muxer_container_type_t mc;
|
||||
int flags;
|
||||
const char *str;
|
||||
size_t qsize ;
|
||||
size_t qsize;
|
||||
const char *name;
|
||||
|
||||
mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
|
||||
if(mc == MC_UNKNOWN) {
|
||||
|
@ -645,15 +570,14 @@ http_stream_service(http_connection_t *hc, service_t *service)
|
|||
flags = 0;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&global_lock);
|
||||
s = subscription_create_from_service(service, "HTTP", st, flags);
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
|
||||
if(s) {
|
||||
http_stream_run(hc, &sq, s, mc);
|
||||
name = strdupa(service->s_ch ?
|
||||
service->s_ch->ch_name : service->s_nicename);
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
http_stream_run(hc, &sq, name, mc);
|
||||
pthread_mutex_lock(&global_lock);
|
||||
subscription_unsubscribe(s);
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
}
|
||||
|
||||
if(gh)
|
||||
|
@ -676,19 +600,15 @@ http_stream_tdmi(http_connection_t *hc, th_dvb_mux_instance_t *tdmi)
|
|||
{
|
||||
th_subscription_t *s;
|
||||
streaming_queue_t sq;
|
||||
|
||||
const char *name;
|
||||
streaming_queue_init(&sq, SMT_PACKET);
|
||||
|
||||
pthread_mutex_lock(&global_lock);
|
||||
s = dvb_subscription_create_from_tdmi(tdmi, "HTTP", &sq.sq_st);
|
||||
name = strdupa(tdmi->tdmi_identifier);
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
|
||||
http_stream_run2(hc, &sq);
|
||||
|
||||
|
||||
http_stream_run(hc, &sq, name, MC_PASS);
|
||||
pthread_mutex_lock(&global_lock);
|
||||
subscription_unsubscribe(s);
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
|
||||
streaming_queue_deinit(&sq);
|
||||
|
||||
|
@ -713,6 +633,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
|
|||
muxer_container_type_t mc;
|
||||
char *str;
|
||||
size_t qsize;
|
||||
const char *name;
|
||||
|
||||
mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
|
||||
if(mc == MC_UNKNOWN) {
|
||||
|
@ -739,18 +660,17 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
|
|||
flags = 0;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&global_lock);
|
||||
s = subscription_create_from_channel(ch, priority, "HTTP", st, flags,
|
||||
inet_ntoa(hc->hc_peer->sin_addr),
|
||||
hc->hc_username,
|
||||
http_arg_get(&hc->hc_args, "User-Agent"));
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
|
||||
if(s) {
|
||||
http_stream_run(hc, &sq, s, mc);
|
||||
name = strdupa(ch->ch_name);
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
http_stream_run(hc, &sq, name, mc);
|
||||
pthread_mutex_lock(&global_lock);
|
||||
subscription_unsubscribe(s);
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
}
|
||||
|
||||
if(gh)
|
||||
|
@ -792,7 +712,7 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque)
|
|||
|
||||
http_deescape(components[1]);
|
||||
|
||||
pthread_mutex_lock(&global_lock);
|
||||
scopedgloballock();
|
||||
|
||||
if(!strcmp(components[0], "channelid")) {
|
||||
ch = channel_find_by_identifier(atoi(components[1]));
|
||||
|
@ -804,9 +724,6 @@ http_stream(http_connection_t *hc, const char *remain, void *opaque)
|
|||
tdmi = dvb_mux_find_by_identifier(components[1]);
|
||||
}
|
||||
|
||||
// bug here: We can't retain pointers to channels etc outside global_lock
|
||||
pthread_mutex_unlock(&global_lock);
|
||||
|
||||
if(ch != NULL) {
|
||||
return http_stream_channel(hc, ch);
|
||||
} else if(service != NULL) {
|
||||
|
|
Loading…
Add table
Reference in a new issue