diff --git a/src/streaming.c b/src/streaming.c index 7b4559fe..2c3c0dad 100755 --- a/src/streaming.c +++ b/src/streaming.c @@ -52,7 +52,14 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm) streaming_queue_t *sq = opauqe; pthread_mutex_lock(&sq->sq_mutex); - TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link); + + /* queue size protection */ + int queue_size = streaming_queue_size(&sq->sq_queue); + if (queue_size > 1500000) + streaming_msg_free(sm); + else + TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link); + pthread_cond_signal(&sq->sq_cond); pthread_mutex_unlock(&sq->sq_mutex); } @@ -331,6 +338,36 @@ streaming_queue_clear(struct streaming_message_queue *q) } +/** + * + */ +int streaming_queue_size(struct streaming_message_queue *q) +{ + streaming_message_t *sm; + int size = 0; + + TAILQ_FOREACH(sm, q, sm_link) { + if (sm->sm_type == SMT_PACKET) + { + th_pkt_t *pkt = sm->sm_data; + if (pkt && pkt->pkt_payload) + { + size += pkt->pkt_payload->pb_size; + } + } + else if (sm->sm_type == SMT_MPEGTS) + { + pktbuf_t *pkt_payload = sm->sm_data; + if (pkt_payload) + { + size += pkt_payload->pb_size; + } + } + } + return size; +} + + /** * */ diff --git a/src/streaming.h b/src/streaming.h index eb2919c9..cee24cf8 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -73,6 +73,8 @@ void streaming_queue_init(streaming_queue_t *sq, int reject_filter); void streaming_queue_clear(struct streaming_message_queue *q); +int streaming_queue_size(struct streaming_message_queue *q); + void streaming_queue_deinit(streaming_queue_t *sq); void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st);