Merge pull request #60 from john-tornblom/http_mkvmuxing

do matroska muxing of live streams instead of raw TS
This commit is contained in:
Andreas Öman 2012-03-02 23:27:42 -08:00
commit abaf9290cb
3 changed files with 192 additions and 77 deletions

View file

@ -72,6 +72,7 @@ struct mk_mux {
mk_track *tracks;
int ntracks;
int has_video;
int64_t totduration;
@ -238,7 +239,8 @@ mk_build_tracks(mk_mux_t *mkm, const struct streaming_start *ss)
mkm->tracks[i].enabled = 1;
tracknum++;
mkm->tracks[i].tracknum = tracknum;
mkm->has_video |= (tracktype == 1);
t = htsbuf_queue_alloc(0);
ebml_append_uint(t, 0xd7, mkm->tracks[i].tracknum);
@ -373,14 +375,13 @@ mk_write_master(mk_mux_t *mkm, uint32_t id, htsbuf_queue_t *p)
/**
*
*/
static void
mk_write_segment_header(mk_mux_t *mkm, int64_t size)
static htsbuf_queue_t *
mk_build_segment_header(int64_t size)
{
htsbuf_queue_t q;
htsbuf_queue_t *q = htsbuf_queue_alloc(0);
uint8_t u8[8];
htsbuf_queue_init(&q, 0);
ebml_append_id(&q, 0x18538067);
ebml_append_id(q, 0x18538067);
u8[0] = 1;
if(size == 0) {
@ -394,9 +395,19 @@ mk_write_segment_header(mk_mux_t *mkm, int64_t size)
u8[6] = size >> 8;
u8[7] = size;
}
htsbuf_append(&q, &u8, 8);
htsbuf_append(q, &u8, 8);
mk_write_queue(mkm, &q);
return q;
}
/**
*
*/
static void
mk_write_segment_header(mk_mux_t *mkm, int64_t size)
{
mk_write_queue(mkm, mk_build_segment_header(size));
}
@ -508,6 +519,61 @@ mk_build_metadata(const dvr_entry_t *de)
}
static htsbuf_queue_t *
mk_build_metadata2(const event_t *e)
{
htsbuf_queue_t *q = htsbuf_queue_alloc(0);
char datestr[64];
struct tm tm;
const char *ctype;
localtime_r(&e->e_start, &tm);
snprintf(datestr, sizeof(datestr),
"%04d-%02d-%02d %02d:%02d:%02d",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec);
addtag(q, build_tag_string("DATE_BROADCASTED", datestr, 0, NULL));
addtag(q, build_tag_string("ORIGINAL_MEDIA_TYPE", "TV", 0, NULL));
if(e->e_content_type) {
ctype = epg_content_group_get_name(e->e_content_type);
if(ctype != NULL)
addtag(q, build_tag_string("CONTENT_TYPE", ctype, 0, NULL));
}
if(e->e_channel != NULL)
addtag(q, build_tag_string("TVCHANNEL", e->e_channel->ch_name, 0, NULL));
if(e->e_episode.ee_onscreen)
addtag(q, build_tag_string("SYNOPSIS",
e->e_episode.ee_onscreen, 0, NULL));
if(e->e_desc != NULL)
addtag(q, build_tag_string("SUMMARY", e->e_desc, 0, NULL));
if(e->e_episode.ee_season)
addtag(q, build_tag_int("PART_NUMBER", e->e_episode.ee_season,
60, "SEASON"));
if(e->e_episode.ee_episode)
addtag(q, build_tag_int("PART_NUMBER", e->e_episode.ee_episode,
0, NULL));
if(e->e_episode.ee_part)
addtag(q, build_tag_int("PART_NUMBER", e->e_episode.ee_part,
40, "PART"));
return q;
}
/**
*
*/
@ -583,6 +649,38 @@ mk_write_metaseek(mk_mux_t *mkm, int first)
htsbuf_queue_flush(&q);
}
/**
*
*/
static htsbuf_queue_t *
mk_build_segment(mk_mux_t *mkm,
const struct streaming_start *ss,
const event_t *e)
{
htsbuf_queue_t q;
htsbuf_queue_t *p = htsbuf_queue_alloc(0);
htsbuf_queue_init(&q, 0);
mkm->segmentinfo_pos = 33;
ebml_append_master(&q, 0x1549a966, mk_build_segment_info(mkm));
mkm->trackinfo_pos = 33 + q.hq_size;
ebml_append_master(&q, 0x1654ae6b, mk_build_tracks(mkm, ss));
if(e) {
mkm->metadata_pos = 33 + q.hq_size;
ebml_append_master(&q, 0x1254c367, mk_build_metadata2(e));
}
ebml_append_master(p, 0x114d9b74, mk_build_metaseek(mkm));
htsbuf_appendq(p, &q);
htsbuf_queue_flush(&q);
return p;
}
/**
*
*/
@ -631,6 +729,30 @@ mk_mux_create(const char *filename,
return mkm;
}
mk_mux_t *
mk_mux_stream_create(int fd, const struct streaming_start *ss,
const event_t *e)
{
mk_mux_t *mkm;
htsbuf_queue_t q;
mkm = calloc(1, sizeof(struct mk_mux));
getuuid(mkm->uuid);
mkm->filename = strdup("Live stream");
mkm->fd = fd;
mkm->title = strdup(e ? e->e_title : mkm->filename);
TAILQ_INIT(&mkm->cues);
htsbuf_queue_init(&q, 0);
ebml_append_master(&q, 0x1a45dfa3, mk_build_ebmlheader());
htsbuf_appendq(&q, mk_build_segment_header(0));
htsbuf_appendq(&q, mk_build_segment(mkm, ss, e));
mk_write_queue(mkm, &q);
return mkm;
}
/**
*
@ -705,6 +827,9 @@ mk_write_frame_i(mk_mux_t *mkm, mk_track *t, th_pkt_t *pkt)
else if(mkm->cluster && mkm->cluster->hq_size > clusersizemax)
mk_close_cluster(mkm);
else if(!mkm->has_video && mkm->cluster && mkm->cluster->hq_size > clusersizemax/40)
mk_close_cluster(mkm);
if(mkm->cluster == NULL) {
mkm->cluster_tc = pts;
mkm->cluster = htsbuf_queue_alloc(0);
@ -750,7 +875,7 @@ mk_write_frame_i(mk_mux_t *mkm, mk_track *t, th_pkt_t *pkt)
/**
*
*/
void
int
mk_mux_write_pkt(mk_mux_t *mkm, struct th_pkt *pkt)
{
int i;
@ -770,6 +895,8 @@ mk_mux_write_pkt(mk_mux_t *mkm, struct th_pkt *pkt)
}
pkt_ref_dec(pkt);
return mkm->error;
}

