From a06ee4fba9eb94a86f2fbaca42ced4b5e972ce93 Mon Sep 17 00:00:00 2001 From: Adam Sutton Date: Tue, 23 Oct 2012 13:42:06 +0100 Subject: [PATCH] [PR-171] Update stream queue size protection to be flexible. The queue size limits are now configurable in the queue init function. For now only HTTP queues are bounded, the others should not be necessary. --- src/streaming.c | 21 +++++++++++++++++---- src/streaming.h | 5 ++++- src/tvheadend.h | 7 ++++--- src/webui/webui.c | 22 ++++++++++++++++++---- 4 files changed, 43 insertions(+), 12 deletions(-) diff --git a/src/streaming.c b/src/streaming.c index 2c3c0dad..bf1614f8 100755 --- a/src/streaming.c +++ b/src/streaming.c @@ -54,8 +54,10 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm) pthread_mutex_lock(&sq->sq_mutex); /* queue size protection */ - int queue_size = streaming_queue_size(&sq->sq_queue); - if (queue_size > 1500000) + // TODO: would be better to update size as we go, but this would + // require updates elsewhere to ensure all removals from the queue + // are covered (new function) + if (sq->sq_maxsize && streaming_queue_size(&sq->sq_queue) >= sq->sq_maxsize) streaming_msg_free(sm); else TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link); @@ -69,13 +71,24 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm) * */ void -streaming_queue_init(streaming_queue_t *sq, int reject_filter) +streaming_queue_init2(streaming_queue_t *sq, int reject_filter, size_t maxsize) { streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, reject_filter); pthread_mutex_init(&sq->sq_mutex, NULL); pthread_cond_init(&sq->sq_cond, NULL); TAILQ_INIT(&sq->sq_queue); + + sq->sq_maxsize = maxsize; +} + +/** + * + */ +void +streaming_queue_init(streaming_queue_t *sq, int reject_filter) +{ + streaming_queue_init2(sq, reject_filter, 0); // 0 = unlimited } @@ -341,7 +354,7 @@ streaming_queue_clear(struct streaming_message_queue *q) /** * */ -int streaming_queue_size(struct streaming_message_queue *q) +size_t streaming_queue_size(struct streaming_message_queue *q) { streaming_message_t *sm; int size = 0; diff --git a/src/streaming.h b/src/streaming.h index cee24cf8..5dcda7db 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -71,9 +71,12 @@ void streaming_target_init(streaming_target_t *st, void streaming_queue_init(streaming_queue_t *sq, int reject_filter); +void streaming_queue_init2 + (streaming_queue_t *sq, int reject_filter, size_t maxsize); + void streaming_queue_clear(struct streaming_message_queue *q); -int streaming_queue_size(struct streaming_message_queue *q); +size_t streaming_queue_size(struct streaming_message_queue *q); void streaming_queue_deinit(streaming_queue_t *sq); diff --git a/src/tvheadend.h b/src/tvheadend.h index f7bb0f7e..ae785555 100644 --- a/src/tvheadend.h +++ b/src/tvheadend.h @@ -332,9 +332,10 @@ typedef struct streaming_queue { streaming_target_t sq_st; - pthread_mutex_t sq_mutex; /* Protects sp_queue */ - pthread_cond_t sq_cond; /* Condvar for signalling new - packets */ + pthread_mutex_t sq_mutex; /* Protects sp_queue */ + pthread_cond_t sq_cond; /* Condvar for signalling new packets */ + + size_t sq_maxsize; /* Max queue size (bytes) */ struct streaming_message_queue sq_queue; diff --git a/src/webui/webui.c b/src/webui/webui.c index fb22dbd0..e404d858 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -541,6 +541,8 @@ http_stream_service(http_connection_t *hc, service_t *service) dvr_config_t *cfg; muxer_container_type_t mc; int flags; + const char *str; + size_t qsize ; mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux")); if(mc == MC_UNKNOWN) { @@ -548,14 +550,19 @@ http_stream_service(http_connection_t *hc, service_t *service) mc = cfg->dvr_mc; } + if ((str = http_arg_get(&hc->hc_req_args, "qsize"))) + qsize = atoll(str); + else + qsize = 1500000; + if(mc == MC_PASS) { - streaming_queue_init(&sq, SMT_PACKET); + streaming_queue_init2(&sq, SMT_PACKET, qsize); gh = NULL; tsfix = NULL; st = &sq.sq_st; flags = SUBSCRIPTION_RAW_MPEGTS; } else { - streaming_queue_init(&sq, 0); + streaming_queue_init2(&sq, 0, qsize); gh = globalheaders_create(&sq.sq_st); tsfix = tsfix_create(gh); st = tsfix; @@ -600,6 +607,8 @@ http_stream_channel(http_connection_t *hc, channel_t *ch) int priority = 100; int flags; muxer_container_type_t mc; + char *str; + size_t qsize; mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux")); if(mc == MC_UNKNOWN) { @@ -607,14 +616,19 @@ http_stream_channel(http_connection_t *hc, channel_t *ch) mc = cfg->dvr_mc; } + if ((str = http_arg_get(&hc->hc_req_args, "qsize"))) + qsize = atoll(str); + else + qsize = 1500000; + if(mc == MC_PASS) { - streaming_queue_init(&sq, SMT_PACKET); + streaming_queue_init2(&sq, SMT_PACKET, qsize); gh = NULL; tsfix = NULL; st = &sq.sq_st; flags = SUBSCRIPTION_RAW_MPEGTS; } else { - streaming_queue_init(&sq, 0); + streaming_queue_init2(&sq, 0, qsize); gh = globalheaders_create(&sq.sq_st); tsfix = tsfix_create(gh); st = tsfix;