diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 77a663b07..cb6d3f10e 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -132,10 +132,6 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi * Example: ws://example.com/node_1.json * Will select the node with the name 'node_1' * and format 'json'. - * - * If the node name is omitted, this connection - * will receive sample data from all websocket nodes - * (catch all). */ /* Get path of incoming request */ @@ -149,22 +145,22 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi } node = strtok(uri, "/."); - if (!node || strlen(node) == 0) - c->node = NULL; - else { - format = strtok(NULL, ""); - - /* Search for node whose name matches the URI. */ - c->node = list_lookup(&p.node.instances, node); - if (c->node == NULL) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); - return -1; - } + if (!node) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); + return -1; } + format = strtok(NULL, ""); if (!format) format = "villas-web"; + /* Search for node whose name matches the URI. */ + c->node = list_lookup(&p.node.instances, node); + if (!c->node) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); + return -1; + } + c->format = io_format_lookup(format); if (!c->format) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format"); @@ -181,7 +177,6 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi list_push(&connections, c); debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c)); - break; case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: @@ -262,31 +257,6 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi struct timespec ts_recv = time_now(); struct node *n = c->node; - if (!n) { - /* This is a catch-all connection without a specific attached node - * Where dont know which node will be the receiver of these samples. - * Therefore we also dont know the format of this node. - * We blindly assume that the format is villas-binary and peek into the - * data here in order to get the node id - */ - if (c->buffers.recv.len < sizeof(struct msg)) - goto out; - - struct msg *m = (struct msg *) c->buffers.recv.buf; - - for (size_t i = 0; i < list_length(&p.node.instances); i++) { - struct node *o = (struct node *) list_at(&p.node.instances, i); - - if (o->id == m->id) - n = 0; - } - } - -out: if (!n) { - warn("Failed to match incoming samples to a node"); - break; - } - struct websocket *w = (struct websocket *) n->_vd; struct sample **smps = alloca(cnt * sizeof(struct sample *)); if (!smps) @@ -472,10 +442,8 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt) if (avail < 0) return avail; - for (int i = 0; i < avail; i++) { - sample_copy(smps[i], cpys[i]); - sample_put(cpys[i]); - } + sample_copy_many(smps, cpys, avail); + sample_put_many(cpys, avail); return avail; } @@ -492,17 +460,12 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) if (avail < cnt) warn("Pool underrun for node %s: avail=%u", node_name(n), avail); - for (int i = 0; i < avail; i++) { - sample_copy(cpys[i], smps[i]); - - cpys[i]->source = n; - cpys[i]->id = n->id; - } + sample_copy_many(cpys, smps, avail); for (size_t i = 0; i < list_length(&connections); i++) { struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); - if (c->node == n || c->node == NULL) + if (c->node == n) websocket_connection_write(c, cpys, cnt); }