1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'hook-dp' into develop

This commit is contained in:
Steffen Vogel 2019-03-09 00:36:10 +01:00
commit e78302ac38
50 changed files with 2071 additions and 880 deletions

View file

@ -163,7 +163,8 @@ docker:
- docker build
--build-arg BUILDER_IMAGE=${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV}
--file packaging/docker/Dockerfile.app
--tag ${DOCKER_IMAGE}:${DOCKER_TAG_DEV} .
--tag ${DOCKER_IMAGE}:${DOCKER_TAG} .
- docker push ${DOCKER_IMAGE}:${DOCKER_TAG}
- docker push ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV}
tags:
- shell

2
common

@ -1 +1 @@
Subproject commit 27751548785eebaeb160f32b923c7949de4faa5e
Subproject commit 4c9231b18c7f1025f7c56d3b7b7ac0d805b62363

View file

@ -26,6 +26,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @addtogroup hooks User-defined hook functions
* @ingroup path
@ -35,6 +36,7 @@
#pragma once
#include <villas/hook_type.h>
#include <villas/list.h>
#include <villas/common.h>
#ifdef __cplusplus
@ -44,7 +46,6 @@ extern "C" {
/* Forward declarations */
struct path;
struct sample;
struct vlist;
/** Descriptor for user defined hooks. See hooks[]. */
struct hook {
@ -56,6 +57,8 @@ struct hook {
struct path *path;
struct node *node;
struct vlist signals;
struct hook_type *_vt; /**< C++ like Vtable pointer. */
void *_vd; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */
@ -63,24 +66,38 @@ struct hook {
};
int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node *n);
int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path *p, struct node *n);
int hook_init_signals(struct hook *h, struct vlist *signals);
int hook_parse(struct hook *h, json_t *cfg);
int hook_prepare(struct hook *h, struct vlist *signals);
int hook_destroy(struct hook *h);
int hook_start(struct hook *h);
int hook_stop(struct hook *h);
int hook_periodic(struct hook *h);
int hook_restart(struct hook *h);
int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt);
int hook_process_list(struct vlist *hs, struct sample *smps[], unsigned cnt);
/** Compare two hook functions with their priority. Used by vlist_sort() */
int hook_cmp_priority(const void *a, const void *b);
static inline
struct hook_type * hook_type(struct hook *h)
{
return h->_vt;
}
int hook_list_init(struct vlist *hs);
int hook_list_destroy(struct vlist *hs);
/** Parses an object of hooks
*
* Example:
@ -95,7 +112,15 @@ int hook_cmp_priority(const void *a, const void *b);
* hooks = [ "print" ]
* }
*/
int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *p, struct node *n);
int hook_list_parse(struct vlist *hs, json_t *cfg, int mask, struct path *p, struct node *n);
int hook_list_prepare(struct vlist *hs, struct vlist *sigs, int mask, struct path *p, struct node *n);
int hook_list_prepare_signals(struct vlist *hs, struct vlist *signals);
int hook_list_add(struct vlist *hs, int mask, struct path *p, struct node *n);
int hook_list_process(struct vlist *hs, struct sample *smps[], unsigned cnt);
#ifdef __cplusplus
}

View file

@ -1,6 +1,6 @@
/** Hook funktions
*
* Every path can register a hook function which is called for every received
* Every path or node can register a hook function which is called for every received
* message. This can be used to debug the data flow, get statistics
* or alter the message.
*
@ -62,11 +62,13 @@ struct hook_type {
int (*parse)(struct hook *h, json_t *cfg);
int (*init)(struct hook *h); /**< Called before path is started to parsed. */
int (*init)(struct hook *h); /**< Called before hook is started to parsed. */
int (*destroy)(struct hook *h); /**< Called after path has been stopped to release memory allocated by HOOK_INIT */
int (*start)(struct hook *h); /**< Called whenever a path is started; before threads are created. */
int (*stop)(struct hook *h); /**< Called whenever a path is stopped; after threads are destoyed. */
int (*init_signals)(struct hook *h);
int (*start)(struct hook *h); /**< Called whenever a hook is started; before threads are created. */
int (*stop)(struct hook *h); /**< Called whenever a hook is stopped; after threads are destoyed. */
int (*periodic)(struct hook *h);/**< Called periodically. Period is set by global 'stats' option in the configuration file. */
int (*restart)(struct hook *h); /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */

View file

@ -88,18 +88,20 @@ struct mapping_entry {
};
};
int mapping_remap(const struct vlist *m, struct sample *remapped, const struct sample *original, const struct stats *s);
int mapping_update(const struct mapping_entry *e, struct sample *remapped, const struct sample *original, const struct stats *s);
int mapping_update(const struct mapping_entry *e, struct sample *remapped, const struct sample *original);
int mapping_parse(struct mapping_entry *e, json_t *cfg, struct vlist *nodes);
int mapping_parse_str(struct mapping_entry *e, const char *str, struct vlist *nodes);
int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes);
int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str);
int mapping_list_parse(struct vlist *ml, json_t *cfg, struct vlist *nodes);
int mapping_list_prepare(struct vlist *ml);
int mapping_list_remap(const struct vlist *ml, struct sample *remapped, const struct sample *original);
#ifdef __cplusplus
}
#endif

View file

@ -33,6 +33,7 @@
#include <jansson.h>
#include <villas/node_type.h>
#include <villas/node_direction.h>
#include <villas/sample.h>
#include <villas/list.h>
#include <villas/queue.h>
@ -52,17 +53,6 @@ extern "C" {
struct rtnl_cls;
#endif /* WITH_NETEM */
struct node_direction {
int enabled;
int builtin; /**< This node should use built-in hooks by default. */
int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
struct vlist hooks; /**< List of write hooks (struct hook). */
struct vlist signals; /**< Signal description. */
json_t *cfg; /**< A JSON object containing the configuration of the node. */
};
/** The data structure for a node.
*
* Every entity which exchanges messages is represented by a node.
@ -70,6 +60,7 @@ struct node_direction {
*/
struct node {
char *name; /**< A short identifier of the node, only used for configuration and logging */
int enabled;
enum state state;
@ -103,7 +94,7 @@ struct node {
int node_init(struct node *n, struct node_type *vt);
/** Do initialization after parsing the configuration */
int node_init2(struct node *n);
int node_prepare(struct node *n);
/** Parse settings of a node.
*
@ -123,7 +114,7 @@ int node_parse(struct node *n, json_t *cfg, const char *name);
* @param nodes The nodes will be added to this list.
* @param all This list contains all valid nodes.
*/
int node_parse_list(struct vlist *list, json_t *cfg, struct vlist *all);
int node_list_parse(struct vlist *list, json_t *cfg, struct vlist *all);
/** Parse the list of signal definitions. */
int node_parse_signals(struct vlist *list, json_t *cfg);
@ -199,11 +190,19 @@ int node_poll_fds(struct node *n, int fds[]);
int node_netem_fds(struct node *n, int fds[]);
struct node_type * node_type(struct node *n);
static inline
struct node_type * node_type(struct node *n)
{
return n->_vt;
}
struct memory_type * node_memory_type(struct node *n, struct memory_type *parent);
int node_is_valid_name(const char *name);
bool node_is_valid_name(const char *name);
bool node_is_enabled(const struct node *n);
struct vlist * node_get_signals(struct node *n, enum node_dir dir);
#ifdef __cplusplus
}

View file

@ -0,0 +1,74 @@
/** Node direction
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
/**
* @addtogroup node Node
* @{
*/
#pragma once
#include <jansson.h>
#include <villas/common.h>
#include <villas/list.h>
/* Forward declarations */
struct node;
enum node_dir {
NODE_DIR_IN, /**< VILLASnode is receiving/reading */
NODE_DIR_OUT /**< VILLASnode is sending/writing */
};
struct node_direction {
enum state state;
enum node_dir direction;
int enabled;
int builtin; /**< This node should use built-in hooks by default. */
int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
struct vlist hooks; /**< List of read / write hooks (struct hook). */
struct vlist signals; /**< Signal description. */
json_t *cfg; /**< A JSON object containing the configuration of the node. */
};
int node_direction_init(struct node_direction *nd, enum node_dir dir, struct node *n);
int node_direction_parse(struct node_direction *nd, struct node *n, json_t *cfg);
int node_direction_check(struct node_direction *nd, struct node *n);
int node_direction_prepare(struct node_direction *nd, struct node *n);
int node_direction_start(struct node_direction *nd, struct node *n);
int node_direction_stop(struct node_direction *nd, struct node *n);
int node_direction_destroy(struct node_direction *nd, struct node *n);
struct vlist * node_direction_get_signals(struct node_direction *nd);
/** @} */

View file

@ -49,21 +49,6 @@ extern "C" {
struct stats;
struct node;
struct path_source {
struct node *node;
bool masked;
struct pool pool;
struct vlist mappings; /**< List of mappings (struct mapping_entry). */
};
struct path_destination {
struct node *node;
struct queue queue;
};
/** The register mode determines under which condition the path is triggered. */
enum path_mode {
PATH_MODE_ANY, /**< The path is triggered whenever one of the sources receives samples. */
@ -112,7 +97,7 @@ struct path {
/** Initialize internal data structures. */
int path_init(struct path *p);
int path_init2(struct path *p);
int path_prepare(struct path *p);
/** Check if path configuration is proper. */
int path_check(struct path *p);
@ -172,7 +157,13 @@ int path_uses_node(struct path *p, struct node *n);
*/
int path_parse(struct path *p, json_t *cfg, struct vlist *nodes);
int path_is_simple(struct path *p);
bool path_is_simple(const struct path *p);
bool path_is_enabled(const struct path *p);
bool path_is_reversed(const struct path *p);
struct vlist * path_get_signals(struct path *p);
/** @} */

View file

@ -0,0 +1,60 @@
/** Path destination
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
/** A path connects one input node to multiple output nodes (1-to-n).
*
* @addtogroup path Path
* @{
*/
#pragma once
#include <villas/queue.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Forward declarations */
struct path;
struct sample;
struct path_destination {
struct node *node;
struct queue queue;
};
int path_destination_init(struct path_destination *pd, int queuelen);
int path_destination_destroy(struct path_destination *pd);
void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt);
void path_destination_write(struct path_destination *pd, struct path *p);
#ifdef __cplusplus
}
#endif
/** @} */

View file

@ -0,0 +1,62 @@
/** Message source
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
/** A path connects one input node to multiple output nodes (1-to-n).
*
* @addtogroup path Path
* @{
*/
#pragma once
#include <villas/pool.h>
#include <villas/list.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Forward declarations */
struct path;
struct sample;
struct path_source {
struct node *node;
bool masked;
struct pool pool;
struct vlist mappings; /**< List of mappings (struct mapping_entry). */
};
int path_source_init(struct path_source *ps);
int path_source_destroy(struct path_source *ps);
int path_source_read(struct path_source *ps, struct path *p, int i);
#ifdef __cplusplus
}
#endif
/** @} */

View file

@ -53,7 +53,7 @@ __attribute__((constructor(110))) static void UNIQUE(__ctor)() {\
} \
__attribute__((destructor(110))) static void UNIQUE(__dtor)() { \
if (plugins.state != STATE_DESTROYED) \
vlist_remove(&plugins, p); \
vlist_remove_all(&plugins, p); \
}
extern struct vlist plugins;

View file

@ -136,6 +136,10 @@ int sample_decref_many(struct sample *smps[], int cnt);
enum signal_type sample_format(const struct sample *s, unsigned idx);
void sample_data_insert(struct sample *smp, const union signal_data *src, size_t offset, size_t len);
void sample_data_remove(struct sample *smp, size_t offset, size_t len);
#ifdef __cplusplus
}
#endif

View file

@ -106,11 +106,12 @@ int signal_parse(struct signal *s, json_t *cfg);
/** Initialize signal from a mapping_entry. */
int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, unsigned index);
int signal_list_init(struct vlist *list);
int signal_list_destroy(struct vlist *list);
int signal_list_parse(struct vlist *list, json_t *cfg);
int signal_list_generate(struct vlist *list, unsigned len, enum signal_type fmt);
void signal_list_dump(const struct vlist *list, const union signal_data *data, int len);
int signal_list_copy(struct vlist *dst, const struct vlist *src);
enum signal_type signal_type_from_str(const char *str);

View file

@ -88,10 +88,14 @@ public:
int check();
/** Initialize after parsing the configuration file. */
void prepare();
void start();
void stop();
void run();
void preparePaths();
void prepareNodes();
void startPaths();
void startNodes();
void startNodeTypes();

View file

