ACL, HTSP, HTTP: Added streaming connection limit per user

This commit is contained in:
Jaroslav Kysela 2014-10-06 15:40:12 +02:00
parent 6508c9448e
commit 6f4661ddcf
9 changed files with 195 additions and 79 deletions

View file

@ -82,6 +82,11 @@ The columns have the following functions:
<dd>
Enables access to the Configuration tab.
<dt>Limit Connections
<dd>
If nonzero, the user will be limited to this amount of streaming
connection at a time.
<dt>Min Channel Num
<dd>
If nonzero, the user will only be able to access channels with

View file

@ -270,13 +270,14 @@ access_verify(const char *username, const char *password,
if(strcmp(ae->ae_username, username) ||
strcmp(ae->ae_password, password))
continue; /* username/password mismatch */
match = 1;
}
if(!netmask_verify(ae, src))
continue; /* IP based access mismatches */
if (ae->ae_username[0] != '*')
match = 1;
bits |= ae->ae_rights;
}
@ -295,6 +296,9 @@ access_verify(const char *username, const char *password,
static void
access_update(access_t *a, access_entry_t *ae)
{
if(a->aa_conn_limit < ae->ae_conn_limit)
a->aa_conn_limit = ae->ae_conn_limit;
if(ae->ae_chmin || ae->ae_chmax) {
if(a->aa_chmin || a->aa_chmax) {
if (a->aa_chmin < ae->ae_chmin)
@ -399,6 +403,14 @@ access_get_hashed(const char *username, const uint8_t digest[20],
SHA_CTX shactx;
uint8_t d[20];
if (username) {
a->aa_username = strdup(username);
a->aa_representative = strdup(username);
} else {
a->aa_representative = malloc(50);
tcp_get_ip_str((struct sockaddr*)src, a->aa_representative, 50);
}
if(access_noacl) {
a->aa_rights = ACCESS_FULL;
return a;
@ -418,7 +430,6 @@ access_get_hashed(const char *username, const uint8_t digest[20],
}
}
TAILQ_FOREACH(ae, &access_entries, ae_link) {
if(!ae->ae_enabled)
@ -445,6 +456,8 @@ access_get_hashed(const char *username, const uint8_t digest[20],
/* Username was not matched - no access */
if (!a->aa_match) {
free(a->aa_username);
a->aa_username = NULL;
if (username && *username != '\0')
a->aa_rights = 0;
}
@ -1079,6 +1092,12 @@ const idclass_t access_entry_class = {
.name = "Admin",
.off = offsetof(access_entry_t, ae_admin),
},
{
.type = PT_U32,
.id = "conn_limit",
.name = "Limit Connections",
.off = offsetof(access_entry_t, ae_conn_limit),
},
{
.type = PT_U32,
.id = "channel_min",

View file

@ -57,6 +57,8 @@ typedef struct access_entry {
int ae_streaming;
int ae_adv_streaming;
uint32_t ae_conn_limit;
int ae_dvr;
struct dvr_config *ae_dvr_config;
LIST_ENTRY(access_entry) ae_dvr_config_link;
@ -99,6 +101,7 @@ typedef struct access {
uint32_t aa_chmax;
htsmsg_t *aa_chtags;
int aa_match;
uint32_t aa_conn_limit;
} access_t;
#define ACCESS_ANONYMOUS 0

View file

@ -2202,7 +2202,7 @@ struct {
/**
* Raise privs by field in message
*/
static void
static int
htsp_authenticate(htsp_connection_t *htsp, htsmsg_t *m)
{
const char *username;
@ -2212,7 +2212,7 @@ htsp_authenticate(htsp_connection_t *htsp, htsmsg_t *m)
int privgain;
if((username = htsmsg_get_str(m, "username")) == NULL)
return;
return 0;
if(strcmp(htsp->htsp_username ?: "", username)) {
tvhlog(LOG_INFO, "htsp", "%s: Identified as user %s",
@ -2223,7 +2223,7 @@ htsp_authenticate(htsp_connection_t *htsp, htsmsg_t *m)
}
if(htsmsg_get_bin(m, "digest", &digest, &digestlen))
return;
return 0;
rights = access_get_hashed(username, digest, htsp->htsp_challenge,
(struct sockaddr *)htsp->htsp_peer);
@ -2237,6 +2237,7 @@ htsp_authenticate(htsp_connection_t *htsp, htsmsg_t *m)
access_destroy(htsp->htsp_granted_access);
htsp->htsp_granted_access = rights;
return privgain;
}
/**
@ -2281,6 +2282,18 @@ htsp_read_message(htsp_connection_t *htsp, htsmsg_t **mp, int timeout)
return 0;
}
/*
* Status callback
*/
static void
htsp_server_status ( void *opaque, htsmsg_t *m )
{
htsp_connection_t *htsp = opaque;
htsmsg_add_str(m, "type", "HTSP");
if (htsp->htsp_username)
htsmsg_add_str(m, "user", htsp->htsp_username);
}
/**
*
*/
@ -2290,6 +2303,7 @@ htsp_read_loop(htsp_connection_t *htsp)
htsmsg_t *m = NULL, *reply;
int r, i;
const char *method;
void *tcp_id = NULL;;
if(htsp_generate_challenge(htsp)) {
tvhlog(LOG_ERR, "htsp", "%s: Unable to generate challenge",
@ -2298,10 +2312,18 @@ htsp_read_loop(htsp_connection_t *htsp)
}
pthread_mutex_lock(&global_lock);
htsp->htsp_granted_access =
access_get_by_addr((struct sockaddr *)htsp->htsp_peer);
tcp_id = tcp_connection_launch(htsp->htsp_fd, htsp_server_status,
htsp->htsp_granted_access);
pthread_mutex_unlock(&global_lock);
if (tcp_id == NULL)
return 0;
tvhlog(LOG_INFO, "htsp", "Got connection from %s", htsp->htsp_logname);
/* Session main loop */
@ -2312,37 +2334,46 @@ readmsg:
return r;
pthread_mutex_lock(&global_lock);
htsp_authenticate(htsp, m);
if (htsp_authenticate(htsp, m)) {
tcp_connection_land(tcp_id);
tcp_id = tcp_connection_launch(htsp->htsp_fd, htsp_server_status,
htsp->htsp_granted_access);
if (tcp_id == NULL) {
htsmsg_destroy(m);
pthread_mutex_unlock(&global_lock);
return 1;
}
}
if((method = htsmsg_get_str(m, "method")) != NULL) {
tvhtrace("htsp", "%s - method %s", htsp->htsp_logname, method);
for(i = 0; i < NUM_METHODS; i++) {
if(!strcmp(method, htsp_methods[i].name)) {
if(!strcmp(method, htsp_methods[i].name)) {
if((htsp->htsp_granted_access->aa_rights & htsp_methods[i].privmask) !=
htsp_methods[i].privmask) {
if((htsp->htsp_granted_access->aa_rights &
htsp_methods[i].privmask) !=
htsp_methods[i].privmask) {
pthread_mutex_unlock(&global_lock);
/* Classic authentication failed delay */
usleep(250000);
/* Classic authentication failed delay */
usleep(250000);
reply = htsmsg_create_map();
htsmsg_add_u32(reply, "noaccess", 1);
htsp_reply(htsp, m, reply);
reply = htsmsg_create_map();
htsmsg_add_u32(reply, "noaccess", 1);
htsp_reply(htsp, m, reply);
htsmsg_destroy(m);
goto readmsg;
htsmsg_destroy(m);
goto readmsg;
} else {
reply = htsp_methods[i].fn(htsp, m);
}
break;
}
} else {
reply = htsp_methods[i].fn(htsp, m);
}
break;
}
}
if(i == NUM_METHODS) {
reply = htsp_error("Method not found");
reply = htsp_error("Method not found");
}
} else {
@ -2356,6 +2387,10 @@ readmsg:
htsmsg_destroy(m);
}
pthread_mutex_lock(&global_lock);
tcp_connection_land(tcp_id);
pthread_mutex_unlock(&global_lock);
return 0;
}
@ -2526,18 +2561,6 @@ htsp_serve(int fd, void **opaque, struct sockaddr_storage *source,
*opaque = NULL;
}
/*
* Status callback
*/
static void
htsp_server_status ( void *opaque, htsmsg_t *m )
{
htsp_connection_t *htsp = opaque;
htsmsg_add_str(m, "type", "HTSP");
if (htsp->htsp_username)
htsmsg_add_str(m, "user", htsp->htsp_username);
}
/*
* Cancel callback
*/
@ -2556,7 +2579,6 @@ htsp_init(const char *bindaddr)
static tcp_server_ops_t ops = {
.start = htsp_serve,
.stop = NULL,
.status = htsp_server_status,
.cancel = htsp_server_cancel
};
htsp_server = tcp_server_create(bindaddr, tvheadend_htsp_port, &ops, NULL);

View file

@ -996,7 +996,7 @@ http_server_init(const char *bindaddr)
static tcp_server_ops_t ops = {
.start = http_serve,
.stop = NULL,
.status = NULL,
.cancel = NULL
};
http_server = tcp_server_create(bindaddr, tvheadend_webui_port, &ops, NULL);
}

View file

@ -34,11 +34,11 @@
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include "tcp.h"
#include "tvheadend.h"
#include "tcp.h"
#include "tvhpoll.h"
#include "queue.h"
#include "notify.h"
#include "access.h"
int tcp_preferred_address_family = AF_INET;
int tcp_server_running;
@ -390,6 +390,7 @@ typedef struct tcp_server_launch {
int fd;
tcp_server_ops_t ops;
void *opaque;
char *representative;
void (*status) (void *opaque, htsmsg_t *m);
struct sockaddr_storage peer;
struct sockaddr_storage self;
@ -407,23 +408,51 @@ static LIST_HEAD(, tcp_server_launch) tcp_server_join = { 0 };
*
*/
void *
tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m))
tcp_connection_launch
(int fd, void (*status) (void *opaque, htsmsg_t *m), access_t *aa)
{
tcp_server_launch_t *tsl;
tcp_server_launch_t *tsl, *res = NULL;
uint32_t used = 0;
time_t started = dispatch_clock;
lock_assert(&global_lock);
assert(status);
if (aa == NULL)
return NULL;
try_again:
LIST_FOREACH(tsl, &tcp_server_active, alink) {
if (tsl->fd == fd) {
tsl->status = status;
LIST_INSERT_HEAD(&tcp_server_launches, tsl, link);
notify_reload("connections");
return tsl;
res = tsl;
if (!aa->aa_conn_limit)
break;
continue;
}
if (!strcmp(aa->aa_representative ?: "", tsl->representative ?: ""))
used++;
}
return NULL;
if (aa->aa_conn_limit && used >= aa->aa_conn_limit) {
if (started + 3 < dispatch_clock) {
tvherror("tcp", "multiple connections are not allowed for user '%s' from '%s' (limit %u)",
aa->aa_username ?: "", aa->aa_representative ?: "", aa->aa_conn_limit);
return NULL;
}
pthread_mutex_unlock(&global_lock);
usleep(250000);
pthread_mutex_lock(&global_lock);
if (tvheadend_running)
goto try_again;
return NULL;
}
res->representative = aa->aa_representative ? strdup(aa->aa_representative) : NULL;
res->status = status;
LIST_INSERT_HEAD(&tcp_server_launches, res, link);
notify_reload("connections");
return res;
}
/**
@ -436,8 +465,14 @@ tcp_connection_land(void *id)
lock_assert(&global_lock);
if (id == NULL)
return;
LIST_REMOVE(tsl, link);
notify_reload("connections");
free(tsl->representative);
tsl->representative = NULL;
}
/*
@ -481,16 +516,10 @@ tcp_server_start(void *aux)
pthread_mutex_lock(&global_lock);
tsl->id = ++tcp_server_launch_id;
if (!tsl->id) tsl->id = ++tcp_server_launch_id;
if (tsl->ops.status) {
tsl->status = tsl->ops.status;
LIST_INSERT_HEAD(&tcp_server_launches, tsl, link);
notify_reload("connections");
}
tsl->ops.start(tsl->fd, &tsl->opaque, &tsl->peer, &tsl->self);
/* Stop */
if (tsl->ops.stop) tsl->ops.stop(tsl->opaque);
if (tsl->ops.status) tcp_connection_land(tsl);
LIST_REMOVE(tsl, alink);
LIST_INSERT_HEAD(&tcp_server_join, tsl, jlink);
pthread_mutex_unlock(&global_lock);
@ -547,8 +576,10 @@ tcp_server_loop(void *aux)
if(ev.events & TVHPOLL_IN) {
tsl = malloc(sizeof(tcp_server_launch_t));
tsl->ops = ts->ops;
tsl->opaque = ts->opaque;
tsl->ops = ts->ops;
tsl->opaque = ts->opaque;
tsl->status = NULL;
tsl->representative = NULL;
slen = sizeof(struct sockaddr_storage);
tsl->fd = accept(ts->serverfd,

View file

@ -43,7 +43,6 @@ typedef struct tcp_server_ops
struct sockaddr_storage *peer,
struct sockaddr_storage *self);
void (*stop) (void *opaque);
void (*status) (void *opaque, htsmsg_t *m);
void (*cancel) (void *opaque);
} tcp_server_ops_t;
@ -80,7 +79,10 @@ int tcp_read_timeout(int fd, void *buf, size_t len, int timeout);
char *tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen);
void *tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m));
struct access;
void *tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m),
struct access *aa);
void tcp_connection_land(void *id);
htsmsg_t *tcp_server_connections ( void );

View file

@ -5,8 +5,8 @@
tvheadend.acleditor = function(panel, index)
{
var list = 'enabled,username,password,prefix,streaming,adv_streaming,' +
'dvr,dvr_config,webui,admin,channel_min,channel_max,channel_tag,' +
'comment';
'dvr,dvr_config,webui,admin,conn_limit,channel_min,channel_max,' +
'channel_tag,comment';
tvheadend.idnode_grid(panel, {
url: 'api/access/entry',
@ -22,6 +22,7 @@ tvheadend.acleditor = function(panel, index)
dvr: { width: 100 },
webui: { width: 100 },
admin: { width: 100 },
conn_limit: { width: 100 },
channel_min: { width: 100 },
channel_max: { width: 100 },
},

View file

@ -260,7 +260,7 @@ page_static_file(http_connection_t *hc, const char *remain, void *opaque)
}
/**
* HTTP stream status callback
* HTTP subscription handling
*/
static void
http_stream_status ( void *opaque, htsmsg_t *m )
@ -271,6 +271,18 @@ http_stream_status ( void *opaque, htsmsg_t *m )
htsmsg_add_str(m, "user", hc->hc_username);
}
static inline void *
http_stream_preop ( http_connection_t *hc )
{
return tcp_connection_launch(hc->hc_fd, http_stream_status, hc->hc_access);
}
static inline void
http_stream_postop ( void *tcp_id )
{
tcp_connection_land(tcp_id);
}
/**
* HTTP stream loop
*/
@ -288,13 +300,6 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
struct timeval tp;
int err = 0;
socklen_t errlen = sizeof(err);
void *tcp_id;
tcp_id = tcp_connection_launch(hc->hc_fd, http_stream_status);
if (tcp_id == NULL)
return;
pthread_mutex_unlock(&global_lock);
mux = muxer_create(mc, mcfg);
if(muxer_open_stream(mux, hc->hc_fd))
@ -416,10 +421,6 @@ http_stream_run(http_connection_t *hc, streaming_queue_t *sq,
muxer_close(mux);
muxer_destroy(mux);
pthread_mutex_lock(&global_lock);
tcp_connection_land(tcp_id);
}
@ -800,10 +801,15 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
size_t qsize;
const char *name;
char addrbuf[50];
void *tcp_id;
int res = 0;
if(http_access_verify(hc, ACCESS_ADVANCED_STREAMING))
return HTTP_STATUS_UNAUTHORIZED;
if((tcp_id = http_stream_preop(hc)) == NULL)
return HTTP_STATUS_NOT_ALLOWED;
cfg = dvr_config_find_by_name_default(NULL);
/* Build muxer config - this takes the defaults from the default dvr config, which is a hack */
@ -838,8 +844,12 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
http_arg_get(&hc->hc_args, "User-Agent"));
if(s) {
name = tvh_strdupa(service->s_nicename);
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, mc, s, &cfg->dvr_muxcnf);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
} else {
res = HTTP_STATUS_BAD_REQUEST;
}
if(gh)
@ -850,7 +860,8 @@ http_stream_service(http_connection_t *hc, service_t *service, int weight)
streaming_queue_deinit(&sq);
return 0;
http_stream_postop(tcp_id);
return res;
}
/**
@ -868,10 +879,15 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight)
const char *name;
char addrbuf[50];
muxer_config_t muxcfg = { 0 };
void *tcp_id;
int res = 0;
if(http_access_verify(hc, ACCESS_ADVANCED_STREAMING))
return HTTP_STATUS_UNAUTHORIZED;
if((tcp_id = http_stream_preop(hc)) == NULL)
return HTTP_STATUS_NOT_ALLOWED;
streaming_queue_init(&sq, SMT_PACKET);
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, addrbuf, 50);
@ -881,15 +897,21 @@ http_stream_mux(http_connection_t *hc, mpegts_mux_t *mm, int weight)
SUBSCRIPTION_STREAMING,
addrbuf, hc->hc_username,
http_arg_get(&hc->hc_args, "User-Agent"), NULL);
if (!s)
return HTTP_STATUS_BAD_REQUEST;
name = tvh_strdupa(s->ths_title);
http_stream_run(hc, &sq, name, MC_RAW, s, &muxcfg);
subscription_unsubscribe(s);
if (s) {
name = tvh_strdupa(s->ths_title);
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, MC_RAW, s, &muxcfg);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
} else {
res = HTTP_STATUS_BAD_REQUEST;
}
streaming_queue_deinit(&sq);
return 0;
http_stream_postop(tcp_id);
return res;
}
#endif
@ -914,10 +936,15 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
size_t qsize;
const char *name;
char addrbuf[50];
void *tcp_id;
int res = 0;
if (http_access_verify_channel(hc, ACCESS_STREAMING, ch, 1))
return HTTP_STATUS_UNAUTHORIZED;
if((tcp_id = http_stream_preop(hc)) == NULL)
return HTTP_STATUS_NOT_ALLOWED;
cfg = dvr_config_find_by_name_default(NULL);
/* Build muxer config - this takes the defaults from the default dvr config, which is a hack */
@ -960,8 +987,12 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
if(s) {
name = tvh_strdupa(channel_get_name(ch));
pthread_mutex_unlock(&global_lock);
http_stream_run(hc, &sq, name, mc, s, &cfg->dvr_muxcnf);
pthread_mutex_lock(&global_lock);
subscription_unsubscribe(s);
} else {
res = HTTP_STATUS_BAD_REQUEST;
}
if(gh)
@ -977,7 +1008,9 @@ http_stream_channel(http_connection_t *hc, channel_t *ch, int weight)
streaming_queue_deinit(&sq);
return 0;
http_stream_postop(tcp_id);
return res;
}