status: collect (and post) status of all TCP connections

This will indiscrimately include all HTSP and HTTP connections, first pass
I was going to just dump the lot on a UI status tab. However it could be
some filtering might be useful.
This commit is contained in:
Adam Sutton 2013-11-11 09:52:29 +00:00
parent 509f47e30c
commit 1b22e08ab9
4 changed files with 118 additions and 16 deletions

View file

@ -2078,7 +2078,7 @@ htsp_write_scheduler(void *aux)
*
*/
static void
htsp_serve(int fd, void *opaque, struct sockaddr_storage *source,
htsp_serve(int fd, void **opaque, struct sockaddr_storage *source,
struct sockaddr_storage *self)
{
htsp_connection_t htsp;
@ -2088,6 +2088,7 @@ htsp_serve(int fd, void *opaque, struct sockaddr_storage *source,
tcp_get_ip_str((struct sockaddr*)source, buf, 50);
memset(&htsp, 0, sizeof(htsp_connection_t));
*opaque = &htsp;
TAILQ_INIT(&htsp.htsp_active_output_queues);
@ -2123,6 +2124,8 @@ htsp_serve(int fd, void *opaque, struct sockaddr_storage *source,
pthread_mutex_lock(&global_lock);
*opaque = NULL;
gtimer_disarm(&htsp.htsp_timer);
/* Beware! Closing subscriptions will invoke a lot of callbacks
@ -2168,6 +2171,18 @@ htsp_serve(int fd, void *opaque, struct sockaddr_storage *source,
close(fd);
}
/*
* Status callback
*/
static void
htsp_server_status ( void *opaque, htsmsg_t *m )
{
htsp_connection_t *htsp = opaque;
htsmsg_add_str(m, "type", "HTSP");
if (htsp->htsp_username)
htsmsg_add_str(m, "user", htsp->htsp_username);
}
/**
* Fire up HTSP server
*/
@ -2175,9 +2190,14 @@ void
htsp_init(const char *bindaddr)
{
extern int tvheadend_htsp_port_extra;
htsp_server = tcp_server_create(bindaddr, tvheadend_htsp_port, htsp_serve, NULL);
static tcp_server_ops_t ops = {
.start = htsp_serve,
.stop = NULL,
.status = htsp_server_status,
};
htsp_server = tcp_server_create(bindaddr, tvheadend_htsp_port, &ops, NULL);
if(tvheadend_htsp_port_extra)
htsp_server_2 = tcp_server_create(bindaddr, tvheadend_htsp_port_extra, htsp_serve, NULL);
htsp_server_2 = tcp_server_create(bindaddr, tvheadend_htsp_port_extra, &ops, NULL);
}
/* **************************************************************************

View file

@ -777,13 +777,14 @@ http_serve_requests(http_connection_t *hc, htsbuf_queue_t *spill)
*
*/
static void
http_serve(int fd, void *opaque, struct sockaddr_storage *peer,
http_serve(int fd, void **opaque, struct sockaddr_storage *peer,
struct sockaddr_storage *self)
{
htsbuf_queue_t spill;
http_connection_t hc;
memset(&hc, 0, sizeof(http_connection_t));
*opaque = &hc;
TAILQ_INIT(&hc.hc_args);
TAILQ_INIT(&hc.hc_req_args);
@ -806,8 +807,20 @@ http_serve(int fd, void *opaque, struct sockaddr_storage *peer,
htsbuf_queue_flush(&hc.hc_reply);
htsbuf_queue_flush(&spill);
close(fd);
pthread_mutex_lock(&global_lock);
*opaque = NULL;
pthread_mutex_unlock(&global_lock);
}
static void
http_server_status ( void *opaque, htsmsg_t *m )
{
http_connection_t *hc = opaque;
htsmsg_add_str(m, "type", "HTTP");
if (hc->hc_username)
htsmsg_add_str(m, "user", hc->hc_username);
}
/**
* Fire up HTTP server
@ -815,5 +828,10 @@ http_serve(int fd, void *opaque, struct sockaddr_storage *peer,
void
http_server_init(const char *bindaddr)
{
http_server = tcp_server_create(bindaddr, tvheadend_webui_port, http_serve, NULL);
static tcp_server_ops_t ops = {
.start = http_serve,
.stop = NULL,
.status = http_server_status,
};
http_server = tcp_server_create(bindaddr, tvheadend_webui_port, &ops, NULL);
}

View file

@ -36,6 +36,8 @@
#include "tcp.h"
#include "tvheadend.h"
#include "tvhpoll.h"
#include "queue.h"
#include "notify.h"
int tcp_preferred_address_family = AF_INET;
@ -378,19 +380,24 @@ tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen)
static tvhpoll_t *tcp_server_poll;
typedef struct tcp_server {
tcp_server_callback_t *start;
void *opaque;
int serverfd;
tcp_server_ops_t ops;
void *opaque;
} tcp_server_t;
typedef struct tcp_server_launch_t {
tcp_server_callback_t *start;
void *opaque;
typedef struct tcp_server_launch {
int fd;
tcp_server_ops_t ops;
void *opaque;
struct sockaddr_storage peer;
struct sockaddr_storage self;
time_t started;
LIST_ENTRY(tcp_server_launch) link;
} tcp_server_launch_t;
static LIST_HEAD(, tcp_server_launch) tcp_server_launches = { 0 };
static gtimer_t tcp_server_status_timer;
/**
*
@ -427,9 +434,20 @@ tcp_server_start(void *aux)
to.tv_usec = 0;
setsockopt(tsl->fd, SOL_SOCKET, SO_SNDTIMEO, &to, sizeof(to));
tsl->start(tsl->fd, tsl->opaque, &tsl->peer, &tsl->self);
free(tsl);
/* Start */
time(&tsl->started);
pthread_mutex_lock(&global_lock);
LIST_INSERT_HEAD(&tcp_server_launches, tsl, link);
pthread_mutex_unlock(&global_lock);
tsl->ops.start(tsl->fd, &tsl->opaque, &tsl->peer, &tsl->self);
/* Stop */
if (tsl->ops.stop) tsl->ops.stop(tsl->opaque);
pthread_mutex_lock(&global_lock);
LIST_REMOVE(tsl, link);
pthread_mutex_unlock(&global_lock);
free(tsl);
return NULL;
}
@ -470,7 +488,7 @@ tcp_server_loop(void *aux)
if(ev.events & TVHPOLL_IN) {
tsl = malloc(sizeof(tcp_server_launch_t));
tsl->start = ts->start;
tsl->ops = ts->ops;
tsl->opaque = ts->opaque;
slen = sizeof(struct sockaddr_storage);
@ -500,7 +518,8 @@ tcp_server_loop(void *aux)
*
*/
void *
tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, void *opaque)
tcp_server_create
(const char *bindaddr, int port, tcp_server_ops_t *ops, void *opaque)
{
int fd, x;
tvhpoll_event_t ev;
@ -567,7 +586,7 @@ tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start,
ts = malloc(sizeof(tcp_server_t));
ts->serverfd = fd;
ts->start = start;
ts->ops = *ops;
ts->opaque = opaque;
ev.fd = fd;
@ -578,6 +597,37 @@ tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start,
return ts;
}
/*
* Connections status
*/
static void
tcp_server_status_callback ( void *opaque )
{
tcp_server_launch_t *tsl;
lock_assert(&global_lock);
htsmsg_t *l, *e, *m;
char buf[1024];
/* RE-arm */
gtimer_arm(&tcp_server_status_timer, tcp_server_status_callback, NULL, 1);
/* Build list */
l = htsmsg_create_list();
LIST_FOREACH(tsl, &tcp_server_launches, link) {
e = htsmsg_create_map();
tcp_get_ip_str((struct sockaddr*)&tsl->peer, buf, sizeof(buf));
htsmsg_add_str(e, "peer", buf);
htsmsg_add_s64(e, "started", tsl->started);
if (tsl->ops.status) tsl->ops.status(tsl->opaque, e);
htsmsg_add_msg(l, NULL, e);
}
/* Output */
m = htsmsg_create_map();
htsmsg_add_msg(m, "entries", l);
notify_by_msg("tcp_connections", m);
}
/**
*
*/
@ -591,6 +641,9 @@ tcp_server_init(int opt_ipv6)
tcp_server_poll = tvhpoll_create(10);
tvhthread_create(&tid, NULL, tcp_server_loop, NULL, 1);
/* Status timer */
gtimer_arm(&tcp_server_status_timer, tcp_server_status_callback, NULL, 1);
}

View file

@ -20,6 +20,16 @@
#define TCP_H_
#include "htsbuf.h"
#include "htsmsg.h"
typedef struct tcp_server_ops
{
void (*start) (int fd, void **opaque,
struct sockaddr_storage *peer,
struct sockaddr_storage *self);
void (*stop) (void *opaque);
void (*status) (void *opaque, htsmsg_t *m);
} tcp_server_ops_t;
extern int tcp_preferred_address_family;
@ -32,7 +42,8 @@ typedef void (tcp_server_callback_t)(int fd, void *opaque,
struct sockaddr_storage *peer,
struct sockaddr_storage *self);
void *tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, void *opaque);
void *tcp_server_create(const char *bindaddr, int port,
tcp_server_ops_t *ops, void *opaque);
int tcp_read(int fd, void *buf, size_t len);