[PR-171] Streaming queue size protection

This commit is contained in:
Jernej Fijačko 2012-10-23 13:23:54 +02:00 committed by Adam Sutton
parent 3efdfbf0af
commit ae31362eec
2 changed files with 40 additions and 1 deletions

View file

@ -52,7 +52,14 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
streaming_queue_t *sq = opauqe;
pthread_mutex_lock(&sq->sq_mutex);
TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
/* queue size protection */
int queue_size = streaming_queue_size(&sq->sq_queue);
if (queue_size > 1500000)
streaming_msg_free(sm);
else
TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&sq->sq_mutex);
}
@ -331,6 +338,36 @@ streaming_queue_clear(struct streaming_message_queue *q)
}
/**
*
*/
int streaming_queue_size(struct streaming_message_queue *q)
{
streaming_message_t *sm;
int size = 0;
TAILQ_FOREACH(sm, q, sm_link) {
if (sm->sm_type == SMT_PACKET)
{
th_pkt_t *pkt = sm->sm_data;
if (pkt && pkt->pkt_payload)
{
size += pkt->pkt_payload->pb_size;
}
}
else if (sm->sm_type == SMT_MPEGTS)
{
pktbuf_t *pkt_payload = sm->sm_data;
if (pkt_payload)
{
size += pkt_payload->pb_size;
}
}
}
return size;
}
/**
*
*/

View file

@ -73,6 +73,8 @@ void streaming_queue_init(streaming_queue_t *sq, int reject_filter);
void streaming_queue_clear(struct streaming_message_queue *q);
int streaming_queue_size(struct streaming_message_queue *q);
void streaming_queue_deinit(streaming_queue_t *sq);
void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st);