From 8a6237fa98d59752a57facfdbf78b57be99d25a4 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 6 Feb 2018 23:30:37 +0100 Subject: [PATCH] websocket: associate received samples to correct node --- lib/nodes/websocket.c | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index cb8f93894..77a663b07 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -31,8 +31,9 @@ #include #include #include -#include #include +#include +#include /* Private static storage */ static struct list connections = { .state = STATE_DESTROYED }; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */ @@ -247,11 +248,6 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi case LWS_CALLBACK_CLIENT_RECEIVE: case LWS_CALLBACK_RECEIVE: - if (!c->node) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Catch-all connection can not receive."); - return -1; - } - if (lws_is_first_fragment(wsi)) buffer_clear(&c->buffers.recv); @@ -264,9 +260,37 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi /* We dont try to parse the frame yet, as we have to wait for the remaining fragments */ if (lws_is_final_fragment(wsi)) { struct timespec ts_recv = time_now(); + struct node *n = c->node; - struct websocket *w = (struct websocket *) c->node->_vd; + if (!n) { + /* This is a catch-all connection without a specific attached node + * Where dont know which node will be the receiver of these samples. + * Therefore we also dont know the format of this node. + * We blindly assume that the format is villas-binary and peek into the + * data here in order to get the node id + */ + if (c->buffers.recv.len < sizeof(struct msg)) + goto out; + + struct msg *m = (struct msg *) c->buffers.recv.buf; + + for (size_t i = 0; i < list_length(&p.node.instances); i++) { + struct node *o = (struct node *) list_at(&p.node.instances, i); + + if (o->id == m->id) + n = 0; + } + } + +out: if (!n) { + warn("Failed to match incoming samples to a node"); + break; + } + + struct websocket *w = (struct websocket *) n->_vd; struct sample **smps = alloca(cnt * sizeof(struct sample *)); + if (!smps) + warn("Failed to allocate memory for connection: %s", websocket_connection_name(c)); ret = sample_alloc_many(&w->pool, smps, cnt); if (ret != cnt) {