diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 82e17afdf..b6aac3b99 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -163,7 +163,8 @@ docker:
- docker build
--build-arg BUILDER_IMAGE=${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV}
--file packaging/docker/Dockerfile.app
- --tag ${DOCKER_IMAGE}:${DOCKER_TAG_DEV} .
+ --tag ${DOCKER_IMAGE}:${DOCKER_TAG} .
+ - docker push ${DOCKER_IMAGE}:${DOCKER_TAG}
- docker push ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV}
tags:
- shell
diff --git a/common b/common
index 277515487..4c9231b18 160000
--- a/common
+++ b/common
@@ -1 +1 @@
-Subproject commit 27751548785eebaeb160f32b923c7949de4faa5e
+Subproject commit 4c9231b18c7f1025f7c56d3b7b7ac0d805b62363
diff --git a/include/villas/hook.h b/include/villas/hook.h
index 28ecfddf6..4451ad2c0 100644
--- a/include/villas/hook.h
+++ b/include/villas/hook.h
@@ -26,6 +26,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
+
/**
* @addtogroup hooks User-defined hook functions
* @ingroup path
@@ -35,6 +36,7 @@
#pragma once
#include
+#include
#include
#ifdef __cplusplus
@@ -44,7 +46,6 @@ extern "C" {
/* Forward declarations */
struct path;
struct sample;
-struct vlist;
/** Descriptor for user defined hooks. See hooks[]. */
struct hook {
@@ -56,6 +57,8 @@ struct hook {
struct path *path;
struct node *node;
+ struct vlist signals;
+
struct hook_type *_vt; /**< C++ like Vtable pointer. */
void *_vd; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */
@@ -63,24 +66,38 @@ struct hook {
};
int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node *n);
-int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path *p, struct node *n);
+
+int hook_init_signals(struct hook *h, struct vlist *signals);
int hook_parse(struct hook *h, json_t *cfg);
+int hook_prepare(struct hook *h, struct vlist *signals);
+
int hook_destroy(struct hook *h);
int hook_start(struct hook *h);
+
int hook_stop(struct hook *h);
int hook_periodic(struct hook *h);
+
int hook_restart(struct hook *h);
int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt);
-int hook_process_list(struct vlist *hs, struct sample *smps[], unsigned cnt);
/** Compare two hook functions with their priority. Used by vlist_sort() */
int hook_cmp_priority(const void *a, const void *b);
+static inline
+struct hook_type * hook_type(struct hook *h)
+{
+ return h->_vt;
+}
+
+int hook_list_init(struct vlist *hs);
+
+int hook_list_destroy(struct vlist *hs);
+
/** Parses an object of hooks
*
* Example:
@@ -95,7 +112,15 @@ int hook_cmp_priority(const void *a, const void *b);
* hooks = [ "print" ]
* }
*/
-int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *p, struct node *n);
+int hook_list_parse(struct vlist *hs, json_t *cfg, int mask, struct path *p, struct node *n);
+
+int hook_list_prepare(struct vlist *hs, struct vlist *sigs, int mask, struct path *p, struct node *n);
+
+int hook_list_prepare_signals(struct vlist *hs, struct vlist *signals);
+
+int hook_list_add(struct vlist *hs, int mask, struct path *p, struct node *n);
+
+int hook_list_process(struct vlist *hs, struct sample *smps[], unsigned cnt);
#ifdef __cplusplus
}
diff --git a/include/villas/hook_type.h b/include/villas/hook_type.h
index d7ff2f642..632b8705d 100644
--- a/include/villas/hook_type.h
+++ b/include/villas/hook_type.h
@@ -1,6 +1,6 @@
/** Hook funktions
*
- * Every path can register a hook function which is called for every received
+ * Every path or node can register a hook function which is called for every received
* message. This can be used to debug the data flow, get statistics
* or alter the message.
*
@@ -62,11 +62,13 @@ struct hook_type {
int (*parse)(struct hook *h, json_t *cfg);
- int (*init)(struct hook *h); /**< Called before path is started to parsed. */
+ int (*init)(struct hook *h); /**< Called before hook is started to parsed. */
int (*destroy)(struct hook *h); /**< Called after path has been stopped to release memory allocated by HOOK_INIT */
- int (*start)(struct hook *h); /**< Called whenever a path is started; before threads are created. */
- int (*stop)(struct hook *h); /**< Called whenever a path is stopped; after threads are destoyed. */
+ int (*init_signals)(struct hook *h);
+
+ int (*start)(struct hook *h); /**< Called whenever a hook is started; before threads are created. */
+ int (*stop)(struct hook *h); /**< Called whenever a hook is stopped; after threads are destoyed. */
int (*periodic)(struct hook *h);/**< Called periodically. Period is set by global 'stats' option in the configuration file. */
int (*restart)(struct hook *h); /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */
diff --git a/include/villas/mapping.h b/include/villas/mapping.h
index 6c9522af3..9515da8f0 100644
--- a/include/villas/mapping.h
+++ b/include/villas/mapping.h
@@ -88,18 +88,20 @@ struct mapping_entry {
};
};
-int mapping_remap(const struct vlist *m, struct sample *remapped, const struct sample *original, const struct stats *s);
-
-int mapping_update(const struct mapping_entry *e, struct sample *remapped, const struct sample *original, const struct stats *s);
+int mapping_update(const struct mapping_entry *e, struct sample *remapped, const struct sample *original);
int mapping_parse(struct mapping_entry *e, json_t *cfg, struct vlist *nodes);
int mapping_parse_str(struct mapping_entry *e, const char *str, struct vlist *nodes);
-int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes);
-
int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str);
+int mapping_list_parse(struct vlist *ml, json_t *cfg, struct vlist *nodes);
+
+int mapping_list_prepare(struct vlist *ml);
+
+int mapping_list_remap(const struct vlist *ml, struct sample *remapped, const struct sample *original);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/villas/node.h b/include/villas/node.h
index c97bf7914..0275343d5 100644
--- a/include/villas/node.h
+++ b/include/villas/node.h
@@ -33,6 +33,7 @@
#include
#include
+#include
#include
#include
#include
@@ -52,17 +53,6 @@ extern "C" {
struct rtnl_cls;
#endif /* WITH_NETEM */
-struct node_direction {
- int enabled;
- int builtin; /**< This node should use built-in hooks by default. */
- int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
-
- struct vlist hooks; /**< List of write hooks (struct hook). */
- struct vlist signals; /**< Signal description. */
-
- json_t *cfg; /**< A JSON object containing the configuration of the node. */
-};
-
/** The data structure for a node.
*
* Every entity which exchanges messages is represented by a node.
@@ -70,6 +60,7 @@ struct node_direction {
*/
struct node {
char *name; /**< A short identifier of the node, only used for configuration and logging */
+ int enabled;
enum state state;
@@ -103,7 +94,7 @@ struct node {
int node_init(struct node *n, struct node_type *vt);
/** Do initialization after parsing the configuration */
-int node_init2(struct node *n);
+int node_prepare(struct node *n);
/** Parse settings of a node.
*
@@ -123,7 +114,7 @@ int node_parse(struct node *n, json_t *cfg, const char *name);
* @param nodes The nodes will be added to this list.
* @param all This list contains all valid nodes.
*/
-int node_parse_list(struct vlist *list, json_t *cfg, struct vlist *all);
+int node_list_parse(struct vlist *list, json_t *cfg, struct vlist *all);
/** Parse the list of signal definitions. */
int node_parse_signals(struct vlist *list, json_t *cfg);
@@ -199,11 +190,19 @@ int node_poll_fds(struct node *n, int fds[]);
int node_netem_fds(struct node *n, int fds[]);
-struct node_type * node_type(struct node *n);
+static inline
+struct node_type * node_type(struct node *n)
+{
+ return n->_vt;
+}
struct memory_type * node_memory_type(struct node *n, struct memory_type *parent);
-int node_is_valid_name(const char *name);
+bool node_is_valid_name(const char *name);
+
+bool node_is_enabled(const struct node *n);
+
+struct vlist * node_get_signals(struct node *n, enum node_dir dir);
#ifdef __cplusplus
}
diff --git a/include/villas/node_direction.h b/include/villas/node_direction.h
new file mode 100644
index 000000000..1fd86ccfb
--- /dev/null
+++ b/include/villas/node_direction.h
@@ -0,0 +1,74 @@
+/** Node direction
+ *
+ * @file
+ * @author Steffen Vogel
+ * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *********************************************************************************/
+
+/**
+ * @addtogroup node Node
+ * @{
+ */
+
+#pragma once
+
+#include
+
+#include
+#include
+
+/* Forward declarations */
+struct node;
+
+enum node_dir {
+ NODE_DIR_IN, /**< VILLASnode is receiving/reading */
+ NODE_DIR_OUT /**< VILLASnode is sending/writing */
+};
+
+struct node_direction {
+ enum state state;
+ enum node_dir direction;
+
+ int enabled;
+ int builtin; /**< This node should use built-in hooks by default. */
+ int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
+
+ struct vlist hooks; /**< List of read / write hooks (struct hook). */
+ struct vlist signals; /**< Signal description. */
+
+ json_t *cfg; /**< A JSON object containing the configuration of the node. */
+};
+
+int node_direction_init(struct node_direction *nd, enum node_dir dir, struct node *n);
+
+int node_direction_parse(struct node_direction *nd, struct node *n, json_t *cfg);
+
+int node_direction_check(struct node_direction *nd, struct node *n);
+
+int node_direction_prepare(struct node_direction *nd, struct node *n);
+
+int node_direction_start(struct node_direction *nd, struct node *n);
+
+int node_direction_stop(struct node_direction *nd, struct node *n);
+
+int node_direction_destroy(struct node_direction *nd, struct node *n);
+
+struct vlist * node_direction_get_signals(struct node_direction *nd);
+
+/** @} */
diff --git a/include/villas/path.h b/include/villas/path.h
index 622c263a3..e899fe036 100644
--- a/include/villas/path.h
+++ b/include/villas/path.h
@@ -49,21 +49,6 @@ extern "C" {
struct stats;
struct node;
-struct path_source {
- struct node *node;
-
- bool masked;
-
- struct pool pool;
- struct vlist mappings; /**< List of mappings (struct mapping_entry). */
-};
-
-struct path_destination {
- struct node *node;
-
- struct queue queue;
-};
-
/** The register mode determines under which condition the path is triggered. */
enum path_mode {
PATH_MODE_ANY, /**< The path is triggered whenever one of the sources receives samples. */
@@ -112,7 +97,7 @@ struct path {
/** Initialize internal data structures. */
int path_init(struct path *p);
-int path_init2(struct path *p);
+int path_prepare(struct path *p);
/** Check if path configuration is proper. */
int path_check(struct path *p);
@@ -172,7 +157,13 @@ int path_uses_node(struct path *p, struct node *n);
*/
int path_parse(struct path *p, json_t *cfg, struct vlist *nodes);
-int path_is_simple(struct path *p);
+bool path_is_simple(const struct path *p);
+
+bool path_is_enabled(const struct path *p);
+
+bool path_is_reversed(const struct path *p);
+
+struct vlist * path_get_signals(struct path *p);
/** @} */
diff --git a/include/villas/path_destination.h b/include/villas/path_destination.h
new file mode 100644
index 000000000..fede6ffb6
--- /dev/null
+++ b/include/villas/path_destination.h
@@ -0,0 +1,60 @@
+/** Path destination
+ *
+ * @file
+ * @author Steffen Vogel
+ * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *********************************************************************************/
+
+/** A path connects one input node to multiple output nodes (1-to-n).
+ *
+ * @addtogroup path Path
+ * @{
+ */
+
+#pragma once
+
+#include
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Forward declarations */
+struct path;
+struct sample;
+
+struct path_destination {
+ struct node *node;
+
+ struct queue queue;
+};
+
+int path_destination_init(struct path_destination *pd, int queuelen);
+
+int path_destination_destroy(struct path_destination *pd);
+
+void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt);
+
+void path_destination_write(struct path_destination *pd, struct path *p);
+
+#ifdef __cplusplus
+}
+#endif
+
+/** @} */
diff --git a/include/villas/path_source.h b/include/villas/path_source.h
new file mode 100644
index 000000000..4114dcda2
--- /dev/null
+++ b/include/villas/path_source.h
@@ -0,0 +1,62 @@
+/** Message source
+ *
+ * @file
+ * @author Steffen Vogel
+ * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *********************************************************************************/
+
+/** A path connects one input node to multiple output nodes (1-to-n).
+ *
+ * @addtogroup path Path
+ * @{
+ */
+
+#pragma once
+
+#include
+#include
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Forward declarations */
+struct path;
+struct sample;
+
+struct path_source {
+ struct node *node;
+
+ bool masked;
+
+ struct pool pool;
+ struct vlist mappings; /**< List of mappings (struct mapping_entry). */
+};
+
+int path_source_init(struct path_source *ps);
+
+int path_source_destroy(struct path_source *ps);
+
+int path_source_read(struct path_source *ps, struct path *p, int i);
+
+#ifdef __cplusplus
+}
+#endif
+
+/** @} */
diff --git a/include/villas/plugin.h b/include/villas/plugin.h
index 017b02189..d2a4c9571 100644
--- a/include/villas/plugin.h
+++ b/include/villas/plugin.h
@@ -53,7 +53,7 @@ __attribute__((constructor(110))) static void UNIQUE(__ctor)() {\
} \
__attribute__((destructor(110))) static void UNIQUE(__dtor)() { \
if (plugins.state != STATE_DESTROYED) \
- vlist_remove(&plugins, p); \
+ vlist_remove_all(&plugins, p); \
}
extern struct vlist plugins;
diff --git a/include/villas/sample.h b/include/villas/sample.h
index f5bc0d35f..9659e306a 100644
--- a/include/villas/sample.h
+++ b/include/villas/sample.h
@@ -136,6 +136,10 @@ int sample_decref_many(struct sample *smps[], int cnt);
enum signal_type sample_format(const struct sample *s, unsigned idx);
+void sample_data_insert(struct sample *smp, const union signal_data *src, size_t offset, size_t len);
+
+void sample_data_remove(struct sample *smp, size_t offset, size_t len);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/villas/signal.h b/include/villas/signal.h
index d57f85851..93d054437 100644
--- a/include/villas/signal.h
+++ b/include/villas/signal.h
@@ -106,11 +106,12 @@ int signal_parse(struct signal *s, json_t *cfg);
/** Initialize signal from a mapping_entry. */
int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, unsigned index);
+int signal_list_init(struct vlist *list);
+int signal_list_destroy(struct vlist *list);
int signal_list_parse(struct vlist *list, json_t *cfg);
-
int signal_list_generate(struct vlist *list, unsigned len, enum signal_type fmt);
-
void signal_list_dump(const struct vlist *list, const union signal_data *data, int len);
+int signal_list_copy(struct vlist *dst, const struct vlist *src);
enum signal_type signal_type_from_str(const char *str);
diff --git a/include/villas/super_node.hpp b/include/villas/super_node.hpp
index 805832d74..de484788f 100644
--- a/include/villas/super_node.hpp
+++ b/include/villas/super_node.hpp
@@ -88,10 +88,14 @@ public:
int check();
/** Initialize after parsing the configuration file. */
+ void prepare();
void start();
void stop();
void run();
+ void preparePaths();
+ void prepareNodes();
+
void startPaths();
void startNodes();
void startNodeTypes();
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index 22a797a8e..055ac29aa 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -46,10 +46,13 @@ set(LIB_SRC
memory/managed.c
sample.c
path.c
+ path_source.c
+ path_destination.c
node.c
+ node_type.c
+ node_direction.c
memory.c
plugin.c
- node_type.c
stats.c
mapping.c
shmem.c
diff --git a/lib/config_helper.c b/lib/config_helper.c
index 23b924598..6c3b7471a 100644
--- a/lib/config_helper.c
+++ b/lib/config_helper.c
@@ -165,9 +165,26 @@ int json_to_config(json_t *json, config_setting_t *parent)
}
#endif /* LIBCONFIG_FOUND */
+void json_object_extend_key_value_token(json_t *obj, const char *key, const char *value)
+{
+ char *str = strdup(value);
+ char *delim = ",";
+
+ char *lasts;
+ char *token = strtok_r(str, delim, &lasts);
+
+ while (token) {
+ json_object_extend_key_value(obj, key, token);
+
+ token = strtok_r(NULL, delim, &lasts);
+ }
+
+ free(str);
+}
+
void json_object_extend_key_value(json_t *obj, const char *key, const char *value)
{
- char *end, *cpy, *key1, *key2;
+ char *end, *cpy, *key1, *key2, *lasts;
double real;
long integer;
@@ -178,8 +195,8 @@ void json_object_extend_key_value(json_t *obj, const char *key, const char *valu
subobj = obj;
cpy = strdup(key);
- key1 = strtok(cpy, ".");
- key2 = strtok(NULL, ".");
+ key1 = strtok_r(cpy, ".", &lasts);
+ key2 = strtok_r(NULL, ".", &lasts);
while (key1 && key2) {
existing = json_object_get(subobj, key1);
@@ -193,7 +210,7 @@ void json_object_extend_key_value(json_t *obj, const char *key, const char *valu
}
key1 = key2;
- key2 = strtok(NULL, ".");
+ key2 = strtok_r(NULL, ".", &lasts);
}
/* Try to parse as integer */
@@ -254,7 +271,7 @@ json_t * json_load_cli(int argc, const char *argv[])
const char *key = NULL;
const char *value = NULL;
const char *sep;
- char *cpy;
+ char *cpy, *lasts;
json_t *json = json_object();
@@ -274,10 +291,10 @@ json_t * json_load_cli(int argc, const char *argv[])
if (sep) {
cpy = strdup(key);
- key = strtok(cpy, "=");
- value = strtok(NULL, "");
+ key = strtok_r(cpy, "=", &lasts);
+ value = strtok_r(NULL, "", &lasts);
- json_object_extend_key_value(json, key, value);
+ json_object_extend_key_value_token(json, key, value);
free(cpy);
key = NULL;
@@ -291,7 +308,7 @@ json_t * json_load_cli(int argc, const char *argv[])
value = opt;
- json_object_extend_key_value(json, key, value);
+ json_object_extend_key_value_token(json, key, value);
key = NULL;
}
}
@@ -324,17 +341,17 @@ int json_object_extend(json_t *obj, json_t *merge)
int json_object_extend_str(json_t *obj, const char *str)
{
- char *key, *value, *cpy;
+ char *key, *value, *cpy, *lasts;
cpy = strdup(str);
- key = strtok(cpy, "=");
- value = strtok(NULL, "");
+ key = strtok_r(cpy, "=", &lasts);
+ value = strtok_r(NULL, "", &lasts);
if (!key || !value)
return -1;
- json_object_extend_key_value(obj, key, value);
+ json_object_extend_key_value_token(obj, key, value);
free(cpy);
diff --git a/lib/hook.c b/lib/hook.c
index 17b693e7a..751f32729 100644
--- a/lib/hook.c
+++ b/lib/hook.c
@@ -42,14 +42,43 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node
h->path = p;
h->node = n;
- h->_vt = vt;
- h->_vd = alloc(vt->size);
+ h->signals.state = STATE_DESTROYED;
- ret = h->_vt->init ? h->_vt->init(h) : 0;
+ ret = signal_list_init(&h->signals);
if (ret)
return ret;
- h->state = STATE_INITIALIZED;
+ h->_vt = vt;
+ h->_vd = alloc(vt->size);
+
+ ret = hook_type(h)->init ? hook_type(h)->init(h) : 0;
+ if (ret)
+ return ret;
+
+ // We dont need to parse builtin hooks
+ h->state = hook_type(h)->flags & HOOK_BUILTIN ? STATE_PARSED : STATE_INITIALIZED;
+
+ return 0;
+}
+
+int hook_prepare(struct hook *h, struct vlist *signals)
+{
+ int ret;
+
+ assert(h->state == STATE_PARSED);
+
+ if (!h->enabled)
+ return 0;
+
+ ret = signal_list_copy(&h->signals, signals);
+ if (ret)
+ return -1;
+
+ ret = hook_type(h)->init_signals ? hook_type(h)->init_signals(h) : 0;
+ if (ret)
+ return ret;
+
+ h->state = STATE_PREPARED;
return 0;
}
@@ -66,9 +95,9 @@ int hook_parse(struct hook *h, json_t *cfg)
"enabled", &h->enabled
);
if (ret)
- jerror(&err, "Failed to parse configuration of hook '%s'", hook_type_name(h->_vt));
+ jerror(&err, "Failed to parse configuration of hook '%s'", hook_type_name(hook_type(h)));
- ret = h->_vt->parse ? h->_vt->parse(h, cfg) : 0;
+ ret = hook_type(h)->parse ? hook_type(h)->parse(h, cfg) : 0;
if (ret)
return ret;
@@ -82,9 +111,13 @@ int hook_destroy(struct hook *h)
{
int ret;
- assert(h->state != STATE_DESTROYED);
+ assert(h->state != STATE_DESTROYED && h->state != STATE_STARTED);
- ret = h->_vt->destroy ? h->_vt->destroy(h) : 0;
+ ret = signal_list_destroy(&h->signals);
+ if (ret)
+ return ret;
+
+ ret = hook_type(h)->destroy ? hook_type(h)->destroy(h) : 0;
if (ret)
return ret;
@@ -98,41 +131,53 @@ int hook_destroy(struct hook *h)
int hook_start(struct hook *h)
{
+ int ret;
+ assert(h->state == STATE_PREPARED);
+
if (!h->enabled)
return 0;
- if (h->_vt->start) {
- debug(LOG_HOOK | 10, "Start hook %s: priority=%d", hook_type_name(h->_vt), h->priority);
+ debug(LOG_HOOK | 10, "Start hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
- return h->_vt->start(h);
- }
- else
- return 0;
+ ret = hook_type(h)->start ? hook_type(h)->start(h) : 0;
+ if (ret)
+ return ret;
+
+ h->state = STATE_STARTED;
+
+ return 0;
}
int hook_stop(struct hook *h)
{
+ int ret;
+ assert(h->state == STATE_STARTED);
+
if (!h->enabled)
return 0;
- if (h->_vt->stop) {
- debug(LOG_HOOK | 10, "Stopping hook %s: priority=%d", hook_type_name(h->_vt), h->priority);
+ debug(LOG_HOOK | 10, "Stopping hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
- return h->_vt->stop(h);
- }
- else
- return 0;
+ ret = hook_type(h)->stop ? hook_type(h)->stop(h) : 0;
+ if (ret)
+ return ret;
+
+ h->state = STATE_STOPPED;
+
+ return 0;
}
int hook_periodic(struct hook *h)
{
+ assert(h->state == STATE_STARTED);
+
if (!h->enabled)
return 0;
- if (h->_vt->periodic) {
- debug(LOG_HOOK | 10, "Periodic hook %s: priority=%d", hook_type_name(h->_vt), h->priority);
+ if (hook_type(h)->periodic) {
+ debug(LOG_HOOK | 10, "Periodic hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
- return h->_vt->periodic(h);
+ return hook_type(h)->periodic(h);
}
else
return 0;
@@ -140,47 +185,36 @@ int hook_periodic(struct hook *h)
int hook_restart(struct hook *h)
{
+ int ret;
+ assert(h->state == STATE_STARTED);
+
if (!h->enabled)
return 0;
- if (h->_vt->restart) {
- debug(LOG_HOOK | 10, "Restarting hook %s: priority=%d", hook_type_name(h->_vt), h->priority);
+ debug(LOG_HOOK | 10, "Restarting hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
- return h->_vt->restart(h);
- }
- else
- return 0;
+ ret = hook_type(h)->restart ? hook_type(h)->restart(h) : 0;
+ if (ret)
+ return ret;
+
+ return 0;
}
int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
+ int ret;
+ assert(h->state == STATE_STARTED);
+
if (!h->enabled)
return 0;
- if (h->_vt->process) {
- debug(LOG_HOOK | 10, "Process hook %s: priority=%d, cnt=%d", hook_type_name(h->_vt), h->priority, *cnt);
+ debug(LOG_HOOK | 10, "Process hook %s: priority=%d, cnt=%d", hook_type_name(hook_type(h)), h->priority, *cnt);
- return h->_vt->process(h, smps, cnt);
- }
- else
- return 0;
-}
+ ret = hook_type(h)->process ? hook_type(h)->process(h, smps, cnt) : 0;
+ if (ret)
+ return ret;
-int hook_process_list(struct vlist *hs, struct sample *smps[], unsigned cnt)
-{
- unsigned ret;
-
- for (size_t i = 0; i < vlist_length(hs); i++) {
- struct hook *h = (struct hook *) vlist_at(hs, i);
-
- ret = hook_process(h, smps, &cnt);
- if (ret || !cnt)
- /* Abort hook processing if earlier hooks removed all samples
- * or they returned something non-zero */
- break;
- }
-
- return cnt;
+ return 0;
}
int hook_cmp_priority(const void *a, const void *b)
@@ -191,7 +225,29 @@ int hook_cmp_priority(const void *a, const void *b)
return ha->priority - hb->priority;
}
-int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *o, struct node *n)
+int hook_list_init(struct vlist *hs)
+{
+ int ret;
+
+ ret = vlist_init(hs);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+int hook_list_destroy(struct vlist *hs)
+{
+ int ret;
+
+ ret = vlist_destroy(hs, (dtor_cb_t) hook_destroy, true);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+int hook_list_parse(struct vlist *hs, json_t *cfg, int mask, struct path *o, struct node *n)
{
if (!json_is_array(cfg))
error("Hooks must be configured as a list of objects");
@@ -225,17 +281,42 @@ int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *o, s
if (ret)
jerror(&err, "Failed to parse hook configuration");
- vlist_push(list, h);
+ vlist_push(hs, h);
}
return 0;
}
-int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path *p, struct node *n)
+int hook_list_prepare(struct vlist *hs, struct vlist *sigs, int m, struct path *p, struct node *n)
{
int ret;
- assert(l->state == STATE_INITIALIZED);
+ /* Add internal hooks if they are not already in the list */
+ ret = hook_list_add(hs, m, p, n);
+ if (ret)
+ return ret;
+
+ /* We sort the hooks according to their priority */
+ vlist_sort(hs, hook_cmp_priority);
+
+ for (size_t i = 0; i < vlist_length(hs); i++) {
+ struct hook *h = (struct hook *) vlist_at(hs, i);
+
+ ret = hook_prepare(h, sigs);
+ if (ret)
+ return ret;
+
+ sigs = &h->signals;
+ }
+
+ return 0;
+}
+
+int hook_list_add(struct vlist *hs, int mask, struct path *p, struct node *n)
+{
+ int ret;
+
+ assert(hs->state == STATE_INITIALIZED);
for (size_t i = 0; i < vlist_length(&plugins); i++) {
struct plugin *q = (struct plugin *) vlist_at(&plugins, i);
@@ -246,11 +327,7 @@ int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path
if (q->type != PLUGIN_TYPE_HOOK)
continue;
- if (builtin &&
- vt->flags & HOOK_BUILTIN &&
- vt->flags & mask)
- {
-
+ if ((vt->flags & mask) == mask) {
h = (struct hook *) alloc(sizeof(struct hook));
if (!h)
return -1;
@@ -259,7 +336,7 @@ int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path
if (ret)
return ret;
- vlist_push(l, h);
+ vlist_push(hs, h);
}
}
@@ -270,3 +347,20 @@ const char * hook_type_name(struct hook_type *vt)
{
return plugin_name(vt);
}
+
+int hook_list_process(struct vlist *hs, struct sample *smps[], unsigned cnt)
+{
+ unsigned ret;
+
+ for (size_t i = 0; i < vlist_length(hs); i++) {
+ struct hook *h = (struct hook *) vlist_at(hs, i);
+
+ ret = hook_process(h, smps, &cnt);
+ if (ret || !cnt)
+ /* Abort hook processing if earlier hooks removed all samples
+ * or they returned something non-zero */
+ break;
+ }
+
+ return cnt;
+}
diff --git a/lib/hooks/CMakeLists.txt b/lib/hooks/CMakeLists.txt
index 2872a0354..afc12824d 100644
--- a/lib/hooks/CMakeLists.txt
+++ b/lib/hooks/CMakeLists.txt
@@ -36,6 +36,7 @@ set(HOOK_SRC
cast.c
average.c
dump.c
+ dp.c
)
if(WITH_IO)
diff --git a/lib/hooks/average.c b/lib/hooks/average.c
index 010c76e2d..340fe0096 100644
--- a/lib/hooks/average.c
+++ b/lib/hooks/average.c
@@ -29,43 +29,130 @@
#include
#include
#include
+#include
struct average {
uint64_t mask;
int offset;
+
+ struct bitset mask;
+ struct vlist signal_names;
};
+static int average_init(struct hook *h)
+{
+ int ret;
+ struct average *a = (struct average *) h->_vd;
+
+ ret = vlist_init(&a->signal_names);
+ if (ret)
+ return ret;
+
+ ret = bitset_init(&a->mask, 128);
+ if (ret)
+ return ret;
+
+ bitset_clear_all(&a->mask);
+
+ return 0;
+}
+
+static int average_destroy(struct hook *h)
+{
+ int ret;
+ struct average *a = (struct average *) h->_vd;
+
+ ret = vlist_destroy(&a->signal_names, NULL, true);
+ if (ret)
+ return ret;
+
+ ret = bitset_destroy(&a->mask);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+static int average_prepare(struct hook *h)
+{
+ int ret;
+ struct average *a = (struct average *) h->_vd;
+ struct signal *avg_sig;
+
+ /* Setup mask */
+ for (size_t i = 0; i < vlist_length(&a->signal_names); i++) {
+ char *signal_name = (char *) vlist_at_safe(&a->signal_names, i);
+
+ int index = vlist_lookup_index(&a->signal_names, signal_name);
+ if (index < 0)
+ return -1;
+
+ bitset_set(&a->mask, index);
+ }
+
+ /* Add averaged signal */
+ avg_sig = signal_create("average", NULL, SIGNAL_TYPE_FLOAT);
+ if (!avg_sig)
+ return -1;
+
+ ret = vlist_insert(&h->signals, a->offset, avg_sig);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
static int average_parse(struct hook *h, json_t *cfg)
{
- struct average *p = (struct average *) h->_vd;
+ struct average *a = (struct average *) h->_vd;
int ret;
+ size_t i;
json_error_t err;
+ json_t *json_signals, *json_signal;
- ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s: I }",
- "offset", &p->offset,
- "mask", &p->mask
+ ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s: o }",
+ "offset", &a->offset,
+ "signals", &json_signals
);
if (ret)
jerror(&err, "Failed to parse configuration of hook '%s'", hook_type_name(h->_vt));
+ if (!json_is_array(json_signals))
+ error("Setting 'signals' of hook '%s' must be a list of signal names", hook_type_name(h->_vt));
+
+ json_array_foreach(json_signals, i, json_signal) {
+ switch (json_typeof(json_signal)) {
+ case JSON_STRING:
+ vlist_push(&a->signal_names, strdup(json_string_value(json_signal)));
+ break;
+
+ case JSON_INTEGER:
+ bitset_set(&a->mask, json_integer_value(json_signal));
+ break;
+
+ default:
+ error("Invalid value for setting 'signals' in hook '%s'", hook_type_name(h->_vt));
+ }
+ }
+
return 0;
}
static int average_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
- struct average *p = (struct average *) h->_vd;
+ struct average *a = (struct average *) h->_vd;
for (int i = 0; i < *cnt; i++) {
struct sample *smp = smps[i];
- double sum = 0;
+ double avg, sum = 0;
int n = 0;
for (int k = 0; k < smp->length; k++) {
- if (!(p->mask & (1LL << k)))
+ if (!bitset_test(&a->mask, k))
continue;
- switch (sample_format(smps[i], k)) {
+ switch (sample_format(smp, k)) {
case SIGNAL_TYPE_INTEGER:
sum += smp->data[k].i;
break;
@@ -84,7 +171,9 @@ static int average_process(struct hook *h, struct sample *smps[], unsigned *cnt)
n++;
}
- smp->data[p->offset].f = sum / n;
+ avg = n == 0 ? 0 : sum / n;
+ sample_data_insert(smp, (union signal_data *) &avg, a->offset, 1);
+ smp->signals = &h->signals;
}
return 0;
@@ -99,6 +188,9 @@ static struct plugin p = {
.priority = 99,
.parse = average_parse,
.process = average_process,
+ .init = average_init,
+ .init_signals = average_prepare,
+ .destroy = average_destroy,
.size = sizeof(struct average)
}
};
diff --git a/lib/hooks/cast.c b/lib/hooks/cast.c
index c74b41563..7d5382815 100644
--- a/lib/hooks/cast.c
+++ b/lib/hooks/cast.c
@@ -29,52 +29,57 @@
#include
#include
-#include
-#include
#include
struct cast {
- struct vlist operations;
+ int signal_index;
+ char *signal_name;
- struct vlist signals;
+ enum signal_type new_type;
+ char *new_name;
+ char *new_unit;
};
-static int cast_init(struct hook *h)
+static int cast_prepare(struct hook *h)
{
- int ret;
+ struct signal *orig_sig, *new_sig;
struct cast *c = (struct cast *) h->_vd;
- struct vlist *orig_signals;
- if (h->node)
- orig_signals = &h->node->in.signals;
- else if (h->path)
- orig_signals = &h->path->signals;
- else
- return -1;
-
- ret = vlist_init(&c->signals);
- if (ret)
- return ret;
-
- /* Copy original signal list */
- for (int i = 0; i < vlist_length(orig_signals); i++) {
- struct signal *orig_sig = vlist_at(orig_signals, i);
- struct signal *new_sig = signal_copy(orig_sig);
-
- vlist_push(&c->signals, new_sig);
+ if (c->signal_name) {
+ c->signal_index = vlist_lookup_index(&h->signals, c->signal_name);
+ if (c->signal_index < 0)
+ return -1;
}
+ char *name, *unit;
+ enum signal_type type;
+
+ orig_sig = vlist_at_safe(&h->signals, c->signal_index);
+
+ type = c->new_type != SIGNAL_TYPE_AUTO ? c->new_type : orig_sig->type;
+ name = c->new_name ? c->new_name : orig_sig->name;
+ unit = c->new_unit ? c->new_unit : orig_sig->unit;
+
+ new_sig = signal_create(name, unit, type);
+
+ vlist_set(&h->signals, c->signal_index, new_sig);
+ signal_decref(orig_sig);
+
return 0;
}
static int cast_destroy(struct hook *h)
{
- int ret;
struct cast *c = (struct cast *) h->_vd;
- ret = vlist_destroy(&c->signals, (dtor_cb_t) signal_decref, false);
- if (ret)
- return ret;
+ if (c->signal_name)
+ free(c->signal_name);
+
+ if (c->new_name)
+ free(c->new_name);
+
+ if (c->new_unit)
+ free(c->new_unit);
return 0;
}
@@ -83,80 +88,51 @@ static int cast_parse(struct hook *h, json_t *cfg)
{
int ret;
struct cast *c = (struct cast *) h->_vd;
- struct signal *sig;
- size_t i;
- json_t *json_signals;
+ json_error_t err;
json_t *json_signal;
- ret = json_unpack(cfg, "{ s: o }",
- "signals", &json_signals
+ const char *new_name = NULL;
+ const char *new_unit = NULL;
+ const char *new_type = NULL;
+
+ ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: s, s?: s, s?: s }",
+ "signal", &json_signal,
+ "new_type", &new_type,
+ "new_name", &new_name,
+ "new_unit", &new_unit
);
if (ret)
return ret;
- if (json_is_array(json_signals))
- return -1;
+ switch (json_typeof(json_signal)) {
+ case JSON_STRING:
+ c->signal_name = strdup(json_string_value(json_signal));
+ break;
- json_array_foreach(json_signals, i, json_signal) {
- int index = -1;
- const char *name = NULL;
+ case JSON_INTEGER:
+ c->signal_name = NULL;
+ c->signal_index = json_integer_value(json_signal);
+ break;
- const char *new_name = NULL;
- const char *new_unit = NULL;
- const char *new_format = NULL;
-
- ret = json_unpack(json_signal, "{ s?: s, s?: i, s?: s, s?: s, s?: s }",
- "name", &name,
- "index", &index,
- "new_format", &new_format,
- "new_name", &new_name,
- "new_unit", &new_unit
- );
- if (ret)
- return ret;
-
- /* Find matching original signal descriptor */
- if (index >= 0 && name != NULL)
- return -1;
-
- if (index < 0 && name == NULL)
- return -1;
-
- sig = name
- ? vlist_lookup(&c->signals, name)
- : vlist_at_safe(&c->signals, index);
- if (!sig)
- return -1;
-
- /* Cast to new format */
- if (new_format) {
- enum signal_type fmt;
-
- fmt = signal_type_from_str(new_format);
- if (fmt == SIGNAL_TYPE_INVALID)
- return -1;
-
- sig->type = fmt;
- }
-
- /* Set new name */
- if (new_name) {
- if (sig->name)
- free(sig->name);
-
- sig->name = strdup(new_name);
- }
-
- /* Set new unit */
- if (new_unit) {
- if (sig->unit)
- free(sig->unit);
-
- sig->unit = strdup(new_unit);
- }
+ default:
+ error("Invalid value for setting 'signal' in hook '%s'", hook_type_name(h->_vt));
}
+ if (new_type) {
+ c->new_type = signal_type_from_str(new_type);
+ if (c->new_type == SIGNAL_TYPE_INVALID)
+ return -1;
+ }
+ else
+ c->new_type = SIGNAL_TYPE_AUTO; // We use this constant to indicate that we dont want to change the type
+
+ if (new_name)
+ c->new_name = strdup(new_name);
+
+ if (new_unit)
+ c->new_unit = strdup(new_unit);
+
return 0;
}
@@ -167,15 +143,13 @@ static int cast_process(struct hook *h, struct sample *smps[], unsigned *cnt)
for (int i = 0; i < *cnt; i++) {
struct sample *smp = smps[i];
- for (int j = 0; j < smp->length; j++) {
- struct signal *orig_sig = vlist_at(smp->signals, j);
- struct signal *new_sig = vlist_at(&c->signals, j);
+ struct signal *orig_sig = vlist_at(smp->signals, c->signal_index);
+ struct signal *new_sig = vlist_at(&h->signals, c->signal_index);
- signal_data_cast(&smp->data[j], orig_sig, new_sig);
- }
+ signal_data_cast(&smp->data[c->signal_index], orig_sig, new_sig);
/* Replace signal descriptors of sample */
- smp->signals = &c->signals;
+ smp->signals = &h->signals;
}
return 0;
@@ -183,13 +157,13 @@ static int cast_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static struct plugin p = {
.name = "cast",
- .description = "Cast signals",
+ .description = "Cast signals types",
.type = PLUGIN_TYPE_HOOK,
.hook = {
.flags = HOOK_NODE_READ | HOOK_PATH,
.priority = 99,
- .init = cast_init,
.destroy = cast_destroy,
+ .init_signals = cast_prepare,
.parse = cast_parse,
.process = cast_process,
.size = sizeof(struct cast)
diff --git a/lib/hooks/dp.c b/lib/hooks/dp.c
new file mode 100644
index 000000000..d540b6332
--- /dev/null
+++ b/lib/hooks/dp.c
@@ -0,0 +1,341 @@
+/** Dynamic Phasor Interface Algorithm hook.
+ *
+ * @author Steffen Vogel
+ * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *********************************************************************************/
+
+/** @addtogroup hooks Hook functions
+ * @{
+ */
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+#define J _Complex_I
+
+struct dp {
+ char *signal_name;
+ int signal_index;
+
+ int offset;
+ int inverse;
+
+ double f0;
+ double dt;
+ double t;
+
+ double complex *coeffs;
+ int *fharmonics;
+ int fharmonics_len;
+
+ struct window window;
+};
+
+static void dp_step(struct dp *d, double *in, float complex *out)
+{
+ int n = d->window.steps;
+ double r = 0.9999999999;
+ double complex om, corr;
+ double newest = *in;
+ double oldest = window_update(&d->window, newest);
+
+ for (int i = 0; i < d->fharmonics_len; i++) {
+ om = 2.0 * M_PI * J * d->fharmonics[i] / n;
+
+ /* Recursive update */
+ //d->coeffs[i] = cexp(om) * (d->coeffs[i] + (newest - oldest));
+ d->coeffs[i] = d->coeffs[i] * r * cexp(om) - powf(r, n) * oldest + newest;
+
+ /* Correction for stationary phasor */
+ corr = cexp(-om * (d->t - (d->window.steps + 1)));
+
+ out[i] = (2.0 / d->window.steps) * (d->coeffs[i] * corr);
+
+ /* DC component */
+ if (d->fharmonics[i] == 0)
+ out[i] /= 2.0;
+ }
+}
+
+static void dp_istep(struct dp *d, complex float *in, double *out)
+{
+ double complex value = 0;
+
+ /* Reconstruct the original signal */
+ for (int i = 0; i < d->fharmonics_len; i++) {
+ double freq = d->fharmonics[i];
+ double complex coeff = in[i];
+
+ value += coeff * cexp(2.0 * M_PI * freq * d->t);
+ }
+
+ *out = creal(value);
+}
+
+static int dp_start(struct hook *h)
+{
+ int ret;
+ struct dp *d = (struct dp *) h->_vd;
+
+ d->t = 0;
+
+ for (int i = 0; i < d->fharmonics_len; i++)
+ d->coeffs[i] = 0;
+
+ ret = window_init(&d->window, (1.0 / d->f0) / d->dt, 0.0);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+static int dp_stop(struct hook *h)
+{
+ int ret;
+ struct dp *d = (struct dp *) h->_vd;
+
+ ret = window_destroy(&d->window);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+static int dp_init(struct hook *h)
+{
+ struct dp *d = (struct dp *) h->_vd;
+
+ /* Default values */
+ d->inverse = 0;
+
+ return 0;
+}
+
+static int dp_destroy(struct hook *h)
+{
+ struct dp *d = (struct dp *) h->_vd;
+
+ /* Release memory */
+ free(d->fharmonics);
+ free(d->coeffs);
+
+ if (d->signal_name)
+ free(d->signal_name);
+
+ return 0;
+}
+
+static int dp_parse(struct hook *h, json_t *cfg)
+{
+ struct dp *d = (struct dp *) h->_vd;
+
+ int ret;
+ json_error_t err;
+ json_t *json_harmonics, *json_harmonic, *json_signal;
+ size_t i;
+
+ double rate = -1, dt = -1;
+
+ ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s: F, s?: F, s?: F, s: o, s?: b }",
+ "signal", &json_signal,
+ "f0", &d->f0,
+ "dt", &dt,
+ "rate", &rate,
+ "harmonics", &json_harmonics,
+ "inverse", &d->inverse
+ );
+ if (ret)
+ jerror(&err, "Failed to parse configuration of hook '%s'", plugin_name(h->_vt));
+
+ if (rate > 0)
+ d->dt = 1. / rate;
+ else if (dt > 0)
+ d->dt = dt;
+ else
+ error("Either on of the settings 'dt' or 'rate' must be gived for hook '%s'", plugin_name(h->_vt));
+
+ if (!json_is_array(json_harmonics))
+ error("Setting 'harmonics' of hook '%s' must be a list of integers", plugin_name(h->_vt));
+
+ switch (json_typeof(json_signal)) {
+ case JSON_STRING:
+ d->signal_name = strdup(json_string_value(json_signal));
+ break;
+
+ case JSON_INTEGER:
+ d->signal_name = NULL;
+ d->signal_index = json_integer_value(json_signal);
+ break;
+
+ default:
+ error("Invalid value for setting 'signal' in hook '%s'", hook_type_name(h->_vt));
+ }
+
+ d->fharmonics_len = json_array_size(json_harmonics);
+ d->fharmonics = alloc(d->fharmonics_len * sizeof(double));
+ d->coeffs = alloc(d->fharmonics_len * sizeof(double complex));
+ if (!d->fharmonics || !d->coeffs)
+ return -1;
+
+ json_array_foreach(json_harmonics, i, json_harmonic) {
+ if (!json_is_integer(json_harmonic))
+ error("Setting 'harmonics' of hook '%s' must be a list of integers", plugin_name(h->_vt));
+
+ d->fharmonics[i] = json_integer_value(json_harmonic);
+ }
+
+ return 0;
+}
+
+static int dp_prepare(struct hook *h)
+{
+ int ret;
+ struct dp *d = (struct dp *) h->_vd;
+
+ char *new_sig_name;
+ struct signal *orig_sig, *new_sig;
+
+ if (d->signal_name) {
+ d->signal_index = vlist_lookup_index(&h->signals, d->signal_name);
+ if (d->signal_index < 0)
+ return -1;
+ }
+
+ if (d->inverse) {
+ /* Remove complex-valued coefficient signals */
+ for (int i = 0; i < d->fharmonics_len; i++) {
+ orig_sig = vlist_at_safe(&h->signals, d->signal_index + i);
+ if (!orig_sig)
+ return -1;
+
+ /** @todo: SIGNAL_TYPE_AUTO is bad here */
+ if (orig_sig->type != SIGNAL_TYPE_COMPLEX && orig_sig->type != SIGNAL_TYPE_AUTO)
+ return -1;
+
+ ret = vlist_remove(&h->signals, d->signal_index + i);
+ if (ret)
+ return -1;
+
+ signal_decref(orig_sig);
+ }
+
+ /* Add new real-valued reconstructed signals */
+ new_sig = signal_create("dp", "idp", SIGNAL_TYPE_FLOAT);
+ if (!new_sig)
+ return -1;
+
+ ret = vlist_insert(&h->signals, d->offset, new_sig);
+ if (ret)
+ return -1;
+ }
+ else {
+ orig_sig = vlist_at_safe(&h->signals, d->signal_index);
+ if (!orig_sig)
+ return -1;
+
+ /** @todo: SIGNAL_TYPE_AUTO is bad here */
+ if (orig_sig->type != SIGNAL_TYPE_FLOAT && orig_sig->type != SIGNAL_TYPE_AUTO)
+ return -1;
+
+ ret = vlist_remove(&h->signals, d->signal_index);
+ if (ret)
+ return -1;
+
+ for (int i = 0; i < d->fharmonics_len; i++) {
+ new_sig_name = strf("%s_harm%d", orig_sig->name, i);
+
+ new_sig = signal_create(new_sig_name, orig_sig->unit, SIGNAL_TYPE_COMPLEX);
+ if (!new_sig)
+ return -1;
+
+ ret = vlist_insert(&h->signals, d->offset + i, new_sig);
+ if (ret)
+ return -1;
+ }
+
+ signal_decref(orig_sig);
+ }
+
+ return 0;
+}
+
+static int dp_process(struct hook *h, struct sample *smps[], unsigned *cnt)
+{
+ struct dp *d = (struct dp *) h->_vd;
+
+ for (unsigned j = 0; j < *cnt; j++) {
+ struct sample *smp = smps[j];
+
+ if (d->signal_index > smp->length)
+ continue;
+
+ if (d->inverse) {
+ double signal;
+ float complex *coeffs = &smp->data[d->signal_index].z;
+
+ dp_istep(d, coeffs, &signal);
+
+ sample_data_remove(smp, d->signal_index, d->fharmonics_len);
+ sample_data_insert(smp, (union signal_data *) &signal, d->offset, 1);
+ }
+ else {
+ double signal = smp->data[d->signal_index].f;
+ float complex coeffs[d->fharmonics_len];
+
+ dp_step(d, &signal, coeffs);
+
+ sample_data_remove(smp, d->signal_index, 1);
+ sample_data_insert(smp, (union signal_data *) coeffs, d->offset, d->fharmonics_len);
+ }
+
+ smp->signals = &h->signals;
+ }
+
+ d->t += d->dt;
+
+ return 0;
+}
+
+static struct plugin p = {
+ .name = "dp",
+ .description = "Transform to/from dynamic phasor domain",
+ .type = PLUGIN_TYPE_HOOK,
+ .hook = {
+ .flags = HOOK_PATH | HOOK_NODE_READ | HOOK_NODE_WRITE,
+ .priority = 99,
+ .init = dp_init,
+ .init_signals = dp_prepare,
+ .destroy = dp_destroy,
+ .start = dp_start,
+ .stop = dp_stop,
+ .parse = dp_parse,
+ .process = dp_process,
+ .size = sizeof(struct dp)
+ }
+};
+
+REGISTER_PLUGIN(&p)
+
+/** @} */
diff --git a/lib/hooks/print.c b/lib/hooks/print.c
index ce930ff00..a88a541db 100644
--- a/lib/hooks/print.c
+++ b/lib/hooks/print.c
@@ -56,18 +56,7 @@ static int print_start(struct hook *h)
struct print *p = (struct print *) h->_vd;
int ret;
- struct vlist *signals;
-
- if (h->node)
- signals = &h->node->in.signals;
- else if (h->path)
- signals = &h->path->signals;
- else
- signals = NULL;
-
- ret = signals
- ? io_init(&p->io, p->format, signals, SAMPLE_HAS_ALL)
- : io_init_auto(&p->io, p->format, DEFAULT_SAMPLE_LENGTH, SAMPLE_HAS_ALL);
+ ret = io_init(&p->io, p->format, &h->signals, SAMPLE_HAS_ALL);
if (ret)
return ret;
diff --git a/lib/hooks/scale.c b/lib/hooks/scale.c
index 343b54f49..3b8b7e59f 100644
--- a/lib/hooks/scale.c
+++ b/lib/hooks/scale.c
@@ -31,63 +31,105 @@
#include
struct scale {
+ char *signal_name;
+ int signal_index;
+
double scale;
double offset;
};
static int scale_init(struct hook *h)
{
- struct scale *p = (struct scale *) h->_vd;
+ struct scale *s = (struct scale *) h->_vd;
- p->scale = 1;
- p->offset = 0;
+ s->scale = 1;
+ s->offset = 0;
+
+ return 0;
+}
+
+static int scale_prepare(struct hook *h)
+{
+ struct scale *s = (struct scale *) h->_vd;
+
+ if (s->signal_name) {
+ s->signal_index = vlist_lookup_index(&h->signals, s->signal_name);
+ if (s->signal_index < 0)
+ return -1;
+ }
+
+ return 0;
+}
+
+static int scale_destroy(struct hook *h)
+{
+ struct scale *s = (struct scale *) h->_vd;
+
+ if (s->signal_name)
+ free(s->signal_name);
return 0;
}
static int scale_parse(struct hook *h, json_t *cfg)
{
- struct scale *p = (struct scale *) h->_vd;
+ struct scale *s = (struct scale *) h->_vd;
int ret;
+ json_t *json_signal;
json_error_t err;
- ret = json_unpack_ex(cfg, &err, 0, "{ s?: F, s?: F }",
- "scale", &p->scale,
- "offset", &p->offset
+ ret = json_unpack_ex(cfg, &err, 0, "{ s?: F, s?: F, s: o }",
+ "scale", &s->scale,
+ "offset", &s->offset,
+ "signal", &json_signal
);
if (ret)
jerror(&err, "Failed to parse configuration of hook '%s'", hook_type_name(h->_vt));
+ switch (json_typeof(json_signal)) {
+ case JSON_STRING:
+ s->signal_name = strdup(json_string_value(json_signal));
+ break;
+
+ case JSON_INTEGER:
+ s->signal_name = NULL;
+ s->signal_index = json_integer_value(json_signal);
+ break;
+
+ default:
+ error("Invalid value for setting 'signal' in hook '%s'", hook_type_name(h->_vt));
+ }
+
return 0;
}
static int scale_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
- struct scale *p = (struct scale *) h->_vd;
+ struct scale *s = (struct scale *) h->_vd;
for (int i = 0; i < *cnt; i++) {
- for (int k = 0; k < smps[i]->length; k++) {
+ struct sample *smp = smps[i];
+ int k = s->signal_index;
- switch (sample_format(smps[i], k)) {
- case SIGNAL_TYPE_INTEGER:
- smps[i]->data[k].i = smps[i]->data[k].i * p->scale + p->offset;
- break;
+ switch (sample_format(smp, k)) {
+ case SIGNAL_TYPE_INTEGER:
+ smp->data[k].i = smp->data[k].i * s->scale + s->offset;
+ break;
- case SIGNAL_TYPE_FLOAT:
- smps[i]->data[k].f = smps[i]->data[k].f * p->scale + p->offset;
- break;
+ case SIGNAL_TYPE_FLOAT:
+ smp->data[k].f = smp->data[k].f * s->scale + s->offset;
+ break;
- case SIGNAL_TYPE_COMPLEX:
- smps[i]->data[k].z = smps[i]->data[k].z * p->scale + p->offset;
- break;
+ case SIGNAL_TYPE_COMPLEX:
+ smp->data[k].z = smp->data[k].z * s->scale + s->offset;
+ break;
- case SIGNAL_TYPE_BOOLEAN:
- smps[i]->data[k].b = smps[i]->data[k].b * p->scale + p->offset;
- break;
+ case SIGNAL_TYPE_BOOLEAN:
+ smp->data[k].b = smp->data[k].b * s->scale + s->offset;
+ break;
- default: { }
- }
+ default: { }
}
}
@@ -102,6 +144,8 @@ static struct plugin p = {
.flags = HOOK_PATH,
.priority = 99,
.init = scale_init,
+ .init_signals = scale_prepare,
+ .destroy = scale_destroy,
.parse = scale_parse,
.process = scale_process,
.size = sizeof(struct scale)
diff --git a/lib/io.c b/lib/io.c
index 740d08cb8..75bc916e9 100644
--- a/lib/io.c
+++ b/lib/io.c
@@ -115,7 +115,6 @@ int io_init_auto(struct io *io, const struct format_type *fmt, int len, int flag
struct vlist *signals;
signals = alloc(sizeof(struct vlist));
-
signals->state = STATE_DESTROYED;
ret = vlist_init(signals);
diff --git a/lib/mapping.c b/lib/mapping.c
index 5c89fd4ee..5108471e7 100644
--- a/lib/mapping.c
+++ b/lib/mapping.c
@@ -32,14 +32,14 @@
int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *nodes)
{
- char *cpy, *node, *type, *field, *end;
+ char *cpy, *node, *type, *field, *end, *lasts;
cpy = strdup(str);
if (!cpy)
return -1;
if (nodes) {
- node = strtok(cpy, ".");
+ node = strtok_r(cpy, ".", &lasts);
if (!node) {
warning("Missing node name");
goto invalid_format;
@@ -51,14 +51,14 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
goto invalid_format;
}
- type = strtok(NULL, ".[");
+ type = strtok_r(NULL, ".[", &lasts);
if (!type)
type = "data";
}
else {
me->node = NULL;
- type = strtok(cpy, ".[");
+ type = strtok_r(cpy, ".[", &lasts);
if (!type)
goto invalid_format;
}
@@ -67,11 +67,11 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
me->type = MAPPING_TYPE_STATS;
me->length = 1;
- char *metric = strtok(NULL, ".");
+ char *metric = strtok_r(NULL, ".", &lasts);
if (!metric)
goto invalid_format;
- type = strtok(NULL, ".");
+ type = strtok_r(NULL, ".", &lasts);
if (!type)
goto invalid_format;
@@ -87,7 +87,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
me->type = MAPPING_TYPE_HEADER;
me->length = 1;
- field = strtok(NULL, ".");
+ field = strtok_r(NULL, ".", &lasts);
if (!field) {
warning("Missing header type");
goto invalid_format;
@@ -106,7 +106,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
me->type = MAPPING_TYPE_TIMESTAMP;
me->length = 2;
- field = strtok(NULL, ".");
+ field = strtok_r(NULL, ".", &lasts);
if (!field) {
warning("Missing timestamp type");
goto invalid_format;
@@ -127,7 +127,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
me->type = MAPPING_TYPE_DATA;
- first_str = strtok(NULL, "-]");
+ first_str = strtok_r(NULL, "-]", &lasts);
if (first_str) {
if (me->node)
first = vlist_lookup_index(&me->node->in.signals, first_str);
@@ -144,11 +144,11 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
else {
/* Map all signals */
me->data.offset = 0;
- me->length = me->node ? vlist_length(&me->node->in.signals) : 0;
+ me->length = -1;
goto end;
}
- last_str = strtok(NULL, "]");
+ last_str = strtok_r(NULL, "]", &lasts);
if (last_str) {
if (me->node)
last = vlist_lookup_index(&me->node->in.signals, last_str);
@@ -175,7 +175,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
goto invalid_format;
end: /* Check that there is no garbage at the end */
- end = strtok(NULL, "");
+ end = strtok_r(NULL, "", &lasts);
if (end)
goto invalid_format;
@@ -201,9 +201,9 @@ int mapping_parse(struct mapping_entry *me, json_t *cfg, struct vlist *nodes)
return mapping_parse_str(me, str, nodes);
}
-int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes)
+int mapping_list_parse(struct vlist *ml, json_t *cfg, struct vlist *nodes)
{
- int ret, off;
+ int ret;
size_t i;
json_t *json_entry;
@@ -218,7 +218,6 @@ int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes)
else
return -1;
- off = 0;
json_array_foreach(json_mapping, i, json_entry) {
struct mapping_entry *me = (struct mapping_entry *) alloc(sizeof(struct mapping_entry));
@@ -226,10 +225,7 @@ int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes)
if (ret)
goto out;
- me->offset = off;
- off += me->length;
-
- vlist_push(l, me);
+ vlist_push(ml, me);
}
ret = 0;
@@ -239,22 +235,16 @@ out: json_decref(json_mapping);
return ret;
}
-int mapping_update(const struct mapping_entry *me, struct sample *remapped, const struct sample *original, const struct stats *s)
+int mapping_update(const struct mapping_entry *me, struct sample *remapped, const struct sample *original)
{
- int len = me->length;
- int off = me->offset;
-
- /* me->length == 0 means that we want to take all values */
- if (!len)
- len = original->length;
-
- if (len + off > remapped->capacity)
+ if (me->length + me->offset > remapped->capacity)
return -1;
switch (me->type) {
- case MAPPING_TYPE_STATS:
- remapped->data[off++] = stats_get_value(s, me->stats.metric, me->stats.type);
+ case MAPPING_TYPE_STATS: {
+ remapped->data[me->offset] = stats_get_value(me->node->stats, me->stats.metric, me->stats.type);
break;
+ }
case MAPPING_TYPE_TIMESTAMP: {
const struct timespec *ts;
@@ -270,8 +260,8 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
return -1;
}
- remapped->data[off++].i = ts->tv_sec;
- remapped->data[off++].i = ts->tv_nsec;
+ remapped->data[me->offset + 0].i = ts->tv_sec;
+ remapped->data[me->offset + 1].i = ts->tv_nsec;
break;
}
@@ -279,11 +269,13 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
case MAPPING_TYPE_HEADER:
switch (me->header.type) {
case MAPPING_HEADER_TYPE_LENGTH:
- remapped->data[off++].i = original->length;
+ remapped->data[me->offset].i = original->length;
break;
+
case MAPPING_HEADER_TYPE_SEQUENCE:
- remapped->data[off++].i = original->sequence;
+ remapped->data[me->offset].i = original->sequence;
break;
+
default:
return -1;
}
@@ -291,11 +283,11 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
break;
case MAPPING_TYPE_DATA:
- for (int j = me->data.offset; j < len + me->data.offset; j++) {
+ for (int j = me->data.offset, i = me->offset; j < me->length + me->data.offset; j++, i++) {
if (j >= original->length)
- remapped->data[off++].f = 0;
+ remapped->data[i].f = -1;
else
- remapped->data[off++] = original->data[j];
+ remapped->data[i] = original->data[j];
}
break;
@@ -304,14 +296,14 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
return 0;
}
-int mapping_remap(const struct vlist *m, struct sample *remapped, const struct sample *original, const struct stats *s)
+int mapping_list_remap(const struct vlist *ml, struct sample *remapped, const struct sample *original)
{
int ret;
- for (size_t i = 0; i < vlist_length(m); i++) {
- struct mapping_entry *me = (struct mapping_entry *) vlist_at(m, i);
+ for (size_t i = 0; i < vlist_length(ml); i++) {
+ struct mapping_entry *me = (struct mapping_entry *) vlist_at(ml, i);
- ret = mapping_update(me, remapped, original, s);
+ ret = mapping_update(me, remapped, original);
if (ret)
return ret;
}
@@ -319,6 +311,24 @@ int mapping_remap(const struct vlist *m, struct sample *remapped, const struct s
return 0;
}
+int mapping_list_prepare(struct vlist *ml)
+{
+ for (size_t i = 0, off = 0; i < vlist_length(ml); i++) {
+ struct mapping_entry *me = (struct mapping_entry *) vlist_at(ml, i);
+
+ if (me->length < 0) {
+ struct vlist *sigs = node_get_signals(me->node, NODE_DIR_IN);
+
+ me->length = vlist_length(sigs);
+ }
+
+ me->offset = off;
+ off += me->length;
+ }
+
+ return 0;
+}
+
int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str)
{
const char *type;
diff --git a/lib/node.c b/lib/node.c
index cec2e5b3a..b1f94f6ec 100644
--- a/lib/node.c
+++ b/lib/node.c
@@ -41,177 +41,6 @@
#include
#endif /* WITH_NETEM */
-static int node_direction_init2(struct node_direction *nd, struct node *n)
-{
-#ifdef WITH_HOOKS
- int ret;
- int m = nd == &n->out
- ? HOOK_NODE_WRITE
- : HOOK_NODE_READ;
-
- /* Add internal hooks if they are not already in the list */
- ret = hook_init_builtin_list(&nd->hooks, nd->builtin, m, NULL, n);
- if (ret)
- return ret;
-
- /* We sort the hooks according to their priority before starting the path */
- vlist_sort(&nd->hooks, hook_cmp_priority);
-#endif /* WITH_HOOKS */
-
- return 0;
-}
-
-static int node_direction_init(struct node_direction *nd, struct node *n)
-{
- int ret;
-
- nd->enabled = 1;
- nd->vectorize = 1;
- nd->builtin = 1;
-
- nd->hooks.state = STATE_DESTROYED;
- nd->signals.state = STATE_DESTROYED;
-
-#ifdef WITH_HOOKS
- ret = vlist_init(&nd->hooks);
- if (ret)
- return ret;
-#endif /* WITH_HOOKS */
-
- ret = vlist_init(&nd->signals);
- if (ret)
- return ret;
-
- return 0;
-}
-
-static int node_direction_destroy(struct node_direction *nd, struct node *n)
-{
- int ret = 0;
-
-#ifdef WITH_HOOKS
- ret = vlist_destroy(&nd->hooks, (dtor_cb_t) hook_destroy, true);
- if (ret)
- return ret;
-#endif /* WITH_HOOKS */
-
- ret = vlist_destroy(&nd->signals, (dtor_cb_t) signal_decref, false);
- if (ret)
- return ret;
-
- return ret;
-}
-
-static int node_direction_parse(struct node_direction *nd, struct node *n, json_t *cfg)
-{
- int ret;
-
- json_error_t err;
- json_t *json_hooks = NULL;
- json_t *json_signals = NULL;
-
- nd->cfg = cfg;
-
- ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b, s?: b }",
- "hooks", &json_hooks,
- "signals", &json_signals,
- "vectorize", &nd->vectorize,
- "builtin", &nd->builtin,
- "enabled", &nd->enabled
- );
- if (ret)
- jerror(&err, "Failed to parse node %s", node_name(n));
-
- if (n->_vt->flags & NODE_TYPE_PROVIDES_SIGNALS) {
- if (json_signals)
- error("Node %s does not support signal definitions", node_name(n));
- }
- else if (json_is_array(json_signals)) {
- ret = signal_list_parse(&nd->signals, json_signals);
- if (ret)
- error("Failed to parse signal definition of node %s", node_name(n));
- }
- else {
- int count = DEFAULT_SAMPLE_LENGTH;
- const char *type_str = "float";
-
- if (json_is_object(json_signals)) {
- json_unpack_ex(json_signals, &err, 0, "{ s: i, s: s }",
- "count", &count,
- "type", &type_str
- );
- }
- else
- warning("No signal definition found for node %s. Using the default config of %d floating point signals.", node_name(n), DEFAULT_SAMPLE_LENGTH);
-
- int type = signal_type_from_str(type_str);
- if (type < 0)
- error("Invalid signal type %s", type_str);
-
- signal_list_generate(&nd->signals, count, type);
- }
-
-#ifdef WITH_HOOKS
- int m = nd == &n->out
- ? HOOK_NODE_WRITE
- : HOOK_NODE_READ;
-
- if (json_hooks) {
- ret = hook_parse_list(&nd->hooks, json_hooks, m, NULL, n);
- if (ret < 0)
- return ret;
- }
-#endif /* WITH_HOOKS */
-
- return 0;
-}
-
-static int node_direction_check(struct node_direction *nd, struct node *n)
-{
- if (nd->vectorize <= 0)
- error("Invalid setting 'vectorize' with value %d for node %s. Must be natural number!", nd->vectorize, node_name(n));
-
- if (node_type(n)->vectorize && node_type(n)->vectorize < nd->vectorize)
- error("Invalid value for setting 'vectorize'. Node type requires a number smaller than %d!",
- node_type(n)->vectorize);
-
- return 0;
-}
-
-static int node_direction_start(struct node_direction *nd, struct node *n)
-{
-#ifdef WITH_HOOKS
- int ret;
-
- for (size_t i = 0; i < vlist_length(&nd->hooks); i++) {
- struct hook *h = (struct hook *) vlist_at(&nd->hooks, i);
-
- ret = hook_start(h);
- if (ret)
- return ret;
- }
-#endif /* WITH_HOOKS */
-
- return 0;
-}
-
-static int node_direction_stop(struct node_direction *nd, struct node *n)
-{
-#ifdef WITH_HOOKS
- int ret;
-
- for (size_t i = 0; i < vlist_length(&nd->hooks); i++) {
- struct hook *h = (struct hook *) vlist_at(&nd->hooks, i);
-
- ret = hook_stop(h);
- if (ret)
- return ret;
- }
-#endif /* WITH_HOOKS */
-
- return 0;
-}
-
int node_init(struct node *n, struct node_type *vt)
{
int ret;
@@ -224,6 +53,7 @@ int node_init(struct node *n, struct node_type *vt)
n->name = NULL;
n->_name = NULL;
n->_name_long = NULL;
+ n->enabled = 1;
#ifdef __linux__
n->fwmark = -1;
@@ -235,11 +65,11 @@ int node_init(struct node *n, struct node_type *vt)
#endif /* WITH_NETEM */
/* Default values */
- ret = node_direction_init(&n->in, n);
+ ret = node_direction_init(&n->in, NODE_DIR_IN, n);
if (ret)
return ret;
- ret = node_direction_init(&n->out, n);
+ ret = node_direction_init(&n->out, NODE_DIR_OUT, n);
if (ret)
return ret;
@@ -254,20 +84,22 @@ int node_init(struct node *n, struct node_type *vt)
return 0;
}
-int node_init2(struct node *n)
+int node_prepare(struct node *n)
{
int ret;
assert(n->state == STATE_CHECKED);
- ret = node_direction_init2(&n->in, n);
+ ret = node_direction_prepare(&n->in, n);
if (ret)
return ret;
- ret = node_direction_init2(&n->out, n);
+ ret = node_direction_prepare(&n->out, n);
if (ret)
return ret;
+ n->state = STATE_PREPARED;
+
return 0;
}
@@ -284,8 +116,9 @@ int node_parse(struct node *n, json_t *json, const char *name)
n->name = strdup(name);
- ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o } }",
+ ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: b, s?: { s?: o } }",
"type", &type,
+ "enabled", &n->enabled,
"in",
"signals", &json_signals
);
@@ -389,7 +222,7 @@ int node_start(struct node *n)
{
int ret;
- assert(n->state == STATE_CHECKED);
+ assert(n->state == STATE_PREPARED);
assert(node_type(n)->state == STATE_STARTED);
info("Starting node %s", node_name_long(n));
@@ -498,9 +331,8 @@ int node_restart(struct node *n)
info("Restarting node %s", node_name(n));
- if (node_type(n)->restart) {
+ if (node_type(n)->restart)
ret = node_type(n)->restart(n);
- }
else {
ret = node_type(n)->stop ? node_type(n)->stop(n) : 0;
if (ret)
@@ -533,7 +365,7 @@ int node_destroy(struct node *n)
return ret;
}
- vlist_remove(&node_type(n)->instances, n);
+ vlist_remove_all(&node_type(n)->instances, n);
if (n->_vd)
free(n->_vd);
@@ -586,7 +418,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
#ifdef WITH_HOOKS
/* Run read hooks */
- int rread = hook_process_list(&n->in.hooks, smps, nread);
+ int rread = hook_list_process(&n->in.hooks, smps, nread);
int skipped = nread - rread;
if (skipped > 0 && n->stats != NULL) {
@@ -616,7 +448,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
#ifdef WITH_HOOKS
/* Run write hooks */
- cnt = hook_process_list(&n->out.hooks, smps, cnt);
+ cnt = hook_list_process(&n->out.hooks, smps, cnt);
if (cnt <= 0)
return cnt;
#endif /* WITH_HOOKS */
@@ -647,7 +479,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
char * node_name(struct node *n)
{
if (!n->_name)
- strcatf(&n->_name, CLR_RED("%s") "(" CLR_YEL("%s") ")", n->name, node_type_name(n->_vt));
+ strcatf(&n->_name, CLR_RED("%s") "(" CLR_YEL("%s") ")", n->name, node_type_name(node_type(n)));
return n->_name;
}
@@ -704,19 +536,12 @@ int node_netem_fds(struct node *n, int fds[])
return node_type(n)->netem_fds ? node_type(n)->netem_fds(n, fds) : -1;
}
-struct node_type * node_type(struct node *n)
-{
- assert(n->state != STATE_DESTROYED);
-
- return n->_vt;
-}
-
struct memory_type * node_memory_type(struct node *n, struct memory_type *parent)
{
return node_type(n)->memory_type ? node_type(n)->memory_type(n, parent) : &memory_hugepage;
}
-int node_parse_list(struct vlist *list, json_t *cfg, struct vlist *all)
+int node_list_parse(struct vlist *list, json_t *cfg, struct vlist *all)
{
struct node *node;
const char *str;
@@ -771,14 +596,26 @@ invalid2:
return 0;
}
-int node_is_valid_name(const char *name)
+bool node_is_valid_name(const char *name)
{
for (const char *p = name; *p; p++) {
if (isalnum(*p) || (*p == '_') || (*p == '-'))
continue;
- return -1;
+ return false;
}
- return 0;
+ return true;
+}
+
+bool node_is_enabled(const struct node *n)
+{
+ return n->enabled;
+}
+
+struct vlist * node_get_signals(struct node *n, enum node_dir dir)
+{
+ struct node_direction *nd = dir == NODE_DIR_IN ? &n->in : &n->out;
+
+ return node_direction_get_signals(nd);
}
diff --git a/lib/node_direction.c b/lib/node_direction.c
new file mode 100644
index 000000000..6d9361021
--- /dev/null
+++ b/lib/node_direction.c
@@ -0,0 +1,233 @@
+
+/** Node direction
+ *
+ * @author Steffen Vogel
+ * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *********************************************************************************/
+
+#include
+#include
+#include
+#include
+
+int node_direction_prepare(struct node_direction *nd, struct node *n)
+{
+ assert(nd->state == STATE_CHECKED);
+
+#ifdef WITH_HOOKS
+ int ret;
+ int t = nd->direction == NODE_DIR_OUT ? HOOK_NODE_WRITE : HOOK_NODE_READ;
+ int m = nd->builtin ? t | HOOK_BUILTIN : 0;
+
+ ret = hook_list_prepare(&nd->hooks, &nd->signals, m, NULL, n);
+ if (ret)
+ return ret;
+#endif /* WITH_HOOKS */
+
+ nd->state = STATE_PREPARED;
+
+ return 0;
+}
+
+int node_direction_init(struct node_direction *nd, enum node_dir dir, struct node *n)
+{
+ int ret;
+
+ assert(nd->state == STATE_DESTROYED);
+
+ nd->direction = dir;
+ nd->enabled = 1;
+ nd->vectorize = 1;
+ nd->builtin = 1;
+
+ nd->hooks.state = STATE_DESTROYED;
+ nd->signals.state = STATE_DESTROYED;
+
+#ifdef WITH_HOOKS
+ ret = hook_list_init(&nd->hooks);
+ if (ret)
+ return ret;
+#endif /* WITH_HOOKS */
+
+ ret = signal_list_init(&nd->signals);
+ if (ret)
+ return ret;
+
+ nd->state = STATE_INITIALIZED;
+
+ return 0;
+}
+
+int node_direction_destroy(struct node_direction *nd, struct node *n)
+{
+ int ret = 0;
+
+ assert(nd->state != STATE_DESTROYED && nd->state != STATE_STARTED);
+
+#ifdef WITH_HOOKS
+ ret = hook_list_destroy(&nd->hooks);
+ if (ret)
+ return ret;
+#endif /* WITH_HOOKS */
+
+ ret = signal_list_destroy(&nd->signals);
+ if (ret)
+ return ret;
+
+ nd->state = STATE_DESTROYED;
+
+ return 0;
+}
+
+int node_direction_parse(struct node_direction *nd, struct node *n, json_t *cfg)
+{
+ int ret;
+
+ assert(nd->state == STATE_INITIALIZED);
+
+ json_error_t err;
+ json_t *json_hooks = NULL;
+ json_t *json_signals = NULL;
+
+ nd->cfg = cfg;
+
+ ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b, s?: b }",
+ "hooks", &json_hooks,
+ "signals", &json_signals,
+ "vectorize", &nd->vectorize,
+ "builtin", &nd->builtin,
+ "enabled", &nd->enabled
+ );
+ if (ret)
+ jerror(&err, "Failed to parse node %s", node_name(n));
+
+ if (n->_vt->flags & NODE_TYPE_PROVIDES_SIGNALS) {
+ if (json_signals)
+ error("Node %s does not support signal definitions", node_name(n));
+ }
+ else if (json_is_array(json_signals)) {
+ ret = signal_list_parse(&nd->signals, json_signals);
+ if (ret)
+ error("Failed to parse signal definition of node %s", node_name(n));
+ }
+ else {
+ int count = DEFAULT_SAMPLE_LENGTH;
+ const char *type_str = "float";
+
+ if (json_is_object(json_signals)) {
+ json_unpack_ex(json_signals, &err, 0, "{ s: i, s: s }",
+ "count", &count,
+ "type", &type_str
+ );
+ }
+ else
+ warning("No signal definition found for node %s. Using the default config of %d floating point signals.", node_name(n), DEFAULT_SAMPLE_LENGTH);
+
+ int type = signal_type_from_str(type_str);
+ if (type < 0)
+ error("Invalid signal type %s", type_str);
+
+ signal_list_generate(&nd->signals, count, type);
+ }
+
+#ifdef WITH_HOOKS
+ if (json_hooks) {
+ int m = nd->direction == NODE_DIR_OUT ? HOOK_NODE_WRITE : HOOK_NODE_READ;
+
+ ret = hook_list_parse(&nd->hooks, json_hooks, m, NULL, n);
+ if (ret < 0)
+ return ret;
+ }
+#endif /* WITH_HOOKS */
+
+ nd->state = STATE_PARSED;
+
+ return 0;
+}
+
+int node_direction_check(struct node_direction *nd, struct node *n)
+{
+ assert(nd->state == STATE_PARSED);
+
+ if (nd->vectorize <= 0)
+ error("Invalid setting 'vectorize' with value %d for node %s. Must be natural number!", nd->vectorize, node_name(n));
+
+ if (node_type(n)->vectorize && node_type(n)->vectorize < nd->vectorize)
+ error("Invalid value for setting 'vectorize'. Node type requires a number smaller than %d!",
+ node_type(n)->vectorize);
+
+ nd->state = STATE_CHECKED;
+
+ return 0;
+}
+
+int node_direction_start(struct node_direction *nd, struct node *n)
+{
+ assert(nd->state == STATE_PREPARED);
+
+#ifdef WITH_HOOKS
+ int ret;
+
+ for (size_t i = 0; i < vlist_length(&nd->hooks); i++) {
+ struct hook *h = (struct hook *) vlist_at(&nd->hooks, i);
+
+ ret = hook_start(h);
+ if (ret)
+ return ret;
+ }
+#endif /* WITH_HOOKS */
+
+ nd->state = STATE_STARTED;
+
+ return 0;
+}
+
+int node_direction_stop(struct node_direction *nd, struct node *n)
+{
+ assert(nd->state == STATE_STARTED);
+
+#ifdef WITH_HOOKS
+ int ret;
+
+ for (size_t i = 0; i < vlist_length(&nd->hooks); i++) {
+ struct hook *h = (struct hook *) vlist_at(&nd->hooks, i);
+
+ ret = hook_stop(h);
+ if (ret)
+ return ret;
+ }
+#endif /* WITH_HOOKS */
+
+ nd->state = STATE_STOPPED;
+
+ return 0;
+}
+
+struct vlist * node_direction_get_signals(struct node_direction *nd)
+{
+#ifdef WITH_HOOKS
+ assert(nd->state == STATE_PREPARED);
+
+ struct hook *h = vlist_last(&nd->hooks);
+
+ return &h->signals;
+#else
+ return &nd->signals;
+#endif
+}
diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index 063542b57..a1cf20ae3 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -174,8 +174,7 @@ int ib_parse(struct node *n, json_t *cfg)
struct infiniband *ib = (struct infiniband *) n->_vd;
int ret;
- char *local = NULL;
- char *remote = NULL;
+ char *local = NULL, *remote = NULL, *lasts;
const char *transport_mode = "RC";
int timeout = 1000;
int recv_cq_size = 128;
@@ -255,8 +254,8 @@ int ib_parse(struct node *n, json_t *cfg)
debug(LOG_IB | 4, "Set buffer subtraction to %i in node %s", buffer_subtraction, node_name(n));
// Translate IP:PORT to a struct addrinfo
- char* ip_adr = strtok(local, ":");
- char* port = strtok(NULL, ":");
+ char* ip_adr = strtok_r(local, ":", &lasts);
+ char* port = strtok_r(NULL, ":", &lasts);
ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.src_addr);
if (ret)
@@ -327,8 +326,8 @@ int ib_parse(struct node *n, json_t *cfg)
// If node will send data, set remote address
if (ib->is_source) {
// Translate address info
- char* ip_adr = strtok(remote, ":");
- char* port = strtok(NULL, ":");
+ char *ip_adr = strtok_r(remote, ":", &lasts);
+ char *port = strtok_r(NULL, ":", &lasts);
ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.dst_addr);
if (ret)
diff --git a/lib/nodes/influxdb.c b/lib/nodes/influxdb.c
index 2b22c3b63..cf57192d3 100644
--- a/lib/nodes/influxdb.c
+++ b/lib/nodes/influxdb.c
@@ -40,7 +40,7 @@ int influxdb_parse(struct node *n, json_t *json)
json_error_t err;
int ret;
- char *tmp, *host, *port;
+ char *tmp, *host, *port, *lasts;
const char *server, *key;
ret = json_unpack_ex(json, &err, 0, "{ s: s, s: s, s?: o }",
@@ -52,8 +52,8 @@ int influxdb_parse(struct node *n, json_t *json)
tmp = strdup(server);
- host = strtok(tmp, ":");
- port = strtok(NULL, "");
+ host = strtok_r(tmp, ":", &lasts);
+ port = strtok_r(NULL, "", &lasts);
i->key = strdup(key);
i->host = strdup(host);
diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c
index 9d8fb2c28..f56b1f610 100644
--- a/lib/nodes/socket.c
+++ b/lib/nodes/socket.c
@@ -626,9 +626,10 @@ int socket_parse_address(const char *addr, struct sockaddr *saddr, enum socket_l
#ifdef WITH_SOCKET_LAYER_ETH
else if (layer == SOCKET_LAYER_ETH) { /* Format: "ab:cd:ef:12:34:56%ifname:protocol" */
/* Split string */
- char *node = strtok(copy, "%");
- char *ifname = strtok(NULL, ":");
- char *proto = strtok(NULL, "\0");
+ char *lasts;
+ char *node = strtok_r(copy, "%", &lasts);
+ char *ifname = strtok_r(NULL, ":", &lasts);
+ char *proto = strtok_r(NULL, "\0", &lasts);
/* Parse link layer (MAC) address */
struct ether_addr *mac = ether_aton(node);
@@ -659,8 +660,9 @@ int socket_parse_address(const char *addr, struct sockaddr *saddr, enum socket_l
};
/* Split string */
- char *node = strtok(copy, ":");
- char *service = strtok(NULL, "\0");
+ char *lasts;
+ char *node = strtok_r(copy, ":", &lasts);
+ char *service = strtok_r(NULL, "\0", &lasts);
if (node && !strcmp(node, "*"))
node = NULL;
diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c
index 540c33abe..e60455758 100644
--- a/lib/nodes/stats.c
+++ b/lib/nodes/stats.c
@@ -51,7 +51,7 @@ int stats_node_signal_parse(struct stats_node_signal *s, json_t *cfg)
int ret;
const char *stats;
- char *metric, *type, *node, *cpy;
+ char *metric, *type, *node, *cpy, *lasts;
ret = json_unpack_ex(cfg, &err, 0, "{ s: s }",
"stats", &stats
@@ -61,15 +61,15 @@ int stats_node_signal_parse(struct stats_node_signal *s, json_t *cfg)
cpy = strdup(stats);
- node = strtok(cpy, ".");
+ node = strtok_r(cpy, ".", &lasts);
if (!node)
goto invalid_format;
- metric = strtok(NULL, ".");
+ metric = strtok_r(NULL, ".", &lasts);
if (!metric)
goto invalid_format;
- type = strtok(NULL, ".");
+ type = strtok_r(NULL, ".", &lasts);
if (!type)
goto invalid_format;
diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c
index 308e7e16d..9a17ca0c6 100644
--- a/lib/nodes/websocket.c
+++ b/lib/nodes/websocket.c
@@ -203,7 +203,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
*/
/* Get path of incoming request */
- char *node, *format;
+ char *node, *format, *lasts;
char uri[64];
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
@@ -213,14 +213,14 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
return -1;
}
- node = strtok(uri, "/.");
+ node = strtok_r(uri, "/.", &lasts);
if (!node) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
warning("Failed to tokenize request URI");
return -1;
}
- format = strtok(NULL, "");
+ format = strtok_r(NULL, "", &lasts);
if (!format)
format = "villas.web";
@@ -267,7 +267,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
}
if (connections.state == STATE_INITIALIZED)
- vlist_remove(&connections, c);
+ vlist_remove_all(&connections, c);
if (c->state == WEBSOCKET_CONNECTION_STATE_INITIALIZED)
websocket_connection_destroy(c);
diff --git a/lib/path.c b/lib/path.c
index 97528c938..767fd2abf 100644
--- a/lib/path.c
+++ b/lib/path.c
@@ -29,7 +29,6 @@
#include
#include
-#include
#include
#include
#include
@@ -39,221 +38,9 @@
#include
#include
#include
-
-/* Forward declaration */
-static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt);
-
-static int path_source_init(struct path_source *ps)
-{
- int ret;
- int pool_size = MAX(DEFAULT_QUEUE_LENGTH, ps->node->in.vectorize);
-
- if (ps->node->_vt->pool_size)
- pool_size = ps->node->_vt->pool_size;
-
- ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(vlist_length(&ps->node->in.signals)), node_memory_type(ps->node, &memory_hugepage));
- if (ret)
- return ret;
-
- return 0;
-}
-
-static int path_source_destroy(struct path_source *ps)
-{
- int ret;
-
- ret = pool_destroy(&ps->pool);
- if (ret)
- return ret;
-
- ret = vlist_destroy(&ps->mappings, NULL, true);
- if (ret)
- return ret;
-
- return 0;
-}
-
-static int path_source_read(struct path_source *ps, struct path *p, int i)
-{
- int recv, tomux, allocated, cnt, toenqueue, enqueued = 0;
- unsigned release;
-
- cnt = ps->node->in.vectorize;
-
- struct sample *read_smps[cnt];
- struct sample *muxed_smps[cnt];
- struct sample **tomux_smps;
-
- /* Fill smps[] free sample blocks from the pool */
- allocated = sample_alloc_many(&ps->pool, read_smps, cnt);
- if (allocated != cnt)
- warning("Pool underrun for path source %s", node_name(ps->node));
-
- /* Read ready samples and store them to blocks pointed by smps[] */
- release = allocated;
-
- recv = node_read(ps->node, read_smps, allocated, &release);
- if (recv == 0) {
- enqueued = 0;
- goto out2;
- }
- else if (recv < 0) {
- if (ps->node->state == STATE_STOPPING) {
- p->state = STATE_STOPPING;
-
- enqueued = -1;
- goto out2;
- }
- else
- error("Failed to read samples from node %s", node_name(ps->node));
- }
- else if (recv < allocated)
- warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated);
-
- bitset_set(&p->received, i);
-
- if (p->mode == PATH_MODE_ANY) { /* Mux all samples */
- tomux_smps = read_smps;
- tomux = recv;
- }
- else { /* Mux only last sample and discard others */
- tomux_smps = read_smps + recv - 1;
- tomux = 1;
- }
-
- for (int i = 0; i < tomux; i++) {
- muxed_smps[i] = i == 0
- ? sample_clone(p->last_sample)
- : sample_clone(muxed_smps[i-1]);
-
- if (p->original_sequence_no)
- muxed_smps[i]->sequence = tomux_smps[i]->sequence;
- else {
- muxed_smps[i]->sequence = p->last_sequence++;
- muxed_smps[i]->flags |= SAMPLE_HAS_SEQUENCE;
- }
-
- muxed_smps[i]->ts = tomux_smps[i]->ts;
- muxed_smps[i]->flags |= tomux_smps[i]->flags & (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED);
-
- mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL);
- }
-
- sample_copy(p->last_sample, muxed_smps[tomux-1]);
-
- debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received));
-
-#ifdef WITH_HOOKS
- toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux);
- if (toenqueue != tomux) {
- int skipped = tomux - toenqueue;
-
- debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p));
- }
-#else
- toenqueue = tomux;
-#endif
-
- if (bitset_test(&p->mask, i)) {
- /* Check if we received an update from all nodes/ */
- if ((p->mode == PATH_MODE_ANY) ||
- (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) {
- path_destination_enqueue(p, muxed_smps, toenqueue);
-
- /* Reset bitset of updated nodes */
- bitset_clear_all(&p->received);
-
- enqueued = toenqueue;
- }
- }
-
- sample_decref_many(muxed_smps, tomux);
-out2: sample_decref_many(read_smps, release);
-
- return enqueued;
-}
-
-static int path_destination_init(struct path_destination *pd, int queuelen)
-{
- int ret;
-
- ret = queue_init(&pd->queue, queuelen, &memory_hugepage);
- if (ret)
- return ret;
-
- return 0;
-}
-
-static int path_destination_destroy(struct path_destination *pd)
-{
- int ret;
-
- ret = queue_destroy(&pd->queue);
- if (ret)
- return ret;
-
- return 0;
-}
-
-static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt)
-{
- unsigned enqueued, cloned;
-
- struct sample *clones[cnt];
-
- cloned = sample_clone_many(clones, smps, cnt);
- if (cloned < cnt)
- warning("Pool underrun in path %s", path_name(p));
-
- for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
- struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
-
- enqueued = queue_push_many(&pd->queue, (void **) clones, cloned);
- if (enqueued != cnt)
- warning("Queue overrun for path %s", path_name(p));
-
- /* Increase reference counter of these samples as they are now also owned by the queue. */
- sample_incref_many(clones, cloned);
-
- debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p));
- }
-
- sample_decref_many(clones, cloned);
-}
-
-static void path_destination_write(struct path_destination *pd, struct path *p)
-{
- int cnt = pd->node->out.vectorize;
- int sent;
- int released;
- int allocated;
- unsigned release;
-
- struct sample *smps[cnt];
-
- /* As long as there are still samples in the queue */
- while (1) {
- allocated = queue_pull_many(&pd->queue, (void **) smps, cnt);
- if (allocated == 0)
- break;
- else if (allocated < cnt)
- debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt);
-
- debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p));
-
- release = allocated;
-
- sent = node_write(pd->node, smps, allocated, &release);
- if (sent < 0)
- error("Failed to sent %u samples to node %s: reason=%d", cnt, node_name(pd->node), sent);
- else if (sent < allocated)
- warning("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated);
-
- released = sample_decref_many(smps, release);
-
- debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
- }
-}
+#include
+#include
+#include
static void * path_run_single(void *arg)
{
@@ -333,11 +120,11 @@ int path_init(struct path *p)
if (ret)
return ret;
- ret = vlist_init(&p->signals);
+ ret = signal_list_init(&p->signals);
if (ret)
return ret;
- ret = vlist_init(&p->hooks);
+ ret = hook_list_init(&p->hooks);
if (ret)
return ret;
@@ -359,7 +146,7 @@ int path_init(struct path *p)
return 0;
}
-int path_init_poll(struct path *p)
+static int path_prepare_poll(struct path *p)
{
int fds[16], ret, n = 0, m;
@@ -404,15 +191,17 @@ int path_init_poll(struct path *p)
return 0;
}
-int path_init2(struct path *p)
+int path_prepare(struct path *p)
{
int ret;
assert(p->state == STATE_CHECKED);
#ifdef WITH_HOOKS
+ int m = p->builtin ? HOOK_PATH | HOOK_BUILTIN : 0;
+
/* Add internal hooks if they are not already in the list */
- ret = hook_init_builtin_list(&p->hooks, p->builtin, HOOK_PATH, p, NULL);
+ ret = hook_list_prepare(&p->hooks, &p->signals, m, p, NULL);
if (ret)
return ret;
@@ -427,10 +216,10 @@ int path_init2(struct path *p)
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
- if (pd->node->_vt->pool_size > pool_size)
- pool_size = pd->node->_vt->pool_size;
+ if (node_type(pd->node)->pool_size > pool_size)
+ pool_size = node_type(pd->node)->pool_size;
- if (pd->node->_vt->memory_type)
+ if (node_type(pd->node)->memory_type)
pool_mt = node_memory_type(pd->node, &memory_hugepage);
ret = path_destination_init(pd, p->queuelen);
@@ -452,19 +241,21 @@ int path_init2(struct path *p)
if (ps->masked)
bitset_set(&p->mask, i);
+ ret = mapping_list_prepare(&ps->mappings);
+ if (ret)
+ return ret;
+
for (size_t i = 0; i < vlist_length(&ps->mappings); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(&ps->mappings, i);
+ struct vlist *sigs = node_get_signals(me->node, NODE_DIR_IN);
- int off = me->offset;
- int len = me->length;
-
- for (int j = 0; j < len; j++) {
+ for (int j = 0; j < me->length; j++) {
struct signal *sig;
/* For data mappings we simple refer to the existing
* signal descriptors of the source node. */
if (me->type == MAPPING_TYPE_DATA) {
- sig = (struct signal *) vlist_at_safe(&me->node->in.signals, me->data.offset + j);
+ sig = (struct signal *) vlist_at_safe(sigs, me->data.offset + j);
if (!sig) {
warning("Failed to create signal description for path %s", path_name(p));
continue;
@@ -481,8 +272,8 @@ int path_init2(struct path *p)
return -1;
}
- vlist_extend(&p->signals, off + j + 1, NULL);
- vlist_set(&p->signals, off + j, sig);
+ vlist_extend(&p->signals, me->offset + j + 1, NULL);
+ vlist_set(&p->signals, me->offset + j, sig);
}
}
}
@@ -493,11 +284,13 @@ int path_init2(struct path *p)
/* Prepare poll() */
if (p->poll) {
- ret = path_init_poll(p);
+ ret = path_prepare_poll(p);
if (ret)
return ret;
}
+ p->state = STATE_PREPARED;
+
return 0;
}
@@ -537,7 +330,7 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
jerror(&err, "Failed to parse path configuration");
/* Input node(s) */
- ret = mapping_parse_list(&sources, json_in, nodes);
+ ret = mapping_list_parse(&sources, json_in, nodes);
if (ret)
error("Failed to parse input mapping of path %s", path_name(p));
@@ -553,7 +346,7 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
/* Output node(s) */
if (json_out) {
- ret = node_parse_list(&destinations, json_out, nodes);
+ ret = node_list_parse(&destinations, json_out, nodes);
if (ret)
jerror(&err, "Failed to parse output nodes");
}
@@ -586,6 +379,9 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
vlist_push(&p->sources, ps);
}
+ if (!node_is_enabled(ps->node))
+ error("Source %s of path %s is not enabled", node_name(ps->node), path_name(p));
+
vlist_push(&ps->mappings, me);
}
@@ -596,6 +392,9 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
pd->node = n;
+ if (!node_is_enabled(pd->node))
+ error("Destination %s of path %s is not enabled", node_name(pd->node), path_name(p));
+
vlist_push(&p->destinations, pd);
}
@@ -646,7 +445,7 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
#ifdef WITH_HOOKS
if (json_hooks) {
- ret = hook_parse_list(&p->hooks, json_hooks, HOOK_PATH, p, NULL);
+ ret = hook_list_parse(&p->hooks, json_hooks, HOOK_PATH, p, NULL);
if (ret)
return ret;
}
@@ -689,7 +488,7 @@ int path_check(struct path *p)
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
struct path_source *ps = (struct path_source *) vlist_at(&p->sources, i);
- if (!ps->node->_vt->poll_fds)
+ if (!node_type(ps->node)->poll_fds)
error("Node %s can not be used in polling mode with path %s", node_name(ps->node), path_name(p));
}
}
@@ -707,14 +506,14 @@ int path_check(struct path *p)
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
struct path_source *ps = (struct path_source *) vlist_at(&p->sources, i);
- if (!ps->node->_vt->read)
+ if (!node_type(ps->node)->read)
error("Node %s is not supported as a source for path %s", node_name(ps->node), path_name(p));
}
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
- if (!pd->node->_vt->write)
+ if (!node_type(pd->node)->write)
error("Destiation node %s is not supported as a sink for path %s", node_name(pd->node), path_name(p));
}
@@ -733,7 +532,7 @@ int path_start(struct path *p)
int ret;
char *mode, *mask;
- assert(p->state == STATE_CHECKED);
+ assert(p->state == STATE_PREPARED);
switch (p->mode) {
case PATH_MODE_ANY: mode = "any"; break;
@@ -750,8 +549,8 @@ int path_start(struct path *p)
p->poll ? "yes" : "no",
mask,
p->rate,
- p->enabled ? "yes" : "no",
- p->reverse ? "yes" : "no",
+ path_is_enabled(p) ? "yes" : "no",
+ path_is_reversed(p) ? "yes" : "no",
p->queuelen,
vlist_length(&p->hooks),
vlist_length(&p->sources),
@@ -846,15 +645,27 @@ int path_stop(struct path *p)
int path_destroy(struct path *p)
{
+ int ret;
+
if (p->state == STATE_DESTROYED)
return 0;
#ifdef WITH_HOOKS
- vlist_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true);
+ ret = hook_list_destroy(&p->hooks);
+ if (ret)
+ return ret;
#endif
- vlist_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true);
- vlist_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true);
- vlist_destroy(&p->signals, (dtor_cb_t) signal_decref, false);
+ ret = signal_list_destroy(&p->signals);
+ if (ret)
+ return ret;
+
+ ret = vlist_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true);
+ if (ret)
+ return ret;
+
+ ret = vlist_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true);
+ if (ret)
+ return ret;
if (p->reader.pfds)
free(p->reader.pfds);
@@ -916,22 +727,37 @@ int path_uses_node(struct path *p, struct node *n)
return -1;
}
-int path_is_simple(struct path *p)
+bool path_is_simple(const struct path *p)
{
int ret;
const char *in = NULL, *out = NULL;
ret = json_unpack(p->cfg, "{ s: s, s: s }", "in", &in, "out", &out);
if (ret)
- return ret;
+ return false;
ret = node_is_valid_name(in);
- if (ret)
- return ret;
+ if (!ret)
+ return false;
ret = node_is_valid_name(out);
- if (ret)
- return ret;
+ if (!ret)
+ return false;
- return 0;
+ return true;
+}
+
+bool path_is_enabled(const struct path *p)
+{
+ return p->enabled;
+}
+
+bool path_is_reversed(const struct path *p)
+{
+ return p->reverse;
+}
+
+struct vlist * path_get_signals(struct path *p)
+{
+ return &p->signals;
}
diff --git a/lib/path_destination.c b/lib/path_destination.c
new file mode 100644
index 000000000..fa0d2c032
--- /dev/null
+++ b/lib/path_destination.c
@@ -0,0 +1,110 @@
+/** Path destination
+ *
+ * @author Steffen Vogel
+ * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *********************************************************************************/
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+int path_destination_init(struct path_destination *pd, int queuelen)
+{
+ int ret;
+
+ ret = queue_init(&pd->queue, queuelen, &memory_hugepage);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+int path_destination_destroy(struct path_destination *pd)
+{
+ int ret;
+
+ ret = queue_destroy(&pd->queue);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt)
+{
+ unsigned enqueued, cloned;
+
+ struct sample *clones[cnt];
+
+ cloned = sample_clone_many(clones, smps, cnt);
+ if (cloned < cnt)
+ warning("Pool underrun in path %s", path_name(p));
+
+ for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
+ struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
+
+ enqueued = queue_push_many(&pd->queue, (void **) clones, cloned);
+ if (enqueued != cnt)
+ warning("Queue overrun for path %s", path_name(p));
+
+ /* Increase reference counter of these samples as they are now also owned by the queue. */
+ sample_incref_many(clones, cloned);
+
+ debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p));
+ }
+
+ sample_decref_many(clones, cloned);
+}
+
+void path_destination_write(struct path_destination *pd, struct path *p)
+{
+ int cnt = pd->node->out.vectorize;
+ int sent;
+ int released;
+ int allocated;
+ unsigned release;
+
+ struct sample *smps[cnt];
+
+ /* As long as there are still samples in the queue */
+ while (1) {
+ allocated = queue_pull_many(&pd->queue, (void **) smps, cnt);
+ if (allocated == 0)
+ break;
+ else if (allocated < cnt)
+ debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt);
+
+ debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p));
+
+ release = allocated;
+
+ sent = node_write(pd->node, smps, allocated, &release);
+ if (sent < 0)
+ error("Failed to sent %u samples to node %s: reason=%d", cnt, node_name(pd->node), sent);
+ else if (sent < allocated)
+ warning("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated);
+
+ released = sample_decref_many(smps, release);
+
+ debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
+ }
+}
diff --git a/lib/path_source.c b/lib/path_source.c
new file mode 100644
index 000000000..21bd21f36
--- /dev/null
+++ b/lib/path_source.c
@@ -0,0 +1,159 @@
+/** Path source
+ *
+ * @author Steffen Vogel
+ * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *********************************************************************************/
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+int path_source_init(struct path_source *ps)
+{
+ int ret;
+ int pool_size = MAX(DEFAULT_QUEUE_LENGTH, ps->node->in.vectorize);
+
+ if (ps->node->_vt->pool_size)
+ pool_size = ps->node->_vt->pool_size;
+
+ ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(vlist_length(&ps->node->in.signals)), node_memory_type(ps->node, &memory_hugepage));
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+int path_source_destroy(struct path_source *ps)
+{
+ int ret;
+
+ ret = pool_destroy(&ps->pool);
+ if (ret)
+ return ret;
+
+ ret = vlist_destroy(&ps->mappings, NULL, true);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+int path_source_read(struct path_source *ps, struct path *p, int i)
+{
+ int recv, tomux, allocated, cnt, toenqueue, enqueued = 0;
+ unsigned release;
+
+ cnt = ps->node->in.vectorize;
+
+ struct sample *read_smps[cnt];
+ struct sample *muxed_smps[cnt];
+ struct sample **tomux_smps;
+
+ /* Fill smps[] free sample blocks from the pool */
+ allocated = sample_alloc_many(&ps->pool, read_smps, cnt);
+ if (allocated != cnt)
+ warning("Pool underrun for path source %s", node_name(ps->node));
+
+ /* Read ready samples and store them to blocks pointed by smps[] */
+ release = allocated;
+
+ recv = node_read(ps->node, read_smps, allocated, &release);
+ if (recv == 0) {
+ enqueued = 0;
+ goto out2;
+ }
+ else if (recv < 0) {
+ if (ps->node->state == STATE_STOPPING) {
+ p->state = STATE_STOPPING;
+
+ enqueued = -1;
+ goto out2;
+ }
+ else
+ error("Failed to read samples from node %s", node_name(ps->node));
+ }
+ else if (recv < allocated)
+ warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated);
+
+ bitset_set(&p->received, i);
+
+ if (p->mode == PATH_MODE_ANY) { /* Mux all samples */
+ tomux_smps = read_smps;
+ tomux = recv;
+ }
+ else { /* Mux only last sample and discard others */
+ tomux_smps = read_smps + recv - 1;
+ tomux = 1;
+ }
+
+ for (int i = 0; i < tomux; i++) {
+ muxed_smps[i] = i == 0
+ ? sample_clone(p->last_sample)
+ : sample_clone(muxed_smps[i-1]);
+
+ if (p->original_sequence_no)
+ muxed_smps[i]->sequence = tomux_smps[i]->sequence;
+ else {
+ muxed_smps[i]->sequence = p->last_sequence++;
+ muxed_smps[i]->flags |= SAMPLE_HAS_SEQUENCE;
+ }
+
+ muxed_smps[i]->ts = tomux_smps[i]->ts;
+ muxed_smps[i]->flags |= tomux_smps[i]->flags & (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED);
+
+ mapping_list_remap(&ps->mappings, muxed_smps[i], tomux_smps[i]);
+ }
+
+ sample_copy(p->last_sample, muxed_smps[tomux-1]);
+
+ debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received));
+
+#ifdef WITH_HOOKS
+ toenqueue = hook_list_process(&p->hooks, muxed_smps, tomux);
+ if (toenqueue != tomux) {
+ int skipped = tomux - toenqueue;
+
+ debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p));
+ }
+#else
+ toenqueue = tomux;
+#endif
+
+ if (bitset_test(&p->mask, i)) {
+ /* Check if we received an update from all nodes/ */
+ if ((p->mode == PATH_MODE_ANY) ||
+ (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) {
+ path_destination_enqueue(p, muxed_smps, toenqueue);
+
+ /* Reset bitset of updated nodes */
+ bitset_clear_all(&p->received);
+
+ enqueued = toenqueue;
+ }
+ }
+
+ sample_decref_many(muxed_smps, tomux);
+out2: sample_decref_many(read_smps, release);
+
+ return enqueued;
+}
diff --git a/lib/sample.c b/lib/sample.c
index afe6ce98f..f18225afb 100644
--- a/lib/sample.c
+++ b/lib/sample.c
@@ -301,3 +301,20 @@ void sample_dump(struct sample *s)
if (s->signals)
signal_list_dump(s->signals, s->data, s->length);
}
+
+void sample_data_insert(struct sample *smp, const union signal_data *src, size_t offset, size_t len)
+{
+ memmove(&smp->data[offset + len], &smp->data[offset], sizeof(smp->data[0]) * (smp->length - offset));
+ memcpy(&smp->data[offset], src, sizeof(smp->data[0]) * len);
+
+ smp->length += len;
+}
+
+void sample_data_remove(struct sample *smp, size_t offset, size_t len)
+{
+ size_t sz = sizeof(smp->data[0]) * len;
+
+ memmove(&smp->data[offset], &smp->data[offset + len], sz);
+
+ smp->length -= len;
+}
diff --git a/lib/signal.c b/lib/signal.c
index 4cd6e89d4..bc8c96060 100644
--- a/lib/signal.c
+++ b/lib/signal.c
@@ -210,6 +210,28 @@ int signal_parse(struct signal *s, json_t *cfg)
/* Signal list */
+int signal_list_init(struct vlist *list)
+{
+ int ret;
+
+ ret = vlist_init(list);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
+int signal_list_destroy(struct vlist *list)
+{
+ int ret;
+
+ ret = vlist_destroy(list, (dtor_cb_t) signal_decref, false);
+ if (ret)
+ return ret;
+
+ return 0;
+}
+
int signal_list_parse(struct vlist *list, json_t *cfg)
{
int ret;
@@ -285,6 +307,22 @@ void signal_list_dump(const struct vlist *list, const union signal_data *data, i
}
}
+int signal_list_copy(struct vlist *dst, const struct vlist *src)
+{
+ assert(src->state == STATE_INITIALIZED);
+ assert(dst->state == STATE_INITIALIZED);
+
+ for (size_t i = 0; i < vlist_length(src); i++) {
+ struct signal *s = (struct signal *) vlist_at_safe(src, i);
+
+ signal_incref(s);
+
+ vlist_push(dst, s);
+ }
+
+ return 0;
+}
+
/* Signal type */
enum signal_type signal_type_from_str(const char *str)
diff --git a/lib/super_node.cpp b/lib/super_node.cpp
index 180636f27..78c241e04 100644
--- a/lib/super_node.cpp
+++ b/lib/super_node.cpp
@@ -218,7 +218,7 @@ int SuperNode::parseJson(json_t *j)
const char *type;
ret = node_is_valid_name(name);
- if (ret)
+ if (!ret)
throw RuntimeError("Invalid name for node: {}", name);
ret = json_unpack_ex(json_node, &err, 0, "{ s: s }", "type", &type);
@@ -266,7 +266,7 @@ parse: path *p = (path *) alloc(sizeof(path));
if (p->reverse) {
/* Only simple paths can be reversed */
ret = path_is_simple(p);
- if (ret)
+ if (!ret)
throw RuntimeError("Complex paths can not be reversed!");
/* Parse a second time with in/out reversed */
@@ -356,18 +356,12 @@ void SuperNode::startNodes()
for (size_t i = 0; i < vlist_length(&nodes); i++) {
auto *n = (struct node *) vlist_at(&nodes, i);
- ret = node_init2(n);
- if (ret)
- throw RuntimeError("Failed to prepare node: {}", node_name(n));
+ if (!node_is_enabled(n))
+ continue;
- int refs = vlist_count(&paths, (cmp_cb_t) path_uses_node, n);
- if (refs > 0) {
- ret = node_start(n);
- if (ret)
- throw RuntimeError("Failed to start node: {}", node_name(n));
- }
- else
- logger->warn("No path is using the node {}. Skipping...", node_name(n));
+ ret = node_start(n);
+ if (ret)
+ throw RuntimeError("Failed to start node: {}", node_name(n));
}
}
@@ -378,21 +372,54 @@ void SuperNode::startPaths()
for (size_t i = 0; i < vlist_length(&paths); i++) {
auto *p = (struct path *) vlist_at(&paths, i);
- if (p->enabled) {
- ret = path_init2(p);
- if (ret)
- throw RuntimeError("Failed to prepare path: {}", path_name(p));
+ if (!path_is_enabled(p))
+ continue;
- ret = path_start(p);
- if (ret)
- throw RuntimeError("Failed to start path: {}", path_name(p));
- }
- else
- logger->warn("Path {} is disabled. Skipping...", path_name(p));
+ ret = path_start(p);
+ if (ret)
+ throw RuntimeError("Failed to start path: {}", path_name(p));
}
}
-void SuperNode::start()
+void SuperNode::prepareNodes()
+{
+ int ret, refs;
+
+ for (size_t i = 0; i < vlist_length(&nodes); i++) {
+ auto *n = (struct node *) vlist_at(&nodes, i);
+
+ refs = vlist_count(&paths, (cmp_cb_t) path_uses_node, n);
+ if (refs <= 0) {
+ logger->warn("No path is using the node {}. Skipping...", node_name(n));
+ n->enabled = false;
+ }
+
+ if (!node_is_enabled(n))
+ continue;
+
+ ret = node_prepare(n);
+ if (ret)
+ throw RuntimeError("Failed to prepare node: {}", node_name(n));
+ }
+}
+
+void SuperNode::preparePaths()
+{
+ int ret;
+
+ for (size_t i = 0; i < vlist_length(&paths); i++) {
+ auto *p = (struct path *) vlist_at(&paths, i);
+
+ if (!path_is_enabled(p))
+ continue;
+
+ ret = path_prepare(p);
+ if (ret)
+ throw RuntimeError("Failed to prepare path: {}", path_name(p));
+ }
+}
+
+void SuperNode::prepare()
{
int ret;
@@ -404,6 +431,18 @@ void SuperNode::start()
kernel::rt::init(priority, affinity);
+ prepareNodes();
+ preparePaths();
+
+ state = STATE_PREPARED;
+}
+
+void SuperNode::start()
+{
+ int ret;
+
+ assert(state == STATE_PREPARED);
+
#ifdef WITH_API
api.start();
#endif
diff --git a/python/setup.py b/python/setup.py
index 9afba3213..f2259cdeb 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -13,7 +13,7 @@ setup(
description = 'Python-support for VILLASnode simulation-data gateway',
license = 'GPL-3.0',
keywords = 'simulation power system real-time villas',
- url = 'https://git.rwth-aachen.de/acs/public/villas/dataprocessing',
+ url = 'https://git.rwth-aachen.de/acs/public/villas/VILLASnode',
packages = [ 'villas.node' ],
long_description = long_description,
long_description_content_type = 'text/markdown',
diff --git a/python/villas/node/sample.py b/python/villas/node/sample.py
new file mode 100644
index 000000000..8a37e09ac
--- /dev/null
+++ b/python/villas/node/sample.py
@@ -0,0 +1,83 @@
+import re
+
+class Timestamp:
+ """Parsing the VILLASnode human-readable timestamp format"""
+
+ def __init__(self, seconds = 0, nanoseconds = None, offset = None, sequence = None):
+ self.seconds = seconds
+ self.nanoseconds = nanoseconds
+ self.offset = offset
+ self.sequence = sequence
+
+ @classmethod
+ def parse(self, ts):
+ m = re.match('(\d+)(?:\.(\d+))?([-+]\d+(?:\.\d+)?(?:e[+-]?\d+)?)?(?:\((\d+)\))?', ts)
+
+ seconds = int(m.group(1)); # Mandatory
+ nanoseconds = int(m.group(2)) if m.group(2) else None
+ offset = float(m.group(3)) if m.group(3) else None
+ sequence = int(m.group(4)) if m.group(4) else None
+
+ return Timestamp(seconds, nanoseconds, offset, sequence)
+
+ def __str__(self):
+ str = "%u" % (self.seconds)
+
+ if self.nanoseconds is not None:
+ str += ".%09u" % self.nanoseconds
+ if self.offset is not None:
+ str += "+%u" % self.offset
+ if self.sequence is not None:
+ str += "(%u)" % self.sequence
+
+ return str
+
+ def __float__(self):
+ sum = float(self.seconds)
+
+ if self.nanoseconds is not None:
+ sum += self.nanoseconds * 1e-9
+ if self.offset is not None:
+ sum += self.offset
+
+ return sum
+
+ def __cmp__(self, other):
+ return cmp(float(self), float(other))
+
+class Sample:
+ """Parsing a VILLASnode sample from a file (not a UDP package!!)"""
+
+ def __init__(self, ts, values):
+ self.ts = ts
+ self.values = values
+
+ @classmethod
+ def parse(self, line):
+ csv = line.split()
+
+ ts = Timestamp.parse(csv[0])
+ vs = [ ]
+
+ for value in csv[1:]:
+ try:
+ v = float(value)
+ except ValueError:
+ value = value.lower()
+ try:
+ v = complex(value)
+ except:
+ if value.endswith('i'):
+ v = complex(value.replace('i', 'j'))
+ else:
+ raise ValueError()
+
+ vs.append(v)
+
+ return Sample(ts, vs)
+
+ def __str__(self):
+ return '%s %s' % (self.ts, " ".join(map(str, self.values)))
+
+ def __cmp__(self, other):
+ return cmp(self.ts, other.ts)
diff --git a/src/villas-hook.cpp b/src/villas-hook.cpp
index 45f9288fd..f4fee472b 100644
--- a/src/villas-hook.cpp
+++ b/src/villas-hook.cpp
@@ -194,6 +194,10 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to parse hook config");
+ ret = hook_prepare(&h, io.signals);
+ if (ret)
+ throw RuntimeError("Failed to prepare hook");
+
ret = hook_start(&h);
if (ret)
throw RuntimeError("Failed to start hook");
@@ -215,7 +219,9 @@ check: if (optarg == endptr)
unsigned send = recv;
- hook_process(&h, smps, (unsigned *) &send);
+ ret = hook_process(&h, smps, (unsigned *) &send);
+ if (ret < 0)
+ throw RuntimeError("Failed to process samples");
sent = io_print(&io, smps, send);
if (sent < 0)
diff --git a/src/villas-node.cpp b/src/villas-node.cpp
index 91568425f..5c5a14e8e 100644
--- a/src/villas-node.cpp
+++ b/src/villas-node.cpp
@@ -184,6 +184,7 @@ int main(int argc, char *argv[])
if (ret)
throw RuntimeError("Failed to verify configuration");
+ sn.prepare();
sn.start();
sn.run();
sn.stop();
diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp
index 51df16d56..0d679656f 100644
--- a/src/villas-pipe.cpp
+++ b/src/villas-pipe.cpp
@@ -395,7 +395,7 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Invalid node configuration");
- ret = node_init2(node);
+ ret = node_prepare(node);
if (ret)
throw RuntimeError("Failed to start node {}: reason={}", node_name(node), ret);
diff --git a/src/villas-signal.cpp b/src/villas-signal.cpp
index 594b2cdb0..0bbd8aeee 100644
--- a/src/villas-signal.cpp
+++ b/src/villas-signal.cpp
@@ -109,7 +109,7 @@ json_t * parse_cli(int argc, char *argv[])
continue;
check: if (optarg == endptr)
- logger->warn("Failed to parse parse option argument '-%c %s'", c, optarg);
+ logger->warn("Failed to parse parse option argument '-{} {}'", c, optarg);
}
if (argc != optind + 1)
@@ -237,7 +237,7 @@ int main(int argc, char *argv[])
if (ret)
throw RuntimeError("Failed to initialize pool");
- ret = node_init2(&n);
+ ret = node_prepare(&n);
if (ret)
throw RuntimeError("Failed to start node {}: reason={}", node_name(&n), ret);
diff --git a/src/villas-test-rtt.cpp b/src/villas-test-rtt.cpp
index 8ae798a2c..8c6c1812f 100644
--- a/src/villas-test-rtt.cpp
+++ b/src/villas-test-rtt.cpp
@@ -167,7 +167,7 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to start node-type {}: reason={}", node_type_name(node->_vt), ret);
- ret = node_init2(node);
+ ret = node_prepare(node);
if (ret)
throw RuntimeError("Failed to start node {}: reason={}", node_name(node), ret);
diff --git a/tests/integration/hook-average.sh b/tests/integration/hook-average.sh
index 33b9d9224..aca7d593a 100755
--- a/tests/integration/hook-average.sh
+++ b/tests/integration/hook-average.sh
@@ -40,26 +40,30 @@ cat < ${INPUT_FILE}
EOF
cat < ${EXPECT_FILE}
-1548104309.033621000(0) -0.488878 0.590769 -1.000000 0.597649 0.100588
-1548104309.133998900(1) -0.492331 0.952914 -1.000000 0.196137 0.200966
-1548104309.233542500(2) -0.486250 0.950063 -1.000000 -0.202037 0.300509
-1548104309.334019400(3) -0.479840 0.582761 -1.000000 -0.603945 0.400986
-1548104309.433952200(4) 0.513039 -0.005774 1.000000 -0.996324 0.500919
-1548104309.533756400(5) 0.524631 -0.591455 1.000000 -0.597107 0.600723
-1548104309.637440300(6) 0.507441 -0.959248 1.000000 -0.182372 0.704407
-1548104309.736158700(7) 0.511616 -0.944805 1.000000 0.212502 0.803126
-1548104309.833614900(8) 0.507615 -0.584824 1.000000 0.602327 0.900582
-1548104309.934288200(9) -0.469575 0.007885 -1.000000 0.994980 0.001255
+# seconds.nanoseconds+offset(sequence) average signal0 signal1 signal2 signal3 signal4
+1548104309.033621000(0) 0.062250 0.022245 0.590769 -1.000000 0.597649 0.100588
+1548104309.133998900(1) 0.073071 0.015339 0.952914 -1.000000 0.196137 0.200966
+1548104309.233542500(2) 0.015207 0.027500 0.950063 -1.000000 -0.202037 0.300509
+1548104309.334019400(3) -0.115976 0.040320 0.582761 -1.000000 -0.603945 0.400986
+1548104309.433952200(4) 0.104980 0.026079 -0.005774 1.000000 -0.996324 0.500919
+1548104309.533756400(5) 0.092285 0.049262 -0.591455 1.000000 -0.597107 0.600723
+1548104309.637440300(6) 0.115534 0.014883 -0.959248 1.000000 -0.182372 0.704407
+1548104309.736158700(7) 0.218811 0.023232 -0.944805 1.000000 0.212502 0.803126
+1548104309.833614900(8) 0.386663 0.015231 -0.584824 1.000000 0.602327 0.900582
+1548104309.934288200(9) 0.012994 0.060849 0.007885 -1.000000 0.994980 0.001255
EOF
# Average over first and third signal (mask = 0b101 = 5)
-villas-hook -o mask=5 -o offset=0 average < ${INPUT_FILE} > ${OUTPUT_FILE}
+villas-hook -o mask=5 -o offset=0 -o signals=0,1,2,3,4 average < ${INPUT_FILE} > ${OUTPUT_FILE}
# Compare only the data values
villas-test-cmp ${OUTPUT_FILE} ${EXPECT_FILE}
-
RC=$?
+cat ${INPUT_FILE}
+echo
+cat ${OUTPUT_FILE}
+
rm -f ${INPUT_FILE} ${OUTPUT_FILE} ${EXPECT_FILE}
exit $RC
diff --git a/tests/integration/hook-cast.sh b/tests/integration/hook-cast.sh
index be186e76b..dd08a522a 100755
--- a/tests/integration/hook-cast.sh
+++ b/tests/integration/hook-cast.sh
@@ -1,6 +1,6 @@
#!/bin/bash
#
-# Integration test for convert hook.
+# Integration test for cast hook.
#
# @author Steffen Vogel
# @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
@@ -22,50 +22,42 @@
# along with this program. If not, see .
##################################################################################
-# We skip this test for now
-echo "Test not yet supported"
-exit 99
-
INPUT_FILE=$(mktemp)
OUTPUT_FILE=$(mktemp)
EXPECT_FILE=$(mktemp)
cat < ${INPUT_FILE}
-1490500399.776379108(0) 0.000000 0.000000 0.000000 0.000000
-1490500399.876379108(1) 0.587785 0.587785 0.587785 0.587785
-1490500399.976379108(2) 0.951057 0.951057 0.951057 0.951057
-1490500400.076379108(3) 0.951057 0.951057 0.951057 0.951057
-1490500400.176379108(4) 0.587785 0.587785 0.587785 0.587785
-1490500400.276379108(5) 0.000000 0.000000 0.000000 0.000000
-1490500400.376379108(6) -0.587785 -0.587785 -0.587785 -0.587785
-1490500400.476379108(7) -0.951057 -0.951057 -0.951057 -0.951057
-1490500400.576379108(8) -0.951057 -0.951057 -0.951057 -0.951057
-1490500400.676379108(9) -0.587785 -0.587785 -0.587785 -0.587785
+# seconds.nanoseconds+offset(sequence) signal0 signal1 signal2 signal3 signal4
+1551015508.801653200(0) 0.022245 0.000000 -1.000000 1.000000 0.000000
+1551015508.901653200(1) 0.015339 58.778500 -1.000000 0.600000 0.100000
+1551015509.001653200(2) 0.027500 95.105700 -1.000000 0.200000 0.200000
+1551015509.101653200(3) 0.040320 95.105700 -1.000000 -0.200000 0.300000
+1551015509.201653200(4) 0.026079 58.778500 -1.000000 -0.600000 0.400000
+1551015509.301653200(5) 0.049262 0.000000 1.000000 -1.000000 0.500000
+1551015509.401653200(6) 0.014883 -58.778500 1.000000 -0.600000 0.600000
+1551015509.501653200(7) 0.023232 -95.105700 1.000000 -0.200000 0.700000
+1551015509.601653200(8) 0.015231 -95.105700 1.000000 0.200000 0.800000
+1551015509.701653200(9) 0.060849 -58.778500 1.000000 0.600000 0.900000
EOF
cat < ${EXPECT_FILE}
-1490500399.776379108(0) 0.000000 0 0 0.000000
-1490500399.876379108(1) 0.587785 58 58 0.587785
-1490500399.976379108(2) 0.951057 95 95 0.951057
-1490500400.076379108(3) 0.951057 95 95 0.951057
-1490500400.176379108(4) 0.587785 58 58 0.587785
-1490500400.276379108(5) 0.000000 0 0 0.000000
-1490500400.376379108(6) -0.587785 -58 -58 -0.587785
-1490500400.476379108(7) -0.951057 -95 -95 -0.951057
-1490500400.576379108(8) -0.951057 -95 -95 -0.951057
-1490500400.676379108(9) -0.587785 -58 -58 -0.587785
+# seconds.nanoseconds+offset(sequence) signal0 test[V] signal2 signal3 signal4
+1551015508.801653200(0) 0.022245 0 -1.000000 1.000000 0.000000
+1551015508.901653200(1) 0.015339 58 -1.000000 0.600000 0.100000
+1551015509.001653200(2) 0.027500 95 -1.000000 0.200000 0.200000
+1551015509.101653200(3) 0.040320 95 -1.000000 -0.200000 0.300000
+1551015509.201653200(4) 0.026079 58 -1.000000 -0.600000 0.400000
+1551015509.301653200(5) 0.049262 0 1.000000 -1.000000 0.500000
+1551015509.401653200(6) 0.014883 -58 1.000000 -0.600000 0.600000
+1551015509.501653200(7) 0.023232 -95 1.000000 -0.200000 0.700000
+1551015509.601653200(8) 0.015231 -95 1.000000 0.200000 0.800000
+1551015509.701653200(9) 0.060849 -58 1.000000 0.600000 0.900000
EOF
-cat ${INPUT_FILE} | \
-villas-hook -o scale=100 scale | \
-villas-hook cast | \
-tee ${OUTPUT_FILE}
-
-cat ${OUTPUT_FILE}
+villas-hook cast -o new_name=test -o new_unit=V -o new_type=integer -o signal=1 < ${INPUT_FILE} > ${OUTPUT_FILE}
# Compare only the data values
villas-test-cmp ${OUTPUT_FILE} ${EXPECT_FILE}
-
RC=$?
rm -f ${INPUT_FILE} ${OUTPUT_FILE} ${EXPECT_FILE}
diff --git a/tests/integration/hook-dp.sh b/tests/integration/hook-dp.sh
new file mode 100755
index 000000000..3b3f9c417
--- /dev/null
+++ b/tests/integration/hook-dp.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+#
+# Integration test for dp hook.
+#
+# @author Steffen Vogel
+# @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
+# @license GNU General Public License (version 3)
+#
+# VILLASnode
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+##################################################################################
+
+set -x
+
+#INPUT_FILE=$(mktemp)
+#OUTPUT_FILE=$(mktemp)
+#RECON_FILE=$(mktemp)
+
+INPUT_FILE=in
+OUTPUT_FILE=out
+RECON_FILE=recon
+
+NUM_SAMPLES=10000
+RATE=5000
+F0=50
+
+OPTS="-o f0=${F0} -o rate=${RATE} -o signal=0 -o harmonics=0,1,3,5,7"
+
+villas-signal sine -v1 -l ${NUM_SAMPLES} -f ${F0} -r ${RATE} -n > ${INPUT_FILE}
+
+villas-hook dp -o inverse=false ${OPTS} < ${INPUT_FILE} > ${OUTPUT_FILE}
+
+villas-hook dp -o inverse=true ${OPTS} < ${OUTPUT_FILE} > ${RECON_FILE}
+
+exit 0
+
+# Compare only the data values
+villas-test-cmp ${OUTPUT_FILE} ${EXPECT_FILE}
+RC=$?
+
+rm -f ${INPUT_FILE} ${OUTPUT_FILE} ${EXPECT_FILE}
+
+exit $RC
diff --git a/tests/integration/hook-scale.sh b/tests/integration/hook-scale.sh
index 0e222dff8..0518974ce 100755
--- a/tests/integration/hook-scale.sh
+++ b/tests/integration/hook-scale.sh
@@ -27,36 +27,37 @@ OUTPUT_FILE=$(mktemp)
EXPECT_FILE=$(mktemp)
cat < ${INPUT_FILE}
-1490500399.776379108(0) 0.000000 0.000000 0.000000 0.000000
-1490500399.876379108(1) 0.587785 0.587785 0.587785 0.587785
-1490500399.976379108(2) 0.951057 0.951057 0.951057 0.951057
-1490500400.076379108(3) 0.951057 0.951057 0.951057 0.951057
-1490500400.176379108(4) 0.587785 0.587785 0.587785 0.587785
-1490500400.276379108(5) 0.000000 0.000000 0.000000 0.000000
-1490500400.376379108(6) -0.587785 -0.587785 -0.587785 -0.587785
-1490500400.476379108(7) -0.951057 -0.951057 -0.951057 -0.951057
-1490500400.576379108(8) -0.951057 -0.951057 -0.951057 -0.951057
-1490500400.676379108(9) -0.587785 -0.587785 -0.587785 -0.587785
+# seconds.nanoseconds(sequence) random sine square triangle ramp
+1551015508.801653200(0) 0.022245 0.000000 -1.000000 1.000000 0.000000
+1551015508.901653200(1) 0.015339 0.587785 -1.000000 0.600000 0.100000
+1551015509.001653200(2) 0.027500 0.951057 -1.000000 0.200000 0.200000
+1551015509.101653200(3) 0.040320 0.951057 -1.000000 -0.200000 0.300000
+1551015509.201653200(4) 0.026079 0.587785 -1.000000 -0.600000 0.400000
+1551015509.301653200(5) 0.049262 0.000000 1.000000 -1.000000 0.500000
+1551015509.401653200(6) 0.014883 -0.587785 1.000000 -0.600000 0.600000
+1551015509.501653200(7) 0.023232 -0.951057 1.000000 -0.200000 0.700000
+1551015509.601653200(8) 0.015231 -0.951057 1.000000 0.200000 0.800000
+1551015509.701653200(9) 0.060849 -0.587785 1.000000 0.600000 0.900000
EOF
cat < ${EXPECT_FILE}
-1490500399.776379108-1.490500e+09(0) -10.000000 -10.000000 -10.000000 -10.000000
-1490500399.876379108-1.490500e+09(1) -4.122150 -4.122150 -4.122150 -4.122150
-1490500399.976379108-1.490500e+09(2) -0.489430 -0.489430 -0.489430 -0.489430
-1490500400.076379108-1.490500e+09(3) -0.489430 -0.489430 -0.489430 -0.489430
-1490500400.176379108-1.490500e+09(4) -4.122150 -4.122150 -4.122150 -4.122150
-1490500400.276379108-1.490500e+09(5) -10.000000 -10.000000 -10.000000 -10.000000
-1490500400.376379108-1.490500e+09(6) -15.877850 -15.877850 -15.877850 -15.877850
-1490500400.476379108-1.490500e+09(7) -19.510570 -19.510570 -19.510570 -19.510570
-1490500400.576379108-1.490500e+09(8) -19.510570 -19.510570 -19.510570 -19.510570
-1490500400.676379108-1.490500e+09(9) -15.877850 -15.877850 -15.877850 -15.877850
+# seconds.nanoseconds+offset(sequence) signal0 signal1 signal2 signal3 signal4
+1551015508.801653200(0) 0.022245 0.000000 -1.000000 1.000000 55.000000
+1551015508.901653200(1) 0.015339 0.587785 -1.000000 0.600000 65.000000
+1551015509.001653200(2) 0.027500 0.951057 -1.000000 0.200000 75.000000
+1551015509.101653200(3) 0.040320 0.951057 -1.000000 -0.200000 85.000000
+1551015509.201653200(4) 0.026079 0.587785 -1.000000 -0.600000 95.000000
+1551015509.301653200(5) 0.049262 0.000000 1.000000 -1.000000 105.000000
+1551015509.401653200(6) 0.014883 -0.587785 1.000000 -0.600000 115.000000
+1551015509.501653200(7) 0.023232 -0.951057 1.000000 -0.200000 125.000000
+1551015509.601653200(8) 0.015231 -0.951057 1.000000 0.200000 135.000000
+1551015509.701653200(9) 0.060849 -0.587785 1.000000 0.600000 145.000000
EOF
-villas-hook -o scale=10 -o offset=-10 scale < ${INPUT_FILE} > ${OUTPUT_FILE}
+villas-hook scale -o scale=100 -o offset=55 -o signal=signal4 < ${INPUT_FILE} > ${OUTPUT_FILE}
# Compare only the data values
villas-test-cmp ${OUTPUT_FILE} ${EXPECT_FILE}
-
RC=$?
rm -f ${INPUT_FILE} ${OUTPUT_FILE} ${EXPECT_FILE}
diff --git a/tests/unit/mapping.cpp b/tests/unit/mapping.cpp
index 37c92bda0..68995eb9d 100644
--- a/tests/unit/mapping.cpp
+++ b/tests/unit/mapping.cpp
@@ -88,7 +88,7 @@ Test(mapping, parse_nodes)
cr_assert_eq(m.node, vlist_lookup(&nodes, "carrot"));
cr_assert_eq(m.type, MAPPING_TYPE_DATA);
cr_assert_eq(m.data.offset, 0);
- cr_assert_eq(m.length, vlist_length(&m.node->in.signals));
+ cr_assert_eq(m.length, -1);
ret = mapping_parse_str(&m, "carrot.data[sole]", &nodes);
cr_assert_eq(ret, 0);
@@ -151,13 +151,13 @@ Test(mapping, parse)
cr_assert_eq(ret, 0);
cr_assert_eq(m.type, MAPPING_TYPE_DATA);
cr_assert_eq(m.data.offset, 0);
- cr_assert_eq(m.length, 0);
+ cr_assert_eq(m.length, -1);
ret = mapping_parse_str(&m, "data[]", nullptr);
cr_assert_eq(ret, 0);
cr_assert_eq(m.type, MAPPING_TYPE_DATA);
cr_assert_eq(m.data.offset, 0);
- cr_assert_eq(m.length, 0);
+ cr_assert_eq(m.length, -1);
ret = mapping_parse_str(&m, "data[1.1-2f]", nullptr);
cr_assert_neq(ret, 0);
@@ -182,5 +182,5 @@ Test(mapping, parse)
/* Negative length of chunk */
ret = mapping_parse_str(&m, "data[5-3]", nullptr);
- cr_assert_eq(ret, -1);
+ cr_assert_neq(ret, 0);
}