diff --git a/src/streaming.c b/src/streaming.c index 2c3c0dad..bf1614f8 100755 --- a/src/streaming.c +++ b/src/streaming.c @@ -54,8 +54,10 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm) pthread_mutex_lock(&sq->sq_mutex); /* queue size protection */ - int queue_size = streaming_queue_size(&sq->sq_queue); - if (queue_size > 1500000) + // TODO: would be better to update size as we go, but this would + // require updates elsewhere to ensure all removals from the queue + // are covered (new function) + if (sq->sq_maxsize && streaming_queue_size(&sq->sq_queue) >= sq->sq_maxsize) streaming_msg_free(sm); else TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link); @@ -69,13 +71,24 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm) * */ void -streaming_queue_init(streaming_queue_t *sq, int reject_filter) +streaming_queue_init2(streaming_queue_t *sq, int reject_filter, size_t maxsize) { streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, reject_filter); pthread_mutex_init(&sq->sq_mutex, NULL); pthread_cond_init(&sq->sq_cond, NULL); TAILQ_INIT(&sq->sq_queue); + + sq->sq_maxsize = maxsize; +} + +/** + * + */ +void +streaming_queue_init(streaming_queue_t *sq, int reject_filter) +{ + streaming_queue_init2(sq, reject_filter, 0); // 0 = unlimited } @@ -341,7 +354,7 @@ streaming_queue_clear(struct streaming_message_queue *q) /** * */ -int streaming_queue_size(struct streaming_message_queue *q) +size_t streaming_queue_size(struct streaming_message_queue *q) { streaming_message_t *sm; int size = 0; diff --git a/src/streaming.h b/src/streaming.h index cee24cf8..5dcda7db 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -71,9 +71,12 @@ void streaming_target_init(streaming_target_t *st, void streaming_queue_init(streaming_queue_t *sq, int reject_filter); +void streaming_queue_init2 + (streaming_queue_t *sq, int reject_filter, size_t maxsize); + void streaming_queue_clear(struct streaming_message_queue *q); -int streaming_queue_size(struct streaming_message_queue *q); +size_t streaming_queue_size(struct streaming_message_queue *q); void streaming_queue_deinit(streaming_queue_t *sq); diff --git a/src/tvheadend.h b/src/tvheadend.h index f7bb0f7e..ae785555 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -332,9 +332,10 @@ typedef struct streaming_queue { streaming_target_t sq_st; - pthread_mutex_t sq_mutex; /* Protects sp_queue */ - pthread_cond_t sq_cond; /* Condvar for signalling new - packets */ + pthread_mutex_t sq_mutex; /* Protects sp_queue */ + pthread_cond_t sq_cond; /* Condvar for signalling new packets */ + + size_t sq_maxsize; /* Max queue size (bytes) */ struct streaming_message_queue sq_queue; diff --git a/src/webui/webui.c b/src/webui/webui.c index fb22dbd0..e404d858 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -541,6 +541,8 @@ http_stream_service(http_connection_t *hc, service_t *service) dvr_config_t *cfg; muxer_container_type_t mc; int flags; + const char *str; + size_t qsize ; mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux")); if(mc == MC_UNKNOWN) { @@ -548,14 +550,19 @@ http_stream_service(http_connection_t *hc, service_t *service) mc = cfg->dvr_mc; } + if ((str = http_arg_get(&hc->hc_req_args, "qsize"))) + qsize = atoll(str); + else + qsize = 1500000; + if(mc == MC_PASS) { - streaming_queue_init(&sq, SMT_PACKET); + streaming_queue_init2(&sq, SMT_PACKET, qsize); gh = NULL; tsfix = NULL; st = &sq.sq_st; flags = SUBSCRIPTION_RAW_MPEGTS; } else { - streaming_queue_init(&sq, 0); + streaming_queue_init2(&sq, 0, qsize); gh = globalheaders_create(&sq.sq_st); tsfix = tsfix_create(gh); st = tsfix; @@ -600,6 +607,8 @@ http_stream_channel(http_connection_t *hc, channel_t *ch) int priority = 100; int flags; muxer_container_type_t mc; + char *str; + size_t qsize; mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux")); if(mc == MC_UNKNOWN) { @@ -607,14 +616,19 @@ http_stream_channel(http_connection_t *hc, channel_t *ch) mc = cfg->dvr_mc; } + if ((str = http_arg_get(&hc->hc_req_args, "qsize"))) + qsize = atoll(str); + else + qsize = 1500000; + if(mc == MC_PASS) { - streaming_queue_init(&sq, SMT_PACKET); + streaming_queue_init2(&sq, SMT_PACKET, qsize); gh = NULL; tsfix = NULL; st = &sq.sq_st; flags = SUBSCRIPTION_RAW_MPEGTS; } else { - streaming_queue_init(&sq, 0); + streaming_queue_init2(&sq, 0, qsize); gh = globalheaders_create(&sq.sq_st); tsfix = tsfix_create(gh); st = tsfix;