Add tentative RTSP streamer

This commit is contained in:
Andreas Öman 2009-11-12 23:07:02 +00:00
parent b19f4636d9
commit 8451ee2006
12 changed files with 1606 additions and 572 deletions

View file

@ -26,7 +26,7 @@ PROG=${BUILDDIR}/tvheadend
CFLAGS = -Wall -Werror -Wwrite-strings -Wno-deprecated-declarations
CFLAGS += -Wmissing-prototypes
LDFLAGS += -lrt
#
# Core
@ -53,6 +53,8 @@ SRCS = src/main.c \
src/parser_latm.c \
src/tsdemux.c \
src/bitstream.c \
src/rtsp.c \
src/rtp.c \
src/htsp.c \
src/serviceprobe.c \
src/htsmsg.c \

View file

@ -34,6 +34,7 @@
#include "tvhead.h"
#include "tcp.h"
#include "http.h"
#include "rtsp.h"
#include "access.h"
static void *http_server;
@ -446,6 +447,8 @@ process_request(http_connection_t *hc, htsbuf_queue_t *spill)
int n, rval = -1;
uint8_t authbuf[150];
hc->hc_url_orig = tvh_strdupa(hc->hc_url);
/* Set keep-alive status */
v = http_arg_get(&hc->hc_args, "connection");
@ -489,10 +492,8 @@ process_request(http_connection_t *hc, htsbuf_queue_t *spill)
switch(hc->hc_version) {
case RTSP_VERSION_1_0:
rval = -1;
rval = rtsp_process_request(hc);
break;
// rtsp_process_request(hc);
return 0;
case HTTP_VERSION_1_0:
case HTTP_VERSION_1_1:
@ -600,7 +601,7 @@ http_path_add(const char *path, void *opaque, http_callback_t *callback,
/**
* De-escape HTTP URL
*/
static void
void
http_deescape(char *s)
{
char v, *d = s;
@ -768,7 +769,7 @@ http_serve(int fd, void *opaque, struct sockaddr_in *source)
free(hc.hc_username);
free(hc.hc_password);
// rtsp_disconncet(hc);
rtsp_disconncet(&hc);
http_arg_flush(&hc.hc_args);
http_arg_flush(&hc.hc_req_args);

View file

@ -36,14 +36,13 @@ typedef struct http_arg {
#define HTTP_STATUS_NOT_FOUND 404
LIST_HEAD(rtsp_session_head, rtsp_session);
typedef struct http_connection {
int hc_fd;
struct sockaddr_in *hc_peer;
char *hc_representative;
char *hc_url;
char *hc_url_orig;
int hc_keep_alive;
htsbuf_queue_t hc_reply;
@ -79,8 +78,6 @@ typedef struct http_connection {
char *hc_username;
char *hc_password;
struct rtsp_session_head hc_rtsp_sessions;
int hc_authenticated; /* Used by RTSP, it seems VLC does not
send authentication headers for each
command, so we just say that it's ok
@ -93,6 +90,8 @@ typedef struct http_connection {
char *hc_post_data;
unsigned int hc_post_len;
struct rtsp *hc_rtsp_session;
} http_connection_t;
@ -137,4 +136,6 @@ void http_server_init(void);
int http_access_verify(http_connection_t *hc, int mask);
void http_deescape(char *s);
#endif /* HTTP_H_ */

118
src/rtp.c
View file

@ -17,12 +17,10 @@
*/
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <fcntl.h>
#include <errno.h>
#include <netinet/in.h>
@ -30,74 +28,64 @@
#include <arpa/inet.h>
#include "tvhead.h"
#include "channels.h"
#include "rtp.h"
#include "dispatch.h"
int
rtp_sendmsg(uint8_t *pkt, int blocks, int64_t pcr,
int fd, struct sockaddr *dst, socklen_t dstlen,
uint16_t seq)
{
struct msghdr msg;
struct iovec vec[2];
AVRational mpeg_tc = {1, 90000};
char hdr[12];
pcr = av_rescale_q(pcr, AV_TIME_BASE_Q, mpeg_tc);
hdr[0] = 0x80;
hdr[1] = 33; /* M2TS */
hdr[2] = seq >> 8;
hdr[3] = seq;
hdr[4] = pcr >> 24;
hdr[5] = pcr >> 16;
hdr[6] = pcr >> 8;
hdr[7] = pcr;
hdr[8] = 0;
hdr[9] = 0;
hdr[10] = 0;
hdr[11] = 0;
vec[0].iov_base = hdr;
vec[0].iov_len = 12;
vec[1].iov_base = pkt;
vec[1].iov_len = blocks * 188;
memset(&msg, 0, sizeof(msg));
msg.msg_name = dst;
msg.msg_namelen = dstlen;
msg.msg_iov = vec;
msg.msg_iovlen = 2;
return sendmsg(fd, &msg, 0);
}
static const AVRational mpeg_tc = {1, 90000};
void
rtp_output_ts(void *opaque, th_subscription_t *s,
uint8_t *pkt, int blocks, int64_t pcr)
rtp_send_mpv(rtp_send_t *sender, void *opaque, rtp_stream_t *rs,
const uint8_t *data, size_t len,
int64_t pts)
{
th_rtp_streamer_t *trs = opaque;
uint32_t flags = 0;
int s;
int payloadsize = RTP_MAX_PACKET_SIZE - (4 + 4 + 4 + 4);
uint8_t *buf;
if(data[0] != 0x00 || data[1] != 0x00 || data[2] != 0x01)
return; // Not a startcode, something is fishy
rtp_sendmsg(pkt, blocks, pcr, trs->trs_fd,
(struct sockaddr *)&trs->trs_dest, sizeof(struct sockaddr_in),
trs->trs_seq);
pts = av_rescale_q(pts, AV_TIME_BASE_Q, mpeg_tc);
trs->trs_seq++;
}
void
rtp_streamer_init(th_rtp_streamer_t *trs, int fd, struct sockaddr_in *dst)
{
trs->trs_fd = fd;
trs->trs_dest = *dst;
trs->trs_seq = 0;
if(data[3] == 0xb3) {
// Sequence Start code, set Begin-Of-Sequence
flags |= 1 << 13;
}
while(len > 0) {
s = len > payloadsize ? payloadsize : len;
buf = rs->rs_buf;
buf[0] = 0x80;
buf[1] = 32 | (len == payloadsize ? 0x80 : 0);
buf[2] = rs->rs_seq >> 8;
buf[3] = rs->rs_seq;
buf[4] = pts >> 24;
buf[5] = pts >> 16;
buf[6] = pts >> 8;
buf[7] = pts;
buf[8] = 0;
buf[9] = 0;
buf[10] = 0;
buf[11] = 0;
buf[12] = flags >> 24;
buf[13] = flags >> 16;
buf[14] = flags >> 8;
buf[15] = flags;
memcpy(buf + 16, data, s);
len -= s;
data += s;
sender(opaque, buf, s + 16);
rs->rs_seq++;
flags = 0;
}
assert(len == 0);
}

View file

@ -1,6 +1,6 @@
/*
* tvheadend, RTP interface
* Copyright (C) 2007 Andreas Öman
* Tvheadend, RTP streamer
* Copyright (C) 2007, 2009 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -19,21 +19,18 @@
#ifndef RTP_H_
#define RTP_H_
typedef struct th_rtp_streamer {
int trs_fd;
struct sockaddr_in trs_dest;
int16_t trs_seq;
typedef void (rtp_send_t)(void *opaque, void *buf, size_t len);
} th_rtp_streamer_t;
#define RTP_MAX_PACKET_SIZE 1472
void rtp_streamer_init(th_rtp_streamer_t *trs, int fd,
struct sockaddr_in *dst);
typedef struct rtp_stream {
uint16_t rs_seq;
int rs_ptr;
uint8_t rs_buf[RTP_MAX_PACKET_SIZE];
} rtp_stream_t;
void rtp_output_ts(void *opaque, th_subscription_t *s,
uint8_t *pkt, int blocks, int64_t pcr);
int rtp_sendmsg(uint8_t *pkt, int blocks, int64_t pcr,
int fd, struct sockaddr *dst, socklen_t dstlen,
uint16_t seq);
void rtp_send_mpv(rtp_send_t *sender, void *opaque, rtp_stream_t *rs,
const uint8_t *data, size_t len, int64_t pts);
#endif /* RTP_H_ */

1331
src/rtsp.c

File diff suppressed because it is too large Load diff

View file

@ -186,8 +186,8 @@ streaming_msg_clone(streaming_message_t *src)
/**
*
*/
static void
streaming_start_deref(streaming_start_t *ss)
void
streaming_start_unref(streaming_start_t *ss)
{
if((atomic_add(&ss->ss_refcount, -1)) != 1)
return;
@ -209,7 +209,8 @@ streaming_msg_free(streaming_message_t *sm)
break;
case SMT_START:
streaming_start_deref(sm->sm_data);
if(sm->sm_data)
streaming_start_unref(sm->sm_data);
break;
case SMT_STOP:

View file

@ -78,5 +78,7 @@ streaming_message_t *streaming_msg_create_code(streaming_message_type_t type,
streaming_message_t *streaming_msg_create_pkt(th_pkt_t *pkt);
#define streaming_target_deliver(st, sm) ((st)->st_cb((st)->st_opaque, (sm)))
void streaming_start_unref(streaming_start_t *ss);
#endif /* STREAMING_H_ */

View file

@ -143,7 +143,7 @@ ts_recv_packet0(th_transport_t *t, th_stream_t *st, uint8_t *tsb)
static void
ts_extract_pcr(th_transport_t *t, th_stream_t *st, uint8_t *tsb)
{
int64_t real = getclock_hires();
int64_t real = getmonoclock();
int64_t pcr, d;
pcr = tsb[6] << 25;

View file

@ -1,6 +1,6 @@
/*
* tvheadend, MPEG Transport stream muxer
* Copyright (C) 2008 Andreas Öman
* Copyright (C) 2008 - 2009 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -15,6 +15,642 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include "tvhead.h"
#include "streaming.h"
#include "tsmux.h"
#define PID_PMT 1000
#define PID_ES_BASE 2000
static const AVRational mpeg_tc = {1, 90000};
static const AVRational mpeg_tc_27M = {1, 27000000};
LIST_HEAD(tsmuxer_es_list, tsmuxer_es);
TAILQ_HEAD(tsmuxer_pkt_queue, tsmuxer_pkt);
/**
*
*/
typedef struct tsmuxer_pkt {
TAILQ_ENTRY(tsmuxer_pkt) tsp_link;
int64_t tsp_deadline;
int64_t tsp_dts;
int64_t tsp_pcr;
int tsp_contentsize;
uint8_t tsp_payload[188];
} tsmuxer_pkt_t;
/**
*
*/
typedef struct tsmuxer_fifo {
struct tsmuxer_pkt_queue tsf_queue;
int tsf_contentsize;
int tsf_len;
} tsmuxer_fifo_t;
/**
* TS Elementary stream
*/
typedef struct tsmuxer_es {
LIST_ENTRY(tsmuxer_es) te_link;
int te_type;
int te_input_index;
char te_lang[4];
int te_pid;
int te_cc;
int te_startcode; // Startcode to use for PES packetization
struct streaming_message_queue te_smq;
int te_lookahead_depth; // bytes in te_smq
int te_lookahead_packets; // packets in te_smq
int64_t te_mux_offset;
int te_vbv_delay;
tsmuxer_fifo_t te_delivery_fifo;
} tsmuxer_es_t;
/**
* TS Multiplexer
*/
typedef struct tsmuxer {
int tsm_run;
struct tsmuxer_es_list tsm_eslist;
int64_t tsm_pcr_start;
int64_t tsm_pcr_ref;
int64_t tsm_pcr_last;
tsmuxer_es_t *tsm_pcr_stream;
streaming_queue_t tsm_input;
streaming_target_t *tsm_output;
pthread_t tsm_thread;
} tsmuxer_t;
/**
*
*/
static void
tmf_enq(tsmuxer_fifo_t *tsf, tsmuxer_pkt_t *tsp)
{
/* record real content size */
tsf->tsf_contentsize += tsp->tsp_contentsize;
/* Enqueue packet */
TAILQ_INSERT_TAIL(&tsf->tsf_queue, tsp, tsp_link);
tsf->tsf_len++;
}
#if 0
/**
*
*/
static void
tsf_remove(tsmuxer_fifo_t *tsf, tsmuxer_pkt_t *tsp)
{
tsf->tsf_contentsize -= tsp->tsp_contentsize;
TAILQ_REMOVE(&tsf->tsf_queue, tsp, tsp_link);
tsf->tsf_len--;
}
/**
*
*/
static tsmuxer_pkt_t *
tsf_deq(tsmuxer_fifo_t *tsf)
{
tsmuxer_pkt_t *tm;
tm = TAILQ_FIRST(&tsf->tsf_queue);
if(tm != NULL)
tsf_remove(tsf, tm);
return tm;
}
#endif
/**
*
*/
static void
tsf_init(tsmuxer_fifo_t *tsf)
{
TAILQ_INIT(&tsf->tsf_queue);
}
#if 0
/**
* Check if we need to start delivery timer for the given stream
*
* Also, if it is the PCR stream and we're not yet runnig, figure out
* PCR and start generating packets
*/
static void
ts_check_deliver(tsmuxer_t *tsm, tsmuxer_es_t *te)
{
int64_t now;
tsmuxer_fifo_t *tsf = &te->te_delivery_fifo;
int64_t next;
if(dtimer_isarmed(&tms->tms_mux_timer))
return; /* timer already running, we're fine */
assert(tms->tms_delivery_fifo.tmf_len != 0);
now = getclock_hires();
if(ts->ts_pcr_ref == AV_NOPTS_VALUE) {
if(ts->ts_pcr_start == AV_NOPTS_VALUE)
return; /* dont know anything yet */
ts->ts_pcr_ref = now - ts->ts_pcr_start + t->tht_pcr_drift;
}
f = TAILQ_FIRST(&tmf->tmf_queue); /* next packet we are going to send */
next = f->tm_deadline + ts->ts_pcr_ref - t->tht_pcr_drift;
if(next < now + 100)
next = now + 100;
dtimer_arm_hires(&tms->tms_mux_timer, ts_deliver, tms, next - 500);
}
#endif
/**
* Generate TS packet, see comments inline
*/
static void
lookahead_dequeue(tsmuxer_t *tsm, tsmuxer_es_t *te)
{
streaming_message_t *sm;
th_pkt_t *pkt;
tsmuxer_pkt_t *tsp;
uint8_t *tsb;
uint16_t u16;
int frrem, pad, tsrem, len, off, cc, hlen, flags;
int64_t t, tdur, toff, tlen, dts, pcr, basedelivery;
tlen = 0;
TAILQ_FOREACH(sm, &te->te_smq, sm_link) {
pkt = sm->sm_data;
tlen += pkt->pkt_payloadlen;
if(pkt->pkt_pts != pkt->pkt_dts) {
tlen += 19; /* pes header with DTS and PTS */
} else {
tlen += 14; /* pes header with PTS only */
}
}
if(tlen == 0)
return;
sm = TAILQ_FIRST(&te->te_smq);
pkt = sm->sm_data;
toff = 0;
/* XXX: assumes duration is linear, but it's probably ok */
tdur = pkt->pkt_duration * te->te_lookahead_packets;
if(te->te_mux_offset == AV_NOPTS_VALUE) {
if(te->te_vbv_delay == -1)
te->te_mux_offset = tdur / 2 - pkt->pkt_duration;
else
te->te_mux_offset = te->te_vbv_delay;
}
if(tsm->tsm_pcr_start == AV_NOPTS_VALUE && tsm->tsm_pcr_stream == te)
tsm->tsm_pcr_start = pkt->pkt_dts - te->te_mux_offset;
basedelivery = pkt->pkt_dts - te->te_mux_offset;
while((sm = TAILQ_FIRST(&te->te_smq)) != NULL) {
off = 0;
pkt = sm->sm_data;
if(pkt->pkt_dts == pkt->pkt_pts) {
hlen = 8;
flags = 0x80;
} else {
hlen = 13;
flags = 0xc0;
}
while(off < pkt->pkt_payloadlen) {
tsp = malloc(sizeof(tsmuxer_pkt_t));
tsp->tsp_deadline = basedelivery + tdur * toff / tlen;
dts = (int64_t)pkt->pkt_duration *
(int64_t)off / (int64_t)pkt->pkt_payloadlen;
dts += pkt->pkt_dts;
tsp->tsp_dts = dts;
tsb = tsp->tsp_payload;
/* TS marker */
*tsb++ = 0x47;
/* Write PID and optionally payload unit start indicator */
*tsb++ = te->te_pid >> 8 | (off ? 0 : 0x40);
*tsb++ = te->te_pid;
cc = te->te_cc & 0xf;
te->te_cc++;
/* Remaing bytes after 4 bytes of TS header */
tsrem = 184;
if(off == 0) {
/* When writing the packet header, shave of a bit of available
payload size */
tsrem -= hlen + 6;
}
/* Remaining length of frame */
frrem = pkt->pkt_payloadlen - off;
/* Compute amout of padding needed */
pad = tsrem - frrem;
pcr = tsp->tsp_deadline;
tsp->tsp_pcr = AV_NOPTS_VALUE;
if(tsm->tsm_pcr_stream == te && tsm->tsm_pcr_last + 20000 < pcr) {
tsp->tsp_pcr = pcr;
/* Insert PCR */
tlen += 8; /* compensate total length */
tsrem -= 8;
pad -= 8;
if(pad < 0)
pad = 0;
*tsb++ = 0x30 | cc;
*tsb++ = 7 + pad;
*tsb++ = 0x10; /* PCR flag */
t = av_rescale_q(pcr, AV_TIME_BASE_Q, mpeg_tc);
*tsb++ = t >> 25;
*tsb++ = t >> 17;
*tsb++ = t >> 9;
*tsb++ = t >> 1;
*tsb++ = (t & 1) << 7;
*tsb++ = 0;
memset(tsb, 0xff, pad);
tsb += pad;
tsrem -= pad;
tsm->tsm_pcr_last = pcr + 20000;
} else if(pad > 0) {
/* Must pad TS packet */
*tsb++ = 0x30 | cc;
tsrem -= pad;
*tsb++ = --pad;
memset(tsb, 0x00, pad);
tsb += pad;
} else {
*tsb++ = 0x10 | cc;
}
if(off == 0) {
/* Insert PES header */
/* Write startcode */
*tsb++ = 0;
*tsb++ = 0;
*tsb++ = te->te_startcode >> 8;
*tsb++ = te->te_startcode;
/* Write total frame length (without accounting for startcode and
length field itself */
len = pkt->pkt_payloadlen + hlen;
if(te->te_type == SCT_MPEG2VIDEO) {
/* It's okay to write len as 0 in transport streams,
but only for video frames, and i dont expect any of the
audio frames to exceed 64k
*/
len = 0;
}
*tsb++ = len >> 8;
*tsb++ = len;
*tsb++ = 0x80; /* MPEG2 */
*tsb++ = flags;
*tsb++ = hlen - 3; /* length of rest of header (pts & dts) */
/* Write PTS */
if(flags == 0xc0) {
t = av_rescale_q(pkt->pkt_pts, AV_TIME_BASE_Q, mpeg_tc);
*tsb++ = (((t >> 30) & 7) << 1) | 1;
u16 = (((t >> 15) & 0x7fff) << 1) | 1;
*tsb++ = u16 >> 8;
*tsb++ = u16;
u16 = ((t & 0x7fff) << 1) | 1;
*tsb++ = u16 >> 8;
*tsb++ = u16;
}
/* Write DTS */
t = av_rescale_q(pkt->pkt_dts, AV_TIME_BASE_Q, mpeg_tc);
*tsb++ = (((t >> 30) & 7) << 1) | 1;
u16 = (((t >> 15) & 0x7fff) << 1) | 1;
*tsb++ = u16 >> 8;
*tsb++ = u16;
u16 = ((t & 0x7fff) << 1) | 1;
*tsb++ = u16 >> 8;
*tsb++ = u16;
}
memcpy(tsb, pkt->pkt_payload + off, tsrem);
tsp->tsp_contentsize = tsrem;
tmf_enq(&te->te_delivery_fifo, tsp);
toff += tsrem;
off += tsrem;
}
te->te_lookahead_depth -= pkt->pkt_payloadlen;
te->te_lookahead_packets--;
pkt_ref_dec(pkt);
TAILQ_REMOVE(&te->te_smq, sm, sm_link);
free(sm);
}
// ts_check_deliver(ts, tms);
}
/**
* Packet input.
*
* We get the entire streaming message cause we might need to hold and
* enqueue the packet. So we can just reuse that allocation from now
* on
*/
static void
tsm_packet_input(tsmuxer_t *tsm, streaming_message_t *sm)
{
tsmuxer_es_t *te;
th_pkt_t *pkt = sm->sm_data;
LIST_FOREACH(te, &tsm->tsm_eslist, te_link)
if(te->te_input_index == pkt->pkt_componentindex)
break;
if(te == NULL) {
// Stream not in use
streaming_msg_free(sm);
return;
}
lookahead_dequeue(tsm, te);
te->te_lookahead_depth += pkt->pkt_payloadlen;
te->te_lookahead_packets++;
TAILQ_INSERT_TAIL(&te->te_smq, sm, sm_link);
}
/**
*
*/
static void
te_destroy(tsmuxer_es_t *te)
{
streaming_queue_clear(&te->te_smq);
LIST_REMOVE(te, te_link);
free(te);
}
/**
*
*/
static void
tsm_start(tsmuxer_t *tsm, const streaming_start_t *ss)
{
int i, sc;
tsmuxer_es_t *te;
tsm->tsm_pcr_start = AV_NOPTS_VALUE;
tsm->tsm_pcr_ref = AV_NOPTS_VALUE;
tsm->tsm_pcr_last = INT64_MIN;
for(i = 0; i < ss->ss_num_components; i++) {
const streaming_start_component_t *ssc = &ss->ss_components[i];
int dopcr = 0;
switch(ssc->ssc_type) {
case SCT_MPEG2VIDEO:
sc = 0x1e0;
dopcr = 1;
break;
case SCT_MPEG2AUDIO:
sc = 0x1c0;
break;
case SCT_AC3:
sc = 0x1bd;
break;
case SCT_H264:
sc = 0x1e0;
dopcr = 1;
break;
default:
continue;
}
te = calloc(1, sizeof(tsmuxer_es_t));
tsf_init(&te->te_delivery_fifo);
memcpy(te->te_lang, ssc->ssc_lang, 4);
te->te_input_index = ssc->ssc_index;
te->te_type = ssc->ssc_type;
te->te_pid = PID_ES_BASE + i;
te->te_startcode = sc;
te->te_mux_offset = AV_NOPTS_VALUE;
TAILQ_INIT(&te->te_smq);
LIST_INSERT_HEAD(&tsm->tsm_eslist, te, te_link);
}
}
/**
*
*/
static void
tsm_stop(tsmuxer_t *tsm)
{
tsmuxer_es_t *te;
while((te = LIST_FIRST(&tsm->tsm_eslist)) != NULL)
te_destroy(te);
}
/**
*
*/
static void
tsm_handle_input(tsmuxer_t *tsm, streaming_message_t *sm)
{
switch(sm->sm_type) {
case SMT_PACKET:
tsm_packet_input(tsm, sm);
sm = NULL;
break;
case SMT_START:
assert(LIST_FIRST(&tsm->tsm_eslist) == NULL);
tsm_start(tsm, sm->sm_data);
break;
case SMT_STOP:
tsm_stop(tsm);
break;
case SMT_TRANSPORT_STATUS:
// htsp_subscription_transport_status(hs, sm->sm_code);
break;
case SMT_NOSOURCE:
// htsp_subscription_status(hs, "No available sources");
break;
case SMT_EXIT:
tsm->tsm_run = 0;
break;
}
if(sm != NULL)
streaming_msg_free(sm);
}
/**
*
*/
static void *
tsm_thread(void *aux)
{
tsmuxer_t *tsm = aux;
streaming_queue_t *sq = &tsm->tsm_input;
streaming_message_t *sm;
int64_t now, next = 0;
tsm->tsm_run = 1;
while(tsm->tsm_run) {
pthread_mutex_lock(&sq->sq_mutex);
sm = TAILQ_FIRST(&sq->sq_queue);
if(sm == NULL) {
if(next == 0) {
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
} else {
struct timespec ts;
ts.tv_sec = next / 1000000;
ts.tv_nsec = (next % 1000000) * 1000;
pthread_cond_timedwait(&sq->sq_cond, &sq->sq_mutex, &ts);
}
}
sm = TAILQ_FIRST(&sq->sq_queue);
if(sm != NULL)
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
pthread_mutex_unlock(&sq->sq_mutex);
now = getmonoclock();
if(sm != NULL)
tsm_handle_input(tsm, sm);
}
return NULL;
}
/**
*
*/
void
tsm_init(void)
{
tsmuxer_t *tsm = calloc(1, sizeof(tsmuxer_t));
streaming_queue_init(&tsm->tsm_input);
pthread_create(&tsm->tsm_thread, NULL, tsm_thread, tsm);
}
#if 0
#define _GNU_SOURCE
#include <stdlib.h>
@ -45,11 +681,6 @@
static void lookahead_dequeue(ts_muxer_t *ts, th_muxstream_t *tms);
static const AVRational mpeg_tc = {1, 90000};
static const AVRational mpeg_tc_27M = {1, 27000000};
#define PID_PMT 1000
#define PID_ES_BASE 2000
/**
* Send current packet
@ -769,3 +1400,4 @@ ts_muxer_pause(ts_muxer_t *ts)
dtimer_disarm(&ts->ts_patpmt_timer);
muxer_pause(ts->ts_muxer);
}
#endif

View file

@ -1,6 +1,6 @@
/*
* tvheadend, MPEG transport stream muxer
* Copyright (C) 2007 Andreas Öman
* Copyright (C) 2008 - 2009 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -19,6 +19,11 @@
#ifndef TSMUX_H
#define TSMUX_H
void tsm_init(void);
#if 0
typedef void (ts_mux_output_t)(void *opaque, th_subscription_t *s,
uint8_t *pkt, int npackets, int64_t pcr_ref);
@ -64,5 +69,7 @@ void ts_muxer_deinit(ts_muxer_t *ts, th_subscription_t *s);
void ts_muxer_play(ts_muxer_t *ts, int64_t toffset);
void ts_muxer_pause(ts_muxer_t *ts);
#endif
#endif /* TSMUX_H */

View file

@ -687,14 +687,13 @@ extern int log_debug;
static inline int64_t
getclock_hires(void)
getmonoclock(void)
{
int64_t now;
struct timeval tv;
struct timespec tp;
gettimeofday(&tv, NULL);
now = (uint64_t)tv.tv_sec * 1000000ULL + (uint64_t)tv.tv_usec;
return now;
clock_gettime(CLOCK_MONOTONIC, &tp);
return tp.tv_sec * 1000000ULL + (tp.tv_nsec / 1000);
}
@ -717,4 +716,7 @@ extern void scopedunlock(pthread_mutex_t **mtxp);
char *tvh_b = alloca(tvh_l + 1); \
memcpy(tvh_b, n, tvh_l + 1); })
#define tvh_strlcatf(buf, size, fmt...) \
snprintf((buf) + strlen(buf), (size) - strlen(buf), fmt)
#endif /* TV_HEAD_H */