From f90324f925161df452342b3dda2bfc70b70f38a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Wed, 17 Sep 2008 19:29:25 +0000 Subject: [PATCH] Rewrite packet handling to be more flexible --- Makefile | 4 +- buffer.c | 364 ------------------------------------------- channels.c | 1 - packet.c | 106 +++++++++++++ buffer.h => packet.h | 73 +++++---- parser_h264.c | 1 - parsers.c | 116 ++++++++------ parsers.h | 7 +- psi.c | 4 +- psi.h | 2 +- pvr.h | 2 +- transports.c | 29 +--- transports.h | 4 +- tsdemux.c | 1 - tvhead.h | 65 +------- 15 files changed, 237 insertions(+), 542 deletions(-) delete mode 100644 buffer.c create mode 100644 packet.c rename buffer.h => packet.h (50%) diff --git a/Makefile b/Makefile index 2abd8db1..0774e482 100644 --- a/Makefile +++ b/Makefile @@ -2,10 +2,12 @@ SRCS = main.c access.c dtable.c tcp.c http.c notify.c epg.c xmltv.c spawn.c +SRCS += packet.c + VPATH += dvr SRCS += dvr_db.c -SRCS += buffer.c channels.c subscriptions.c transports.c +SRCS += channels.c subscriptions.c transports.c SRCS += psi.c parsers.c parser_h264.c tsdemux.c bitstream.c diff --git a/buffer.c b/buffer.c deleted file mode 100644 index 7ef28b02..00000000 --- a/buffer.c +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Packet / Buffer management - * Copyright (C) 2007 Andreas Öman - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -#define _XOPEN_SOURCE 500 -#include - -#include -#include - -#include - -#include -#include -#include -#include -#include -#include -#include -#include - - -#include "tvhead.h" -#include "buffer.h" - - -int64_t store_mem_size; -int64_t store_mem_size_max; -int64_t store_disk_size; -int64_t store_disk_size_max; -int store_packets; - - - - -static struct th_pkt_queue store_mem_queue; -static struct th_pkt_queue store_disk_queue; - -static int store_chunk_size; -static const char *store_path; -static th_storage_t *curstore; -static int store_tally; - - -static void storage_wipe(void); -static void storage_mem_enq(th_stream_t *st, th_pkt_t *pkt); -static void storage_disk_enq(th_stream_t *st, th_pkt_t *pkt); - -static void storage_deref(th_storage_t *s); - -/* - * - */ -void -pkt_init(void) -{ - store_path = NULL; - - if(store_path != NULL) - storage_wipe(); - - TAILQ_INIT(&store_mem_queue); - TAILQ_INIT(&store_disk_queue); - - store_mem_size_max = 1024 * 1024 * 10ULL; - store_disk_size_max = 1024 * 1024 * 4000ULL; - - store_chunk_size = store_disk_size_max / 32; -} - -/* - * - */ -static void -pkt_free(th_pkt_t *pkt) -{ - assert(pkt->pkt_storage == NULL); - free(pkt->pkt_payload); - memset(pkt, 0xff, sizeof(th_pkt_t)); - store_packets--; - free(pkt); -} - - -/* - * - */ -void -pkt_deref(th_pkt_t *pkt) -{ - assert(pkt->pkt_refcount > 0); - if(pkt->pkt_refcount > 1) { - pkt->pkt_refcount--; - return; - } - pkt_free(pkt); -} - -/* - * - */ -th_pkt_t * -pkt_ref(th_pkt_t *pkt) -{ - pkt->pkt_refcount++; - return pkt; -} - -/* - * - */ -th_pkt_t * -pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts) -{ - th_pkt_t *pkt; - - pkt = calloc(1, sizeof(th_pkt_t)); - pkt->pkt_payloadlen = datalen; - if(datalen > 0) { - pkt->pkt_payload = malloc(datalen + FF_INPUT_BUFFER_PADDING_SIZE); - if(data != NULL) - memcpy(pkt->pkt_payload, data, datalen); - } - - pkt->pkt_dts = dts; - pkt->pkt_pts = pts; - pkt->pkt_refcount = 1; - - store_packets++; - return pkt; -} - -/* - * - */ -th_pkt_t * -pkt_copy(th_stream_t *st, th_pkt_t *orig) -{ - th_pkt_t *pkt; - - pkt_load(st, orig); - if(orig->pkt_payload == NULL) - return NULL; - - pkt = malloc(sizeof(th_pkt_t)); - memcpy(pkt, orig, sizeof(th_pkt_t)); - - pkt->pkt_payload = malloc(pkt->pkt_payloadlen + - FF_INPUT_BUFFER_PADDING_SIZE); - memcpy(pkt->pkt_payload, orig->pkt_payload, pkt->pkt_payloadlen); - - pkt->pkt_on_stream_queue = 0; - pkt->pkt_storage = NULL; - pkt->pkt_refcount = 1; - return pkt; -} - - -/* - * - */ -void -pkt_store(th_stream_t *st, th_pkt_t *pkt) -{ - if(pkt->pkt_on_stream_queue) - return; - - pkt->pkt_on_stream_queue = 1; - pkt->pkt_refcount++; - TAILQ_INSERT_TAIL(&st->st_pktq, pkt, pkt_queue_link); - - /* Persistent buffer management */ - - storage_mem_enq(st, pkt); - storage_disk_enq(st, pkt); - - if(pkt->pkt_storage) - pwrite(pkt->pkt_storage->ts_fd, pkt->pkt_payload, pkt->pkt_payloadlen, - pkt->pkt_storage_offset); -} - - -/* - * Force flush of a packet - */ -void -pkt_unstore(th_stream_t *st, th_pkt_t *pkt) -{ - assert(pkt->pkt_on_stream_queue == 1); - TAILQ_REMOVE(&st->st_pktq, pkt, pkt_queue_link); - pkt->pkt_on_stream_queue = 0; - - if(pkt->pkt_storage != NULL) { - storage_deref(pkt->pkt_storage); - TAILQ_REMOVE(&store_disk_queue, pkt, pkt_disk_link); - store_disk_size -= pkt->pkt_payloadlen; - pkt->pkt_storage = NULL; - } - - if(pkt->pkt_payload != NULL) { - TAILQ_REMOVE(&store_mem_queue, pkt, pkt_mem_link); - store_mem_size -= pkt->pkt_payloadlen; - } - pkt_deref(pkt); -} - - -/* - * - */ -int -pkt_load(th_stream_t *st, th_pkt_t *pkt) -{ - if(pkt->pkt_payload == NULL && pkt->pkt_storage != NULL) { - pkt->pkt_payload = malloc(pkt->pkt_payloadlen + - FF_INPUT_BUFFER_PADDING_SIZE); - pread(pkt->pkt_storage->ts_fd, pkt->pkt_payload, pkt->pkt_payloadlen, - pkt->pkt_storage_offset); - storage_mem_enq(st, pkt); - } - return pkt->pkt_payload == NULL ? -1 : 0; -} - - - -/* - * - */ -static void -storage_deref(th_storage_t *s) -{ - if(s->ts_refcount > 1) { - s->ts_refcount--; - return; - } - if(curstore == s) - curstore = NULL; - - close(s->ts_fd); - unlink(s->ts_filename); - free(s->ts_filename); - free(s); -} - - - - -/* - * - */ -static void -storage_mem_enq(th_stream_t *st, th_pkt_t *pkt) -{ - TAILQ_INSERT_TAIL(&store_mem_queue, pkt, pkt_mem_link); - store_mem_size += pkt->pkt_payloadlen; - - while(store_mem_size >= store_mem_size_max) { - pkt = TAILQ_FIRST(&store_mem_queue); - - TAILQ_REMOVE(&store_mem_queue, pkt, pkt_mem_link); - store_mem_size -= pkt->pkt_payloadlen; - - free(pkt->pkt_payload); - pkt->pkt_payload = NULL; - - if(pkt->pkt_storage == NULL) - pkt_unstore(st, pkt); - } -} - - - - -/* - * - */ -static void -storage_disk_enq(th_stream_t *st, th_pkt_t *pkt) -{ - th_storage_t *s; - char fbuf[500]; - int fd; - - if(curstore == NULL) { - snprintf(fbuf, sizeof(fbuf), "%s/s%d", store_path, ++store_tally); - - fd = open(fbuf, O_RDWR | O_CREAT | O_TRUNC, 0644); - if(fd == -1) { - s = NULL; - } else { - s = calloc(1, sizeof(th_storage_t)); - s->ts_fd = fd; - s->ts_filename = strdup(fbuf); - } - curstore = s; - } else { - s = curstore; - } - - - if(s != NULL) { - TAILQ_INSERT_TAIL(&store_disk_queue, pkt, pkt_disk_link); - store_disk_size += pkt->pkt_payloadlen; - - s->ts_refcount++; - pkt->pkt_storage = s; - pkt->pkt_storage_offset = s->ts_offset; - s->ts_offset += pkt->pkt_payloadlen; - if(s->ts_offset > store_chunk_size) - curstore = NULL; - } - - while(store_disk_size > store_disk_size_max) { - pkt = TAILQ_FIRST(&store_disk_queue); - if(pkt->pkt_refcount > 1) - printf("UNSTORE of reference packet %p\n", pkt); - pkt_unstore(st, pkt); - } -} - - - - - - - - - -/** - * Erase all old files - */ -static void -storage_wipe(void) -{ - DIR *dir; - struct dirent *d; - char fbuf[500]; - - if((dir = opendir(store_path)) != NULL) { - - while((d = readdir(dir)) != NULL) { - if(d->d_name[0] == '.') - continue; - - snprintf(fbuf, sizeof(fbuf), "%s/%s", store_path, d->d_name); - unlink(fbuf); - } - } - closedir(dir); -} diff --git a/channels.c b/channels.c index 6d23d329..d3d84bd6 100644 --- a/channels.c +++ b/channels.c @@ -36,7 +36,6 @@ #include "channels.h" #include "transports.h" #include "epg.h" -#include "pvr.h" #include "autorec.h" #include "xmltv.h" #include "dtable.h" diff --git a/packet.c b/packet.c new file mode 100644 index 00000000..7493af9c --- /dev/null +++ b/packet.c @@ -0,0 +1,106 @@ +/** + * Packet management + * Copyright (C) 2008 Andreas Öman + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + + +#include "tvhead.h" +#include "packet.h" +#include "string.h" + +static pthread_mutex_t refmutex = PTHREAD_MUTEX_INITIALIZER; + + +/* + * + */ +static void +pkt_destroy(th_pkt_t *pkt) +{ + free(pkt->pkt_payload); + free(pkt); +} + + +/** + * Allocate a new packet and give it a refcount of one (which caller is + * suppoed to take care of) + */ +th_pkt_t * +pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts) +{ + th_pkt_t *pkt; + + pkt = calloc(1, sizeof(th_pkt_t)); + pkt->pkt_payloadlen = datalen; + if(datalen > 0) { + pkt->pkt_payload = malloc(datalen + FF_INPUT_BUFFER_PADDING_SIZE); + if(data != NULL) + memcpy(pkt->pkt_payload, data, datalen); + } + + pkt->pkt_dts = dts; + pkt->pkt_pts = pts; + pkt->pkt_refcount = 1; + return pkt; +} + +/** + * + */ +void +pkt_ref_dec(th_pkt_t *pkt) +{ + if(pkt->pkt_refcount == 1) { + pkt_destroy(pkt); + return; + } + + /* Use atomic decrease */ + pthread_mutex_lock(&refmutex); + pkt->pkt_refcount--; + pthread_mutex_unlock(&refmutex); +} + + +/** + * + */ +void +pkt_ref_inc(th_pkt_t *pkt) +{ + /* Use atomic increase */ + pthread_mutex_lock(&refmutex); + pkt->pkt_refcount++; + pthread_mutex_unlock(&refmutex); +} + + + +/** + * + */ +void +pktref_clear_queue(struct th_pktref_queue *q) +{ + th_pktref_t *pr; + + while((pr = TAILQ_FIRST(q)) != NULL) { + TAILQ_REMOVE(q, pr, pr_link); + pkt_ref_dec(pr->pr_pkt); + free(pr); + } +} diff --git a/buffer.h b/packet.h similarity index 50% rename from buffer.h rename to packet.h index 2492d798..8e909a8f 100644 --- a/buffer.h +++ b/packet.h @@ -1,6 +1,6 @@ /* - * Packet / Buffer management - * Copyright (C) 2007 Andreas Öman + * Packet nanagement + * Copyright (C) 2008 Andreas Öman * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -16,37 +16,50 @@ * along with this program. If not, see . */ -#ifndef BUFFER_H_ -#define BUFFER_H_ +#ifndef PACKET_H_ +#define PACKET_H_ -th_pkt_t *pkt_ref(th_pkt_t *pkt); -void pkt_deref(th_pkt_t *pkt); +/** + * Packets + */ +#define PKT_I_FRAME 1 +#define PKT_P_FRAME 2 +#define PKT_B_FRAME 3 -void pkt_init(void); +typedef struct th_pkt { + int64_t pkt_dts; + int64_t pkt_pts; + int pkt_duration; + int pkt_refcount; + + uint8_t pkt_frametype; + uint8_t pkt_commercial; + + uint8_t *pkt_payload; + int pkt_payloadlen; + +} th_pkt_t; + + +/** + * A packet reference + */ +typedef struct th_pktref { + TAILQ_ENTRY(th_pktref) pr_link; + th_pkt_t *pr_pkt; +} th_pktref_t; + + +/** + * + */ +void pkt_ref_dec(th_pkt_t *pkt); + +void pkt_ref_inc(th_pkt_t *pkt); + +void pktref_clear_queue(struct th_pktref_queue *q); th_pkt_t *pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts); -th_pkt_t *pkt_copy(th_stream_t *st, th_pkt_t *pkt); - -void pkt_store(th_stream_t *st, th_pkt_t *pkt); - -void pkt_unstore(th_stream_t *st, th_pkt_t *pkt); - -int pkt_load(th_stream_t *st, th_pkt_t *pkt); - -void *pkt_payload(th_pkt_t *pkt); - -size_t pkt_len(th_pkt_t *pkt); - -extern int64_t store_mem_size; -extern int64_t store_mem_size_max; -extern int64_t store_disk_size; -extern int64_t store_disk_size_max; -extern int store_packets; - - -#define pkt_payload(pkt) ((pkt)->pkt_payload) -#define pkt_len(pkt) ((pkt)->pkt_payloadlen) - -#endif /* BUFFER_H_ */ +#endif /* PACKET_H_ */ diff --git a/parser_h264.c b/parser_h264.c index 8f223e51..03b32186 100644 --- a/parser_h264.c +++ b/parser_h264.c @@ -31,7 +31,6 @@ #include "tvhead.h" #include "parsers.h" #include "parser_h264.h" -#include "buffer.h" #include "bitstream.h" /** diff --git a/parsers.c b/parsers.c index 59deb63e..ad9d9172 100644 --- a/parsers.c +++ b/parsers.c @@ -29,7 +29,7 @@ #include "parsers.h" #include "parser_h264.h" #include "bitstream.h" -#include "buffer.h" +#include "packet.h" #include "transports.h" static const AVRational mpeg_tc = {1, 90000}; @@ -85,15 +85,16 @@ static void parse_mpegaudio(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt); static void parse_ac3(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt); -void parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt); +static void parse_compute_pts(th_transport_t *t, th_stream_t *st, + th_pkt_t *pkt); static void parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt); static int parse_pes_header(th_transport_t *t, th_stream_t *st, uint8_t *buf, size_t len); -void parser_compute_duration(th_transport_t *t, th_stream_t *st, - th_pkt_t *pkt); +static void parser_compute_duration(th_transport_t *t, th_stream_t *st, + th_pktref_t *pr); /** * Parse raw mpeg data @@ -296,7 +297,7 @@ parse_mpegaudio(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) sample_rate = mpegaudio_freq_tab[(header >> 10) & 3]; if(sample_rate == 0) { - pkt_deref(pkt); + pkt_ref_dec(pkt); return; } @@ -370,7 +371,7 @@ parse_ac3(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) sample_rate = ac3_freq_tab[src] >> bsid; if(sample_rate == 0) { - pkt_deref(pkt); + pkt_ref_dec(pkt); return; } @@ -534,7 +535,7 @@ parse_mpeg2video(th_transport_t *t, th_stream_t *st, size_t len, return 1; if(st->st_curpkt != NULL) - pkt_deref(st->st_curpkt); + pkt_ref_dec(st->st_curpkt); st->st_curpkt = pkt_alloc(NULL, 0, st->st_curpts, st->st_curdts); st->st_curpkt->pkt_frametype = pkt0.pkt_frametype; @@ -685,6 +686,7 @@ parse_h264(th_transport_t *t, th_stream_t *st, size_t len, st->st_curpkt->pkt_payloadlen = st->st_buffer_ptr; parser_deliver(t, st, st->st_curpkt); + pkt_ref_dec(st->st_curpkt); st->st_curpkt = NULL; st->st_buffer = malloc(st->st_buffer_size); return 1; @@ -700,25 +702,36 @@ parse_h264(th_transport_t *t, th_stream_t *st, size_t len, * We do this by placing packets on a queue and wait for next I/P * frame to appear */ -void +static void parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) { - th_pkt_t *p; - - if(pkt->pkt_pts != AV_NOPTS_VALUE && st->st_ptsq_len == 0) { - /* PTS known and no other packets in queue, deliver at once */ + th_pktref_t *pr; - if(pkt->pkt_duration == 0) - parser_compute_duration(t, st, pkt); - else - parser_deliver(t, st, pkt); - return; - } + int validpts = pkt->pkt_pts != AV_NOPTS_VALUE && st->st_ptsq_len == 0; - TAILQ_INSERT_TAIL(&st->st_ptsq, pkt, pkt_queue_link); + + /* PTS known and no other packets in queue, deliver at once */ + if(validpts && pkt->pkt_duration) + return parser_deliver(t, st, pkt); + + + /* Reference count is transfered to queue */ + pr = malloc(sizeof(th_pktref_t)); + pr->pr_pkt = pkt; + + + if(validpts) + return parser_compute_duration(t, st, pr); + + TAILQ_INSERT_TAIL(&st->st_ptsq, pr, pr_link); st->st_ptsq_len++; - while((pkt = TAILQ_FIRST(&st->st_ptsq)) != NULL) { + /* */ + + while((pr = TAILQ_FIRST(&st->st_ptsq)) != NULL) { + + pkt = pr->pr_pkt; + switch(pkt->pkt_frametype) { case PKT_B_FRAME: /* B-frames have same PTS as DTS, pass them on */ @@ -729,26 +742,29 @@ parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) case PKT_P_FRAME: /* Presentation occures at DTS of next I or P frame, try to find it */ - p = TAILQ_NEXT(pkt, pkt_queue_link); + pr = TAILQ_NEXT(pr, pr_link); while(1) { - if(p == NULL) + if(pr == NULL) return; /* not arrived yet, wait */ - if(p->pkt_frametype <= PKT_P_FRAME) { - pkt->pkt_pts = p->pkt_dts; + if(pr->pr_pkt->pkt_frametype <= PKT_P_FRAME) { + pkt->pkt_pts = pr->pr_pkt->pkt_dts; break; } - p = TAILQ_NEXT(p, pkt_queue_link); + pr = TAILQ_NEXT(pr, pr_link); } break; } - TAILQ_REMOVE(&st->st_ptsq, pkt, pkt_queue_link); + pr = TAILQ_FIRST(&st->st_ptsq); + TAILQ_REMOVE(&st->st_ptsq, pr, pr_link); st->st_ptsq_len--; - if(pkt->pkt_duration == 0) - parser_compute_duration(t, st, pkt); - else + if(pkt->pkt_duration == 0) { + parser_compute_duration(t, st, pr); + } else { parser_deliver(t, st, pkt); + free(pr); + } } } @@ -756,28 +772,27 @@ parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) * Compute duration of a packet, we do this by keeping a packet * until the next one arrives, then we release it */ -void -parser_compute_duration(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) +static void +parser_compute_duration(th_transport_t *t, th_stream_t *st, th_pktref_t *pr) { - th_pkt_t *next; + th_pktref_t *next; int64_t d; - TAILQ_INSERT_TAIL(&st->st_durationq, pkt, pkt_queue_link); + TAILQ_INSERT_TAIL(&st->st_durationq, pr, pr_link); - pkt = TAILQ_FIRST(&st->st_durationq); - if((next = TAILQ_NEXT(pkt, pkt_queue_link)) == NULL) + pr = TAILQ_FIRST(&st->st_durationq); + if((next = TAILQ_NEXT(pr, pr_link)) == NULL) return; - d = next->pkt_dts - pkt->pkt_dts; - TAILQ_REMOVE(&st->st_durationq, pkt, pkt_queue_link); + d = next->pr_pkt->pkt_dts - pr->pr_pkt->pkt_dts; + TAILQ_REMOVE(&st->st_durationq, pr, pr_link); if(d < 10) { - pkt_deref(pkt); - return; + pkt_ref_dec(pr->pr_pkt); + } else { + pr->pr_pkt->pkt_duration = d; + parser_deliver(t, st, pr->pr_pkt); } - - pkt->pkt_duration = d; - - parser_deliver(t, st, pkt); + free(pr); } @@ -788,7 +803,6 @@ parser_compute_duration(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) static void parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) { - th_muxer_t *tm; int64_t dts, pts, ptsoff; assert(pkt->pkt_dts != AV_NOPTS_VALUE); @@ -796,7 +810,7 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) assert(pkt->pkt_duration > 10); if(t->tht_dts_start == AV_NOPTS_VALUE) { - pkt_deref(pkt); + pkt_ref_dec(pkt); return; } @@ -812,7 +826,7 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) if(st->st_last_dts == AV_NOPTS_VALUE) { if(dts < 0) { /* Early packet with negative time stamp, drop those */ - pkt_deref(pkt); + pkt_ref_dec(pkt); return; } } else if(dts + st->st_dts_epoch < st->st_last_dts - (1LL << 24)) { @@ -854,14 +868,14 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) /* Alert all muxers tied to us that a new packet has arrived */ lock_assert(&t->tht_stream_mutex); - +#if 0 LIST_FOREACH(tm, &t->tht_muxers, tm_transport_link) tm->tm_new_pkt(tm, st, pkt); - - /* Unref (and possibly free) the packet, muxers are supposed +#endif + /* Unref (and possibly free) the packet, downstream code is supposed to increase refcount or copy packet if they need anything */ - pkt_deref(pkt); + pkt_ref_dec(pkt); } /** @@ -917,7 +931,7 @@ parser_enqueue_packet(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt) } if(err) { - pkt_deref(pkt); + pkt_ref_dec(pkt); return; } diff --git a/parsers.h b/parsers.h index 6b610e14..245833d3 100644 --- a/parsers.h +++ b/parsers.h @@ -19,14 +19,11 @@ #ifndef PARSERS_H #define PARSERS_H +#include "packet.h" + void parse_raw_mpeg(th_transport_t *t, th_stream_t *st, uint8_t *data, int len, int start, int err); -void parser_compute_duration(th_transport_t *t, th_stream_t *st, - th_pkt_t *pkt); - -void parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt); - void parser_enqueue_packet(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt); extern const unsigned int mpeg2video_framedurations[16]; diff --git a/psi.c b/psi.c index b25046e4..0459585f 100644 --- a/psi.c +++ b/psi.c @@ -306,7 +306,7 @@ psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid) /** * PAT generator */ - +#if 0 int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid) { @@ -401,7 +401,7 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid) return psi_append_crc32(buf0, tlen, maxlen); } - +#endif diff --git a/psi.h b/psi.h index 519a4c8e..f1af665e 100644 --- a/psi.h +++ b/psi.h @@ -41,7 +41,7 @@ uint32_t psi_crc32(uint8_t *data, size_t datalen); int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid); -int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid); +//int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid); const char *psi_caid2name(uint16_t caid); diff --git a/pvr.h b/pvr.h index 79a32779..aceeb74f 100644 --- a/pvr.h +++ b/pvr.h @@ -68,7 +68,7 @@ typedef struct pvr_rec { pthread_t pvrr_ptid; //dtimer_t pvrr_timer; - th_ffmuxer_t pvrr_tffm; + th_ffmuxer_t pvrr_tffm; int64_t pvrr_dts_offset; diff --git a/transports.c b/transports.c index 86a88f97..ba20c565 100644 --- a/transports.c +++ b/transports.c @@ -38,7 +38,7 @@ #include "v4l.h" #include "psi.h" -#include "buffer.h" +#include "packet.h" #include "channels.h" #include "cwc.h" #include "notify.h" @@ -64,7 +64,6 @@ transport_stop(th_transport_t *t) { th_descrambler_t *td; th_stream_t *st; - th_pkt_t *pkt; gtimer_disarm(&t->tht_receive_timer); @@ -110,35 +109,21 @@ transport_stop(th_transport_t *t) st->st_startcode = 0; if(st->st_curpkt != NULL) { - pkt_deref(st->st_curpkt); + pkt_ref_dec(st->st_curpkt); st->st_curpkt = NULL; } /* Clear PTS queue */ - while((pkt = TAILQ_FIRST(&st->st_ptsq)) != NULL) { - TAILQ_REMOVE(&st->st_ptsq, pkt, pkt_queue_link); - assert(pkt->pkt_refcount == 1); - pkt_deref(pkt); - } + pktref_clear_queue(&st->st_ptsq); st->st_ptsq_len = 0; /* Clear durationq */ - while((pkt = TAILQ_FIRST(&st->st_durationq)) != NULL) { - TAILQ_REMOVE(&st->st_durationq, pkt, pkt_queue_link); - assert(pkt->pkt_refcount == 1); - pkt_deref(pkt); - } - - /* Flush framestore */ - - while((pkt = TAILQ_FIRST(&st->st_pktq)) != NULL) - pkt_unstore(st, pkt); - + pktref_clear_queue(&st->st_durationq); } - pthread_mutex_unlock(&t->tht_stream_mutex); + pthread_mutex_unlock(&t->tht_stream_mutex); } /** @@ -175,7 +160,7 @@ transport_remove_subscriber(th_transport_t *t, th_subscription_t *s) transport_stop(t); } - +#if 0 /** * */ @@ -214,6 +199,7 @@ transport_unlink_muxer(th_muxer_t *tm) pthread_mutex_unlock(&t->tht_stream_mutex); tm->tm_transport = NULL; } +#endif /** @@ -546,7 +532,6 @@ transport_add_stream(th_transport_t *t, int pid, tv_streamtype_t type) TAILQ_INIT(&st->st_ptsq); TAILQ_INIT(&st->st_durationq); - TAILQ_INIT(&st->st_pktq); avgstat_init(&st->st_rate, 10); avgstat_init(&st->st_cc_errors, 10); diff --git a/transports.h b/transports.h index 1069592a..910aee2a 100644 --- a/transports.h +++ b/transports.h @@ -58,8 +58,8 @@ const char *transport_status_to_text(int status); void transport_remove_subscriber(th_transport_t *t, th_subscription_t *s); -void transport_link_muxer(th_transport_t *t, th_muxer_t *tm); +//void transport_link_muxer(th_transport_t *t, th_muxer_t *tm); -void transport_unlink_muxer(th_muxer_t *tm); +//void transport_unlink_muxer(th_muxer_t *tm); #endif /* TRANSPORTS_H */ diff --git a/tsdemux.c b/tsdemux.c index 04269aa0..5998038e 100644 --- a/tsdemux.c +++ b/tsdemux.c @@ -40,7 +40,6 @@ #include "transports.h" #include "subscriptions.h" #include "psi.h" -#include "buffer.h" #include "tsdemux.h" #include "parsers.h" diff --git a/tvhead.h b/tvhead.h index 3db9bdbb..e378be74 100644 --- a/tvhead.h +++ b/tvhead.h @@ -99,14 +99,13 @@ RB_HEAD(th_transport_tree, th_transport); TAILQ_HEAD(th_transport_queue, th_transport); RB_HEAD(th_dvb_mux_instance_tree, th_dvb_mux_instance); LIST_HEAD(th_stream_list, th_stream); -TAILQ_HEAD(th_pkt_queue, th_pkt); -LIST_HEAD(th_pkt_list, th_pkt); LIST_HEAD(th_muxer_list, th_muxer); LIST_HEAD(th_muxstream_list, th_muxstream); LIST_HEAD(th_descrambler_list, th_descrambler); TAILQ_HEAD(th_refpkt_queue, th_refpkt); TAILQ_HEAD(th_muxpkt_queue, th_muxpkt); LIST_HEAD(autorec_list, autorec); +TAILQ_HEAD(th_pktref_queue, th_pktref); extern time_t dispatch_clock; extern int startupcounter; @@ -326,22 +325,14 @@ typedef struct th_stream { struct AVCodecContext *st_ctx; struct AVCodecParserContext *st_parser; - /* All packets currently hanging on to us */ - - struct th_pkt_list st_packets; - /* Temporary frame store for calculating PTS */ - struct th_pkt_queue st_ptsq; + struct th_pktref_queue st_ptsq; int st_ptsq_len; /* Temporary frame store for calculating duration */ - struct th_pkt_queue st_durationq; - - /* Final frame store */ - - struct th_pkt_queue st_pktq; + struct th_pktref_queue st_durationq; /* ca id for this stream */ @@ -683,53 +674,7 @@ typedef struct th_transport { #define tht_file_input u.file_input.file_input -/* - * Storage - */ -typedef struct th_storage { - unsigned int ts_offset; - unsigned int ts_refcount; - int ts_fd; - char *ts_filename; -} th_storage_t; - -/* - * A packet - */ - -#define PKT_I_FRAME 1 -#define PKT_P_FRAME 2 -#define PKT_B_FRAME 3 - - -typedef struct th_pkt { - TAILQ_ENTRY(th_pkt) pkt_queue_link; - uint8_t pkt_on_stream_queue; - uint8_t pkt_frametype; - uint8_t pkt_commercial; - - int64_t pkt_dts; - int64_t pkt_pts; - int pkt_duration; - int pkt_refcount; - - th_storage_t *pkt_storage; - TAILQ_ENTRY(th_pkt) pkt_disk_link; - int pkt_storage_offset; - - uint8_t *pkt_payload; - int pkt_payloadlen; - TAILQ_ENTRY(th_pkt) pkt_mem_link; -} th_pkt_t; - -/** - * Referenced packets - */ -typedef struct th_refpkt { - TAILQ_ENTRY(th_refpkt) trp_link; - th_pkt_t *trp_pkt; -} th_refpkt_t; - +#if 0 /** * Muxed packets @@ -866,7 +811,7 @@ typedef struct th_ffmuxer { struct AVFormatContext *tffm_avfctx; } th_ffmuxer_t; - +#endif /*