Rewrite packet handling to be more flexible

This commit is contained in:
Andreas Öman 2008-09-17 19:29:25 +00:00
parent 8d10a0352f
commit f90324f925
15 changed files with 237 additions and 542 deletions

View file

@ -2,10 +2,12 @@
SRCS = main.c access.c dtable.c tcp.c http.c notify.c epg.c xmltv.c spawn.c
SRCS += packet.c
VPATH += dvr
SRCS += dvr_db.c
SRCS += buffer.c channels.c subscriptions.c transports.c
SRCS += channels.c subscriptions.c transports.c
SRCS += psi.c parsers.c parser_h264.c tsdemux.c bitstream.c

364
buffer.c
View file

@ -1,364 +0,0 @@
/*
* Packet / Buffer management
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _XOPEN_SOURCE 500
#include <unistd.h>
#include <pthread.h>
#include <syslog.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tvhead.h"
#include "buffer.h"
int64_t store_mem_size;
int64_t store_mem_size_max;
int64_t store_disk_size;
int64_t store_disk_size_max;
int store_packets;
static struct th_pkt_queue store_mem_queue;
static struct th_pkt_queue store_disk_queue;
static int store_chunk_size;
static const char *store_path;
static th_storage_t *curstore;
static int store_tally;
static void storage_wipe(void);
static void storage_mem_enq(th_stream_t *st, th_pkt_t *pkt);
static void storage_disk_enq(th_stream_t *st, th_pkt_t *pkt);
static void storage_deref(th_storage_t *s);
/*
*
*/
void
pkt_init(void)
{
store_path = NULL;
if(store_path != NULL)
storage_wipe();
TAILQ_INIT(&store_mem_queue);
TAILQ_INIT(&store_disk_queue);
store_mem_size_max = 1024 * 1024 * 10ULL;
store_disk_size_max = 1024 * 1024 * 4000ULL;
store_chunk_size = store_disk_size_max / 32;
}
/*
*
*/
static void
pkt_free(th_pkt_t *pkt)
{
assert(pkt->pkt_storage == NULL);
free(pkt->pkt_payload);
memset(pkt, 0xff, sizeof(th_pkt_t));
store_packets--;
free(pkt);
}
/*
*
*/
void
pkt_deref(th_pkt_t *pkt)
{
assert(pkt->pkt_refcount > 0);
if(pkt->pkt_refcount > 1) {
pkt->pkt_refcount--;
return;
}
pkt_free(pkt);
}
/*
*
*/
th_pkt_t *
pkt_ref(th_pkt_t *pkt)
{
pkt->pkt_refcount++;
return pkt;
}
/*
*
*/
th_pkt_t *
pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts)
{
th_pkt_t *pkt;
pkt = calloc(1, sizeof(th_pkt_t));
pkt->pkt_payloadlen = datalen;
if(datalen > 0) {
pkt->pkt_payload = malloc(datalen + FF_INPUT_BUFFER_PADDING_SIZE);
if(data != NULL)
memcpy(pkt->pkt_payload, data, datalen);
}
pkt->pkt_dts = dts;
pkt->pkt_pts = pts;
pkt->pkt_refcount = 1;
store_packets++;
return pkt;
}
/*
*
*/
th_pkt_t *
pkt_copy(th_stream_t *st, th_pkt_t *orig)
{
th_pkt_t *pkt;
pkt_load(st, orig);
if(orig->pkt_payload == NULL)
return NULL;
pkt = malloc(sizeof(th_pkt_t));
memcpy(pkt, orig, sizeof(th_pkt_t));
pkt->pkt_payload = malloc(pkt->pkt_payloadlen +
FF_INPUT_BUFFER_PADDING_SIZE);
memcpy(pkt->pkt_payload, orig->pkt_payload, pkt->pkt_payloadlen);
pkt->pkt_on_stream_queue = 0;
pkt->pkt_storage = NULL;
pkt->pkt_refcount = 1;
return pkt;
}
/*
*
*/
void
pkt_store(th_stream_t *st, th_pkt_t *pkt)
{
if(pkt->pkt_on_stream_queue)
return;
pkt->pkt_on_stream_queue = 1;
pkt->pkt_refcount++;
TAILQ_INSERT_TAIL(&st->st_pktq, pkt, pkt_queue_link);
/* Persistent buffer management */
storage_mem_enq(st, pkt);
storage_disk_enq(st, pkt);
if(pkt->pkt_storage)
pwrite(pkt->pkt_storage->ts_fd, pkt->pkt_payload, pkt->pkt_payloadlen,
pkt->pkt_storage_offset);
}
/*
* Force flush of a packet
*/
void
pkt_unstore(th_stream_t *st, th_pkt_t *pkt)
{
assert(pkt->pkt_on_stream_queue == 1);
TAILQ_REMOVE(&st->st_pktq, pkt, pkt_queue_link);
pkt->pkt_on_stream_queue = 0;
if(pkt->pkt_storage != NULL) {
storage_deref(pkt->pkt_storage);
TAILQ_REMOVE(&store_disk_queue, pkt, pkt_disk_link);
store_disk_size -= pkt->pkt_payloadlen;
pkt->pkt_storage = NULL;
}
if(pkt->pkt_payload != NULL) {
TAILQ_REMOVE(&store_mem_queue, pkt, pkt_mem_link);
store_mem_size -= pkt->pkt_payloadlen;
}
pkt_deref(pkt);
}
/*
*
*/
int
pkt_load(th_stream_t *st, th_pkt_t *pkt)
{
if(pkt->pkt_payload == NULL && pkt->pkt_storage != NULL) {
pkt->pkt_payload = malloc(pkt->pkt_payloadlen +
FF_INPUT_BUFFER_PADDING_SIZE);
pread(pkt->pkt_storage->ts_fd, pkt->pkt_payload, pkt->pkt_payloadlen,
pkt->pkt_storage_offset);
storage_mem_enq(st, pkt);
}
return pkt->pkt_payload == NULL ? -1 : 0;
}
/*
*
*/
static void
storage_deref(th_storage_t *s)
{
if(s->ts_refcount > 1) {
s->ts_refcount--;
return;
}
if(curstore == s)
curstore = NULL;
close(s->ts_fd);
unlink(s->ts_filename);
free(s->ts_filename);
free(s);
}
/*
*
*/
static void
storage_mem_enq(th_stream_t *st, th_pkt_t *pkt)
{
TAILQ_INSERT_TAIL(&store_mem_queue, pkt, pkt_mem_link);
store_mem_size += pkt->pkt_payloadlen;
while(store_mem_size >= store_mem_size_max) {
pkt = TAILQ_FIRST(&store_mem_queue);
TAILQ_REMOVE(&store_mem_queue, pkt, pkt_mem_link);
store_mem_size -= pkt->pkt_payloadlen;
free(pkt->pkt_payload);
pkt->pkt_payload = NULL;
if(pkt->pkt_storage == NULL)
pkt_unstore(st, pkt);
}
}
/*
*
*/
static void
storage_disk_enq(th_stream_t *st, th_pkt_t *pkt)
{
th_storage_t *s;
char fbuf[500];
int fd;
if(curstore == NULL) {
snprintf(fbuf, sizeof(fbuf), "%s/s%d", store_path, ++store_tally);
fd = open(fbuf, O_RDWR | O_CREAT | O_TRUNC, 0644);
if(fd == -1) {
s = NULL;
} else {
s = calloc(1, sizeof(th_storage_t));
s->ts_fd = fd;
s->ts_filename = strdup(fbuf);
}
curstore = s;
} else {
s = curstore;
}
if(s != NULL) {
TAILQ_INSERT_TAIL(&store_disk_queue, pkt, pkt_disk_link);
store_disk_size += pkt->pkt_payloadlen;
s->ts_refcount++;
pkt->pkt_storage = s;
pkt->pkt_storage_offset = s->ts_offset;
s->ts_offset += pkt->pkt_payloadlen;
if(s->ts_offset > store_chunk_size)
curstore = NULL;
}
while(store_disk_size > store_disk_size_max) {
pkt = TAILQ_FIRST(&store_disk_queue);
if(pkt->pkt_refcount > 1)
printf("UNSTORE of reference packet %p\n", pkt);
pkt_unstore(st, pkt);
}
}
/**
* Erase all old files
*/
static void
storage_wipe(void)
{
DIR *dir;
struct dirent *d;
char fbuf[500];
if((dir = opendir(store_path)) != NULL) {
while((d = readdir(dir)) != NULL) {
if(d->d_name[0] == '.')
continue;
snprintf(fbuf, sizeof(fbuf), "%s/%s", store_path, d->d_name);
unlink(fbuf);
}
}
closedir(dir);
}

