mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
influx: use new signal description for specifying the InfluxDB fields
This commit is contained in:
parent
6c4127da2e
commit
88af9b8c8d
2 changed files with 19 additions and 36 deletions
|
@ -256,9 +256,13 @@ nodes = {
|
|||
|
||||
server = "localhost:8089",
|
||||
key = "villas",
|
||||
fields = [
|
||||
"a", "b", "c"
|
||||
]
|
||||
out = {
|
||||
signals = ( # The signal name will be used as fields for the InfluxDB
|
||||
{ name = "a" },
|
||||
{ name = "b" },
|
||||
{ name = "c" },
|
||||
}
|
||||
}
|
||||
},
|
||||
amqp_node = {
|
||||
type = "amqp",
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
|
||||
#include <villas/node.h>
|
||||
#include <villas/plugin.h>
|
||||
#include <villas/signal.h>
|
||||
#include <villas/config.h>
|
||||
#include <villas/nodes/influxdb.h>
|
||||
#include <villas/memory.h>
|
||||
|
@ -33,7 +34,6 @@ int influxdb_parse(struct node *n, json_t *json)
|
|||
{
|
||||
struct influxdb *i = (struct influxdb *) n->_vd;
|
||||
|
||||
json_t *json_fields = NULL;
|
||||
json_error_t err;
|
||||
int ret;
|
||||
|
||||
|
@ -42,8 +42,7 @@ int influxdb_parse(struct node *n, json_t *json)
|
|||
|
||||
ret = json_unpack_ex(json, &err, 0, "{ s: s, s: s, s?: o }",
|
||||
"server", &server,
|
||||
"key", &key,
|
||||
"fields", &json_fields
|
||||
"key", &key
|
||||
);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
|
||||
|
@ -59,27 +58,6 @@ int influxdb_parse(struct node *n, json_t *json)
|
|||
|
||||
free(tmp);
|
||||
|
||||
ret = list_init(&i->fields);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
if (json_fields) {
|
||||
if (!json_is_array(json_fields))
|
||||
return -1;
|
||||
|
||||
json_t *json_field;
|
||||
size_t idx;
|
||||
|
||||
json_array_foreach(json_fields, idx, json_field) {
|
||||
if (!json_is_string(json_field))
|
||||
return -2;
|
||||
|
||||
const char *field = json_string_value(json_field);
|
||||
|
||||
list_push(&i->fields, strdup(field));
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -126,11 +104,12 @@ int influxdb_close(struct node *n)
|
|||
|
||||
close(i->sd);
|
||||
|
||||
list_destroy(&i->fields, NULL, true);
|
||||
|
||||
free(i->host);
|
||||
free(i->port);
|
||||
free(i->key);
|
||||
if (i->host)
|
||||
free(i->host);
|
||||
if (i->port)
|
||||
free(i->port);
|
||||
if (i->key)
|
||||
free(i->key);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -150,10 +129,10 @@ int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
for (int j = 0; j < smps[k]->length; j++) {
|
||||
strcatf(&buf, "%c", j == 0 ? ' ' : ',');
|
||||
|
||||
if (j < list_length(&i->fields)) {
|
||||
char *field = (char *) list_at(&i->fields, j);
|
||||
if (j < list_length(&n->out.signals)) {
|
||||
struct signal *sig = (struct signal *) list_at(&n->out.signals, j);
|
||||
|
||||
strcatf(&buf, "%s=", field);
|
||||
strcatf(&buf, "%s=", sig->name);
|
||||
}
|
||||
else
|
||||
strcatf(&buf, "value%d=", j);
|
||||
|
@ -185,7 +164,7 @@ char * influxdb_print(struct node *n)
|
|||
struct influxdb *i = (struct influxdb *) n->_vd;
|
||||
char *buf = NULL;
|
||||
|
||||
strcatf(&buf, "host=%s, port=%s, key=%s, #fields=%zu", i->host, i->port, i->key, list_length(&i->fields));
|
||||
strcatf(&buf, "host=%s, port=%s, key=%s", i->host, i->port, i->key);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue