diff --git a/lib/advio.c b/lib/advio.c index ae95fd094..dff6ea9bf 100644 --- a/lib/advio.c +++ b/lib/advio.c @@ -173,16 +173,16 @@ int aislocal(const char *uri) { char *sep; const char *supported_schemas[] = { "file", "http", "https", "tftp", "ftp", "scp", "sftp", "smb", "smbs" }; - + sep = strstr(uri, "://"); if (!sep) return 1; /* no schema, we assume its a local file */ - + for (int i = 0; i < ARRAY_LEN(supported_schemas); i++) { if (!strncmp(supported_schemas[i], uri, sep - uri)) return 0; } - + return -1; /* none of the supported schemas match. this is an invalid uri */ } @@ -210,6 +210,8 @@ AFILE * afopen(const char *uri, const char *mode) cwd = getcwd(NULL, 0); af->uri = strf("file://%s/%s", cwd, uri); + + free(cwd); } else af->uri = strf("file://%s", uri); @@ -261,7 +263,7 @@ int afclose(AFILE *af) ret = afflush(af); if (ret) return ret; - + curl_easy_cleanup(af->curl); ret = fclose(af->file); diff --git a/lib/api.c b/lib/api.c index 3571ae0f7..04870ba7a 100644 --- a/lib/api.c +++ b/lib/api.c @@ -60,11 +60,11 @@ int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void * ret = api_session_init(s, API_MODE_WS); if (ret) return -1; - + s->state = API_SESSION_STATE_ESTABLISHED; s->wsi = wsi; s->api = w->api; - + list_push(&s->api->sessions, s); debug(LOG_API, "Initiated API session: %s", api_session_name(s)); @@ -77,7 +77,7 @@ int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void * ret = api_session_destroy(s); if (ret) return -1; - + if (w->api->sessions.state == STATE_INITIALIZED) list_remove(&w->api->sessions, s); @@ -86,14 +86,14 @@ int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void * case LWS_CALLBACK_RECEIVE: if (lws_is_first_fragment(wsi)) buffer_clear(&s->request.buffer); - + buffer_append(&s->request.buffer, in, len); - + if (lws_is_final_fragment(wsi)) { ret = buffer_parse_json(&s->request.buffer, &req); if (ret) break; - + pushed = queue_push(&s->request.queue, req); if (pushed != 1) warn("Queue overun in API session"); @@ -104,22 +104,23 @@ int api_ws_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void * } break; - + case LWS_CALLBACK_SERVER_WRITEABLE: if (s->state == API_SESSION_STATE_SHUTDOWN) return -1; - - pulled = queue_pull(&s->response.queue, (void **) &resp); - if (pulled < 1) - break; - - char pad[LWS_PRE]; - - buffer_clear(&s->response.buffer); - buffer_append(&s->response.buffer, pad, sizeof(pad)); - buffer_append_json(&s->response.buffer, resp); - lws_write(wsi, (unsigned char *) s->response.buffer.buf + LWS_PRE, s->response.buffer.len - LWS_PRE, LWS_WRITE_TEXT); + pulled = queue_pull(&s->response.queue, (void **) &resp); + if (pulled) { + char pad[LWS_PRE]; + + buffer_clear(&s->response.buffer); + buffer_append(&s->response.buffer, pad, sizeof(pad)); + buffer_append_json(&s->response.buffer, resp); + + json_decref(resp); + + lws_write(wsi, (unsigned char *) s->response.buffer.buf + LWS_PRE, s->response.buffer.len - LWS_PRE, LWS_WRITE_TEXT); + } break; default: @@ -156,7 +157,7 @@ int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void s->state = API_SESSION_STATE_ESTABLISHED; s->wsi = wsi; s->api = w->api; - + list_push(&s->api->sessions, s); debug(LOG_API, "Initiated API session: %s", api_session_name(s)); @@ -166,16 +167,16 @@ int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void case LWS_CALLBACK_CLOSED_HTTP: if (!s) return -1; - + debug(LOG_API, "Closed API session: %s, runs=%d", api_session_name(s), s->runs); ret = api_session_destroy(s); if (ret) return -1; - + if (w->api->sessions.state == STATE_INITIALIZED) list_remove(&w->api->sessions, s); - + break; case LWS_CALLBACK_HTTP_BODY: @@ -189,7 +190,7 @@ int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void break; buffer_clear(&s->request.buffer); - + pushed = queue_push(&s->request.queue, req); if (pushed != 1) warn("Queue overrun for API session: %s", api_session_name(s)); @@ -211,10 +212,12 @@ int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void "Access-Control-Allow-Headers: Content-Type\r\n" "Access-Control-Max-Age: 86400\r\n" "\r\n"; - + buffer_clear(&s->response.buffer); buffer_append_json(&s->response.buffer, resp); - + + json_decref(resp); + lws_write(wsi, (unsigned char *) headers, strlen(headers), LWS_WRITE_HTTP_HEADERS); lws_write(wsi, (unsigned char *) s->response.buffer.buf, s->response.buffer.len, LWS_WRITE_HTTP); @@ -239,7 +242,7 @@ int api_init(struct api *a, struct super_node *sn) ret = list_init(&a->sessions); if (ret) return ret; - + ret = queue_signalled_init(&a->pending, 1024, &memtype_heap, 0); if (ret) return ret; @@ -255,7 +258,7 @@ int api_destroy(struct api *a) int ret; assert(a->state != STATE_STARTED); - + ret = queue_signalled_destroy(&a->pending); if (ret) return ret; @@ -270,7 +273,7 @@ int api_start(struct api *a) int ret; info("Starting API sub-system"); - + ret = pthread_create(&a->thread, NULL, api_worker, a); if (ret) error("Failed to start API worker thread"); @@ -285,18 +288,18 @@ int api_stop(struct api *a) int ret; info("Stopping API sub-system"); - + if (a->state != STATE_STARTED) return 0; - + for (int i = 0; i < list_length(&a->sessions); i++) { struct api_session *s = list_at(&a->sessions, i); - + s->state = API_SESSION_STATE_SHUTDOWN; - + lws_callback_on_writable(s->wsi); } - + for (int i = 0; i < 10 && list_length(&a->sessions) > 0; i++) { INDENT info("Wait for API sessions to close"); usleep(1 * 1e6); @@ -305,7 +308,7 @@ int api_stop(struct api *a) ret = list_destroy(&a->sessions, (dtor_cb_t) api_session_destroy, false); if (ret) return ret; - + ret = pthread_cancel(a->thread); if (ret) serror("Failed to cancel API worker thread"); @@ -325,7 +328,7 @@ static void * api_worker(void *ctx) struct api *a = ctx; struct api_session *s; - + json_t *req, *resp; for (;;) { @@ -336,11 +339,13 @@ static void * api_worker(void *ctx) queue_pull(&s->request.queue, (void **) &req); api_session_run_command(s, req, &resp); - + + json_decref(req); + queue_push(&s->response.queue, resp); - + lws_callback_on_writable(s->wsi); } - + return NULL; } diff --git a/lib/api/actions/config.c b/lib/api/actions/config.c index 6eb977051..c3f86021d 100644 --- a/lib/api/actions/config.c +++ b/lib/api/actions/config.c @@ -27,7 +27,7 @@ static int api_config(struct api_action *h, json_t *args, json_t **resp, struct api_session *s) { - *resp = s->api->super_node->cfg; + *resp = json_incref(s->api->super_node->cfg); return 0; } diff --git a/lib/api/actions/nodes.c b/lib/api/actions/nodes.c index 220a3a2dd..0c4a9ea70 100644 --- a/lib/api/actions/nodes.c +++ b/lib/api/actions/nodes.c @@ -46,7 +46,7 @@ static int api_nodes(struct api_action *r, json_t *args, json_t **resp, struct a ); if (n->stats) - json_object_set(json_node, "stats", stats_json(n->stats)); + json_object_set_new(json_node, "stats", stats_json(n->stats)); /* Add all additional fields of node here. * This can be used for metadata */ diff --git a/lib/api/session.c b/lib/api/session.c index 79bb83d41..7b084ac8c 100644 --- a/lib/api/session.c +++ b/lib/api/session.c @@ -32,15 +32,15 @@ int api_session_init(struct api_session *s, enum api_mode m) s->runs = 0; s->mode = m; - + ret = buffer_init(&s->request.buffer, 0); if (ret) return ret; - + ret = buffer_init(&s->response.buffer, 0); if (ret) return ret; - + ret = queue_init(&s->request.queue, 128, &memtype_heap); if (ret) return ret; @@ -48,7 +48,7 @@ int api_session_init(struct api_session *s, enum api_mode m) ret = queue_init(&s->response.queue, 128, &memtype_heap); if (ret) return ret; - + s->_name = NULL; return 0; @@ -65,7 +65,7 @@ int api_session_destroy(struct api_session *s) ret = buffer_destroy(&s->response.buffer); if (ret) return ret; - + ret = queue_destroy(&s->request.queue); if (ret) return ret; @@ -73,7 +73,7 @@ int api_session_destroy(struct api_session *s) ret = queue_destroy(&s->response.queue); if (ret) return ret; - + if (s->_name) free(s->_name); @@ -128,7 +128,7 @@ int api_session_run_command(struct api_session *s, json_t *json_in, json_t **jso "id", id); if (json_resp) - json_object_set(*json_out, "response", json_resp); + json_object_set_new(*json_out, "response", json_resp); out: debug(LOG_API, "Completed API request: action=%s, id=%s, code=%d", action, id, ret); @@ -141,24 +141,24 @@ char * api_session_name(struct api_session *s) { if (!s->_name) { char *mode; - + switch (s->mode) { case API_MODE_WS: mode = "ws"; break; case API_MODE_HTTP: mode = "http"; break; default: mode = "unknown"; break; } - + strcatf(&s->_name, "version=%d, mode=%s", s->version, mode); - + if (s->wsi) { char name[128]; char ip[128]; - + lws_get_peer_addresses(s->wsi, lws_get_socket_fd(s->wsi), name, sizeof(name), ip, sizeof(ip)); - + strcatf(&s->_name, ", remote.name=%s, remote.ip=%s", name, ip); } } - + return s->_name; } diff --git a/lib/mapping.c b/lib/mapping.c index fe43d6b7d..3463c8cc5 100644 --- a/lib/mapping.c +++ b/lib/mapping.c @@ -205,7 +205,7 @@ int mapping_parse_list(struct list *l, json_t *cfg, struct list *nodes) json_array_append(json_mapping, cfg); } else if (json_is_array(cfg)) - json_mapping = cfg; + json_mapping = json_incref(cfg); else return -1; @@ -215,7 +215,7 @@ int mapping_parse_list(struct list *l, json_t *cfg, struct list *nodes) ret = mapping_parse(e, json_entry, nodes); if (ret) - return ret; + goto out; e->offset = off; off += e->length; @@ -223,7 +223,12 @@ int mapping_parse_list(struct list *l, json_t *cfg, struct list *nodes) list_push(l, e); } - return 0; + ret = 0; + +out: + json_decref(json_mapping); + + return ret; } int mapping_update(struct mapping_entry *me, struct sample *remapped, struct sample *original, struct stats *s) diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c index 6f7a160c2..6b1b842bf 100644 --- a/lib/nodes/stats.c +++ b/lib/nodes/stats.c @@ -105,6 +105,16 @@ int stats_node_parse(struct node *n, json_t *cfg) return 0; } +int stats_node_destroy(struct node *n) +{ + struct stats_node *s = n->_vd; + + if (s->node_str) + free(&s->node_str); + + return 0; +} + int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt) { struct stats_node *sn = n->_vd; @@ -152,6 +162,7 @@ static struct plugin p = { .size = sizeof(struct stats_node), .init = stats_node_init, .parse = stats_node_parse, + .destroy= stats_node_destroy, .print = stats_node_print, .start = stats_node_start, .stop = stats_node_stop, diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 6fc4a2120..d6f55eb8d 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -197,7 +197,8 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi /** @todo Attempt reconnect here */ } - list_remove(&connections, c); + if (connections.state == STATE_INITIALIZED) + list_remove(&connections, c); if (c->_name) free(c->_name); diff --git a/lib/path.c b/lib/path.c index 14cc69ec3..5d4c2f07c 100644 --- a/lib/path.c +++ b/lib/path.c @@ -58,6 +58,10 @@ static int path_source_destroy(struct path_source *ps) if (ret) return ret; + ret = list_destroy(&ps->mappings, NULL, true); + if (ret) + return ret; + return 0; } @@ -330,7 +334,7 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) list_init(&sources); list_init(&destinations); - ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: i, s?: i }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: i }", "in", &json_in, "out", &json_out, "hooks", &json_hooks, diff --git a/lib/stats.c b/lib/stats.c index 29531e2ea..a2f4ad6ac 100644 --- a/lib/stats.c +++ b/lib/stats.c @@ -100,11 +100,10 @@ json_t * stats_json(struct stats *s) json_t *obj = json_object(); for (int i = 0; i < STATS_COUNT; i++) { - struct stats_desc *desc = &stats_metrics[i]; + struct stats_desc *d = &stats_metrics[i]; + struct hist *h = &s->histograms[i]; - json_t *stats = hist_json(&s->histograms[i]); - - json_object_set(obj, desc->name, stats); + json_object_set_new(obj, d->name, hist_json(h)); } return obj; diff --git a/lib/web.c b/lib/web.c index 6a479cc0e..aaf35591d 100644 --- a/lib/web.c +++ b/lib/web.c @@ -79,7 +79,6 @@ static struct lws_protocols protocols[] = { /** List of libwebsockets mounts. */ static struct lws_http_mount mounts[] = { { - .mount_next = &mounts[1], .mountpoint = "/", .origin = NULL, .def = "/index.html", @@ -90,11 +89,11 @@ static struct lws_http_mount mounts[] = { .cache_revalidate = 0, .cache_intermediaries = 0, .origin_protocol = LWSMPRO_FILE, - .mountpoint_len = 1 - }, + .mountpoint_len = 1, #ifdef WITH_API + .mount_next = &mounts[1] + }, { - .mount_next = NULL, .mountpoint = "/api/v1/", .origin = "http-api", .def = NULL, @@ -105,9 +104,10 @@ static struct lws_http_mount mounts[] = { .cache_revalidate = 0, .cache_intermediaries = 0, .origin_protocol = LWSMPRO_CALLBACK, - .mountpoint_len = 8 - } + .mountpoint_len = 8, #endif + .mount_next = NULL + } }; /** List of libwebsockets extensions. */ diff --git a/tests/integration/node-mux_demux.sh b/tests/integration/node-mux_demux.sh new file mode 100755 index 000000000..3ae9c22b0 --- /dev/null +++ b/tests/integration/node-mux_demux.sh @@ -0,0 +1,101 @@ +#!/bin/bash +# +# Integration loopback test using villas-node. +# +# @author Steffen Vogel +# @copyright 2017, Institute for Automation of Complex Power Systems, EONERC +# @license GNU General Public License (version 3) +# +# VILLASnode +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +################################################################################## + +SCRIPT=$(realpath $0) +SCRIPTPATH=$(dirname ${SCRIPT}) +source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh + +CONFIG_FILE=$(mktemp) +OUTPUT_FILE=$(mktemp) + +cat > ${CONFIG_FILE} < +# @copyright 2017, Institute for Automation of Complex Power Systems, EONERC +# @license GNU General Public License (version 3) +# +# VILLASnode +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +################################################################################## + +SCRIPT=$(realpath $0) +SCRIPTPATH=$(dirname ${SCRIPT}) +source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh + +CONFIG_FILE=$(mktemp) + +cat > ${CONFIG_FILE} <