diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 2b1ceb4f3..0696a3424 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -87,7 +87,11 @@ static int websocket_connection_init(struct websocket_connection *c) if (ret) return ret; - ret = io_init(&c->io, c->format, c->node, SAMPLE_HAS_ALL); + ret = io_init(&c->io, c->format, &c->node->signals, SAMPLE_HAS_ALL); + if (ret) + return ret; + + ret = io_check(&c->io); if (ret) return ret; @@ -204,12 +208,14 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/ if (strlen(uri) <= 0) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL"); + warn("Failed to get request URI"); return -1; } node = strtok(uri, "/."); if (!node) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); + warn("Failed to tokenize request URI"); return -1; } @@ -221,12 +227,14 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi c->node = list_lookup(&p.node.instances, node); if (!c->node) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); + warn("Failed to find node: node=%s", node); return -1; } c->format = format_type_lookup(format); if (!c->format) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown format"); + warn("Failed to find format: format=%s", format); return -1; } } @@ -234,6 +242,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi ret = websocket_connection_init(c); if (ret) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Internal error"); + warn("Failed to intialize websocket connection: reason=%d", ret); return -1; } @@ -268,12 +277,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi break; case LWS_CALLBACK_CLIENT_WRITEABLE: - case LWS_CALLBACK_SERVER_WRITEABLE: - if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); - return -1; - } - + case LWS_CALLBACK_SERVER_WRITEABLE: { struct sample **smps = alloca(cnt * sizeof(struct sample *)); pulled = queue_pull_many(&c->queue, (void **) smps, cnt); @@ -281,7 +285,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi size_t wbytes; io_sprint(&c->io, c->buffers.send.buf + LWS_PRE, c->buffers.send.size - LWS_PRE, &wbytes, smps, pulled); - ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->io.flags & FORMAT_TYPE_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); + ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->io.flags & IO_HAS_BINARY_PAYLOAD ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); sample_decref_many(smps, pulled); @@ -290,8 +294,13 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi if (queue_available(&c->queue) > 0) lws_callback_on_writable(wsi); + else if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); + return -1; + } break; + } case LWS_CALLBACK_CLIENT_RECEIVE: case LWS_CALLBACK_RECEIVE: @@ -332,7 +341,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi /* Set receive timestamp */ for (int i = 0; i < recvd; i++) { smps[i]->ts.received = ts_recv; - smps[i]->flags |= SAMPLE_HAS_RECEIVED; + smps[i]->flags |= SAMPLE_HAS_TS_RECEIVED; } enqueued = queue_signalled_push_many(&w->queue, (void **) smps, recvd); @@ -372,31 +381,6 @@ int websocket_type_start(struct super_node *sn) return 0; } -int websocket_type_stop() -{ - int ret; - - for (size_t i = 0; i < list_length(&connections); i++) { - struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); - - c->state = WEBSOCKET_CONNECTION_STATE_SHUTDOWN; - - lws_callback_on_writable(c->wsi); - } - - /* Wait for all connections to be closed */ - for (int i = 0; i < 10 && list_length(&connections) > 0; i++) { - info("Waiting for shutdown of %zu connections... %d/10", list_length(&connections), i+1); - sleep(1); - } - - ret = list_destroy(&connections, (dtor_cb_t) websocket_connection_destroy, false); - if (ret) - return ret; - - return 0; -} - int websocket_start(struct node *n) { int ret; @@ -411,13 +395,21 @@ int websocket_start(struct node *n) return ret; for (int i = 0; i < list_length(&w->destinations); i++) { + const char *format; struct websocket_destination *d = (struct websocket_destination *) list_at(&w->destinations, i); struct websocket_connection *c = (struct websocket_connection *) alloc(sizeof(struct websocket_connection)); c->state = WEBSOCKET_CONNECTION_STATE_CONNECTING; + format = strchr(d->info.path, '.'); + if (!format) + format = "villas.web"; + + c->format = format_type_lookup(format + 1); + if (!c->format) + return -1; + c->node = n; - c->format = format_type_lookup("villas.web"); c->destination = d; d->info.context = web->context; @@ -446,6 +438,25 @@ int websocket_stop(struct node *n) lws_callback_on_writable(c->wsi); } + /* Wait for all connections to be closed */ + for (int j = 1; j <= 10; j++) { + int open_connections = 0; + + /* Count open connections belonging to this node */ + for (int i = 0; i < list_length(&connections); i++) { + struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); + + if (c->node == n) + open_connections++; + } + + if (open_connections == 0) + break; + + info("Waiting for shutdown of %zu connections... %d/10", list_length(&connections), j); + sleep(1); + } + ret = queue_signalled_destroy(&w->queue); if (ret) return ret; @@ -601,7 +612,6 @@ static struct plugin p = { .vectorize = 0, /* unlimited */ .size = sizeof(struct websocket), .type.start = websocket_type_start, - .type.stop = websocket_type_stop, .start = websocket_start, .stop = websocket_stop, .destroy = websocket_destroy,