@ -46,10 +46,13 @@ set(LIB_SRC
memory/managed.c
sample.c
path.c
path_source.c
path_destination.c
node.c
node_type.c
node_direction.c
memory.c
plugin.c
node_type.c
stats.c
mapping.c
shmem.c

View file

@ -165,9 +165,26 @@ int json_to_config(json_t *json, config_setting_t *parent)
}
#endif /* LIBCONFIG_FOUND */
void json_object_extend_key_value_token(json_t *obj, const char *key, const char *value)
{
char *str = strdup(value);
char *delim = ",";
char *lasts;
char *token = strtok_r(str, delim, &lasts);
while (token) {
json_object_extend_key_value(obj, key, token);
token = strtok_r(NULL, delim, &lasts);
}
free(str);
}
void json_object_extend_key_value(json_t *obj, const char *key, const char *value)
{
char *end, *cpy, *key1, *key2;
char *end, *cpy, *key1, *key2, *lasts;
double real;
long integer;
@ -178,8 +195,8 @@ void json_object_extend_key_value(json_t *obj, const char *key, const char *valu
subobj = obj;
cpy = strdup(key);
key1 = strtok(cpy, ".");
key2 = strtok(NULL, ".");
key1 = strtok_r(cpy, ".", &lasts);
key2 = strtok_r(NULL, ".", &lasts);
while (key1 && key2) {
existing = json_object_get(subobj, key1);
@ -193,7 +210,7 @@ void json_object_extend_key_value(json_t *obj, const char *key, const char *valu
}
key1 = key2;
key2 = strtok(NULL, ".");
key2 = strtok_r(NULL, ".", &lasts);
}
/* Try to parse as integer */
@ -254,7 +271,7 @@ json_t * json_load_cli(int argc, const char *argv[])
const char *key = NULL;
const char *value = NULL;
const char *sep;
char *cpy;
char *cpy, *lasts;
json_t *json = json_object();
@ -274,10 +291,10 @@ json_t * json_load_cli(int argc, const char *argv[])
if (sep) {
cpy = strdup(key);
key = strtok(cpy, "=");
value = strtok(NULL, "");
key = strtok_r(cpy, "=", &lasts);
value = strtok_r(NULL, "", &lasts);
json_object_extend_key_value(json, key, value);
json_object_extend_key_value_token(json, key, value);
free(cpy);
key = NULL;
@ -291,7 +308,7 @@ json_t * json_load_cli(int argc, const char *argv[])
value = opt;
json_object_extend_key_value(json, key, value);
json_object_extend_key_value_token(json, key, value);
key = NULL;
}
}
@ -324,17 +341,17 @@ int json_object_extend(json_t *obj, json_t *merge)
int json_object_extend_str(json_t *obj, const char *str)
{
char *key, *value, *cpy;
char *key, *value, *cpy, *lasts;
cpy = strdup(str);
key = strtok(cpy, "=");
value = strtok(NULL, "");
key = strtok_r(cpy, "=", &lasts);
value = strtok_r(NULL, "", &lasts);
if (!key || !value)
return -1;
json_object_extend_key_value(obj, key, value);
json_object_extend_key_value_token(obj, key, value);
free(cpy);

View file

@ -42,14 +42,43 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node
h->path = p;
h->node = n;
h->_vt = vt;
h->_vd = alloc(vt->size);
h->signals.state = STATE_DESTROYED;
ret = h->_vt->init ? h->_vt->init(h) : 0;
ret = signal_list_init(&h->signals);
if (ret)
return ret;
h->state = STATE_INITIALIZED;
h->_vt = vt;
h->_vd = alloc(vt->size);
ret = hook_type(h)->init ? hook_type(h)->init(h) : 0;
if (ret)
return ret;
// We dont need to parse builtin hooks
h->state = hook_type(h)->flags & HOOK_BUILTIN ? STATE_PARSED : STATE_INITIALIZED;
return 0;
}
int hook_prepare(struct hook *h, struct vlist *signals)
{
int ret;
assert(h->state == STATE_PARSED);
if (!h->enabled)
return 0;
ret = signal_list_copy(&h->signals, signals);
if (ret)
return -1;
ret = hook_type(h)->init_signals ? hook_type(h)->init_signals(h) : 0;
if (ret)
return ret;
h->state = STATE_PREPARED;
return 0;
}
@ -66,9 +95,9 @@ int hook_parse(struct hook *h, json_t *cfg)
"enabled", &h->enabled
);
if (ret)
jerror(&err, "Failed to parse configuration of hook '%s'", hook_type_name(h->_vt));
jerror(&err, "Failed to parse configuration of hook '%s'", hook_type_name(hook_type(h)));
ret = h->_vt->parse ? h->_vt->parse(h, cfg) : 0;
ret = hook_type(h)->parse ? hook_type(h)->parse(h, cfg) : 0;
if (ret)
return ret;
@ -82,9 +111,13 @@ int hook_destroy(struct hook *h)
{
int ret;
assert(h->state != STATE_DESTROYED);
assert(h->state != STATE_DESTROYED && h->state != STATE_STARTED);
ret = h->_vt->destroy ? h->_vt->destroy(h) : 0;
ret = signal_list_destroy(&h->signals);
if (ret)
return ret;
ret = hook_type(h)->destroy ? hook_type(h)->destroy(h) : 0;
if (ret)
return ret;
@ -98,41 +131,53 @@ int hook_destroy(struct hook *h)
int hook_start(struct hook *h)
{
int ret;
assert(h->state == STATE_PREPARED);
if (!h->enabled)
return 0;
if (h->_vt->start) {
debug(LOG_HOOK | 10, "Start hook %s: priority=%d", hook_type_name(h->_vt), h->priority);
debug(LOG_HOOK | 10, "Start hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
return h->_vt->start(h);
}
else
return 0;
ret = hook_type(h)->start ? hook_type(h)->start(h) : 0;
if (ret)
return ret;
h->state = STATE_STARTED;
return 0;
}
int hook_stop(struct hook *h)
{
int ret;
assert(h->state == STATE_STARTED);
if (!h->enabled)
return 0;
if (h->_vt->stop) {
debug(LOG_HOOK | 10, "Stopping hook %s: priority=%d", hook_type_name(h->_vt), h->priority);
debug(LOG_HOOK | 10, "Stopping hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
return h->_vt->stop(h);
}
else
return 0;
ret = hook_type(h)->stop ? hook_type(h)->stop(h) : 0;
if (ret)
return ret;
h->state = STATE_STOPPED;
return 0;
}
int hook_periodic(struct hook *h)
{
assert(h->state == STATE_STARTED);
if (!h->enabled)
return 0;
if (h->_vt->periodic) {
debug(LOG_HOOK | 10, "Periodic hook %s: priority=%d", hook_type_name(h->_vt), h->priority);
if (hook_type(h)->periodic) {
debug(LOG_HOOK | 10, "Periodic hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
return h->_vt->periodic(h);
return hook_type(h)->periodic(h);
}
else
return 0;
@ -140,47 +185,36 @@ int hook_periodic(struct hook *h)
int hook_restart(struct hook *h)
{
int ret;
assert(h->state == STATE_STARTED);
if (!h->enabled)
return 0;
if (h->_vt->restart) {
debug(LOG_HOOK | 10, "Restarting hook %s: priority=%d", hook_type_name(h->_vt), h->priority);
debug(LOG_HOOK | 10, "Restarting hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
return h->_vt->restart(h);
}
else
return 0;
ret = hook_type(h)->restart ? hook_type(h)->restart(h) : 0;
if (ret)
return ret;
return 0;
}
int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
int ret;
assert(h->state == STATE_STARTED);
if (!h->enabled)
return 0;
if (h->_vt->process) {
debug(LOG_HOOK | 10, "Process hook %s: priority=%d, cnt=%d", hook_type_name(h->_vt), h->priority, *cnt);
debug(LOG_HOOK | 10, "Process hook %s: priority=%d, cnt=%d", hook_type_name(hook_type(h)), h->priority, *cnt);
return h->_vt->process(h, smps, cnt);
}
else
return 0;
}
ret = hook_type(h)->process ? hook_type(h)->process(h, smps, cnt) : 0;
if (ret)
return ret;
int hook_process_list(struct vlist *hs, struct sample *smps[], unsigned cnt)
{
unsigned ret;
for (size_t i = 0; i < vlist_length(hs); i++) {
struct hook *h = (struct hook *) vlist_at(hs, i);
ret = hook_process(h, smps, &cnt);
if (ret || !cnt)
/* Abort hook processing if earlier hooks removed all samples
* or they returned something non-zero */
break;
}
return cnt;
return 0;
}
int hook_cmp_priority(const void *a, const void *b)
@ -191,7 +225,29 @@ int hook_cmp_priority(const void *a, const void *b)
return ha->priority - hb->priority;
}
int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *o, struct node *n)
int hook_list_init(struct vlist *hs)
{
int ret;
ret = vlist_init(hs);
if (ret)
return ret;
return 0;
}
int hook_list_destroy(struct vlist *hs)
{
int ret;
ret = vlist_destroy(hs, (dtor_cb_t) hook_destroy, true);
if (ret)
return ret;
return 0;
}
int hook_list_parse(struct vlist *hs, json_t *cfg, int mask, struct path *o, struct node *n)
{
if (!json_is_array(cfg))
error("Hooks must be configured as a list of objects");
@ -225,17 +281,42 @@ int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *o, s
if (ret)
jerror(&err, "Failed to parse hook configuration");
vlist_push(list, h);
vlist_push(hs, h);
}
return 0;
}
int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path *p, struct node *n)
int hook_list_prepare(struct vlist *hs, struct vlist *sigs, int m, struct path *p, struct node *n)
{
int ret;
assert(l->state == STATE_INITIALIZED);
/* Add internal hooks if they are not already in the list */
ret = hook_list_add(hs, m, p, n);
if (ret)
return ret;
/* We sort the hooks according to their priority */
vlist_sort(hs, hook_cmp_priority);
for (size_t i = 0; i < vlist_length(hs); i++) {
struct hook *h = (struct hook *) vlist_at(hs, i);
ret = hook_prepare(h, sigs);
if (ret)
return ret;
sigs = &h->signals;
}
return 0;
}
int hook_list_add(struct vlist *hs, int mask, struct path *p, struct node *n)
{
int ret;
assert(hs->state == STATE_INITIALIZED);
for (size_t i = 0; i < vlist_length(&plugins); i++) {
struct plugin *q = (struct plugin *) vlist_at(&plugins, i);
@ -246,11 +327,7 @@ int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path
if (q->type != PLUGIN_TYPE_HOOK)
continue;
if (builtin &&
vt->flags & HOOK_BUILTIN &&
vt->flags & mask)
{
if ((vt->flags & mask) == mask) {
h = (struct hook *) alloc(sizeof(struct hook));
if (!h)
return -1;
@ -259,7 +336,7 @@ int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path
if (ret)
return ret;
vlist_push(l, h);
vlist_push(hs, h);
}
}
@ -270,3 +347,20 @@ const char * hook_type_name(struct hook_type *vt)
{
return plugin_name(vt);
}
int hook_list_process(struct vlist *hs, struct sample *smps[], unsigned cnt)
{
unsigned ret;
for (size_t i = 0; i < vlist_length(hs); i++) {
struct hook *h = (struct hook *) vlist_at(hs, i);
ret = hook_process(h, smps, &cnt);
if (ret || !cnt)
/* Abort hook processing if earlier hooks removed all samples
* or they returned something non-zero */
break;
}
return cnt;
}

View file

@ -36,6 +36,7 @@ set(HOOK_SRC
cast.c
average.c
dump.c
dp.c
)
if(WITH_IO)

View file

@ -29,43 +29,130 @@
#include <villas/hook.h>
#include <villas/plugin.h>
#include <villas/sample.h>
#include <villas/bitset.h>
struct average {
uint64_t mask;
int offset;
struct bitset mask;
struct vlist signal_names;
};
static int average_init(struct hook *h)
{
int ret;
struct average *a = (struct average *) h->_vd;
ret = vlist_init(&a->signal_names);
if (ret)
return ret;
ret = bitset_init(&a->mask, 128);
if (ret)
return ret;
bitset_clear_all(&a->mask);
return 0;
}
static int average_destroy(struct hook *h)
{
int ret;
struct average *a = (struct average *) h->_vd;
ret = vlist_destroy(&a->signal_names, NULL, true);
if (ret)
return ret;
ret = bitset_destroy(&a->mask);
if (ret)
return ret;
return 0;
}
static int average_prepare(struct hook *h)
{
int ret;
struct average *a = (struct average *) h->_vd;
struct signal *avg_sig;
/* Setup mask */
for (size_t i = 0; i < vlist_length(&a->signal_names); i++) {
char *signal_name = (char *) vlist_at_safe(&a->signal_names, i);
int index = vlist_lookup_index(&a->signal_names, signal_name);
if (index < 0)
return -1;
bitset_set(&a->mask, index);
}
/* Add averaged signal */
avg_sig = signal_create("average", NULL, SIGNAL_TYPE_FLOAT);
if (!avg_sig)
return -1;
ret = vlist_insert(&h->signals, a->offset, avg_sig);
if (ret)
return ret;
return 0;
}
static int average_parse(struct hook *h, json_t *cfg)
{
struct average *p = (struct average *) h->_vd;
struct average *a = (struct average *) h->_vd;
int ret;
size_t i;
json_error_t err;
json_t *json_signals, *json_signal;
ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s: I }",
"offset", &p->offset,
"mask", &p->mask
ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s: o }",
"offset", &a->offset,
"signals", &json_signals
);
if (ret)
jerror(&err, "Failed to parse configuration of hook '%s'", hook_type_name(h->_vt));
if (!json_is_array(json_signals))
error("Setting 'signals' of hook '%s' must be a list of signal names", hook_type_name(h->_vt));
json_array_foreach(json_signals, i, json_signal) {
switch (json_typeof(json_signal)) {
case JSON_STRING:
vlist_push(&a->signal_names, strdup(json_string_value(json_signal)));
break;
case JSON_INTEGER:
bitset_set(&a->mask, json_integer_value(json_signal));
break;
default:
error("Invalid value for setting 'signals' in hook '%s'", hook_type_name(h->_vt));
}
}
return 0;
}
static int average_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct average *p = (struct average *) h->_vd;
struct average *a = (struct average *) h->_vd;
for (int i = 0; i < *cnt; i++) {
struct sample *smp = smps[i];
double sum = 0;
double avg, sum = 0;
int n = 0;
for (int k = 0; k < smp->length; k++) {
if (!(p->mask & (1LL << k)))
if (!bitset_test(&a->mask, k))
continue;
switch (sample_format(smps[i], k)) {
switch (sample_format(smp, k)) {
case SIGNAL_TYPE_INTEGER:
sum += smp->data[k].i;
break;
@ -84,7 +171,9 @@ static int average_process(struct hook *h, struct sample *smps[], unsigned *cnt)
n++;
}
smp->data[p->offset].f = sum / n;
avg = n == 0 ? 0 : sum / n;
sample_data_insert(smp, (union signal_data *) &avg, a->offset, 1);
smp->signals = &h->signals;
}
return 0;
@ -99,6 +188,9 @@ static struct plugin p = {
.priority = 99,
.parse = average_parse,
.process = average_process,
.init = average_init,
.init_signals = average_prepare,
.destroy = average_destroy,
.size = sizeof(struct average)
}
};

View file

@ -29,52 +29,57 @@
#include <villas/hook.h>
#include <villas/plugin.h>
#include <villas/node.h>
#include <villas/path.h>
#include <villas/sample.h>
struct cast {
struct vlist operations;
int signal_index;
char *signal_name;
struct vlist signals;
enum signal_type new_type;
char *new_name;
char *new_unit;
};
static int cast_init(struct hook *h)
static int cast_prepare(struct hook *h)
{
int ret;
struct signal *orig_sig, *new_sig;
struct cast *c = (struct cast *) h->_vd;
struct vlist *orig_signals;
if (h->node)
orig_signals = &h->node->in.signals;
else if (h->path)
orig_signals = &h->path->signals;
else
return -1;
ret = vlist_init(&c->signals);
if (ret)
return ret;
/* Copy original signal list */
for (int i = 0; i < vlist_length(orig_signals); i++) {
struct signal *orig_sig = vlist_at(orig_signals, i);
struct signal *new_sig = signal_copy(orig_sig);
vlist_push(&c->signals, new_sig);
if (c->signal_name) {
c->signal_index = vlist_lookup_index(&h->signals, c->signal_name);
if (c->signal_index < 0)
return -1;
}
char *name, *unit;
enum signal_type type;
orig_sig = vlist_at_safe(&h->signals, c->signal_index);
type = c->new_type != SIGNAL_TYPE_AUTO ? c->new_type : orig_sig->type;
name = c->new_name ? c->new_name : orig_sig->name;
unit = c->new_unit ? c->new_unit : orig_sig->unit;
new_sig = signal_create(name, unit, type);
vlist_set(&h->signals, c->signal_index, new_sig);
signal_decref(orig_sig);
return 0;
}
static int cast_destroy(struct hook *h)
{
int ret;
struct cast *c = (struct cast *) h->_vd;
ret = vlist_destroy(&c->signals, (dtor_cb_t) signal_decref, false);
if (ret)
return ret;
if (c->signal_name)
free(c->signal_name);
if (c->new_name)
free(c->new_name);
if (c->new_unit)
free(c->new_unit);
return 0;
}
@ -83,80 +88,51 @@ static int cast_parse(struct hook *h, json_t *cfg)
{
int ret;
struct cast *c = (struct cast *) h->_vd;
struct signal *sig;
size_t i;
json_t *json_signals;
json_error_t err;
json_t *json_signal;
ret = json_unpack(cfg, "{ s: o }",
"signals", &json_signals
const char *new_name = NULL;
const char *new_unit = NULL;
const char *new_type = NULL;
ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: s, s?: s, s?: s }",
"signal", &json_signal,
"new_type", &new_type,
"new_name", &new_name,
"new_unit", &new_unit
);
if (ret)
return ret;
if (json_is_array(json_signals))
return -1;
switch (json_typeof(json_signal)) {
case JSON_STRING:
c->signal_name = strdup(json_string_value(json_signal));
break;
json_array_foreach(json_signals, i, json_signal) {
int index = -1;
const char *name = NULL;
case JSON_INTEGER:
c->signal_name = NULL;
c->signal_index = json_integer_value(json_signal);
break;
const char *new_name = NULL;
const char *new_unit = NULL;
const char *new_format = NULL;
ret = json_unpack(json_signal, "{ s?: s, s?: i, s?: s, s?: s, s?: s }",
"name", &name,
"index", &index,
"new_format", &new_format,
"new_name", &new_name,
"new_unit", &new_unit
);
if (ret)
return ret;
/* Find matching original signal descriptor */
if (index >= 0 && name != NULL)
return -1;
if (index < 0 && name == NULL)
return -1;
sig = name
? vlist_lookup(&c->signals, name)
: vlist_at_safe(&c->signals, index);
if (!sig)
return -1;
/* Cast to new format */
if (new_format) {
enum signal_type fmt;
fmt = signal_type_from_str(new_format);
if (fmt == SIGNAL_TYPE_INVALID)
return -1;
sig->type = fmt;
}
/* Set new name */
if (new_name) {
if (sig->name)
free(sig->name);
sig->name = strdup(new_name);
}
/* Set new unit */
if (new_unit) {
if (sig->unit)
free(sig->unit);
sig->unit = strdup(new_unit);
}
default:
error("Invalid value for setting 'signal' in hook '%s'", hook_type_name(h->_vt));
}
if (new_type) {
c->new_type = signal_type_from_str(new_type);
if (c->new_type == SIGNAL_TYPE_INVALID)
return -1;
}
else
c->new_type = SIGNAL_TYPE_AUTO; // We use this constant to indicate that we dont want to change the type
if (new_name)
c->new_name = strdup(new_name);
if (new_unit)
c->new_unit = strdup(new_unit);
return 0;
}
@ -167,15 +143,13 @@ static int cast_process(struct hook *h, struct sample *smps[], unsigned *cnt)
for (int i = 0; i < *cnt; i++) {
struct sample *smp = smps[i];
for (int j = 0; j < smp->length; j++) {
struct signal *orig_sig = vlist_at(smp->signals, j);
struct signal *new_sig = vlist_at(&c->signals, j);
struct signal *orig_sig = vlist_at(smp->signals, c->signal_index);
struct signal *new_sig = vlist_at(&h->signals, c->signal_index);
signal_data_cast(&smp->data[j], orig_sig, new_sig);
}
signal_data_cast(&smp->data[c->signal_index], orig_sig, new_sig);
/* Replace signal descriptors of sample */
smp->signals = &c->signals;
smp->signals = &h->signals;
}
return 0;
@ -183,13 +157,13 @@ static int cast_process(struct hook *h, struct sample *smps[], unsigned *cnt)
static struct plugin p = {
.name = "cast",
.description = "Cast signals",
.description = "Cast signals types",
.type = PLUGIN_TYPE_HOOK,
.hook = {
.flags = HOOK_NODE_READ | HOOK_PATH,
.priority = 99,
.init = cast_init,
.destroy = cast_destroy,
.init_signals = cast_prepare,
.parse = cast_parse,
.process = cast_process,
.size = sizeof(struct cast)

341
lib/hooks/dp.c Normal file
View file

@ -0,0 +1,341 @@
/** Dynamic Phasor Interface Algorithm hook.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
/** @addtogroup hooks Hook functions
* @{
*/
#include <math.h>
#include <complex.h>
#include <string.h>
#include <villas/hook.h>
#include <villas/plugin.h>
#include <villas/sample.h>
#include <villas/window.h>
#define J _Complex_I
struct dp {
char *signal_name;
int signal_index;
int offset;
int inverse;
double f0;
double dt;
double t;
double complex *coeffs;
int *fharmonics;
int fharmonics_len;
struct window window;
};
static void dp_step(struct dp *d, double *in, float complex *out)
{
int n = d->window.steps;
double r = 0.9999999999;
double complex om, corr;
double newest = *in;
double oldest = window_update(&d->window, newest);
for (int i = 0; i < d->fharmonics_len; i++) {
om = 2.0 * M_PI * J * d->fharmonics[i] / n;
/* Recursive update */
//d->coeffs[i] = cexp(om) * (d->coeffs[i] + (newest - oldest));
d->coeffs[i] = d->coeffs[i] * r * cexp(om) - powf(r, n) * oldest + newest;
/* Correction for stationary phasor */
corr = cexp(-om * (d->t - (d->window.steps + 1)));
out[i] = (2.0 / d->window.steps) * (d->coeffs[i] * corr);
/* DC component */
if (d->fharmonics[i] == 0)
out[i] /= 2.0;
}
}
static void dp_istep(struct dp *d, complex float *in, double *out)
{
double complex value = 0;
/* Reconstruct the original signal */
for (int i = 0; i < d->fharmonics_len; i++) {
double freq = d->fharmonics[i];
double complex coeff = in[i];
value += coeff * cexp(2.0 * M_PI * freq * d->t);
}
*out = creal(value);
}
static int dp_start(struct hook *h)
{
int ret;
struct dp *d = (struct dp *) h->_vd;
d->t = 0;
for (int i = 0; i < d->fharmonics_len; i++)
d->coeffs[i] = 0;
ret = window_init(&d->window, (1.0 / d->f0) / d->dt, 0.0);
if (ret)
return ret;
return 0;
}
static int dp_stop(struct hook *h)
{
int ret;
struct dp *d = (struct dp *) h->_vd;
ret = window_destroy(&d->window);
if (ret)
return ret;
return 0;
}
static int dp_init(struct hook *h)
{
struct dp *d = (struct dp *) h->_vd;
/* Default values */
d->inverse = 0;
return 0;
}
static int dp_destroy(struct hook *h)
{
struct dp *d = (struct dp *) h->_vd;
/* Release memory */
free(d->fharmonics);
free(d->coeffs);
if (d->signal_name)
free(d->signal_name);
return 0;
}
static int dp_parse(struct hook *h, json_t *cfg)
{
struct dp *d = (struct dp *) h->_vd;
int ret;
json_error_t err;
json_t *json_harmonics, *json_harmonic, *json_signal;
size_t i;
double rate = -1, dt = -1;
ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s: F, s?: F, s?: F, s: o, s?: b }",
"signal", &json_signal,
"f0", &d->f0,
"dt", &dt,
"rate", &rate,
"harmonics", &json_harmonics,
"inverse", &d->inverse
);
if (ret)
jerror(&err, "Failed to parse configuration of hook '%s'", plugin_name(h->_vt));
if (rate > 0)
d->dt = 1. / rate;
else if (dt > 0)
d->dt = dt;
else
error("Either on of the settings 'dt' or 'rate' must be gived for hook '%s'", plugin_name(h->_vt));
if (!json_is_array(json_harmonics))
error("Setting 'harmonics' of hook '%s' must be a list of integers", plugin_name(h->_vt));
switch (json_typeof(json_signal)) {
case JSON_STRING:
d->signal_name = strdup(json_string_value(json_signal));
break;
case JSON_INTEGER:
d->signal_name = NULL;
d->signal_index = json_integer_value(json_signal);
break;
default:
error("Invalid value for setting 'signal' in hook '%s'", hook_type_name(h->_vt));
}
d->fharmonics_len = json_array_size(json_harmonics);
d->fharmonics = alloc(d->fharmonics_len * sizeof(double));
d->coeffs = alloc(d->fharmonics_len * sizeof(double complex));
if (!d->fharmonics || !d->coeffs)
return -1;
json_array_foreach(json_harmonics, i, json_harmonic) {
if (!json_is_integer(json_harmonic))
error("Setting 'harmonics' of hook '%s' must be a list of integers", plugin_name(h->_vt));
d->fharmonics[i] = json_integer_value(json_harmonic);
}
return 0;
}
static int dp_prepare(struct hook *h)
{
int ret;
struct dp *d = (struct dp *) h->_vd;
char *new_sig_name;
struct signal *orig_sig, *new_sig;
if (d->signal_name) {
d->signal_index = vlist_lookup_index(&h->signals, d->signal_name);
if (d->signal_index < 0)
return -1;
}
if (d->inverse) {
/* Remove complex-valued coefficient signals */
for (int i = 0; i < d->fharmonics_len; i++) {
orig_sig = vlist_at_safe(&h->signals, d->signal_index + i);
if (!orig_sig)
return -1;
/** @todo: SIGNAL_TYPE_AUTO is bad here */
if (orig_sig->type != SIGNAL_TYPE_COMPLEX && orig_sig->type != SIGNAL_TYPE_AUTO)
return -1;
ret = vlist_remove(&h->signals, d->signal_index + i);
if (ret)
return -1;
signal_decref(orig_sig);
}
/* Add new real-valued reconstructed signals */
new_sig = signal_create("dp", "idp", SIGNAL_TYPE_FLOAT);
if (!new_sig)
return -1;
ret = vlist_insert(&h->signals, d->offset, new_sig);
if (ret)
return -1;
}
else {
orig_sig = vlist_at_safe(&h->signals, d->signal_index);
if (!orig_sig)
return -1;
/** @todo: SIGNAL_TYPE_AUTO is bad here */
if (orig_sig->type != SIGNAL_TYPE_FLOAT && orig_sig->type != SIGNAL_TYPE_AUTO)
return -1;
ret = vlist_remove(&h->signals, d->signal_index);
if (ret)
return -1;
for (int i = 0; i < d->fharmonics_len; i++) {
new_sig_name = strf("%s_harm%d", orig_sig->name, i);
new_sig = signal_create(new_sig_name, orig_sig->unit, SIGNAL_TYPE_COMPLEX);
if (!new_sig)
return -1;
ret = vlist_insert(&h->signals, d->offset + i, new_sig);
if (ret)
return -1;
}
signal_decref(orig_sig);
}
return 0;
}
static int dp_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct dp *d = (struct dp *) h->_vd;
for (unsigned j = 0; j < *cnt; j++) {
struct sample *smp = smps[j];
if (d->signal_index > smp->length)
continue;
if (d->inverse) {
double signal;
float complex *coeffs = &smp->data[d->signal_index].z;
dp_istep(d, coeffs, &signal);
sample_data_remove(smp, d->signal_index, d->fharmonics_len);
sample_data_insert(smp, (union signal_data *) &signal, d->offset, 1);
}
else {
double signal = smp->data[d->signal_index].f;
float complex coeffs[d->fharmonics_len];
dp_step(d, &signal, coeffs);
sample_data_remove(smp, d->signal_index, 1);
sample_data_insert(smp, (union signal_data *) coeffs, d->offset, d->fharmonics_len);
}
smp->signals = &h->signals;
}
d->t += d->dt;
return 0;
}
static struct plugin p = {
.name = "dp",
.description = "Transform to/from dynamic phasor domain",
.type = PLUGIN_TYPE_HOOK,
.hook = {
.flags = HOOK_PATH | HOOK_NODE_READ | HOOK_NODE_WRITE,
.priority = 99,
.init = dp_init,
.init_signals = dp_prepare,
.destroy = dp_destroy,
.start = dp_start,
.stop = dp_stop,
.parse = dp_parse,
.process = dp_process,
.size = sizeof(struct dp)
}
};
REGISTER_PLUGIN(&p)
/** @} */

View file

@ -56,18 +56,7 @@ static int print_start(struct hook *h)
struct print *p = (struct print *) h->_vd;
int ret;
struct vlist *signals;
if (h->node)
signals = &h->node->in.signals;
else if (h->path)
signals = &h->path->signals;
else
signals = NULL;
ret = signals
? io_init(&p->io, p->format, signals, SAMPLE_HAS_ALL)
: io_init_auto(&p->io, p->format, DEFAULT_SAMPLE_LENGTH, SAMPLE_HAS_ALL);
ret = io_init(&p->io, p->format, &h->signals, SAMPLE_HAS_ALL);
if (ret)
return ret;

View file

@ -31,63 +31,105 @@
#include <villas/sample.h>
struct scale {
char *signal_name;
int signal_index;
double scale;
double offset;
};
static int scale_init(struct hook *h)
{
struct scale *p = (struct scale *) h->_vd;
struct scale *s = (struct scale *) h->_vd;
p->scale = 1;
p->offset = 0;
s->scale = 1;
s->offset = 0;
return 0;
}
static int scale_prepare(struct hook *h)
{
struct scale *s = (struct scale *) h->_vd;
if (s->signal_name) {
s->signal_index = vlist_lookup_index(&h->signals, s->signal_name);
if (s->signal_index < 0)
return -1;
}
return 0;
}
static int scale_destroy(struct hook *h)
{
struct scale *s = (struct scale *) h->_vd;
if (s->signal_name)
free(s->signal_name);
return 0;
}
static int scale_parse(struct hook *h, json_t *cfg)
{
struct scale *p = (struct scale *) h->_vd;
struct scale *s = (struct scale *) h->_vd;
int ret;
json_t *json_signal;
json_error_t err;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: F, s?: F }",
"scale", &p->scale,
"offset", &p->offset
ret = json_unpack_ex(cfg, &err, 0, "{ s?: F, s?: F, s: o }",
"scale", &s->scale,
"offset", &s->offset,
"signal", &json_signal
);
if (ret)
jerror(&err, "Failed to parse configuration of hook '%s'", hook_type_name(h->_vt));
switch (json_typeof(json_signal)) {
case JSON_STRING:
s->signal_name = strdup(json_string_value(json_signal));
break;
case JSON_INTEGER:
s->signal_name = NULL;
s->signal_index = json_integer_value(json_signal);
break;
default:
error("Invalid value for setting 'signal' in hook '%s'", hook_type_name(h->_vt));
}
return 0;
}
static int scale_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct scale *p = (struct scale *) h->_vd;
struct scale *s = (struct scale *) h->_vd;
for (int i = 0; i < *cnt; i++) {
for (int k = 0; k < smps[i]->length; k++) {
struct sample *smp = smps[i];
int k = s->signal_index;
switch (sample_format(smps[i], k)) {
case SIGNAL_TYPE_INTEGER:
smps[i]->data[k].i = smps[i]->data[k].i * p->scale + p->offset;
break;
switch (sample_format(smp, k)) {
case SIGNAL_TYPE_INTEGER:
smp->data[k].i = smp->data[k].i * s->scale + s->offset;
break;
case SIGNAL_TYPE_FLOAT:
smps[i]->data[k].f = smps[i]->data[k].f * p->scale + p->offset;
break;
case SIGNAL_TYPE_FLOAT:
smp->data[k].f = smp->data[k].f * s->scale + s->offset;
break;
case SIGNAL_TYPE_COMPLEX:
smps[i]->data[k].z = smps[i]->data[k].z * p->scale + p->offset;
break;
case SIGNAL_TYPE_COMPLEX:
smp->data[k].z = smp->data[k].z * s->scale + s->offset;
break;
case SIGNAL_TYPE_BOOLEAN:
smps[i]->data[k].b = smps[i]->data[k].b * p->scale + p->offset;
break;
case SIGNAL_TYPE_BOOLEAN:
smp->data[k].b = smp->data[k].b * s->scale + s->offset;
break;
default: { }
}
default: { }
}
}
@ -102,6 +144,8 @@ static struct plugin p = {
.flags = HOOK_PATH,
.priority = 99,
.init = scale_init,
.init_signals = scale_prepare,
.destroy = scale_destroy,
.parse = scale_parse,
.process = scale_process,
.size = sizeof(struct scale)

View file

@ -115,7 +115,6 @@ int io_init_auto(struct io *io, const struct format_type *fmt, int len, int flag
struct vlist *signals;
signals = alloc(sizeof(struct vlist));
signals->state = STATE_DESTROYED;
ret = vlist_init(signals);

View file

@ -32,14 +32,14 @@
int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *nodes)
{
char *cpy, *node, *type, *field, *end;
char *cpy, *node, *type, *field, *end, *lasts;
cpy = strdup(str);
if (!cpy)
return -1;
if (nodes) {
node = strtok(cpy, ".");
node = strtok_r(cpy, ".", &lasts);
if (!node) {
warning("Missing node name");
goto invalid_format;
@ -51,14 +51,14 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
goto invalid_format;
}
type = strtok(NULL, ".[");
type = strtok_r(NULL, ".[", &lasts);
if (!type)
type = "data";
}
else {
me->node = NULL;
type = strtok(cpy, ".[");
type = strtok_r(cpy, ".[", &lasts);
if (!type)
goto invalid_format;
}
@ -67,11 +67,11 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
me->type = MAPPING_TYPE_STATS;
me->length = 1;
char *metric = strtok(NULL, ".");
char *metric = strtok_r(NULL, ".", &lasts);
if (!metric)
goto invalid_format;
type = strtok(NULL, ".");
type = strtok_r(NULL, ".", &lasts);
if (!type)
goto invalid_format;
@ -87,7 +87,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
me->type = MAPPING_TYPE_HEADER;
me->length = 1;
field = strtok(NULL, ".");
field = strtok_r(NULL, ".", &lasts);
if (!field) {
warning("Missing header type");
goto invalid_format;
@ -106,7 +106,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
me->type = MAPPING_TYPE_TIMESTAMP;
me->length = 2;
field = strtok(NULL, ".");
field = strtok_r(NULL, ".", &lasts);
if (!field) {
warning("Missing timestamp type");
goto invalid_format;
@ -127,7 +127,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
me->type = MAPPING_TYPE_DATA;
first_str = strtok(NULL, "-]");
first_str = strtok_r(NULL, "-]", &lasts);
if (first_str) {
if (me->node)
first = vlist_lookup_index(&me->node->in.signals, first_str);
@ -144,11 +144,11 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
else {
/* Map all signals */
me->data.offset = 0;
me->length = me->node ? vlist_length(&me->node->in.signals) : 0;
me->length = -1;
goto end;
}
last_str = strtok(NULL, "]");
last_str = strtok_r(NULL, "]", &lasts);
if (last_str) {
if (me->node)
last = vlist_lookup_index(&me->node->in.signals, last_str);
@ -175,7 +175,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
goto invalid_format;
end: /* Check that there is no garbage at the end */
end = strtok(NULL, "");
end = strtok_r(NULL, "", &lasts);
if (end)
goto invalid_format;
@ -201,9 +201,9 @@ int mapping_parse(struct mapping_entry *me, json_t *cfg, struct vlist *nodes)
return mapping_parse_str(me, str, nodes);
}
int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes)
int mapping_list_parse(struct vlist *ml, json_t *cfg, struct vlist *nodes)
{
int ret, off;
int ret;
size_t i;
json_t *json_entry;
@ -218,7 +218,6 @@ int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes)
else
return -1;
off = 0;
json_array_foreach(json_mapping, i, json_entry) {
struct mapping_entry *me = (struct mapping_entry *) alloc(sizeof(struct mapping_entry));
@ -226,10 +225,7 @@ int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes)
if (ret)
goto out;
me->offset = off;
off += me->length;
vlist_push(l, me);
vlist_push(ml, me);
}
ret = 0;
@ -239,22 +235,16 @@ out: json_decref(json_mapping);
return ret;
}
int mapping_update(const struct mapping_entry *me, struct sample *remapped, const struct sample *original, const struct stats *s)
int mapping_update(const struct mapping_entry *me, struct sample *remapped, const struct sample *original)
{
int len = me->length;
int off = me->offset;
/* me->length == 0 means that we want to take all values */
if (!len)
len = original->length;
if (len + off > remapped->capacity)
if (me->length + me->offset > remapped->capacity)
return -1;
switch (me->type) {
case MAPPING_TYPE_STATS:
remapped->data[off++] = stats_get_value(s, me->stats.metric, me->stats.type);
case MAPPING_TYPE_STATS: {
remapped->data[me->offset] = stats_get_value(me->node->stats, me->stats.metric, me->stats.type);
break;
}
case MAPPING_TYPE_TIMESTAMP: {
const struct timespec *ts;
@ -270,8 +260,8 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
return -1;
}
remapped->data[off++].i = ts->tv_sec;
remapped->data[off++].i = ts->tv_nsec;
remapped->data[me->offset + 0].i = ts->tv_sec;
remapped->data[me->offset + 1].i = ts->tv_nsec;
break;
}
@ -279,11 +269,13 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
case MAPPING_TYPE_HEADER:
switch (me->header.type) {
case MAPPING_HEADER_TYPE_LENGTH:
remapped->data[off++].i = original->length;
remapped->data[me->offset].i = original->length;
break;
case MAPPING_HEADER_TYPE_SEQUENCE:
remapped->data[off++].i = original->sequence;
remapped->data[me->offset].i = original->sequence;
break;
default:
return -1;
}
@ -291,11 +283,11 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
break;
case MAPPING_TYPE_DATA:
for (int j = me->data.offset; j < len + me->data.offset; j++) {
for (int j = me->data.offset, i = me->offset; j < me->length + me->data.offset; j++, i++) {
if (j >= original->length)
remapped->data[off++].f = 0;
remapped->data[i].f = -1;
else
remapped->data[off++] = original->data[j];
remapped->data[i] = original->data[j];
}
break;
@ -304,14 +296,14 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
return 0;
}
int mapping_remap(const struct vlist *m, struct sample *remapped, const struct sample *original, const struct stats *s)
int mapping_list_remap(const struct vlist *ml, struct sample *remapped, const struct sample *original)
{
int ret;
for (size_t i = 0; i < vlist_length(m); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(m, i);
for (size_t i = 0; i < vlist_length(ml); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(ml, i);
ret = mapping_update(me, remapped, original, s);
ret = mapping_update(me, remapped, original);
if (ret)
return ret;
}
@ -319,6 +311,24 @@ int mapping_remap(const struct vlist *m, struct sample *remapped, const struct s
return 0;
}
int mapping_list_prepare(struct vlist *ml)
{
for (size_t i = 0, off = 0; i < vlist_length(ml); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(ml, i);
if (me->length < 0) {
struct vlist *sigs = node_get_signals(me->node, NODE_DIR_IN);
me->length = vlist_length(sigs);
}
me->offset = off;
off += me->length;
}
return 0;
}
int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str)
{
const char *type;

View file

@ -41,177 +41,6 @@
#include <villas/kernel/tc_netem.h>
#endif /* WITH_NETEM */
static int node_direction_init2(struct node_direction *nd, struct node *n)
{
#ifdef WITH_HOOKS
int ret;
int m = nd == &n->out
? HOOK_NODE_WRITE
: HOOK_NODE_READ;
/* Add internal hooks if they are not already in the list */
ret = hook_init_builtin_list(&nd->hooks, nd->builtin, m, NULL, n);
if (ret)
return ret;
/* We sort the hooks according to their priority before starting the path */
vlist_sort(&nd->hooks, hook_cmp_priority);
#endif /* WITH_HOOKS */
return 0;
}
static int node_direction_init(struct node_direction *nd, struct node *n)
{
int ret;
nd->enabled = 1;
nd->vectorize = 1;
nd->builtin = 1;
nd->hooks.state = STATE_DESTROYED;
nd->signals.state = STATE_DESTROYED;
#ifdef WITH_HOOKS
ret = vlist_init(&nd->hooks);
if (ret)
return ret;
#endif /* WITH_HOOKS */
ret = vlist_init(&nd->signals);
if (ret)
return ret;
return 0;
}
static int node_direction_destroy(struct node_direction *nd, struct node *n)
{
int ret = 0;
#ifdef WITH_HOOKS
ret = vlist_destroy(&nd->hooks, (dtor_cb_t) hook_destroy, true);
if (ret)
return ret;
#endif /* WITH_HOOKS */
ret = vlist_destroy(&nd->signals, (dtor_cb_t) signal_decref, false);
if (ret)
return ret;
return ret;
}
static int node_direction_parse(struct node_direction *nd, struct node *n, json_t *cfg)
{
int ret;
json_error_t err;
json_t *json_hooks = NULL;
json_t *json_signals = NULL;
nd->cfg = cfg;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b, s?: b }",
"hooks", &json_hooks,
"signals", &json_signals,
"vectorize", &nd->vectorize,
"builtin", &nd->builtin,
"enabled", &nd->enabled
);
if (ret)
jerror(&err, "Failed to parse node %s", node_name(n));
if (n->_vt->flags & NODE_TYPE_PROVIDES_SIGNALS) {
if (json_signals)
error("Node %s does not support signal definitions", node_name(n));
}
else if (json_is_array(json_signals)) {
ret = signal_list_parse(&nd->signals, json_signals);
if (ret)
error("Failed to parse signal definition of node %s", node_name(n));
}
else {
int count = DEFAULT_SAMPLE_LENGTH;
const char *type_str = "float";
if (json_is_object(json_signals)) {
json_unpack_ex(json_signals, &err, 0, "{ s: i, s: s }",
"count", &count,
"type", &type_str
);
}
else
warning("No signal definition found for node %s. Using the default config of %d floating point signals.", node_name(n), DEFAULT_SAMPLE_LENGTH);
int type = signal_type_from_str(type_str);
if (type < 0)
error("Invalid signal type %s", type_str);
signal_list_generate(&nd->signals, count, type);
}
#ifdef WITH_HOOKS
int m = nd == &n->out
? HOOK_NODE_WRITE
: HOOK_NODE_READ;
if (json_hooks) {
ret = hook_parse_list(&nd->hooks, json_hooks, m, NULL, n);
if (ret < 0)
return ret;
}
#endif /* WITH_HOOKS */
return 0;
}
static int node_direction_check(struct node_direction *nd, struct node *n)
{
if (nd->vectorize <= 0)
error("Invalid setting 'vectorize' with value %d for node %s. Must be natural number!", nd->vectorize, node_name(n));
if (node_type(n)->vectorize && node_type(n)->vectorize < nd->vectorize)
error("Invalid value for setting 'vectorize'. Node type requires a number smaller than %d!",
node_type(n)->vectorize);
return 0;
}
static int node_direction_start(struct node_direction *nd, struct node *n)
{
#ifdef WITH_HOOKS
int ret;
for (size_t i = 0; i < vlist_length(&nd->hooks); i++) {
struct hook *h = (struct hook *) vlist_at(&nd->hooks, i);
ret = hook_start(h);
if (ret)
return ret;
}
#endif /* WITH_HOOKS */
return 0;
}
static int node_direction_stop(struct node_direction *nd, struct node *n)
{
#ifdef WITH_HOOKS
int ret;
for (size_t i = 0; i < vlist_length(&nd->hooks); i++) {
struct hook *h = (struct hook *) vlist_at(&nd->hooks, i);
ret = hook_stop(h);
if (ret)
return ret;
}
#endif /* WITH_HOOKS */
return 0;
}
int node_init(struct node *n, struct node_type *vt)
{
int ret;
@ -224,6 +53,7 @@ int node_init(struct node *n, struct node_type *vt)
n->name = NULL;
n->_name = NULL;
n->_name_long = NULL;
n->enabled = 1;
#ifdef __linux__
n->fwmark = -1;
@ -235,11 +65,11 @@ int node_init(struct node *n, struct node_type *vt)
#endif /* WITH_NETEM */
/* Default values */
ret = node_direction_init(&n->in, n);
ret = node_direction_init(&n->in, NODE_DIR_IN, n);
if (ret)
return ret;
ret = node_direction_init(&n->out, n);
ret = node_direction_init(&n->out, NODE_DIR_OUT, n);
if (ret)
return ret;
@ -254,20 +84,22 @@ int node_init(struct node *n, struct node_type *vt)
return 0;
}
int node_init2(struct node *n)
int node_prepare(struct node *n)
{
int ret;
assert(n->state == STATE_CHECKED);
ret = node_direction_init2(&n->in, n);
ret = node_direction_prepare(&n->in, n);
if (ret)
return ret;
ret = node_direction_init2(&n->out, n);
ret = node_direction_prepare(&n->out, n);
if (ret)
return ret;
n->state = STATE_PREPARED;
return 0;
}
@ -284,8 +116,9 @@ int node_parse(struct node *n, json_t *json, const char *name)
n->name = strdup(name);
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o } }",
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: b, s?: { s?: o } }",
"type", &type,
"enabled", &n->enabled,
"in",
"signals", &json_signals
);
@ -389,7 +222,7 @@ int node_start(struct node *n)
{
int ret;
assert(n->state == STATE_CHECKED);
assert(n->state == STATE_PREPARED);
assert(node_type(n)->state == STATE_STARTED);
info("Starting node %s", node_name_long(n));
@ -498,9 +331,8 @@ int node_restart(struct node *n)
info("Restarting node %s", node_name(n));
if (node_type(n)->restart) {
if (node_type(n)->restart)
ret = node_type(n)->restart(n);
}
else {
ret = node_type(n)->stop ? node_type(n)->stop(n) : 0;
if (ret)
@ -533,7 +365,7 @@ int node_destroy(struct node *n)
return ret;
}
vlist_remove(&node_type(n)->instances, n);
vlist_remove_all(&node_type(n)->instances, n);
if (n->_vd)
free(n->_vd);
@ -586,7 +418,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
#ifdef WITH_HOOKS
/* Run read hooks */
int rread = hook_process_list(&n->in.hooks, smps, nread);
int rread = hook_list_process(&n->in.hooks, smps, nread);
int skipped = nread - rread;
if (skipped > 0 && n->stats != NULL) {
@ -616,7 +448,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
#ifdef WITH_HOOKS
/* Run write hooks */
cnt = hook_process_list(&n->out.hooks, smps, cnt);
cnt = hook_list_process(&n->out.hooks, smps, cnt);
if (cnt <= 0)
return cnt;
#endif /* WITH_HOOKS */
@ -647,7 +479,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
char * node_name(struct node *n)
{
if (!n->_name)
strcatf(&n->_name, CLR_RED("%s") "(" CLR_YEL("%s") ")", n->name, node_type_name(n->_vt));
strcatf(&n->_name, CLR_RED("%s") "(" CLR_YEL("%s") ")", n->name, node_type_name(node_type(n)));
return n->_name;
}
@ -704,19 +536,12 @@ int node_netem_fds(struct node *n, int fds[])
return node_type(n)->netem_fds ? node_type(n)->netem_fds(n, fds) : -1;
}
struct node_type * node_type(struct node *n)
{
assert(n->state != STATE_DESTROYED);
return n->_vt;
}
struct memory_type * node_memory_type(struct node *n, struct memory_type *parent)
{
return node_type(n)->memory_type ? node_type(n)->memory_type(n, parent) : &memory_hugepage;
}
int node_parse_list(struct vlist *list, json_t *cfg, struct vlist *all)
int node_list_parse(struct vlist *list, json_t *cfg, struct vlist *all)
{
struct node *node;
const char *str;
@ -771,14 +596,26 @@ invalid2:
return 0;
}
int node_is_valid_name(const char *name)
bool node_is_valid_name(const char *name)
{
for (const char *p = name; *p; p++) {
if (isalnum(*p) || (*p == '_') || (*p == '-'))
continue;
return -1;
return false;
}
return 0;
return true;
}
bool node_is_enabled(const struct node *n)
{
return n->enabled;
}
struct vlist * node_get_signals(struct node *n, enum node_dir dir)
{
struct node_direction *nd = dir == NODE_DIR_IN ? &n->in : &n->out;
return node_direction_get_signals(nd);
}

233
lib/node_direction.c Normal file
View file

@ -0,0 +1,233 @@
/** Node direction
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/utils.h>
#include <villas/hook.h>
#include <villas/node.h>
#include <villas/node_direction.h>
int node_direction_prepare(struct node_direction *nd, struct node *n)
{
assert(nd->state == STATE_CHECKED);
#ifdef WITH_HOOKS
int ret;
int t = nd->direction == NODE_DIR_OUT ? HOOK_NODE_WRITE : HOOK_NODE_READ;
int m = nd->builtin ? t | HOOK_BUILTIN : 0;
ret = hook_list_prepare(&nd->hooks, &nd->signals, m, NULL, n);
if (ret)
return ret;
#endif /* WITH_HOOKS */
nd->state = STATE_PREPARED;
return 0;
}
int node_direction_init(struct node_direction *nd, enum node_dir dir, struct node *n)
{
int ret;
assert(nd->state == STATE_DESTROYED);
nd->direction = dir;
nd->enabled = 1;
nd->vectorize = 1;
nd->builtin = 1;
nd->hooks.state = STATE_DESTROYED;
nd->signals.state = STATE_DESTROYED;
#ifdef WITH_HOOKS
ret = hook_list_init(&nd->hooks);
if (ret)
return ret;
#endif /* WITH_HOOKS */
ret = signal_list_init(&nd->signals);
if (ret)
return ret;
nd->state = STATE_INITIALIZED;
return 0;
}
int node_direction_destroy(struct node_direction *nd, struct node *n)
{
int ret = 0;
assert(nd->state != STATE_DESTROYED && nd->state != STATE_STARTED);
#ifdef WITH_HOOKS
ret = hook_list_destroy(&nd->hooks);
if (ret)
return ret;
#endif /* WITH_HOOKS */
ret = signal_list_destroy(&nd->signals);
if (ret)
return ret;
nd->state = STATE_DESTROYED;
return 0;
}
int node_direction_parse(struct node_direction *nd, struct node *n, json_t *cfg)
{
int ret;
assert(nd->state == STATE_INITIALIZED);
json_error_t err;
json_t *json_hooks = NULL;
json_t *json_signals = NULL;
nd->cfg = cfg;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b, s?: b }",
"hooks", &json_hooks,
"signals", &json_signals,
"vectorize", &nd->vectorize,
"builtin", &nd->builtin,
"enabled", &nd->enabled
);
if (ret)
jerror(&err, "Failed to parse node %s", node_name(n));
if (n->_vt->flags & NODE_TYPE_PROVIDES_SIGNALS) {
if (json_signals)
error("Node %s does not support signal definitions", node_name(n));
}
else if (json_is_array(json_signals)) {
ret = signal_list_parse(&nd->signals, json_signals);
if (ret)
error("Failed to parse signal definition of node %s", node_name(n));
}
else {
int count = DEFAULT_SAMPLE_LENGTH;
const char *type_str = "float";
if (json_is_object(json_signals)) {
json_unpack_ex(json_signals, &err, 0, "{ s: i, s: s }",
"count", &count,
"type", &type_str
);
}
else
warning("No signal definition found for node %s. Using the default config of %d floating point signals.", node_name(n), DEFAULT_SAMPLE_LENGTH);
int type = signal_type_from_str(type_str);
if (type < 0)
error("Invalid signal type %s", type_str);
signal_list_generate(&nd->signals, count, type);
}
#ifdef WITH_HOOKS
if (json_hooks) {
int m = nd->direction == NODE_DIR_OUT ? HOOK_NODE_WRITE : HOOK_NODE_READ;
ret = hook_list_parse(&nd->hooks, json_hooks, m, NULL, n);
if (ret < 0)
return ret;
}
#endif /* WITH_HOOKS */
nd->state = STATE_PARSED;
return 0;
}
int node_direction_check(struct node_direction *nd, struct node *n)
{
assert(nd->state == STATE_PARSED);
if (nd->vectorize <= 0)
error("Invalid setting 'vectorize' with value %d for node %s. Must be natural number!", nd->vectorize, node_name(n));
if (node_type(n)->vectorize && node_type(n)->vectorize < nd->vectorize)
error("Invalid value for setting 'vectorize'. Node type requires a number smaller than %d!",
node_type(n)->vectorize);
nd->state = STATE_CHECKED;
return 0;
}
int node_direction_start(struct node_direction *nd, struct node *n)
{
assert(nd->state == STATE_PREPARED);
#ifdef WITH_HOOKS
int ret;
for (size_t i = 0; i < vlist_length(&nd->hooks); i++) {
struct hook *h = (struct hook *) vlist_at(&nd->hooks, i);
ret = hook_start(h);
if (ret)
return ret;
}
#endif /* WITH_HOOKS */
nd->state = STATE_STARTED;
return 0;
}
int node_direction_stop(struct node_direction *nd, struct node *n)
{
assert(nd->state == STATE_STARTED);
#ifdef WITH_HOOKS
int ret;
for (size_t i = 0; i < vlist_length(&nd->hooks); i++) {
struct hook *h = (struct hook *) vlist_at(&nd->hooks, i);
ret = hook_stop(h);
if (ret)
return ret;
}
#endif /* WITH_HOOKS */
nd->state = STATE_STOPPED;
return 0;
}
struct vlist * node_direction_get_signals(struct node_direction *nd)
{
#ifdef WITH_HOOKS
assert(nd->state == STATE_PREPARED);
struct hook *h = vlist_last(&nd->hooks);
return &h->signals;
#else
return &nd->signals;
#endif
}

