Split streaming control from transport stream muxer.

While at it, improve the TS muxer quite a bit. Not perfect yet, but much better.
This commit is contained in:
Andreas Öman 2008-01-26 12:15:34 +00:00
parent 08c24d441c
commit aba7e9b779
15 changed files with 840 additions and 967 deletions

View file

@ -1,8 +1,8 @@
-include ../config.mak
SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \
subscriptions.c tsmux.c tsdemux.c pes.c buffer.c tcp.c \
resolver.c
subscriptions.c mux.c tsdemux.c pes.c buffer.c tcp.c \
resolver.c tsmux.c
SRCS += http.c htmlui.c

View file

@ -913,8 +913,6 @@ page_status(http_connection_t *hc, const char *remain, void *opaque)
th_dvb_mux_instance_t *tdmi;
th_stream_t *st;
const char *txt, *t1, *t2;
th_muxer_t *tm;
th_muxstream_t *tms;
char tmptxt[100];
if(!html_verify_access(hc, "system-status"))
@ -1204,27 +1202,8 @@ page_status(http_connection_t *hc, const char *remain, void *opaque)
tcp_qprintf(&tq,
"Using transport \"%s\"<br>",
t->tht_name);
if((tm = s->ths_muxer) != NULL) {
int64_t i64min = INT64_MAX;
int64_t i64max = INT64_MIN;
LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) {
if(tms->tms_curpkt == NULL)
continue; /* stream is currently stale */
if(tms->tms_nextblock < i64min)
i64min = tms->tms_nextblock;
if(tms->tms_nextblock > i64max)
i64max = tms->tms_nextblock;
}
tcp_qprintf(&tq,
"Internal stream delta: %lld us<br>",
i64max - i64min);
}
}
}
tcp_qprintf(&tq, "</div>");
box_bottom(&tq);
tcp_qprintf(&tq, "<br>");

View file

@ -410,7 +410,7 @@ client_subscription_callback(struct th_subscription *s,
case TRANSPORT_AVAILABLE:
assert(c->c_muxer == NULL);
c->c_muxer = ts_muxer_init(s, client_output_ts, c,
TM_HTSCLIENTMODE | TM_SEEKABLE);
TS_HTSCLIENT | TS_SEEK);
ts_muxer_play(c->c_muxer, 0);
break;

View file

