diff --git a/lib/Makefile.inc b/lib/Makefile.inc index 60c598c9a..43c0b1829 100644 --- a/lib/Makefile.inc +++ b/lib/Makefile.inc @@ -48,12 +48,12 @@ endif endif # Enable WebSocket support -#ifndef WITHOUT_WEBSOCKETS +ifndef WITHOUT_WEBSOCKETS ifeq ($(shell pkg-config libwebsockets jansson; echo $$?),0) -# LIB_SRCS += lib/nodes/websocket.c + LIB_SRCS += lib/nodes/websocket.c LIB_PKGS += libwebsockets jansson endif -#endif +endif # Enable OPAL-RT Asynchronous Process support (will result in 32bit binary!!!) ifdef WITH_OPAL diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index c2a1d1004..8fededa50 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -19,6 +19,7 @@ #include "msg.h" #include "cfg.h" #include "config.h" +#include "plugin.h" /* Internal datastructures */ struct destination { @@ -27,44 +28,26 @@ struct destination { }; /* Private static storage */ -static config_setting_t *cfg_root; /**< Root config */ -static pthread_t thread; /**< All nodes are served by a single websocket server. This server is running in a dedicated thread. */ -static struct lws_context *context; /**< The libwebsockets server context. */ +static int id = 0; /**< Highest assigned ID to websocket nodes. */ -static int port; /**< Port of the build in HTTP / WebSocket server */ - -static const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS */ -static const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */ -static const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */ - -static int id = 0; - -struct list connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */ +struct list connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */ /* Forward declarations */ static struct node_type vt; -static int protocol_cb_http(struct lws *, enum lws_callback_reasons, void *, void *, size_t); -static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, void *, size_t); -static struct lws_protocols protocols[] = { - { "http-only", protocol_cb_http, 0, 0 }, - { "live", protocol_cb_live, sizeof(struct connection), 0 }, - { NULL } -}; - -__attribute__((unused)) static int connection_init(struct connection *c) +__attribute__((unused)) static int websocket_connection_init(struct websocket_connection *c) { /** @todo */ return -1; } -__attribute__((unused)) static void connection_destroy(struct connection *c) +__attribute__((unused)) static void websocket_connection_destroy(struct websocket_connection *c) { if (c->_name) free(c->_name); } -static char * connection_name(struct connection *c) +static char * websocket_connection_name(struct websocket_connection *c) { if (!c->_name) { if (c->node) @@ -76,12 +59,7 @@ static char * connection_name(struct connection *c) return c->_name; } -static void destination_destroy(struct destination *d) -{ - free(d->uri); -} - -static int connection_write(struct connection *c, struct sample *smps[], unsigned cnt) +static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt) { int blocks, enqueued; char *bufs[cnt]; @@ -107,7 +85,7 @@ static int connection_write(struct connection *c, struct sample *smps[], unsigne case WEBSOCKET_ACTIVE: blocks = pool_get_many(&w->pool, (void **) bufs, cnt); if (blocks != cnt) - warn("Pool underrun in websocket connection: %s", connection_name(c)); + warn("Pool underrun in websocket connection: %s", websocket_connection_name(c)); for (int i = 0; i < blocks; i++) { struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE); @@ -126,7 +104,7 @@ static int connection_write(struct connection *c, struct sample *smps[], unsigne enqueued = queue_push_many(&c->queue, (void **) bufs, cnt); if (enqueued != blocks) - warn("Queue overrun in websocket connection: %s", connection_name(c)); + warn("Queue overrun in websocket connection: %s", websocket_connection_name(c)); lws_callback_on_writable(c->wsi); break; @@ -135,182 +113,15 @@ static int connection_write(struct connection *c, struct sample *smps[], unsigne return 0; } -static void logger(int level, const char *msg) { - int len = strlen(msg); - if (strchr(msg, '\n')) - len -= 1; - - /* Decrease severity for some errors. */ - if (strstr(msg, "Unable to open") == msg) - level = LLL_WARN; - - switch (level) { - case LLL_ERR: warn("LWS: %.*s", len, msg); break; - case LLL_WARN: warn("LWS: %.*s", len, msg); break; - case LLL_INFO: info("LWS: %.*s", len, msg); break; - default: debug(LOG_WEBSOCKET | 1, "LWS: %.*s", len, msg); break; - } -} - -static void * server_thread(void *ctx) +static void websocket_destination_destroy(struct destination *d) { - debug(LOG_WEBSOCKET | 3, "WebSocket: Started server thread"); - - while (lws_service(context, 10) >= 0); - - debug(LOG_WEBSOCKET | 3, "WebSocket: shutdown voluntarily"); - - return NULL; + free(d->uri); } -/* Choose mime type based on the file extension */ -static char * get_mimetype(const char *resource_path) -{ - char *extension = strrchr(resource_path, '.'); - - if (extension == NULL) - return "text/plain"; - else if (!strcmp(extension, ".png")) - return "image/png"; - else if (!strcmp(extension, ".svg")) - return "image/svg+xml"; - else if (!strcmp(extension, ".jpg")) - return "image/jpg"; - else if (!strcmp(extension, ".gif")) - return "image/gif"; - else if (!strcmp(extension, ".html")) - return "text/html"; - else if (!strcmp(extension, ".css")) - return "text/css"; - else if (!strcmp(extension, ".js")) - return "application/javascript"; - else - return "text/plain"; -} - -static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) -{ - switch (reason) { - case LWS_CALLBACK_HTTP: - if (!htdocs) { - lws_return_http_status(wsi, HTTP_STATUS_SERVICE_UNAVAILABLE, NULL); - goto try_to_reuse; - } - - if (len < 1) { - lws_return_http_status(wsi, HTTP_STATUS_BAD_REQUEST, NULL); - goto try_to_reuse; - } - - char *requested_uri = (char *) in; - - debug(LOG_WEBSOCKET | 3, "LWS: New HTTP request: %s", requested_uri); - - /* Handle default path */ - if (!strcmp(requested_uri, "/")) { - char *response = "HTTP/1.1 302 Found\r\n" - "Content-Length: 0\r\n" - "Location: /index.html\r\n" - "\r\n"; - - lws_write(wsi, (void *) response, strlen(response), LWS_WRITE_HTTP); - - goto try_to_reuse; - } -#ifdef WITH_JANSSON - /* Return list of websocket nodes */ - else if (!strcmp(requested_uri, "/nodes.json")) { - json_t *json_body = json_array(); - - list_foreach(struct node *n, &vt.instances) { - struct websocket *w = n->_vd; - - json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i, s: i }", - "name", node_name_short(n), - "id", w->id, - "connections", list_length(&w->connections), - "state", n->state, - "vectorize", n->vectorize, - "affinity", n->affinity - ); - - /* Add all additional fields of node here. - * This can be used for metadata */ - json_object_update(json_node, config_to_json(n->cfg)); - - json_array_append_new(json_body, json_node); - } - - char *body = json_dumps(json_body, JSON_INDENT(4)); - - char *header = "HTTP/1.1 200 OK\r\n" - "Connection: close\r\n" - "Content-Type: application/json\r\n" - "\r\n"; - - lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP); - lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP); - - free(body); - json_decref(json_body); - - return -1; - } - else if (!strcmp(requested_uri, "/config.json")) { - char *body = json_dumps(config_to_json(cfg_root), JSON_INDENT(4)); - - char *header = "HTTP/1.1 200 OK\r\n" - "Connection: close\r\n" - "Content-Type: application/json\r\n" - "\r\n"; - - lws_write(wsi, (void *) header, strlen(header), LWS_WRITE_HTTP); - lws_write(wsi, (void *) body, strlen(body), LWS_WRITE_HTTP); - - free(body); - - return -1; - } -#endif - else { - char path[4069]; - snprintf(path, sizeof(path), "%s%s", htdocs, requested_uri); - - /* refuse to serve files we don't understand */ - char *mimetype = get_mimetype(path); - if (!mimetype) { - warn("HTTP: Unknown mimetype for %s", path); - lws_return_http_status(wsi, HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, NULL); - return -1; - } - - int n = lws_serve_http_file(wsi, path, mimetype, NULL, 0); - if (n < 0) - return -1; - else if (n == 0) - break; - else - goto try_to_reuse; - } - - default: - return 0; - } - - return 0; - -try_to_reuse: - if (lws_http_transaction_completed(wsi)) - return -1; - - return 0; -} - -static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) ->>>>>>> feature-mpmc-queue +int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { int ret; - struct connection *c = user; + struct websocket_connection *c = user; struct websocket *w; switch (reason) { @@ -355,7 +166,7 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v /* Lookup peer address for debug output */ lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip)); - info("LWS: New connection %s", connection_name(c)); + info("LWS: New connection %s", websocket_connection_name(c)); if (c->node != NULL) { struct websocket *w = (struct websocket *) c->node->_vd; @@ -369,7 +180,7 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v } case LWS_CALLBACK_CLOSED: - info("LWS: Connection %s closed", connection_name(c)); + info("LWS: Connection %s closed", websocket_connection_name(c)); c->state = WEBSOCKET_CLOSED; c->wsi = NULL; @@ -419,14 +230,14 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v return -1; if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0)) - warn("LWS: Received invalid packet for connection %s", connection_name(c)); + warn("LWS: Received invalid packet for connection %s", websocket_connection_name(c)); struct webmsg *msg = (struct webmsg *) in; while ((char *) msg + WEBMSG_LEN(msg->length) <= (char *) in + len) { struct webmsg *msg2 = pool_get(&w->pool); if (!msg2) { - warn("Pool underrun for connection %s", connection_name(c)); + warn("Pool underrun for connection %s", websocket_connection_name(c)); break; } @@ -434,7 +245,7 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v ret = queue_push(&w->queue, msg2); if (ret != 1) { - warn("Queue overrun for connection %s", connection_name(c)); + warn("Queue overrun for connection %s", websocket_connection_name(c)); break; } @@ -477,7 +288,7 @@ int websocket_close(struct node *n) { struct websocket *w = n->_vd; - list_foreach(struct connection *c, &w->connections) { + list_foreach(struct websocket_connection *c, &w->connections) { c->state = WEBSOCKET_SHUTDOWN; lws_callback_on_writable(c->wsi); } @@ -494,7 +305,7 @@ int websocket_destroy(struct node *n) { struct websocket *w = n->_vd; - list_destroy(&w->destinations, (dtor_cb_t) destination_destroy, true); + list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true); return 0; } @@ -529,13 +340,11 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct websocket *w = n->_vd; - list_foreach(struct connection *c, &w->connections) { - connection_write(c, smps, cnt); - } + list_foreach(struct websocket_connection *c, &w->connections) + websocket_connection_write(c, smps, cnt); - list_foreach(struct connection *c, &connections) { - connection_write(c, smps, cnt); - } + list_foreach(struct websocket_connection *c, &connections) + websocket_connection_write(c, smps, cnt); return cnt; } @@ -611,8 +420,6 @@ static struct plugin p = { .destroy = websocket_destroy, .read = websocket_read, .write = websocket_write, - .init = websocket_init, - .deinit = websocket_deinit, .print = websocket_print, .parse = websocket_parse } diff --git a/lib/web.c b/lib/web.c index 3fc77994a..ccd81baa6 100644 --- a/lib/web.c +++ b/lib/web.c @@ -31,14 +31,12 @@ static struct lws_protocols protocols[] = { .per_session_data_size = sizeof(struct api_session), .rx_buffer_size = 0 }, -#if 0 { .name = "live", .callback = websocket_protocol_cb, .per_session_data_size = sizeof(struct websocket_connection), .rx_buffer_size = 0 }, -#endif { .name = "api", .callback = api_protocol_cb,