View file

@ -174,8 +174,7 @@ int ib_parse(struct node *n, json_t *cfg)
struct infiniband *ib = (struct infiniband *) n->_vd;
int ret;
char *local = NULL;
char *remote = NULL;
char *local = NULL, *remote = NULL, *lasts;
const char *transport_mode = "RC";
int timeout = 1000;
int recv_cq_size = 128;
@ -255,8 +254,8 @@ int ib_parse(struct node *n, json_t *cfg)
debug(LOG_IB | 4, "Set buffer subtraction to %i in node %s", buffer_subtraction, node_name(n));
// Translate IP:PORT to a struct addrinfo
char* ip_adr = strtok(local, ":");
char* port = strtok(NULL, ":");
char* ip_adr = strtok_r(local, ":", &lasts);
char* port = strtok_r(NULL, ":", &lasts);
ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.src_addr);
if (ret)
@ -327,8 +326,8 @@ int ib_parse(struct node *n, json_t *cfg)
// If node will send data, set remote address
if (ib->is_source) {
// Translate address info
char* ip_adr = strtok(remote, ":");
char* port = strtok(NULL, ":");
char *ip_adr = strtok_r(remote, ":", &lasts);
char *port = strtok_r(NULL, ":", &lasts);
ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.dst_addr);
if (ret)

