From 1b22e08ab991c837343c4a35a9fce2f68265e708 Mon Sep 17 00:00:00 2001 From: Adam Sutton Date: Mon, 11 Nov 2013 09:52:29 +0000 Subject: [PATCH] status: collect (and post) status of all TCP connections This will indiscrimately include all HTSP and HTTP connections, first pass I was going to just dump the lot on a UI status tab. However it could be some filtering might be useful. --- src/htsp_server.c | 26 +++++++++++++++-- src/http.c | 22 ++++++++++++-- src/tcp.c | 73 ++++++++++++++++++++++++++++++++++++++++------- src/tcp.h | 13 ++++++++- 4 files changed, 118 insertions(+), 16 deletions(-) diff --git a/src/htsp_server.c b/src/htsp_server.c index 28f0f31e..7116cc7a 100644 --- a/src/htsp_server.c +++ b/src/htsp_server.c @@ -2078,7 +2078,7 @@ htsp_write_scheduler(void *aux) * */ static void -htsp_serve(int fd, void *opaque, struct sockaddr_storage *source, +htsp_serve(int fd, void **opaque, struct sockaddr_storage *source, struct sockaddr_storage *self) { htsp_connection_t htsp; @@ -2088,6 +2088,7 @@ htsp_serve(int fd, void *opaque, struct sockaddr_storage *source, tcp_get_ip_str((struct sockaddr*)source, buf, 50); memset(&htsp, 0, sizeof(htsp_connection_t)); + *opaque = &htsp; TAILQ_INIT(&htsp.htsp_active_output_queues); @@ -2123,6 +2124,8 @@ htsp_serve(int fd, void *opaque, struct sockaddr_storage *source, pthread_mutex_lock(&global_lock); + *opaque = NULL; + gtimer_disarm(&htsp.htsp_timer); /* Beware! Closing subscriptions will invoke a lot of callbacks @@ -2168,6 +2171,18 @@ htsp_serve(int fd, void *opaque, struct sockaddr_storage *source, close(fd); } +/* + * Status callback + */ +static void +htsp_server_status ( void *opaque, htsmsg_t *m ) +{ + htsp_connection_t *htsp = opaque; + htsmsg_add_str(m, "type", "HTSP"); + if (htsp->htsp_username) + htsmsg_add_str(m, "user", htsp->htsp_username); +} + /** * Fire up HTSP server */ @@ -2175,9 +2190,14 @@ void htsp_init(const char *bindaddr) { extern int tvheadend_htsp_port_extra; - htsp_server = tcp_server_create(bindaddr, tvheadend_htsp_port, htsp_serve, NULL); + static tcp_server_ops_t ops = { + .start = htsp_serve, + .stop = NULL, + .status = htsp_server_status, + }; + htsp_server = tcp_server_create(bindaddr, tvheadend_htsp_port, &ops, NULL); if(tvheadend_htsp_port_extra) - htsp_server_2 = tcp_server_create(bindaddr, tvheadend_htsp_port_extra, htsp_serve, NULL); + htsp_server_2 = tcp_server_create(bindaddr, tvheadend_htsp_port_extra, &ops, NULL); } /* ************************************************************************** diff --git a/src/http.c b/src/http.c index ef36fb91..9aaf4cf0 100644 --- a/src/http.c +++ b/src/http.c @@ -777,13 +777,14 @@ http_serve_requests(http_connection_t *hc, htsbuf_queue_t *spill) * */ static void -http_serve(int fd, void *opaque, struct sockaddr_storage *peer, +http_serve(int fd, void **opaque, struct sockaddr_storage *peer, struct sockaddr_storage *self) { htsbuf_queue_t spill; http_connection_t hc; memset(&hc, 0, sizeof(http_connection_t)); + *opaque = &hc; TAILQ_INIT(&hc.hc_args); TAILQ_INIT(&hc.hc_req_args); @@ -806,8 +807,20 @@ http_serve(int fd, void *opaque, struct sockaddr_storage *peer, htsbuf_queue_flush(&hc.hc_reply); htsbuf_queue_flush(&spill); close(fd); + + pthread_mutex_lock(&global_lock); + *opaque = NULL; + pthread_mutex_unlock(&global_lock); } +static void +http_server_status ( void *opaque, htsmsg_t *m ) +{ + http_connection_t *hc = opaque; + htsmsg_add_str(m, "type", "HTTP"); + if (hc->hc_username) + htsmsg_add_str(m, "user", hc->hc_username); +} /** * Fire up HTTP server @@ -815,5 +828,10 @@ http_serve(int fd, void *opaque, struct sockaddr_storage *peer, void http_server_init(const char *bindaddr) { - http_server = tcp_server_create(bindaddr, tvheadend_webui_port, http_serve, NULL); + static tcp_server_ops_t ops = { + .start = http_serve, + .stop = NULL, + .status = http_server_status, + }; + http_server = tcp_server_create(bindaddr, tvheadend_webui_port, &ops, NULL); } diff --git a/src/tcp.c b/src/tcp.c index 8be7f367..051778da 100644 --- a/src/tcp.c +++ b/src/tcp.c @@ -36,6 +36,8 @@ #include "tcp.h" #include "tvheadend.h" #include "tvhpoll.h" +#include "queue.h" +#include "notify.h" int tcp_preferred_address_family = AF_INET; @@ -378,19 +380,24 @@ tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen) static tvhpoll_t *tcp_server_poll; typedef struct tcp_server { - tcp_server_callback_t *start; - void *opaque; int serverfd; + tcp_server_ops_t ops; + void *opaque; } tcp_server_t; -typedef struct tcp_server_launch_t { - tcp_server_callback_t *start; - void *opaque; +typedef struct tcp_server_launch { int fd; + tcp_server_ops_t ops; + void *opaque; struct sockaddr_storage peer; struct sockaddr_storage self; + time_t started; + LIST_ENTRY(tcp_server_launch) link; } tcp_server_launch_t; +static LIST_HEAD(, tcp_server_launch) tcp_server_launches = { 0 }; + +static gtimer_t tcp_server_status_timer; /** * @@ -427,9 +434,20 @@ tcp_server_start(void *aux) to.tv_usec = 0; setsockopt(tsl->fd, SOL_SOCKET, SO_SNDTIMEO, &to, sizeof(to)); - tsl->start(tsl->fd, tsl->opaque, &tsl->peer, &tsl->self); - free(tsl); + /* Start */ + time(&tsl->started); + pthread_mutex_lock(&global_lock); + LIST_INSERT_HEAD(&tcp_server_launches, tsl, link); + pthread_mutex_unlock(&global_lock); + tsl->ops.start(tsl->fd, &tsl->opaque, &tsl->peer, &tsl->self); + /* Stop */ + if (tsl->ops.stop) tsl->ops.stop(tsl->opaque); + pthread_mutex_lock(&global_lock); + LIST_REMOVE(tsl, link); + pthread_mutex_unlock(&global_lock); + + free(tsl); return NULL; } @@ -470,7 +488,7 @@ tcp_server_loop(void *aux) if(ev.events & TVHPOLL_IN) { tsl = malloc(sizeof(tcp_server_launch_t)); - tsl->start = ts->start; + tsl->ops = ts->ops; tsl->opaque = ts->opaque; slen = sizeof(struct sockaddr_storage); @@ -500,7 +518,8 @@ tcp_server_loop(void *aux) * */ void * -tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, void *opaque) +tcp_server_create + (const char *bindaddr, int port, tcp_server_ops_t *ops, void *opaque) { int fd, x; tvhpoll_event_t ev; @@ -567,7 +586,7 @@ tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, ts = malloc(sizeof(tcp_server_t)); ts->serverfd = fd; - ts->start = start; + ts->ops = *ops; ts->opaque = opaque; ev.fd = fd; @@ -578,6 +597,37 @@ tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, return ts; } +/* + * Connections status + */ +static void +tcp_server_status_callback ( void *opaque ) +{ + tcp_server_launch_t *tsl; + lock_assert(&global_lock); + htsmsg_t *l, *e, *m; + char buf[1024]; + + /* RE-arm */ + gtimer_arm(&tcp_server_status_timer, tcp_server_status_callback, NULL, 1); + + /* Build list */ + l = htsmsg_create_list(); + LIST_FOREACH(tsl, &tcp_server_launches, link) { + e = htsmsg_create_map(); + tcp_get_ip_str((struct sockaddr*)&tsl->peer, buf, sizeof(buf)); + htsmsg_add_str(e, "peer", buf); + htsmsg_add_s64(e, "started", tsl->started); + if (tsl->ops.status) tsl->ops.status(tsl->opaque, e); + htsmsg_add_msg(l, NULL, e); + } + + /* Output */ + m = htsmsg_create_map(); + htsmsg_add_msg(m, "entries", l); + notify_by_msg("tcp_connections", m); +} + /** * */ @@ -591,6 +641,9 @@ tcp_server_init(int opt_ipv6) tcp_server_poll = tvhpoll_create(10); tvhthread_create(&tid, NULL, tcp_server_loop, NULL, 1); + + /* Status timer */ + gtimer_arm(&tcp_server_status_timer, tcp_server_status_callback, NULL, 1); } diff --git a/src/tcp.h b/src/tcp.h index 98649f0b..8e329dc5 100644 --- a/src/tcp.h +++ b/src/tcp.h @@ -20,6 +20,16 @@ #define TCP_H_ #include "htsbuf.h" +#include "htsmsg.h" + +typedef struct tcp_server_ops +{ + void (*start) (int fd, void **opaque, + struct sockaddr_storage *peer, + struct sockaddr_storage *self); + void (*stop) (void *opaque); + void (*status) (void *opaque, htsmsg_t *m); +} tcp_server_ops_t; extern int tcp_preferred_address_family; @@ -32,7 +42,8 @@ typedef void (tcp_server_callback_t)(int fd, void *opaque, struct sockaddr_storage *peer, struct sockaddr_storage *self); -void *tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, void *opaque); +void *tcp_server_create(const char *bindaddr, int port, + tcp_server_ops_t *ops, void *opaque); int tcp_read(int fd, void *buf, size_t len);