diff --git a/Makefile b/Makefile
index 2abd8db1..0774e482 100644
--- a/Makefile
+++ b/Makefile
@@ -2,10 +2,12 @@
SRCS = main.c access.c dtable.c tcp.c http.c notify.c epg.c xmltv.c spawn.c
+SRCS += packet.c
+
VPATH += dvr
SRCS += dvr_db.c
-SRCS += buffer.c channels.c subscriptions.c transports.c
+SRCS += channels.c subscriptions.c transports.c
SRCS += psi.c parsers.c parser_h264.c tsdemux.c bitstream.c
diff --git a/buffer.c b/buffer.c
deleted file mode 100644
index 7ef28b02..00000000
--- a/buffer.c
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Packet / Buffer management
- * Copyright (C) 2007 Andreas Öman
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
- */
-
-#define _XOPEN_SOURCE 500
-#include
-
-#include
-#include
-
-#include
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-
-#include "tvhead.h"
-#include "buffer.h"
-
-
-int64_t store_mem_size;
-int64_t store_mem_size_max;
-int64_t store_disk_size;
-int64_t store_disk_size_max;
-int store_packets;
-
-
-
-
-static struct th_pkt_queue store_mem_queue;
-static struct th_pkt_queue store_disk_queue;
-
-static int store_chunk_size;
-static const char *store_path;
-static th_storage_t *curstore;
-static int store_tally;
-
-
-static void storage_wipe(void);
-static void storage_mem_enq(th_stream_t *st, th_pkt_t *pkt);
-static void storage_disk_enq(th_stream_t *st, th_pkt_t *pkt);
-
-static void storage_deref(th_storage_t *s);
-
-/*
- *
- */
-void
-pkt_init(void)
-{
- store_path = NULL;
-
- if(store_path != NULL)
- storage_wipe();
-
- TAILQ_INIT(&store_mem_queue);
- TAILQ_INIT(&store_disk_queue);
-
- store_mem_size_max = 1024 * 1024 * 10ULL;
- store_disk_size_max = 1024 * 1024 * 4000ULL;
-
- store_chunk_size = store_disk_size_max / 32;
-}
-
-/*
- *
- */
-static void
-pkt_free(th_pkt_t *pkt)
-{
- assert(pkt->pkt_storage == NULL);
- free(pkt->pkt_payload);
- memset(pkt, 0xff, sizeof(th_pkt_t));
- store_packets--;
- free(pkt);
-}
-
-
-/*
- *
- */
-void
-pkt_deref(th_pkt_t *pkt)
-{
- assert(pkt->pkt_refcount > 0);
- if(pkt->pkt_refcount > 1) {
- pkt->pkt_refcount--;
- return;
- }
- pkt_free(pkt);
-}
-
-/*
- *
- */
-th_pkt_t *
-pkt_ref(th_pkt_t *pkt)
-{
- pkt->pkt_refcount++;
- return pkt;
-}
-
-/*
- *
- */
-th_pkt_t *
-pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts)
-{
- th_pkt_t *pkt;
-
- pkt = calloc(1, sizeof(th_pkt_t));
- pkt->pkt_payloadlen = datalen;
- if(datalen > 0) {
- pkt->pkt_payload = malloc(datalen + FF_INPUT_BUFFER_PADDING_SIZE);
- if(data != NULL)
- memcpy(pkt->pkt_payload, data, datalen);
- }
-
- pkt->pkt_dts = dts;
- pkt->pkt_pts = pts;
- pkt->pkt_refcount = 1;
-
- store_packets++;
- return pkt;
-}
-
-/*
- *
- */
-th_pkt_t *
-pkt_copy(th_stream_t *st, th_pkt_t *orig)
-{
- th_pkt_t *pkt;
-
- pkt_load(st, orig);
- if(orig->pkt_payload == NULL)
- return NULL;
-
- pkt = malloc(sizeof(th_pkt_t));
- memcpy(pkt, orig, sizeof(th_pkt_t));
-
- pkt->pkt_payload = malloc(pkt->pkt_payloadlen +
- FF_INPUT_BUFFER_PADDING_SIZE);
- memcpy(pkt->pkt_payload, orig->pkt_payload, pkt->pkt_payloadlen);
-
- pkt->pkt_on_stream_queue = 0;
- pkt->pkt_storage = NULL;
- pkt->pkt_refcount = 1;
- return pkt;
-}
-
-
-/*
- *
- */
-void
-pkt_store(th_stream_t *st, th_pkt_t *pkt)
-{
- if(pkt->pkt_on_stream_queue)
- return;
-
- pkt->pkt_on_stream_queue = 1;
- pkt->pkt_refcount++;
- TAILQ_INSERT_TAIL(&st->st_pktq, pkt, pkt_queue_link);
-
- /* Persistent buffer management */
-
- storage_mem_enq(st, pkt);
- storage_disk_enq(st, pkt);
-
- if(pkt->pkt_storage)
- pwrite(pkt->pkt_storage->ts_fd, pkt->pkt_payload, pkt->pkt_payloadlen,
- pkt->pkt_storage_offset);
-}
-
-
-/*
- * Force flush of a packet
- */
-void
-pkt_unstore(th_stream_t *st, th_pkt_t *pkt)
-{
- assert(pkt->pkt_on_stream_queue == 1);
- TAILQ_REMOVE(&st->st_pktq, pkt, pkt_queue_link);
- pkt->pkt_on_stream_queue = 0;
-
- if(pkt->pkt_storage != NULL) {
- storage_deref(pkt->pkt_storage);
- TAILQ_REMOVE(&store_disk_queue, pkt, pkt_disk_link);
- store_disk_size -= pkt->pkt_payloadlen;
- pkt->pkt_storage = NULL;
- }
-
- if(pkt->pkt_payload != NULL) {
- TAILQ_REMOVE(&store_mem_queue, pkt, pkt_mem_link);
- store_mem_size -= pkt->pkt_payloadlen;
- }
- pkt_deref(pkt);
-}
-
-
-/*
- *
- */
-int
-pkt_load(th_stream_t *st, th_pkt_t *pkt)
-{
- if(pkt->pkt_payload == NULL && pkt->pkt_storage != NULL) {
- pkt->pkt_payload = malloc(pkt->pkt_payloadlen +
- FF_INPUT_BUFFER_PADDING_SIZE);
- pread(pkt->pkt_storage->ts_fd, pkt->pkt_payload, pkt->pkt_payloadlen,
- pkt->pkt_storage_offset);
- storage_mem_enq(st, pkt);
- }
- return pkt->pkt_payload == NULL ? -1 : 0;
-}
-
-
-
-/*
- *
- */
-static void
-storage_deref(th_storage_t *s)
-{
- if(s->ts_refcount > 1) {
- s->ts_refcount--;
- return;
- }
- if(curstore == s)
- curstore = NULL;
-
- close(s->ts_fd);
- unlink(s->ts_filename);
- free(s->ts_filename);
- free(s);
-}
-
-
-
-
-/*
- *
- */
-static void
-storage_mem_enq(th_stream_t *st, th_pkt_t *pkt)
-{
- TAILQ_INSERT_TAIL(&store_mem_queue, pkt, pkt_mem_link);
- store_mem_size += pkt->pkt_payloadlen;
-
- while(store_mem_size >= store_mem_size_max) {
- pkt = TAILQ_FIRST(&store_mem_queue);
-
- TAILQ_REMOVE(&store_mem_queue, pkt, pkt_mem_link);
- store_mem_size -= pkt->pkt_payloadlen;
-
- free(pkt->pkt_payload);
- pkt->pkt_payload = NULL;
-
- if(pkt->pkt_storage == NULL)
- pkt_unstore(st, pkt);
- }
-}
-
-
-
-
-/*
- *
- */
-static void
-storage_disk_enq(th_stream_t *st, th_pkt_t *pkt)
-{
- th_storage_t *s;
- char fbuf[500];
- int fd;
-
- if(curstore == NULL) {
- snprintf(fbuf, sizeof(fbuf), "%s/s%d", store_path, ++store_tally);
-
- fd = open(fbuf, O_RDWR | O_CREAT | O_TRUNC, 0644);
- if(fd == -1) {
- s = NULL;
- } else {
- s = calloc(1, sizeof(th_storage_t));
- s->ts_fd = fd;
- s->ts_filename = strdup(fbuf);
- }
- curstore = s;
- } else {
- s = curstore;
- }
-
-
- if(s != NULL) {
- TAILQ_INSERT_TAIL(&store_disk_queue, pkt, pkt_disk_link);
- store_disk_size += pkt->pkt_payloadlen;
-
- s->ts_refcount++;
- pkt->pkt_storage = s;
- pkt->pkt_storage_offset = s->ts_offset;
- s->ts_offset += pkt->pkt_payloadlen;
- if(s->ts_offset > store_chunk_size)
- curstore = NULL;
- }
-
- while(store_disk_size > store_disk_size_max) {
- pkt = TAILQ_FIRST(&store_disk_queue);
- if(pkt->pkt_refcount > 1)
- printf("UNSTORE of reference packet %p\n", pkt);
- pkt_unstore(st, pkt);
- }
-}
-
-
-
-
-
-
-
-
-
-/**
- * Erase all old files
- */
-static void
-storage_wipe(void)
-{
- DIR *dir;
- struct dirent *d;
- char fbuf[500];
-
- if((dir = opendir(store_path)) != NULL) {
-
- while((d = readdir(dir)) != NULL) {
- if(d->d_name[0] == '.')
- continue;
-
- snprintf(fbuf, sizeof(fbuf), "%s/%s", store_path, d->d_name);
- unlink(fbuf);
- }
- }
- closedir(dir);
-}
diff --git a/channels.c b/channels.c
index 6d23d329..d3d84bd6 100644
--- a/channels.c
+++ b/channels.c
@@ -36,7 +36,6 @@
#include "channels.h"
#include "transports.h"
#include "epg.h"
-#include "pvr.h"
#include "autorec.h"
#include "xmltv.h"
#include "dtable.h"
diff --git a/packet.c b/packet.c
new file mode 100644
index 00000000..7493af9c
--- /dev/null
+++ b/packet.c
@@ -0,0 +1,106 @@
+/**
+ * Packet management
+ * Copyright (C) 2008 Andreas Öman
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+
+#include "tvhead.h"
+#include "packet.h"
+#include "string.h"
+
+static pthread_mutex_t refmutex = PTHREAD_MUTEX_INITIALIZER;
+
+
+/*
+ *
+ */
+static void
+pkt_destroy(th_pkt_t *pkt)
+{
+ free(pkt->pkt_payload);
+ free(pkt);
+}
+
+
+/**
+ * Allocate a new packet and give it a refcount of one (which caller is
+ * suppoed to take care of)
+ */
+th_pkt_t *
+pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts)
+{
+ th_pkt_t *pkt;
+
+ pkt = calloc(1, sizeof(th_pkt_t));
+ pkt->pkt_payloadlen = datalen;
+ if(datalen > 0) {
+ pkt->pkt_payload = malloc(datalen + FF_INPUT_BUFFER_PADDING_SIZE);
+ if(data != NULL)
+ memcpy(pkt->pkt_payload, data, datalen);
+ }
+
+ pkt->pkt_dts = dts;
+ pkt->pkt_pts = pts;
+ pkt->pkt_refcount = 1;
+ return pkt;
+}
+
+/**
+ *
+ */
+void
+pkt_ref_dec(th_pkt_t *pkt)
+{
+ if(pkt->pkt_refcount == 1) {
+ pkt_destroy(pkt);
+ return;
+ }
+
+ /* Use atomic decrease */
+ pthread_mutex_lock(&refmutex);
+ pkt->pkt_refcount--;
+ pthread_mutex_unlock(&refmutex);
+}
+
+
+/**
+ *
+ */
+void
+pkt_ref_inc(th_pkt_t *pkt)
+{
+ /* Use atomic increase */
+ pthread_mutex_lock(&refmutex);
+ pkt->pkt_refcount++;
+ pthread_mutex_unlock(&refmutex);
+}
+
+
+
+/**
+ *
+ */
+void
+pktref_clear_queue(struct th_pktref_queue *q)
+{
+ th_pktref_t *pr;
+
+ while((pr = TAILQ_FIRST(q)) != NULL) {
+ TAILQ_REMOVE(q, pr, pr_link);
+ pkt_ref_dec(pr->pr_pkt);
+ free(pr);
+ }
+}
diff --git a/buffer.h b/packet.h
similarity index 50%
rename from buffer.h
rename to packet.h
index 2492d798..8e909a8f 100644
--- a/buffer.h
+++ b/packet.h
@@ -1,6 +1,6 @@
/*
- * Packet / Buffer management
- * Copyright (C) 2007 Andreas Öman
+ * Packet nanagement
+ * Copyright (C) 2008 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -16,37 +16,50 @@
* along with this program. If not, see .
*/
-#ifndef BUFFER_H_
-#define BUFFER_H_
+#ifndef PACKET_H_
+#define PACKET_H_
-th_pkt_t *pkt_ref(th_pkt_t *pkt);
-void pkt_deref(th_pkt_t *pkt);
+/**
+ * Packets
+ */
+#define PKT_I_FRAME 1
+#define PKT_P_FRAME 2
+#define PKT_B_FRAME 3
-void pkt_init(void);
+typedef struct th_pkt {
+ int64_t pkt_dts;
+ int64_t pkt_pts;
+ int pkt_duration;
+ int pkt_refcount;
+
+ uint8_t pkt_frametype;
+ uint8_t pkt_commercial;
+
+ uint8_t *pkt_payload;
+ int pkt_payloadlen;
+
+} th_pkt_t;
+
+
+/**
+ * A packet reference
+ */
+typedef struct th_pktref {
+ TAILQ_ENTRY(th_pktref) pr_link;
+ th_pkt_t *pr_pkt;
+} th_pktref_t;
+
+
+/**
+ *
+ */
+void pkt_ref_dec(th_pkt_t *pkt);
+
+void pkt_ref_inc(th_pkt_t *pkt);
+
+void pktref_clear_queue(struct th_pktref_queue *q);
th_pkt_t *pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts);
-th_pkt_t *pkt_copy(th_stream_t *st, th_pkt_t *pkt);
-
-void pkt_store(th_stream_t *st, th_pkt_t *pkt);
-
-void pkt_unstore(th_stream_t *st, th_pkt_t *pkt);
-
-int pkt_load(th_stream_t *st, th_pkt_t *pkt);
-
-void *pkt_payload(th_pkt_t *pkt);
-
-size_t pkt_len(th_pkt_t *pkt);
-
-extern int64_t store_mem_size;
-extern int64_t store_mem_size_max;
-extern int64_t store_disk_size;
-extern int64_t store_disk_size_max;
-extern int store_packets;
-
-
-#define pkt_payload(pkt) ((pkt)->pkt_payload)
-#define pkt_len(pkt) ((pkt)->pkt_payloadlen)
-
-#endif /* BUFFER_H_ */
+#endif /* PACKET_H_ */
diff --git a/parser_h264.c b/parser_h264.c
index 8f223e51..03b32186 100644
--- a/parser_h264.c
+++ b/parser_h264.c
@@ -31,7 +31,6 @@
#include "tvhead.h"
#include "parsers.h"
#include "parser_h264.h"
-#include "buffer.h"
#include "bitstream.h"
/**
diff --git a/parsers.c b/parsers.c
index 59deb63e..ad9d9172 100644
--- a/parsers.c
+++ b/parsers.c
@@ -29,7 +29,7 @@
#include "parsers.h"
#include "parser_h264.h"
#include "bitstream.h"
-#include "buffer.h"
+#include "packet.h"
#include "transports.h"
static const AVRational mpeg_tc = {1, 90000};
@@ -85,15 +85,16 @@ static void parse_mpegaudio(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
static void parse_ac3(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
-void parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
+static void parse_compute_pts(th_transport_t *t, th_stream_t *st,
+ th_pkt_t *pkt);
static void parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
static int parse_pes_header(th_transport_t *t, th_stream_t *st, uint8_t *buf,
size_t len);
-void parser_compute_duration(th_transport_t *t, th_stream_t *st,
- th_pkt_t *pkt);
+static void parser_compute_duration(th_transport_t *t, th_stream_t *st,
+ th_pktref_t *pr);
/**
* Parse raw mpeg data
@@ -296,7 +297,7 @@ parse_mpegaudio(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
sample_rate = mpegaudio_freq_tab[(header >> 10) & 3];
if(sample_rate == 0) {
- pkt_deref(pkt);
+ pkt_ref_dec(pkt);
return;
}
@@ -370,7 +371,7 @@ parse_ac3(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
sample_rate = ac3_freq_tab[src] >> bsid;
if(sample_rate == 0) {
- pkt_deref(pkt);
+ pkt_ref_dec(pkt);
return;
}
@@ -534,7 +535,7 @@ parse_mpeg2video(th_transport_t *t, th_stream_t *st, size_t len,
return 1;
if(st->st_curpkt != NULL)
- pkt_deref(st->st_curpkt);
+ pkt_ref_dec(st->st_curpkt);
st->st_curpkt = pkt_alloc(NULL, 0, st->st_curpts, st->st_curdts);
st->st_curpkt->pkt_frametype = pkt0.pkt_frametype;
@@ -685,6 +686,7 @@ parse_h264(th_transport_t *t, th_stream_t *st, size_t len,
st->st_curpkt->pkt_payloadlen = st->st_buffer_ptr;
parser_deliver(t, st, st->st_curpkt);
+ pkt_ref_dec(st->st_curpkt);
st->st_curpkt = NULL;
st->st_buffer = malloc(st->st_buffer_size);
return 1;
@@ -700,25 +702,36 @@ parse_h264(th_transport_t *t, th_stream_t *st, size_t len,
* We do this by placing packets on a queue and wait for next I/P
* frame to appear
*/
-void
+static void
parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
{
- th_pkt_t *p;
-
- if(pkt->pkt_pts != AV_NOPTS_VALUE && st->st_ptsq_len == 0) {
- /* PTS known and no other packets in queue, deliver at once */
+ th_pktref_t *pr;
- if(pkt->pkt_duration == 0)
- parser_compute_duration(t, st, pkt);
- else
- parser_deliver(t, st, pkt);
- return;
- }
+ int validpts = pkt->pkt_pts != AV_NOPTS_VALUE && st->st_ptsq_len == 0;
- TAILQ_INSERT_TAIL(&st->st_ptsq, pkt, pkt_queue_link);
+
+ /* PTS known and no other packets in queue, deliver at once */
+ if(validpts && pkt->pkt_duration)
+ return parser_deliver(t, st, pkt);
+
+
+ /* Reference count is transfered to queue */
+ pr = malloc(sizeof(th_pktref_t));
+ pr->pr_pkt = pkt;
+
+
+ if(validpts)
+ return parser_compute_duration(t, st, pr);
+
+ TAILQ_INSERT_TAIL(&st->st_ptsq, pr, pr_link);
st->st_ptsq_len++;
- while((pkt = TAILQ_FIRST(&st->st_ptsq)) != NULL) {
+ /* */
+
+ while((pr = TAILQ_FIRST(&st->st_ptsq)) != NULL) {
+
+ pkt = pr->pr_pkt;
+
switch(pkt->pkt_frametype) {
case PKT_B_FRAME:
/* B-frames have same PTS as DTS, pass them on */
@@ -729,26 +742,29 @@ parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
case PKT_P_FRAME:
/* Presentation occures at DTS of next I or P frame,
try to find it */
- p = TAILQ_NEXT(pkt, pkt_queue_link);
+ pr = TAILQ_NEXT(pr, pr_link);
while(1) {
- if(p == NULL)
+ if(pr == NULL)
return; /* not arrived yet, wait */
- if(p->pkt_frametype <= PKT_P_FRAME) {
- pkt->pkt_pts = p->pkt_dts;
+ if(pr->pr_pkt->pkt_frametype <= PKT_P_FRAME) {
+ pkt->pkt_pts = pr->pr_pkt->pkt_dts;
break;
}
- p = TAILQ_NEXT(p, pkt_queue_link);
+ pr = TAILQ_NEXT(pr, pr_link);
}
break;
}
- TAILQ_REMOVE(&st->st_ptsq, pkt, pkt_queue_link);
+ pr = TAILQ_FIRST(&st->st_ptsq);
+ TAILQ_REMOVE(&st->st_ptsq, pr, pr_link);
st->st_ptsq_len--;
- if(pkt->pkt_duration == 0)
- parser_compute_duration(t, st, pkt);
- else
+ if(pkt->pkt_duration == 0) {
+ parser_compute_duration(t, st, pr);
+ } else {
parser_deliver(t, st, pkt);
+ free(pr);
+ }
}
}
@@ -756,28 +772,27 @@ parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
* Compute duration of a packet, we do this by keeping a packet
* until the next one arrives, then we release it
*/
-void
-parser_compute_duration(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
+static void
+parser_compute_duration(th_transport_t *t, th_stream_t *st, th_pktref_t *pr)
{
- th_pkt_t *next;
+ th_pktref_t *next;
int64_t d;
- TAILQ_INSERT_TAIL(&st->st_durationq, pkt, pkt_queue_link);
+ TAILQ_INSERT_TAIL(&st->st_durationq, pr, pr_link);
- pkt = TAILQ_FIRST(&st->st_durationq);
- if((next = TAILQ_NEXT(pkt, pkt_queue_link)) == NULL)
+ pr = TAILQ_FIRST(&st->st_durationq);
+ if((next = TAILQ_NEXT(pr, pr_link)) == NULL)
return;
- d = next->pkt_dts - pkt->pkt_dts;
- TAILQ_REMOVE(&st->st_durationq, pkt, pkt_queue_link);
+ d = next->pr_pkt->pkt_dts - pr->pr_pkt->pkt_dts;
+ TAILQ_REMOVE(&st->st_durationq, pr, pr_link);
if(d < 10) {
- pkt_deref(pkt);
- return;
+ pkt_ref_dec(pr->pr_pkt);
+ } else {
+ pr->pr_pkt->pkt_duration = d;
+ parser_deliver(t, st, pr->pr_pkt);
}
-
- pkt->pkt_duration = d;
-
- parser_deliver(t, st, pkt);
+ free(pr);
}
@@ -788,7 +803,6 @@ parser_compute_duration(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
static void
parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
{
- th_muxer_t *tm;
int64_t dts, pts, ptsoff;
assert(pkt->pkt_dts != AV_NOPTS_VALUE);
@@ -796,7 +810,7 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
assert(pkt->pkt_duration > 10);
if(t->tht_dts_start == AV_NOPTS_VALUE) {
- pkt_deref(pkt);
+ pkt_ref_dec(pkt);
return;
}
@@ -812,7 +826,7 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
if(st->st_last_dts == AV_NOPTS_VALUE) {
if(dts < 0) {
/* Early packet with negative time stamp, drop those */
- pkt_deref(pkt);
+ pkt_ref_dec(pkt);
return;
}
} else if(dts + st->st_dts_epoch < st->st_last_dts - (1LL << 24)) {
@@ -854,14 +868,14 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
/* Alert all muxers tied to us that a new packet has arrived */
lock_assert(&t->tht_stream_mutex);
-
+#if 0
LIST_FOREACH(tm, &t->tht_muxers, tm_transport_link)
tm->tm_new_pkt(tm, st, pkt);
-
- /* Unref (and possibly free) the packet, muxers are supposed
+#endif
+ /* Unref (and possibly free) the packet, downstream code is supposed
to increase refcount or copy packet if they need anything */
- pkt_deref(pkt);
+ pkt_ref_dec(pkt);
}
/**
@@ -917,7 +931,7 @@ parser_enqueue_packet(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
}
if(err) {
- pkt_deref(pkt);
+ pkt_ref_dec(pkt);
return;
}
diff --git a/parsers.h b/parsers.h
index 6b610e14..245833d3 100644
--- a/parsers.h
+++ b/parsers.h
@@ -19,14 +19,11 @@
#ifndef PARSERS_H
#define PARSERS_H
+#include "packet.h"
+
void parse_raw_mpeg(th_transport_t *t, th_stream_t *st, uint8_t *data,
int len, int start, int err);
-void parser_compute_duration(th_transport_t *t, th_stream_t *st,
- th_pkt_t *pkt);
-
-void parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
-
void parser_enqueue_packet(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
extern const unsigned int mpeg2video_framedurations[16];
diff --git a/psi.c b/psi.c
index b25046e4..0459585f 100644
--- a/psi.c
+++ b/psi.c
@@ -306,7 +306,7 @@ psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid)
/**
* PAT generator
*/
-
+#if 0
int
psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid)
{
@@ -401,7 +401,7 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid)
return psi_append_crc32(buf0, tlen, maxlen);
}
-
+#endif
diff --git a/psi.h b/psi.h
index 519a4c8e..f1af665e 100644
--- a/psi.h
+++ b/psi.h
@@ -41,7 +41,7 @@ uint32_t psi_crc32(uint8_t *data, size_t datalen);
int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid);
-int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid);
+//int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid);
const char *psi_caid2name(uint16_t caid);
diff --git a/pvr.h b/pvr.h
index 79a32779..aceeb74f 100644
--- a/pvr.h
+++ b/pvr.h
@@ -68,7 +68,7 @@ typedef struct pvr_rec {
pthread_t pvrr_ptid;
//dtimer_t pvrr_timer;
- th_ffmuxer_t pvrr_tffm;
+ th_ffmuxer_t pvrr_tffm;
int64_t pvrr_dts_offset;
diff --git a/transports.c b/transports.c
index 86a88f97..ba20c565 100644
--- a/transports.c
+++ b/transports.c
@@ -38,7 +38,7 @@
#include "v4l.h"
#include "psi.h"
-#include "buffer.h"
+#include "packet.h"
#include "channels.h"
#include "cwc.h"
#include "notify.h"
@@ -64,7 +64,6 @@ transport_stop(th_transport_t *t)
{
th_descrambler_t *td;
th_stream_t *st;
- th_pkt_t *pkt;
gtimer_disarm(&t->tht_receive_timer);
@@ -110,35 +109,21 @@ transport_stop(th_transport_t *t)
st->st_startcode = 0;
if(st->st_curpkt != NULL) {
- pkt_deref(st->st_curpkt);
+ pkt_ref_dec(st->st_curpkt);
st->st_curpkt = NULL;
}
/* Clear PTS queue */
- while((pkt = TAILQ_FIRST(&st->st_ptsq)) != NULL) {
- TAILQ_REMOVE(&st->st_ptsq, pkt, pkt_queue_link);
- assert(pkt->pkt_refcount == 1);
- pkt_deref(pkt);
- }
+ pktref_clear_queue(&st->st_ptsq);
st->st_ptsq_len = 0;
/* Clear durationq */
- while((pkt = TAILQ_FIRST(&st->st_durationq)) != NULL) {
- TAILQ_REMOVE(&st->st_durationq, pkt, pkt_queue_link);
- assert(pkt->pkt_refcount == 1);
- pkt_deref(pkt);
- }
-
- /* Flush framestore */
-
- while((pkt = TAILQ_FIRST(&st->st_pktq)) != NULL)
- pkt_unstore(st, pkt);
-
+ pktref_clear_queue(&st->st_durationq);
}
- pthread_mutex_unlock(&t->tht_stream_mutex);
+ pthread_mutex_unlock(&t->tht_stream_mutex);
}
/**
@@ -175,7 +160,7 @@ transport_remove_subscriber(th_transport_t *t, th_subscription_t *s)
transport_stop(t);
}
-
+#if 0
/**
*
*/
@@ -214,6 +199,7 @@ transport_unlink_muxer(th_muxer_t *tm)
pthread_mutex_unlock(&t->tht_stream_mutex);
tm->tm_transport = NULL;
}
+#endif
/**
@@ -546,7 +532,6 @@ transport_add_stream(th_transport_t *t, int pid, tv_streamtype_t type)
TAILQ_INIT(&st->st_ptsq);
TAILQ_INIT(&st->st_durationq);
- TAILQ_INIT(&st->st_pktq);
avgstat_init(&st->st_rate, 10);
avgstat_init(&st->st_cc_errors, 10);
diff --git a/transports.h b/transports.h
index 1069592a..910aee2a 100644
--- a/transports.h
+++ b/transports.h
@@ -58,8 +58,8 @@ const char *transport_status_to_text(int status);
void transport_remove_subscriber(th_transport_t *t, th_subscription_t *s);
-void transport_link_muxer(th_transport_t *t, th_muxer_t *tm);
+//void transport_link_muxer(th_transport_t *t, th_muxer_t *tm);
-void transport_unlink_muxer(th_muxer_t *tm);
+//void transport_unlink_muxer(th_muxer_t *tm);
#endif /* TRANSPORTS_H */
diff --git a/tsdemux.c b/tsdemux.c
index 04269aa0..5998038e 100644
--- a/tsdemux.c
+++ b/tsdemux.c
@@ -40,7 +40,6 @@
#include "transports.h"
#include "subscriptions.h"
#include "psi.h"
-#include "buffer.h"
#include "tsdemux.h"
#include "parsers.h"
diff --git a/tvhead.h b/tvhead.h
index 3db9bdbb..e378be74 100644
--- a/tvhead.h
+++ b/tvhead.h
@@ -99,14 +99,13 @@ RB_HEAD(th_transport_tree, th_transport);
TAILQ_HEAD(th_transport_queue, th_transport);
RB_HEAD(th_dvb_mux_instance_tree, th_dvb_mux_instance);
LIST_HEAD(th_stream_list, th_stream);
-TAILQ_HEAD(th_pkt_queue, th_pkt);
-LIST_HEAD(th_pkt_list, th_pkt);
LIST_HEAD(th_muxer_list, th_muxer);
LIST_HEAD(th_muxstream_list, th_muxstream);
LIST_HEAD(th_descrambler_list, th_descrambler);
TAILQ_HEAD(th_refpkt_queue, th_refpkt);
TAILQ_HEAD(th_muxpkt_queue, th_muxpkt);
LIST_HEAD(autorec_list, autorec);
+TAILQ_HEAD(th_pktref_queue, th_pktref);
extern time_t dispatch_clock;
extern int startupcounter;
@@ -326,22 +325,14 @@ typedef struct th_stream {
struct AVCodecContext *st_ctx;
struct AVCodecParserContext *st_parser;
- /* All packets currently hanging on to us */
-
- struct th_pkt_list st_packets;
-
/* Temporary frame store for calculating PTS */
- struct th_pkt_queue st_ptsq;
+ struct th_pktref_queue st_ptsq;
int st_ptsq_len;
/* Temporary frame store for calculating duration */
- struct th_pkt_queue st_durationq;
-
- /* Final frame store */
-
- struct th_pkt_queue st_pktq;
+ struct th_pktref_queue st_durationq;
/* ca id for this stream */
@@ -683,53 +674,7 @@ typedef struct th_transport {
#define tht_file_input u.file_input.file_input
-/*
- * Storage
- */
-typedef struct th_storage {
- unsigned int ts_offset;
- unsigned int ts_refcount;
- int ts_fd;
- char *ts_filename;
-} th_storage_t;
-
-/*
- * A packet
- */
-
-#define PKT_I_FRAME 1
-#define PKT_P_FRAME 2
-#define PKT_B_FRAME 3
-
-
-typedef struct th_pkt {
- TAILQ_ENTRY(th_pkt) pkt_queue_link;
- uint8_t pkt_on_stream_queue;
- uint8_t pkt_frametype;
- uint8_t pkt_commercial;
-
- int64_t pkt_dts;
- int64_t pkt_pts;
- int pkt_duration;
- int pkt_refcount;
-
- th_storage_t *pkt_storage;
- TAILQ_ENTRY(th_pkt) pkt_disk_link;
- int pkt_storage_offset;
-
- uint8_t *pkt_payload;
- int pkt_payloadlen;
- TAILQ_ENTRY(th_pkt) pkt_mem_link;
-} th_pkt_t;
-
-/**
- * Referenced packets
- */
-typedef struct th_refpkt {
- TAILQ_ENTRY(th_refpkt) trp_link;
- th_pkt_t *trp_pkt;
-} th_refpkt_t;
-
+#if 0
/**
* Muxed packets
@@ -866,7 +811,7 @@ typedef struct th_ffmuxer {
struct AVFormatContext *tffm_avfctx;
} th_ffmuxer_t;
-
+#endif
/*