iptv: added new configuration to allow subs and bandwidth limitations

The bandwidth limit is a soft one, it will stop any new subscriptions if
the peak bandwidth on the current services exceeds the limit. Possibly
this will need a bit of additional averaging in future to stop false
triggers.
This commit is contained in:
Adam Sutton 2013-11-24 20:32:15 +00:00
parent 5fede076db
commit ccc6e4f58a
4 changed files with 130 additions and 26 deletions

View file

@ -98,6 +98,63 @@ const idclass_t iptv_input_class = {
}
};
static int
iptv_input_is_free ( mpegts_input_t *mi )
{
int c = 0;
mpegts_mux_instance_t *mmi;
LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
c++;
/* Limit reached */
if (iptv_network.in_max_streams && c >= iptv_network.in_max_streams) {
return 0;
}
/* Bandwidth reached */
if (iptv_network.in_bw_limited) {
return 0;
}
return 1;
}
static int
iptv_input_get_weight ( mpegts_input_t *mi )
{
int c = 0, w = 0;
const th_subscription_t *ths;
const service_t *s;
const mpegts_mux_instance_t *mmi;
LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link)
c++;
/* Find the "min" weight */
if (!iptv_input_is_free(mi)) {
w = 1000000;
/* Direct subs */
LIST_FOREACH(mmi, &mi->mi_mux_active, mmi_active_link) {
LIST_FOREACH(ths, &mmi->mmi_subs, ths_mmi_link) {
w = MIN(w, ths->ths_weight);
}
}
/* Service subs */
pthread_mutex_lock(&mi->mi_delivery_mutex);
LIST_FOREACH(s, &mi->mi_transports, s_active_link) {
LIST_FOREACH(ths, &s->s_subscriptions, ths_service_link) {
w = MIN(w, ths->ths_weight);
}
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
}
return w;
}
static int
iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
{
@ -112,6 +169,25 @@ iptv_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
if (im->mm_active)
return 0;
/* Do we need to stop something? */
if (!iptv_input_is_free(mi)) {
pthread_mutex_lock(&mi->mi_delivery_mutex);
mpegts_mux_instance_t *m, *s = NULL;
int w = 1000000;
LIST_FOREACH(m, &mi->mi_mux_active, mmi_active_link) {
int t = mpegts_mux_instance_weight(m);
if (t < w) {
s = m;
w = t;
}
}
pthread_mutex_unlock(&mi->mi_delivery_mutex);
/* Stop */
if (s)
s->mmi_mux->mm_stop(s->mmi_mux, 1);
}
/* Parse URL */
im->mm_display_name((mpegts_mux_t*)im, buf, sizeof(buf));
if (urlparse(im->mm_iptv_url ?: "", &url)) {
@ -161,21 +237,12 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
free(im->mm_iptv_tsb);
im->mm_iptv_tsb = NULL;
/* Clear bw limit */
iptv_network.in_bw_limited = 0;
pthread_mutex_unlock(&iptv_lock);
}
static int
iptv_input_is_free ( mpegts_input_t *mi )
{
return 1; // unlimited number of muxes
}
static int
iptv_input_get_weight ( mpegts_input_t *mi )
{
return 0; // unlimited number of muxes
}
static void
iptv_input_display_name ( mpegts_input_t *mi, char *buf, size_t len )
{
@ -216,13 +283,7 @@ iptv_input_thread ( void *aux )
im->im_handler->stop(im);
goto done;
}
/* Pass on */
im->mm_iptv_pos
= mpegts_input_recv_packets((mpegts_input_t*)&iptv_input,
im->mm_active,
im->mm_iptv_tsb+off, len,
NULL, NULL, "iptv");
iptv_input_recv_packets(im, off, len);
done:
pthread_mutex_unlock(&iptv_lock);
@ -230,6 +291,33 @@ done:
return NULL;
}
void
iptv_input_recv_packets ( iptv_mux_t *im, size_t off, size_t len )
{
static time_t t1 = 0, t2;
iptv_network.in_bps += len * 8;
time(&t2);
if (t2 != t1) {
if (iptv_network.in_max_bandwidth &&
iptv_network.in_bps > iptv_network.in_max_bandwidth * 1024) {
if (!iptv_network.in_bw_limited) {
tvhinfo("iptv", "bandwidth limited exceeded");
iptv_network.in_bw_limited = 1;
}
}
iptv_network.in_bps = 0;
t1 = t2;
}
/* Pass on */
im->mm_iptv_pos
= mpegts_input_recv_packets((mpegts_input_t*)&iptv_input,
im->mm_active,
im->mm_iptv_tsb + off,
im->mm_iptv_pos + len - off,
NULL, NULL, "iptv");
}
void
iptv_input_mux_started ( iptv_mux_t *im )
{
@ -277,6 +365,20 @@ const idclass_t iptv_network_class = {
.ic_class = "iptv_network",
.ic_caption = "IPTV Network",
.ic_properties = (const property_t[]){
{
.type = PT_U32,
.id = "max_streams",
.name = "Max Input Streams",
.off = offsetof(iptv_network_t, in_max_streams),
.def.i = 0,
},
{
.type = PT_U32,
.id = "max_bandwidth",
.name = "Max Bandwidth (Kbps)",
.off = offsetof(iptv_network_t, in_max_bandwidth),
.def.i = 0,
},
{}
}
};

View file

@ -52,12 +52,7 @@ iptv_http_data
memcpy(tsb, buf, len);
im->mm_iptv_pos
= mpegts_input_recv_packets((mpegts_input_t*)&iptv_input,
im->mm_active,
im->mm_iptv_tsb,
im->mm_iptv_pos + len,
NULL, NULL, "iptv");
iptv_input_recv_packets(im, 0, len);
pthread_mutex_unlock(&iptv_lock);

View file

@ -57,10 +57,17 @@ struct iptv_input
};
void iptv_input_mux_started ( iptv_mux_t *im );
void iptv_input_recv_packets ( iptv_mux_t *im, size_t off, size_t len );
struct iptv_network
{
mpegts_network_t;
int in_bps;
int in_bw_limited;
uint32_t in_max_streams;
uint32_t in_max_bandwidth;
};
struct iptv_mux

View file

@ -217,7 +217,7 @@ iptv_rtp_read ( iptv_mux_t *im, size_t *off )
/* OK */
*off = hlen;
return len-hlen;
return len;
}
/*