diff --git a/etc/example.conf b/etc/example.conf index d1e3981bf..3b799686b 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -240,7 +240,16 @@ nodes = { "duration": 5 # The duration of the test case in seconds (depending on the sending rate) } ) - } + }, + influxdb_node = { + type = "influxdb", + + server = "localhost:8089", + key = "villas", + fields = [ + "a", "b", "c" + ] + } }; diff --git a/include/villas/nodes/influxdb.h b/include/villas/nodes/influxdb.h new file mode 100644 index 000000000..c05fe98f0 --- /dev/null +++ b/include/villas/nodes/influxdb.h @@ -0,0 +1,66 @@ +/** Node-type for InfluxDB. + * + * @file + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @ingroup node + * @addtogroup influxdb InfluxDB node-type + * @{ + */ + +#pragma once + +#include "list.h" + +/* Forward declarations */ +struct node; +struct sample; + +/** Node-type for signal generation. + * @see node_type + */ +struct influxdb { + char *host; + char *port; + char *key; + + struct list fields; + + int sd; +}; + +/** @see node_type::print */ +char * influxdb_print(struct node *n); + +/** @see node_type::parse */ +int influxdb_parse(struct node *n, json_t *cfg); + +/** @see node_type::open */ +int influxdb_open(struct node *n); + +/** @see node_type::close */ +int influxdb_close(struct node *n); + +/** @see node_type::write */ +int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt); + +/** @} */ diff --git a/lib/nodes/Makefile.inc b/lib/nodes/Makefile.inc index 64d384286..e173fecb3 100644 --- a/lib/nodes/Makefile.inc +++ b/lib/nodes/Makefile.inc @@ -36,6 +36,7 @@ WITH_ZEROMQ ?= 1 WITH_NANOMSG ?= 1 WITH_SHMEM ?= 1 WITH_STATS ?= 1 +WITH_INFLUXDB ?= 1 # Enabled loopback node-type ifeq ($(WITH_LOOPBACK),1) @@ -49,6 +50,12 @@ ifeq ($(WITH_CBUILDER),1) LIB_CFLAGS += -DWITH_CBUILDER endif +# Enable InfluxDB node-type +ifeq ($(WITH_INFLUXDB),1) + LIB_SRCS += lib/nodes/influxdb.c + LIB_CFLAGS += -DWITH_INFLUXDB +endif + # Enable stats node-type ifeq ($(WITH_STATS),1) LIB_SRCS += lib/nodes/stats.c diff --git a/lib/nodes/influxdb.c b/lib/nodes/influxdb.c new file mode 100644 index 000000000..1c08f5bd9 --- /dev/null +++ b/lib/nodes/influxdb.c @@ -0,0 +1,206 @@ +/** Node-type for InfluxDB. + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include "node.h" +#include "plugin.h" +#include "config.h" +#include "nodes/influxdb.h" +#include "memory.h" + +int influxdb_parse(struct node *n, json_t *cfg) +{ + struct influxdb *i = n->_vd; + + json_t *json_fields = NULL; + json_error_t err; + int ret; + + char *tmp, *host, *port; + const char *server, *key; + + ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s: s, s?: o }", + "server", &server, + "key", &key, + "fields", &json_fields + ); + if (ret) + jerror(&err, "Failed to parse configuration of node %s", node_name(n)); + + tmp = strdup(server); + + host = strtok(tmp, ":"); + port = strtok(NULL, ""); + + i->key = strdup(key); + i->host = strdup(host); + i->port = strdup(port ? port : "8089"); + + free(tmp); + + ret = list_init(&i->fields); + if (ret) + return ret; + + if (json_fields) { + if (!json_is_array(json_fields)) + return -1; + + json_t *json_field; + size_t idx; + + json_array_foreach(json_fields, idx, json_field) { + if (!json_is_string(json_field)) + return -2; + + const char *field = json_string_value(json_field); + + list_push(&i->fields, strdup(field)); + } + } + + return 0; +} + +int influxdb_open(struct node *n) +{ + int ret; + struct influxdb *i = n->_vd; + + struct addrinfo hints, *servinfo, *p; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + + ret = getaddrinfo(i->host, i->port, &hints, &servinfo); + if (ret) + error("Failed to lookup server: %s", gai_strerror(ret)); + + /* Loop through all the results and connect to the first we can */ + for (p = servinfo; p != NULL; p = p->ai_next) { + i->sd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (i->sd == -1) { + serror("socket"); + continue; + } + + ret = connect(i->sd, p->ai_addr, p->ai_addrlen); + if (ret == -1) { + warn("connect"); + close(i->sd); + continue; + } + + /* If we get here, we must have connected successfully */ + break; + } + + return p ? 0 : -1; +} + +int influxdb_close(struct node *n) +{ + struct influxdb *i= n->_vd; + + close(i->sd); + + list_destroy(&i->fields, NULL, true); + + free(i->host); + free(i->port); + free(i->key); + + return 0; +} + +int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt) +{ + struct influxdb *i = n->_vd; + + char *buf = NULL; + ssize_t sentlen, buflen; + + for (int k = 0; k < cnt; k++) { + /* Key */ + strcatf(&buf, "%s", i->key); + + /* Fields */ + for (int j = 0; j < smps[k]->length; j++) { + strcatf(&buf, "%c", j == 0 ? ' ' : ','); + + if (j < list_length(&i->fields)) { + char *field = list_at(&i->fields, j); + + strcatf(&buf, "%s=", field); + } + else + strcatf(&buf, "value%d=", j); + + switch (sample_get_data_format(smps[k], j)) { + case SAMPLE_DATA_FORMAT_FLOAT: strcatf(&buf, "%f", smps[k]->data[j].f); break; + case SAMPLE_DATA_FORMAT_INT: strcatf(&buf, "%ld", smps[k]->data[j].i); break; + } + } + + /* Timestamp */ + strcatf(&buf, " %ld%09ld\n", smps[k]->ts.origin.tv_sec, smps[k]->ts.origin.tv_nsec); + } + + buflen = strlen(buf) + 1; + sentlen = send(i->sd, buf, buflen, 0); + if (sentlen < 0) + return -1; + else if (sentlen < buflen) + warn("Partial sent"); + + free(buf); + + return cnt; +} + +char * influxdb_print(struct node *n) +{ + struct influxdb *i = n->_vd; + char *buf = NULL; + + strcatf(&buf, "host=%s, port=%s, key=%s, #fields=%zu", i->host, i->port, i->key, list_length(&i->fields)); + + return buf; +} + +static struct plugin p = { + .name = "influxdb", + .description = "Write results to InfluxDB", + .type = PLUGIN_TYPE_NODE, + .node = { + .vectorize = 0, + .size = sizeof(struct influxdb), + .parse = influxdb_parse, + .print = influxdb_print, + .start = influxdb_open, + .stop = influxdb_close, + .write = influxdb_write, + } +}; + +REGISTER_PLUGIN(&p) +LIST_INIT_STATIC(&p.node.instances)