View file

@ -40,7 +40,7 @@ int influxdb_parse(struct node *n, json_t *json)
json_error_t err;
int ret;
char *tmp, *host, *port;
char *tmp, *host, *port, *lasts;
const char *server, *key;
ret = json_unpack_ex(json, &err, 0, "{ s: s, s: s, s?: o }",
@ -52,8 +52,8 @@ int influxdb_parse(struct node *n, json_t *json)
tmp = strdup(server);
host = strtok(tmp, ":");
port = strtok(NULL, "");
host = strtok_r(tmp, ":", &lasts);
port = strtok_r(NULL, "", &lasts);
i->key = strdup(key);
i->host = strdup(host);

View file

@ -626,9 +626,10 @@ int socket_parse_address(const char *addr, struct sockaddr *saddr, enum socket_l
#ifdef WITH_SOCKET_LAYER_ETH
else if (layer == SOCKET_LAYER_ETH) { /* Format: "ab:cd:ef:12:34:56%ifname:protocol" */
/* Split string */
char *node = strtok(copy, "%");
char *ifname = strtok(NULL, ":");
char *proto = strtok(NULL, "\0");
char *lasts;
char *node = strtok_r(copy, "%", &lasts);
char *ifname = strtok_r(NULL, ":", &lasts);
char *proto = strtok_r(NULL, "\0", &lasts);
/* Parse link layer (MAC) address */
struct ether_addr *mac = ether_aton(node);
@ -659,8 +660,9 @@ int socket_parse_address(const char *addr, struct sockaddr *saddr, enum socket_l
};
/* Split string */
char *node = strtok(copy, ":");
char *service = strtok(NULL, "\0");
char *lasts;
char *node = strtok_r(copy, ":", &lasts);
char *service = strtok_r(NULL, "\0", &lasts);
if (node && !strcmp(node, "*"))
node = NULL;

View file

@ -51,7 +51,7 @@ int stats_node_signal_parse(struct stats_node_signal *s, json_t *cfg)
int ret;
const char *stats;
char *metric, *type, *node, *cpy;
char *metric, *type, *node, *cpy, *lasts;
ret = json_unpack_ex(cfg, &err, 0, "{ s: s }",
"stats", &stats
@ -61,15 +61,15 @@ int stats_node_signal_parse(struct stats_node_signal *s, json_t *cfg)
cpy = strdup(stats);
node = strtok(cpy, ".");
node = strtok_r(cpy, ".", &lasts);
if (!node)
goto invalid_format;
metric = strtok(NULL, ".");
metric = strtok_r(NULL, ".", &lasts);
if (!metric)
goto invalid_format;
type = strtok(NULL, ".");
type = strtok_r(NULL, ".", &lasts);
if (!type)
goto invalid_format;

View file

@ -203,7 +203,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
*/
/* Get path of incoming request */
char *node, *format;
char *node, *format, *lasts;
char uri[64];
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
@ -213,14 +213,14 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
return -1;
}
node = strtok(uri, "/.");
node = strtok_r(uri, "/.", &lasts);
if (!node) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
warning("Failed to tokenize request URI");
return -1;
}
format = strtok(NULL, "");
format = strtok_r(NULL, "", &lasts);
if (!format)
format = "villas.web";
@ -267,7 +267,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
}
if (connections.state == STATE_INITIALIZED)
vlist_remove(&connections, c);
vlist_remove_all(&connections, c);
if (c->state == WEBSOCKET_CONNECTION_STATE_INITIALIZED)
websocket_connection_destroy(c);

View file

@ -29,7 +29,6 @@
#include <villas/node/config.h>
#include <villas/utils.h>
#include <villas/path.h>
#include <villas/timing.h>
#include <villas/pool.h>
#include <villas/queue.h>
@ -39,221 +38,9 @@
#include <villas/stats.h>
#include <villas/node.h>
#include <villas/signal.h>
/* Forward declaration */
static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt);
static int path_source_init(struct path_source *ps)
{
int ret;
int pool_size = MAX(DEFAULT_QUEUE_LENGTH, ps->node->in.vectorize);
if (ps->node->_vt->pool_size)
pool_size = ps->node->_vt->pool_size;
ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(vlist_length(&ps->node->in.signals)), node_memory_type(ps->node, &memory_hugepage));
if (ret)
return ret;
return 0;
}
static int path_source_destroy(struct path_source *ps)
{
int ret;
ret = pool_destroy(&ps->pool);
if (ret)
return ret;
ret = vlist_destroy(&ps->mappings, NULL, true);
if (ret)
return ret;
return 0;
}
static int path_source_read(struct path_source *ps, struct path *p, int i)
{
int recv, tomux, allocated, cnt, toenqueue, enqueued = 0;
unsigned release;
cnt = ps->node->in.vectorize;
struct sample *read_smps[cnt];
struct sample *muxed_smps[cnt];
struct sample **tomux_smps;
/* Fill smps[] free sample blocks from the pool */
allocated = sample_alloc_many(&ps->pool, read_smps, cnt);
if (allocated != cnt)
warning("Pool underrun for path source %s", node_name(ps->node));
/* Read ready samples and store them to blocks pointed by smps[] */
release = allocated;
recv = node_read(ps->node, read_smps, allocated, &release);
if (recv == 0) {
enqueued = 0;
goto out2;
}
else if (recv < 0) {
if (ps->node->state == STATE_STOPPING) {
p->state = STATE_STOPPING;
enqueued = -1;
goto out2;
}
else
error("Failed to read samples from node %s", node_name(ps->node));
}
else if (recv < allocated)
warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated);
bitset_set(&p->received, i);
if (p->mode == PATH_MODE_ANY) { /* Mux all samples */
tomux_smps = read_smps;
tomux = recv;
}
else { /* Mux only last sample and discard others */
tomux_smps = read_smps + recv - 1;
tomux = 1;
}
for (int i = 0; i < tomux; i++) {
muxed_smps[i] = i == 0
? sample_clone(p->last_sample)
: sample_clone(muxed_smps[i-1]);
if (p->original_sequence_no)
muxed_smps[i]->sequence = tomux_smps[i]->sequence;
else {
muxed_smps[i]->sequence = p->last_sequence++;
muxed_smps[i]->flags |= SAMPLE_HAS_SEQUENCE;
}
muxed_smps[i]->ts = tomux_smps[i]->ts;
muxed_smps[i]->flags |= tomux_smps[i]->flags & (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED);
mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL);
}
sample_copy(p->last_sample, muxed_smps[tomux-1]);
debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received));
#ifdef WITH_HOOKS
toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux);
if (toenqueue != tomux) {
int skipped = tomux - toenqueue;
debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p));
}
#else
toenqueue = tomux;
#endif
if (bitset_test(&p->mask, i)) {
/* Check if we received an update from all nodes/ */
if ((p->mode == PATH_MODE_ANY) ||
(p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) {
path_destination_enqueue(p, muxed_smps, toenqueue);
/* Reset bitset of updated nodes */
bitset_clear_all(&p->received);
enqueued = toenqueue;
}
}
sample_decref_many(muxed_smps, tomux);
out2: sample_decref_many(read_smps, release);
return enqueued;
}
static int path_destination_init(struct path_destination *pd, int queuelen)
{
int ret;
ret = queue_init(&pd->queue, queuelen, &memory_hugepage);
if (ret)
return ret;
return 0;
}
static int path_destination_destroy(struct path_destination *pd)
{
int ret;
ret = queue_destroy(&pd->queue);
if (ret)
return ret;
return 0;
}
static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt)
{
unsigned enqueued, cloned;
struct sample *clones[cnt];
cloned = sample_clone_many(clones, smps, cnt);
if (cloned < cnt)
warning("Pool underrun in path %s", path_name(p));
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
enqueued = queue_push_many(&pd->queue, (void **) clones, cloned);
if (enqueued != cnt)
warning("Queue overrun for path %s", path_name(p));
/* Increase reference counter of these samples as they are now also owned by the queue. */
sample_incref_many(clones, cloned);
debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p));
}
sample_decref_many(clones, cloned);
}
static void path_destination_write(struct path_destination *pd, struct path *p)
{
int cnt = pd->node->out.vectorize;
int sent;
int released;
int allocated;
unsigned release;
struct sample *smps[cnt];
/* As long as there are still samples in the queue */
while (1) {
allocated = queue_pull_many(&pd->queue, (void **) smps, cnt);
if (allocated == 0)
break;
else if (allocated < cnt)
debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt);
debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p));
release = allocated;
sent = node_write(pd->node, smps, allocated, &release);
if (sent < 0)
error("Failed to sent %u samples to node %s: reason=%d", cnt, node_name(pd->node), sent);
else if (sent < allocated)
warning("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated);
released = sample_decref_many(smps, release);
debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
}
}
#include <villas/path.h>
#include <villas/path_source.h>
#include <villas/path_destination.h>
static void * path_run_single(void *arg)
{
@ -333,11 +120,11 @@ int path_init(struct path *p)
if (ret)
return ret;
ret = vlist_init(&p->signals);
ret = signal_list_init(&p->signals);
if (ret)
return ret;
ret = vlist_init(&p->hooks);
ret = hook_list_init(&p->hooks);
if (ret)
return ret;
@ -359,7 +146,7 @@ int path_init(struct path *p)
return 0;
}
int path_init_poll(struct path *p)
static int path_prepare_poll(struct path *p)
{
int fds[16], ret, n = 0, m;
@ -404,15 +191,17 @@ int path_init_poll(struct path *p)
return 0;
}
int path_init2(struct path *p)
int path_prepare(struct path *p)
{
int ret;
assert(p->state == STATE_CHECKED);
#ifdef WITH_HOOKS
int m = p->builtin ? HOOK_PATH | HOOK_BUILTIN : 0;
/* Add internal hooks if they are not already in the list */
ret = hook_init_builtin_list(&p->hooks, p->builtin, HOOK_PATH, p, NULL);
ret = hook_list_prepare(&p->hooks, &p->signals, m, p, NULL);
if (ret)
return ret;
@ -427,10 +216,10 @@ int path_init2(struct path *p)
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
if (pd->node->_vt->pool_size > pool_size)
pool_size = pd->node->_vt->pool_size;
if (node_type(pd->node)->pool_size > pool_size)
pool_size = node_type(pd->node)->pool_size;
if (pd->node->_vt->memory_type)
if (node_type(pd->node)->memory_type)
pool_mt = node_memory_type(pd->node, &memory_hugepage);
ret = path_destination_init(pd, p->queuelen);
@ -452,19 +241,21 @@ int path_init2(struct path *p)
if (ps->masked)
bitset_set(&p->mask, i);
ret = mapping_list_prepare(&ps->mappings);
if (ret)
return ret;
for (size_t i = 0; i < vlist_length(&ps->mappings); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(&ps->mappings, i);
struct vlist *sigs = node_get_signals(me->node, NODE_DIR_IN);
int off = me->offset;
int len = me->length;
for (int j = 0; j < len; j++) {
for (int j = 0; j < me->length; j++) {
struct signal *sig;
/* For data mappings we simple refer to the existing
* signal descriptors of the source node. */
if (me->type == MAPPING_TYPE_DATA) {
sig = (struct signal *) vlist_at_safe(&me->node->in.signals, me->data.offset + j);
sig = (struct signal *) vlist_at_safe(sigs, me->data.offset + j);
if (!sig) {
warning("Failed to create signal description for path %s", path_name(p));
continue;
@ -481,8 +272,8 @@ int path_init2(struct path *p)
return -1;
}
vlist_extend(&p->signals, off + j + 1, NULL);
vlist_set(&p->signals, off + j, sig);
vlist_extend(&p->signals, me->offset + j + 1, NULL);
vlist_set(&p->signals, me->offset + j, sig);
}
}
}
@ -493,11 +284,13 @@ int path_init2(struct path *p)
/* Prepare poll() */
if (p->poll) {
ret = path_init_poll(p);
ret = path_prepare_poll(p);
if (ret)
return ret;
}
p->state = STATE_PREPARED;
return 0;
}
@ -537,7 +330,7 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
jerror(&err, "Failed to parse path configuration");
/* Input node(s) */
ret = mapping_parse_list(&sources, json_in, nodes);
ret = mapping_list_parse(&sources, json_in, nodes);
if (ret)
error("Failed to parse input mapping of path %s", path_name(p));
@ -553,7 +346,7 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
/* Output node(s) */
if (json_out) {
ret = node_parse_list(&destinations, json_out, nodes);
ret = node_list_parse(&destinations, json_out, nodes);
if (ret)
jerror(&err, "Failed to parse output nodes");
}
@ -586,6 +379,9 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
vlist_push(&p->sources, ps);
}
if (!node_is_enabled(ps->node))
error("Source %s of path %s is not enabled", node_name(ps->node), path_name(p));
vlist_push(&ps->mappings, me);
}
@ -596,6 +392,9 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
pd->node = n;
if (!node_is_enabled(pd->node))
error("Destination %s of path %s is not enabled", node_name(pd->node), path_name(p));
vlist_push(&p->destinations, pd);
}
@ -646,7 +445,7 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
#ifdef WITH_HOOKS
if (json_hooks) {
ret = hook_parse_list(&p->hooks, json_hooks, HOOK_PATH, p, NULL);
ret = hook_list_parse(&p->hooks, json_hooks, HOOK_PATH, p, NULL);
if (ret)
return ret;
}
@ -689,7 +488,7 @@ int path_check(struct path *p)
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
struct path_source *ps = (struct path_source *) vlist_at(&p->sources, i);
if (!ps->node->_vt->poll_fds)
if (!node_type(ps->node)->poll_fds)
error("Node %s can not be used in polling mode with path %s", node_name(ps->node), path_name(p));
}
}
@ -707,14 +506,14 @@ int path_check(struct path *p)
for (size_t i = 0; i < vlist_length(&p->sources); i++) {
struct path_source *ps = (struct path_source *) vlist_at(&p->sources, i);
if (!ps->node->_vt->read)
if (!node_type(ps->node)->read)
error("Node %s is not supported as a source for path %s", node_name(ps->node), path_name(p));
}
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
if (!pd->node->_vt->write)
if (!node_type(pd->node)->write)
error("Destiation node %s is not supported as a sink for path %s", node_name(pd->node), path_name(p));
}
@ -733,7 +532,7 @@ int path_start(struct path *p)
int ret;
char *mode, *mask;
assert(p->state == STATE_CHECKED);
assert(p->state == STATE_PREPARED);
switch (p->mode) {
case PATH_MODE_ANY: mode = "any"; break;
@ -750,8 +549,8 @@ int path_start(struct path *p)
p->poll ? "yes" : "no",
mask,
p->rate,
p->enabled ? "yes" : "no",
p->reverse ? "yes" : "no",
path_is_enabled(p) ? "yes" : "no",
path_is_reversed(p) ? "yes" : "no",
p->queuelen,
vlist_length(&p->hooks),
vlist_length(&p->sources),
@ -846,15 +645,27 @@ int path_stop(struct path *p)
int path_destroy(struct path *p)
{
int ret;
if (p->state == STATE_DESTROYED)
return 0;
#ifdef WITH_HOOKS
vlist_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true);
ret = hook_list_destroy(&p->hooks);
if (ret)
return ret;
#endif
vlist_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true);
vlist_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true);
vlist_destroy(&p->signals, (dtor_cb_t) signal_decref, false);
ret = signal_list_destroy(&p->signals);
if (ret)
return ret;
ret = vlist_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true);
if (ret)
return ret;
ret = vlist_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true);
if (ret)
return ret;
if (p->reader.pfds)
free(p->reader.pfds);
@ -916,22 +727,37 @@ int path_uses_node(struct path *p, struct node *n)
return -1;
}
int path_is_simple(struct path *p)
bool path_is_simple(const struct path *p)
{
int ret;
const char *in = NULL, *out = NULL;
ret = json_unpack(p->cfg, "{ s: s, s: s }", "in", &in, "out", &out);
if (ret)
return ret;
return false;
ret = node_is_valid_name(in);
if (ret)
return ret;
if (!ret)
return false;
ret = node_is_valid_name(out);
if (ret)
return ret;
if (!ret)
return false;
return 0;
return true;
}
bool path_is_enabled(const struct path *p)
{
return p->enabled;
}
bool path_is_reversed(const struct path *p)
{
return p->reverse;
}
struct vlist * path_get_signals(struct path *p)
{
return &p->signals;
}