View file

@ -36,7 +36,6 @@
#include "channels.h"
#include "transports.h"
#include "epg.h"
#include "pvr.h"
#include "autorec.h"
#include "xmltv.h"
#include "dtable.h"

106
packet.c Normal file
View file

@ -0,0 +1,106 @@
/**
* Packet management
* Copyright (C) 2008 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tvhead.h"
#include "packet.h"
#include "string.h"
static pthread_mutex_t refmutex = PTHREAD_MUTEX_INITIALIZER;
/*
*
*/
static void
pkt_destroy(th_pkt_t *pkt)
{
free(pkt->pkt_payload);
free(pkt);
}
/**
* Allocate a new packet and give it a refcount of one (which caller is
* suppoed to take care of)
*/
th_pkt_t *
pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts)
{
th_pkt_t *pkt;
pkt = calloc(1, sizeof(th_pkt_t));
pkt->pkt_payloadlen = datalen;
if(datalen > 0) {
pkt->pkt_payload = malloc(datalen + FF_INPUT_BUFFER_PADDING_SIZE);
if(data != NULL)
memcpy(pkt->pkt_payload, data, datalen);
}
pkt->pkt_dts = dts;
pkt->pkt_pts = pts;
pkt->pkt_refcount = 1;
return pkt;
}
/**
*
*/
void
pkt_ref_dec(th_pkt_t *pkt)
{
if(pkt->pkt_refcount == 1) {
pkt_destroy(pkt);
return;
}
/* Use atomic decrease */
pthread_mutex_lock(&refmutex);
pkt->pkt_refcount--;
pthread_mutex_unlock(&refmutex);
}
/**
*
*/
void
pkt_ref_inc(th_pkt_t *pkt)
{
/* Use atomic increase */
pthread_mutex_lock(&refmutex);
pkt->pkt_refcount++;
pthread_mutex_unlock(&refmutex);
}
/**
*
*/
void
pktref_clear_queue(struct th_pktref_queue *q)
{
th_pktref_t *pr;
while((pr = TAILQ_FIRST(q)) != NULL) {
TAILQ_REMOVE(q, pr, pr_link);
pkt_ref_dec(pr->pr_pkt);
free(pr);
}
}

