Add callback driven streaming delivery (as opposed to thread + queue). Will be used by HTSP.

This commit is contained in:
Andreas Öman 2008-09-27 13:41:41 +00:00
parent 4f44db4f71
commit 0587c6b83b
4 changed files with 38 additions and 11 deletions

View file

@ -410,7 +410,7 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp)
/* Link to the pad */
streaming_target_init(&de->de_st);
streaming_target_init(&de->de_st, NULL, NULL);
streaming_target_connect(sp, &de->de_st);
de->de_st.st_status = ST_RUNNING;
de->de_fctx = fctx;

View file

@ -34,9 +34,15 @@ streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex)
*
*/
void
streaming_target_init(streaming_target_t *st)
streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque)
{
st->st_status = ST_IDLE;
st->st_cb = cb;
st->st_opaque = opaque;
if(cb != NULL)
return;
pthread_mutex_init(&st->st_mutex, NULL);
pthread_cond_init(&st->st_cond, NULL);
TAILQ_INIT(&st->st_queue);
@ -73,7 +79,8 @@ streaming_target_disconnect(streaming_target_t *st)
pthread_mutex_unlock(sp->sp_mutex);
pktref_clear_queue(&st->st_queue);
if(st->st_cb == NULL)
pktref_clear_queue(&st->st_queue);
}
/**
@ -98,9 +105,14 @@ streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt)
pr = malloc(sizeof(th_pktref_t));
pr->pr_pkt = pkt;
pthread_mutex_lock(&st->st_mutex);
TAILQ_INSERT_TAIL(&st->st_queue, pr, pr_link);
pthread_cond_signal(&st->st_cond);
pthread_mutex_unlock(&st->st_mutex);
if(st->st_cb != NULL) {
st->st_cb(st->st_opaque, pr);
} else {
pthread_mutex_lock(&st->st_mutex);
TAILQ_INSERT_TAIL(&st->st_queue, pr, pr_link);
pthread_cond_signal(&st->st_cond);
pthread_mutex_unlock(&st->st_mutex);
}
}
}

View file

@ -27,7 +27,8 @@
*/
void streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex);
void streaming_target_init(streaming_target_t *st);
void streaming_target_init(streaming_target_t *st,
st_callback_t *cb, void *opaque);
void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st);

View file

@ -142,8 +142,15 @@ typedef struct streaming_component {
/**
* A streaming pad generates data.
* It has one or more streaming targets attached to it.
*
* We support two different streaming target types:
* One is callback driven and the other uses a queue + thread.
*
* Targets which already has a queueing intrastructure in place (such
* as HTSP) does not need any interim queues so it would be a waste. That
* is why we have the callback target.
*
* It has one or more streaming_targets attached to it
*/
typedef struct streaming_pad {
struct streaming_target_list sp_targets;
@ -152,13 +159,17 @@ typedef struct streaming_pad {
pthread_mutex_t *sp_mutex; /* Mutex for protecting modification of
st_targets and delivery.
This needs to be created elsewhere */
This needs to be created elsewhere.
The mutex also protect sp_comonents */
} streaming_pad_t;
/**
* A streaming target receives data
* A streaming target receives data.
*/
struct th_pktref;
typedef void (st_callback_t)(void *opauqe, struct th_pktref *pr);
typedef struct streaming_target {
LIST_ENTRY(streaming_target) st_link;
streaming_pad_t *st_pad; /* Source we are linked to */
@ -176,6 +187,9 @@ typedef struct streaming_target {
ST_ZOMBIE,
} st_status;
/* Callback driven delivery */
st_callback_t *st_cb;
void *st_opaque;
} streaming_target_t;