[PR-171] Update stream queue size protection to be flexible.

The queue size limits are now configurable in the queue init function.

For now only HTTP queues are bounded, the others should not be necessary.
This commit is contained in:
Adam Sutton 2012-10-23 13:42:06 +01:00
parent ae31362eec
commit a06ee4fba9
4 changed files with 43 additions and 12 deletions

View file

@ -54,8 +54,10 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
pthread_mutex_lock(&sq->sq_mutex);
/* queue size protection */
int queue_size = streaming_queue_size(&sq->sq_queue);
if (queue_size > 1500000)
// TODO: would be better to update size as we go, but this would
// require updates elsewhere to ensure all removals from the queue
// are covered (new function)
if (sq->sq_maxsize && streaming_queue_size(&sq->sq_queue) >= sq->sq_maxsize)
streaming_msg_free(sm);
else
TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
@ -69,13 +71,24 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
*
*/
void
streaming_queue_init(streaming_queue_t *sq, int reject_filter)
streaming_queue_init2(streaming_queue_t *sq, int reject_filter, size_t maxsize)
{
streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, reject_filter);
pthread_mutex_init(&sq->sq_mutex, NULL);
pthread_cond_init(&sq->sq_cond, NULL);
TAILQ_INIT(&sq->sq_queue);
sq->sq_maxsize = maxsize;
}
/**
*
*/
void
streaming_queue_init(streaming_queue_t *sq, int reject_filter)
{
streaming_queue_init2(sq, reject_filter, 0); // 0 = unlimited
}
@ -341,7 +354,7 @@ streaming_queue_clear(struct streaming_message_queue *q)
/**
*
*/
int streaming_queue_size(struct streaming_message_queue *q)
size_t streaming_queue_size(struct streaming_message_queue *q)
{
streaming_message_t *sm;
int size = 0;

View file

@ -71,9 +71,12 @@ void streaming_target_init(streaming_target_t *st,
void streaming_queue_init(streaming_queue_t *sq, int reject_filter);
void streaming_queue_init2
(streaming_queue_t *sq, int reject_filter, size_t maxsize);
void streaming_queue_clear(struct streaming_message_queue *q);
int streaming_queue_size(struct streaming_message_queue *q);
size_t streaming_queue_size(struct streaming_message_queue *q);
void streaming_queue_deinit(streaming_queue_t *sq);

View file

@ -332,9 +332,10 @@ typedef struct streaming_queue {
streaming_target_t sq_st;
pthread_mutex_t sq_mutex; /* Protects sp_queue */
pthread_cond_t sq_cond; /* Condvar for signalling new
packets */
pthread_mutex_t sq_mutex; /* Protects sp_queue */
pthread_cond_t sq_cond; /* Condvar for signalling new packets */
size_t sq_maxsize; /* Max queue size (bytes) */
struct streaming_message_queue sq_queue;

View file

@ -541,6 +541,8 @@ http_stream_service(http_connection_t *hc, service_t *service)
dvr_config_t *cfg;
muxer_container_type_t mc;
int flags;
const char *str;
size_t qsize ;
mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
if(mc == MC_UNKNOWN) {
@ -548,14 +550,19 @@ http_stream_service(http_connection_t *hc, service_t *service)
mc = cfg->dvr_mc;
}
if ((str = http_arg_get(&hc->hc_req_args, "qsize")))
qsize = atoll(str);
else
qsize = 1500000;
if(mc == MC_PASS) {
streaming_queue_init(&sq, SMT_PACKET);
streaming_queue_init2(&sq, SMT_PACKET, qsize);
gh = NULL;
tsfix = NULL;
st = &sq.sq_st;
flags = SUBSCRIPTION_RAW_MPEGTS;
} else {
streaming_queue_init(&sq, 0);
streaming_queue_init2(&sq, 0, qsize);
gh = globalheaders_create(&sq.sq_st);
tsfix = tsfix_create(gh);
st = tsfix;
@ -600,6 +607,8 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
int priority = 100;
int flags;
muxer_container_type_t mc;
char *str;
size_t qsize;
mc = muxer_container_txt2type(http_arg_get(&hc->hc_req_args, "mux"));
if(mc == MC_UNKNOWN) {
@ -607,14 +616,19 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
mc = cfg->dvr_mc;
}
if ((str = http_arg_get(&hc->hc_req_args, "qsize")))
qsize = atoll(str);
else
qsize = 1500000;
if(mc == MC_PASS) {
streaming_queue_init(&sq, SMT_PACKET);
streaming_queue_init2(&sq, SMT_PACKET, qsize);
gh = NULL;
tsfix = NULL;
st = &sq.sq_st;
flags = SUBSCRIPTION_RAW_MPEGTS;
} else {
streaming_queue_init(&sq, 0);
streaming_queue_init2(&sq, 0, qsize);
gh = globalheaders_create(&sq.sq_st);
tsfix = tsfix_create(gh);
st = tsfix;