From 6a7821467a22b04903e13ae5904dce1754d2fa23 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 01:10:33 +0200 Subject: [PATCH 1/5] node: inherit certain configuration settings for directions --- lib/node.c | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/node.c b/lib/node.c index 69320ff0b..40837c00f 100644 --- a/lib/node.c +++ b/lib/node.c @@ -93,10 +93,21 @@ static int node_direction_parse(struct node_direction *nd, struct node *n, json_ { int ret; + json_error_t err; json_t *json_hooks = NULL; json_t *json_signals = NULL; - json_error_t err; + nd->cfg = cfg; + + /* Before we start parsing, we will fill in a few default settings from the node config */ + const char *fields[] = { "builtin", "vectorize", "signals", "hooks" }; + for (int i = 0; i < ARRAY_LEN(fields); i++) { + json_t *json_field_dir = json_object_get(cfg, fields[i]); + json_t *json_field_node = json_object_get(n->cfg, fields[i]); + + if (json_field_node && !json_field_dir) + json_object_set(cfg, fields[i], json_field_node); + } ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b }", "hooks", &json_hooks, @@ -228,6 +239,8 @@ int node_parse(struct node *n, json_t *cfg, const char *name) nt = node_type_lookup(type); assert(nt == n->_vt); + n->cfg = cfg; + if (json_in) { ret = node_direction_parse(&n->in, n, json_in); if (ret) @@ -244,7 +257,6 @@ int node_parse(struct node *n, json_t *cfg, const char *name) if (ret) error("Failed to parse node '%s'", node_name(n)); - n->cfg = cfg; n->state = STATE_PARSED; return ret; From a7bd77944f3fddb4fa4c1022701ac4caf3704e46 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 01:12:00 +0200 Subject: [PATCH 2/5] websocket: fix possible segmention fault --- lib/nodes/websocket.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index ee8e4d78a..1494851c6 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -290,8 +290,10 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi int avail, enqueued; struct websocket *w = (struct websocket *) n->_vd; struct sample **smps = alloca(cnt * sizeof(struct sample *)); - if (!smps) + if (!smps) { warn("Failed to allocate memory for connection: %s", websocket_connection_name(c)); + break; + } avail = sample_alloc_many(&w->pool, smps, cnt); if (avail < cnt) From 8819c4101f5c945a68b3d1c0e6f240f0e8727980 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 01:13:22 +0200 Subject: [PATCH 3/5] websocket: clear buffer of received data once completely parsed --- lib/nodes/websocket.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 1494851c6..429e100b0 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -321,6 +321,8 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi if (enqueued < avail) sample_put_many(&smps[enqueued], avail - enqueued); + buffer_clear(&c->buffers.recv); + if (c->state == STATE_SHUTDOWN) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; From 18a1daede82a3d79d7911ef9a4dedfb755b629a3 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 01:15:23 +0200 Subject: [PATCH 4/5] websocket: check return value of list_destroy() calls --- lib/nodes/websocket.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 429e100b0..ea2a5f2c0 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -352,6 +352,8 @@ int websocket_init(struct super_node *sn) int websocket_deinit() { + int ret; + for (size_t i = 0; i < list_length(&connections); i++) { struct websocket_connection *c = (struct websocket_connection *) list_at(&connections, i); @@ -366,7 +368,9 @@ int websocket_deinit() sleep(1); } - list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true); + ret = list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true); + if (ret) + return ret; return 0; } @@ -470,8 +474,11 @@ int websocket_stop(struct node *n) int websocket_destroy(struct node *n) { struct websocket *w = (struct websocket *) n->_vd; + int ret; - list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true); + ret = list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true); + if (ret) + return ret; return 0; } From cc0bca6ab296be6c3f0cbf12e3540c6239b19307 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 01:15:34 +0200 Subject: [PATCH 5/5] websocket: improve log messages --- lib/nodes/websocket.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index ea2a5f2c0..600f8f87e 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -305,7 +305,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi break; } - debug(LOG_WEBSOCKET | 10, "Received %d samples to connection: %s", recvd, websocket_connection_name(c)); + debug(LOG_WEBSOCKET | 10, "Received %d samples from connection: %s", recvd, websocket_connection_name(c)); /* Set receive timestamp */ for (int i = 0; i < recvd; i++) { @@ -315,7 +315,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi enqueued = queue_signalled_push_many(&w->queue, (void **) smps, recvd); if (enqueued < recvd) - warn("Queue overrun for connection: %s", websocket_connection_name(c)); + warn("Queue overrun in connection: %s", websocket_connection_name(c)); /* Release unused samples back to pool */ if (enqueued < avail) @@ -364,7 +364,7 @@ int websocket_deinit() /* Wait for all connections to be closed */ while (list_length(&connections) > 0) { - info("Waiting for WebSocket connection shutdown"); + info("Waiting for shutdown of %zu connections", list_length(&connections)); sleep(1); }