iptv: update to use new mpegts_input API

This commit is contained in:
Adam Sutton 2014-04-11 23:46:16 +01:00
parent f579279c1b
commit aa29bca6ed
4 changed files with 33 additions and 47 deletions

View file

@ -148,13 +148,13 @@ iptv_input_get_weight ( mpegts_input_t *mi )
}
/* Service subs */
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
w = MIN(w, ths->ths_weight);
}
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
}
return w;
@ -176,7 +176,7 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
/* Do we need to stop something? */
if (!iptv_input_is_free(mi)) {
pthread_mutex_lock(&mi->mi_delivery_mutex);
pthread_mutex_lock(&mi->mi_output_lock);
mpegts_mux_instance_t *m, *s = NULL;
int w = 1000000;
LIST_FOREACH(m, &mi->mi_mux_active, mmi_active_link) {
@ -186,7 +186,7 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
w = t;
}
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
pthread_mutex_unlock(&mi->mi_output_lock);
/* Stop */
if (s)
@ -244,8 +244,7 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
}
/* Free memory */
free(im->mm_iptv_tsb);
im->mm_iptv_tsb = NULL;
free(im->mm_iptv_buffer.sb_data);
/* Clear bw limit */
LIST_FOREACH(mnl, &mi->mi_networks, mnl_mi_link) {
@ -266,7 +265,7 @@ static void *
iptv_input_thread ( void *aux )
{
int nfds;
ssize_t len;
ssize_t n;
size_t off;
iptv_mux_t *im;
tvhpoll_event_t ev;
@ -293,12 +292,12 @@ iptv_input_thread ( void *aux )
/* Get data */
off = 0;
if ((len = im->im_handler->read(im, &off)) < 0) {
if ((n = im->im_handler->read(im, &off)) < 0) {
tvhlog(LOG_ERR, "iptv", "read() error %s", strerror(errno));
im->im_handler->stop(im);
goto done;
}
iptv_input_recv_packets(im, off, len);
iptv_input_recv_packets(im, n, off);
done:
pthread_mutex_unlock(&iptv_lock);
@ -307,7 +306,7 @@ done:
}
void
iptv_input_recv_packets ( iptv_mux_t *im, size_t off, size_t len )
iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len, size_t off )
{
static time_t t1 = 0, t2;
iptv_network_t *in = (iptv_network_t*)im->mm_network;
@ -327,12 +326,8 @@ iptv_input_recv_packets ( iptv_mux_t *im, size_t off, size_t len )
}
/* Pass on */
im->mm_iptv_pos
= mpegts_input_recv_packets((mpegts_input_t*)iptv_input,
im->mm_active,
im->mm_iptv_tsb + off,
im->mm_iptv_pos + len - off,
NULL, NULL, "iptv");
mpegts_input_recv_packets((mpegts_input_t*)iptv_input, im->mm_active,
&im->mm_iptv_buffer, off, NULL, NULL);
}
void
@ -343,8 +338,7 @@ iptv_input_mux_started ( iptv_mux_t *im )
im->mm_display_name((mpegts_mux_t*)im, buf, sizeof(buf));
/* Allocate input buffer */
im->mm_iptv_pos = 0;
im->mm_iptv_tsb = calloc(1, IPTV_PKT_SIZE);
sbuf_init_fixed(&im->mm_iptv_buffer, IPTV_PKT_SIZE);
/* Setup poll */
if (im->mm_iptv_fd > 0) {
@ -538,9 +532,6 @@ void iptv_init ( void )
/* Init Network */
iptv_network_init();
/* Set table thread */
mpegts_input_table_thread_start((mpegts_input_t *)iptv_input);
/* Setup TS thread */
iptv_poll = tvhpoll_create(10);
pthread_mutex_init(&iptv_lock, NULL);
@ -549,12 +540,12 @@ void iptv_init ( void )
void iptv_done ( void )
{
mpegts_input_table_thread_stop((mpegts_input_t *)iptv_input);
pthread_kill(iptv_thread, SIGTERM);
pthread_join(iptv_thread, NULL);
tvhpoll_destroy(iptv_poll);
pthread_mutex_lock(&global_lock);
mpegts_network_unregister_builder(&iptv_network_class);
mpegts_input_stop_all((mpegts_input_t*)iptv_input);
mpegts_input_delete((mpegts_input_t *)iptv_input, 0);
pthread_mutex_unlock(&global_lock);
}

View file

@ -41,22 +41,18 @@ static size_t
iptv_http_data
( void *p, void *buf, size_t len )
{
uint8_t *tsb;
size_t ret = len;
iptv_mux_t *im = p;
pthread_mutex_lock(&iptv_lock);
tsb = im->mm_iptv_tsb + im->mm_iptv_pos;
len = MIN(len, IPTV_PKT_SIZE - im->mm_iptv_pos);
sbuf_append(&im->mm_iptv_buffer, buf, len);
memcpy(tsb, buf, len);
iptv_input_recv_packets(im, 0, len);
if (len > 0)
iptv_input_recv_packets(im, len, 0);
pthread_mutex_unlock(&iptv_lock);
return ret;
return len;
}
/*

View file

@ -39,10 +39,6 @@ struct iptv_handler
const char *scheme;
int (*start) ( iptv_mux_t *im, const url_t *url );
void (*stop) ( iptv_mux_t *im );
// Return number of available bytes, with optional offset
// from start of mux buffer (useful for things with wrapper
// around TS)
ssize_t (*read) ( iptv_mux_t *im, size_t *off );
RB_ENTRY(iptv_handler) link;
@ -56,7 +52,7 @@ struct iptv_input
};
void iptv_input_mux_started ( iptv_mux_t *im );
void iptv_input_recv_packets ( iptv_mux_t *im, size_t off, size_t len );
void iptv_input_recv_packets ( iptv_mux_t *im, ssize_t len, size_t off );
struct iptv_network
{
@ -81,11 +77,10 @@ struct iptv_mux
int mm_iptv_atsc;
uint8_t *mm_iptv_tsb;
int mm_iptv_pos;
char *mm_iptv_svcname;
sbuf_t mm_iptv_buffer;
iptv_handler_t *im_handler;
void *im_data;

View file

@ -182,10 +182,10 @@ static ssize_t
iptv_udp_read ( iptv_mux_t *im, size_t *off )
{
/* UDP/RTP should not have TS packets straddling datagrams, I think! */
im->mm_iptv_pos = 0;
im->mm_iptv_buffer.sb_ptr = 0;
/* Read */
return read(im->mm_iptv_fd, im->mm_iptv_tsb, IPTV_PKT_SIZE);
return sbuf_read(&im->mm_iptv_buffer, im->mm_iptv_fd);
}
static ssize_t
@ -201,22 +201,26 @@ iptv_rtp_read ( iptv_mux_t *im, size_t *off )
/* Strip RTP header */
if (len < 12)
return 0; // ignore
if ((im->mm_iptv_tsb[0] & 0xC0) != 0x80)
if ((im->mm_iptv_buffer.sb_data[0] & 0xC0) != 0x80)
return 0;
if ((im->mm_iptv_tsb[1] & 0x7F) != 33)
if ((im->mm_iptv_buffer.sb_data[1] & 0x7F) != 33)
return 0;
hlen = ((im->mm_iptv_tsb[0] & 0xf) * 4) + 12;
if (im->mm_iptv_tsb[0] & 0x10) {
hlen = ((im->mm_iptv_buffer.sb_data[0] & 0xf) * 4) + 12;
if (im->mm_iptv_buffer.sb_data[0] & 0x10) {
if (len < hlen+4)
return 0;
hlen += (im->mm_iptv_tsb[hlen+2] << 8) | (im->mm_iptv_tsb[hlen+3]*4);
hlen += (im->mm_iptv_buffer.sb_data[hlen+2] << 8)
| (im->mm_iptv_buffer.sb_data[hlen+3] * 4);
hlen += 4;
}
if (len < hlen || ((len - hlen) % 188) != 0)
return 0;
/* OK */
*off = hlen;
/* Cut */
sbuf_cut(&im->mm_iptv_buffer, hlen);
// TODO: would be nice to avoid this extra copy, it was possible until I
// changed the API!
return len;
}