1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

websocket: associate received samples to correct node

This commit is contained in:
Steffen Vogel 2018-02-06 23:30:37 +01:00
parent e5afb8ec48
commit 8a6237fa98

View file

@ -31,8 +31,9 @@
#include <villas/utils.h>
#include <villas/buffer.h>
#include <villas/plugin.h>
#include <villas/io_format.h>
#include <villas/nodes/websocket.h>
#include <villas/io_format.h>
#include <villas/io/msg_format.h>
/* Private static storage */
static struct list connections = { .state = STATE_DESTROYED }; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
@ -247,11 +248,6 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
case LWS_CALLBACK_CLIENT_RECEIVE:
case LWS_CALLBACK_RECEIVE:
if (!c->node) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Catch-all connection can not receive.");
return -1;
}
if (lws_is_first_fragment(wsi))
buffer_clear(&c->buffers.recv);
@ -264,9 +260,37 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
/* We dont try to parse the frame yet, as we have to wait for the remaining fragments */
if (lws_is_final_fragment(wsi)) {
struct timespec ts_recv = time_now();
struct node *n = c->node;
struct websocket *w = (struct websocket *) c->node->_vd;
if (!n) {
/* This is a catch-all connection without a specific attached node
* Where dont know which node will be the receiver of these samples.
* Therefore we also dont know the format of this node.
* We blindly assume that the format is villas-binary and peek into the
* data here in order to get the node id
*/
if (c->buffers.recv.len < sizeof(struct msg))
goto out;
struct msg *m = (struct msg *) c->buffers.recv.buf;
for (size_t i = 0; i < list_length(&p.node.instances); i++) {
struct node *o = (struct node *) list_at(&p.node.instances, i);
if (o->id == m->id)
n = 0;
}
}
out: if (!n) {
warn("Failed to match incoming samples to a node");
break;
}
struct websocket *w = (struct websocket *) n->_vd;
struct sample **smps = alloca(cnt * sizeof(struct sample *));
if (!smps)
warn("Failed to allocate memory for connection: %s", websocket_connection_name(c));
ret = sample_alloc_many(&w->pool, smps, cnt);
if (ret != cnt) {