diff --git a/Makefile b/Makefile index fa75918bc..2725915f6 100644 --- a/Makefile +++ b/Makefile @@ -41,8 +41,7 @@ LDFLAGS += -pthread -L. -Wl,-rpath,'$$ORIGIN' # pkg-config dependencies PKGS = libconfig -#DOCKEROPTS = -p 80:80 -p 443:443 --ulimit memlock=1073741824 --security-opt seccomp:unconfined -DOCKEROPTS = -p 1234 --ulimit memlock=1073741824 --security-opt seccomp:unconfined +DOCKEROPTS = -p 80:80 -p 443:443 -p 1234:1234 --ulimit memlock=1073741824 --security-opt seccomp:unconfined # Add more compiler flags ifdef DEBUG @@ -79,11 +78,11 @@ ifeq ($(shell pkg-config libcurl jansson uuid; echo $$?),0) PKGS += libcurl jansson uuid endif -## Enable WebSocket support -#ifeq ($(shell pkg-config libwebsockets jansson; echo $$?),0) -# LIB_OBJS += websocket.o websocket-live.o websocket-http.o -# PKGS += libwebsockets jansson -#endif +# Enable WebSocket support +ifeq ($(shell pkg-config libwebsockets jansson; echo $$?),0) + LIB_OBJS += websocket.o + PKGS += libwebsockets jansson +endif ## Add support for LAPACK / BLAS benchmarks / solvers ifeq ($(shell pkg-config blas lapack; echo $$?),0) diff --git a/etc/websocket.conf b/etc/websocket.conf index 63d1a065e..9b22a26a6 100644 --- a/etc/websocket.conf +++ b/etc/websocket.conf @@ -20,20 +20,12 @@ ############ Dictionary of nodes ############ nodes = { - file = { - type = "file", - #vectorize = 10, - in = { - path = "/test_fifo" - } - }, - ws = { type = "websocket", unit = "MVa", units = [ "V", "A", "Var" ], description = "Das ist ein Test", - vectorize = 100, + #vectorize = 10, source = { simulator = "OP5600", location = "ACS lab" @@ -52,8 +44,8 @@ nodes = { paths = ( { - in = "file", + in = "ws", out = "ws", - hook = [ "stats_send:ws_stats" ] +# hook = [ "stats_send:ws_stats" ] } ); diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 79a5174dc..4f3149534 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -20,19 +20,46 @@ #define _WEBSOCKET_H_ #include "node.h" +#include "pool.h" +#include "queue.h" + +/* Forward declaration */ +struct lws; /** Internal data per websocket node */ struct websocket { - struct list connections; /**< List of active libwebsocket connections (struct websocket_connection) */ - struct list destinations; /**< List of struct lws_client_connect_info to connect to. */ + struct list connections; /**< List of active libwebsocket connections in server mode (struct websocket_connection) */ + struct list destinations; /**< List of struct lws_client_connect_info to connect to in client mode. */ - struct websocket_connection *writer; + struct pool pool; + + struct queue queue_tx; /**< For samples which are sent to WebSockets */ + struct queue queue_rx; /**< For samples which are received from WebSockets */ + + qptr_t sent; + qptr_t received; + + int shutdown; }; struct websocket_connection { + enum { + WEBSOCKET_ESTABLISHED, + WEBSOCKET_ACTIVE, + WEBSOCKET_CLOSED + } state; + struct node *node; struct path *path; + struct lws *wsi; + + struct { + char name[64]; + char ip[64]; + } peer; + + qptr_t sent; qptr_t received; }; diff --git a/lib/nodes/websocket-http.c b/lib/nodes/websocket-http.c deleted file mode 100644 index ef7153e9e..000000000 --- a/lib/nodes/websocket-http.c +++ /dev/null @@ -1,154 +0,0 @@ -/** HTTP protocol of the websocket node type - * - * @author Steffen Vogel - * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC - * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. - * Unauthorized copying of this file, via any medium is strictly prohibited. - *********************************************************************************/ - -#include -#include - -#ifdef WITH_JANSSON - #include -#endif - -/* Choose mime type based on the file extension */ -static char * get_mimetype(const char *resource_path) -{ - char *extension = strrchr(resource_path, '.'); - - if (extension == NULL) - return "text/plain"; - else if (!strcmp(extension, ".png")) - return "image/png"; - else if (!strcmp(extension, ".jpg")) - return "image/jpg"; - else if (!strcmp(extension, ".gif")) - return "image/gif"; - else if (!strcmp(extension, ".html")) - return "text/html"; - else if (!strcmp(extension, ".css")) - return "text/css"; - else if (!strcmp(extension, ".js")) - return "application/javascript"; - else - return "text/plain"; -} - -static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) -{ - switch (reason) { - case LWS_CALLBACK_HTTP: - if (!htdocs) { - lws_return_http_status(wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL); - goto try_to_reuse; - } - - if (len < 1) { - lws_return_http_status(wsi, HTTP_STATUS_BAD_REQUEST, NULL); - goto try_to_reuse; - } - - char *requested_uri = (char *) in; - - debug(DBG_WEBSOCKET | 3, "WebSocket: New HTTP request: %s", requested_uri); - - /* Handle default path */ - if (!strcmp(requested_uri, "/")) { - char *response = "HTTP/1.1 302 Found\r\n" - "Content-Length: 0\r\n" - "Location: /index.html\r\n" - "\r\n"; - - lws_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP); - - goto try_to_reuse; - } -#ifdef WITH_JANSSON - /* Return list of websocket nodes */ - else if (!strcmp(requested_uri, "/nodes.json")) { - json_t *json_body = json_array(); - - list_foreach(struct node *n, &vt.instances) { - struct websocket *w = n->_vd; - - json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }", - "name", node_name_short(n), - "connections", list_length(&w->connections), - "state", n->state, - "vectorize", n->vectorize, - "affinity", n->affinity - ); - - /* Add all additional fields of node here. - * This can be used for metadata */ - json_object_update(json_node, config_to_json(n->cfg)); - - json_array_append_new(json_body, json_node); - } - - char *body = json_dumps(json_body, JSON_INDENT(4)); - - char *header = "HTTP/1.1 200 OK\r\n" - "Connection: close\r\n" - "Content-Type: application/json\r\n" - "\r\n"; - - lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP); - lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP); - - free(body); - json_decref(json_body); - - return -1; - } - else if (!strcmp(requested_uri, "/config.json")) { - char *body = json_dumps(config_to_json(cfg_root), JSON_INDENT(4)); - - char *header = "HTTP/1.1 200 OK\r\n" - "Connection: close\r\n" - "Content-Type: application/json\r\n" - "\r\n"; - - lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP); - lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP); - - free(body); - - return -1; - } -#endif - else { - char path[4069]; - snprintf(path, sizeof(path), "%s%s", htdocs, requested_uri); - - /* refuse to serve files we don't understand */ - char *mimetype = get_mimetype(path); - if (!mimetype) { - warn("HTTP: Unknown mimetype for %s", path); - lws_return_http_status(wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL); - return -1; - } - - int n = lws_serve_http_file(wsi, path, mimetype, NULL, 0); - if (n < 0) - return -1; - else if (n == 0) - break; - else - goto try_to_reuse; - } - - default: - break; - } - - return 0; - -try_to_reuse: - if (lws_http_transaction_completed(wsi)) - return -1; - - return 0; -} \ No newline at end of file diff --git a/lib/nodes/websocket-live.c b/lib/nodes/websocket-live.c deleted file mode 100644 index 864b22887..000000000 --- a/lib/nodes/websocket-live.c +++ /dev/null @@ -1,133 +0,0 @@ -/** Live protocol of the websocket node type - * - * This protocol callback function is used to handle the binary websocket protoocol - * which is used to send / receive struct msg's. - * - * @author Steffen Vogel - * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC - * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. - * Unauthorized copying of this file, via any medium is strictly prohibited. - *********************************************************************************/ - -static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) -{ - struct conn *c = user; - - switch (reason) { - case LWS_CALLBACK_WSI_CREATE: - c->wsi = wsi; - return 0; - - case LWS_CALLBACK_WSI_DESTROY: - c->wsi = NULL; - - connection_destroy(c); /* release c->peer.ip & c->peer.name */ - return 0; - - case LWS_CALLBACK_CLIENT_ESTABLISHED: - case LWS_CALLBACK_ESTABLISHED: - c->state = CONN_STATE_ESTABLISHED; - c->role = (reason == LWS_CALLBACK_ESTABLISHED) - ? CONN_ROLE_SERVER - : CONN_ROLE_CLIENT; - - /* Get path of incoming request */ - char uri[64] - lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/ - if (strlen(uri) <= 0) { - warn("WebSocket: Closing connection with invalid URL: %s") - return -1; - } - - /* Search for node whose name matches the URI. */ - c->node = list_lookup(&vt.instances, uri + 1); - if (c->node == NULL) { - warn("WebSocket: Closing Connection for non-existent node: %s", uri + 1); - return -1; - } - - /* Check if node is running */ - if (c->node.state != NODE_RUNNING) - return -1; - - /* Alias to ease readability */ - c->ws = n->_vd; - - /* Lookup peer address for debug output */ - c->peer.name = alloc(64); - c->peer.ip = alloc(64); - lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, 64, c->peer.ip, 64); - - info("WebSocket: New Connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); - list_push(&c->ws->connections, c); - - return 0; - - case LWS_CALLBACK_CLOSED: - c->state = CONN_STATE_CLOSED; - - info("WebSocket: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); - list_remove(&c->ws->connections, c); - - return 0; - - case LWS_CALLBACK_CLIENT_WRITABLE: - case LWS_CALLBACK_SERVER_WRITEABLE: - if (c->node.state != NODE_RUNNING) - return -1; - - int sent, sz, remain; - struct queue *q = &c->tx_queue; - - pthread_mutex_lock(&q->lock); - - sz = q->tail - q->head; - if (len == 0) - goto out; /* nothing to sent at the moment */ - - sent = lws_write(wsi, q->head, sz, LWS_WRITE_BINARY); - if (sent < 0) - goto out; - - /* Move unsent part to head of queue */ - remain = sz - sent; - if (remain > 0) - memmove(q->head, q->head + sent, remain); - - /* Update queue tail */ - q->tail = q->head + remain; - -out: pthread_mutex_unlock(&q->lock); - - return (sent < 0) ? -1 : 0; - - case LWS_CALLBACK_CLIENT_RECEIVE: - case LWS_CALLBACK_RECEIVE: - if (c->node.state != NODE_RUNNING) - return -1; - - if (!lws_frame_is_binary(wsi)) { - warn("WebSocket: Received non-binary frame for node %s", node_name(c->node)); - return -1; - } - - int ret = 0; - struct queue *q = &w->rx_queue; - - pthread_mutex_lock(&q->lock); - - - - memcpy(q->tail, in, len); - - q->tail += len; - - pthread_cond_broadcast(&q->cond); -out2: pthread_mutex_unlock(&q->lock); - - return ret; - - default: - return 0; - } -} \ No newline at end of file diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 728a9be28..62aff97a3 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -15,21 +15,12 @@ #include #include -#ifdef WITH_JANSSON - #include -#endif - -#include "websocket.h" +#include "nodes/websocket.h" #include "timing.h" #include "utils.h" #include "msg.h" #include "cfg.h" - -/* Forward declarations */ -static struct node_type vt; - -static int protocol_cb_http(struct lws *, enum lws_callback_reasons, void *, void *, size_t); -static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, void *, size_t); +#include "config.h" /* Private static storage */ static config_setting_t *cfg_root; /**< Root config */ @@ -42,12 +33,71 @@ static const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS static const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */ static const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */ +/* Forward declarations */ +static struct node_type vt; +static int protocol_cb_http(struct lws *, enum lws_callback_reasons, void *, void *, size_t); +static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, void *, size_t); + static struct lws_protocols protocols[] = { - { "http-only", protocol_cb_http, 0, 0 }, - { "live", protocol_cb_live, sizeof(struct node *), 10 }, - { 0 } /* terminator */ + { + "http-only", + protocol_cb_http, + 0, + 0 + }, + { + "live", + protocol_cb_live, + sizeof(struct websocket_connection), + 0 + }, + { 0 /* terminator */ } }; +#if 0 +static const struct lws_extension exts[] = { + { + "permessage-deflate", + lws_extension_callback_pm_deflate, + "permessage-deflate" + }, + { + "deflate-frame", + lws_extension_callback_pm_deflate, + "deflate_frame" + }, + { NULL, NULL, NULL /* terminator */ } +}; +#endif + +static void logger(int level, const char *msg) { + int len = strlen(msg); + if (strchr(msg, '\n')) + len -= 1; + + /* Decrease severity for some errors. */ + if (strstr(msg, "Unable to open") == msg) + level = LLL_WARN; + + switch (level) { + case LLL_ERR: error("LWS: %.*s", len, msg); break; + case LLL_WARN: warn("LWS: %.*s", len, msg); break; + case LLL_INFO: info("LWS: %.*s", len, msg); break; + default: debug(DBG_WEBSOCKET | 1, "LWS: %.*s", len, msg); break; + } +} + +static void * server_thread(void *ctx) +{ + debug(DBG_WEBSOCKET | 3, "WebSocket: Started server thread"); + + while (lws_service(context, 10) >= 0); + + debug(DBG_WEBSOCKET | 3, "WebSocket: shutdown voluntarily"); + + return NULL; +} + /* Choose mime type based on the file extension */ static char * get_mimetype(const char *resource_path) { @@ -57,6 +107,8 @@ static char * get_mimetype(const char *resource_path) return "text/plain"; else if (!strcmp(extension, ".png")) return "image/png"; + else if (!strcmp(extension, ".svg")) + return "image/svg+xml"; else if (!strcmp(extension, ".jpg")) return "image/jpg"; else if (!strcmp(extension, ".gif")) @@ -71,10 +123,10 @@ static char * get_mimetype(const char *resource_path) return "text/plain"; } -static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { switch (reason) { - case LWS_CALLBACK_HTTP: + case LWS_CALLBACK_HTTP: if (!htdocs) { lws_return_http_status(wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL); goto try_to_reuse; @@ -87,7 +139,7 @@ static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, v char *requested_uri = (char *) in; - debug(DBG_WEBSOCKET | 3, "WebSocket: New HTTP request: %s", requested_uri); + debug(DBG_WEBSOCKET | 3, "LWS: New HTTP request: %s", requested_uri); /* Handle default path */ if (!strcmp(requested_uri, "/")) { @@ -127,7 +179,7 @@ static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, v char *header = "HTTP/1.1 200 OK\r\n" "Connection: close\r\n" - "Content-Type: application/json\r\n" + "Content-Type: application/json\r\n" "\r\n"; lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP); @@ -143,7 +195,7 @@ static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, v char *header = "HTTP/1.1 200 OK\r\n" "Connection: close\r\n" - "Content-Type: application/json\r\n" + "Content-Type: application/json\r\n" "\r\n"; lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP); @@ -188,174 +240,128 @@ try_to_reuse: return 0; } -static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { - struct node *n; + struct websocket_connection *c = user; struct websocket *w; - - char *buf, uri[1024]; switch (reason) { - case LWS_CALLBACK_ESTABLISHED: + case LWS_CALLBACK_CLIENT_ESTABLISHED: + case LWS_CALLBACK_ESTABLISHED: { + /* Get path of incoming request */ + char uri[64]; lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/ - - /* Search for node which matches uri */ - list_foreach(n, &vt.instances) { - if (!strcmp(n->name, uri + 1)) /* we skip leading '/' */ - goto found; + if (strlen(uri) <= 0) { + warn("LWS: Closing connection with invalid URL: %s", uri); + return -1; } - - warn("WebSocket: Closing Connection for non-existent node: %s", uri + 1); - - return -1; - -found: * (void **) user = n; - w = n->_vd; - if (w->shutdown) - goto shutdown; + /* Search for node whose name matches the URI. */ + c->node = list_lookup(&vt.instances, uri + 1); + if (c->node == NULL) { + warn("LWS: Closing Connection for non-existent node: %s", uri + 1); + return -1; + } - list_push(&w->connections, wsi); + /* Check if node is running */ + if (c->node->state != NODE_RUNNING) + return -1; - /* Get peer information */ - char client_name[128], client_ip[128]; - lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), - client_name, sizeof(client_name), - client_ip, sizeof(client_ip) - ); + c->state = WEBSOCKET_ESTABLISHED; + c->wsi = wsi; - info("WebSocket: New Connection for node: %s from %s (%s)", node_name(n), client_name, client_ip); + /* Lookup peer address for debug output */ + lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip)); + + info("LWS: New Connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); + + struct websocket *w = (struct websocket *) c->node->_vd; + list_push(&w->connections, c); return 0; - + } + case LWS_CALLBACK_CLOSED: - n = * (struct node **) user; - w = n->_vd; - - list_remove(&w->connections, wsi); + info("LWS: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip); - info("WebSocket: Connection closed for node: %s", node_name(n)); + c->state = WEBSOCKET_CLOSED; + c->wsi = NULL; return 0; + + case LWS_CALLBACK_CLIENT_WRITEABLE: + case LWS_CALLBACK_SERVER_WRITEABLE: { + w = (struct websocket *) c->node->_vd; + + if (c->node->state != NODE_RUNNING) + return -1; + + if (w->shutdown) { + lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Bye", 4); + return -1; + } + + + int cnt, sent, ret; + unsigned char *bufs[DEFAULT_QUEUELEN]; + + cnt = queue_get_many(&w->queue_tx, (void **) bufs, DEFAULT_QUEUELEN, c->sent); + + for (sent = 0; sent < cnt; sent++) { + struct msg *msg = (struct msg *) (bufs[sent] + LWS_PRE); + + ret = lws_write(wsi, (unsigned char *) msg, MSG_LEN(msg->length), LWS_WRITE_BINARY); + if (ret < MSG_LEN(msg->length)) + error("Failed lws_write()"); + + if (lws_send_pipe_choked(wsi)) + break; + } + + queue_pull_many(&w->queue_tx, (void **) bufs, sent, &c->sent); + + pool_put_many(&w->pool, (void **) bufs, sent); + + if (sent < cnt) + lws_callback_on_writable(wsi); + + return 0; + } + + case LWS_CALLBACK_CLIENT_RECEIVE: + case LWS_CALLBACK_RECEIVE: { + w = (struct websocket *) c->node->_vd; + + if (c->node->state != NODE_RUNNING) + return -1; + + if (!lws_frame_is_binary(wsi) || len < MSG_LEN(0)) + warn("LWS: Received invalid packet for node: %s", node_name(c->node)); + + struct msg *msg = (struct msg *) in; + + while ((char *) msg + MSG_LEN(msg->length) <= (char *) in + len) { + struct msg *msg2 = pool_get(&w->pool); + if (!msg2) { + warn("Pool underrun for node: %s", node_name(c->node)); + return -1; + } + + memcpy(msg2, msg, MSG_LEN(msg->length)); + + queue_push(&w->queue_rx, msg2, &c->received); + + /* Next message */ + msg = (struct msg *) ((char *) msg + MSG_LEN(msg->length)); + } - case LWS_CALLBACK_SERVER_WRITEABLE: - n = * (struct node **) user; - if (!n) - return -1; - - w = n->_vd; - if (!w) - return -1; - - if (w->shutdown) - goto shutdown; - - pthread_mutex_lock(&w->write.mutex); - - if (w->write.pool == NULL || w->write.cnt == 0) - return 0; /* no samples available to send */ - - /* Calculate required buffer size */ - size_t bytes = 0; - for (int i = 0; i < w->write.cnt; i++) { - struct msg *src = pool_getrel(w->write.pool, i); - bytes += MSG_LEN(src->values); - } - - /* Allocate buffer */ - buf = malloc(LWS_SEND_BUFFER_PRE_PADDING + bytes); - - /* Fill buffer */ - for (int i = 0; i < w->write.cnt; i++) { - struct msg *src = pool_getrel(w->write.pool, i); - struct msg *dst = (struct msg *) (buf + len + LWS_SEND_BUFFER_PRE_PADDING); - - size_t bytes = MSG_LEN(src->values); - len += bytes; - - memcpy(dst, src, bytes); - } - - /* We've done our work here. Do not send again.. */ - w->write.pool = NULL; - w->write.pool = 0; - - pthread_mutex_unlock(&w->write.mutex); - - lws_write(wsi, (unsigned char *) (buf + LWS_SEND_BUFFER_PRE_PADDING), len, LWS_WRITE_BINARY); - + /** @todo Implement */ return 0; + } - case LWS_CALLBACK_RECEIVE: - n = * (struct node **) user; - if (!n) - return -1; - - w = n->_vd; - - if (w->read.pool == NULL || w->read.cnt == 0) - return 0; - - pthread_mutex_lock(&w->read.mutex); - - size_t offset = 0; - for (int i = 0; i < w->read.cnt; i++) { - struct msg *dst = pool_getrel(w->read.pool, i); - struct msg *src = (struct msg *) in + offset; - - memcpy(dst, src, MSG_LEN(src->values)); - - offset += MSG_LEN(src->values); - if (offset >= len) - break; - } - - pthread_mutex_unlock(&w->read.mutex); - pthread_cond_broadcast(&w->read.cond); /* new data available, wake-up websocket_read() */ - - return 0; - default: return 0; } - -shutdown: - warn("Dropping connection: node is currently shutting down"); - -#if LWS_LIBRARY_VERSION_NUMBER > 1006002 - char *bye = "VILLASnode is shutting down. Bye"; - lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) bye, strlen(bye)); -#endif - - return -1; -} - -static void logger(int level, const char *msg) { - int len = strlen(msg); - if (strchr(msg, '\n')) - len -= 1; - - /* Decrease severity for some errors. */ - if (strstr(msg, "Unable to open") == msg) - level = LLL_WARN; - - switch (level) { - case LLL_ERR: error("WebSocket: %.*s", len, msg); break; - case LLL_WARN: warn("WebSocket: %.*s", len, msg); break; - case LLL_INFO: info("WebSocket: %.*s", len, msg); break; - default: debug(DBG_WEBSOCKET | 1, "WebSocket: %.*s", len, msg); break; - } -} - -static void * server_thread(void *ctx) -{ - debug(DBG_WEBSOCKET | 3, "WebSocket: Started server thread"); - - while (lws_service(context, 10) >= 0); - - debug(DBG_WEBSOCKET | 3, "WebSocket: shutdown voluntarily"); - - return NULL; } int websocket_init(int argc, char * argv[], config_setting_t *cfg) @@ -383,7 +389,7 @@ int websocket_init(int argc, char * argv[], config_setting_t *cfg) struct lws_context_creation_info info = { .port = port, .protocols = protocols, - .extensions = lws_get_internal_extensions(), + .extensions = NULL, //exts, .ssl_cert_filepath = ssl_cert, .ssl_private_key_filepath = ssl_private_key, .gid = -1, @@ -417,9 +423,27 @@ int websocket_open(struct node *n) { struct websocket *w = n->_vd; + int ret; + list_init(&w->connections); list_init(&w->destinations); + size_t blocklen = LWS_PRE + MSG_LEN(DEFAULT_VALUES); + + ret = pool_init_mmap(&w->pool, blocklen, 2 * DEFAULT_QUEUELEN); + if (ret) + return ret; + + ret = queue_init(&w->queue_tx, DEFAULT_QUEUELEN); + if (ret) + return ret; + + ret = queue_init(&w->queue_rx, DEFAULT_QUEUELEN); + if (ret) + return ret; + + queue_reader_add(&w->queue_rx, 0, 0); + return 0; } @@ -432,38 +456,98 @@ int websocket_close(struct node *n) list_foreach(struct lws *wsi, &w->connections) lws_callback_on_writable(wsi); - /** @todo Is is safe? */ - list_destroy(&w->connections); + pool_destroy(&w->pool); + queue_destroy(&w->queue_tx); + + list_destroy(&w->connections, NULL, false); return 0; } int websocket_destroy(struct node *n) { - struct websocket *w = n->_vd; +// struct websocket *w = n->_vd; return 0; } -int websocket_read(struct node *n, struct pool *pool, unsigned cnt) +int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) { struct websocket *w = n->_vd; - /* Check for new websocket connections and more readers to queue */ - list_foreach(struct) + struct msg *msgs[cnt]; + + int got; + + got = queue_pull_many(&w->queue_rx, (void **) msgs, cnt, &w->received); + + if (got) + info("got %u", got); + + for (int i = 0; i < got; i++) { + smps[i]->sequence = msgs[i]->sequence; + smps[i]->length = msgs[i]->length; + smps[i]->ts.origin = MSG_TS(msgs[i]); + + memcpy(&smps[i]->data, &msgs[i]->data, MSG_DATA_LEN(msgs[i]->length)); + } + + pool_put_many(&w->pool, (void **) msgs, got); - return cnt; + return got; } -int websocket_write(struct node *n, struct pool *pool, unsigned cnt) +int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct websocket *w = n->_vd; + + int blocks, enqueued; + char *bufs[cnt]; + + /* Copy samples to websocket queue */ + blocks = pool_get_many(&w->pool, (void **) bufs, cnt); + if (blocks != cnt) + warn("Pool underrun in websocket node: %s", node_name(n)); + + for (int i = 0; i < blocks; i++) { + struct msg *msg = (struct msg *) (bufs[i] + LWS_PRE); + + msg->version = MSG_VERSION; + msg->type = MSG_TYPE_DATA; + msg->endian = MSG_ENDIAN_HOST; + msg->length = smps[i]->length; + msg->sequence = smps[i]->sequence; + msg->ts.sec = smps[i]->ts.origin.tv_sec; + msg->ts.nsec = smps[i]->ts.origin.tv_nsec; + + memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4); + } + + enqueued = queue_push_many(&w->queue_tx, (void **) bufs, cnt, &w->sent); + if (enqueued != blocks) + warn("Queue overrun in websocket node: %s", node_name(n)); /* Notify all active websocket connections to send new data */ - list_foreach(struct lws *wsi, &w->connections) - lws_callback_on_writable(wsi); + list_foreach(struct websocket_connection *c, &w->connections) { + switch (c->state) { + case WEBSOCKET_CLOSED: + queue_reader_remove(&w->queue_tx, c->sent, w->sent); + list_remove(&w->connections, c); + break; + + case WEBSOCKET_ESTABLISHED: + c->sent = w->sent; + c->state = WEBSOCKET_ACTIVE; + + queue_reader_add(&w->queue_tx, c->sent, w->sent); + + case WEBSOCKET_ACTIVE: + lws_callback_on_writable(c->wsi); + break; + } + } - return 1; + return cnt; } static struct node_type vt = {