From 10854c1cf4fb7935dd46a508f90841633f7eb6ad Mon Sep 17 00:00:00 2001 From: Adam Sutton Date: Tue, 10 Sep 2013 16:18:30 +0100 Subject: [PATCH] subscription: add in/out bandwidth indicator Now have the concept of both input and output bandwidth usage of a sub. The reason for this is that while data may be flowing into the subscription, i.e. a signal is being received. Things like timeshift, or even potentially flow control, may mean the actual outgoing rate is quite different. Input rate is added by the subscription code as data enters the sub, however the output rate must be added by the subscription handler as close to the client as possible. --- src/dvr/dvr_rec.c | 9 ++++++++- src/htsp_server.c | 34 ++++++++++++++++++---------------- src/subscriptions.c | 10 ++++++---- src/subscriptions.h | 3 ++- src/webui/static/app/status.js | 19 ++++++++++++++----- src/webui/webui.c | 14 +++++++++----- 6 files changed, 57 insertions(+), 32 deletions(-) diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index 34256b18..ba1e880c 100644 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -33,6 +33,7 @@ #include "plumbing/tsfix.h" #include "plumbing/globalheaders.h" #include "htsp_server.h" +#include "atomic.h" #include "muxer.h" @@ -422,7 +423,13 @@ dvr_thread(void *aux) pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); continue; } - + + if (de->de_s && started && + (sm->sm_type == SMT_PACKET || sm->sm_type == SMT_MPEGTS)) { + th_pkt_t *pkt = sm->sm_data; + atomic_add(&de->de_s->ths_bytes_out, pktbuf_len(pkt->pkt_payload)); + } + TAILQ_REMOVE(&sq->sq_queue, sm, sm_link); pthread_mutex_unlock(&sq->sq_mutex); diff --git a/src/htsp_server.c b/src/htsp_server.c index 20689f2f..9a5bf727 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -16,23 +16,8 @@ * along with this program. If not, see . */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - #include "tvheadend.h" +#include "atomic.h" #include "channels.h" #include "subscriptions.h" #include "tcp.h" @@ -51,6 +36,22 @@ #if ENABLE_LIBAV #include "plumbing/transcoding.h" #endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include "settings.h" #include @@ -2407,6 +2408,7 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt) htsmsg_add_binptr(m, "payload", pktbuf_ptr(pkt->pkt_payload), pktbuf_len(pkt->pkt_payload)); htsp_send(htsp, m, pkt->pkt_payload, &hs->hs_q, pktbuf_len(pkt->pkt_payload)); + atomic_add(&hs->hs_s->ths_bytes_out, pktbuf_len(pkt->pkt_payload)); if(hs->hs_last_report != dispatch_clock) { diff --git a/src/subscriptions.c b/src/subscriptions.c index 7df91924..ad66da9d 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -275,10 +275,10 @@ subscription_input_direct(void *opauqe, streaming_message_t *sm) th_pkt_t *pkt = sm->sm_data; if(pkt->pkt_err) s->ths_total_err++; - s->ths_bytes += pkt->pkt_payload->pb_size; + s->ths_bytes_in += pkt->pkt_payload->pb_size; } else if(sm->sm_type == SMT_MPEGTS) { pktbuf_t *pb = sm->sm_data; - s->ths_bytes += pb->pb_size; + s->ths_bytes_in += pb->pb_size; } /* Pass to output */ @@ -678,11 +678,13 @@ subscription_status_callback ( void *p ) LIST_FOREACH(s, &subscriptions, ths_global_link) { int errors = s->ths_total_err; - int bw = atomic_exchange(&s->ths_bytes, 0); + int in = atomic_exchange(&s->ths_bytes_in, 0); + int out = atomic_exchange(&s->ths_bytes_out, 0); htsmsg_t *m = subscription_create_msg(s); htsmsg_delete_field(m, "errors"); htsmsg_add_u32(m, "errors", errors); - htsmsg_add_u32(m, "bw", bw); + htsmsg_add_u32(m, "in", in); + htsmsg_add_u32(m, "out", out); htsmsg_add_u32(m, "updateEntry", 1); notify_by_msg("subscriptions", m); } diff --git a/src/subscriptions.h b/src/subscriptions.h index 8ebc3a1d..7095897e 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -55,7 +55,8 @@ typedef struct th_subscription { char *ths_title; /* display title */ time_t ths_start; /* time when subscription started */ int ths_total_err; /* total errors during entire subscription */ - int ths_bytes; // Reset every second to get aprox. bandwidth + int ths_bytes_in; // Reset every second to get aprox. bandwidth (in) + int ths_bytes_out; // Reset every second to get approx bandwidth (out) streaming_target_t ths_input; diff --git a/src/webui/static/app/status.js b/src/webui/static/app/status.js index 32aa9c74..8f2007ad 100644 --- a/src/webui/static/app/status.js +++ b/src/webui/static/app/status.js @@ -23,7 +23,9 @@ tvheadend.status_subs = function() { }, { name : 'errors' }, { - name : 'bw' + name : 'in' + }, { + name : 'out' }, { name : 'start', type : 'date', @@ -51,7 +53,8 @@ tvheadend.status_subs = function() { r.data.service = m.service; r.data.state = m.state; r.data.errors = m.errors; - r.data.bw = m.bw + r.data.in = m.in; + r.data.out = m.out; tvheadend.subsStore.afterEdit(r); tvheadend.subsStore.fireEvent('updated', tvheadend.subsStore, r, @@ -111,9 +114,15 @@ tvheadend.status_subs = function() { dataIndex : 'errors' }, { width : 50, - id : 'bw', - header : "Bandwidth (kb/s)", - dataIndex : 'bw', + id : 'in', + header : "Input (kb/s)", + dataIndex : 'in', + renderer: renderBw + }, { + width : 50, + id : 'out', + header : "Output (kb/s)", + dataIndex : 'out', renderer: renderBw } ]); diff --git a/src/webui/webui.c b/src/webui/webui.c index 12b05afe..dc79ee83 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -45,6 +45,7 @@ #include "imagecache.h" #include "tcp.h" #include "config2.h" +#include "atomic.h" #if defined(PLATFORM_LINUX) #include @@ -218,7 +219,8 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque) */ static void http_stream_run(http_connection_t *hc, streaming_queue_t *sq, - const char *name, muxer_container_type_t mc) + const char *name, muxer_container_type_t mc, + th_subscription_t *s) { streaming_message_t *sm; int run = 1; @@ -272,7 +274,9 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq, case SMT_MPEGTS: case SMT_PACKET: if(started) { - muxer_write_pkt(mux, sm->sm_type, sm->sm_data); + th_pkt_t *pkt = sm->sm_data; + atomic_add(&s->ths_bytes_out, pktbuf_len(pkt->pkt_payload)); + muxer_write_pkt(mux, sm->sm_type, pkt); sm->sm_data = NULL; } break; @@ -689,7 +693,7 @@ http_stream_service(http_connection_t *hc, service_t *service) if(s) { name = tvh_strdupa(service->s_nicename); pthread_mutex_unlock(&global_lock); - http_stream_run(hc, &sq, name, mc); + http_stream_run(hc, &sq, name, mc, s); pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); } @@ -733,7 +737,7 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm) return HTTP_STATUS_BAD_REQUEST; name = tvh_strdupa(s->ths_title); pthread_mutex_unlock(&global_lock); - http_stream_run(hc, &sq, name, MC_RAW); + http_stream_run(hc, &sq, name, MC_RAW, s); pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); @@ -808,7 +812,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch) if(s) { name = tvh_strdupa(ch->ch_name); pthread_mutex_unlock(&global_lock); - http_stream_run(hc, &sq, name, mc); + http_stream_run(hc, &sq, name, mc, s); pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); }