110
lib/path_destination.c Normal file
View file

@ -0,0 +1,110 @@
/** Path destination
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/utils.h>
#include <villas/memory.h>
#include <villas/sample.h>
#include <villas/node.h>
#include <villas/path.h>
#include <villas/path_destination.h>
int path_destination_init(struct path_destination *pd, int queuelen)
{
int ret;
ret = queue_init(&pd->queue, queuelen, &memory_hugepage);
if (ret)
return ret;
return 0;
}
int path_destination_destroy(struct path_destination *pd)
{
int ret;
ret = queue_destroy(&pd->queue);
if (ret)
return ret;
return 0;
}
void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt)
{
unsigned enqueued, cloned;
struct sample *clones[cnt];
cloned = sample_clone_many(clones, smps, cnt);
if (cloned < cnt)
warning("Pool underrun in path %s", path_name(p));
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
enqueued = queue_push_many(&pd->queue, (void **) clones, cloned);
if (enqueued != cnt)
warning("Queue overrun for path %s", path_name(p));
/* Increase reference counter of these samples as they are now also owned by the queue. */
sample_incref_many(clones, cloned);
debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p));
}
sample_decref_many(clones, cloned);
}
void path_destination_write(struct path_destination *pd, struct path *p)
{
int cnt = pd->node->out.vectorize;
int sent;
int released;
int allocated;
unsigned release;
struct sample *smps[cnt];
/* As long as there are still samples in the queue */
while (1) {
allocated = queue_pull_many(&pd->queue, (void **) smps, cnt);
if (allocated == 0)
break;
else if (allocated < cnt)
debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt);
debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p));
release = allocated;
sent = node_write(pd->node, smps, allocated, &release);
if (sent < 0)
error("Failed to sent %u samples to node %s: reason=%d", cnt, node_name(pd->node), sent);
else if (sent < allocated)
warning("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated);
released = sample_decref_many(smps, release);
debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
}
}