@ -40,7 +40,7 @@
#define MULTICAST_PKT_SIZ (188 * 7)
typedef struct output_multicast {
th_muxer_t *om_muxer;
ts_muxer_t *om_muxer;
int om_fd;
struct sockaddr_in om_dst;
@ -61,8 +61,8 @@ typedef struct output_multicast {
* Output MPEG TS
*/
void
iptv_output_ts(void *opaque, th_subscription_t *s,
uint8_t *pkt, int blocks, int64_t pcr)
iptv_output_ts(void *opaque, th_subscription_t *s, uint8_t *pkt,
int blocks, int64_t pcr)
{
output_multicast_t *om = opaque;
@ -101,8 +101,8 @@ iptv_subscription_callback(struct th_subscription *s,
switch(event) {
case TRANSPORT_AVAILABLE:
assert(om->om_muxer == NULL);
om->om_muxer = ts_muxer_init(s, iptv_output_ts, om, 0);
om->om_muxer->tm_drop_rate = om->om_intra_drop_rate;
om->om_muxer = ts_muxer_init(s, iptv_output_ts, om, TS_SEEK);
// om->om_muxer->tm_drop_rate = om->om_intra_drop_rate;
ts_muxer_play(om->om_muxer, 0);
break;

199
mux.c Normal file
View file

@ -0,0 +1,199 @@
/*
* tvheadend, Muxing
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _GNU_SOURCE
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <libhts/htscfg.h>
#include "tvhead.h"
#include "dispatch.h"
#include "teletext.h"
#include "transports.h"
#include "subscriptions.h"
#include "pes.h"
#include "psi.h"
#include "buffer.h"
/*
* pause playback
*/
void
muxer_pause(th_muxer_t *tm)
{
}
/*
* playback start
*/
void
muxer_play(th_muxer_t *tm, int64_t toffset)
{
th_subscription_t *s = tm->tm_subscription;
if(!tm->tm_linked)
LIST_INSERT_HEAD(&s->ths_transport->tht_muxers, tm, tm_transport_link);
if(toffset == AV_NOPTS_VALUE) {
/* continue from last playback */
tm->tm_offset = 0;
} else {
tm->tm_offset = toffset;
}
tm->tm_status = TM_PLAY;
}
/**
*
*/
static void
mux_new_packet_for_stream(th_muxer_t *tm, th_muxstream_t *tms, th_pkt_t *pkt)
{
if(tm->tm_offset == 0) {
/* Direct playback, pass it on at once */
tm->tm_output(tm->tm_opaque, tms, pkt);
return;
}
}
/**
*
*/
static void
mux_new_packet(th_muxer_t *tm, th_stream_t *st, th_pkt_t *pkt)
{
th_muxstream_t *tms;
pkt_store(pkt); /* need to keep packet around */
switch(tm->tm_status) {
case TM_IDLE:
break;
case TM_WAITING_FOR_LOCK:
break;
case TM_PLAY:
LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
if(tms->tms_stream == st) {
mux_new_packet_for_stream(tm, tms, pkt);
break;
}
}
break;
case TM_PAUSE:
break;
}
}
/*
* TS Muxer
*/
th_muxer_t *
muxer_init(th_subscription_t *s, th_mux_output_t *cb, void *opaque,
int flags)
{
th_transport_t *t = s->ths_transport;
th_stream_t *st;
th_muxer_t *tm;
th_muxstream_t *tms;
printf("muxer init!\n");
tm = calloc(1, sizeof(th_muxer_t));
tm->tm_subscription = s;
tm->tm_output = cb;
tm->tm_opaque = opaque;
tm->tm_new_pkt = mux_new_packet;
LIST_FOREACH(st, &t->tht_streams, st_link) {
tms = calloc(1, sizeof(th_muxstream_t));
tms->tms_muxer = tm;
tms->tms_stream = st;
LIST_INSERT_HEAD(&tm->tm_streams, tms, tms_muxer_link0);
TAILQ_INIT(&tms->tms_metaqueue);
}
s->ths_muxer = tm;
return tm;
}
/*
*
*/
static void
tms_destroy(th_muxstream_t *tms)
{
LIST_REMOVE(tms, tms_muxer_link0);
// dtimer_disarm(&tms->tms_timer);
free(tms);
}
/*
*
*/
void
muxer_deinit(th_muxer_t *tm, th_subscription_t *s)
{
th_muxstream_t *tms;
s->ths_raw_input = NULL;
s->ths_muxer = NULL;
if(tm->tm_linked)
LIST_INSERT_HEAD(&s->ths_transport->tht_muxers, tm, tm_transport_link);
while((tms = LIST_FIRST(&tm->tm_streams)) != NULL)
tms_destroy(tms);
free(tm);
}

31
mux.h Normal file
View file

@ -0,0 +1,31 @@
/*
* tvheadend, Stream muxer
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef MUX_H
#define MUX_H
th_muxer_t *muxer_init(th_subscription_t *s, th_mux_output_t *cb,
void *opaque);
void muxer_deinit(th_muxer_t *tm, th_subscription_t *s);
void muxer_play(th_muxer_t *tm, int64_t toffset);
void muxer_pause(th_muxer_t *tm);
#endif /* MUX_H */

25
psi.c
View file

@ -131,7 +131,7 @@ psi_append_crc32(uint8_t *buf, int offset, int maxlen)
*/
int
psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen)
psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid)
{
if(maxlen < 12)
return -1;
@ -150,8 +150,8 @@ psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen)
buf[8] = 0; /* Program number, we only have one program */
buf[9] = 1;
buf[10] = 0; /* PMT pid */
buf[11] = 100;
buf[10] = 0xe0 | (pmtpid >> 8);
buf[11] = pmtpid;
return psi_append_crc32(buf, 12, maxlen);
}
@ -292,7 +292,7 @@ psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid)
*/
int
psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen)
psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid)
{
th_stream_t *st;
th_muxstream_t *tms;
@ -314,19 +314,8 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen)
buf[6] = 0;
buf[7] = 0;
/* Find PID that carries PCR */
LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link)
if(tms->tms_dopcr)
break;
if(tms == NULL) {
buf[8] = 0xff;
buf[9] = 0xff;
} else {
buf[8] = 0xe0 | (tms->tms_index >> 8);
buf[9] = tms->tms_index;
}
buf[8] = 0xe0 | (pcrpid >> 8);
buf[9] = pcrpid;
buf[10] = 0xf0; /* Program info length */
buf[11] = 0x00; /* We dont have any such things atm */
@ -334,7 +323,7 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen)
buf += 12;
tlen = 12;
LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) {
LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
st = tms->tms_stream;
switch(st->st_type) {

4
psi.h
View file

@ -37,9 +37,9 @@ int psi_parse_pmt(th_transport_t *t, uint8_t *ptr, int len, int chksvcid);
uint32_t psi_crc32(uint8_t *data, size_t datalen);
int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen);
int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid);
int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen);
int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid);
const char *psi_caid2name(uint16_t caid);

