From 9499b1f2e582de0a49d9d4fcc5ea677bd761490b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Wed, 17 Sep 2008 21:21:15 +0000 Subject: [PATCH] Implement delivery of packets via streaming_pads --- packet.c | 10 ++++++++++ packet.h | 4 +++- parsers.c | 14 +++++--------- streaming.c | 43 +++++++++++++++++++++++++++++++++++++++++++ streaming.h | 4 +++- tvhead.h | 1 + 6 files changed, 65 insertions(+), 11 deletions(-) diff --git a/packet.c b/packet.c index 7493af9c..583e7802 100644 --- a/packet.c +++ b/packet.c @@ -88,6 +88,16 @@ pkt_ref_inc(th_pkt_t *pkt) pthread_mutex_unlock(&refmutex); } +/** + * + */ +void +pkt_ref_inc_poly(th_pkt_t *pkt, int n) +{ + pthread_mutex_lock(&refmutex); + pkt->pkt_refcount += n; + pthread_mutex_unlock(&refmutex); +} /** diff --git a/packet.h b/packet.h index 48c2df4e..a4e4035b 100644 --- a/packet.h +++ b/packet.h @@ -33,7 +33,7 @@ typedef struct th_pkt { int pkt_duration; int pkt_refcount; - uint8_t pkt_streamindex; + uint8_t pkt_componentindex; uint8_t pkt_frametype; uint8_t pkt_commercial; @@ -59,6 +59,8 @@ void pkt_ref_dec(th_pkt_t *pkt); void pkt_ref_inc(th_pkt_t *pkt); +void pkt_ref_inc_poly(th_pkt_t *pkt, int n); + void pktref_clear_queue(struct th_pktref_queue *q); th_pkt_t *pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts); diff --git a/parsers.c b/parsers.c index 848b4ec8..22ee392c 100644 --- a/parsers.c +++ b/parsers.c @@ -31,6 +31,7 @@ #include "bitstream.h" #include "packet.h" #include "transports.h" +#include "streaming.h" static const AVRational mpeg_tc = {1, 90000}; @@ -870,16 +871,11 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) */ transport_signal_status(t, SUBSCRIPTION_VALID_PACKETS); - /* Alert all muxers tied to us that a new packet has arrived */ - - lock_assert(&t->tht_stream_mutex); -#if 0 - LIST_FOREACH(tm, &t->tht_muxers, tm_transport_link) - tm->tm_new_pkt(tm, st, pkt); -#endif - /* Unref (and possibly free) the packet, downstream code is supposed - to increase refcount or copy packet if they need anything */ + /* Forward packet */ + pkt->pkt_componentindex = st->st_sc.sc_index; + streaming_pad_deliver_packet(&t->tht_streaming_pad, pkt); + /* Decrease our own reference to the packet */ pkt_ref_dec(pkt); } diff --git a/streaming.c b/streaming.c index 616bccd9..4c102efb 100644 --- a/streaming.c +++ b/streaming.c @@ -20,6 +20,7 @@ #include "tvhead.h" #include "streaming.h" +#include "packet.h" void streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex) @@ -28,3 +29,45 @@ streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex) LIST_INIT(&sp->sp_components); sp->sp_mutex = mutex; } + +/** + * + */ +void +streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st) +{ + lock_assert(sp->sp_mutex); + + sp->sp_ntargets++; + st->st_pad = sp; + LIST_INSERT_HEAD(&sp->sp_targets, st, st_link); +} + +/** + * + */ +void +streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt) +{ + streaming_target_t *st; + th_pktref_t *pr; + + lock_assert(sp->sp_mutex); + + if(sp->sp_ntargets == 0) + return; + + /* Increase multiple refcounts at once */ + pkt_ref_inc_poly(pkt, sp->sp_ntargets); + + LIST_FOREACH(st, &sp->sp_targets, st_link) { + + pr = malloc(sizeof(th_pktref_t)); + pr->pr_pkt = pkt; + + pthread_mutex_lock(&st->st_mutex); + TAILQ_INSERT_TAIL(&st->st_queue, pr, pr_link); + pthread_cond_signal(&st->st_cond); + pthread_mutex_unlock(&st->st_mutex); + } +} diff --git a/streaming.h b/streaming.h index 0ac171a6..ee33c275 100644 --- a/streaming.h +++ b/streaming.h @@ -20,7 +20,7 @@ #define STREAMING_H_ #include "tvhead.h" - +#include "packet.h" /** * @@ -31,4 +31,6 @@ void streaming_target_init(streaming_target_t *st); void streaming_target_connect(streaming_pad_t *pd, streaming_target_t *st); +void streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt); + #endif /* STREAMING_H_ */ diff --git a/tvhead.h b/tvhead.h index cb865cd7..222f1af1 100644 --- a/tvhead.h +++ b/tvhead.h @@ -147,6 +147,7 @@ typedef struct streaming_component { */ typedef struct streaming_pad { struct streaming_target_list sp_targets; + int sp_ntargets; struct streaming_component_list sp_components; pthread_mutex_t *sp_mutex; /* Mutex for protecting modification of