diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index db7ce1862..495b074b8 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -23,19 +23,17 @@ /* Internal datastructures */ struct connection { enum { - WEBSOCKET_ESTABLISHED, - WEBSOCKET_ACTIVE, - WEBSOCKET_SHUTDOWN, - WEBSOCKET_CLOSED + WEBSOCKET_CONNECTION_CLOSED, + WEBSOCKET_CONNECTION_ESTABLISHED, + WEBSOCKET_CONNECTION_ACTIVE, + WEBSOCKET_CONNECTION_SHUTDOWN } state; struct node *node; - struct path *path; - - struct queue queue; /**< For samples which are sent to the WebSocket */ - struct lws *wsi; + struct queue queue; /**< For samples which are sent to the WebSocket */ + struct { char name[64]; char ip[64]; @@ -60,7 +58,7 @@ static const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS static const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */ static const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */ -static int id = 0; +static int id = 0; /**< Counter for Websocket node id field. See struct webmsg. */ struct list connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */ @@ -75,18 +73,6 @@ static struct lws_protocols protocols[] = { { NULL } }; -__attribute__((unused)) static int connection_init(struct connection *c) -{ - /** @todo */ - return -1; -} - -__attribute__((unused)) static void connection_destroy(struct connection *c) -{ - if (c->_name) - free(c->_name); -} - static char * connection_name(struct connection *c) { if (!c->_name) { @@ -99,6 +85,53 @@ static char * connection_name(struct connection *c) return c->_name; } +static int connection_init(struct connection *c, struct lws *wsi) +{ + int ret; + + struct websocket *w = (struct websocket *) c->node->_vd; + + lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip)); + + info("LWS: New connection %s", connection_name(c)); + + c->state = WEBSOCKET_CONNECTION_ESTABLISHED; + c->wsi = wsi; + + if (c->node != NULL) + list_push(&w->connections, c); + else + list_push(&connections, c); + + ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); + if (ret) { + warn("Failed to create queue for incoming websocket connection. Closing.."); + return -1; + } + + return 0; +} + +static void connection_destroy(struct connection *c) +{ + struct websocket *w = (struct websocket *) c->node->_vd; + + info("LWS: Connection %s closed", connection_name(c)); + + c->state = WEBSOCKET_CONNECTION_CLOSED; + c->wsi = NULL; + + if (c->node) + list_remove(&w->connections, c); + else + list_remove(&connections, c); + + if (c->_name) + free(c->_name); + + queue_destroy(&c->queue); +} + static void destination_destroy(struct destination *d) { free(d->uri); @@ -112,22 +145,15 @@ static int connection_write(struct connection *c, struct sample *smps[], unsigne struct websocket *w = c->node->_vd; switch (c->state) { - case WEBSOCKET_SHUTDOWN: + case WEBSOCKET_CONNECTION_CLOSED: + case WEBSOCKET_CONNECTION_SHUTDOWN: return -1; - case WEBSOCKET_CLOSED: - if (c->node) { - struct websocket *w = (struct websocket *) c->node->_vd; - list_remove(&w->connections, c); - } - else - list_remove(&connections, c); - break; - case WEBSOCKET_ESTABLISHED: - c->state = WEBSOCKET_ACTIVE; + case WEBSOCKET_CONNECTION_ESTABLISHED: + c->state = WEBSOCKET_CONNECTION_ACTIVE; /* fall through */ - case WEBSOCKET_ACTIVE: + case WEBSOCKET_CONNECTION_ACTIVE: blocks = pool_get_many(&w->pool, (void **) bufs, cnt); if (blocks != cnt) warn("Pool underrun in websocket connection: %s", connection_name(c)); @@ -365,38 +391,15 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v return -1; } - c->state = WEBSOCKET_ESTABLISHED; - c->wsi = wsi; - - ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); - if (ret) { - warn("Failed to create queue for incoming websocket connection. Closing.."); + ret = connection_init(c, wsi); + if (ret) return -1; - } - - /* Lookup peer address for debug output */ - lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip)); - - info("LWS: New connection %s", connection_name(c)); - - if (c->node != NULL) { - struct websocket *w = (struct websocket *) c->node->_vd; - list_push(&w->connections, c); - } - else { - list_push(&connections, c); - } return 0; } case LWS_CALLBACK_CLOSED: - info("LWS: Connection %s closed", connection_name(c)); - - c->state = WEBSOCKET_CLOSED; - c->wsi = NULL; - - queue_destroy(&c->queue); + connection_destroy(c); return 0; @@ -407,7 +410,7 @@ static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, v if (c->node && c->node->state != NODE_RUNNING) return -1; - if (c->state == WEBSOCKET_SHUTDOWN) { + if (c->state == WEBSOCKET_CONNECTION_SHUTDOWN) { lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Node stopped", 4); return -1; } @@ -534,9 +537,6 @@ int websocket_open(struct node *n) w->id = id++; - list_init(&w->connections); - list_init(&w->destinations); - size_t blocklen = LWS_PRE + WEBMSG_LEN(DEFAULT_VALUES); ret = pool_init(&w->pool, 64 * DEFAULT_QUEUELEN, blocklen, &memtype_hugepage); @@ -547,6 +547,8 @@ int websocket_open(struct node *n) if (ret) return ret; + /** @todo Connection to destinations via WebSocket client */ + return 0; } @@ -555,22 +557,21 @@ int websocket_close(struct node *n) struct websocket *w = n->_vd; list_foreach(struct connection *c, &w->connections) { - c->state = WEBSOCKET_SHUTDOWN; + c->state = WEBSOCKET_CONNECTION_SHUTDOWN; lws_callback_on_writable(c->wsi); } pool_destroy(&w->pool); queue_destroy(&w->queue); - - list_destroy(&w->connections, NULL, false); - + return 0; } int websocket_destroy(struct node *n) { struct websocket *w = n->_vd; - + + list_destroy(&w->connections, NULL, false); list_destroy(&w->destinations, (dtor_cb_t) destination_destroy, true); return 0; @@ -578,17 +579,13 @@ int websocket_destroy(struct node *n) int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) { + int got; + struct websocket *w = n->_vd; struct webmsg *msgs[cnt]; - - int got; - - do { - got = queue_pull_many(&w->queue, (void **) msgs, cnt); - pthread_yield(); - } while (got == 0); - + + got = queue_pull_many(&w->queue, (void **) msgs, cnt); for (int i = 0; i < got; i++) { smps[i]->sequence = msgs[i]->sequence; smps[i]->length = msgs[i]->length; @@ -606,13 +603,11 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct websocket *w = n->_vd; - list_foreach(struct connection *c, &w->connections) { + list_foreach(struct connection *c, &w->connections) connection_write(c, smps, cnt); - } - list_foreach(struct connection *c, &connections) { + list_foreach(struct connection *c, &connections) connection_write(c, smps, cnt); - } return cnt; } @@ -622,7 +617,10 @@ int websocket_parse(struct node *n, config_setting_t *cfg) struct websocket *w = n->_vd; config_setting_t *cfg_dests; int ret; - + + list_init(&w->connections); + list_init(&w->destinations); + cfg_dests = config_setting_get_member(cfg, "destinations"); if (cfg_dests) { if (!config_setting_is_array(cfg_dests))