diff --git a/src/dvr/dvr_rec.c b/src/dvr/dvr_rec.c index 34256b18..ba1e880c 100644 --- a/src/dvr/dvr_rec.c +++ b/src/dvr/dvr_rec.c @@ -33,6 +33,7 @@ #include "plumbing/tsfix.h" #include "plumbing/globalheaders.h" #include "htsp_server.h" +#include "atomic.h" #include "muxer.h" @@ -422,7 +423,13 @@ dvr_thread(void *aux) pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex); continue; } - + + if (de->de_s && started && + (sm->sm_type == SMT_PACKET || sm->sm_type == SMT_MPEGTS)) { + th_pkt_t *pkt = sm->sm_data; + atomic_add(&de->de_s->ths_bytes_out, pktbuf_len(pkt->pkt_payload)); + } + TAILQ_REMOVE(&sq->sq_queue, sm, sm_link); pthread_mutex_unlock(&sq->sq_mutex); diff --git a/src/htsp_server.c b/src/htsp_server.c index 20689f2f..9a5bf727 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -16,23 +16,8 @@ * along with this program. If not, see . */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - #include "tvheadend.h" +#include "atomic.h" #include "channels.h" #include "subscriptions.h" #include "tcp.h" @@ -51,6 +36,22 @@ #if ENABLE_LIBAV #include "plumbing/transcoding.h" #endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include "settings.h" #include @@ -2407,6 +2408,7 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt) htsmsg_add_binptr(m, "payload", pktbuf_ptr(pkt->pkt_payload), pktbuf_len(pkt->pkt_payload)); htsp_send(htsp, m, pkt->pkt_payload, &hs->hs_q, pktbuf_len(pkt->pkt_payload)); + atomic_add(&hs->hs_s->ths_bytes_out, pktbuf_len(pkt->pkt_payload)); if(hs->hs_last_report != dispatch_clock) { diff --git a/src/subscriptions.c b/src/subscriptions.c index 7df91924..ad66da9d 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -275,10 +275,10 @@ subscription_input_direct(void *opauqe, streaming_message_t *sm) th_pkt_t *pkt = sm->sm_data; if(pkt->pkt_err) s->ths_total_err++; - s->ths_bytes += pkt->pkt_payload->pb_size; + s->ths_bytes_in += pkt->pkt_payload->pb_size; } else if(sm->sm_type == SMT_MPEGTS) { pktbuf_t *pb = sm->sm_data; - s->ths_bytes += pb->pb_size; + s->ths_bytes_in += pb->pb_size; } /* Pass to output */ @@ -678,11 +678,13 @@ subscription_status_callback ( void *p ) LIST_FOREACH(s, &subscriptions, ths_global_link) { int errors = s->ths_total_err; - int bw = atomic_exchange(&s->ths_bytes, 0); + int in = atomic_exchange(&s->ths_bytes_in, 0); + int out = atomic_exchange(&s->ths_bytes_out, 0); htsmsg_t *m = subscription_create_msg(s); htsmsg_delete_field(m, "errors"); htsmsg_add_u32(m, "errors", errors); - htsmsg_add_u32(m, "bw", bw); + htsmsg_add_u32(m, "in", in); + htsmsg_add_u32(m, "out", out); htsmsg_add_u32(m, "updateEntry", 1); notify_by_msg("subscriptions", m); } diff --git a/src/subscriptions.h b/src/subscriptions.h index 8ebc3a1d..7095897e 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -55,7 +55,8 @@ typedef struct th_subscription { char *ths_title; /* display title */ time_t ths_start; /* time when subscription started */ int ths_total_err; /* total errors during entire subscription */ - int ths_bytes; // Reset every second to get aprox. bandwidth + int ths_bytes_in; // Reset every second to get aprox. bandwidth (in) + int ths_bytes_out; // Reset every second to get approx bandwidth (out) streaming_target_t ths_input; diff --git a/src/webui/static/app/status.js b/src/webui/static/app/status.js index 32aa9c74..8f2007ad 100644 --- a/src/webui/static/app/status.js +++ b/src/webui/static/app/status.js @@ -23,7 +23,9 @@ tvheadend.status_subs = function() { }, { name : 'errors' }, { - name : 'bw' + name : 'in' + }, { + name : 'out' }, { name : 'start', type : 'date', @@ -51,7 +53,8 @@ tvheadend.status_subs = function() { r.data.service = m.service; r.data.state = m.state; r.data.errors = m.errors; - r.data.bw = m.bw + r.data.in = m.in; + r.data.out = m.out; tvheadend.subsStore.afterEdit(r); tvheadend.subsStore.fireEvent('updated', tvheadend.subsStore, r, @@ -111,9 +114,15 @@ tvheadend.status_subs = function() { dataIndex : 'errors' }, { width : 50, - id : 'bw', - header : "Bandwidth (kb/s)", - dataIndex : 'bw', + id : 'in', + header : "Input (kb/s)", + dataIndex : 'in', + renderer: renderBw + }, { + width : 50, + id : 'out', + header : "Output (kb/s)", + dataIndex : 'out', renderer: renderBw } ]); diff --git a/src/webui/webui.c b/src/webui/webui.c index 12b05afe..dc79ee83 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -45,6 +45,7 @@ #include "imagecache.h" #include "tcp.h" #include "config2.h" +#include "atomic.h" #if defined(PLATFORM_LINUX) #include @@ -218,7 +219,8 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque) */ static void http_stream_run(http_connection_t *hc, streaming_queue_t *sq, - const char *name, muxer_container_type_t mc) + const char *name, muxer_container_type_t mc, + th_subscription_t *s) { streaming_message_t *sm; int run = 1; @@ -272,7 +274,9 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq, case SMT_MPEGTS: case SMT_PACKET: if(started) { - muxer_write_pkt(mux, sm->sm_type, sm->sm_data); + th_pkt_t *pkt = sm->sm_data; + atomic_add(&s->ths_bytes_out, pktbuf_len(pkt->pkt_payload)); + muxer_write_pkt(mux, sm->sm_type, pkt); sm->sm_data = NULL; } break; @@ -689,7 +693,7 @@ http_stream_service(http_connection_t *hc, service_t *service) if(s) { name = tvh_strdupa(service->s_nicename); pthread_mutex_unlock(&global_lock); - http_stream_run(hc, &sq, name, mc); + http_stream_run(hc, &sq, name, mc, s); pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); } @@ -733,7 +737,7 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm) return HTTP_STATUS_BAD_REQUEST; name = tvh_strdupa(s->ths_title); pthread_mutex_unlock(&global_lock); - http_stream_run(hc, &sq, name, MC_RAW); + http_stream_run(hc, &sq, name, MC_RAW, s); pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); @@ -808,7 +812,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch) if(s) { name = tvh_strdupa(ch->ch_name); pthread_mutex_unlock(&global_lock); - http_stream_run(hc, &sq, name, mc); + http_stream_run(hc, &sq, name, mc, s); pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); }