View file

@ -1,6 +1,6 @@
/*
* Packet / Buffer management
* Copyright (C) 2007 Andreas Öman
* Packet nanagement
* Copyright (C) 2008 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -16,37 +16,50 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef BUFFER_H_
#define BUFFER_H_
#ifndef PACKET_H_
#define PACKET_H_
th_pkt_t *pkt_ref(th_pkt_t *pkt);
void pkt_deref(th_pkt_t *pkt);
/**
* Packets
*/
#define PKT_I_FRAME 1
#define PKT_P_FRAME 2
#define PKT_B_FRAME 3
void pkt_init(void);
typedef struct th_pkt {
int64_t pkt_dts;
int64_t pkt_pts;
int pkt_duration;
int pkt_refcount;
uint8_t pkt_frametype;
uint8_t pkt_commercial;
uint8_t *pkt_payload;
int pkt_payloadlen;
} th_pkt_t;
/**
* A packet reference
*/
typedef struct th_pktref {
TAILQ_ENTRY(th_pktref) pr_link;
th_pkt_t *pr_pkt;
} th_pktref_t;
/**
*
*/
void pkt_ref_dec(th_pkt_t *pkt);
void pkt_ref_inc(th_pkt_t *pkt);
void pktref_clear_queue(struct th_pktref_queue *q);
th_pkt_t *pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts);
th_pkt_t *pkt_copy(th_stream_t *st, th_pkt_t *pkt);
void pkt_store(th_stream_t *st, th_pkt_t *pkt);
void pkt_unstore(th_stream_t *st, th_pkt_t *pkt);
int pkt_load(th_stream_t *st, th_pkt_t *pkt);
void *pkt_payload(th_pkt_t *pkt);
size_t pkt_len(th_pkt_t *pkt);
extern int64_t store_mem_size;
extern int64_t store_mem_size_max;
extern int64_t store_disk_size;
extern int64_t store_disk_size_max;
extern int store_packets;
#define pkt_payload(pkt) ((pkt)->pkt_payload)
#define pkt_len(pkt) ((pkt)->pkt_payloadlen)
#endif /* BUFFER_H_ */
#endif /* PACKET_H_ */