12
pvr.c
View file

@ -787,7 +787,7 @@ pvrr_transport_available(pvr_rec_t *pvrr, th_transport_t *t)
tms = calloc(1, sizeof(th_muxstream_t));
tms->tms_stream = st;
LIST_INSERT_HEAD(&tm->tm_media_streams, tms, tms_muxer_media_link);
LIST_INSERT_HEAD(&tm->tm_streams, tms, tms_muxer_link0);
tms->tms_avstream = av_mallocz(sizeof(AVStream));
@ -855,8 +855,8 @@ pvrr_transport_unavailable(pvr_rec_t *pvrr, th_transport_t *t)
/* Destroy muxstreams */
while((tms = LIST_FIRST(&tm->tm_media_streams)) != NULL) {
LIST_REMOVE(tms, tms_muxer_media_link);
while((tms = LIST_FIRST(&tm->tm_streams)) != NULL) {
LIST_REMOVE(tms, tms_muxer_link0);
free(tms);
}
@ -965,7 +965,7 @@ is_all_decoded(th_muxer_t *tm, enum CodecType type)
th_muxstream_t *tms;
AVStream *st;
LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link) {
LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
st = tms->tms_avstream;
if(st->codec->codec->type == type && tms->tms_decoded == 0)
return 0;
@ -994,7 +994,7 @@ pvrr_record_packet(pvr_rec_t *pvrr, th_pkt_t *pkt)
char txt[100];
size_t bufsize;
LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link)
LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0)
if(tms->tms_stream == pkt->pkt_stream)
break;
@ -1108,7 +1108,7 @@ pvrr_record_packet(pvr_rec_t *pvrr, th_pkt_t *pkt)
if(pkt->pkt_commercial != COMMERCIAL_YES) {
LIST_FOREACH(tms, &tm->tm_media_streams, tms_muxer_media_link)
LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0)
tms->tms_decoded = 0;
pvrr_set_rec_state(pvrr, PVR_REC_WAIT_AUDIO_LOCK);

2
rtp.c
View file

