1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'develop' of git.rwth-aachen.de:acs/public/villas/VILLASnode into develop

This commit is contained in:
Steffen Vogel 2018-05-26 01:21:23 +02:00
commit 0679985847
2 changed files with 31 additions and 8 deletions

View file

@ -93,10 +93,21 @@ static int node_direction_parse(struct node_direction *nd, struct node *n, json_
{
int ret;
json_error_t err;
json_t *json_hooks = NULL;
json_t *json_signals = NULL;
json_error_t err;
nd->cfg = cfg;
/* Before we start parsing, we will fill in a few default settings from the node config */
const char *fields[] = { "builtin", "vectorize", "signals", "hooks" };
for (int i = 0; i < ARRAY_LEN(fields); i++) {
json_t *json_field_dir = json_object_get(cfg, fields[i]);
json_t *json_field_node = json_object_get(n->cfg, fields[i]);
if (json_field_node && !json_field_dir)
json_object_set(cfg, fields[i], json_field_node);
}
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b }",
"hooks", &json_hooks,
@ -228,6 +239,8 @@ int node_parse(struct node *n, json_t *cfg, const char *name)
nt = node_type_lookup(type);
assert(nt == n->_vt);
n->cfg = cfg;
if (json_in) {
ret = node_direction_parse(&n->in, n, json_in);
if (ret)
@ -244,7 +257,6 @@ int node_parse(struct node *n, json_t *cfg, const char *name)
if (ret)
error("Failed to parse node '%s'", node_name(n));
n->cfg = cfg;
n->state = STATE_PARSED;
return ret;

View file

@ -290,8 +290,10 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
int avail, enqueued;
struct websocket *w = (struct websocket *) n->_vd;
struct sample **smps = alloca(cnt * sizeof(struct sample *));
if (!smps)
if (!smps) {
warn("Failed to allocate memory for connection: %s", websocket_connection_name(c));
break;
}
avail = sample_alloc_many(&w->pool, smps, cnt);
if (avail < cnt)
@ -303,7 +305,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
break;
}
debug(LOG_WEBSOCKET | 10, "Received %d samples to connection: %s", recvd, websocket_connection_name(c));
debug(LOG_WEBSOCKET | 10, "Received %d samples from connection: %s", recvd, websocket_connection_name(c));
/* Set receive timestamp */
for (int i = 0; i < recvd; i++) {
@ -313,12 +315,14 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
enqueued = queue_signalled_push_many(&w->queue, (void **) smps, recvd);
if (enqueued < recvd)
warn("Queue overrun for connection: %s", websocket_connection_name(c));
warn("Queue overrun in connection: %s", websocket_connection_name(c));
/* Release unused samples back to pool */
if (enqueued < avail)
sample_put_many(&smps[enqueued], avail - enqueued);
buffer_clear(&c->buffers.recv);
if (c->state == STATE_SHUTDOWN) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
@ -348,6 +352,8 @@ int websocket_init(struct super_node *sn)
int websocket_deinit()
{
int ret;
for (size_t i = 0; i < list_length(&connections); i++) {
struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i);
@ -358,11 +364,13 @@ int websocket_deinit()
/* Wait for all connections to be closed */
while (list_length(&connections) > 0) {
info("Waiting for WebSocket connection shutdown");
info("Waiting for shutdown of %zu connections", list_length(&connections));
sleep(1);
}
list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true);
ret = list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true);
if (ret)
return ret;
return 0;
}
@ -466,8 +474,11 @@ int websocket_stop(struct node *n)
int websocket_destroy(struct node *n)
{
struct websocket *w = (struct websocket *) n->_vd;
int ret;
list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true);
ret = list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true);
if (ret)
return ret;
return 0;
}