View file

@ -31,7 +31,6 @@
#include "tvhead.h"
#include "parsers.h"
#include "parser_h264.h"
#include "buffer.h"
#include "bitstream.h"
/**

116
parsers.c
View file

@ -29,7 +29,7 @@
#include "parsers.h"
#include "parser_h264.h"
#include "bitstream.h"
#include "buffer.h"
#include "packet.h"
#include "transports.h"
static const AVRational mpeg_tc = {1, 90000};
@ -85,15 +85,16 @@ static void parse_mpegaudio(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
static void parse_ac3(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
void parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
static void parse_compute_pts(th_transport_t *t, th_stream_t *st,
th_pkt_t *pkt);
static void parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
static int parse_pes_header(th_transport_t *t, th_stream_t *st, uint8_t *buf,
size_t len);
void parser_compute_duration(th_transport_t *t, th_stream_t *st,
th_pkt_t *pkt);
static void parser_compute_duration(th_transport_t *t, th_stream_t *st,
th_pktref_t *pr);
/**
* Parse raw mpeg data
@ -296,7 +297,7 @@ parse_mpegaudio(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
sample_rate = mpegaudio_freq_tab[(header >> 10) & 3];
if(sample_rate == 0) {
pkt_deref(pkt);
pkt_ref_dec(pkt);
return;
}
@ -370,7 +371,7 @@ parse_ac3(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
sample_rate = ac3_freq_tab[src] >> bsid;
if(sample_rate == 0) {
pkt_deref(pkt);
pkt_ref_dec(pkt);
return;
}
@ -534,7 +535,7 @@ parse_mpeg2video(th_transport_t *t, th_stream_t *st, size_t len,
return 1;
if(st->st_curpkt != NULL)
pkt_deref(st->st_curpkt);
pkt_ref_dec(st->st_curpkt);
st->st_curpkt = pkt_alloc(NULL, 0, st->st_curpts, st->st_curdts);
st->st_curpkt->pkt_frametype = pkt0.pkt_frametype;
@ -685,6 +686,7 @@ parse_h264(th_transport_t *t, th_stream_t *st, size_t len,
st->st_curpkt->pkt_payloadlen = st->st_buffer_ptr;
parser_deliver(t, st, st->st_curpkt);
pkt_ref_dec(st->st_curpkt);
st->st_curpkt = NULL;
st->st_buffer = malloc(st->st_buffer_size);
return 1;
@ -700,25 +702,36 @@ parse_h264(th_transport_t *t, th_stream_t *st, size_t len,
* We do this by placing packets on a queue and wait for next I/P
* frame to appear
*/
void
static void
parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
{
th_pkt_t *p;
if(pkt->pkt_pts != AV_NOPTS_VALUE && st->st_ptsq_len == 0) {
/* PTS known and no other packets in queue, deliver at once */
th_pktref_t *pr;
if(pkt->pkt_duration == 0)
parser_compute_duration(t, st, pkt);
else
parser_deliver(t, st, pkt);
return;
}
int validpts = pkt->pkt_pts != AV_NOPTS_VALUE && st->st_ptsq_len == 0;
TAILQ_INSERT_TAIL(&st->st_ptsq, pkt, pkt_queue_link);
/* PTS known and no other packets in queue, deliver at once */
if(validpts && pkt->pkt_duration)
return parser_deliver(t, st, pkt);
/* Reference count is transfered to queue */
pr = malloc(sizeof(th_pktref_t));
pr->pr_pkt = pkt;
if(validpts)
return parser_compute_duration(t, st, pr);
TAILQ_INSERT_TAIL(&st->st_ptsq, pr, pr_link);
st->st_ptsq_len++;
while((pkt = TAILQ_FIRST(&st->st_ptsq)) != NULL) {
/* */
while((pr = TAILQ_FIRST(&st->st_ptsq)) != NULL) {
pkt = pr->pr_pkt;
switch(pkt->pkt_frametype) {
case PKT_B_FRAME:
/* B-frames have same PTS as DTS, pass them on */
@ -729,26 +742,29 @@ parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
case PKT_P_FRAME:
/* Presentation occures at DTS of next I or P frame,
try to find it */
p = TAILQ_NEXT(pkt, pkt_queue_link);
pr = TAILQ_NEXT(pr, pr_link);
while(1) {
if(p == NULL)
if(pr == NULL)
return; /* not arrived yet, wait */
if(p->pkt_frametype <= PKT_P_FRAME) {
pkt->pkt_pts = p->pkt_dts;
if(pr->pr_pkt->pkt_frametype <= PKT_P_FRAME) {
pkt->pkt_pts = pr->pr_pkt->pkt_dts;
break;
}
p = TAILQ_NEXT(p, pkt_queue_link);
pr = TAILQ_NEXT(pr, pr_link);
}
break;
}
TAILQ_REMOVE(&st->st_ptsq, pkt, pkt_queue_link);
pr = TAILQ_FIRST(&st->st_ptsq);
TAILQ_REMOVE(&st->st_ptsq, pr, pr_link);
st->st_ptsq_len--;
if(pkt->pkt_duration == 0)
parser_compute_duration(t, st, pkt);
else
if(pkt->pkt_duration == 0) {
parser_compute_duration(t, st, pr);
} else {
parser_deliver(t, st, pkt);
free(pr);
}
}
}
@ -756,28 +772,27 @@ parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
* Compute duration of a packet, we do this by keeping a packet
* until the next one arrives, then we release it
*/
void
parser_compute_duration(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
static void
parser_compute_duration(th_transport_t *t, th_stream_t *st, th_pktref_t *pr)
{
th_pkt_t *next;
th_pktref_t *next;
int64_t d;
TAILQ_INSERT_TAIL(&st->st_durationq, pkt, pkt_queue_link);
TAILQ_INSERT_TAIL(&st->st_durationq, pr, pr_link);
pkt = TAILQ_FIRST(&st->st_durationq);
if((next = TAILQ_NEXT(pkt, pkt_queue_link)) == NULL)
pr = TAILQ_FIRST(&st->st_durationq);
if((next = TAILQ_NEXT(pr, pr_link)) == NULL)
return;
d = next->pkt_dts - pkt->pkt_dts;
TAILQ_REMOVE(&st->st_durationq, pkt, pkt_queue_link);
d = next->pr_pkt->pkt_dts - pr->pr_pkt->pkt_dts;
TAILQ_REMOVE(&st->st_durationq, pr, pr_link);
if(d < 10) {
pkt_deref(pkt);
return;
pkt_ref_dec(pr->pr_pkt);
} else {
pr->pr_pkt->pkt_duration = d;
parser_deliver(t, st, pr->pr_pkt);
}
pkt->pkt_duration = d;
parser_deliver(t, st, pkt);
free(pr);
}
@ -788,7 +803,6 @@ parser_compute_duration(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
static void
parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
{
th_muxer_t *tm;
int64_t dts, pts, ptsoff;
assert(pkt->pkt_dts != AV_NOPTS_VALUE);
@ -796,7 +810,7 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
assert(pkt->pkt_duration > 10);
if(t->tht_dts_start == AV_NOPTS_VALUE) {
pkt_deref(pkt);
pkt_ref_dec(pkt);
return;
}
@ -812,7 +826,7 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
if(st->st_last_dts == AV_NOPTS_VALUE) {
if(dts < 0) {
/* Early packet with negative time stamp, drop those */
pkt_deref(pkt);
pkt_ref_dec(pkt);
return;
}
} else if(dts + st->st_dts_epoch < st->st_last_dts - (1LL << 24)) {
@ -854,14 +868,14 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
/* Alert all muxers tied to us that a new packet has arrived */
lock_assert(&t->tht_stream_mutex);
#if 0
LIST_FOREACH(tm, &t->tht_muxers, tm_transport_link)
tm->tm_new_pkt(tm, st, pkt);
/* Unref (and possibly free) the packet, muxers are supposed
#endif
/* Unref (and possibly free) the packet, downstream code is supposed
to increase refcount or copy packet if they need anything */
pkt_deref(pkt);
pkt_ref_dec(pkt);
}
/**
@ -917,7 +931,7 @@ parser_enqueue_packet(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
}
if(err) {
pkt_deref(pkt);
pkt_ref_dec(pkt);
return;
}

View file

@ -19,14 +19,11 @@
#ifndef PARSERS_H
#define PARSERS_H
#include "packet.h"
void parse_raw_mpeg(th_transport_t *t, th_stream_t *st, uint8_t *data,
int len, int start, int err);
void parser_compute_duration(th_transport_t *t, th_stream_t *st,
th_pkt_t *pkt);
void parse_compute_pts(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
void parser_enqueue_packet(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt);
extern const unsigned int mpeg2video_framedurations[16];

4
psi.c
View file

@ -306,7 +306,7 @@ psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid)
/**
* PAT generator
*/
#if 0
int
psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid)
{
@ -401,7 +401,7 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid)
return psi_append_crc32(buf0, tlen, maxlen);
}
#endif

2
psi.h
View file

@ -41,7 +41,7 @@ uint32_t psi_crc32(uint8_t *data, size_t datalen);
int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid);
int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid);
//int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid);
const char *psi_caid2name(uint16_t caid);

2
pvr.h
View file

@ -68,7 +68,7 @@ typedef struct pvr_rec {
pthread_t pvrr_ptid;
//dtimer_t pvrr_timer;
th_ffmuxer_t pvrr_tffm;
th_ffmuxer_t pvrr_tffm;
int64_t pvrr_dts_offset;

View file

@ -38,7 +38,7 @@
#include "v4l.h"
#include "psi.h"
#include "buffer.h"
#include "packet.h"
#include "channels.h"
#include "cwc.h"
#include "notify.h"
@ -64,7 +64,6 @@ transport_stop(th_transport_t *t)
{
th_descrambler_t *td;
th_stream_t *st;
th_pkt_t *pkt;
gtimer_disarm(&t->tht_receive_timer);
@ -110,35 +109,21 @@ transport_stop(th_transport_t *t)
st->st_startcode = 0;
if(st->st_curpkt != NULL) {
pkt_deref(st->st_curpkt);
pkt_ref_dec(st->st_curpkt);
st->st_curpkt = NULL;
}
/* Clear PTS queue */
while((pkt = TAILQ_FIRST(&st->st_ptsq)) != NULL) {
TAILQ_REMOVE(&st->st_ptsq, pkt, pkt_queue_link);
assert(pkt->pkt_refcount == 1);
pkt_deref(pkt);
}
pktref_clear_queue(&st->st_ptsq);
st->st_ptsq_len = 0;
/* Clear durationq */
while((pkt = TAILQ_FIRST(&st->st_durationq)) != NULL) {
TAILQ_REMOVE(&st->st_durationq, pkt, pkt_queue_link);
assert(pkt->pkt_refcount == 1);
pkt_deref(pkt);
}
/* Flush framestore */
while((pkt = TAILQ_FIRST(&st->st_pktq)) != NULL)
pkt_unstore(st, pkt);
pktref_clear_queue(&st->st_durationq);
}
pthread_mutex_unlock(&t->tht_stream_mutex);
pthread_mutex_unlock(&t->tht_stream_mutex);
}
/**
@ -175,7 +160,7 @@ transport_remove_subscriber(th_transport_t *t, th_subscription_t *s)
transport_stop(t);
}
#if 0
/**
*
*/
@ -214,6 +199,7 @@ transport_unlink_muxer(th_muxer_t *tm)
pthread_mutex_unlock(&t->tht_stream_mutex);
tm->tm_transport = NULL;
}
#endif
/**
@ -546,7 +532,6 @@ transport_add_stream(th_transport_t *t, int pid, tv_streamtype_t type)
TAILQ_INIT(&st->st_ptsq);
TAILQ_INIT(&st->st_durationq);
TAILQ_INIT(&st->st_pktq);
avgstat_init(&st->st_rate, 10);
avgstat_init(&st->st_cc_errors, 10);

View file

@ -58,8 +58,8 @@ const char *transport_status_to_text(int status);
void transport_remove_subscriber(th_transport_t *t, th_subscription_t *s);
void transport_link_muxer(th_transport_t *t, th_muxer_t *tm);
//void transport_link_muxer(th_transport_t *t, th_muxer_t *tm);
void transport_unlink_muxer(th_muxer_t *tm);
//void transport_unlink_muxer(th_muxer_t *tm);
#endif /* TRANSPORTS_H */