@ -82,7 +82,7 @@ rtp_sendmsg(uint8_t *pkt, int blocks, int64_t pcr,
void
rtp_output_ts(void *opaque, struct th_subscription *s,
rtp_output_ts(void *opaque, th_subscription_t *s,
uint8_t *pkt, int blocks, int64_t pcr)
{
th_rtp_streamer_t *trs = opaque;

2
rtp.h
View file

@ -29,7 +29,7 @@ typedef struct th_rtp_streamer {
void rtp_streamer_init(th_rtp_streamer_t *trs, int fd,
struct sockaddr_in *dst);
void rtp_output_ts(void *opaque, struct th_subscription *s,
void rtp_output_ts(void *opaque, th_subscription_t *s,
uint8_t *pkt, int blocks, int64_t pcr);
int rtp_sendmsg(uint8_t *pkt, int blocks, int64_t pcr,

3
rtsp.c
View file

@ -123,7 +123,8 @@ rtsp_subscription_callback(struct th_subscription *s,
switch(event) {
case TRANSPORT_AVAILABLE:
assert(rs->rs_muxer == NULL);
rs->rs_muxer = ts_muxer_init(s, rtp_output_ts, &rs->rs_rtp_streamer, 0);
rs->rs_muxer = ts_muxer_init(s, rtp_output_ts, &rs->rs_rtp_streamer,
TS_SEEK);
break;
case TRANSPORT_UNAVAILABLE:

1365
tsmux.c

File diff suppressed because it is too large Load diff

48
tsmux.h
View file

@ -19,13 +19,53 @@
#ifndef TSMUX_H
#define TSMUX_H
th_muxer_t *ts_muxer_init(th_subscription_t *s, th_mux_output_t *cb,
typedef void (ts_mux_output_t)(void *opaque, th_subscription_t *s,
uint8_t *pkt, int npackets, int64_t pcr);
typedef struct ts_muxer {
th_subscription_t *ts_subscription;
int ts_flags;
#define TS_SEEK 0x1
#define TS_HTSCLIENT 0x2
int ts_running;
th_muxer_t *ts_muxer;
ts_mux_output_t *ts_output;
void *ts_output_opaque;
int64_t ts_pcr_offset;
int64_t ts_pcr_ref;
int ts_pat_cc;
int ts_pmt_cc;
dtimer_t ts_patpmt_timer;
uint8_t *ts_packet;
int ts_block;
int ts_blocks_per_packet;
dtimer_t ts_stream_timer;
int ts_pcrpid;
int64_t ts_last_pcr;
} ts_muxer_t;
ts_muxer_t *ts_muxer_init(th_subscription_t *s, ts_mux_output_t *output,
void *opaque, int flags);
void ts_muxer_deinit(th_muxer_t *tm, th_subscription_t *s);
void ts_muxer_deinit(ts_muxer_t *ts, th_subscription_t *s);
void ts_muxer_play(th_muxer_t *tm, int64_t toffset);
void ts_muxer_play(ts_muxer_t *ts, int64_t toffset);
void ts_muxer_pause(th_muxer_t *tm);
void ts_muxer_pause(ts_muxer_t *ts);
#endif /* TSMUX_H */

View file

@ -86,7 +86,7 @@ LIST_HEAD(th_pkt_list, th_pkt);
LIST_HEAD(th_muxer_list, th_muxer);
LIST_HEAD(th_muxstream_list, th_muxstream);
LIST_HEAD(th_descrambler_list, th_descrambler);
TAILQ_HEAD(th_metapkt_queue, th_metapkt);
extern time_t dispatch_clock;
extern int startupcounter;
@ -521,37 +521,39 @@ typedef struct th_pkt {
} th_pkt_t;
/**
* Meta packets
*/
typedef struct th_metapkt {
TAILQ_ENTRY(th_metapkt) tm_link;
int64_t tm_ts_start;
int64_t tm_ts_stop;
int tm_pcroffset;
uint8_t tm_pkt[0];
} th_metapkt_t;
/*
* A mux stream reader
*/
typedef struct th_muxstream {
LIST_ENTRY(th_muxstream) tms_muxer_link;
LIST_ENTRY(th_muxstream) tms_muxer_link0;
struct th_muxer *tms_muxer;
th_pkt_t *tms_curpkt;
int tms_offset; /* offset in current packet */
th_stream_t *tms_stream;
LIST_ENTRY(th_muxstream) tms_muxer_media_link;
int64_t tms_nextblock; /* Time for delivery of next block */
int tms_block_interval;
int tms_block_rate;
int tms_mux_offset;
int tms_index; /* Used as PID or whatever */
th_pkt_t *tms_tmppkt; /* temporary pkt pointer during lock phaze */
struct th_metapkt_queue tms_metaqueue;
uint32_t tms_meta_packets; /* number of packets "" */
/* MPEG TS multiplex stuff */
int tms_sc; /* start code */
int tms_cc;
int tms_dopcr;
int64_t tms_nextpcr;
int64_t tms_muxoffset; /* DTS offset from PCR */
/* Memebers used when running with ffmpeg */
@ -571,8 +573,8 @@ typedef struct th_muxstream {
struct th_subscription;
typedef void (th_mux_output_t)(void *opaque, struct th_subscription *s,
uint8_t *pkt, int blocks, int64_t pcr);
typedef void (th_mux_output_t)(void *opaque, th_muxstream_t *tms,
th_pkt_t *pkt);
typedef void (th_mux_newpkt_t)(struct th_muxer *tm, th_stream_t *st,
@ -583,33 +585,17 @@ typedef struct th_muxer {
th_mux_newpkt_t *tm_new_pkt;
LIST_ENTRY(th_muxer) tm_transport_link;
int tm_transport_linked;
int tm_linked;
int64_t tm_clockref; /* Base clock ref */
int64_t tm_pauseref; /* Time when we were paused */
int64_t tm_offset;
int tm_flags;
#define TM_HTSCLIENTMODE 0x1 /* Ugly workaround for now */
#define TM_SEEKABLE 0x2 /* We need the pause / seek to work */
int64_t tm_start_dts;
int64_t tm_next_pat;
struct th_muxstream_list tm_media_streams;
struct th_muxstream_list tm_active_streams;
struct th_muxstream_list tm_stale_streams;
struct th_muxstream_list tm_stopped_streams;
uint8_t *tm_packet;
int tm_blocks_per_packet;
struct th_muxstream_list tm_streams;
struct th_subscription *tm_subscription;
th_mux_output_t *tm_callback;
th_mux_output_t *tm_output;
void *tm_opaque;
dtimer_t tm_timer;
enum {
TM_IDLE,
@ -618,16 +604,7 @@ typedef struct th_muxer {
TM_PAUSE,
} tm_status;
th_muxstream_t *tm_pat;
th_muxstream_t *tm_pmt;
struct AVFormatContext *tm_avfctx;
int tm_drop_rate;
int tm_drop_cnt;
dtimer_t tm_table_timer;
int tm_block_offset;
} th_muxer_t;