diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index cb8f93894..77a663b07 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -31,8 +31,9 @@ #include #include #include -#include #include +#include +#include /* Private static storage */ static struct list connections = { .state = STATE_DESTROYED }; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */ @@ -247,11 +248,6 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi case LWS_CALLBACK_CLIENT_RECEIVE: case LWS_CALLBACK_RECEIVE: - if (!c->node) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Catch-all connection can not receive."); - return -1; - } - if (lws_is_first_fragment(wsi)) buffer_clear(&c->buffers.recv); @@ -264,9 +260,37 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi /* We dont try to parse the frame yet, as we have to wait for the remaining fragments */ if (lws_is_final_fragment(wsi)) { struct timespec ts_recv = time_now(); + struct node *n = c->node; - struct websocket *w = (struct websocket *) c->node->_vd; + if (!n) { + /* This is a catch-all connection without a specific attached node + * Where dont know which node will be the receiver of these samples. + * Therefore we also dont know the format of this node. + * We blindly assume that the format is villas-binary and peek into the + * data here in order to get the node id + */ + if (c->buffers.recv.len < sizeof(struct msg)) + goto out; + + struct msg *m = (struct msg *) c->buffers.recv.buf; + + for (size_t i = 0; i < list_length(&p.node.instances); i++) { + struct node *o = (struct node *) list_at(&p.node.instances, i); + + if (o->id == m->id) + n = 0; + } + } + +out: if (!n) { + warn("Failed to match incoming samples to a node"); + break; + } + + struct websocket *w = (struct websocket *) n->_vd; struct sample **smps = alloca(cnt * sizeof(struct sample *)); + if (!smps) + warn("Failed to allocate memory for connection: %s", websocket_connection_name(c)); ret = sample_alloc_many(&w->pool, smps, cnt); if (ret != cnt) {