diff --git a/etc/websocket.conf b/etc/websocket.conf index 21e2513b8..2a012a0a3 100644 --- a/etc/websocket.conf +++ b/etc/websocket.conf @@ -33,7 +33,8 @@ nodes = { series = ( { label = "Random walk" }, { label = "Sine" }, - { label = "Rect" } + { label = "Rect" }, + { label = "Ramp" } ) } }; diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 4f3149534..9d4679d51 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -29,38 +29,35 @@ struct lws; /** Internal data per websocket node */ struct websocket { struct list connections; /**< List of active libwebsocket connections in server mode (struct websocket_connection) */ + struct list destinations; /**< List of struct lws_client_connect_info to connect to in client mode. */ struct pool pool; - struct queue queue_tx; /**< For samples which are sent to WebSockets */ - struct queue queue_rx; /**< For samples which are received from WebSockets */ - - qptr_t sent; - qptr_t received; - - int shutdown; + struct queue queue; /**< For samples which are received from WebSockets a */ + + int id; /**< The index of this node */ }; struct websocket_connection { enum { WEBSOCKET_ESTABLISHED, WEBSOCKET_ACTIVE, + WEBSOCKET_SHUTDOWN, WEBSOCKET_CLOSED } state; struct node *node; struct path *path; + struct queue queue; /**< For samples which are sent to the WebSocket */ + struct lws *wsi; struct { char name[64]; char ip[64]; } peer; - - qptr_t sent; - qptr_t received; }; /** @see node_vtable::init */ diff --git a/include/villas/sample.h b/include/villas/sample.h index c412f7127..4caf3f927 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -45,8 +45,6 @@ struct sample { atomic_int refcnt; /**< Reference counter. */ struct pool *pool; /**< This sample is belong to this memory pool. */ - - int endian; /**< Endianess of data in the sample. */ /** All timestamps are seconds / nano seconds after 1.1.1970 UTC */ struct { @@ -57,9 +55,9 @@ struct sample { /** The values. */ union { - float f; /**< Floating point values (note msg::endian) */ - uint32_t i; /**< Integer values (note msg::endian) */ - } data[]; + float f; /**< Floating point values. */ + uint32_t i; /**< Integer values. */ + } data[]; /**< Data is in host endianess! */ }; /** Request \p cnt samples from memory pool \p p and initialize them. diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index d7b51733c..901d1c8ce 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -16,6 +16,7 @@ #include #include "nodes/websocket.h" +#include "webmsg_format.h" #include "timing.h" #include "utils.h" #include "msg.h" @@ -33,43 +34,21 @@ static const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS static const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */ static const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */ +static int id = 0; + +struct list connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */ + /* Forward declarations */ static struct node_type vt; static int protocol_cb_http(struct lws *, enum lws_callback_reasons, void *, void *, size_t); static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, void *, size_t); static struct lws_protocols protocols[] = { - { - "http-only", - protocol_cb_http, - 0, - 0 - }, - { - "live", - protocol_cb_live, - sizeof(struct websocket_connection), - 0 - }, - { 0 /* terminator */ } + { "http-only", protocol_cb_http, 0, 0 }, + { "live", protocol_cb_live, sizeof(struct websocket_connection), 0 }, + { NULL } }; -#if 0 -static const struct lws_extension exts[] = { - { - "permessage-deflate", - lws_extension_callback_pm_deflate, - "permessage-deflate" - }, - { - "deflate-frame", - lws_extension_callback_pm_deflate, - "deflate_frame" - }, - { NULL, NULL, NULL /* terminator */ } -}; -#endif - static void logger(int level, const char *msg) { int len = strlen(msg); if (strchr(msg, '\n')) @@ -80,7 +59,7 @@ static void logger(int level, const char *msg) { level = LLL_WARN; switch (level) { - case LLL_ERR: error("LWS: %.*s", len, msg); break; + case LLL_ERR: warn("LWS: %.*s", len, msg); break; case LLL_WARN: warn("LWS: %.*s", len, msg); break; case LLL_INFO: info("LWS: %.*s", len, msg); break; default: debug(DBG_WEBSOCKET | 1, "LWS: %.*s", len, msg); break; @@ -160,8 +139,9 @@ int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *us list_foreach(struct node *n, &vt.instances) { struct websocket *w = n->_vd; - json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }", + json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i, s: i }", "name", node_name_short(n), + "id", w->id, "connections", list_length(&w->connections), "state", n->state, "vectorize", n->vectorize, @@ -228,7 +208,7 @@ int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *us } default: - break; + return 0; } return 0; @@ -242,6 +222,7 @@ try_to_reuse: int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { + int ret; struct websocket_connection *c = user; struct websocket *w; @@ -256,36 +237,63 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us return -1; } - /* Search for node whose name matches the URI. */ - c->node = list_lookup(&vt.instances, uri + 1); - if (c->node == NULL) { - warn("LWS: Closing Connection for non-existent node: %s", uri + 1); - return -1; + if ((uri[0] == '/' && uri[1] == 0) || uri[0] == 0){ + /* Catch all connection */ + c->node = NULL; } + else { + char *node = uri + 1; - /* Check if node is running */ - if (c->node->state != NODE_RUNNING) - return -1; + /* Search for node whose name matches the URI. */ + c->node = list_lookup(&vt.instances, node); + if (c->node == NULL) { + warn("LWS: Closing Connection for non-existent node: %s", uri + 1); + return -1; + } + + /* Check if node is running */ + if (c->node->state != NODE_RUNNING) + return -1; + } c->state = WEBSOCKET_ESTABLISHED; c->wsi = wsi; + + ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); + if (ret) { + warn("Failed to create queue for incoming websocket connection. Closing.."); + return -1; + } /* Lookup peer address for debug output */ lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip)); - info("LWS: New Connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); + if (c->node != NULL) + info("LWS: New connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); + else + info("LWS: New connection from %s (%s)", c->peer.name, c->peer.ip); - struct websocket *w = (struct websocket *) c->node->_vd; - list_push(&w->connections, c); + if (c->node != NULL) { + struct websocket *w = (struct websocket *) c->node->_vd; + list_push(&w->connections, c); + } + else { + list_push(&connections, c); + } return 0; } case LWS_CALLBACK_CLOSED: - info("LWS: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); + if (c->node != NULL) + info("LWS: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); + else + info("LWS: Connection closed from %s (%s)", c->peer.name, c->peer.ip); c->state = WEBSOCKET_CLOSED; c->wsi = NULL; + + queue_destroy(&c->queue); return 0; @@ -293,36 +301,30 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us case LWS_CALLBACK_SERVER_WRITEABLE: { w = (struct websocket *) c->node->_vd; - if (c->node->state != NODE_RUNNING) + if (c->node && c->node->state != NODE_RUNNING) return -1; - if (w->shutdown) { - lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Bye", 4); + if (w->state == WEBSOCKET_SHUTDOWN) { + lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Node stopped", 4); return -1; } - - - int cnt, sent, ret; - unsigned char *bufs[DEFAULT_QUEUELEN]; - - cnt = queue_get_many(&w->queue_tx, (void **) bufs, DEFAULT_QUEUELEN, c->sent); - for (sent = 0; sent < cnt; sent++) { - struct msg *msg = (struct msg *) (bufs[sent] + LWS_PRE); + char *buf; + int cnt; + while ((cnt = queue_pull(&c->queue, (void **) &buf))) { + struct webmsg *msg = (struct webmsg *) (buf + LWS_PRE); - ret = lws_write(wsi, (unsigned char *) msg, MSG_LEN(msg->length), LWS_WRITE_BINARY); - if (ret < MSG_LEN(msg->length)) + pool_put(&w->pool, (void *) buf); + + ret = lws_write(wsi, (unsigned char *) msg, WEBMSG_LEN(msg->length), LWS_WRITE_BINARY); + if (ret < WEBMSG_LEN(msg->length)) error("Failed lws_write()"); if (lws_send_pipe_choked(wsi)) - break; + break; } - - queue_pull_many(&w->queue_tx, (void **) bufs, sent, &c->sent); - - pool_put_many(&w->pool, (void **) bufs, sent); - if (sent < cnt) + if (queue_available(&c->queue) > 0) lws_callback_on_writable(wsi); return 0; @@ -335,27 +337,30 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us if (c->node->state != NODE_RUNNING) return -1; - if (!lws_frame_is_binary(wsi) || len < MSG_LEN(0)) + if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0)) warn("LWS: Received invalid packet for node: %s", node_name(c->node)); - struct msg *msg = (struct msg *) in; + struct webmsg *msg = (struct webmsg *) in; - while ((char *) msg + MSG_LEN(msg->length) <= (char *) in + len) { - struct msg *msg2 = pool_get(&w->pool); + while ((char *) msg + WEBMSG_LEN(msg->length) <= (char *) in + len) { + struct webmsg *msg2 = pool_get(&w->pool); if (!msg2) { warn("Pool underrun for node: %s", node_name(c->node)); - return -1; + break; } - memcpy(msg2, msg, MSG_LEN(msg->length)); + memcpy(msg2, msg, WEBMSG_LEN(msg->length)); - queue_push(&w->queue_rx, msg2, &c->received); + ret = queue_push(&w->queue, msg2); + if (ret != 1) { + warn("Queue overrun for node: %s", node_name(c->node)); + break; + } /* Next message */ - msg = (struct msg *) ((char *) msg + MSG_LEN(msg->length)); + msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length)); } - /** @todo Implement */ return 0; } @@ -421,29 +426,24 @@ int websocket_deinit() int websocket_open(struct node *n) { - struct websocket *w = n->_vd; - int ret; + struct websocket *w = n->_vd; + + w->id = id++; list_init(&w->connections); list_init(&w->destinations); - size_t blocklen = LWS_PRE + MSG_LEN(DEFAULT_VALUES); + size_t blocklen = LWS_PRE + WEBMSG_LEN(DEFAULT_VALUES); - ret = pool_init_mmap(&w->pool, blocklen, 2 * DEFAULT_QUEUELEN); + ret = pool_init(&w->pool, 64 * DEFAULT_QUEUELEN, blocklen, &memtype_hugepage); if (ret) return ret; - ret = queue_init(&w->queue_tx, DEFAULT_QUEUELEN); + ret = queue_init(&w->queue, DEFAULT_QUEUELEN, &memtype_hugepage); if (ret) return ret; - - ret = queue_init(&w->queue_rx, DEFAULT_QUEUELEN); - if (ret) - return ret; - - queue_reader_add(&w->queue_rx, 0, 0); - + return 0; } @@ -451,13 +451,13 @@ int websocket_close(struct node *n) { struct websocket *w = n->_vd; - w->shutdown = 1; - - list_foreach(struct lws *wsi, &w->connections) - lws_callback_on_writable(wsi); + list_foreach(struct websocket_connection *c, &w->connections) { + c->state = WEBSOCKET_SHUTDOWN; + lws_callback_on_writable(c->wsi); + } pool_destroy(&w->pool); - queue_destroy(&w->queue_tx); + queue_destroy(&w->queue); list_destroy(&w->connections, NULL, false); @@ -475,17 +475,21 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) { struct websocket *w = n->_vd; - struct msg *msgs[cnt]; + struct webmsg *msgs[cnt]; int got; - got = queue_pull_many(&w->queue_rx, (void **) msgs, cnt, &w->received); + do { + got = queue_pull_many(&w->queue, (void **) msgs, cnt); + pthread_yield(); + } while (got == 0); + for (int i = 0; i < got; i++) { smps[i]->sequence = msgs[i]->sequence; smps[i]->length = msgs[i]->length; - smps[i]->ts.origin = MSG_TS(msgs[i]); + smps[i]->ts.origin = WEBMSG_TS(msgs[i]); - memcpy(&smps[i]->data, &msgs[i]->data, MSG_DATA_LEN(msgs[i]->length)); + memcpy(&smps[i]->data, &msgs[i]->data, WEBMSG_DATA_LEN(msgs[i]->length)); } pool_put_many(&w->pool, (void **) msgs, got); @@ -493,59 +497,118 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) return got; } +static int websocket_connection_init(struct websocket_connection *c) +{ + /** @todo */ + return -1; +} + +static void websocket_connection_destroy(struct websocket_connection *c) +{ + /** @todo */ +} + +static char * websocket_connection_name(struct websocket_connection *c) +{ + /** @todo */ + return "(todo)"; +} + +static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt) +{ + int blocks, enqueued; + char *bufs[cnt]; + + switch (c->state) { + case WEBSOCKET_SHUTDOWN: + return -1; + case WEBSOCKET_CLOSED: + if (c->node) { + struct websocket *w = (struct websocket *) c->node->_vd; + list_remove(&w->connections, c); + } + else + list_remove(&connections, c); + break; + + case WEBSOCKET_ESTABLISHED: + c->state = WEBSOCKET_ACTIVE; + /* fall through */ + + case WEBSOCKET_ACTIVE: + blocks = pool_get_many(&w->pool, (void **) bufs, cnt); + if (blocks != cnt) + warn("Pool underrun in websocket connection: %s", websocket_connection_name(c)); + + for (int i = 0; i < blocks; i++) { + struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE); + + msg->version = WEBMSG_VERSION; + msg->type = WEBMSG_TYPE_DATA; + msg->endian = WEBMSG_ENDIAN_HOST; + msg->length = smps[i]->length; + msg->sequence = smps[i]->sequence; + msg->id = w->id; + msg->ts.sec = smps[i]->ts.origin.tv_sec; + msg->ts.nsec = smps[i]->ts.origin.tv_nsec; + + memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4); + } + + enqueued = queue_push_many(&c->queue, (void **) bufs, cnt); + if (enqueued != blocks) + warn("Queue overrun in websocket connection: %s", websocket_connection_name(c)); + + lws_callback_on_writable(c->wsi); + break; + } + + return 0; +} + int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct websocket *w = n->_vd; - int blocks, enqueued; - char *bufs[cnt]; - - /* Copy samples to websocket queue */ - blocks = pool_get_many(&w->pool, (void **) bufs, cnt); - if (blocks != cnt) - warn("Pool underrun in websocket node: %s", node_name(n)); - - for (int i = 0; i < blocks; i++) { - struct msg *msg = (struct msg *) (bufs[i] + LWS_PRE); - - msg->version = MSG_VERSION; - msg->type = MSG_TYPE_DATA; - msg->endian = MSG_ENDIAN_HOST; - msg->length = smps[i]->length; - msg->sequence = smps[i]->sequence; - msg->ts.sec = smps[i]->ts.origin.tv_sec; - msg->ts.nsec = smps[i]->ts.origin.tv_nsec; - - memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4); + list_foreach(struct websocket_connection *c, &w->connections) { + websocket_write_connection(c, smps, cnt); } - enqueued = queue_push_many(&w->queue_tx, (void **) bufs, cnt, &w->sent); - if (enqueued != blocks) - warn("Queue overrun in websocket node: %s", node_name(n)); - - /* Notify all active websocket connections to send new data */ - list_foreach(struct websocket_connection *c, &w->connections) { - switch (c->state) { - case WEBSOCKET_CLOSED: - queue_reader_remove(&w->queue_tx, c->sent, w->sent); - list_remove(&w->connections, c); - break; - - case WEBSOCKET_ESTABLISHED: - c->sent = w->sent; - c->state = WEBSOCKET_ACTIVE; - - queue_reader_add(&w->queue_tx, c->sent, w->sent); - - case WEBSOCKET_ACTIVE: - lws_callback_on_writable(c->wsi); - break; - } + list_foreach(struct websocket_connection *c, &connections) { + websocket_write_connection(c, smps, cnt); } return cnt; } +int websocket_parse(struct node *n, config_setting_t *cfg) +{ + config_setting_t *cfg_dests; + + cfg_dests = config_setting_get_member(cfg, "destinations"); + + if (!config_setting_is_array(cfg_dests)) + cerror(dests, "The 'destinations' setting must be an array of URLs"); + + for (int i = 0; i < config_setting_length(cfg_dests); i++) { + config_setting_t *cfg_dest; + const char *url; + struct lws_client_connect_info *i; + + url = config_setting_get_string_elem(cfg_dests, i); + if (!url) + cerror(dests, "The 'destinations' setting must be an array of URLs"); + + i = alloc(sizeof()) + + } +} + +char * websocket_print(struct node *n) +{ + +} + static struct node_type vt = { .name = "websocket", .description = "Send and receive samples of a WebSocket connection (libwebsockets)", @@ -557,7 +620,9 @@ static struct node_type vt = { .read = websocket_read, .write = websocket_write, .init = websocket_init, - .deinit = websocket_deinit + .deinit = websocket_deinit, + .print = websocket_print, + .parse = websocket_parse }; REGISTER_NODE_TYPE(&vt) \ No newline at end of file diff --git a/thirdparty/libwebsockets b/thirdparty/libwebsockets index 0c984014f..5fb327754 160000 --- a/thirdparty/libwebsockets +++ b/thirdparty/libwebsockets @@ -1 +1 @@ -Subproject commit 0c984014f0a82e184af2ff18f97b45e2cbccd0bd +Subproject commit 5fb327754ab4d202fca903dd5bd6b546b340eecb