View file

@ -40,7 +40,6 @@
#include "transports.h"
#include "subscriptions.h"
#include "psi.h"
#include "buffer.h"
#include "tsdemux.h"
#include "parsers.h"

View file

@ -99,14 +99,13 @@ RB_HEAD(th_transport_tree, th_transport);
TAILQ_HEAD(th_transport_queue, th_transport);
RB_HEAD(th_dvb_mux_instance_tree, th_dvb_mux_instance);
LIST_HEAD(th_stream_list, th_stream);
TAILQ_HEAD(th_pkt_queue, th_pkt);
LIST_HEAD(th_pkt_list, th_pkt);
LIST_HEAD(th_muxer_list, th_muxer);
LIST_HEAD(th_muxstream_list, th_muxstream);
LIST_HEAD(th_descrambler_list, th_descrambler);
TAILQ_HEAD(th_refpkt_queue, th_refpkt);
TAILQ_HEAD(th_muxpkt_queue, th_muxpkt);
LIST_HEAD(autorec_list, autorec);
TAILQ_HEAD(th_pktref_queue, th_pktref);
extern time_t dispatch_clock;
extern int startupcounter;
@ -326,22 +325,14 @@ typedef struct th_stream {
struct AVCodecContext *st_ctx;
struct AVCodecParserContext *st_parser;
/* All packets currently hanging on to us */
struct th_pkt_list st_packets;
/* Temporary frame store for calculating PTS */
struct th_pkt_queue st_ptsq;
struct th_pktref_queue st_ptsq;
int st_ptsq_len;
/* Temporary frame store for calculating duration */
struct th_pkt_queue st_durationq;
/* Final frame store */
struct th_pkt_queue st_pktq;
struct th_pktref_queue st_durationq;
/* ca id for this stream */
@ -683,53 +674,7 @@ typedef struct th_transport {
#define tht_file_input u.file_input.file_input
/*
* Storage
*/
typedef struct th_storage {
unsigned int ts_offset;
unsigned int ts_refcount;
int ts_fd;
char *ts_filename;
} th_storage_t;
/*
* A packet
*/
#define PKT_I_FRAME 1
#define PKT_P_FRAME 2
#define PKT_B_FRAME 3
typedef struct th_pkt {
TAILQ_ENTRY(th_pkt) pkt_queue_link;
uint8_t pkt_on_stream_queue;
uint8_t pkt_frametype;
uint8_t pkt_commercial;
int64_t pkt_dts;
int64_t pkt_pts;
int pkt_duration;
int pkt_refcount;
th_storage_t *pkt_storage;
TAILQ_ENTRY(th_pkt) pkt_disk_link;
int pkt_storage_offset;
uint8_t *pkt_payload;
int pkt_payloadlen;
TAILQ_ENTRY(th_pkt) pkt_mem_link;
} th_pkt_t;
/**
* Referenced packets
*/
typedef struct th_refpkt {
TAILQ_ENTRY(th_refpkt) trp_link;
th_pkt_t *trp_pkt;
} th_refpkt_t;
#if 0
/**
* Muxed packets
@ -866,7 +811,7 @@ typedef struct th_ffmuxer {
struct AVFormatContext *tffm_avfctx;
} th_ffmuxer_t;
#endif
/*