1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

added first version of web socket client support

This commit is contained in:
Steffen Vogel 2017-04-24 19:28:45 +02:00
parent a1a52d4336
commit 4e5dc58e18
2 changed files with 266 additions and 117 deletions

View file

@ -51,6 +51,11 @@ struct websocket_connection {
char ip[64];
} peer;
enum {
WEBSOCKET_MODE_CLIENT,
WEBSOCKET_MODE_SERVER,
} mode;
enum state state;
char *_name;

View file

@ -23,6 +23,7 @@
/* Private static storage */
static struct list connections = { .state = STATE_DESTROYED }; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
static struct web *web;
/* Forward declarations */
static struct plugin p;
@ -30,10 +31,12 @@ static struct plugin p;
static char * websocket_connection_name(struct websocket_connection *c)
{
if (!c->_name) {
strcatf(&c->_name, "%s (%s)", c->peer.name, c->peer.ip);
if (c->node)
asprintf(&c->_name, "%s (%s) for node %s", c->peer.name, c->peer.ip, node_name(c->node));
else
asprintf(&c->_name, "%s (%s) for all nodes", c->peer.name, c->peer.ip);
strcatf(&c->_name, " for node %s", node_name(c->node));
strcatf(&c->_name, " in %s mode", c->mode == WEBSOCKET_MODE_CLIENT ? "client" : "server");
}
return c->_name;
@ -42,9 +45,7 @@ static char * websocket_connection_name(struct websocket_connection *c)
static int websocket_connection_init(struct websocket_connection *c, struct lws *wsi)
{
int ret;
struct websocket *w = c->node->_vd;
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
info("LWS: New connection %s", websocket_connection_name(c));
@ -52,86 +53,75 @@ static int websocket_connection_init(struct websocket_connection *c, struct lws
c->state = STATE_INITIALIZED;
c->wsi = wsi;
if (c->node != NULL)
if (c->node) {
struct websocket *w = c->node->_vd;
list_push(&w->connections, c);
}
else
list_push(&connections, c);
ret = queue_init(&c->queue, DEFAULT_WEBSOCKET_QUEUELEN, &memtype_hugepage);
if (ret) {
warn("Failed to create queue for incoming websocket connection. Closing..");
return -1;
}
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
if (ret)
return ret;
return 0;
}
static void websocket_connection_destroy(struct websocket_connection *c)
static int websocket_connection_destroy(struct websocket_connection *c)
{
if (c->state == STATE_DESTROYED)
return;
int ret;
struct websocket *w = c->node->_vd;
if (c->state == STATE_DESTROYED)
return 0;
info("LWS: Connection %s closed", websocket_connection_name(c));
c->state = STATE_DESTROYED;
c->wsi = NULL;
if (c->node)
if (c->node) {
struct websocket *w = c->node->_vd;
list_remove(&w->connections, c);
}
else
list_remove(&connections, c);
if (c->_name)
free(c->_name);
queue_destroy(&c->queue);
ret = queue_destroy(&c->queue);
if (ret)
return ret;
c->state = STATE_DESTROYED;
c->wsi = NULL;
return ret;
}
static void websocket_destination_destroy(struct websocket_destination *d)
{
free(d->uri);
free((char *) d->info.path);
free((char *) d->info.address);
}
static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt)
{
int blocks, enqueued;
char *bufs[cnt];
struct websocket *w = c->node->_vd;
int ret;
switch (c->state) {
case STATE_DESTROYED:
case STATE_STOPPED:
return -1;
case STATE_INITIALIZED:
c->state = STATE_STARTED;
/* fall through */
case STATE_STARTED:
blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
if (blocks != cnt)
warn("Pool underrun in websocket connection: %s", websocket_connection_name(c));
for (int i = 0; i < blocks; i++) {
struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE);
msg->version = WEBMSG_VERSION;
msg->type = WEBMSG_TYPE_DATA;
msg->length = smps[i]->length;
msg->sequence = smps[i]->sequence;
msg->id = c->node->id;
msg->ts.sec = smps[i]->ts.origin.tv_sec;
msg->ts.nsec = smps[i]->ts.origin.tv_nsec;
memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4);
for (int i = 0; i < cnt; i++) {
sample_get(smps[i]); /* increase reference count */
ret = queue_push(&c->queue, (void **) smps[i]);
if (ret != 1)
warn("Queue overrun in websocket connection: %s", websocket_connection_name(c));
}
enqueued = queue_push_many(&c->queue, (void **) bufs, cnt);
if (enqueued != blocks)
warn("Queue overrun in websocket connection: %s", websocket_connection_name(c));
lws_callback_on_writable(c->wsi);
break;
@ -142,25 +132,50 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam
return 0;
}
static void websocket_connection_close(struct websocket_connection *c, struct lws *wsi, enum lws_close_status status, const char *reason)
{
lws_close_reason(wsi, status, (unsigned char *) reason, strlen(reason));
char *msg = strf("LWS: Closing connection");
if (c)
msg = strcatf(&msg, " with %s", websocket_connection_name(c));
msg = strcatf(&msg, ": status=%u, reason=%s", status, reason);
warn(msg);
free(msg);
}
int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int ret;
struct websocket_connection *c = user;
struct websocket *w;
struct webmsg *msg;
struct sample *smp;
switch (reason) {
case LWS_CALLBACK_CLIENT_ESTABLISHED:
ret = websocket_connection_init(c, wsi);
if (ret)
return -1;
return 0;
case LWS_CALLBACK_ESTABLISHED:
c->state = STATE_DESTROYED;
c->mode = WEBSOCKET_MODE_SERVER;
/* Get path of incoming request */
char uri[64];
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
if (strlen(uri) <= 0) {
warn("LWS: Closing connection with invalid URL: %s", uri);
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL");
return -1;
}
if ((uri[0] == '/' && uri[1] == 0) || uri[0] == 0){
/* Catch all connection */
c->node = NULL;
@ -171,13 +186,9 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
/* Search for node whose name matches the URI. */
c->node = list_lookup(&p.node.instances, node);
if (c->node == NULL) {
warn("LWS: Closing Connection for non-existent node: %s", uri + 1);
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
return -1;
}
/* Check if node is running */
if (c->node->state != STATE_STARTED)
return -1;
}
ret = websocket_connection_init(c, wsi);
@ -188,35 +199,60 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
case LWS_CALLBACK_CLOSED:
websocket_connection_destroy(c);
if (c->mode == WEBSOCKET_MODE_CLIENT)
free(c);
return 0;
case LWS_CALLBACK_CLIENT_WRITEABLE:
case LWS_CALLBACK_SERVER_WRITEABLE:
w = c->node->_vd;
if (c->node && c->node->state != STATE_STARTED)
return -1;
if (c->state == STATE_STOPPED) {
lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Node stopped", 4);
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_NORMAL, "Goodbye");
return -1;
}
char *buf;
int cnt;
while ((cnt = queue_pull(&c->queue, (void **) &buf))) {
struct webmsg *msg = (struct webmsg *) (buf + LWS_PRE);
pool_put(&w->pool, (void *) buf);
if (c->node && c->node->state != STATE_STARTED) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
}
char *buf = NULL;
while (queue_pull(&c->queue, (void **) &smp)) {
buf = realloc(buf, LWS_PRE + WEBMSG_LEN(smp->length));
if (!buf)
serror("realloc failed:");
msg = (struct webmsg *) (buf + LWS_PRE);
msg->version = WEBMSG_VERSION;
msg->type = WEBMSG_TYPE_DATA;
msg->length = smp->length;
msg->sequence = smp->sequence;
msg->id = smp->source->id;
msg->ts.sec = smp->ts.origin.tv_sec;
msg->ts.nsec = smp->ts.origin.tv_nsec;
memcpy(&msg->data, &smp->data, SAMPLE_DATA_LEN(smp->length));
webmsg_hton(msg);
sample_put(smp);
ret = lws_write(wsi, (unsigned char *) msg, WEBMSG_LEN(msg->length), LWS_WRITE_BINARY);
if (ret < WEBMSG_LEN(msg->length))
error("Failed lws_write()");
if (ret < 0) {
warn("Failed lws_write() for connection %s", websocket_connection_name(c));
return -1;
}
if (lws_send_pipe_choked(wsi))
break;
}
free(buf);
/* There are still samples in the queue */
if (queue_available(&c->queue) > 0)
lws_callback_on_writable(wsi);
@ -224,34 +260,76 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
case LWS_CALLBACK_CLIENT_RECEIVE:
case LWS_CALLBACK_RECEIVE:
w = c->node->_vd;
if (c->node->state != STATE_STARTED)
if (!lws_frame_is_binary(wsi)) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Binary data expected");
return -1;
}
if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0))
warn("LWS: Received invalid packet for connection %s", websocket_connection_name(c));
if (len < WEBMSG_LEN(0)) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid packet");
return -1;
}
struct timespec ts_recv = time_now();
struct webmsg *msg = (struct webmsg *) in;
while ((char *) msg + WEBMSG_LEN(msg->length) <= (char *) in + len) {
struct webmsg *msg2 = pool_get(&w->pool);
if (!msg2) {
warn("Pool underrun for connection %s", websocket_connection_name(c));
msg = (struct webmsg *) in;
while ((char *) msg + WEBMSG_LEN(msg->length) < (char *) in + len) {
struct node *dest;
/* Convert message to host byte-order */
webmsg_ntoh(msg);
/* Find destination node of this message */
if (c->node)
dest = c->node;
else {
dest = NULL;
for (int i = 0; i < list_length(&p.node.instances); i++) {
struct node *n = list_at(&p.node.instances, i);
if (n->id == msg->id) {
dest = n;
break;
}
}
if (!dest) {
warn("Ignoring message due to invalid node id");
goto next;
}
}
struct websocket *w = dest->_vd;
ret = sample_alloc(&w->pool, &smp, 1);
if (ret != 1) {
warn("Pool underrun for connection: %s", websocket_connection_name(c));
break;
}
memcpy(msg2, msg, WEBMSG_LEN(msg->length));
smp->ts.origin = WEBMSG_TS(msg);
smp->ts.received = ts_recv;
smp->sequence = msg->sequence;
smp->length = msg->length;
if (smp->length > smp->capacity) {
smp->length = smp->capacity;
warn("Dropping values for connection: %s", websocket_connection_name(c));
}
ret = queue_signalled_push_many(&w->queue, (void **) msg2, 1);
memcpy(&smp->data, &msg->data, SAMPLE_DATA_LEN(smp->length));
ret = queue_signalled_push(&w->queue, (void **) smp);
if (ret != 1) {
warn("Queue overrun for connection %s", websocket_connection_name(c));
break;
}
/* Next message */
msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length));
next: msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length));
}
return 0;
default:
@ -259,6 +337,37 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
}
}
int websocket_init(struct super_node *sn)
{
list_init(&connections);
web = &sn->web;
if (web->state != STATE_STARTED)
return -1;
return 0;
}
int websocket_deinit()
{
for (size_t i = 0; i < list_length(&connections); i++) {
struct websocket_connection *c = list_at(&connections, i);
c->state = STATE_STOPPED;
lws_callback_on_writable(c->wsi);
}
/* Wait for all connections to be closed */
while (list_length(&connections) > 0)
sleep(0.2);
list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true);
return 0;
}
int websocket_start(struct node *n)
{
int ret;
@ -274,7 +383,21 @@ int websocket_start(struct node *n)
if (ret)
return ret;
/** @todo Connection to destinations via WebSocket client */
for (int i = 0; i < list_length(&w->destinations); i++) {
struct websocket_destination *d = list_at(&w->destinations, i);
struct websocket_connection *c = alloc(sizeof(struct websocket_connection));
c->state = STATE_DESTROYED;
c->mode = WEBSOCKET_MODE_CLIENT;
c->node = n;
d->info.context = web->context;
d->info.vhost = web->vhost;
d->info.userdata = c;
lws_client_connect_via_info(&d->info);
}
return 0;
}
@ -291,15 +414,19 @@ int websocket_stop(struct node *n)
lws_callback_on_writable(c->wsi);
}
ret = pool_destroy(&w->pool);
if (ret)
return ret;
/* Wait for all connections to be closed */
while (list_length(&w->connections) > 0)
sleep(1);
ret = queue_signalled_destroy(&w->queue);
if (ret)
return ret;
ret = pool_destroy(&w->pool);
if (ret)
return ret;
return 0;
}
@ -315,46 +442,56 @@ int websocket_destroy(struct node *n)
int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
{
int got;
int avail;
struct websocket *w = n->_vd;
struct webmsg *msgs[cnt];
struct sample *cpys[cnt];
do {
got = queue_signalled_pull_many(&w->queue, (void **) msgs, cnt);
if (got < 0)
return got;
} while (got == 0);
for (int i = 0; i < got; i++) {
smps[i]->sequence = msgs[i]->sequence;
smps[i]->length = msgs[i]->length;
smps[i]->ts.origin = WEBMSG_TS(msgs[i]);
memcpy(&smps[i]->data, &msgs[i]->data, WEBMSG_DATA_LEN(msgs[i]->length));
}
pool_put_many(&w->pool, (void **) msgs, got);
avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt);
if (avail < 0)
return avail;
} while (avail == 0);
return got;
for (int i = 0; i < avail; i++) {
sample_copy(smps[i], cpys[i]);
sample_put(cpys[i]);
}
return avail;
}
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
{
int avail;
struct websocket *w = n->_vd;
struct sample *cpys[cnt];
/* Make copies of all samples */
avail = sample_alloc(&w->pool, cpys, cnt);
if (avail < cnt)
warn("Pool underrun for node %s: avail=%u", node_name(n), avail);
for (int i = 0; i < avail; i++) {
sample_copy(cpys[i], smps[i]);
cpys[i]->source = n;
}
for (size_t i = 0; i < list_length(&w->connections); i++) {
struct websocket_connection *c = list_at(&w->connections, i);
websocket_connection_write(c, smps, cnt);
websocket_connection_write(c, cpys, cnt);
}
for (size_t i = 0; i < list_length(&connections); i++) {
struct websocket_connection *c = list_at(&connections, i);
websocket_connection_write(c, smps, cnt);
websocket_connection_write(c, cpys, cnt);
}
sample_put_many(cpys, avail);
return cnt;
}
@ -381,6 +518,8 @@ int websocket_parse(struct node *n, config_setting_t *cfg)
cerror(cfg_dests, "The 'destinations' setting must be an array of URLs");
struct websocket_destination d;
memset(&d, 0, sizeof(d));
d.uri = strdup(uri);
if (!d.uri)
@ -391,10 +530,13 @@ int websocket_parse(struct node *n, config_setting_t *cfg)
cerror(cfg_dests, "Failed to parse websocket URI: '%s'", uri);
d.info.ssl_connection = !strcmp(prot, "https");
d.info.address = ads;
d.info.path = path;
d.info.protocol = prot;
d.info.address = strdup(ads);
d.info.host = d.info.address;
d.info.origin = d.info.address;
d.info.ietf_version_or_minus_one = -1;
d.info.protocol = "live";
asprintf((char **) &d.info.path, "/%s", path);
list_push(&w->destinations, memdup(&d, sizeof(d)));
}
@ -434,6 +576,8 @@ static struct plugin p = {
.node = {
.vectorize = 0, /* unlimited */
.size = sizeof(struct websocket),
.init = websocket_init,
.deinit = websocket_deinit,
.start = websocket_start,
.stop = websocket_stop,
.destroy = websocket_destroy,