From 0587c6b83b69ca72e4f3791e4d062771ecd1b5c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Sat, 27 Sep 2008 13:41:41 +0000 Subject: [PATCH] Add callback driven streaming delivery (as opposed to thread + queue). Will be used by HTSP. --- dvr/dvr_rec.c | 2 +- streaming.c | 24 ++++++++++++++++++------ streaming.h | 3 ++- tvhead.h | 20 +++++++++++++++++--- 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/dvr/dvr_rec.c b/dvr/dvr_rec.c index 997cd82a..1c2dc497 100644 --- a/dvr/dvr_rec.c +++ b/dvr/dvr_rec.c @@ -410,7 +410,7 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp) /* Link to the pad */ - streaming_target_init(&de->de_st); + streaming_target_init(&de->de_st, NULL, NULL); streaming_target_connect(sp, &de->de_st); de->de_st.st_status = ST_RUNNING; de->de_fctx = fctx; diff --git a/streaming.c b/streaming.c index 58edd3ba..2c071d20 100644 --- a/streaming.c +++ b/streaming.c @@ -34,9 +34,15 @@ streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex) * */ void -streaming_target_init(streaming_target_t *st) +streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque) { st->st_status = ST_IDLE; + st->st_cb = cb; + st->st_opaque = opaque; + + if(cb != NULL) + return; + pthread_mutex_init(&st->st_mutex, NULL); pthread_cond_init(&st->st_cond, NULL); TAILQ_INIT(&st->st_queue); @@ -73,7 +79,8 @@ streaming_target_disconnect(streaming_target_t *st) pthread_mutex_unlock(sp->sp_mutex); - pktref_clear_queue(&st->st_queue); + if(st->st_cb == NULL) + pktref_clear_queue(&st->st_queue); } /** @@ -98,9 +105,14 @@ streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt) pr = malloc(sizeof(th_pktref_t)); pr->pr_pkt = pkt; - pthread_mutex_lock(&st->st_mutex); - TAILQ_INSERT_TAIL(&st->st_queue, pr, pr_link); - pthread_cond_signal(&st->st_cond); - pthread_mutex_unlock(&st->st_mutex); + if(st->st_cb != NULL) { + st->st_cb(st->st_opaque, pr); + } else { + + pthread_mutex_lock(&st->st_mutex); + TAILQ_INSERT_TAIL(&st->st_queue, pr, pr_link); + pthread_cond_signal(&st->st_cond); + pthread_mutex_unlock(&st->st_mutex); + } } } diff --git a/streaming.h b/streaming.h index c3aa0c03..20478e0b 100644 --- a/streaming.h +++ b/streaming.h @@ -27,7 +27,8 @@ */ void streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex); -void streaming_target_init(streaming_target_t *st); +void streaming_target_init(streaming_target_t *st, + st_callback_t *cb, void *opaque); void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st); diff --git a/tvhead.h b/tvhead.h index 9e091cda..404855dd 100644 --- a/tvhead.h +++ b/tvhead.h @@ -142,8 +142,15 @@ typedef struct streaming_component { /** * A streaming pad generates data. + * It has one or more streaming targets attached to it. + * + * We support two different streaming target types: + * One is callback driven and the other uses a queue + thread. + * + * Targets which already has a queueing intrastructure in place (such + * as HTSP) does not need any interim queues so it would be a waste. That + * is why we have the callback target. * - * It has one or more streaming_targets attached to it */ typedef struct streaming_pad { struct streaming_target_list sp_targets; @@ -152,13 +159,17 @@ typedef struct streaming_pad { pthread_mutex_t *sp_mutex; /* Mutex for protecting modification of st_targets and delivery. - This needs to be created elsewhere */ + This needs to be created elsewhere. + The mutex also protect sp_comonents */ } streaming_pad_t; /** - * A streaming target receives data + * A streaming target receives data. */ +struct th_pktref; +typedef void (st_callback_t)(void *opauqe, struct th_pktref *pr); + typedef struct streaming_target { LIST_ENTRY(streaming_target) st_link; streaming_pad_t *st_pad; /* Source we are linked to */ @@ -176,6 +187,9 @@ typedef struct streaming_target { ST_ZOMBIE, } st_status; + /* Callback driven delivery */ + st_callback_t *st_cb; + void *st_opaque; } streaming_target_t;