Split queue-stuff from streaming_target into its own struct.

This commit is contained in:
Andreas Öman 2009-06-01 20:42:09 +00:00
parent 4851189fb5
commit a869d7101f
6 changed files with 89 additions and 62 deletions

View file

@ -106,7 +106,7 @@ typedef struct dvr_entry {
all commercial breaks so far */
struct dvr_rec_stream_list de_streams;
streaming_target_t de_st;
streaming_queue_t de_sq;
AVFormatContext *de_fctx;
enum {

View file

@ -424,9 +424,9 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp)
/* Link to the pad */
streaming_target_init(&de->de_st, NULL, NULL);
streaming_target_connect(sp, &de->de_st);
de->de_st.st_status = ST_RUNNING;
streaming_queue_init(&de->de_sq);
streaming_target_connect(sp, &de->de_sq.sq_st);
de->de_sq.sq_status = SQ_RUNNING;
de->de_fctx = fctx;
de->de_ts_offset = AV_NOPTS_VALUE;
@ -451,23 +451,23 @@ dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp)
static void
dvr_rec_stop(dvr_entry_t *de)
{
streaming_target_t *st = &de->de_st;
streaming_queue_t *sq = &de->de_sq;
streaming_target_disconnect(&de->de_st);
streaming_target_disconnect(&sq->sq_st);
pthread_mutex_lock(&st->st_mutex);
pthread_mutex_lock(&sq->sq_mutex);
if(st->st_status == ST_RUNNING) {
st->st_status = ST_STOP_REQ;
if(sq->sq_status == SQ_RUNNING) {
sq->sq_status = SQ_STOP_REQ;
pthread_cond_signal(&st->st_cond);
pthread_cond_signal(&sq->sq_cond);
while(st->st_status != ST_ZOMBIE)
pthread_cond_wait(&st->st_cond, &st->st_mutex);
while(sq->sq_status != SQ_ZOMBIE)
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
}
pktref_clear_queue(&st->st_queue);
pthread_mutex_unlock(&st->st_mutex);
pktref_clear_queue(&sq->sq_queue);
pthread_mutex_unlock(&sq->sq_mutex);
}
@ -478,25 +478,25 @@ static void *
dvr_thread(void *aux)
{
dvr_entry_t *de = aux;
streaming_target_t *st = &de->de_st;
streaming_queue_t *sq = &de->de_sq;
th_pktref_t *pr;
pthread_mutex_lock(&st->st_mutex);
pthread_mutex_lock(&sq->sq_mutex);
de->de_header_written = 0;
de->de_rec_state = DE_RS_WAIT_AUDIO_LOCK;
while(st->st_status == ST_RUNNING) {
while(sq->sq_status == SQ_RUNNING) {
pr = TAILQ_FIRST(&st->st_queue);
pr = TAILQ_FIRST(&sq->sq_queue);
if(pr == NULL) {
pthread_cond_wait(&st->st_cond, &st->st_mutex);
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
continue;
}
TAILQ_REMOVE(&st->st_queue, pr, pr_link);
TAILQ_REMOVE(&sq->sq_queue, pr, pr_link);
pthread_mutex_unlock(&st->st_mutex);
pthread_mutex_unlock(&sq->sq_mutex);
if(dispatch_clock > de->de_start)
dvr_thread_new_pkt(de, pr->pr_pkt);
@ -504,14 +504,14 @@ dvr_thread(void *aux)
pkt_ref_dec(pr->pr_pkt);
free(pr);
pthread_mutex_lock(&st->st_mutex);
pthread_mutex_lock(&sq->sq_mutex);
}
/* Signal back that we no longer is running */
st->st_status = ST_ZOMBIE;
pthread_cond_signal(&st->st_cond);
sq->sq_status = SQ_ZOMBIE;
pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&st->st_mutex);
pthread_mutex_unlock(&sq->sq_mutex);
dvr_thread_epilog(de);

View file

@ -1118,7 +1118,7 @@ htsp_subscription_start(htsp_connection_t *htsp, th_subscription_t *s,
htsmsg_add_str(m, "method", "subscriptionStart");
htsmsg_add_u32(m, "subscriptionId", s->ths_u32);
streaming_target_init(&hs->hs_st, htsp_stream_deliver, hs);
streaming_target_init2(&hs->hs_st, htsp_stream_deliver, hs);
/**
* Lock streming pad delivery so we can hook us up.

View file

@ -34,20 +34,44 @@ streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex)
*
*/
void
streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque)
streaming_target_init2(streaming_target_t *st, st_callback_t *cb, void *opaque)
{
st->st_status = ST_IDLE;
st->st_cb = cb;
st->st_opaque = opaque;
if(cb != NULL)
return;
pthread_mutex_init(&st->st_mutex, NULL);
pthread_cond_init(&st->st_cond, NULL);
TAILQ_INIT(&st->st_queue);
}
/**
*
*/
static void
streaming_queue_deliver(void *opauqe, struct th_pktref *pr)
{
streaming_queue_t *sq = opauqe;
pthread_mutex_lock(&sq->sq_mutex);
TAILQ_INSERT_TAIL(&sq->sq_queue, pr, pr_link);
pthread_cond_signal(&sq->sq_cond);
pthread_mutex_unlock(&sq->sq_mutex);
}
/**
*
*/
void
streaming_queue_init(streaming_queue_t *sq)
{
sq->sq_status = SQ_IDLE;
streaming_target_init2(&sq->sq_st, streaming_queue_deliver, sq);
pthread_mutex_init(&sq->sq_mutex, NULL);
pthread_cond_init(&sq->sq_cond, NULL);
TAILQ_INIT(&sq->sq_queue);
}
/**
*
*/
@ -101,15 +125,6 @@ streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt)
pr = malloc(sizeof(th_pktref_t));
pr->pr_pkt = pkt;
if(st->st_cb != NULL) {
st->st_cb(st->st_opaque, pr);
} else {
pthread_mutex_lock(&st->st_mutex);
TAILQ_INSERT_TAIL(&st->st_queue, pr, pr_link);
pthread_cond_signal(&st->st_cond);
pthread_mutex_unlock(&st->st_mutex);
}
st->st_cb(st->st_opaque, pr);
}
}

View file

@ -27,8 +27,10 @@
*/
void streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex);
void streaming_target_init(streaming_target_t *st,
st_callback_t *cb, void *opaque);
void streaming_target_init2(streaming_target_t *st,
st_callback_t *cb, void *opaque);
void streaming_queue_init(streaming_queue_t *st);
void streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st);

View file

@ -156,25 +156,35 @@ typedef struct streaming_target {
LIST_ENTRY(streaming_target) st_link;
streaming_pad_t *st_pad; /* Source we are linked to */
pthread_mutex_t st_mutex; /* Protects sp_queue */
pthread_cond_t st_cond; /* Condvar for signalling new
packets */
struct th_pktref_queue st_queue;
enum {
ST_IDLE,
ST_RUNNING,
ST_STOP_REQ,
ST_ZOMBIE,
} st_status;
/* Callback driven delivery */
st_callback_t *st_cb;
void *st_opaque;
} streaming_target_t;
/**
*
*/
typedef struct streaming_queue {
streaming_target_t sq_st;
pthread_mutex_t sq_mutex; /* Protects sp_queue */
pthread_cond_t sq_cond; /* Condvar for signalling new
packets */
struct th_pktref_queue sq_queue;
enum {
SQ_IDLE,
SQ_RUNNING,
SQ_STOP_REQ,
SQ_ZOMBIE,
} sq_status;
} streaming_queue_t;
/**
* Descrambler superclass
*