From 6508c9448e9bbeefbff116fac316c377f045eff5 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Mon, 6 Oct 2014 14:22:43 +0200 Subject: [PATCH] http streaming: show the HTTP streaming connections in the webui status tab --- src/tcp.c | 57 +++++++++++++++++++++++++++++++++++++++-------- src/tcp.h | 3 +++ src/webui/webui.c | 29 +++++++++++++++++++----- 3 files changed, 74 insertions(+), 15 deletions(-) diff --git a/src/tcp.c b/src/tcp.c index 8f9dbfd3..cc2cf8ca 100644 --- a/src/tcp.c +++ b/src/tcp.c @@ -376,6 +376,7 @@ tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen) * */ static tvhpoll_t *tcp_server_poll; +static uint32_t tcp_server_launch_id; typedef struct tcp_server { int serverfd; @@ -385,9 +386,11 @@ typedef struct tcp_server { typedef struct tcp_server_launch { pthread_t tid; + uint32_t id; int fd; tcp_server_ops_t ops; void *opaque; + void (*status) (void *opaque, htsmsg_t *m); struct sockaddr_storage peer; struct sockaddr_storage self; time_t started; @@ -403,6 +406,43 @@ static LIST_HEAD(, tcp_server_launch) tcp_server_join = { 0 }; /** * */ +void * +tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m)) +{ + tcp_server_launch_t *tsl; + + lock_assert(&global_lock); + + assert(status); + + LIST_FOREACH(tsl, &tcp_server_active, alink) { + if (tsl->fd == fd) { + tsl->status = status; + LIST_INSERT_HEAD(&tcp_server_launches, tsl, link); + notify_reload("connections"); + return tsl; + } + } + return NULL; +} + +/** + * + */ +void +tcp_connection_land(void *id) +{ + tcp_server_launch_t *tsl = id; + + lock_assert(&global_lock); + + LIST_REMOVE(tsl, link); + notify_reload("connections"); +} + +/* + * + */ static void * tcp_server_start(void *aux) { @@ -438,21 +478,19 @@ tcp_server_start(void *aux) /* Start */ time(&tsl->started); + pthread_mutex_lock(&global_lock); + tsl->id = ++tcp_server_launch_id; + if (!tsl->id) tsl->id = ++tcp_server_launch_id; if (tsl->ops.status) { - pthread_mutex_lock(&global_lock); + tsl->status = tsl->ops.status; LIST_INSERT_HEAD(&tcp_server_launches, tsl, link); notify_reload("connections"); - pthread_mutex_unlock(&global_lock); } - pthread_mutex_lock(&global_lock); tsl->ops.start(tsl->fd, &tsl->opaque, &tsl->peer, &tsl->self); /* Stop */ if (tsl->ops.stop) tsl->ops.stop(tsl->opaque); - if (tsl->ops.status) { - LIST_REMOVE(tsl, link); - notify_reload("connections"); - } + if (tsl->ops.status) tcp_connection_land(tsl); LIST_REMOVE(tsl, alink); LIST_INSERT_HEAD(&tcp_server_join, tsl, jlink); pthread_mutex_unlock(&global_lock); @@ -667,13 +705,14 @@ tcp_server_connections ( void ) /* Build list */ l = htsmsg_create_list(); LIST_FOREACH(tsl, &tcp_server_launches, link) { - if (!tsl->ops.status) continue; + if (!tsl->status) continue; c++; e = htsmsg_create_map(); tcp_get_ip_str((struct sockaddr*)&tsl->peer, buf, sizeof(buf)); + htsmsg_add_u32(e, "id", tsl->id); htsmsg_add_str(e, "peer", buf); htsmsg_add_s64(e, "started", tsl->started); - tsl->ops.status(tsl->opaque, e); + tsl->status(tsl->opaque, e); htsmsg_add_msg(l, NULL, e); } diff --git a/src/tcp.h b/src/tcp.h index 87d53cea..0c93e93f 100644 --- a/src/tcp.h +++ b/src/tcp.h @@ -80,6 +80,9 @@ int tcp_read_timeout(int fd, void *buf, size_t len, int timeout); char *tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen); +void *tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m)); +void tcp_connection_land(void *id); + htsmsg_t *tcp_server_connections ( void ); #endif /* TCP_H_ */ diff --git a/src/webui/webui.c b/src/webui/webui.c index a3bf0511..29240036 100644 --- a/src/webui/webui.c +++ b/src/webui/webui.c @@ -259,6 +259,18 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque) return ret; } +/** + * HTTP stream status callback + */ +static void +http_stream_status ( void *opaque, htsmsg_t *m ) +{ + http_connection_t *hc = opaque; + htsmsg_add_str(m, "type", "HTTP"); + if (hc->hc_username) + htsmsg_add_str(m, "user", hc->hc_username); +} + /** * HTTP stream loop */ @@ -276,6 +288,13 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq, struct timeval tp; int err = 0; socklen_t errlen = sizeof(err); + void *tcp_id; + + tcp_id = tcp_connection_launch(hc->hc_fd, http_stream_status); + if (tcp_id == NULL) + return; + + pthread_mutex_unlock(&global_lock); mux = muxer_create(mc, mcfg); if(muxer_open_stream(mux, hc->hc_fd)) @@ -397,6 +416,10 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq, muxer_close(mux); muxer_destroy(mux); + + pthread_mutex_lock(&global_lock); + + tcp_connection_land(tcp_id); } @@ -815,9 +838,7 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight) http_arg_get(&hc->hc_args, "User-Agent")); if(s) { name = tvh_strdupa(service->s_nicename); - pthread_mutex_unlock(&global_lock); http_stream_run(hc, &sq, name, mc, s, &cfg->dvr_muxcnf); - pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); } @@ -863,9 +884,7 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight) if (!s) return HTTP_STATUS_BAD_REQUEST; name = tvh_strdupa(s->ths_title); - pthread_mutex_unlock(&global_lock); http_stream_run(hc, &sq, name, MC_RAW, s, &muxcfg); - pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); streaming_queue_deinit(&sq); @@ -941,9 +960,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight) if(s) { name = tvh_strdupa(channel_get_name(ch)); - pthread_mutex_unlock(&global_lock); http_stream_run(hc, &sq, name, mc, s, &cfg->dvr_muxcnf); - pthread_mutex_lock(&global_lock); subscription_unsubscribe(s); }