View file

@ -30,7 +30,11 @@ mk_mux_t *mk_mux_create(const char *filename,
const struct dvr_entry *de,
int write_tags);
void mk_mux_write_pkt(mk_mux_t *mkm, struct th_pkt *pkt);
mk_mux_t *mk_mux_stream_create(int fd,
const struct streaming_start *ss,
const event_t *e);
int mk_mux_write_pkt(mk_mux_t *mkm, struct th_pkt *pkt);
void mk_mux_close(mk_mux_t *mk_mux);

View file

@ -34,8 +34,11 @@
#include "http.h"
#include "webui.h"
#include "dvr/dvr.h"
#include "dvr/mkmux.h"
#include "filebundle.h"
#include "psi.h"
#include "plumbing/tsfix.h"
#include "plumbing/globalheaders.h"
struct filebundle *filebundles;
@ -125,15 +128,15 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque)
* HTTP stream loop
*/
static void
http_stream_run(http_connection_t *hc, streaming_queue_t *sq)
http_stream_run(http_connection_t *hc, streaming_queue_t *sq, th_subscription_t *s)
{
streaming_message_t *sm;
int run = 1;
int start = 1;
mk_mux_t *mkm = NULL;
int timeouts = 0;
pthread_mutex_lock(&sq->sq_mutex);
while(run) {
pthread_mutex_lock(&sq->sq_mutex);
sm = TAILQ_FIRST(&sq->sq_queue);
if(sm == NULL) {
struct timespec ts;
@ -157,63 +160,39 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq)
run = 0;
}
}
pthread_mutex_unlock(&sq->sq_mutex);
continue;
}
timeouts = 0; //Reset timeout counter
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
pthread_mutex_unlock(&sq->sq_mutex);
switch(sm->sm_type) {
case SMT_PACKET:
//printf("SMT_PACKET\n");
if(!mkm)
break;
pkt_ref_inc(sm->sm_data);
run = !mk_mux_write_pkt(mkm, sm->sm_data);
sm->sm_data = NULL;
break;
case SMT_START:
if (start) {
struct streaming_start *ss = sm->sm_data;
uint8_t pat_ts[188];
uint8_t pmt_ts[188];
int pcrpid = ss->ss_pcr_pid;
int pmtpid = 0x0fff;
case SMT_START: {
if(s->ths_service->s_servicetype == ST_RADIO)
http_output_content(hc, "audio/x-matroska");
else
http_output_content(hc, "video/x-matroska");
http_output_content(hc, "video/mp2t");
//Send PAT
memset(pat_ts, 0xff, 188);
psi_build_pat(NULL, pat_ts+5, 183, pmtpid);
pat_ts[0] = 0x47;
pat_ts[1] = 0x40;
pat_ts[2] = 0x00;
pat_ts[3] = 0x10;
pat_ts[4] = 0x00;
run = (write(hc->hc_fd, pat_ts, 188) == 188);
if(!run) {
break;
}
event_t *e = NULL; //epg_event_find_by_time(s->ths_channel, dispatch_clock);
//Send PMT
memset(pmt_ts, 0xff, 188);
psi_build_pmt(ss, pmt_ts+5, 183, pcrpid);
pmt_ts[0] = 0x47;
pmt_ts[1] = 0x40 | (pmtpid >> 8);
pmt_ts[2] = pmtpid;
pmt_ts[3] = 0x10;
pmt_ts[4] = 0x00;
run = (write(hc->hc_fd, pmt_ts, 188) == 188);
start = 0;
}
mkm = mk_mux_stream_create(hc->hc_fd, sm->sm_data, e);
break;
}
case SMT_STOP:
run = 0;
break;
case SMT_SERVICE_STATUS:
//printf("SMT_TRANSPORT_STATUS\n");
break;
case SMT_NOSTART:
@ -221,23 +200,22 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq)
break;
case SMT_MPEGTS:
run = (write(hc->hc_fd, sm->sm_data, 188) == 188);
break;
case SMT_EXIT:
run = 0;
break;
}
streaming_msg_free(sm);
pthread_mutex_lock(&sq->sq_mutex);
pthread_mutex_unlock(&sq->sq_mutex);
}
pthread_mutex_unlock(&sq->sq_mutex);
if(mkm)
mk_mux_close(mkm);
}
/**
* Output a playlist with http streams for a channel (.m3u format)
* Output a playlist with http streams for a channel (.m3u8 format)
*/
static int
http_stream_playlist(http_connection_t *hc, channel_t *channel)
@ -248,7 +226,7 @@ http_stream_playlist(http_connection_t *hc, channel_t *channel)
channel_t *ch = NULL;
const char *host = http_arg_get(&hc->hc_args, "Host");
pthread_mutex_lock(&global_lock);
htsbuf_qprintf(hq, "#EXTM3U\n");
@ -372,26 +350,28 @@ http_stream_service(http_connection_t *hc, service_t *service)
{
streaming_queue_t sq;
th_subscription_t *s;
streaming_target_t *gh;
streaming_target_t *tsfix;
streaming_queue_init(&sq, 0);
gh = globalheaders_create(&sq.sq_st);
tsfix = tsfix_create(gh);
pthread_mutex_lock(&global_lock);
streaming_queue_init(&sq, ~SMT_TO_MASK(SUBSCRIPTION_RAW_MPEGTS));
s = subscription_create_from_service(service,
"HTTP", &sq.sq_st,
SUBSCRIPTION_RAW_MPEGTS);
"HTTP", tsfix,
0);
pthread_mutex_unlock(&global_lock);
//We won't get a START command, send http-header here.
http_output_content(hc, "video/mp2t");
http_stream_run(hc, &sq);
http_stream_run(hc, &sq, s);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
pthread_mutex_unlock(&global_lock);
globalheaders_destroy(gh);
tsfix_destroy(tsfix);
streaming_queue_deinit(&sq);
return 0;
@ -405,24 +385,28 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
{
streaming_queue_t sq;
th_subscription_t *s;
int priority = 150; //Default value, Compute this somehow
streaming_target_t *gh;
streaming_target_t *tsfix;
int priority = 100;
streaming_queue_init(&sq, 0);
gh = globalheaders_create(&sq.sq_st);
tsfix = tsfix_create(gh);
pthread_mutex_lock(&global_lock);
streaming_queue_init(&sq, ~SMT_TO_MASK(SUBSCRIPTION_RAW_MPEGTS));
s = subscription_create_from_channel(ch, priority,
"HTTP", &sq.sq_st,
SUBSCRIPTION_RAW_MPEGTS);
"HTTP", tsfix,
0);
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq);
http_stream_run(hc, &sq, s);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
pthread_mutex_unlock(&global_lock);
globalheaders_destroy(gh);
tsfix_destroy(tsfix);
streaming_queue_deinit(&sq);
return 0;