Allow generic messages to be sent over a streaming pad, not only packets.

This commit is contained in:
Andreas Öman 2009-06-02 17:38:05 +00:00
parent 8dcb75c38a
commit 962d6cd407
5 changed files with 96 additions and 34 deletions

View file

@ -466,7 +466,7 @@ dvr_rec_stop(dvr_entry_t *de)
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
}
pktref_clear_queue(&sq->sq_queue);
streaming_queue_clear(&sq->sq_queue);
pthread_mutex_unlock(&sq->sq_mutex);
}
@ -479,7 +479,7 @@ dvr_thread(void *aux)
{
dvr_entry_t *de = aux;
streaming_queue_t *sq = &de->de_sq;
th_pktref_t *pr;
streaming_message_t *sm;
pthread_mutex_lock(&sq->sq_mutex);
@ -488,22 +488,25 @@ dvr_thread(void *aux)
while(sq->sq_status == SQ_RUNNING) {
pr = TAILQ_FIRST(&sq->sq_queue);
if(pr == NULL) {
sm = TAILQ_FIRST(&sq->sq_queue);
if(sm == NULL) {
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
continue;
}
TAILQ_REMOVE(&sq->sq_queue, pr, pr_link);
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
pthread_mutex_unlock(&sq->sq_mutex);
if(dispatch_clock > de->de_start)
dvr_thread_new_pkt(de, pr->pr_pkt);
pkt_ref_dec(pr->pr_pkt);
free(pr);
switch(sm->sm_type) {
case SMT_PACKET:
if(dispatch_clock > de->de_start)
dvr_thread_new_pkt(de, sm->sm_data);
pkt_ref_dec(sm->sm_data);
break;
}
free(sm);
pthread_mutex_lock(&sq->sq_mutex);
}

View file

@ -63,10 +63,10 @@ typedef struct htsp_msg {
int hm_payloadsize; /* For maintaining stats about streaming
buffer depth */
th_pktref_t *hm_pktref; /* For keeping reference to packet.
hm_msg can contain messages that points
to packet payload so to avoid copy we
keep a reference here */
th_pkt_t *hm_pkt; /* For keeping reference to packet.
hm_msg can contain messages that points
to packet payload so to avoid copy we
keep a reference here */
} htsp_msg_t;
@ -187,10 +187,8 @@ static void
htsp_msg_destroy(htsp_msg_t *hm)
{
htsmsg_destroy(hm->hm_msg);
if(hm->hm_pktref != NULL) {
pkt_ref_dec(hm->hm_pktref->pr_pkt);
free(hm->hm_pktref);
}
if(hm->hm_pkt != NULL)
pkt_ref_dec(hm->hm_pkt);
free(hm);
}
@ -229,13 +227,13 @@ htsp_destroy_queue(htsp_connection_t *htsp, htsp_msg_q_t *hmq)
*
*/
static void
htsp_send(htsp_connection_t *htsp, htsmsg_t *m, th_pktref_t *pkt,
htsp_send(htsp_connection_t *htsp, htsmsg_t *m, th_pkt_t *pkt,
htsp_msg_q_t *hmq, int payloadsize)
{
htsp_msg_t *hm = malloc(sizeof(htsp_msg_t));
hm->hm_msg = m;
hm->hm_pktref = pkt;
hm->hm_pkt = pkt;
hm->hm_payloadsize = payloadsize;
pthread_mutex_lock(&htsp->htsp_out_mutex);
@ -1016,16 +1014,18 @@ const static char frametypearray[PKT_NTYPES] = {
* Build a htsmsg from a th_pkt and enqueue it on our HTSP transport
*/
static void
htsp_stream_deliver(void *opaque, struct th_pktref *pr)
htsp_stream_deliver(void *opaque, streaming_message_t *sm)
{
htsp_stream_t *hs = opaque;
th_pkt_t *pkt = pr->pr_pkt;
th_pkt_t *pkt = sm->sm_data;
htsmsg_t *m = htsmsg_create_map(), *n;
htsp_msg_t *hm;
htsp_connection_t *htsp = hs->hs_htsp;
int64_t ts;
int qlen = hs->hs_q.hmq_payload;
free(sm);
if((qlen > 500000 && pkt->pkt_frametype == PKT_B_FRAME) ||
(qlen > 750000 && pkt->pkt_frametype == PKT_P_FRAME) ||
(qlen > 1500000)) {
@ -1033,8 +1033,7 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr)
hs->hs_dropstats[pkt->pkt_frametype]++;
/* Queue size protection */
pkt_ref_dec(pr->pr_pkt);
free(pr);
pkt_ref_dec(pkt);
return;
}
@ -1053,7 +1052,7 @@ htsp_stream_deliver(void *opaque, struct th_pktref *pr)
* object that just points to data, thus avoiding a copy.
*/
htsmsg_add_binptr(m, "payload", pkt->pkt_payload, pkt->pkt_payloadlen);
htsp_send(htsp, m, pr, &hs->hs_q, pkt->pkt_payloadlen);
htsp_send(htsp, m, pkt, &hs->hs_q, pkt->pkt_payloadlen);
if(hs->hs_last_report != dispatch_clock) {
/* Send a queue status report every second */

View file

@ -45,12 +45,12 @@ streaming_target_init2(streaming_target_t *st, st_callback_t *cb, void *opaque)
*
*/
static void
streaming_queue_deliver(void *opauqe, struct th_pktref *pr)
streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
{
streaming_queue_t *sq = opauqe;
pthread_mutex_lock(&sq->sq_mutex);
TAILQ_INSERT_TAIL(&sq->sq_queue, pr, pr_link);
TAILQ_INSERT_TAIL(&sq->sq_queue, sm, sm_link);
pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&sq->sq_mutex);
}
@ -111,7 +111,7 @@ void
streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt)
{
streaming_target_t *st;
th_pktref_t *pr;
streaming_message_t *sm;
lock_assert(sp->sp_mutex);
@ -123,8 +123,43 @@ streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt)
LIST_FOREACH(st, &sp->sp_targets, st_link) {
pr = malloc(sizeof(th_pktref_t));
pr->pr_pkt = pkt;
st->st_cb(st->st_opaque, pr);
sm = malloc(sizeof(streaming_message_t));
sm->sm_type = SMT_PACKET;
sm->sm_data = pkt;
st->st_cb(st->st_opaque, sm);
}
}
/**
*
*/
void
streaming_message_free(streaming_message_t *sm)
{
switch(sm->sm_type) {
case SMT_PACKET:
pkt_ref_dec(sm->sm_data);
break;
default:
abort();
}
free(sm);
}
/**
*
*/
void
streaming_queue_clear(struct streaming_message_queue *q)
{
streaming_message_t *sm;
while((sm = TAILQ_FIRST(q)) != NULL) {
TAILQ_REMOVE(q, sm, sm_link);
streaming_message_free(sm);
}
}

View file

@ -38,4 +38,8 @@ void streaming_target_disconnect(streaming_target_t *st);
void streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt);
void streaming_message_free(streaming_message_t *sm);
void streaming_queue_clear(struct streaming_message_queue *q);
#endif /* STREAMING_H_ */

View file

@ -146,11 +146,32 @@ typedef struct streaming_pad {
} streaming_pad_t;
TAILQ_HEAD(streaming_message_queue, streaming_message);
/**
* Streaming messages types
*/
typedef enum {
SMT_PACKET,
} streaming_message_type_t;
/**
* Streaming messages are sent from the pad to its receivers
*/
typedef struct streaming_message {
TAILQ_ENTRY(streaming_message) sm_link;
streaming_message_type_t sm_type;
void *sm_data;
} streaming_message_t;
/**
* A streaming target receives data.
*/
struct th_pktref;
typedef void (st_callback_t)(void *opauqe, struct th_pktref *pr);
typedef void (st_callback_t)(void *opauqe, streaming_message_t *sm);
typedef struct streaming_target {
LIST_ENTRY(streaming_target) st_link;
@ -172,7 +193,7 @@ typedef struct streaming_queue {
pthread_cond_t sq_cond; /* Condvar for signalling new
packets */
struct th_pktref_queue sq_queue;
struct streaming_message_queue sq_queue;
enum {
SQ_IDLE,