159
lib/path_source.c Normal file
View file

@ -0,0 +1,159 @@
/** Path source
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <villas/utils.h>
#include <villas/bitset.h>
#include <villas/sample.h>
#include <villas/node.h>
#include <villas/path.h>
#include <villas/path_destination.h>
#include <villas/path_source.h>
int path_source_init(struct path_source *ps)
{
int ret;
int pool_size = MAX(DEFAULT_QUEUE_LENGTH, ps->node->in.vectorize);
if (ps->node->_vt->pool_size)
pool_size = ps->node->_vt->pool_size;
ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(vlist_length(&ps->node->in.signals)), node_memory_type(ps->node, &memory_hugepage));
if (ret)
return ret;
return 0;
}
int path_source_destroy(struct path_source *ps)
{
int ret;
ret = pool_destroy(&ps->pool);
if (ret)
return ret;
ret = vlist_destroy(&ps->mappings, NULL, true);
if (ret)
return ret;
return 0;
}
int path_source_read(struct path_source *ps, struct path *p, int i)
{
int recv, tomux, allocated, cnt, toenqueue, enqueued = 0;
unsigned release;
cnt = ps->node->in.vectorize;
struct sample *read_smps[cnt];
struct sample *muxed_smps[cnt];
struct sample **tomux_smps;
/* Fill smps[] free sample blocks from the pool */
allocated = sample_alloc_many(&ps->pool, read_smps, cnt);
if (allocated != cnt)
warning("Pool underrun for path source %s", node_name(ps->node));
/* Read ready samples and store them to blocks pointed by smps[] */
release = allocated;
recv = node_read(ps->node, read_smps, allocated, &release);
if (recv == 0) {
enqueued = 0;
goto out2;
}
else if (recv < 0) {
if (ps->node->state == STATE_STOPPING) {
p->state = STATE_STOPPING;
enqueued = -1;
goto out2;
}
else
error("Failed to read samples from node %s", node_name(ps->node));
}
else if (recv < allocated)
warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated);
bitset_set(&p->received, i);
if (p->mode == PATH_MODE_ANY) { /* Mux all samples */
tomux_smps = read_smps;
tomux = recv;
}
else { /* Mux only last sample and discard others */
tomux_smps = read_smps + recv - 1;
tomux = 1;
}
for (int i = 0; i < tomux; i++) {
muxed_smps[i] = i == 0
? sample_clone(p->last_sample)
: sample_clone(muxed_smps[i-1]);
if (p->original_sequence_no)
muxed_smps[i]->sequence = tomux_smps[i]->sequence;
else {
muxed_smps[i]->sequence = p->last_sequence++;
muxed_smps[i]->flags |= SAMPLE_HAS_SEQUENCE;
}
muxed_smps[i]->ts = tomux_smps[i]->ts;
muxed_smps[i]->flags |= tomux_smps[i]->flags & (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED);
mapping_list_remap(&ps->mappings, muxed_smps[i], tomux_smps[i]);
}
sample_copy(p->last_sample, muxed_smps[tomux-1]);
debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received));
#ifdef WITH_HOOKS
toenqueue = hook_list_process(&p->hooks, muxed_smps, tomux);
if (toenqueue != tomux) {
int skipped = tomux - toenqueue;
debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p));
}
#else
toenqueue = tomux;
#endif
if (bitset_test(&p->mask, i)) {
/* Check if we received an update from all nodes/ */
if ((p->mode == PATH_MODE_ANY) ||
(p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) {
path_destination_enqueue(p, muxed_smps, toenqueue);
/* Reset bitset of updated nodes */
bitset_clear_all(&p->received);
enqueued = toenqueue;
}
}
sample_decref_many(muxed_smps, tomux);
out2: sample_decref_many(read_smps, release);
return enqueued;
}

