subscription: add in/out bandwidth indicator

Now have the concept of both input and output bandwidth usage of a sub.

The reason for this is that while data may be flowing into the subscription,
i.e. a signal is being received. Things like timeshift, or even potentially
flow control, may mean the actual outgoing rate is quite different.

Input rate is added by the subscription code as data enters the sub, however
the output rate must be added by the subscription handler as close to the
client as possible.
This commit is contained in:
Adam Sutton 2013-09-10 16:18:30 +01:00
parent e857acca84
commit 10854c1cf4
6 changed files with 57 additions and 32 deletions

View file

@ -33,6 +33,7 @@
#include "plumbing/tsfix.h"
#include "plumbing/globalheaders.h"
#include "htsp_server.h"
#include "atomic.h"
#include "muxer.h"
@ -422,7 +423,13 @@ dvr_thread(void *aux)
pthread_cond_wait(&sq->sq_cond, &sq->sq_mutex);
continue;
}
if (de->de_s && started &&
(sm->sm_type == SMT_PACKET || sm->sm_type == SMT_MPEGTS)) {
th_pkt_t *pkt = sm->sm_data;
atomic_add(&de->de_s->ths_bytes_out, pktbuf_len(pkt->pkt_payload));
}
TAILQ_REMOVE(&sq->sq_queue, sm, sm_link);
pthread_mutex_unlock(&sq->sq_mutex);

View file

@ -16,23 +16,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <fcntl.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "tvheadend.h"
#include "atomic.h"
#include "channels.h"
#include "subscriptions.h"
#include "tcp.h"
@ -51,6 +36,22 @@
#if ENABLE_LIBAV
#include "plumbing/transcoding.h"
#endif
#include <pthread.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <fcntl.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/statvfs.h>
#include "settings.h"
#include <sys/time.h>
@ -2407,6 +2408,7 @@ htsp_stream_deliver(htsp_subscription_t *hs, th_pkt_t *pkt)
htsmsg_add_binptr(m, "payload", pktbuf_ptr(pkt->pkt_payload),
pktbuf_len(pkt->pkt_payload));
htsp_send(htsp, m, pkt->pkt_payload, &hs->hs_q, pktbuf_len(pkt->pkt_payload));
atomic_add(&hs->hs_s->ths_bytes_out, pktbuf_len(pkt->pkt_payload));
if(hs->hs_last_report != dispatch_clock) {

View file

@ -275,10 +275,10 @@ subscription_input_direct(void *opauqe, streaming_message_t *sm)
th_pkt_t *pkt = sm->sm_data;
if(pkt->pkt_err)
s->ths_total_err++;
s->ths_bytes += pkt->pkt_payload->pb_size;
s->ths_bytes_in += pkt->pkt_payload->pb_size;
} else if(sm->sm_type == SMT_MPEGTS) {
pktbuf_t *pb = sm->sm_data;
s->ths_bytes += pb->pb_size;
s->ths_bytes_in += pb->pb_size;
}
/* Pass to output */
@ -678,11 +678,13 @@ subscription_status_callback ( void *p )
LIST_FOREACH(s, &subscriptions, ths_global_link) {
int errors = s->ths_total_err;
int bw = atomic_exchange(&s->ths_bytes, 0);
int in = atomic_exchange(&s->ths_bytes_in, 0);
int out = atomic_exchange(&s->ths_bytes_out, 0);
htsmsg_t *m = subscription_create_msg(s);
htsmsg_delete_field(m, "errors");
htsmsg_add_u32(m, "errors", errors);
htsmsg_add_u32(m, "bw", bw);
htsmsg_add_u32(m, "in", in);
htsmsg_add_u32(m, "out", out);
htsmsg_add_u32(m, "updateEntry", 1);
notify_by_msg("subscriptions", m);
}

View file

@ -55,7 +55,8 @@ typedef struct th_subscription {
char *ths_title; /* display title */
time_t ths_start; /* time when subscription started */
int ths_total_err; /* total errors during entire subscription */
int ths_bytes; // Reset every second to get aprox. bandwidth
int ths_bytes_in; // Reset every second to get aprox. bandwidth (in)
int ths_bytes_out; // Reset every second to get approx bandwidth (out)
streaming_target_t ths_input;

View file

@ -23,7 +23,9 @@ tvheadend.status_subs = function() {
}, {
name : 'errors'
}, {
name : 'bw'
name : 'in'
}, {
name : 'out'
}, {
name : 'start',
type : 'date',
@ -51,7 +53,8 @@ tvheadend.status_subs = function() {
r.data.service = m.service;
r.data.state = m.state;
r.data.errors = m.errors;
r.data.bw = m.bw
r.data.in = m.in;
r.data.out = m.out;
tvheadend.subsStore.afterEdit(r);
tvheadend.subsStore.fireEvent('updated', tvheadend.subsStore, r,
@ -111,9 +114,15 @@ tvheadend.status_subs = function() {
dataIndex : 'errors'
}, {
width : 50,
id : 'bw',
header : "Bandwidth (kb/s)",
dataIndex : 'bw',
id : 'in',
header : "Input (kb/s)",
dataIndex : 'in',
renderer: renderBw
}, {
width : 50,
id : 'out',
header : "Output (kb/s)",
dataIndex : 'out',
renderer: renderBw
} ]);

View file

@ -45,6 +45,7 @@
#include "imagecache.h"
#include "tcp.h"
#include "config2.h"
#include "atomic.h"
#if defined(PLATFORM_LINUX)
#include <sys/sendfile.h>
@ -218,7 +219,8 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque)
*/
static void
http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
const char *name, muxer_container_type_t mc)
const char *name, muxer_container_type_t mc,
th_subscription_t *s)
{
streaming_message_t *sm;
int run = 1;
@ -272,7 +274,9 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
case SMT_MPEGTS:
case SMT_PACKET:
if(started) {
muxer_write_pkt(mux, sm->sm_type, sm->sm_data);
th_pkt_t *pkt = sm->sm_data;
atomic_add(&s->ths_bytes_out, pktbuf_len(pkt->pkt_payload));
muxer_write_pkt(mux, sm->sm_type, pkt);
sm->sm_data = NULL;
}
break;
@ -689,7 +693,7 @@ http_stream_service(http_connection_t *hc, service_t *service)
if(s) {
name = tvh_strdupa(service->s_nicename);
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, mc);
http_stream_run(hc, &sq, name, mc, s);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
}
@ -733,7 +737,7 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm)
return HTTP_STATUS_BAD_REQUEST;
name = tvh_strdupa(s->ths_title);
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, MC_RAW);
http_stream_run(hc, &sq, name, MC_RAW, s);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
@ -808,7 +812,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch)
if(s) {
name = tvh_strdupa(ch->ch_name);
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, mc);
http_stream_run(hc, &sq, name, mc, s);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
}