diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index f211254c..0bed592b 100644 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -466,7 +466,7 @@ dvr_rec_stop(dvr_entry_t *de) pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); } - pktref_clear_queue(&sq->sq_queue); + streaming_queue_clear(&sq->sq_queue); pthread_mutex_unlock(&sq->sq_mutex); } @@ -479,7 +479,7 @@ dvr_thread(void *aux) { dvr_entry_t *de = aux; streaming_queue_t *sq = &de->de_sq; - th_pktref_t *pr; + streaming_message_t *sm; pthread_mutex_lock(&sq->sq_mutex); @@ -488,22 +488,25 @@ dvr_thread(void *aux) while(sq->sq_status == SQ_RUNNING) { - pr = TAILQ_FIRST(&sq->sq_queue); - if(pr == NULL) { + sm = TAILQ_FIRST(&sq->sq_queue); + if(sm == NULL) { pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); continue; } - TAILQ_REMOVE(&sq->sq_queue, pr, pr_link); + TAILQ_REMOVE(&sq->sq_queue, sm, sm_link); pthread_mutex_unlock(&sq->sq_mutex); - if(dispatch_clock > de->de_start) - dvr_thread_new_pkt(de, pr->pr_pkt); - - pkt_ref_dec(pr->pr_pkt); - free(pr); + switch(sm->sm_type) { + case SMT_PACKET: + if(dispatch_clock > de->de_start) + dvr_thread_new_pkt(de, sm->sm_data); + pkt_ref_dec(sm->sm_data); + break; + } + free(sm); pthread_mutex_lock(&sq->sq_mutex); } diff --git a/src/htsp.c b/src/htsp.c index 741ff652..737ae387 100644 --- a/src/htsp.c +++ b/src/htsp.c @@ -63,10 +63,10 @@ typedef struct htsp_msg { int hm_payloadsize; /* For maintaining stats about streaming buffer depth */ - th_pktref_t *hm_pktref; /* For keeping reference to packet. - hm_msg can contain messages that points - to packet payload so to avoid copy we - keep a reference here */ + th_pkt_t *hm_pkt; /* For keeping reference to packet. + hm_msg can contain messages that points + to packet payload so to avoid copy we + keep a reference here */ } htsp_msg_t; @@ -187,10 +187,8 @@ static void htsp_msg_destroy(htsp_msg_t *hm) { htsmsg_destroy(hm->hm_msg); - if(hm->hm_pktref != NULL) { - pkt_ref_dec(hm->hm_pktref->pr_pkt); - free(hm->hm_pktref); - } + if(hm->hm_pkt != NULL) + pkt_ref_dec(hm->hm_pkt); free(hm); } @@ -229,13 +227,13 @@ htsp_destroy_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq) * */ static void -htsp_send(htsp_connection_t *htsp, htsmsg_t *m, th_pktref_t *pkt, +htsp_send(htsp_connection_t *htsp, htsmsg_t *m, th_pkt_t *pkt, htsp_msg_q_t *hmq, int payloadsize) { htsp_msg_t *hm = malloc(sizeof(htsp_msg_t)); hm->hm_msg = m; - hm->hm_pktref = pkt; + hm->hm_pkt = pkt; hm->hm_payloadsize = payloadsize; pthread_mutex_lock(&htsp->htsp_out_mutex); @@ -1016,16 +1014,18 @@ const static char frametypearray[PKT_NTYPES] = { * Build a htsmsg from a th_pkt and enqueue it on our HTSP transport */ static void -htsp_stream_deliver(void *opaque, struct th_pktref *pr) +htsp_stream_deliver(void *opaque, streaming_message_t *sm) { htsp_stream_t *hs = opaque; - th_pkt_t *pkt = pr->pr_pkt; + th_pkt_t *pkt = sm->sm_data; htsmsg_t *m = htsmsg_create_map(), *n; htsp_msg_t *hm; htsp_connection_t *htsp = hs->hs_htsp; int64_t ts; int qlen = hs->hs_q.hmq_payload; + free(sm); + if((qlen > 500000 && pkt->pkt_frametype == PKT_B_FRAME) || (qlen > 750000 && pkt->pkt_frametype == PKT_P_FRAME) || (qlen > 1500000)) { @@ -1033,8 +1033,7 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr) hs->hs_dropstats[pkt->pkt_frametype]++; /* Queue size protection */ - pkt_ref_dec(pr->pr_pkt); - free(pr); + pkt_ref_dec(pkt); return; } @@ -1053,7 +1052,7 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr) * object that just points to data, thus avoiding a copy. */ htsmsg_add_binptr(m, "payload", pkt->pkt_payload, pkt->pkt_payloadlen); - htsp_send(htsp, m, pr, &hs->hs_q, pkt->pkt_payloadlen); + htsp_send(htsp, m, pkt, &hs->hs_q, pkt->pkt_payloadlen); if(hs->hs_last_report != dispatch_clock) { /* Send a queue status report every second */ diff --git a/src/streaming.c b/src/streaming.c index 0715a3db..7868b7d8 100644 --- a/src/streaming.c +++ b/src/streaming.c @@ -45,12 +45,12 @@ streaming_target_init2(streaming_target_t *st, st_callback_t *cb, void *opaque) * */ static void -streaming_queue_deliver(void *opauqe, struct th_pktref *pr) +streaming_queue_deliver(void *opauqe, streaming_message_t *sm) { streaming_queue_t *sq = opauqe; pthread_mutex_lock(&sq->sq_mutex); - TAILQ_INSERT_TAIL(&sq->sq_queue, pr, pr_link); + TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link); pthread_cond_signal(&sq->sq_cond); pthread_mutex_unlock(&sq->sq_mutex); } @@ -111,7 +111,7 @@ void streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt) { streaming_target_t *st; - th_pktref_t *pr; + streaming_message_t *sm; lock_assert(sp->sp_mutex); @@ -123,8 +123,43 @@ streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt) LIST_FOREACH(st, &sp->sp_targets, st_link) { - pr = malloc(sizeof(th_pktref_t)); - pr->pr_pkt = pkt; - st->st_cb(st->st_opaque, pr); + sm = malloc(sizeof(streaming_message_t)); + sm->sm_type = SMT_PACKET; + sm->sm_data = pkt; + st->st_cb(st->st_opaque, sm); + } +} + + +/** + * + */ +void +streaming_message_free(streaming_message_t *sm) +{ + switch(sm->sm_type) { + case SMT_PACKET: + pkt_ref_dec(sm->sm_data); + break; + + default: + abort(); + } + free(sm); +} + + + +/** + * + */ +void +streaming_queue_clear(struct streaming_message_queue *q) +{ + streaming_message_t *sm; + + while((sm = TAILQ_FIRST(q)) != NULL) { + TAILQ_REMOVE(q, sm, sm_link); + streaming_message_free(sm); } } diff --git a/src/streaming.h b/src/streaming.h index dd8e6187..f57d51b2 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -38,4 +38,8 @@ void streaming_target_disconnect(streaming_target_t *st); void streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt); +void streaming_message_free(streaming_message_t *sm); + +void streaming_queue_clear(struct streaming_message_queue *q); + #endif /* STREAMING_H_ */ diff --git a/src/tvhead.h b/src/tvhead.h index 10067669..2a632225 100644 --- a/src/tvhead.h +++ b/src/tvhead.h @@ -146,11 +146,32 @@ typedef struct streaming_pad { } streaming_pad_t; +TAILQ_HEAD(streaming_message_queue, streaming_message); + +/** + * Streaming messages types + */ + +typedef enum { + SMT_PACKET, +} streaming_message_type_t; + + +/** + * Streaming messages are sent from the pad to its receivers + */ +typedef struct streaming_message { + TAILQ_ENTRY(streaming_message) sm_link; + streaming_message_type_t sm_type; + void *sm_data; + +} streaming_message_t; + /** * A streaming target receives data. */ -struct th_pktref; -typedef void (st_callback_t)(void *opauqe, struct th_pktref *pr); + +typedef void (st_callback_t)(void *opauqe, streaming_message_t *sm); typedef struct streaming_target { LIST_ENTRY(streaming_target) st_link; @@ -172,7 +193,7 @@ typedef struct streaming_queue { pthread_cond_t sq_cond; /* Condvar for signalling new packets */ - struct th_pktref_queue sq_queue; + struct streaming_message_queue sq_queue; enum { SQ_IDLE,