From fb0547a1635e9a69ef99f895aa67ee714aab73c4 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Thu, 24 May 2018 15:50:43 +0200 Subject: [PATCH 01/11] mqtt: do not show username if not provided --- lib/nodes/mqtt.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index 17dca2585..f48106661 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -302,14 +302,17 @@ char * mqtt_print(struct node *n) char *buf = NULL; - strcatf(&buf, "format=%s, host=%s, port=%d, username=%s, keepalive=%s, ssl=%s", plugin_name(m->format), + strcatf(&buf, "format=%s, host=%s, port=%d, keepalive=%s, ssl=%s", plugin_name(m->format), m->host, m->port, - m->username, m->keepalive ? "yes" : "no", m->ssl.enabled ? "yes" : "no" ); + /* Only show if not default */ + if (m->username) + strcatf(&buf, ", username=%s", m->username); + if (m->publish) strcatf(&buf, ", publish=%s", m->publish); From 0273e97c06dea3226d55fb948649e37afb40c3a2 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Fri, 25 May 2018 12:55:01 +0200 Subject: [PATCH 02/11] signal: add enable flag --- include/villas/signal.h | 1 + lib/signal.c | 13 ++++++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/include/villas/signal.h b/include/villas/signal.h index ec9e5b8f4..56502e8f3 100644 --- a/include/villas/signal.h +++ b/include/villas/signal.h @@ -32,6 +32,7 @@ struct node; struct signal { char *name; /**< The name of the signal. */ char *unit; + int enabled; enum { SIGNAL_FORMAT_INTEGER, SIGNAL_FORMAT_REAL diff --git a/lib/signal.c b/lib/signal.c index 15c045fdf..cf3dbcf1e 100644 --- a/lib/signal.c +++ b/lib/signal.c @@ -42,9 +42,13 @@ int signal_parse(struct signal *s, json_t *cfg) const char *name; const char *unit = NULL; - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s }", + /* Default values */ + s->enabled = true; + + ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: b }", "name", &name, - "unit", &unit + "unit", &unit, + "enabled", &s->enabled ); if (ret) return -1; @@ -60,6 +64,7 @@ int signal_parse(struct signal *s, json_t *cfg) int signal_parse_list(struct list *list, json_t *cfg) { int ret; + struct signal *s; if (!json_is_array(cfg)) return -1; @@ -67,7 +72,9 @@ int signal_parse_list(struct list *list, json_t *cfg) size_t index; json_t *json_signal; json_array_foreach(cfg, index, json_signal) { - struct signal *s = alloc(sizeof(struct signal)); + s = alloc(sizeof(struct signal)); + if (!s) + return -1; ret = signal_parse(s, json_signal); if (ret) From 5ed12a45906a09673492d1535070d1294275fb9c Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Fri, 25 May 2018 12:56:29 +0200 Subject: [PATCH 03/11] json.reserve: skip disabled signals --- lib/formats/json_reserve.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/formats/json_reserve.c b/lib/formats/json_reserve.c index 58e0b77b3..e20fccb5a 100644 --- a/lib/formats/json_reserve.c +++ b/lib/formats/json_reserve.c @@ -51,6 +51,9 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm sig = NULL; if (sig) { + if (!sig->enabled) + continue; + json_name = json_string(sig->name); json_unit = json_string(sig->unit); } @@ -141,8 +144,16 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa if (ret) return -1; - idx = list_lookup_index(io->input.signals, name); - if (idx < 0) { + struct signal *sig; + + sig = (struct signal *) list_lookup(io->input.signals, name); + if (sig) { + if (!sig->enabled) + continue; + + idx = list_index(io->input.signals, sig); + } + else { ret = sscanf(name, "signal_%d", &idx); if (ret != 1) continue; From 002ef7bdac60d41edd8fdb21966d65f7e49a9ef4 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Fri, 25 May 2018 12:57:31 +0200 Subject: [PATCH 04/11] json.reserve: accept integers as timestamps --- lib/formats/json_reserve.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/formats/json_reserve.c b/lib/formats/json_reserve.c index e20fccb5a..5daba7e9f 100644 --- a/lib/formats/json_reserve.c +++ b/lib/formats/json_reserve.c @@ -135,7 +135,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa const char *name, *unit = NULL; double value; - ret = json_unpack_ex(json_value, &err, 0, "{ s: s, s?: s, s: f, s?: f }", + ret = json_unpack_ex(json_value, &err, 0, "{ s: s, s?: s, s: f, s?: F }", "name", &name, "unit", &unit, "value", &value, From 3fad36b62bd14eb2884ff5439e7643707bc9873d Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 01:23:57 +0200 Subject: [PATCH 05/11] json.reserve: add support for numeric target field --- lib/formats/json_reserve.c | 56 ++++++++++++++++++++++++++++++++------ 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/lib/formats/json_reserve.c b/lib/formats/json_reserve.c index 5daba7e9f..bf5bf7d6b 100644 --- a/lib/formats/json_reserve.c +++ b/lib/formats/json_reserve.c @@ -29,6 +29,8 @@ #include #include +#define JSON_RESERVE_INTEGER_TARGET 1 + static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *smp) { json_error_t err; @@ -93,10 +95,26 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm if (*j == NULL) return -1; +#ifdef JSON_RESERVE_INTEGER_TARGET + if (io->output.node) { + char *endptr; + char *id_str = strrchr(io->output.node->name, '_'); + if (!id_str) + return -1; + + int id = strtoul(id_str+1, &endptr, 10); + if (endptr[0] != 0) + return -1; + + json_object_set_new(*j, "target", json_integer(id)); + } +#else if (io->output.node) json_object_set_new(*j, "target", json_string(io->output.node->name)); - //if (smp->source) - // json_object_set_new(*j, "origin", json_string(smp->source->name)); + + if (smp->source) + json_object_set_new(*j, "origin", json_string(smp->source->name)); +#endif return 0; } @@ -107,23 +125,45 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa double created = -1; json_error_t err; json_t *json_value, *json_data = NULL; + json_t *json_origin = NULL, *json_target = NULL; size_t i; - const char *origin = NULL, *target = NULL; - - ret = json_unpack_ex(json_smp, &err, 0, "{ s?: s, s?: s, s?: o, s?: o }", - "origin", &origin, - "target", &target, + ret = json_unpack_ex(json_smp, &err, 0, "{ s?: o, s?: o, s?: o, s?: o }", + "origin", &json_origin, + "target", &json_target, "measurements", &json_data, "setpoints", &json_data ); if (ret) return -1; - if (target && io->input.node) { +#ifdef JSON_RESERVE_INTEGER_TARGET + if (json_target && io->input.node) { + if (!json_is_integer(json_target)) + return -1; + + char *endptr; + char *id_str = strrchr(io->input.node->name, '_'); + if (!id_str) + return -1; + + int id = strtoul(id_str+1, &endptr, 10); + if (endptr[0] != 0) + return -1; + + if (id != json_integer_value(json_target)) + return -1; + } +#else + if (json_target && io->input.node) { + const char *target = json_string_value(json_target); + if (!target) + return -1; + if (strcmp(target, io->input.node->name)) return -1; } +#endif if (!json_data || !json_is_array(json_data)) return -1; From f054d92845c645d376446a5da770890de27bfb1a Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 02:23:00 +0200 Subject: [PATCH 06/11] json.reserve: fix parsing of timestamp and values --- lib/formats/json_reserve.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/formats/json_reserve.c b/lib/formats/json_reserve.c index bf5bf7d6b..1145a7775 100644 --- a/lib/formats/json_reserve.c +++ b/lib/formats/json_reserve.c @@ -39,7 +39,7 @@ static int json_reserve_pack_sample(struct io *io, json_t **j, struct sample *sm struct signal *sig; if (smp->flags & SAMPLE_HAS_ORIGIN) - json_created = json_real(time_to_double(&smp->ts.origin)); + json_created = json_integer(time_to_double(&smp->ts.origin) * 1e3); if (smp->flags & SAMPLE_HAS_SEQUENCE) json_sequence = json_integer(smp->sequence); @@ -175,7 +175,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa const char *name, *unit = NULL; double value; - ret = json_unpack_ex(json_value, &err, 0, "{ s: s, s?: s, s: f, s?: F }", + ret = json_unpack_ex(json_value, &err, 0, "{ s: s, s?: s, s: F, s?: F }", "name", &name, "unit", &unit, "value", &value, @@ -211,7 +211,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa smp->flags |= SAMPLE_HAS_VALUES; if (created > 0) { - smp->ts.origin = time_from_double(created); + smp->ts.origin = time_from_double(created * 1e-3); smp->flags |= SAMPLE_HAS_ORIGIN; } From 851bfa78b42b5d9b5f0907f8b97b60d5e20ecdef Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 02:23:32 +0200 Subject: [PATCH 07/11] mqtt: remove broken threading mode and fixed a race condition --- lib/nodes/mqtt.c | 178 ++++++----------------------------------------- 1 file changed, 23 insertions(+), 155 deletions(-) diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index f48106661..6ff753b16 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -28,110 +28,6 @@ #include #include -//#define MQTT_THREAD 1 - -#ifdef MQTT_THREAD -#include - -static struct { - pthread_t thread; - pthread_mutex_t mutex; - int length; - struct pollfd *fds; - struct mosquitto **clients; -} polling = { - .length = 0, - .fds = NULL, - .clients = NULL -}; - -static pthread_t thread; - -static int mqtt_register_client(struct mosquitto *mosq) -{ - pthread_mutex_lock(&polling.mutex); - - /* Add this client to the pollfd list */ - int i = polling.length++; - - polling.fds = realloc(polling.fds, polling.length * sizeof(struct pollfd)); - polling.clients = realloc(polling.clients, polling.length * sizeof(struct mosquitto *)); - - if (!polling.fds || !polling.clients) { - pthread_mutex_unlock(&polling.mutex); - return -1; - } - - polling.clients[i] = mosq; - polling.fds[i].events = POLLIN; - polling.fds[i].fd = mosquitto_socket(mosq); - - pthread_mutex_unlock(&polling.mutex); - - info("MQTT: registered new client"); - - return 0; -} - -static int mqtt_unregister_client(struct mosquitto *mosq) -{ - pthread_mutex_lock(&polling.mutex); - - /* Find client */ - int i; - for (i = 0; i < polling.length; i++) { - if (polling.clients[i] == mosq) - break; - } - - if (i >= polling.length) { - pthread_mutex_unlock(&polling.mutex); - return -1; /* Otherwise something wrong happened! */ - } - - /* Remove this client to the pollfd list */ - memmove(polling.fds + i, polling.fds + i + 1, (polling.length - i - 1) * sizeof(struct pollfd)); - memmove(polling.clients + i, polling.clients + i + 1, (polling.length - i - 1) * sizeof(struct mosquitto *)); - - polling.length--; - - pthread_mutex_unlock(&polling.mutex); - - info("MQTT: deregistered client"); - - return 0; -} - -static void * mqtt_thread(void *ctx) -{ - int ret; - - debug(5, "MQTT: started thread"); - - while (1) { -// pthread_mutex_lock(&polling.mutex); - - debug(5, "MQTT: Polling on %d clients", polling.length); - - ret = poll(polling.fds, polling.length, -1); - if (ret < 0) - serror("Failed to poll"); - - for (int i = 0; i < polling.length; i++) { - if (polling.fds[i].revents & POLLIN) { - ret = mosquitto_loop(polling.clients[i], -1, 1); - if (ret) - warn("MQTT: Loop failed for clients %p!", polling.clients[i]); - } - } - -// pthread_mutex_unlock(&polling.mutex); - } - - return NULL; -} -#endif - static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) { switch (level) { @@ -160,11 +56,13 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result) struct node *n = (struct node *) userdata; struct mqtt *m = (struct mqtt *) n->_vd; + int ret; + debug(5, "MQTT: Node %s connected to broker %s", node_name(n), m->host); -#ifdef MQTT_THREAD - mqtt_register_client(mosq); -#endif + ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos); + if (ret) + warn("MQTT: failed to subscribe to topic '%s' for node %s", m->subscribe, node_name(n)); } static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int result) @@ -173,10 +71,6 @@ static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int resul struct mqtt *m = (struct mqtt *) n->_vd; debug(5, "MQTT: Node %s disconnected from broker %s", node_name(n), m->host); - -#ifdef MQTT_THREAD - mqtt_unregister_client(mosq); -#endif } static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) @@ -195,8 +89,15 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct } ret = io_sscan(&m->io, msg->payload, msg->payloadlen, NULL, &smp, 1); - if (ret != 1) + if (ret < 0) { + warn("MQTT: Node %s received an invalid message", node_name(n)); + warn(" Payload: %s", (char *) msg->payload); return; + } + if (ret != 1) { + debug(4, "MQTT: skip empty message for node %s", node_name(n)); + return; + } queue_signalled_push(&m->queue, (void *) smp); } @@ -293,6 +194,15 @@ int mqtt_parse(struct node *n, json_t *cfg) if (!m->format) error("Invalid format '%s' for node %s", format, node_name(n)); + // Some checks + ret = mosquitto_sub_topic_check(m->subscribe); + if (ret != MOSQ_ERR_SUCCESS) + error("Invalid subscribe topic: '%s' for node %s", m->subscribe, node_name(n)); + + ret = mosquitto_pub_topic_check(m->publish); + if (ret != MOSQ_ERR_SUCCESS) + error("Invalid publish topic: '%s' for node %s", m->publish, node_name(n)); + return 0; } @@ -364,12 +274,6 @@ int mqtt_start(struct node *n) if (!m->client) return -1; -#ifdef MQTT_THREAD - ret = mosquitto_threaded_set(m->client, 1); - if (ret) - return ret; -#endif - if (m->username && m->password) { ret = mosquitto_username_pw_set(m->client, m->username, m->password); if (ret) @@ -404,19 +308,11 @@ int mqtt_start(struct node *n) if (ret) return ret; -#ifdef MQTT_THREAD - mqtt_register_client(m->client); -#else - ret = mosquitto_loop_start(m->client); - if (ret) - return ret; -#endif - ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); if (ret) return ret; - ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos); + ret = mosquitto_loop_start(m->client); if (ret) return ret; @@ -432,13 +328,9 @@ int mqtt_stop(struct node *n) if (ret) return ret; -#ifdef MQTT_THREAD - mqtt_unregister_client(m->client); -#else ret = mosquitto_loop_stop(m->client, 0); if (ret) return ret; -#endif return 0; } @@ -451,16 +343,6 @@ int mqtt_init() if (ret) return ret; -#ifdef MQTT_THREAD - ret = pthread_mutex_init(&polling.mutex, NULL); - if (ret) - return ret; - - ret = pthread_create(&thread, NULL, mqtt_thread, NULL); - if (ret) - return ret; -#endif - return 0; } @@ -468,20 +350,6 @@ int mqtt_deinit() { int ret; -#ifdef MQTT_THREAD - ret = pthread_cancel(thread); - if (ret) - return ret; - - ret = pthread_join(thread, NULL); - if (ret) - return ret; - - ret = pthread_mutex_destroy(&polling.mutex); - if (ret) - return ret; -#endif - ret = mosquitto_lib_cleanup(); if (ret) return ret; From 26c567e8cb6bc8acb100b4f2b91a6691986fcffc Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 02:33:16 +0200 Subject: [PATCH 08/11] improve logging --- lib/mapping.c | 2 -- lib/nodes/mqtt.c | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/mapping.c b/lib/mapping.c index 0d7160fa7..d4432b9c9 100644 --- a/lib/mapping.c +++ b/lib/mapping.c @@ -137,8 +137,6 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct list *no first_str = strtok(NULL, "-]"); if (first_str) { - info("Mapping: %s", first_str); - if (me->node) first = list_lookup_index(&me->node->in.signals, first_str); diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index 6ff753b16..52ecc1457 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -58,7 +58,7 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result) int ret; - debug(5, "MQTT: Node %s connected to broker %s", node_name(n), m->host); + info("MQTT: Node %s connected to broker %s", node_name(n), m->host); ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos); if (ret) @@ -70,7 +70,7 @@ static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int resul struct node *n = (struct node *) userdata; struct mqtt *m = (struct mqtt *) n->_vd; - debug(5, "MQTT: Node %s disconnected from broker %s", node_name(n), m->host); + info("MQTT: Node %s disconnected from broker %s", node_name(n), m->host); } static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) From 4a581a5e845ee4f50449c5d12808f48005209b06 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 02:33:41 +0200 Subject: [PATCH 09/11] pipe: fix memleak if failed to sent samples --- src/pipe.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pipe.c b/src/pipe.c index 4c887d3a0..ee27df078 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -160,10 +160,10 @@ static void * send_loop(void *ctx) } sent = node_write(node, smps, scanned); - if (sent < 0) { + if (sent < 0) warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent); - continue; - } + else if (sent < scanned) + warn("Failed to sent %d out of %d samples to node %s", scanned-sent, scanned, node_name(node)); sample_put_many(smps, ready); From 64d18c0260265057fbc97cd80815ba6ebc1dcd85 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 02:53:20 +0200 Subject: [PATCH 10/11] json.reserve: non-matching messages should be silently discarded --- lib/formats/json_reserve.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/formats/json_reserve.c b/lib/formats/json_reserve.c index 1145a7775..fe14d927c 100644 --- a/lib/formats/json_reserve.c +++ b/lib/formats/json_reserve.c @@ -152,7 +152,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa return -1; if (id != json_integer_value(json_target)) - return -1; + return 0; } #else if (json_target && io->input.node) { @@ -161,7 +161,7 @@ static int json_reserve_unpack_sample(struct io *io, json_t *json_smp, struct sa return -1; if (strcmp(target, io->input.node->name)) - return -1; + return 0; } #endif From 5252dc105917dc2b4b1888c63b5f7e62d1f3861b Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 26 May 2018 02:53:40 +0200 Subject: [PATCH 11/11] mqtt: fix memory leak --- lib/nodes/mqtt.c | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index 52ecc1457..bc3692f13 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -96,6 +96,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct } if (ret != 1) { debug(4, "MQTT: skip empty message for node %s", node_name(n)); + sample_put(smp); return; }