From a869d7101f8c611dd0c0f2f6112f4c7dbbaac36c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Mon, 1 Jun 2009 20:42:09 +0000 Subject: [PATCH] Split queue-stuff from streaming_target into its own struct. --- src/dvr/dvr.h | 2 +- src/dvr/dvr_rec.c | 48 +++++++++++++++++++++--------------------- src/htsp.c | 2 +- src/streaming.c | 53 ++++++++++++++++++++++++++++++----------------- src/streaming.h | 6 ++++-- src/tvhead.h | 40 +++++++++++++++++++++-------------- 6 files changed, 89 insertions(+), 62 deletions(-) diff --git a/src/dvr/dvr.h b/src/dvr/dvr.h index c25e45a1..9158a5ef 100644 --- a/src/dvr/dvr.h +++ b/src/dvr/dvr.h @@ -106,7 +106,7 @@ typedef struct dvr_entry { all commercial breaks so far */ struct dvr_rec_stream_list de_streams; - streaming_target_t de_st; + streaming_queue_t de_sq; AVFormatContext *de_fctx; enum { diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index bd9f5213..f211254c 100644 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -424,9 +424,9 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp) /* Link to the pad */ - streaming_target_init(&de->de_st, NULL, NULL); - streaming_target_connect(sp, &de->de_st); - de->de_st.st_status = ST_RUNNING; + streaming_queue_init(&de->de_sq); + streaming_target_connect(sp, &de->de_sq.sq_st); + de->de_sq.sq_status = SQ_RUNNING; de->de_fctx = fctx; de->de_ts_offset = AV_NOPTS_VALUE; @@ -451,23 +451,23 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp) static void dvr_rec_stop(dvr_entry_t *de) { - streaming_target_t *st = &de->de_st; + streaming_queue_t *sq = &de->de_sq; - streaming_target_disconnect(&de->de_st); + streaming_target_disconnect(&sq->sq_st); - pthread_mutex_lock(&st->st_mutex); + pthread_mutex_lock(&sq->sq_mutex); - if(st->st_status == ST_RUNNING) { - st->st_status = ST_STOP_REQ; + if(sq->sq_status == SQ_RUNNING) { + sq->sq_status = SQ_STOP_REQ; - pthread_cond_signal(&st->st_cond); + pthread_cond_signal(&sq->sq_cond); - while(st->st_status != ST_ZOMBIE) - pthread_cond_wait(&st->st_cond, &st->st_mutex); + while(sq->sq_status != SQ_ZOMBIE) + pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); } - pktref_clear_queue(&st->st_queue); - pthread_mutex_unlock(&st->st_mutex); + pktref_clear_queue(&sq->sq_queue); + pthread_mutex_unlock(&sq->sq_mutex); } @@ -478,25 +478,25 @@ static void * dvr_thread(void *aux) { dvr_entry_t *de = aux; - streaming_target_t *st = &de->de_st; + streaming_queue_t *sq = &de->de_sq; th_pktref_t *pr; - pthread_mutex_lock(&st->st_mutex); + pthread_mutex_lock(&sq->sq_mutex); de->de_header_written = 0; de->de_rec_state = DE_RS_WAIT_AUDIO_LOCK; - while(st->st_status == ST_RUNNING) { + while(sq->sq_status == SQ_RUNNING) { - pr = TAILQ_FIRST(&st->st_queue); + pr = TAILQ_FIRST(&sq->sq_queue); if(pr == NULL) { - pthread_cond_wait(&st->st_cond, &st->st_mutex); + pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); continue; } - TAILQ_REMOVE(&st->st_queue, pr, pr_link); + TAILQ_REMOVE(&sq->sq_queue, pr, pr_link); - pthread_mutex_unlock(&st->st_mutex); + pthread_mutex_unlock(&sq->sq_mutex); if(dispatch_clock > de->de_start) dvr_thread_new_pkt(de, pr->pr_pkt); @@ -504,14 +504,14 @@ dvr_thread(void *aux) pkt_ref_dec(pr->pr_pkt); free(pr); - pthread_mutex_lock(&st->st_mutex); + pthread_mutex_lock(&sq->sq_mutex); } /* Signal back that we no longer is running */ - st->st_status = ST_ZOMBIE; - pthread_cond_signal(&st->st_cond); + sq->sq_status = SQ_ZOMBIE; + pthread_cond_signal(&sq->sq_cond); - pthread_mutex_unlock(&st->st_mutex); + pthread_mutex_unlock(&sq->sq_mutex); dvr_thread_epilog(de); diff --git a/src/htsp.c b/src/htsp.c index 68626813..741ff652 100644 --- a/src/htsp.c +++ b/src/htsp.c @@ -1118,7 +1118,7 @@ htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s, htsmsg_add_str(m, "method", "subscriptionStart"); htsmsg_add_u32(m, "subscriptionId", s->ths_u32); - streaming_target_init(&hs->hs_st, htsp_stream_deliver, hs); + streaming_target_init2(&hs->hs_st, htsp_stream_deliver, hs); /** * Lock streming pad delivery so we can hook us up. diff --git a/src/streaming.c b/src/streaming.c index 6aa96336..0715a3db 100644 --- a/src/streaming.c +++ b/src/streaming.c @@ -34,20 +34,44 @@ streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex) * */ void -streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque) +streaming_target_init2(streaming_target_t *st, st_callback_t *cb, void *opaque) { - st->st_status = ST_IDLE; st->st_cb = cb; st->st_opaque = opaque; - - if(cb != NULL) - return; - - pthread_mutex_init(&st->st_mutex, NULL); - pthread_cond_init(&st->st_cond, NULL); - TAILQ_INIT(&st->st_queue); } + +/** + * + */ +static void +streaming_queue_deliver(void *opauqe, struct th_pktref *pr) +{ + streaming_queue_t *sq = opauqe; + + pthread_mutex_lock(&sq->sq_mutex); + TAILQ_INSERT_TAIL(&sq->sq_queue, pr, pr_link); + pthread_cond_signal(&sq->sq_cond); + pthread_mutex_unlock(&sq->sq_mutex); +} + + +/** + * + */ +void +streaming_queue_init(streaming_queue_t *sq) +{ + sq->sq_status = SQ_IDLE; + + streaming_target_init2(&sq->sq_st, streaming_queue_deliver, sq); + + pthread_mutex_init(&sq->sq_mutex, NULL); + pthread_cond_init(&sq->sq_cond, NULL); + TAILQ_INIT(&sq->sq_queue); +} + + /** * */ @@ -101,15 +125,6 @@ streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt) pr = malloc(sizeof(th_pktref_t)); pr->pr_pkt = pkt; - - if(st->st_cb != NULL) { - st->st_cb(st->st_opaque, pr); - } else { - - pthread_mutex_lock(&st->st_mutex); - TAILQ_INSERT_TAIL(&st->st_queue, pr, pr_link); - pthread_cond_signal(&st->st_cond); - pthread_mutex_unlock(&st->st_mutex); - } + st->st_cb(st->st_opaque, pr); } } diff --git a/src/streaming.h b/src/streaming.h index 20478e0b..dd8e6187 100644 --- a/src/streaming.h +++ b/src/streaming.h @@ -27,8 +27,10 @@ */ void streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex); -void streaming_target_init(streaming_target_t *st, - st_callback_t *cb, void *opaque); +void streaming_target_init2(streaming_target_t *st, + st_callback_t *cb, void *opaque); + +void streaming_queue_init(streaming_queue_t *st); void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st); diff --git a/src/tvhead.h b/src/tvhead.h index b76913d7..10067669 100644 --- a/src/tvhead.h +++ b/src/tvhead.h @@ -156,25 +156,35 @@ typedef struct streaming_target { LIST_ENTRY(streaming_target) st_link; streaming_pad_t *st_pad; /* Source we are linked to */ - pthread_mutex_t st_mutex; /* Protects sp_queue */ - pthread_cond_t st_cond; /* Condvar for signalling new - packets */ - - struct th_pktref_queue st_queue; - - enum { - ST_IDLE, - ST_RUNNING, - ST_STOP_REQ, - ST_ZOMBIE, - } st_status; - - /* Callback driven delivery */ st_callback_t *st_cb; void *st_opaque; - } streaming_target_t; + +/** + * + */ +typedef struct streaming_queue { + + streaming_target_t sq_st; + + pthread_mutex_t sq_mutex; /* Protects sp_queue */ + pthread_cond_t sq_cond; /* Condvar for signalling new + packets */ + + struct th_pktref_queue sq_queue; + + enum { + SQ_IDLE, + SQ_RUNNING, + SQ_STOP_REQ, + SQ_ZOMBIE, + } sq_status; + +} streaming_queue_t; + + + /** * Descrambler superclass *