Implement delivery of packets via streaming_pads

This commit is contained in:
Andreas Öman 2008-09-17 21:21:15 +00:00
parent 8e2446e65e
commit 9499b1f2e5
6 changed files with 65 additions and 11 deletions

View file

@ -88,6 +88,16 @@ pkt_ref_inc(th_pkt_t *pkt)
pthread_mutex_unlock(&refmutex);
}
/**
*
*/
void
pkt_ref_inc_poly(th_pkt_t *pkt, int n)
{
pthread_mutex_lock(&refmutex);
pkt->pkt_refcount += n;
pthread_mutex_unlock(&refmutex);
}
/**

View file

@ -33,7 +33,7 @@ typedef struct th_pkt {
int pkt_duration;
int pkt_refcount;
uint8_t pkt_streamindex;
uint8_t pkt_componentindex;
uint8_t pkt_frametype;
uint8_t pkt_commercial;
@ -59,6 +59,8 @@ void pkt_ref_dec(th_pkt_t *pkt);
void pkt_ref_inc(th_pkt_t *pkt);
void pkt_ref_inc_poly(th_pkt_t *pkt, int n);
void pktref_clear_queue(struct th_pktref_queue *q);
th_pkt_t *pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts);

View file

@ -31,6 +31,7 @@
#include "bitstream.h"
#include "packet.h"
#include "transports.h"
#include "streaming.h"
static const AVRational mpeg_tc = {1, 90000};
@ -870,16 +871,11 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
*/
transport_signal_status(t, SUBSCRIPTION_VALID_PACKETS);
/* Alert all muxers tied to us that a new packet has arrived */
lock_assert(&t->tht_stream_mutex);
#if 0
LIST_FOREACH(tm, &t->tht_muxers, tm_transport_link)
tm->tm_new_pkt(tm, st, pkt);
#endif
/* Unref (and possibly free) the packet, downstream code is supposed
to increase refcount or copy packet if they need anything */
/* Forward packet */
pkt->pkt_componentindex = st->st_sc.sc_index;
streaming_pad_deliver_packet(&t->tht_streaming_pad, pkt);
/* Decrease our own reference to the packet */
pkt_ref_dec(pkt);
}

View file

@ -20,6 +20,7 @@
#include "tvhead.h"
#include "streaming.h"
#include "packet.h"
void
streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex)
@ -28,3 +29,45 @@ streaming_pad_init(streaming_pad_t *sp, pthread_mutex_t *mutex)
LIST_INIT(&sp->sp_components);
sp->sp_mutex = mutex;
}
/**
*
*/
void
streaming_target_connect(streaming_pad_t *sp, streaming_target_t *st)
{
lock_assert(sp->sp_mutex);
sp->sp_ntargets++;
st->st_pad = sp;
LIST_INSERT_HEAD(&sp->sp_targets, st, st_link);
}
/**
*
*/
void
streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt)
{
streaming_target_t *st;
th_pktref_t *pr;
lock_assert(sp->sp_mutex);
if(sp->sp_ntargets == 0)
return;
/* Increase multiple refcounts at once */
pkt_ref_inc_poly(pkt, sp->sp_ntargets);
LIST_FOREACH(st, &sp->sp_targets, st_link) {
pr = malloc(sizeof(th_pktref_t));
pr->pr_pkt = pkt;
pthread_mutex_lock(&st->st_mutex);
TAILQ_INSERT_TAIL(&st->st_queue, pr, pr_link);
pthread_cond_signal(&st->st_cond);
pthread_mutex_unlock(&st->st_mutex);
}
}

View file

@ -20,7 +20,7 @@
#define STREAMING_H_
#include "tvhead.h"
#include "packet.h"
/**
*
@ -31,4 +31,6 @@ void streaming_target_init(streaming_target_t *st);
void streaming_target_connect(streaming_pad_t *pd, streaming_target_t *st);
void streaming_pad_deliver_packet(streaming_pad_t *sp, th_pkt_t *pkt);
#endif /* STREAMING_H_ */

View file

@ -147,6 +147,7 @@ typedef struct streaming_component {
*/
typedef struct streaming_pad {
struct streaming_target_list sp_targets;
int sp_ntargets;
struct streaming_component_list sp_components;
pthread_mutex_t *sp_mutex; /* Mutex for protecting modification of