diff --git a/src/input/mpegts/iptv/iptv.c b/src/input/mpegts/iptv/iptv.c index 689d34f4..d6b96d86 100644 --- a/src/input/mpegts/iptv/iptv.c +++ b/src/input/mpegts/iptv/iptv.c @@ -98,6 +98,63 @@ const idclass_t iptv_input_class = { } }; +static int +iptv_input_is_free ( mpegts_input_t *mi ) +{ + int c = 0; + mpegts_mux_instance_t *mmi; + + LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) + c++; + + /* Limit reached */ + if (iptv_network.in_max_streams && c >= iptv_network.in_max_streams) { + return 0; + } + + /* Bandwidth reached */ + if (iptv_network.in_bw_limited) { + return 0; + } + + return 1; +} + +static int +iptv_input_get_weight ( mpegts_input_t *mi ) +{ + int c = 0, w = 0; + const th_subscription_t *ths; + const service_t *s; + const mpegts_mux_instance_t *mmi; + LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) + c++; + + /* Find the "min" weight */ + if (!iptv_input_is_free(mi)) { + w = 1000000; + + /* Direct subs */ + LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) { + LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) { + w = MIN(w, ths->ths_weight); + } + } + + /* Service subs */ + pthread_mutex_lock(&mi->mi_delivery_mutex); + LIST_FOREACH(s, &mi->mi_transports, s_active_link) { + LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) { + w = MIN(w, ths->ths_weight); + } + } + pthread_mutex_unlock(&mi->mi_delivery_mutex); + } + + return w; + +} + static int iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) { @@ -112,6 +169,25 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) if (im->mm_active) return 0; + /* Do we need to stop something? */ + if (!iptv_input_is_free(mi)) { + pthread_mutex_lock(&mi->mi_delivery_mutex); + mpegts_mux_instance_t *m, *s = NULL; + int w = 1000000; + LIST_FOREACH(m, &mi->mi_mux_active, mmi_active_link) { + int t = mpegts_mux_instance_weight(m); + if (t < w) { + s = m; + w = t; + } + } + pthread_mutex_unlock(&mi->mi_delivery_mutex); + + /* Stop */ + if (s) + s->mmi_mux->mm_stop(s->mmi_mux, 1); + } + /* Parse URL */ im->mm_display_name((mpegts_mux_t*)im, buf, sizeof(buf)); if (urlparse(im->mm_iptv_url ?: "", &url)) { @@ -161,21 +237,12 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) free(im->mm_iptv_tsb); im->mm_iptv_tsb = NULL; + /* Clear bw limit */ + iptv_network.in_bw_limited = 0; + pthread_mutex_unlock(&iptv_lock); } -static int -iptv_input_is_free ( mpegts_input_t *mi ) -{ - return 1; // unlimited number of muxes -} - -static int -iptv_input_get_weight ( mpegts_input_t *mi ) -{ - return 0; // unlimited number of muxes -} - static void iptv_input_display_name ( mpegts_input_t *mi, char *buf, size_t len ) { @@ -216,13 +283,7 @@ iptv_input_thread ( void *aux ) im->im_handler->stop(im); goto done; } - - /* Pass on */ - im->mm_iptv_pos - = mpegts_input_recv_packets((mpegts_input_t*)&iptv_input, - im->mm_active, - im->mm_iptv_tsb+off, len, - NULL, NULL, "iptv"); + iptv_input_recv_packets(im, off, len); done: pthread_mutex_unlock(&iptv_lock); @@ -230,6 +291,33 @@ done: return NULL; } +void +iptv_input_recv_packets ( iptv_mux_t *im, size_t off, size_t len ) +{ + static time_t t1 = 0, t2; + iptv_network.in_bps += len * 8; + time(&t2); + if (t2 != t1) { + if (iptv_network.in_max_bandwidth && + iptv_network.in_bps > iptv_network.in_max_bandwidth * 1024) { + if (!iptv_network.in_bw_limited) { + tvhinfo("iptv", "bandwidth limited exceeded"); + iptv_network.in_bw_limited = 1; + } + } + iptv_network.in_bps = 0; + t1 = t2; + } + + /* Pass on */ + im->mm_iptv_pos + = mpegts_input_recv_packets((mpegts_input_t*)&iptv_input, + im->mm_active, + im->mm_iptv_tsb + off, + im->mm_iptv_pos + len - off, + NULL, NULL, "iptv"); +} + void iptv_input_mux_started ( iptv_mux_t *im ) { @@ -277,6 +365,20 @@ const idclass_t iptv_network_class = { .ic_class = "iptv_network", .ic_caption = "IPTV Network", .ic_properties = (const property_t[]){ + { + .type = PT_U32, + .id = "max_streams", + .name = "Max Input Streams", + .off = offsetof(iptv_network_t, in_max_streams), + .def.i = 0, + }, + { + .type = PT_U32, + .id = "max_bandwidth", + .name = "Max Bandwidth (Kbps)", + .off = offsetof(iptv_network_t, in_max_bandwidth), + .def.i = 0, + }, {} } }; diff --git a/src/input/mpegts/iptv/iptv_http.c b/src/input/mpegts/iptv/iptv_http.c index 91ef3860..7ba5f2b1 100644 --- a/src/input/mpegts/iptv/iptv_http.c +++ b/src/input/mpegts/iptv/iptv_http.c @@ -52,12 +52,7 @@ iptv_http_data memcpy(tsb, buf, len); - im->mm_iptv_pos - = mpegts_input_recv_packets((mpegts_input_t*)&iptv_input, - im->mm_active, - im->mm_iptv_tsb, - im->mm_iptv_pos + len, - NULL, NULL, "iptv"); + iptv_input_recv_packets(im, 0, len); pthread_mutex_unlock(&iptv_lock); diff --git a/src/input/mpegts/iptv/iptv_private.h b/src/input/mpegts/iptv/iptv_private.h index 0f459146..debb246e 100644 --- a/src/input/mpegts/iptv/iptv_private.h +++ b/src/input/mpegts/iptv/iptv_private.h @@ -57,10 +57,17 @@ struct iptv_input }; void iptv_input_mux_started ( iptv_mux_t *im ); +void iptv_input_recv_packets ( iptv_mux_t *im, size_t off, size_t len ); struct iptv_network { mpegts_network_t; + + int in_bps; + int in_bw_limited; + + uint32_t in_max_streams; + uint32_t in_max_bandwidth; }; struct iptv_mux diff --git a/src/input/mpegts/iptv/iptv_udp.c b/src/input/mpegts/iptv/iptv_udp.c index a490edd4..68586253 100644 --- a/src/input/mpegts/iptv/iptv_udp.c +++ b/src/input/mpegts/iptv/iptv_udp.c @@ -217,7 +217,7 @@ iptv_rtp_read ( iptv_mux_t *im, size_t *off ) /* OK */ *off = hlen; - return len-hlen; + return len; } /*