diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index f4461ddab..9ccd80af0 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -58,12 +58,13 @@ struct websocket { /* Internal datastructures */ struct websocket_connection { enum websocket_connection_state { - STATE_DISCONNECTED, - STATE_CONNECTING, - STATE_RECONNECTING, - STATE_ESTABLISHED, - STATE_SHUTDOWN, - STATE_ERROR + WEBSOCKET_CONNECTION_STATE_DESTROYED, + WEBSOCKET_CONNECTION_STATE_INITIALIZED, + WEBSOCKET_CONNECTION_STATE_CONNECTING, + WEBSOCKET_CONNECTION_STATE_RECONNECTING, + WEBSOCKET_CONNECTION_STATE_ESTABLISHED, + WEBSOCKET_CONNECTION_STATE_SHUTDOWN, + WEBSOCKET_CONNECTION_STATE_ERROR } state; /**< The current status of this connection. */ enum { @@ -76,6 +77,7 @@ struct websocket_connection { struct io io; struct queue queue; /**< For samples which are sent to the WebSocket */ + struct format_type *format; struct websocket_destination *destination; struct { diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 600f8f87e..90e820f19 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -55,8 +55,8 @@ static char * websocket_connection_name(struct websocket_connection *c) strcatf(&c->_name, "remote.ip=%s, remote.name=%s", ip, name); } - else if (c->destination) - strcatf(&c->_name, "%s:%d", c->destination->info.address, c->destination->info.port); + else if (c->mode == WEBSOCKET_MODE_CLIENT && c->destination != NULL) + strcatf(&c->_name, "dest=%s:%d", c->destination->info.address, c->destination->info.port); if (c->node) strcatf(&c->_name, ", node=%s", node_name(c->node)); @@ -75,10 +75,79 @@ static void websocket_destination_destroy(struct websocket_destination *d) free((char *) d->info.address); } +static int websocket_connection_init(struct websocket_connection *c) +{ + int ret; + + c->_name = NULL; + + ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); + if (ret) + return ret; + + ret = io_init(&c->io, c->format, c->node, SAMPLE_HAS_ALL); + if (ret) + return ret; + + ret = buffer_init(&c->buffers.recv, DEFAULT_WEBSOCKET_BUFFER_SIZE); + if (ret) + return ret; + + ret = buffer_init(&c->buffers.send, DEFAULT_WEBSOCKET_BUFFER_SIZE); + if (ret) + return ret; + + c->state = WEBSOCKET_CONNECTION_STATE_INITIALIZED; + + return 0; +} + +static int websocket_connection_destroy(struct websocket_connection *c) +{ + int ret; + + assert(c->state != WEBSOCKET_CONNECTION_STATE_DESTROYED); + + if (c->_name) + free(c->_name); + + /* Return all samples to pool */ + int avail; + struct sample *smp; + while ((avail = queue_pull(&c->queue, (void **) &smp))) + sample_put(smp); + + ret = queue_destroy(&c->queue); + if (ret) + return ret; + + ret = io_destroy(&c->io); + if (ret) + return ret; + + ret = buffer_destroy(&c->buffers.recv); + if (ret) + return ret; + + ret = buffer_destroy(&c->buffers.send); + if (ret) + return ret; + + c->wsi = NULL; + c->_name = NULL; + + c->state = WEBSOCKET_CONNECTION_STATE_DESTROYED; + + return 0; +} + static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt) { int pushed; + if (c->state != WEBSOCKET_CONNECTION_STATE_ESTABLISHED) + return -1; + pushed = queue_push_many(&c->queue, (void **) smps, cnt); if (pushed < cnt) warn("Queue overrun in WebSocket connection: %s", websocket_connection_name(c)); @@ -108,95 +177,69 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED: - c->wsi = wsi; - c->state = STATE_ESTABLISHED; - - ret = buffer_init(&c->buffers.recv, DEFAULT_WEBSOCKET_BUFFER_SIZE); - if (ret) - return -1; - - ret = buffer_init(&c->buffers.send, DEFAULT_WEBSOCKET_BUFFER_SIZE); - if (ret) - return ret; - - debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c)); - - /* Schedule writable callback in case we have something to send */ - if (queue_available(&c->queue) > 0) - lws_callback_on_writable(wsi); - - break; - case LWS_CALLBACK_ESTABLISHED: c->wsi = wsi; - c->state = STATE_ESTABLISHED; - c->mode = WEBSOCKET_MODE_SERVER; - c->_name = NULL; + c->state = WEBSOCKET_CONNECTION_STATE_ESTABLISHED; - /* We use the URI to associate this connection to a node - * and choose a protocol. - * - * Example: ws://example.com/node_1.json - * Will select the node with the name 'node_1' - * and format 'json'. - */ + if (reason == LWS_CALLBACK_CLIENT_ESTABLISHED) + c->mode = WEBSOCKET_MODE_CLIENT; + else { + c->mode = WEBSOCKET_MODE_SERVER; + /* We use the URI to associate this connection to a node + * and choose a protocol. + * + * Example: ws://example.com/node_1.json + * Will select the node with the name 'node_1' + * and format 'json'. + */ - /* Get path of incoming request */ - char *node, *format = NULL; - char uri[64]; + /* Get path of incoming request */ + char *node, *format; + char uri[64]; - lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/ - if (strlen(uri) <= 0) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL"); - return -1; + lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/ + if (strlen(uri) <= 0) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL"); + return -1; + } + + node = strtok(uri, "/."); + if (!node) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); + return -1; + } + + format = strtok(NULL, ""); + if (!format) + format = "villas.web"; + + /* Search for node whose name matches the URI. */ + c->node = list_lookup(&p.node.instances, node); + if (!c->node) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); + return -1; + } + + c->format = format_type_lookup(format); + if (!c->format) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown format"); + return -1; + } } - node = strtok(uri, "/."); - if (!node) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); + ret = websocket_connection_init(c); + if (ret) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Internal error"); return -1; } - format = strtok(NULL, ""); - if (!format) - format = "villas.web"; - - /* Search for node whose name matches the URI. */ - c->node = list_lookup(&p.node.instances, node); - if (!c->node) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); - return -1; - } - - struct format_type *fmt = format_type_lookup(format); - if (!fmt) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format"); - return -1; - } - - ret = io_init(&c->io, fmt, c->node, SAMPLE_HAS_ALL); - if (ret) - return -1; - - ret = buffer_init(&c->buffers.recv, DEFAULT_WEBSOCKET_BUFFER_SIZE); - if (ret) - return -1; - - ret = buffer_init(&c->buffers.send, DEFAULT_WEBSOCKET_BUFFER_SIZE); - if (ret) - return ret; - - ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); - if (ret) - return -1; - list_push(&connections, c); debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c)); break; case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: - c->state = STATE_ERROR; + c->state = WEBSOCKET_CONNECTION_STATE_ERROR; warn("Failed to establish WebSocket connection: %s, reason=%s", websocket_connection_name(c), in ? (char *) in : "unkown"); @@ -205,40 +248,14 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi case LWS_CALLBACK_CLOSED: debug(LOG_WEBSOCKET | 10, "Closed WebSocket connection: %s", websocket_connection_name(c)); - if (c->state != STATE_SHUTDOWN) { + if (c->state != WEBSOCKET_CONNECTION_STATE_SHUTDOWN) { /** @todo Attempt reconnect here */ } if (connections.state == STATE_INITIALIZED) list_remove(&connections, c); - if (c->_name) - free(c->_name); - - /* Return all samples to pool */ - int avail; - struct sample *smp; - while ((avail = queue_pull(&c->queue, (void **) &smp))) - sample_put(smp); - - /* Destroy queue */ - ret = queue_destroy(&c->queue); - if (ret) - return ret; - - ret = io_destroy(&c->io); - if (ret) - return ret; - - ret = buffer_destroy(&c->buffers.recv); - if (ret) - return ret; - - ret = buffer_destroy(&c->buffers.send); - if (ret) - return ret; - - c->wsi = NULL; + websocket_connection_destroy(c); if (c->mode == WEBSOCKET_MODE_CLIENT) free(c); @@ -247,7 +264,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi case LWS_CALLBACK_CLIENT_WRITEABLE: case LWS_CALLBACK_SERVER_WRITEABLE: - if (c->state == STATE_SHUTDOWN) { + if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; } @@ -323,7 +340,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi buffer_clear(&c->buffers.recv); - if (c->state == STATE_SHUTDOWN) { + if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; } @@ -357,7 +374,7 @@ int websocket_deinit() for (size_t i = 0; i < list_length(&connections); i++) { struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); - c->state = STATE_SHUTDOWN; + c->state = WEBSOCKET_CONNECTION_STATE_SHUTDOWN; lws_callback_on_writable(c->wsi); } @@ -392,32 +409,16 @@ int websocket_start(struct node *n) struct websocket_destination *d = (struct websocket_destination *) list_at(&w->destinations, i); struct websocket_connection *c = (struct websocket_connection *) alloc(sizeof(struct websocket_connection)); - c->state = STATE_CONNECTING; - c->mode = WEBSOCKET_MODE_CLIENT; + c->state = WEBSOCKET_CONNECTION_STATE_CONNECTING; + c->node = n; + c->format = format_type_lookup("villas.web"); c->destination = d; - c->_name = NULL; - - struct format_type *fmt; - - fmt = format_type_lookup("villas-web"); /** @todo We could parse the format from the URI */ - if (!fmt) - return -1; - - ret = io_init(&c->io, fmt, n, SAMPLE_HAS_ALL); - if (ret) - return ret; d->info.context = web->context; d->info.vhost = web->vhost; d->info.userdata = c; - ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); - if (ret) - return -1; - - list_push(&connections, c); - lws_client_connect_via_info(&d->info); } @@ -429,33 +430,13 @@ int websocket_stop(struct node *n) int ret; struct websocket *w = (struct websocket *) n->_vd; - /* Wait for all connections to be closed */ - for (;;) { - int connecting = 0; - - for (int i = 0; i < list_length(&w->destinations); i++) { - struct websocket_destination *d = (struct websocket_destination *) list_at(&w->destinations, i); - struct websocket_connection *c = d->info.userdata; - - if (c->state == STATE_CONNECTING) - connecting++; - } - - if (connecting == 0) - break; - - debug(LOG_WEBSOCKET | 10, "Waiting for %d client connections to be established", connecting); - sleep(1); - } - for (size_t i = 0; i < list_length(&connections); i++) { struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); if (c->node != n) continue; - if (c->state != STATE_CONNECTING) - c->state = STATE_SHUTDOWN; + c->state = WEBSOCKET_CONNECTION_STATE_SHUTDOWN; lws_callback_on_writable(c->wsi); }