mpegts: some further updates to get things working properly

Started to get basic IPTV input working, though it still needs
lots of work to add in the missing stuff.
This commit is contained in:
Adam Sutton 2013-05-01 09:46:06 +01:00
parent f763c30fff
commit 15536d8296
10 changed files with 117 additions and 57 deletions

View file

@ -108,7 +108,6 @@ SRCS = src/version.c \
src/tvhtime.c \
src/descrambler/descrambler.c \
src/serviceprobe.c \
src/input.c \
SRCS += \
src/parsers/parsers.c \

View file

@ -28,6 +28,12 @@
#if ENABLE_TSFILE
#include "input/mpegts/tsfile.h"
#endif
#if ENABLE_IPTV
#include "input/mpegts/iptv.h"
#endif
#if ENABLE_LINUXDVB
#include "input/mpegts/linuxdvb.h"
#endif
#endif
void input_init ( void );

View file

@ -255,6 +255,7 @@ struct mpegts_service
{
service_t; // Parent
#if 0 // TODO: temporarily moved to service_t
/**
* PID carrying the programs PCR.
* XXX: We don't support transports that does not carry
@ -266,6 +267,7 @@ struct mpegts_service
* PID for the PMT of this MPEG-TS stream.
*/
uint16_t s_pmt_pid;
#endif
/*
* Fields defined by DVB standard EN 300 468

View file

@ -24,7 +24,7 @@
#include <sys/epoll.h>
#include <assert.h>
#include <regex.h>
#include <unistd.h>
/*
* Globals
@ -35,6 +35,10 @@ int iptv_poll_fd;
pthread_t iptv_thread;
pthread_mutex_t iptv_lock;
/*
* URL processing - TODO: move to a library
*/
typedef struct url
{
char buf[2048];
@ -67,58 +71,64 @@ url_parse ( url_t *up, const char *urlstr )
/* Port */
up->port = 0;
if (!(t2 = strstr(up->host, "::"))) {
if (!(t2 = strstr(up->host, ":"))) {
if (!strcmp(up->scheme, "https"))
up->port = 443;
else if (!strcmp(up->scheme, "http"))
up->port = 80;
} else {
*t2 = 0;
up->port = atoi(t2+1);
}
return 0;
}
static int http_connect (const char *urlstr, htsbuf_queue_t *spill)
/*
* HTTP client
*/
static int http_connect (const char *urlstr)
{
int fd, c;
int fd, c, i;
char buf[1024];
url_t url;
if (url_parse(&url, urlstr))
return -1;
/* Make connection */
// TODO: move connection to thread
// TODO: this is really only for testing and to allow use of TVH webserver as input
tvhlog(LOG_DEBUG, "iptv", "connecting to http %s %d", url.host, url.port);
fd = tcp_connect(url.host, url.port, buf, sizeof(buf), 10);
if (!fd)
if (fd < 0) {
tvhlog(LOG_ERR, "iptv", "tcp_connect() failed %s", buf);
return -1;
}
/* Send request (VERY basic) */
c = snprintf(buf, sizeof(buf), "GET %s HTTP/1.1\r\n", urlstr);
c = snprintf(buf, sizeof(buf), "GET /%s HTTP/1.1\r\n", url.path);
tvh_write(fd, buf, c);
c = snprintf(buf, sizeof(buf), "Hostname: %s\r\n", url.host);
tvh_write(fd, buf, c);
tvh_write(fd, "\r\n", 2);
/* Read back header */
htsbuf_queue_flush(spill);
while ((c = tcp_read_line(fd, buf, sizeof(buf), spill)))
if (!*buf) break;
// TODO: do this properly
i = 0;
while (1) {
if (!(c = read(fd, buf+i, 1)))
continue;
i++;
if (i == 4 && !strncmp(buf, "\r\n\r\n", 4))
break;
memmove(buf, buf+1, 3); i = 3;
}
printf("DONE\n");
return fd;
}
/*
* HTTP
*/
static int
iptv_input_start_http ( iptv_mux_t *im )
{
int ret = SM_CODE_TUNING_FAILED;
/* Setup connection */
im->mm_iptv_fd = http_connect(im->mm_iptv_url, &im->mm_iptv_spill);
return ret;
}
/*
* Input definition
*/
@ -131,6 +141,18 @@ const idclass_t iptv_input_class = {
}
};
/*
* HTTP
*/
static int
iptv_input_start_http ( iptv_mux_t *im )
{
/* Setup connection */
im->mm_iptv_fd = http_connect(im->mm_iptv_url);
return im->mm_iptv_fd <= 0 ? SM_CODE_TUNING_FAILED : 0;
}
static int
iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
{
@ -142,7 +164,8 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
if (!im->mm_active) {
/* HTTP */
if (!strcmp(im->mm_iptv_url, "http"))
// TODO: this needs to happen in a thread
if (!strncmp(im->mm_iptv_url, "http", 4))
ret = iptv_input_start_http(im);
/* OK */
@ -153,6 +176,12 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
ev.data.fd = im->mm_iptv_fd;
epoll_ctl(iptv_poll_fd, EPOLL_CTL_ADD, im->mm_iptv_fd, &ev);
im->mm_active = mmi;
/* Install table handlers */
mpegts_table_add(mmi->mmi_mux, 0x0, 0xff, psi_pat_callback, NULL, "pat",
MT_QUICKREQ| MT_CRC, 0);
// TODO: need to fire mux start event
}
}
pthread_mutex_unlock(&iptv_lock);
@ -163,6 +192,19 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
static void
iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
{
iptv_mux_t *im = (iptv_mux_t*)mmi->mmi_mux;
assert(mmi == &im->mm_iptv_instance);
pthread_mutex_lock(&iptv_lock);
if (im->mm_active) {
// TODO: multicast will require additional work
close(im->mm_iptv_fd); // removes from epoll
im->mm_iptv_fd = -1;
}
pthread_mutex_unlock(&iptv_lock);
}
static int
@ -180,8 +222,7 @@ iptv_input_current_weight ( mpegts_input_t *mi )
static void *
iptv_input_thread ( void *aux )
{
#if 0
int nfds, fd;
int nfds, fd, r, pos = 0;
uint8_t tsb[65536];
struct epoll_event ev;
iptv_mux_t *im;
@ -200,7 +241,7 @@ iptv_input_thread ( void *aux )
/* Read data */
fd = ev.data.fd;
r = read(fd, tsb+c, sizeof(tsb)-c);
r = read(fd, tsb+pos, sizeof(tsb)-pos);
/* Error */
if (r < 0) {
@ -212,9 +253,10 @@ iptv_input_thread ( void *aux )
pthread_mutex_lock(&iptv_lock);
/* Find mux */
LIST_FOREACH(mm, &iptv_network.mn_muxes, mm_network_link)
if (((iptv_mux_t*)mm)->im_fd == fd)
LIST_FOREACH(mm, &iptv_network.mn_muxes, mm_network_link) {
if (((iptv_mux_t*)mm)->mm_iptv_fd == fd)
break;
}
if (!mm) {
pthread_mutex_unlock(&iptv_lock);
continue;
@ -223,14 +265,13 @@ iptv_input_thread ( void *aux )
/* Raw TS */
if (!strncmp(im->mm_iptv_url, "http", 4)) {
mpegts_input_recv_packets((mpegts_input_t*)&iptv_input,
&im->mm_mux_instance,
tsb, c, NULL, NULL);
pos = mpegts_input_recv_packets((mpegts_input_t*)&iptv_input,
&im->mm_iptv_instance,
tsb, r, NULL, NULL);
}
pthread_mutex_lock(&iptv_unlock);
pthread_mutex_unlock(&iptv_lock);
}
#endif
return NULL;
}
@ -266,6 +307,8 @@ iptv_network_create_service
*/
void iptv_init ( void )
{
pthread_t tid;
/* Init Input */
mpegts_input_create0((mpegts_input_t*)&iptv_input,
&iptv_input_class, NULL);
@ -284,7 +327,10 @@ void iptv_init ( void )
mpegts_network_add_input((mpegts_network_t*)&iptv_network,
(mpegts_input_t*)&iptv_input);
/* Setup thread */
/* Set table thread */
pthread_create(&tid, NULL, mpegts_input_table_thread, &iptv_input);
/* Setup TS thread */
// TODO: could set this up only when needed
iptv_poll_fd = epoll_create(10);
pthread_mutex_init(&iptv_lock, NULL);

View file

@ -51,10 +51,9 @@ iptv_mux_create ( const char *uuid, const char *url )
iptv_mux_t *im =
mpegts_mux_create(iptv_mux, NULL,
(mpegts_network_t*)&iptv_network,
MM_ONID_NONE, MM_TSID_NONE);
MPEGTS_ONID_NONE, MPEGTS_TSID_NONE);
if (url)
im->mm_iptv_url = strdup(url);
htsbuf_queue_init(&im->mm_iptv_spill, 65536);
/* Create Instance */
mpegts_mux_instance_create0(&im->mm_iptv_instance,
@ -78,7 +77,7 @@ iptv_mux_load_one ( iptv_mux_t *im, htsmsg_t *c )
mpegts_mux_load_one((mpegts_mux_t*)im, c);
/* URL */
if ((str = htsmsg_get_str(c, "url")))
if ((str = htsmsg_get_str(c, "iptv_url")))
tvh_str_update(&im->mm_iptv_url, str);
}
@ -91,10 +90,13 @@ iptv_mux_load_all ( void )
if ((s = hts_settings_load_r(1, "input/mpegts/iptv/muxes"))) {
HTSMSG_FOREACH(f, s) {
if (!(m = htsmsg_get_map_by_field(f))) {
if (!(m = htsmsg_get_map_by_field(f)) || !(m = htsmsg_get_map(m, "config"))) {
tvhlog(LOG_ERR, "iptv", "failed to load mux config %s", f->hmf_name);
continue;
}
printf("UUID: %s\n", f->hmf_name);
printf("CONFIG:\n");
htsmsg_print(m);
/* Create */
im = iptv_mux_create(f->hmf_name, NULL);

View file

@ -44,7 +44,6 @@ struct iptv_mux
mpegts_mux_t;
int mm_iptv_fd;
htsbuf_queue_t mm_iptv_spill;
mpegts_mux_instance_t mm_iptv_instance;
char *mm_iptv_url;
};

View file

@ -168,6 +168,7 @@ mpegts_mux_start ( mpegts_mux_t *mm, const char *reason, int weight )
/* Tune */
if (!mmi->mmi_input->mi_start_mux(mmi->mmi_input, mmi))
break;
tvhtrace("mpegts", "failed to run mmi %p", mmi);
}
/* Initial scanning */
@ -219,7 +220,7 @@ mpegts_mux_load_one ( mpegts_mux_t *mm, htsmsg_t *c )
mpegts_mux_t *
mpegts_mux_create0
( mpegts_mux_t *mm, const idclass_t *class, const char *uuid,
mpegts_network_t *net, uint16_t onid, uint16_t tsid )
mpegts_network_t *mn, uint16_t onid, uint16_t tsid )
{
idnode_insert(&mm->mm_id, uuid, class);
@ -228,7 +229,8 @@ mpegts_mux_create0
mm->mm_tsid = tsid;
/* Add to network */
mm->mm_network = net;
LIST_INSERT_HEAD(&mn->mn_muxes, mm, mm_network_link);
mm->mm_network = mn;
mm->mm_start = mpegts_mux_start;
mpegts_mux_initial_scan_link(mm);

View file

@ -698,8 +698,6 @@ main(int argc, char **argv)
access_init(opt_firstrun, opt_noacl);
input_init();
#if ENABLE_TIMESHIFT
timeshift_init();
#endif
@ -726,6 +724,9 @@ main(int argc, char **argv)
tsfile_add_file(opt_tsfile.str[i]);
}
#endif
#if ENABLE_IPTV
iptv_init();
#endif
if(opt_subscribe != NULL)
subscription_dummy_join(opt_subscribe, 1);

View file

@ -916,10 +916,8 @@ service_build_stream_start(service_t *t)
t->s_setsourceinfo(t, &ss->ss_si);
ss->ss_refcount = 1;
#ifdef MOVE_TO_MPEGTS
ss->ss_pcr_pid = t->s_pcr_pid;
ss->ss_pmt_pid = t->s_pmt_pid;
#endif
return ss;
}

View file

@ -228,22 +228,27 @@ typedef struct service {
S_OTHER,
} s_source_type;
/**
// TODO: should this really be here?
/**
* PID carrying the programs PCR.
* XXX: We don't support transports that does not carry
* the PCR in one of the content streams.
*/
uint16_t s_pcr_pid;
/**
* PID for the PMT of this MPEG-TS stream.
*/
uint16_t s_pmt_pid;
/**
* Set if transport is enabled (the default). If disabled it should
* not be considered when chasing for available transports during
* subscription scheduling.
*/
int s_enabled;
#ifdef MOVE_TO_RAWTS
/**
* Last PCR seen, we use it for a simple clock for rawtsinput.c
*/
int64_t s_pcr_last;
int64_t s_pcr_last_realtime;
#endif
LIST_ENTRY(service) s_active_link;
LIST_HEAD(, th_subscription) s_subscriptions;