http streaming: show the HTTP streaming connections in the webui status tab

This commit is contained in:
Jaroslav Kysela 2014-10-06 14:22:43 +02:00
parent b92231bd13
commit 6508c9448e
3 changed files with 74 additions and 15 deletions

View file

@ -376,6 +376,7 @@ tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen)
*
*/
static tvhpoll_t *tcp_server_poll;
static uint32_t tcp_server_launch_id;
typedef struct tcp_server {
int serverfd;
@ -385,9 +386,11 @@ typedef struct tcp_server {
typedef struct tcp_server_launch {
pthread_t tid;
uint32_t id;
int fd;
tcp_server_ops_t ops;
void *opaque;
void (*status) (void *opaque, htsmsg_t *m);
struct sockaddr_storage peer;
struct sockaddr_storage self;
time_t started;
@ -403,6 +406,43 @@ static LIST_HEAD(, tcp_server_launch) tcp_server_join = { 0 };
/**
*
*/
void *
tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m))
{
tcp_server_launch_t *tsl;
lock_assert(&global_lock);
assert(status);
LIST_FOREACH(tsl, &tcp_server_active, alink) {
if (tsl->fd == fd) {
tsl->status = status;
LIST_INSERT_HEAD(&tcp_server_launches, tsl, link);
notify_reload("connections");
return tsl;
}
}
return NULL;
}
/**
*
*/
void
tcp_connection_land(void *id)
{
tcp_server_launch_t *tsl = id;
lock_assert(&global_lock);
LIST_REMOVE(tsl, link);
notify_reload("connections");
}
/*
*
*/
static void *
tcp_server_start(void *aux)
{
@ -438,21 +478,19 @@ tcp_server_start(void *aux)
/* Start */
time(&tsl->started);
pthread_mutex_lock(&global_lock);
tsl->id = ++tcp_server_launch_id;
if (!tsl->id) tsl->id = ++tcp_server_launch_id;
if (tsl->ops.status) {
pthread_mutex_lock(&global_lock);
tsl->status = tsl->ops.status;
LIST_INSERT_HEAD(&tcp_server_launches, tsl, link);
notify_reload("connections");
pthread_mutex_unlock(&global_lock);
}
pthread_mutex_lock(&global_lock);
tsl->ops.start(tsl->fd, &tsl->opaque, &tsl->peer, &tsl->self);
/* Stop */
if (tsl->ops.stop) tsl->ops.stop(tsl->opaque);
if (tsl->ops.status) {
LIST_REMOVE(tsl, link);
notify_reload("connections");
}
if (tsl->ops.status) tcp_connection_land(tsl);
LIST_REMOVE(tsl, alink);
LIST_INSERT_HEAD(&tcp_server_join, tsl, jlink);
pthread_mutex_unlock(&global_lock);
@ -667,13 +705,14 @@ tcp_server_connections ( void )
/* Build list */
l = htsmsg_create_list();
LIST_FOREACH(tsl, &tcp_server_launches, link) {
if (!tsl->ops.status) continue;
if (!tsl->status) continue;
c++;
e = htsmsg_create_map();
tcp_get_ip_str((struct sockaddr*)&tsl->peer, buf, sizeof(buf));
htsmsg_add_u32(e, "id", tsl->id);
htsmsg_add_str(e, "peer", buf);
htsmsg_add_s64(e, "started", tsl->started);
tsl->ops.status(tsl->opaque, e);
tsl->status(tsl->opaque, e);
htsmsg_add_msg(l, NULL, e);
}

View file

@ -80,6 +80,9 @@ int tcp_read_timeout(int fd, void *buf, size_t len, int timeout);
char *tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen);
void *tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m));
void tcp_connection_land(void *id);
htsmsg_t *tcp_server_connections ( void );
#endif /* TCP_H_ */

View file

@ -259,6 +259,18 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque)
return ret;
}
/**
* HTTP stream status callback
*/
static void
http_stream_status ( void *opaque, htsmsg_t *m )
{
http_connection_t *hc = opaque;
htsmsg_add_str(m, "type", "HTTP");
if (hc->hc_username)
htsmsg_add_str(m, "user", hc->hc_username);
}
/**
* HTTP stream loop
*/
@ -276,6 +288,13 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
struct timeval tp;
int err = 0;
socklen_t errlen = sizeof(err);
void *tcp_id;
tcp_id = tcp_connection_launch(hc->hc_fd, http_stream_status);
if (tcp_id == NULL)
return;
pthread_mutex_unlock(&global_lock);
mux = muxer_create(mc, mcfg);
if(muxer_open_stream(mux, hc->hc_fd))
@ -397,6 +416,10 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
muxer_close(mux);
muxer_destroy(mux);
pthread_mutex_lock(&global_lock);
tcp_connection_land(tcp_id);
}
@ -815,9 +838,7 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
http_arg_get(&hc->hc_args, "User-Agent"));
if(s) {
name = tvh_strdupa(service->s_nicename);
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, mc, s, &cfg->dvr_muxcnf);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
}
@ -863,9 +884,7 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight)
if (!s)
return HTTP_STATUS_BAD_REQUEST;
name = tvh_strdupa(s->ths_title);
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, MC_RAW, s, &muxcfg);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
streaming_queue_deinit(&sq);
@ -941,9 +960,7 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
if(s) {
name = tvh_strdupa(channel_get_name(ch));
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, mc, s, &cfg->dvr_muxcnf);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
}