View file

@ -301,3 +301,20 @@ void sample_dump(struct sample *s)
if (s->signals)
signal_list_dump(s->signals, s->data, s->length);
}
void sample_data_insert(struct sample *smp, const union signal_data *src, size_t offset, size_t len)
{
memmove(&smp->data[offset + len], &smp->data[offset], sizeof(smp->data[0]) * (smp->length - offset));
memcpy(&smp->data[offset], src, sizeof(smp->data[0]) * len);
smp->length += len;
}
void sample_data_remove(struct sample *smp, size_t offset, size_t len)
{
size_t sz = sizeof(smp->data[0]) * len;
memmove(&smp->data[offset], &smp->data[offset + len], sz);
smp->length -= len;
}

View file

@ -210,6 +210,28 @@ int signal_parse(struct signal *s, json_t *cfg)
/* Signal list */
int signal_list_init(struct vlist *list)
{
int ret;
ret = vlist_init(list);
if (ret)
return ret;
return 0;
}
int signal_list_destroy(struct vlist *list)
{
int ret;
ret = vlist_destroy(list, (dtor_cb_t) signal_decref, false);
if (ret)
return ret;
return 0;
}
int signal_list_parse(struct vlist *list, json_t *cfg)
{
int ret;
@ -285,6 +307,22 @@ void signal_list_dump(const struct vlist *list, const union signal_data *data, i
}
}
int signal_list_copy(struct vlist *dst, const struct vlist *src)
{
assert(src->state == STATE_INITIALIZED);
assert(dst->state == STATE_INITIALIZED);
for (size_t i = 0; i < vlist_length(src); i++) {
struct signal *s = (struct signal *) vlist_at_safe(src, i);
signal_incref(s);
vlist_push(dst, s);
}
return 0;
}
/* Signal type */
enum signal_type signal_type_from_str(const char *str)

View file

