Initial work on TS pass-through

This commit is contained in:
Andreas Öman 2009-11-18 20:28:35 +00:00
parent 9ec995ced3
commit dade35a931
15 changed files with 166 additions and 86 deletions

View file

@ -69,7 +69,7 @@ dvr_rec_subscribe(dvr_entry_t *de)
pthread_create(&de->de_thread, NULL, dvr_thread, de);
de->de_s = subscription_create_from_channel(de->de_channel, 1000, buf,
&de->de_sq.sq_st);
&de->de_sq.sq_st, 0);
}
/**
@ -443,6 +443,9 @@ dvr_thread(void *aux)
"No source transport available, automatic retry");
break;
case SMT_MPEGTS:
break;
case SMT_EXIT:
run = 0;
break;

View file

@ -644,10 +644,10 @@ htsp_method_subscribe(htsp_connection_t *htsp, htsmsg_t *in)
hs->hs_sid = sid;
LIST_INSERT_HEAD(&htsp->htsp_subscriptions, hs, hs_link);
streaming_target_init(&hs->hs_input, htsp_streaming_input, hs);
streaming_target_init(&hs->hs_input, htsp_streaming_input, hs, 0);
hs->hs_s = subscription_create_from_channel(ch, 500, htsp->htsp_logname,
&hs->hs_input);
&hs->hs_input, 0);
return NULL;
}
@ -1410,6 +1410,9 @@ htsp_streaming_input(void *opaque, streaming_message_t *sm)
htsp_subscription_status(hs, "No available sources");
break;
case SMT_MPEGTS:
break;
case SMT_EXIT:
abort();
}

View file

@ -114,20 +114,6 @@ void
parse_mpeg_ts(th_transport_t *t, th_stream_t *st, uint8_t *data,
int len, int start, int err)
{
th_subscription_t *s;
if(LIST_FIRST(&t->tht_streaming_pad.sp_targets) == NULL) {
/* No muxers. However, subscriptions may force demultiplex
for other reasons (serviceprobe does this) */
LIST_FOREACH(s, &t->tht_subscriptions, ths_transport_link)
if(s->ths_force_demux)
break;
if(s == NULL)
return; /* No-one is interested, so drop here */
}
switch(st->st_type) {
case SCT_MPEG2VIDEO:
parse_video(t, st, data, len, parse_mpeg2video);
@ -1048,11 +1034,15 @@ parser_deliver(th_transport_t *t, th_stream_t *st, th_pkt_t *pkt)
/* Forward packet */
pkt->pkt_componentindex = st->st_index;
streaming_pad_deliver(&t->tht_streaming_pad,
streaming_msg_create_pkt(pkt));
streaming_message_t *sm = streaming_msg_create_pkt(pkt);
streaming_pad_deliver(&t->tht_streaming_pad, sm);
streaming_msg_free(sm);
/* Decrease our own reference to the packet */
pkt_ref_dec(pkt);
}
/**

View file

@ -453,21 +453,14 @@ psi_parse_pmt(th_transport_t *t, const uint8_t *ptr, int len, int chksvcid,
}
/**
* PAT generator
* PMT generator
*/
#if 0
int
psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid)
psi_build_pmt(streaming_start_t *ss, uint8_t *buf0, int maxlen, int pcrpid)
{
th_stream_t *st;
th_muxstream_t *tms;
int c, tlen, dlen, l;
int c, tlen, dlen, l, i, pid;
uint8_t *buf, *buf1;
buf = buf0;
@ -492,13 +485,12 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid)
buf += 12;
tlen = 12;
LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) {
st = tms->tms_stream;
for(i = 0; i < ss->ss_num_components; i++) {
streaming_start_component_t *ssc = &ss->ss_components[i];
if(tms->tms_index == 0)
continue;
pid = 200 + i;
switch(st->st_type) {
switch(ssc->ssc_type) {
case SCT_MPEG2VIDEO:
c = 0x02;
break;
@ -521,15 +513,15 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid)
buf[0] = c;
buf[1] = 0xe0 | (tms->tms_index >> 8);
buf[2] = tms->tms_index;
buf[1] = 0xe0 | (pid >> 8);
buf[2] = pid;
buf1 = &buf[3];
tlen += 5;
buf += 5;
dlen = 0;
switch(st->st_type) {
switch(ssc->ssc_type) {
case SCT_AC3:
buf[0] = DVB_DESC_AC3;
buf[1] = 1;
@ -554,10 +546,6 @@ psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid)
return psi_append_crc32(buf0, tlen, maxlen);
}
#endif
/*
* CRC32
@ -688,14 +676,13 @@ static struct strtab streamtypetab[] = {
{ "PMT", SCT_PMT },
{ "PAT", SCT_PAT },
{ "AAC", SCT_AAC },
{ "MPEGTS", SCT_MPEGTS },
};
/**
*
*/
const char *
streaming_component_type2txt(streaming_component_type_t s)
{

View file

@ -20,6 +20,7 @@
#define PSI_H_
#include "htsmsg.h"
#include "streaming.h"
#define PSI_SECTION_SIZE 4096
@ -42,7 +43,7 @@ uint32_t psi_crc32(uint8_t *data, size_t datalen);
int psi_build_pat(th_transport_t *t, uint8_t *buf, int maxlen, int pmtpid);
//int psi_build_pmt(th_muxer_t *tm, uint8_t *buf0, int maxlen, int pcrpid);
int psi_build_pmt(streaming_start_t *ss, uint8_t *buf, int maxlen, int pcrpid);
const char *psi_caid2name(uint16_t caid);

View file

@ -51,6 +51,8 @@ typedef struct rtsp_resource {
RTSP_RESOURCE_SERVICE
} rr_type;
int rr_mpegts;
union {
struct channel *rr_channel;
struct th_transport *rr_service;
@ -464,6 +466,9 @@ rtsp_streaming_input(void *opaque, streaming_message_t *sm)
case SMT_NOSOURCE:
break;
case SMT_MPEGTS:
break;
default:
abort();
}
@ -496,16 +501,18 @@ rtsp_subscribe(rtsp_t *rtsp, rtsp_resource_t *rr)
streaming_target_init(&rtsp->rtsp_input, rtsp_streaming_input, rtsp);
streaming_target_init(&rtsp->rtsp_input, rtsp_streaming_input, rtsp, 0);
int flags = rr->rr_mpegts ? SUBSCRIPTION_RAW_MPEGTS : 0;
switch(rr->rr_type) {
case RTSP_RESOURCE_CHANNEL:
s = subscription_create_from_channel(rr->rr_channel, 500, "RTSP",
&rtsp->rtsp_input);
&rtsp->rtsp_input, flags);
break;
case RTSP_RESOURCE_SERVICE:
s = subscription_create_from_transport(rr->rr_service, "RTSP",
&rtsp->rtsp_input);
&rtsp->rtsp_input, flags);
break;
}
@ -517,11 +524,12 @@ rtsp_subscribe(rtsp_t *rtsp, rtsp_resource_t *rr)
* Resolve an URL into a resource
*/
static int
rtsp_resolve_url(char *url, rtsp_resource_t *rr, char **remainp)
rtsp_resolve_url(http_connection_t *hc, rtsp_resource_t *rr, char **remainp)
{
char *components[5];
void *r;
int nc;
char *url = hc->hc_url;
if(!strncasecmp(url, "rtsp://", strlen("rtsp://"))) {
url += strlen("rtsp://");
@ -537,7 +545,7 @@ rtsp_resolve_url(char *url, rtsp_resource_t *rr, char **remainp)
return -1;
http_deescape(components[1]);
if(!strcmp(components[0], "channel")) {
rr->rr_type = RTSP_RESOURCE_CHANNEL;
r = channel_find_by_name(components[1], 0);
@ -582,7 +590,7 @@ rtsp_cmd_options(http_connection_t *hc)
pthread_mutex_lock(&global_lock);
if(strcmp(hc->hc_url, "*") && rtsp_resolve_url(hc->hc_url, &rr, NULL)) {
if(strcmp(hc->hc_url, "*") && rtsp_resolve_url(hc, &rr, NULL)) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve");
pthread_mutex_unlock(&global_lock);
return 0;
@ -635,7 +643,7 @@ rtsp_cmd_describe(http_connection_t *hc, rtsp_t *rtsp)
pthread_mutex_lock(&global_lock);
if(rtsp_resolve_url(hc->hc_url, &rr, NULL)) {
if(rtsp_resolve_url(hc, &rr, NULL)) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve");
pthread_mutex_unlock(&global_lock);
return 0;
@ -865,6 +873,11 @@ rtsp_setup_udp(http_connection_t *hc, rtsp_t *rtsp,
LIST_INSERT_HEAD(&rtsp->rtsp_streams, rs, rs_link);
tvhlog(LOG_DEBUG, "RTSP", "%s: %s -- Stream %s to UDP port %d",
inet_ntoa(hc->hc_peer->sin_addr), hc->hc_url_orig,
streaming_component_type2txt(ssc->ssc_type),
client_ports[0]);
rtsp_printf(hc,
"RTSP/1.0 200 OK\r\n"
"Session: %s\r\n"
@ -903,7 +916,7 @@ rtsp_cmd_setup(http_connection_t *hc, rtsp_t *rtsp)
pthread_mutex_lock(&global_lock);
if(rtsp_resolve_url(hc->hc_url, &rr, &remain)) {
if(rtsp_resolve_url(hc, &rr, &remain)) {
rtsp_error(hc, RTSP_STATUS_SERVICE, "URL does not resolve");
pthread_mutex_unlock(&global_lock);
return 0;

View file

@ -112,7 +112,7 @@ serviceprobe_thread(void *aux)
tvhlog(LOG_INFO, "serviceprobe", "%20s: checking...",
t->tht_svcname);
s = subscription_create_from_transport(t, "serviceprobe", &sq.sq_st);
s = subscription_create_from_transport(t, "serviceprobe", &sq.sq_st, 0);
transport_ref(t);
pthread_mutex_unlock(&global_lock);

View file

@ -34,10 +34,12 @@ streaming_pad_init(streaming_pad_t *sp)
*
*/
void
streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque)
streaming_target_init(streaming_target_t *st, st_callback_t *cb, void *opaque,
int reject_filter)
{
st->st_cb = cb;
st->st_opaque = opaque;
st->st_reject_filter = reject_filter;
}
@ -62,7 +64,7 @@ streaming_queue_deliver(void *opauqe, streaming_message_t *sm)
void
streaming_queue_init(streaming_queue_t *sq)
{
streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq);
streaming_target_init(&sq->sq_st, streaming_queue_deliver, sq, 0);
pthread_mutex_init(&sq->sq_mutex, NULL);
pthread_cond_init(&sq->sq_cond, NULL);
@ -176,6 +178,11 @@ streaming_msg_clone(streaming_message_t *src)
case SMT_EXIT:
break;
case SMT_MPEGTS:
dst->sm_data = malloc(188);
memcpy(dst->sm_data, src->sm_data, 188);
break;
default:
abort();
}
@ -225,6 +232,10 @@ streaming_msg_free(streaming_message_t *sm)
case SMT_NOSOURCE:
break;
case SMT_MPEGTS:
free(sm->sm_data);
break;
default:
abort();
}
@ -240,17 +251,29 @@ streaming_pad_deliver(streaming_pad_t *sp, streaming_message_t *sm)
{
streaming_target_t *st, *next;
if(sp->sp_ntargets == 0)
return;
for(st = LIST_FIRST(&sp->sp_targets);st; st = next) {
for(st = LIST_FIRST(&sp->sp_targets);; st = next) {
if((next = LIST_NEXT(st, st_link)) == NULL)
break;
next = LIST_NEXT(st, st_link);
if(st->st_reject_filter & SMT_TO_MASK(sm->sm_type))
continue;
st->st_cb(st->st_opaque, streaming_msg_clone(sm));
}
st->st_cb(st->st_opaque, sm);
}
/**
*
*/
int
streaming_pad_probe_type(streaming_pad_t *sp, streaming_message_type_t smt)
{
streaming_target_t *st;
LIST_FOREACH(st, &sp->sp_targets, st_link) {
if(!(st->st_reject_filter & SMT_TO_MASK(smt)))
return 1;
}
return 0;
}

View file

@ -51,7 +51,8 @@ typedef struct streaming_start {
void streaming_pad_init(streaming_pad_t *sp);
void streaming_target_init(streaming_target_t *st,
st_callback_t *cb, void *opaque);
st_callback_t *cb, void *opaque,
int reject_filter);
void streaming_queue_init(streaming_queue_t *sq);
@ -80,5 +81,8 @@ streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt);
#define streaming_target_deliver(st, sm) ((st)->st_cb((st)->st_opaque, (sm)))
void streaming_start_unref(streaming_start_t *ss);
int streaming_pad_probe_type(streaming_pad_t *sp,
streaming_message_type_t smt);
#endif /* STREAMING_H_ */

View file

@ -194,8 +194,7 @@ static void
subscription_input(void *opauqe, streaming_message_t *sm)
{
th_subscription_t *s = opauqe;
streaming_target_deliver(s->ths_output, sm);
streaming_target_deliver(s->ths_output, sm);
}
@ -204,16 +203,25 @@ subscription_input(void *opauqe, streaming_message_t *sm)
*
*/
static th_subscription_t *
subscription_create(int weight, const char *name, streaming_target_t *st)
subscription_create(int weight, const char *name, streaming_target_t *st,
int flags)
{
th_subscription_t *s = calloc(1, sizeof(th_subscription_t));
int reject = 0;
streaming_target_init(&s->ths_input, subscription_input, s);
if(flags & SUBSCRIPTION_RAW_MPEGTS)
reject |= SMT_TO_MASK(SMT_PACKET); // Reject parsed frames
else
reject |= SMT_TO_MASK(SMT_MPEGTS); // Reject raw mpegts
streaming_target_init(&s->ths_input, subscription_input, s, reject);
s->ths_weight = weight;
s->ths_title = strdup(name);
s->ths_total_err = 0;
s->ths_output = st;
s->ths_flags = flags;
time(&s->ths_start);
LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort);
@ -227,9 +235,10 @@ subscription_create(int weight, const char *name, streaming_target_t *st)
*/
th_subscription_t *
subscription_create_from_channel(channel_t *ch, unsigned int weight,
const char *name, streaming_target_t *st)
const char *name, streaming_target_t *st,
int flags)
{
th_subscription_t *s = subscription_create(weight, name, st);
th_subscription_t *s = subscription_create(weight, name, st, flags);
s->ths_channel = ch;
LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link);
@ -270,9 +279,9 @@ subscription_create_from_channel(channel_t *ch, unsigned int weight,
*/
th_subscription_t *
subscription_create_from_transport(th_transport_t *t, const char *name,
streaming_target_t *st)
streaming_target_t *st, int flags)
{
th_subscription_t *s = subscription_create(INT32_MAX, name, st);
th_subscription_t *s = subscription_create(INT32_MAX, name, st, flags);
if(t->tht_status != TRANSPORT_RUNNING)
transport_start(t, INT32_MAX, 1);
@ -336,8 +345,8 @@ subscription_dummy_join(const char *id)
}
st = calloc(1, sizeof(streaming_target_t));
streaming_target_init(st, dummy_callback, NULL);
subscription_create_from_transport(t, "dummy", st);
streaming_target_init(st, dummy_callback, NULL, 0);
subscription_create_from_transport(t, "dummy", st, 0);
tvhlog(LOG_NOTICE, "subscription",
"Dummy join %s ok", id);

View file

@ -19,6 +19,8 @@
#ifndef SUBSCRIPTIONS_H
#define SUBSCRIPTIONS_H
#define SUBSCRIPTION_RAW_MPEGTS 0x1
typedef struct th_subscription {
LIST_ENTRY(th_subscription) ths_global_link;
int ths_weight;
@ -36,12 +38,12 @@ typedef struct th_subscription {
time_t ths_start; /* time when subscription started */
int ths_total_err; /* total errors during entire subscription */
int ths_force_demux;
streaming_target_t ths_input;
streaming_target_t *ths_output;
int ths_flags;
} th_subscription_t;
@ -55,12 +57,14 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight);
th_subscription_t *subscription_create_from_channel(channel_t *ch,
unsigned int weight,
const char *name,
streaming_target_t *st);
streaming_target_t *st,
int flags);
th_subscription_t *subscription_create_from_transport(th_transport_t *t,
const char *name,
streaming_target_t *st);
streaming_target_t *st,
int flags);
void subscription_stop(th_subscription_t *s);

View file

@ -669,9 +669,10 @@ transport_set_feed_status(th_transport_t *t, transport_feed_status_t newstatus)
t->tht_feed_status = newstatus;
streaming_pad_deliver(&t->tht_streaming_pad,
streaming_msg_create_code(SMT_TRANSPORT_STATUS,
newstatus));
streaming_message_t *sm = streaming_msg_create_code(SMT_TRANSPORT_STATUS,
newstatus);
streaming_pad_deliver(&t->tht_streaming_pad, sm);
streaming_msg_free(sm);
}
@ -689,6 +690,7 @@ transport_restart(th_transport_t *t, int had_components)
if(had_components) {
sm = streaming_msg_create_code(SMT_STOP, 0);
streaming_pad_deliver(&t->tht_streaming_pad, sm);
streaming_msg_free(sm);
}
t->tht_refresh_feed(t);
@ -698,6 +700,7 @@ transport_restart(th_transport_t *t, int had_components)
sm = streaming_msg_create_data(SMT_START,
transport_build_stream_start(t));
streaming_pad_deliver(&t->tht_streaming_pad, sm);
streaming_msg_free(sm);
}
}
@ -872,3 +875,17 @@ transport_source_info_free(struct source_info *si)
free(si->si_provider);
free(si->si_service);
}
void
transport_source_info_copy(source_info_t *dst, source_info_t *src)
{
#define COPY(x) dst->si_##x = src->si_##x ? strdup(src->si_##x) : NULL
COPY(device);
COPY(adapter);
COPY(network);
COPY(mux);
COPY(provider);
COPY(service);
#undef COPY
}

View file

@ -89,4 +89,6 @@ void transport_request_save(th_transport_t *t);
void transport_source_info_free(source_info_t *si);
void transport_source_info_copy(source_info_t *dst, source_info_t *src);
#endif /* TRANSPORTS_H */

View file

@ -42,12 +42,13 @@
#include "psi.h"
#include "tsdemux.h"
#include "parsers.h"
#include "streaming.h"
static void ts_remux(th_transport_t *t, const uint8_t *tsb);
/**
* Code for dealing with a complete section
*/
static void
got_section(th_transport_t *t, th_stream_t *st)
{
@ -64,7 +65,6 @@ got_section(th_transport_t *t, th_stream_t *st)
}
/**
* Continue processing of transport stream packets
*/
@ -73,6 +73,9 @@ ts_recv_packet0(th_transport_t *t, th_stream_t *st, uint8_t *tsb)
{
int off, len, pusi, cc, err = 0;
if(streaming_pad_probe_type(&t->tht_streaming_pad, SMT_MPEGTS))
ts_remux(t, tsb);
/* Check CC */
if(tsb[3] & 0x10) {
@ -262,3 +265,19 @@ ts_recv_packet2(th_transport_t *t, uint8_t *tsb)
if((st = transport_find_stream_by_pid(t, pid)) != NULL)
ts_recv_packet0(t, st, tsb);
}
/**
*
*/
static void
ts_remux(th_transport_t *t, const uint8_t *src)
{
uint8_t tsb[188];
memcpy(tsb, src, 188);
streaming_message_t sm;
sm.sm_type = SMT_MPEGTS;
sm.sm_data = tsb;
streaming_pad_deliver(&t->tht_streaming_pad, &sm);
}

View file

@ -130,6 +130,7 @@ typedef enum {
SCT_PAT,
SCT_PMT,
SCT_AAC,
SCT_MPEGTS,
} streaming_component_type_t;
@ -164,8 +165,11 @@ typedef enum {
SMT_TRANSPORT_STATUS, // sm_code is TRANSPORT_STATUS_
SMT_EXIT, // Used to signal exit to threads
SMT_NOSOURCE,
SMT_MPEGTS, // sm_data is raw MPEG TS
} streaming_message_type_t;
#define SMT_TO_MASK(x) (1 << ((unsigned int)x))
/**
* Streaming messages are sent from the pad to its receivers
@ -191,6 +195,7 @@ typedef struct streaming_target {
st_callback_t *st_cb;
void *st_opaque;
int st_reject_filter;
} streaming_target_t;