@ -218,7 +218,7 @@ int SuperNode::parseJson(json_t *j)
const char *type;
ret = node_is_valid_name(name);
if (ret)
if (!ret)
throw RuntimeError("Invalid name for node: {}", name);
ret = json_unpack_ex(json_node, &err, 0, "{ s: s }", "type", &type);
@ -266,7 +266,7 @@ parse: path *p = (path *) alloc(sizeof(path));
if (p->reverse) {
/* Only simple paths can be reversed */
ret = path_is_simple(p);
if (ret)
if (!ret)
throw RuntimeError("Complex paths can not be reversed!");
/* Parse a second time with in/out reversed */
@ -356,18 +356,12 @@ void SuperNode::startNodes()
for (size_t i = 0; i < vlist_length(&nodes); i++) {
auto *n = (struct node *) vlist_at(&nodes, i);
ret = node_init2(n);
if (ret)
throw RuntimeError("Failed to prepare node: {}", node_name(n));
if (!node_is_enabled(n))
continue;
int refs = vlist_count(&paths, (cmp_cb_t) path_uses_node, n);
if (refs > 0) {
ret = node_start(n);
if (ret)
throw RuntimeError("Failed to start node: {}", node_name(n));
}
else
logger->warn("No path is using the node {}. Skipping...", node_name(n));
ret = node_start(n);
if (ret)
throw RuntimeError("Failed to start node: {}", node_name(n));
}
}
@ -378,21 +372,54 @@ void SuperNode::startPaths()
for (size_t i = 0; i < vlist_length(&paths); i++) {
auto *p = (struct path *) vlist_at(&paths, i);
if (p->enabled) {
ret = path_init2(p);
if (ret)
throw RuntimeError("Failed to prepare path: {}", path_name(p));
if (!path_is_enabled(p))
continue;
ret = path_start(p);
if (ret)
throw RuntimeError("Failed to start path: {}", path_name(p));
}
else
logger->warn("Path {} is disabled. Skipping...", path_name(p));
ret = path_start(p);
if (ret)
throw RuntimeError("Failed to start path: {}", path_name(p));
}
}
void SuperNode::start()
void SuperNode::prepareNodes()
{
int ret, refs;
for (size_t i = 0; i < vlist_length(&nodes); i++) {
auto *n = (struct node *) vlist_at(&nodes, i);
refs = vlist_count(&paths, (cmp_cb_t) path_uses_node, n);
if (refs <= 0) {
logger->warn("No path is using the node {}. Skipping...", node_name(n));
n->enabled = false;
}
if (!node_is_enabled(n))
continue;
ret = node_prepare(n);
if (ret)
throw RuntimeError("Failed to prepare node: {}", node_name(n));
}
}
void SuperNode::preparePaths()
{
int ret;
for (size_t i = 0; i < vlist_length(&paths); i++) {
auto *p = (struct path *) vlist_at(&paths, i);
if (!path_is_enabled(p))
continue;
ret = path_prepare(p);
if (ret)
throw RuntimeError("Failed to prepare path: {}", path_name(p));
}
}
void SuperNode::prepare()
{
int ret;
@ -404,6 +431,18 @@ void SuperNode::start()
kernel::rt::init(priority, affinity);
prepareNodes();
preparePaths();
state = STATE_PREPARED;
}
void SuperNode::start()
{
int ret;
assert(state == STATE_PREPARED);
#ifdef WITH_API
api.start();
#endif

View file

@ -13,7 +13,7 @@ setup(
description = 'Python-support for VILLASnode simulation-data gateway',
license = 'GPL-3.0',
keywords = 'simulation power system real-time villas',
url = 'https://git.rwth-aachen.de/acs/public/villas/dataprocessing',
url = 'https://git.rwth-aachen.de/acs/public/villas/VILLASnode',
packages = [ 'villas.node' ],
long_description = long_description,
long_description_content_type = 'text/markdown',

View file

@ -0,0 +1,83 @@
import re
class Timestamp:
"""Parsing the VILLASnode human-readable timestamp format"""
def __init__(self, seconds = 0, nanoseconds = None, offset = None, sequence = None):
self.seconds = seconds
self.nanoseconds = nanoseconds
self.offset = offset
self.sequence = sequence
@classmethod
def parse(self, ts):
m = re.match('(\d+)(?:\.(\d+))?([-+]\d+(?:\.\d+)?(?:e[+-]?\d+)?)?(?:\((\d+)\))?', ts)
seconds = int(m.group(1)); # Mandatory
nanoseconds = int(m.group(2)) if m.group(2) else None
offset = float(m.group(3)) if m.group(3) else None
sequence = int(m.group(4)) if m.group(4) else None
return Timestamp(seconds, nanoseconds, offset, sequence)
def __str__(self):
str = "%u" % (self.seconds)
if self.nanoseconds is not None:
str += ".%09u" % self.nanoseconds
if self.offset is not None:
str += "+%u" % self.offset
if self.sequence is not None:
str += "(%u)" % self.sequence
return str
def __float__(self):
sum = float(self.seconds)
if self.nanoseconds is not None:
sum += self.nanoseconds * 1e-9
if self.offset is not None:
sum += self.offset
return sum
def __cmp__(self, other):
return cmp(float(self), float(other))
class Sample:
"""Parsing a VILLASnode sample from a file (not a UDP package!!)"""
def __init__(self, ts, values):
self.ts = ts
self.values = values
@classmethod
def parse(self, line):
csv = line.split()
ts = Timestamp.parse(csv[0])
vs = [ ]
for value in csv[1:]:
try:
v = float(value)
except ValueError:
value = value.lower()
try:
v = complex(value)
except:
if value.endswith('i'):
v = complex(value.replace('i', 'j'))
else:
raise ValueError()
vs.append(v)
return Sample(ts, vs)
def __str__(self):
return '%s %s' % (self.ts, " ".join(map(str, self.values)))
def __cmp__(self, other):
return cmp(self.ts, other.ts)

View file

@ -194,6 +194,10 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to parse hook config");
ret = hook_prepare(&h, io.signals);
if (ret)
throw RuntimeError("Failed to prepare hook");
ret = hook_start(&h);
if (ret)
throw RuntimeError("Failed to start hook");
@ -215,7 +219,9 @@ check: if (optarg == endptr)
unsigned send = recv;
hook_process(&h, smps, (unsigned *) &send);
ret = hook_process(&h, smps, (unsigned *) &send);
if (ret < 0)
throw RuntimeError("Failed to process samples");
sent = io_print(&io, smps, send);
if (sent < 0)

View file

@ -184,6 +184,7 @@ int main(int argc, char *argv[])
if (ret)
throw RuntimeError("Failed to verify configuration");
sn.prepare();
sn.start();
sn.run();
sn.stop();

View file

@ -395,7 +395,7 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Invalid node configuration");
ret = node_init2(node);
ret = node_prepare(node);
if (ret)
throw RuntimeError("Failed to start node {}: reason={}", node_name(node), ret);

View file

@ -109,7 +109,7 @@ json_t * parse_cli(int argc, char *argv[])
continue;
check: if (optarg == endptr)
logger->warn("Failed to parse parse option argument '-%c %s'", c, optarg);
logger->warn("Failed to parse parse option argument '-{} {}'", c, optarg);
}
if (argc != optind + 1)
@ -237,7 +237,7 @@ int main(int argc, char *argv[])
if (ret)
throw RuntimeError("Failed to initialize pool");
ret = node_init2(&n);
ret = node_prepare(&n);
if (ret)
throw RuntimeError("Failed to start node {}: reason={}", node_name(&n), ret);

View file

@ -167,7 +167,7 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to start node-type {}: reason={}", node_type_name(node->_vt), ret);
ret = node_init2(node);
ret = node_prepare(node);
if (ret)
throw RuntimeError("Failed to start node {}: reason={}", node_name(node), ret);

View file

@ -40,26 +40,30 @@ cat <<EOF > ${INPUT_FILE}
EOF
cat <<EOF > ${EXPECT_FILE}
1548104309.033621000(0) -0.488878 0.590769 -1.000000 0.597649 0.100588
1548104309.133998900(1) -0.492331 0.952914 -1.000000 0.196137 0.200966
1548104309.233542500(2) -0.486250 0.950063 -1.000000 -0.202037 0.300509
1548104309.334019400(3) -0.479840 0.582761 -1.000000 -0.603945 0.400986
1548104309.433952200(4) 0.513039 -0.005774 1.000000 -0.996324 0.500919
1548104309.533756400(5) 0.524631 -0.591455 1.000000 -0.597107 0.600723
1548104309.637440300(6) 0.507441 -0.959248 1.000000 -0.182372 0.704407
1548104309.736158700(7) 0.511616 -0.944805 1.000000 0.212502 0.803126
1548104309.833614900(8) 0.507615 -0.584824 1.000000 0.602327 0.900582
1548104309.934288200(9) -0.469575 0.007885 -1.000000 0.994980 0.001255
# seconds.nanoseconds+offset(sequence) average signal0 signal1 signal2 signal3 signal4
1548104309.033621000(0) 0.062250 0.022245 0.590769 -1.000000 0.597649 0.100588
1548104309.133998900(1) 0.073071 0.015339 0.952914 -1.000000 0.196137 0.200966
1548104309.233542500(2) 0.015207 0.027500 0.950063 -1.000000 -0.202037 0.300509
1548104309.334019400(3) -0.115976 0.040320 0.582761 -1.000000 -0.603945 0.400986
1548104309.433952200(4) 0.104980 0.026079 -0.005774 1.000000 -0.996324 0.500919
1548104309.533756400(5) 0.092285 0.049262 -0.591455 1.000000 -0.597107 0.600723
1548104309.637440300(6) 0.115534 0.014883 -0.959248 1.000000 -0.182372 0.704407
1548104309.736158700(7) 0.218811 0.023232 -0.944805 1.000000 0.212502 0.803126
1548104309.833614900(8) 0.386663 0.015231 -0.584824 1.000000 0.602327 0.900582
1548104309.934288200(9) 0.012994 0.060849 0.007885 -1.000000 0.994980 0.001255
EOF
# Average over first and third signal (mask = 0b101 = 5)
villas-hook -o mask=5 -o offset=0 average < ${INPUT_FILE} > ${OUTPUT_FILE}
villas-hook -o mask=5 -o offset=0 -o signals=0,1,2,3,4 average < ${INPUT_FILE} > ${OUTPUT_FILE}
# Compare only the data values
villas-test-cmp ${OUTPUT_FILE} ${EXPECT_FILE}
RC=$?
cat ${INPUT_FILE}
echo
cat ${OUTPUT_FILE}
rm -f ${INPUT_FILE} ${OUTPUT_FILE} ${EXPECT_FILE}
exit $RC

View file

@ -1,6 +1,6 @@
#!/bin/bash
#
# Integration test for convert hook.
# Integration test for cast hook.
#
# @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
# @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
@ -22,50 +22,42 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
##################################################################################
# We skip this test for now
echo "Test not yet supported"
exit 99
INPUT_FILE=$(mktemp)
OUTPUT_FILE=$(mktemp)
EXPECT_FILE=$(mktemp)
cat <<EOF > ${INPUT_FILE}
1490500399.776379108(0) 0.000000 0.000000 0.000000 0.000000
1490500399.876379108(1) 0.587785 0.587785 0.587785 0.587785
1490500399.976379108(2) 0.951057 0.951057 0.951057 0.951057
1490500400.076379108(3) 0.951057 0.951057 0.951057 0.951057
1490500400.176379108(4) 0.587785 0.587785 0.587785 0.587785
1490500400.276379108(5) 0.000000 0.000000 0.000000 0.000000
1490500400.376379108(6) -0.587785 -0.587785 -0.587785 -0.587785
1490500400.476379108(7) -0.951057 -0.951057 -0.951057 -0.951057
1490500400.576379108(8) -0.951057 -0.951057 -0.951057 -0.951057
1490500400.676379108(9) -0.587785 -0.587785 -0.587785 -0.587785
# seconds.nanoseconds+offset(sequence) signal0 signal1 signal2 signal3 signal4
1551015508.801653200(0) 0.022245 0.000000 -1.000000 1.000000 0.000000
1551015508.901653200(1) 0.015339 58.778500 -1.000000 0.600000 0.100000
1551015509.001653200(2) 0.027500 95.105700 -1.000000 0.200000 0.200000
1551015509.101653200(3) 0.040320 95.105700 -1.000000 -0.200000 0.300000
1551015509.201653200(4) 0.026079 58.778500 -1.000000 -0.600000 0.400000
1551015509.301653200(5) 0.049262 0.000000 1.000000 -1.000000 0.500000
1551015509.401653200(6) 0.014883 -58.778500 1.000000 -0.600000 0.600000
1551015509.501653200(7) 0.023232 -95.105700 1.000000 -0.200000 0.700000
1551015509.601653200(8) 0.015231 -95.105700 1.000000 0.200000 0.800000
1551015509.701653200(9) 0.060849 -58.778500 1.000000 0.600000 0.900000
EOF
cat <<EOF > ${EXPECT_FILE}
1490500399.776379108(0) 0.000000 0 0 0.000000
1490500399.876379108(1) 0.587785 58 58 0.587785
1490500399.976379108(2) 0.951057 95 95 0.951057
1490500400.076379108(3) 0.951057 95 95 0.951057
1490500400.176379108(4) 0.587785 58 58 0.587785
1490500400.276379108(5) 0.000000 0 0 0.000000
1490500400.376379108(6) -0.587785 -58 -58 -0.587785
1490500400.476379108(7) -0.951057 -95 -95 -0.951057
1490500400.576379108(8) -0.951057 -95 -95 -0.951057
1490500400.676379108(9) -0.587785 -58 -58 -0.587785
# seconds.nanoseconds+offset(sequence) signal0 test[V] signal2 signal3 signal4
1551015508.801653200(0) 0.022245 0 -1.000000 1.000000 0.000000
1551015508.901653200(1) 0.015339 58 -1.000000 0.600000 0.100000
1551015509.001653200(2) 0.027500 95 -1.000000 0.200000 0.200000
1551015509.101653200(3) 0.040320 95 -1.000000 -0.200000 0.300000
1551015509.201653200(4) 0.026079 58 -1.000000 -0.600000 0.400000
1551015509.301653200(5) 0.049262 0 1.000000 -1.000000 0.500000
1551015509.401653200(6) 0.014883 -58 1.000000 -0.600000 0.600000
1551015509.501653200(7) 0.023232 -95 1.000000 -0.200000 0.700000
1551015509.601653200(8) 0.015231 -95 1.000000 0.200000 0.800000
1551015509.701653200(9) 0.060849 -58 1.000000 0.600000 0.900000
EOF
cat ${INPUT_FILE} | \
villas-hook -o scale=100 scale | \
villas-hook cast | \
tee ${OUTPUT_FILE}
cat ${OUTPUT_FILE}
villas-hook cast -o new_name=test -o new_unit=V -o new_type=integer -o signal=1 < ${INPUT_FILE} > ${OUTPUT_FILE}
# Compare only the data values
villas-test-cmp ${OUTPUT_FILE} ${EXPECT_FILE}
RC=$?
rm -f ${INPUT_FILE} ${OUTPUT_FILE} ${EXPECT_FILE}

55
tests/integration/hook-dp.sh Executable file
View file

@ -0,0 +1,55 @@
#!/bin/bash
#
# Integration test for dp hook.
#
# @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
# @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
# @license GNU General Public License (version 3)
#
# VILLASnode
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
##################################################################################
set -x
#INPUT_FILE=$(mktemp)
#OUTPUT_FILE=$(mktemp)
#RECON_FILE=$(mktemp)
INPUT_FILE=in
OUTPUT_FILE=out
RECON_FILE=recon
NUM_SAMPLES=10000
RATE=5000
F0=50
OPTS="-o f0=${F0} -o rate=${RATE} -o signal=0 -o harmonics=0,1,3,5,7"
villas-signal sine -v1 -l ${NUM_SAMPLES} -f ${F0} -r ${RATE} -n > ${INPUT_FILE}
villas-hook dp -o inverse=false ${OPTS} < ${INPUT_FILE} > ${OUTPUT_FILE}
villas-hook dp -o inverse=true ${OPTS} < ${OUTPUT_FILE} > ${RECON_FILE}
exit 0
# Compare only the data values
villas-test-cmp ${OUTPUT_FILE} ${EXPECT_FILE}
RC=$?
rm -f ${INPUT_FILE} ${OUTPUT_FILE} ${EXPECT_FILE}
exit $RC

View file

@ -27,36 +27,37 @@ OUTPUT_FILE=$(mktemp)
EXPECT_FILE=$(mktemp)
cat <<EOF > ${INPUT_FILE}
1490500399.776379108(0) 0.000000 0.000000 0.000000 0.000000
1490500399.876379108(1) 0.587785 0.587785 0.587785 0.587785
1490500399.976379108(2) 0.951057 0.951057 0.951057 0.951057
1490500400.076379108(3) 0.951057 0.951057 0.951057 0.951057
1490500400.176379108(4) 0.587785 0.587785 0.587785 0.587785
1490500400.276379108(5) 0.000000 0.000000 0.000000 0.000000
1490500400.376379108(6) -0.587785 -0.587785 -0.587785 -0.587785
1490500400.476379108(7) -0.951057 -0.951057 -0.951057 -0.951057
1490500400.576379108(8) -0.951057 -0.951057 -0.951057 -0.951057
1490500400.676379108(9) -0.587785 -0.587785 -0.587785 -0.587785
# seconds.nanoseconds(sequence) random sine square triangle ramp
1551015508.801653200(0) 0.022245 0.000000 -1.000000 1.000000 0.000000
1551015508.901653200(1) 0.015339 0.587785 -1.000000 0.600000 0.100000
1551015509.001653200(2) 0.027500 0.951057 -1.000000 0.200000 0.200000
1551015509.101653200(3) 0.040320 0.951057 -1.000000 -0.200000 0.300000
1551015509.201653200(4) 0.026079 0.587785 -1.000000 -0.600000 0.400000
1551015509.301653200(5) 0.049262 0.000000 1.000000 -1.000000 0.500000
1551015509.401653200(6) 0.014883 -0.587785 1.000000 -0.600000 0.600000
1551015509.501653200(7) 0.023232 -0.951057 1.000000 -0.200000 0.700000
1551015509.601653200(8) 0.015231 -0.951057 1.000000 0.200000 0.800000
1551015509.701653200(9) 0.060849 -0.587785 1.000000 0.600000 0.900000
EOF
cat <<EOF > ${EXPECT_FILE}
1490500399.776379108-1.490500e+09(0) -10.000000 -10.000000 -10.000000 -10.000000
1490500399.876379108-1.490500e+09(1) -4.122150 -4.122150 -4.122150 -4.122150
1490500399.976379108-1.490500e+09(2) -0.489430 -0.489430 -0.489430 -0.489430
1490500400.076379108-1.490500e+09(3) -0.489430 -0.489430 -0.489430 -0.489430
1490500400.176379108-1.490500e+09(4) -4.122150 -4.122150 -4.122150 -4.122150
1490500400.276379108-1.490500e+09(5) -10.000000 -10.000000 -10.000000 -10.000000
1490500400.376379108-1.490500e+09(6) -15.877850 -15.877850 -15.877850 -15.877850
1490500400.476379108-1.490500e+09(7) -19.510570 -19.510570 -19.510570 -19.510570
1490500400.576379108-1.490500e+09(8) -19.510570 -19.510570 -19.510570 -19.510570
1490500400.676379108-1.490500e+09(9) -15.877850 -15.877850 -15.877850 -15.877850
# seconds.nanoseconds+offset(sequence) signal0 signal1 signal2 signal3 signal4
1551015508.801653200(0) 0.022245 0.000000 -1.000000 1.000000 55.000000
1551015508.901653200(1) 0.015339 0.587785 -1.000000 0.600000 65.000000
1551015509.001653200(2) 0.027500 0.951057 -1.000000 0.200000 75.000000
1551015509.101653200(3) 0.040320 0.951057 -1.000000 -0.200000 85.000000
1551015509.201653200(4) 0.026079 0.587785 -1.000000 -0.600000 95.000000
1551015509.301653200(5) 0.049262 0.000000 1.000000 -1.000000 105.000000
1551015509.401653200(6) 0.014883 -0.587785 1.000000 -0.600000 115.000000
1551015509.501653200(7) 0.023232 -0.951057 1.000000 -0.200000 125.000000
1551015509.601653200(8) 0.015231 -0.951057 1.000000 0.200000 135.000000
1551015509.701653200(9) 0.060849 -0.587785 1.000000 0.600000 145.000000
EOF
villas-hook -o scale=10 -o offset=-10 scale < ${INPUT_FILE} > ${OUTPUT_FILE}
villas-hook scale -o scale=100 -o offset=55 -o signal=signal4 < ${INPUT_FILE} > ${OUTPUT_FILE}
# Compare only the data values
villas-test-cmp ${OUTPUT_FILE} ${EXPECT_FILE}
RC=$?
rm -f ${INPUT_FILE} ${OUTPUT_FILE} ${EXPECT_FILE}

View file

@ -88,7 +88,7 @@ Test(mapping, parse_nodes)
cr_assert_eq(m.node, vlist_lookup(&nodes, "carrot"));
cr_assert_eq(m.type, MAPPING_TYPE_DATA);
cr_assert_eq(m.data.offset, 0);
cr_assert_eq(m.length, vlist_length(&m.node->in.signals));
cr_assert_eq(m.length, -1);
ret = mapping_parse_str(&m, "carrot.data[sole]", &nodes);
cr_assert_eq(ret, 0);
@ -151,13 +151,13 @@ Test(mapping, parse)
cr_assert_eq(ret, 0);
cr_assert_eq(m.type, MAPPING_TYPE_DATA);
cr_assert_eq(m.data.offset, 0);
cr_assert_eq(m.length, 0);
cr_assert_eq(m.length, -1);
ret = mapping_parse_str(&m, "data[]", nullptr);
cr_assert_eq(ret, 0);
cr_assert_eq(m.type, MAPPING_TYPE_DATA);
cr_assert_eq(m.data.offset, 0);
cr_assert_eq(m.length, 0);
cr_assert_eq(m.length, -1);
ret = mapping_parse_str(&m, "data[1.1-2f]", nullptr);
cr_assert_neq(ret, 0);
@ -182,5 +182,5 @@ Test(mapping, parse)
/* Negative length of chunk */
ret = mapping_parse_str(&m, "data[5-3]", nullptr);
cr_assert_eq(ret, -1);
cr_assert_neq(ret, 0);
}