mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Merge branch 'develop' of git.rwth-aachen.de:/acs/public/villas/VILLASnode into develop
This commit is contained in:
commit
0c53172816
73 changed files with 1476 additions and 602 deletions
|
@ -4,7 +4,7 @@ variables:
|
|||
PREFIX: /usr/
|
||||
RSYNC_OPTS: --recursive --ignore-missing-args --chown ${DEPLOY_USER}:${DEPLOY_USER}
|
||||
CRITERION_OPTS: --ignore-warnings
|
||||
DOCKER_TAG: $CI_COMMIT_TAG
|
||||
DOCKER_TAG: ${CI_COMMIT_TAG}
|
||||
DOCKER_TAG_DEV: ${CI_COMMIT_REF_NAME}
|
||||
DOCKER_IMAGE: villas/node
|
||||
DOCKER_IMAGE_DEV: villas/node-dev
|
||||
|
@ -28,7 +28,9 @@ before_script:
|
|||
docker-dev:
|
||||
stage: prepare
|
||||
script:
|
||||
- docker build -f packaging/docker/Dockerfile.dev -t ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV} .
|
||||
- docker build
|
||||
--file packaging/docker/Dockerfile.dev
|
||||
--tag ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV} .
|
||||
tags:
|
||||
- shell
|
||||
- linux
|
||||
|
@ -105,18 +107,6 @@ test:unit:
|
|||
tags:
|
||||
- docker
|
||||
|
||||
test:unit-common:
|
||||
stage: test
|
||||
dependencies:
|
||||
- build:source
|
||||
script:
|
||||
- mkdir -p build && cd build
|
||||
- cmake .. && make unit-tests-common
|
||||
- "common/tests/unit-tests-common || true"
|
||||
image: ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV}
|
||||
tags:
|
||||
- docker
|
||||
|
||||
test:integration:
|
||||
stage: test
|
||||
dependencies:
|
||||
|
@ -129,7 +119,7 @@ test:integration:
|
|||
name: ${CI_PROJECT_NAME}-integration-tests-${CI_BUILD_REF}
|
||||
when: always
|
||||
paths:
|
||||
- build/release/tests/integration/
|
||||
- build/tests/integration/
|
||||
image: ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV}
|
||||
tags:
|
||||
- docker
|
||||
|
@ -170,8 +160,10 @@ deploy:packages:
|
|||
docker:
|
||||
stage: docker
|
||||
script:
|
||||
- docker build -f packaging/docker/Dockerfile.app -t ${DOCKER_IMAGE}:${DOCKER_TAG} .
|
||||
- docker push ${DOCKER_IMAGE}:${DOCKER_TAG}
|
||||
- docker build
|
||||
--build-arg BUILDER_IMAGE=${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV}
|
||||
--file packaging/docker/Dockerfile.app
|
||||
--tag ${DOCKER_IMAGE}:${DOCKER_TAG_DEV} .
|
||||
- docker push ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV}
|
||||
tags:
|
||||
- shell
|
||||
|
|
2
common
2
common
|
@ -1 +1 @@
|
|||
Subproject commit 02122f404c833a90482e325cd8327fea36575e31
|
||||
Subproject commit 27751548785eebaeb160f32b923c7949de4faa5e
|
|
@ -56,6 +56,13 @@ if(DOXYGEN_FOUND)
|
|||
WORKING_DIRECTORY ${PROJECT_DIR}
|
||||
)
|
||||
|
||||
# Ensure that documentation is built before installing it
|
||||
install(CODE "execute_process(
|
||||
COMMAND ${CMAKE_COMMAND} --build \"${CMAKE_CURRENT_BINARY_DIR}\" --target doc
|
||||
WORKING_DIRECTORY \"${CMAKE_CURRENT_BINARY_DIR}\"
|
||||
)"
|
||||
)
|
||||
|
||||
install(
|
||||
DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/html
|
||||
DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/doc/villas/node
|
||||
|
|
78
etc/labs/lab10_nodes.conf
Normal file
78
etc/labs/lab10_nodes.conf
Normal file
|
@ -0,0 +1,78 @@
|
|||
stats = 5.0;
|
||||
hugepages = 200;
|
||||
|
||||
nodes = {
|
||||
# Node names can be any alphanumeric value
|
||||
rpi-1 = {
|
||||
type = "socket";
|
||||
layer = "udp";
|
||||
format = "gtnet" # pre-built format to communicate in RTDS GTNET-SKT payload
|
||||
|
||||
in = {
|
||||
address = "*:12005" # villas node machine IP and port number
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "192.168.0.5:12005" # remote machine IP and port number
|
||||
},
|
||||
|
||||
hooks = (
|
||||
{
|
||||
type = "stats",
|
||||
warmup = 3000
|
||||
}
|
||||
)
|
||||
},
|
||||
rpi-2 = {
|
||||
type = "socket";
|
||||
layer = "udp";
|
||||
format = "gtnet" # pre-built format to communicate in RTDS GTNET-SKT payload
|
||||
|
||||
in = {
|
||||
address = "*:12006" # villas node machine IP and port number
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "192.168.0.6:12006" # remote machine IP and port number
|
||||
},
|
||||
|
||||
hooks = (
|
||||
{
|
||||
type = "stats",
|
||||
warmup = 3000
|
||||
}
|
||||
)
|
||||
},
|
||||
rtds-1 = {
|
||||
type = "socket";
|
||||
layer = "udp";
|
||||
format = "gtnet";
|
||||
|
||||
in = {
|
||||
address = "*:12083" # villas node machine IP and port number
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "192.168.0.4:12083" # remote machine IP and port number
|
||||
},
|
||||
|
||||
hooks = (
|
||||
{
|
||||
type = "stats",
|
||||
warmup = 3000
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
22
etc/labs/lab10_path_bidir.conf
Normal file
22
etc/labs/lab10_path_bidir.conf
Normal file
|
@ -0,0 +1,22 @@
|
|||
@include "lab10_nodes.conf"
|
||||
|
||||
paths = (
|
||||
# Each path dictionary corresponds to one way communnication
|
||||
{
|
||||
in = [ "rpi-1" ],
|
||||
out = [ "rtds-1" ]
|
||||
},
|
||||
{
|
||||
in = [ "rtds-1" ],
|
||||
out = [ "rpi-1" ]
|
||||
}
|
||||
|
||||
# Alternatively, you can use a single path specification
|
||||
# and set reverse = true
|
||||
# Example:
|
||||
# {
|
||||
# in = [ "rpi-1" ],
|
||||
# out = [ "rtds-1" ],
|
||||
# reverse = true
|
||||
# }
|
||||
)
|
12
etc/labs/lab10_path_hook.conf
Normal file
12
etc/labs/lab10_path_hook.conf
Normal file
|
@ -0,0 +1,12 @@
|
|||
@include "lab10_nodes.conf"
|
||||
|
||||
paths = (
|
||||
{
|
||||
in = [ "rpi-1" ],
|
||||
out = [ "rtds-1" ],
|
||||
|
||||
hooks = (
|
||||
{ type = "print", output = "stdout" }
|
||||
)
|
||||
}
|
||||
)
|
8
etc/labs/lab10_path_multiple_destinations.conf
Normal file
8
etc/labs/lab10_path_multiple_destinations.conf
Normal file
|
@ -0,0 +1,8 @@
|
|||
@include "lab10_nodes.conf"
|
||||
|
||||
paths = (
|
||||
{
|
||||
in = [ "rtds-1" ],
|
||||
out = [ "rpi-1", "rpi-2" ]
|
||||
}
|
||||
)
|
8
etc/labs/lab10_path_uni.conf
Normal file
8
etc/labs/lab10_path_uni.conf
Normal file
|
@ -0,0 +1,8 @@
|
|||
@include "lab10_nodes.conf"
|
||||
|
||||
paths = (
|
||||
{
|
||||
in = [ "rpi-1" ],
|
||||
out = [ "rtds-1" ]
|
||||
}
|
||||
)
|
83
etc/labs/lab11.conf
Normal file
83
etc/labs/lab11.conf
Normal file
|
@ -0,0 +1,83 @@
|
|||
nodes = {
|
||||
rtds_gtnet1 = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
header = "gtnet-skt",
|
||||
|
||||
in = {
|
||||
address = "*:12000",
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "134.130.169.89:12000"
|
||||
}
|
||||
},
|
||||
rtds_gtnet2 = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
header = "gtnet-skt",
|
||||
|
||||
in = {
|
||||
address = "*:12001",
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "134.130.169.90:12001"
|
||||
}
|
||||
},
|
||||
monitoring = {
|
||||
type = "websocket"
|
||||
},
|
||||
monitoring_log = {
|
||||
type = "file",
|
||||
|
||||
out = {
|
||||
uri = "ftp://acs:fake@134.130.169.32/var/villas/log/monitoring_%Y-%m-%d_%H_%M_%S.dat"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
paths = [
|
||||
{
|
||||
# Combine data from rtds_gtnet1 and rtds_gtnet2
|
||||
in = [
|
||||
"rtds_gtnet1.hdr.ts.origin",
|
||||
"rtds_gtnet1.hdr.sequence",
|
||||
"rtds_gtnet1.data[0-6]",
|
||||
|
||||
"rtds_gtnet2.hdr.ts.origin",
|
||||
"rtds_gtnet2.hdr.sequence",
|
||||
"rtds_gtnet2.data[0-6]",
|
||||
],
|
||||
|
||||
out = [
|
||||
"monitoring",
|
||||
"monitoring_log"
|
||||
],
|
||||
|
||||
reverse = false,
|
||||
|
||||
# The mode of a path determines when the path is triggered
|
||||
# and forwarding samples to its destintation nodes.
|
||||
mode = "any",
|
||||
|
||||
# List of nodes which trigger the path
|
||||
mask = [ "rtds_gtnet_1", "rtds_gtnet_2" ],
|
||||
|
||||
hooks = (
|
||||
# We dont want to overload the WebBrowsers
|
||||
{
|
||||
type = "decimate",
|
||||
ratio = 10
|
||||
}
|
||||
)
|
||||
}
|
||||
]
|
48
etc/labs/lab12.conf
Normal file
48
etc/labs/lab12.conf
Normal file
|
@ -0,0 +1,48 @@
|
|||
nodes = {
|
||||
udp_node1 = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
|
||||
in = {
|
||||
address = "*:12000"
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "127.0.0.1:12001"
|
||||
}
|
||||
},
|
||||
web_node1 = {
|
||||
type = "websocket",
|
||||
|
||||
vectorize = 2,
|
||||
series = (
|
||||
{ label = "Random walk", unit = "V" },
|
||||
{ label = "Sine", unit = "A" },
|
||||
{ label = "Rect", unit = "Var" },
|
||||
{ label = "Ramp", unit = "°C" }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
paths = (
|
||||
{
|
||||
in = [ "udp_node1" ],
|
||||
out = [ "web_node1" ],
|
||||
|
||||
hooks = (
|
||||
# We dont want to overload the WebBrowsers
|
||||
{ type = "decimate", ratio = 2 }
|
||||
)
|
||||
},
|
||||
|
||||
{
|
||||
in = [ "web_node1" ],
|
||||
out = [ "udp_node1" ]
|
||||
|
||||
# Web -> UDP does not require decimation
|
||||
}
|
||||
)
|
47
etc/labs/lab13.conf
Normal file
47
etc/labs/lab13.conf
Normal file
|
@ -0,0 +1,47 @@
|
|||
affinity = 0x8,
|
||||
|
||||
nodes = {
|
||||
rtds_gtnet1 = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
header = "gtnet-skt",
|
||||
|
||||
in = {
|
||||
address = "*:12000"
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "134.130.169.98:12000"
|
||||
}
|
||||
},
|
||||
rtds_gtnet2 = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
header = "gtnet-skt",
|
||||
|
||||
in = {
|
||||
address = "*:12001"
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "134.130.169.99:12001"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
paths = (
|
||||
{
|
||||
in = [ "rtds_gtnet1" ],
|
||||
out = [ "rtds_gtnet2" ],
|
||||
|
||||
reverse = true
|
||||
}
|
||||
)
|
18
etc/labs/lab3.conf
Normal file
18
etc/labs/lab3.conf
Normal file
|
@ -0,0 +1,18 @@
|
|||
nodes = {
|
||||
udp_node1 = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
|
||||
in = {
|
||||
address = "*:12000"
|
||||
|
||||
signals = {
|
||||
count = 3
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "127.0.0.1:12001"
|
||||
}
|
||||
}
|
||||
}
|
BIN
etc/labs/lab3.pcap
Normal file
BIN
etc/labs/lab3.pcap
Normal file
Binary file not shown.
18
etc/labs/lab4.conf
Normal file
18
etc/labs/lab4.conf
Normal file
|
@ -0,0 +1,18 @@
|
|||
nodes = {
|
||||
udp_node1 = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
|
||||
in = {
|
||||
address = "*:12000"
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "127.0.0.1:12001"
|
||||
}
|
||||
}
|
||||
}
|
19
etc/labs/lab5.conf
Normal file
19
etc/labs/lab5.conf
Normal file
|
@ -0,0 +1,19 @@
|
|||
nodes = {
|
||||
rtds_gtnet1 = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
header = "gtnet-skt",
|
||||
|
||||
in = {
|
||||
address = "*:12000"
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "134.130.169.89:12000"
|
||||
}
|
||||
}
|
||||
}
|
9
etc/labs/lab7.conf
Normal file
9
etc/labs/lab7.conf
Normal file
|
@ -0,0 +1,9 @@
|
|||
nodes = {
|
||||
file_node1 = {
|
||||
type = "file",
|
||||
|
||||
in = {
|
||||
uri = "file_send.dat"
|
||||
}
|
||||
}
|
||||
}
|
41
etc/labs/lab8.conf
Normal file
41
etc/labs/lab8.conf
Normal file
|
@ -0,0 +1,41 @@
|
|||
nodes = {
|
||||
udp_node1 = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
|
||||
in = {
|
||||
address = "*:12000"
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "127.0.0.1:12001"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
paths = [
|
||||
{
|
||||
in = [ "udp_node1" ],
|
||||
out = [ "udp_node1" ],
|
||||
|
||||
hooks = [
|
||||
{
|
||||
type = "decimate",
|
||||
priority = 1,
|
||||
|
||||
# Hook specific parameters follow
|
||||
# [paramter1] = [value1]
|
||||
ratio = 2
|
||||
},
|
||||
{
|
||||
type = "map",
|
||||
|
||||
mapping = [ "data[3]", "data[2]", "data[1]", "data[0]", "hdr.sequence", "ts.origin" ]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
28
etc/labs/lab9_netem.conf
Normal file
28
etc/labs/lab9_netem.conf
Normal file
|
@ -0,0 +1,28 @@
|
|||
nodes = {
|
||||
udp_node1 = {
|
||||
type = "socket",
|
||||
layer = "udp",
|
||||
|
||||
in = {
|
||||
address = "*:12000"
|
||||
|
||||
signals = {
|
||||
count = 8,
|
||||
type = "float"
|
||||
}
|
||||
},
|
||||
out = {
|
||||
address = "127.0.0.1:12001",
|
||||
|
||||
netem = {
|
||||
enabled = true,
|
||||
loss = 0, # in %
|
||||
corrupt = 0, # in %
|
||||
duplicate = 0, # in %
|
||||
delay = 100000, # in uS
|
||||
jitter = 5000, # in uS
|
||||
distribution = "normal"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,4 +1,6 @@
|
|||
{
|
||||
"idle_stop" : false,
|
||||
|
||||
"http" : {
|
||||
"port" : 8080
|
||||
},
|
||||
|
|
|
@ -27,6 +27,8 @@
|
|||
#include <vector>
|
||||
#include <map>
|
||||
|
||||
#include <sys/socket.h>
|
||||
#include <sys/un.h>
|
||||
#include <poll.h>
|
||||
|
||||
#include <villas/common.h>
|
||||
|
@ -62,6 +64,8 @@ protected:
|
|||
void acceptNewSession();
|
||||
void closeSession(sessions::Socket *s);
|
||||
|
||||
struct sockaddr_un getSocketAddress();
|
||||
|
||||
public:
|
||||
Server(Api *a);
|
||||
~Server();
|
||||
|
|
|
@ -45,16 +45,6 @@ enum mapping_type {
|
|||
MAPPING_TYPE_TIMESTAMP
|
||||
};
|
||||
|
||||
enum mapping_stats_type {
|
||||
MAPPING_STATS_TYPE_LAST,
|
||||
MAPPING_STATS_TYPE_HIGHEST,
|
||||
MAPPING_STATS_TYPE_LOWEST,
|
||||
MAPPING_STATS_TYPE_MEAN,
|
||||
MAPPING_STATS_TYPE_VAR,
|
||||
MAPPING_STATS_TYPE_STDDEV,
|
||||
MAPPING_STATS_TYPE_TOTAL
|
||||
};
|
||||
|
||||
enum mapping_header_type {
|
||||
MAPPING_HEADER_TYPE_LENGTH,
|
||||
MAPPING_HEADER_TYPE_SEQUENCE
|
||||
|
@ -84,8 +74,8 @@ struct mapping_entry {
|
|||
} data;
|
||||
|
||||
struct {
|
||||
enum stats_id id;
|
||||
enum mapping_stats_type type;
|
||||
enum stats_metric metric;
|
||||
enum stats_type type;
|
||||
} stats;
|
||||
|
||||
struct {
|
||||
|
|
|
@ -84,12 +84,14 @@ struct node {
|
|||
|
||||
struct node_direction in, out;
|
||||
|
||||
#ifdef WITH_NETEM
|
||||
int mark; /**< Socket mark for netem, routing and filtering */
|
||||
#ifdef __linux__
|
||||
int fwmark; /**< Socket mark for netem, routing and filtering */
|
||||
|
||||
#ifdef WITH_NETEM
|
||||
struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */
|
||||
struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */
|
||||
#endif /* WITH_NETEM */
|
||||
#endif /* __linux__ */
|
||||
|
||||
struct node_type *_vt; /**< Virtual functions (C++ OOP style) */
|
||||
void *_vd; /**< Virtual data (used by struct node::_vt functions) */
|
||||
|
@ -201,6 +203,8 @@ struct node_type * node_type(struct node *n);
|
|||
|
||||
struct memory_type * node_memory_type(struct node *n, struct memory_type *parent);
|
||||
|
||||
int node_is_valid_name(const char *name);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -31,7 +31,9 @@
|
|||
|
||||
#include <jansson.h>
|
||||
|
||||
#include <villas/stats.h>
|
||||
#include <villas/task.h>
|
||||
#include <villas/list.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -42,13 +44,20 @@ struct node;
|
|||
struct sample;
|
||||
struct super_node;
|
||||
|
||||
struct stats_node_signal {
|
||||
struct node *node;
|
||||
char *node_str;
|
||||
|
||||
enum stats_metric metric;
|
||||
enum stats_type type;
|
||||
};
|
||||
|
||||
struct stats_node {
|
||||
double rate;
|
||||
char *node_str;
|
||||
|
||||
struct task task;
|
||||
|
||||
struct node *node;
|
||||
struct vlist signals; /** List of type struct stats_node_signal */
|
||||
};
|
||||
|
||||
/** @see node_type::print */
|
||||
|
@ -60,6 +69,8 @@ char *stats_node_print(struct node *n);
|
|||
/** @see node_type::parse */
|
||||
int stats_node_parse(struct node *n, json_t *cfg);
|
||||
|
||||
int stats_node_parse_signal(struct stats_node_signal *s, json_t *cfg);
|
||||
|
||||
/** @see node_type::start */
|
||||
int stats_node_start(struct node *n);
|
||||
|
||||
|
|
|
@ -172,6 +172,8 @@ int path_uses_node(struct path *p, struct node *n);
|
|||
*/
|
||||
int path_parse(struct path *p, json_t *cfg, struct vlist *nodes);
|
||||
|
||||
int path_is_simple(struct path *p);
|
||||
|
||||
/** @} */
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include <jansson.h>
|
||||
|
||||
#include <villas/hist.h>
|
||||
#include <villas/signal.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -42,44 +43,68 @@ enum stats_format {
|
|||
STATS_FORMAT_MATLAB
|
||||
};
|
||||
|
||||
enum stats_id {
|
||||
STATS_SKIPPED, /**< Counter for skipped samples due to hooks. */
|
||||
STATS_REORDERED, /**< Counter for reordered samples. */
|
||||
STATS_GAP_SAMPLE, /**< Histogram for inter sample timestamps (as sent by remote). */
|
||||
STATS_GAP_RECEIVED, /**< Histogram for inter sample arrival time (as seen by this instance). */
|
||||
STATS_OWD, /**< Histogram for one-way-delay (OWD) of received samples. */
|
||||
STATS_COUNT /**< Just here to have an updated number of statistics. */
|
||||
enum stats_metric {
|
||||
STATS_METRIC_INVALID = -1,
|
||||
STATS_METRIC_SKIPPED, /**< Counter for skipped samples due to hooks. */
|
||||
STATS_METRIC_REORDERED, /**< Counter for reordered samples. */
|
||||
STATS_METRIC_GAP_SAMPLE, /**< Histogram for inter sample timestamps (as sent by remote). */
|
||||
STATS_METRIC_GAP_RECEIVED, /**< Histogram for inter sample arrival time (as seen by this instance). */
|
||||
STATS_METRIC_OWD, /**< Histogram for one-way-delay (OWD) of received samples. */
|
||||
STATS_METRIC_COUNT /**< Just here to have an updated number of statistics. */
|
||||
};
|
||||
|
||||
struct stats_desc {
|
||||
enum stats_type {
|
||||
STATS_TYPE_INVALID = -1,
|
||||
STATS_TYPE_LAST,
|
||||
STATS_TYPE_HIGHEST,
|
||||
STATS_TYPE_LOWEST,
|
||||
STATS_TYPE_MEAN,
|
||||
STATS_TYPE_VAR,
|
||||
STATS_TYPE_STDDEV,
|
||||
STATS_TYPE_TOTAL,
|
||||
STATS_TYPE_COUNT
|
||||
};
|
||||
|
||||
struct stats_metric_description {
|
||||
const char *name;
|
||||
enum stats_metric metric;
|
||||
const char *unit;
|
||||
const char *desc;
|
||||
int hist_buckets;
|
||||
};
|
||||
|
||||
struct stats_delta {
|
||||
double values[STATS_COUNT];
|
||||
struct stats_type_description {
|
||||
const char *name;
|
||||
enum stats_type type;
|
||||
enum signal_type signal_type;
|
||||
};
|
||||
|
||||
int update; /**< Bitmask of stats_id. Only those which are masked will be updated */
|
||||
struct stats_delta {
|
||||
double values[STATS_METRIC_COUNT];
|
||||
|
||||
int update; /**< Bitmask of stats_metric. Only those which are masked will be updated */
|
||||
};
|
||||
|
||||
struct stats {
|
||||
struct hist histograms[STATS_COUNT];
|
||||
struct hist histograms[STATS_METRIC_COUNT];
|
||||
|
||||
struct stats_delta *delta;
|
||||
};
|
||||
|
||||
extern
|
||||
struct stats_desc stats_metrics[];
|
||||
extern struct stats_metric_description stats_metrics[];
|
||||
extern struct stats_type_description stats_types[];
|
||||
|
||||
int stats_lookup_format(const char *str);
|
||||
|
||||
enum stats_metric stats_lookup_metric(const char *str);
|
||||
|
||||
enum stats_type stats_lookup_type(const char *str);
|
||||
|
||||
int stats_init(struct stats *s, int buckets, int warmup);
|
||||
|
||||
int stats_destroy(struct stats *s);
|
||||
|
||||
void stats_update(struct stats *s, enum stats_id id, double val);
|
||||
void stats_update(struct stats *s, enum stats_metric id, double val);
|
||||
|
||||
void stats_collect(struct stats *s, struct sample *smps[], size_t *cnt);
|
||||
|
||||
|
@ -95,7 +120,7 @@ void stats_print_periodic(struct stats *s, FILE *f, enum stats_format fmt, int v
|
|||
|
||||
void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose);
|
||||
|
||||
enum stats_id stats_lookup_id(const char *name);
|
||||
union signal_data stats_get_value(const struct stats *s, enum stats_metric sm, enum stats_type st);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -40,10 +40,11 @@ class SuperNode {
|
|||
protected:
|
||||
enum state state;
|
||||
|
||||
int idleStop;
|
||||
|
||||
int priority; /**< Process priority (lower is better) */
|
||||
int affinity; /**< Process affinity of the server and all created threads */
|
||||
int hugepages; /**< Number of hugepages to reserve. */
|
||||
double stats; /**< Interval for path statistics. Set to 0 to disable them. */
|
||||
|
||||
Logger logger;
|
||||
|
||||
|
@ -104,6 +105,11 @@ public:
|
|||
/** Run periodic hooks of this super node. */
|
||||
int periodic();
|
||||
|
||||
void setState(enum state st)
|
||||
{
|
||||
state = st;
|
||||
}
|
||||
|
||||
struct node * getNode(const std::string &name)
|
||||
{
|
||||
return (struct node *) vlist_lookup(&nodes, name.c_str());
|
||||
|
@ -122,6 +128,10 @@ public:
|
|||
return &interfaces;
|
||||
}
|
||||
|
||||
enum state getState() {
|
||||
return state;
|
||||
}
|
||||
|
||||
#ifdef WITH_API
|
||||
Api * getApi() {
|
||||
return &api;
|
||||
|
|
|
@ -21,9 +21,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*********************************************************************************/
|
||||
|
||||
#include <sys/socket.h>
|
||||
#include <sys/un.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <exception>
|
||||
#include <algorithm>
|
||||
|
@ -79,9 +78,36 @@ void Server::start()
|
|||
|
||||
pfds.push_back(pfd);
|
||||
|
||||
struct sockaddr_un sun = getSocketAddress();
|
||||
|
||||
ret = bind(sd, (struct sockaddr *) &sun, sizeof(struct sockaddr_un));
|
||||
if (ret)
|
||||
throw SystemError("Failed to bind API socket");
|
||||
|
||||
ret = listen(sd, 5);
|
||||
if (ret)
|
||||
throw SystemError("Failed to listen on API socket");
|
||||
|
||||
logger->info("Listening on UNIX socket: {}", sun.sun_path);
|
||||
|
||||
state = STATE_STARTED;
|
||||
}
|
||||
|
||||
struct sockaddr_un Server::getSocketAddress()
|
||||
{
|
||||
struct sockaddr_un sun = { .sun_family = AF_UNIX };
|
||||
|
||||
fs::path socketPath = PREFIX "/var/lib/villas";
|
||||
fs::path socketPath;
|
||||
|
||||
if (getuid() == 0) {
|
||||
socketPath = PREFIX "/var/lib/villas";
|
||||
}
|
||||
else {
|
||||
std::string homeDir = getenv("HOME");
|
||||
|
||||
socketPath = homeDir + "/.villas";
|
||||
}
|
||||
|
||||
if (!fs::exists(socketPath)) {
|
||||
logging.get("api")->info("Creating directory for API socket: {}", socketPath);
|
||||
fs::create_directories(socketPath);
|
||||
|
@ -96,17 +122,7 @@ void Server::start()
|
|||
|
||||
strncpy(sun.sun_path, socketPath.c_str(), sizeof(sun.sun_path) - 1);
|
||||
|
||||
ret = bind(sd, (struct sockaddr *) &sun, sizeof(struct sockaddr_un));
|
||||
if (ret)
|
||||
throw SystemError("Failed to bind API socket");
|
||||
|
||||
ret = listen(sd, 5);
|
||||
if (ret)
|
||||
throw SystemError("Failed to listen on API socket");
|
||||
|
||||
logger->info("Listening on UNIX socket: {}", socketPath);
|
||||
|
||||
state = STATE_STARTED;
|
||||
return sun;
|
||||
}
|
||||
|
||||
void Server::stop()
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
#include <villas/sample.h>
|
||||
|
||||
struct average {
|
||||
int mask;
|
||||
uint64_t mask;
|
||||
int offset;
|
||||
};
|
||||
|
||||
|
@ -42,7 +42,7 @@ static int average_parse(struct hook *h, json_t *cfg)
|
|||
int ret;
|
||||
json_error_t err;
|
||||
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s: i }",
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s: I }",
|
||||
"offset", &p->offset,
|
||||
"mask", &p->mask
|
||||
);
|
||||
|
@ -62,7 +62,7 @@ static int average_process(struct hook *h, struct sample *smps[], unsigned *cnt)
|
|||
int n = 0;
|
||||
|
||||
for (int k = 0; k < smp->length; k++) {
|
||||
if (!(p->mask & (1 << k)))
|
||||
if (!(p->mask & (1LL << k)))
|
||||
continue;
|
||||
|
||||
switch (sample_format(smps[i], k)) {
|
||||
|
|
|
@ -168,18 +168,18 @@ static int stats_collect_process(struct hook *h, struct sample *smps[], unsigned
|
|||
|
||||
if (previous) {
|
||||
if (current->flags & previous->flags & SAMPLE_HAS_TS_RECEIVED)
|
||||
stats_update(s, STATS_GAP_RECEIVED, time_delta(&previous->ts.received, ¤t->ts.received));
|
||||
stats_update(s, STATS_METRIC_GAP_RECEIVED, time_delta(&previous->ts.received, ¤t->ts.received));
|
||||
|
||||
if (current->flags & previous->flags & SAMPLE_HAS_TS_ORIGIN)
|
||||
stats_update(s, STATS_GAP_SAMPLE, time_delta(&previous->ts.origin, ¤t->ts.origin));
|
||||
stats_update(s, STATS_METRIC_GAP_SAMPLE, time_delta(&previous->ts.origin, ¤t->ts.origin));
|
||||
|
||||
if ((current->flags & SAMPLE_HAS_TS_ORIGIN) && (current->flags & SAMPLE_HAS_TS_RECEIVED))
|
||||
stats_update(s, STATS_OWD, time_delta(¤t->ts.origin, ¤t->ts.received));
|
||||
stats_update(s, STATS_METRIC_OWD, time_delta(¤t->ts.origin, ¤t->ts.received));
|
||||
|
||||
if (current->flags & previous->flags & SAMPLE_HAS_SEQUENCE) {
|
||||
dist = current->sequence - (int32_t) previous->sequence;
|
||||
if (dist != 1)
|
||||
stats_update(s, STATS_REORDERED, dist);
|
||||
stats_update(s, STATS_METRIC_REORDERED, dist);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -73,23 +73,23 @@ int if_start(struct interface *i)
|
|||
//if_set_affinity(i, i->affinity);
|
||||
|
||||
/* Assign fwmark's to nodes which have netem options */
|
||||
int ret, mark = 0;
|
||||
int ret, fwmark = 0;
|
||||
for (size_t j = 0; j < vlist_length(&i->nodes); j++) {
|
||||
struct node *n = (struct node *) vlist_at(&i->nodes, j);
|
||||
|
||||
if (n->tc_qdisc)
|
||||
n->mark = 1 + mark++;
|
||||
if (n->tc_qdisc && n->fwmark < 0)
|
||||
n->fwmark = 1 + fwmark++;
|
||||
}
|
||||
|
||||
/* Abort if no node is using netem */
|
||||
if (mark == 0)
|
||||
if (fwmark == 0)
|
||||
return 0;
|
||||
|
||||
if (getuid() != 0)
|
||||
error("Network emulation requires super-user privileges!");
|
||||
|
||||
/* Replace root qdisc */
|
||||
ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, mark);
|
||||
ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, fwmark);
|
||||
if (ret)
|
||||
error("Failed to setup priority queuing discipline: %s", nl_geterror(ret));
|
||||
|
||||
|
@ -98,16 +98,16 @@ int if_start(struct interface *i)
|
|||
struct node *n = (struct node *) vlist_at(&i->nodes, j);
|
||||
|
||||
if (n->tc_qdisc) {
|
||||
ret = tc_mark(i, &n->tc_classifier, TC_HANDLE(1, n->mark), n->mark);
|
||||
ret = tc_mark(i, &n->tc_classifier, TC_HANDLE(1, n->fwmark), n->fwmark);
|
||||
if (ret)
|
||||
error("Failed to setup FW mark classifier: %s", nl_geterror(ret));
|
||||
|
||||
char *buf = tc_netem_print(n->tc_qdisc);
|
||||
debug(LOG_IF | 5, "Starting network emulation on interface '%s' for FW mark %u: %s",
|
||||
if_name(i), n->mark, buf);
|
||||
if_name(i), n->fwmark, buf);
|
||||
free(buf);
|
||||
|
||||
ret = tc_netem(i, &n->tc_qdisc, TC_HANDLE(0x1000+n->mark, 0), TC_HANDLE(1, n->mark));
|
||||
ret = tc_netem(i, &n->tc_qdisc, TC_HANDLE(0x1000+n->fwmark, 0), TC_HANDLE(1, n->fwmark));
|
||||
if (ret)
|
||||
error("Failed to setup netem qdisc: %s", nl_geterror(ret));
|
||||
}
|
||||
|
|
112
lib/mapping.c
112
lib/mapping.c
|
@ -32,8 +32,7 @@
|
|||
|
||||
int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *nodes)
|
||||
{
|
||||
int id;
|
||||
char *cpy, *node, *type, *field, *subfield, *end;
|
||||
char *cpy, *node, *type, *field, *end;
|
||||
|
||||
cpy = strdup(str);
|
||||
if (!cpy)
|
||||
|
@ -68,44 +67,21 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
|
|||
me->type = MAPPING_TYPE_STATS;
|
||||
me->length = 1;
|
||||
|
||||
field = strtok(NULL, ".");
|
||||
if (!field) {
|
||||
warning("Missing stats type");
|
||||
char *metric = strtok(NULL, ".");
|
||||
if (!metric)
|
||||
goto invalid_format;
|
||||
}
|
||||
|
||||
subfield = strtok(NULL, ".");
|
||||
if (!subfield) {
|
||||
warning("Missing stats sub-type");
|
||||
type = strtok(NULL, ".");
|
||||
if (!type)
|
||||
goto invalid_format;
|
||||
}
|
||||
|
||||
id = stats_lookup_id(field);
|
||||
if (id < 0) {
|
||||
warning("Invalid stats type");
|
||||
me->stats.metric = stats_lookup_metric(metric);
|
||||
if (me->stats.metric < 0)
|
||||
goto invalid_format;
|
||||
}
|
||||
|
||||
me->stats.id = id;
|
||||
|
||||
if (!strcmp(subfield, "total"))
|
||||
me->stats.type = MAPPING_STATS_TYPE_TOTAL;
|
||||
else if (!strcmp(subfield, "last"))
|
||||
me->stats.type = MAPPING_STATS_TYPE_LAST;
|
||||
else if (!strcmp(subfield, "lowest"))
|
||||
me->stats.type = MAPPING_STATS_TYPE_LOWEST;
|
||||
else if (!strcmp(subfield, "highest"))
|
||||
me->stats.type = MAPPING_STATS_TYPE_HIGHEST;
|
||||
else if (!strcmp(subfield, "mean"))
|
||||
me->stats.type = MAPPING_STATS_TYPE_MEAN;
|
||||
else if (!strcmp(subfield, "var"))
|
||||
me->stats.type = MAPPING_STATS_TYPE_VAR;
|
||||
else if (!strcmp(subfield, "stddev"))
|
||||
me->stats.type = MAPPING_STATS_TYPE_STDDEV;
|
||||
else {
|
||||
warning("Invalid stats sub-type");
|
||||
me->stats.type = stats_lookup_type(type);
|
||||
if (me->stats.type < 0)
|
||||
goto invalid_format;
|
||||
}
|
||||
}
|
||||
else if (!strcmp(type, "hdr")) {
|
||||
me->type = MAPPING_TYPE_HEADER;
|
||||
|
@ -276,35 +252,9 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
|
|||
return -1;
|
||||
|
||||
switch (me->type) {
|
||||
case MAPPING_TYPE_STATS: {
|
||||
const struct hist *h = &s->histograms[me->stats.id];
|
||||
|
||||
switch (me->stats.type) {
|
||||
case MAPPING_STATS_TYPE_TOTAL:
|
||||
remapped->data[off++].i = h->total;
|
||||
break;
|
||||
case MAPPING_STATS_TYPE_LAST:
|
||||
remapped->data[off++].f = h->last;
|
||||
break;
|
||||
case MAPPING_STATS_TYPE_HIGHEST:
|
||||
remapped->data[off++].f = h->highest;
|
||||
break;
|
||||
case MAPPING_STATS_TYPE_LOWEST:
|
||||
remapped->data[off++].f = h->lowest;
|
||||
break;
|
||||
case MAPPING_STATS_TYPE_MEAN:
|
||||
remapped->data[off++].f = hist_mean(h);
|
||||
break;
|
||||
case MAPPING_STATS_TYPE_STDDEV:
|
||||
remapped->data[off++].f = hist_stddev(h);
|
||||
break;
|
||||
case MAPPING_STATS_TYPE_VAR:
|
||||
remapped->data[off++].f = hist_var(h);
|
||||
break;
|
||||
default:
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
case MAPPING_TYPE_STATS:
|
||||
remapped->data[off++] = stats_get_value(s, me->stats.metric, me->stats.type);
|
||||
break;
|
||||
|
||||
case MAPPING_TYPE_TIMESTAMP: {
|
||||
const struct timespec *ts;
|
||||
|
@ -380,40 +330,10 @@ int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str)
|
|||
|
||||
switch (me->type) {
|
||||
case MAPPING_TYPE_STATS:
|
||||
switch (me->stats.type) {
|
||||
case MAPPING_STATS_TYPE_TOTAL:
|
||||
type = "total";
|
||||
break;
|
||||
|
||||
case MAPPING_STATS_TYPE_LAST:
|
||||
type = "last";
|
||||
break;
|
||||
|
||||
case MAPPING_STATS_TYPE_LOWEST:
|
||||
type = "lowest";
|
||||
break;
|
||||
|
||||
case MAPPING_STATS_TYPE_HIGHEST:
|
||||
type = "highest";
|
||||
break;
|
||||
|
||||
case MAPPING_STATS_TYPE_MEAN:
|
||||
type = "mean";
|
||||
break;
|
||||
|
||||
case MAPPING_STATS_TYPE_VAR:
|
||||
type = "var";
|
||||
break;
|
||||
|
||||
case MAPPING_STATS_TYPE_STDDEV:
|
||||
type = "stddev";
|
||||
break;
|
||||
|
||||
default:
|
||||
type = NULL;
|
||||
}
|
||||
|
||||
strcatf(str, "stats.%s", type);
|
||||
strcatf(str, "stats.%s.%s",
|
||||
stats_metrics[me->stats.metric].name,
|
||||
stats_types[me->stats.type].name
|
||||
);
|
||||
break;
|
||||
|
||||
case MAPPING_TYPE_HEADER:
|
||||
|
|
23
lib/memory.c
23
lib/memory.c
|
@ -27,6 +27,7 @@
|
|||
|
||||
#include <sys/time.h>
|
||||
#include <sys/resource.h>
|
||||
#include <sys/mman.h>
|
||||
|
||||
#include <villas/log.h>
|
||||
#include <villas/memory.h>
|
||||
|
@ -70,10 +71,14 @@ int memory_init(int hugepages)
|
|||
|
||||
int memory_lock(size_t lock)
|
||||
{
|
||||
#if defined(__linux__) && defined(__x86_64__)
|
||||
int ret;
|
||||
|
||||
#ifdef __linux__
|
||||
|
||||
#ifndef __arm__
|
||||
struct rlimit l;
|
||||
|
||||
/* Increase ressource limit for locked memory */
|
||||
ret = getrlimit(RLIMIT_MEMLOCK, &l);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
@ -81,11 +86,10 @@ int memory_lock(size_t lock)
|
|||
if (l.rlim_cur < lock) {
|
||||
if (l.rlim_max < lock) {
|
||||
if (getuid() != 0) {
|
||||
warning("Failed to in increase ressource limit of locked memory from %lu to %zu bytes", l.rlim_cur, lock);
|
||||
warning("Please re-run as super-user or raise manually via:");
|
||||
warning("Failed to in increase ressource limit of locked memory. Please increase manually by running as root:");
|
||||
warning(" $ ulimit -Hl %zu", lock);
|
||||
|
||||
return -1;
|
||||
goto out;
|
||||
}
|
||||
|
||||
l.rlim_max = lock;
|
||||
|
@ -99,7 +103,16 @@ int memory_lock(size_t lock)
|
|||
|
||||
debug(LOG_MEM | 2, "Increased ressource limit of locked memory to %zd bytes", lock);
|
||||
}
|
||||
#endif
|
||||
#endif /* __arm__ */
|
||||
out:
|
||||
#ifdef _POSIX_MEMLOCK
|
||||
/* Lock all current and future memory allocations */
|
||||
ret = mlockall(MCL_CURRENT | MCL_FUTURE);
|
||||
if (ret)
|
||||
return -1;
|
||||
#endif /* _POSIX_MEMLOCK */
|
||||
|
||||
#endif /* __linux__ */
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -65,10 +65,8 @@ int memory_hugepage_init(int hugepages)
|
|||
debug(LOG_MEM | 2, "Increased number of reserved hugepages from %d to %d", pagecnt, hugepages);
|
||||
}
|
||||
else {
|
||||
warning("Failed to reserved hugepages. Please re-run as super-user or reserve manually via:");
|
||||
warning("Failed to reserved hugepages. Please reserve manually by running as root:");
|
||||
warning(" $ echo %d > /proc/sys/vm/nr_hugepages", hugepages);
|
||||
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
@ -81,7 +79,7 @@ static struct memory_allocation * memory_hugepage_alloc(struct memory_type *m, s
|
|||
{
|
||||
static bool use_huge = true;
|
||||
|
||||
int ret, flags, fd;
|
||||
int flags, fd;
|
||||
size_t sz;
|
||||
|
||||
struct memory_allocation *ma = alloc(sizeof(struct memory_allocation));
|
||||
|
@ -120,7 +118,7 @@ retry: if (use_huge) {
|
|||
ma->address = mmap(NULL, ma->length, PROT_READ | PROT_WRITE, flags, fd, 0);
|
||||
if (ma->address == MAP_FAILED) {
|
||||
if (use_huge) {
|
||||
warning("Failed to map hugepages, try with normal pages instead");
|
||||
warning("Failed to map hugepages, try with normal pages instead!");
|
||||
use_huge = false;
|
||||
goto retry;
|
||||
}
|
||||
|
@ -130,12 +128,6 @@ retry: if (use_huge) {
|
|||
}
|
||||
}
|
||||
|
||||
if (getuid() == 0) {
|
||||
ret = mlock(ma->address, ma->length);
|
||||
if (ret)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return ma;
|
||||
}
|
||||
|
||||
|
|
73
lib/node.c
73
lib/node.c
|
@ -21,6 +21,7 @@
|
|||
*********************************************************************************/
|
||||
|
||||
#include <string.h>
|
||||
#include <ctype.h>
|
||||
|
||||
#include <villas/node/config.h>
|
||||
#include <villas/hook.h>
|
||||
|
@ -224,6 +225,10 @@ int node_init(struct node *n, struct node_type *vt)
|
|||
n->_name = NULL;
|
||||
n->_name_long = NULL;
|
||||
|
||||
#ifdef __linux__
|
||||
n->fwmark = -1;
|
||||
#endif /* __linux__ */
|
||||
|
||||
#ifdef WITH_NETEM
|
||||
n->tc_qdisc = NULL;
|
||||
n->tc_classifier = NULL;
|
||||
|
@ -279,16 +284,24 @@ int node_parse(struct node *n, json_t *json, const char *name)
|
|||
|
||||
n->name = strdup(name);
|
||||
|
||||
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o }, s?: { s?: o } }",
|
||||
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o } }",
|
||||
"type", &type,
|
||||
"in",
|
||||
"signals", &json_signals,
|
||||
"out",
|
||||
"netem", &json_netem
|
||||
"signals", &json_signals
|
||||
);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to parse node %s", node_name(n));
|
||||
|
||||
#ifdef __linux__
|
||||
ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: o, s?: i } }",
|
||||
"out",
|
||||
"netem", &json_netem,
|
||||
"fwmark", &n->fwmark
|
||||
);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to parse node %s", node_name(n));
|
||||
#endif /* __linux__ */
|
||||
|
||||
nt = node_type_lookup(type);
|
||||
assert(nt == node_type(n));
|
||||
|
||||
|
@ -395,18 +408,18 @@ int node_start(struct node *n)
|
|||
|
||||
#ifdef __linux__
|
||||
/* Set fwmark for outgoing packets if netem is enabled for this node */
|
||||
if (n->mark) {
|
||||
if (n->fwmark) {
|
||||
int fds[16];
|
||||
int num_sds = node_netem_fds(n, fds);
|
||||
|
||||
for (int i = 0; i < num_sds; i++) {
|
||||
int fd = fds[i];
|
||||
|
||||
ret = setsockopt(fd, SOL_SOCKET, SO_MARK, &n->mark, sizeof(n->mark));
|
||||
ret = setsockopt(fd, SOL_SOCKET, SO_MARK, &n->fwmark, sizeof(n->fwmark));
|
||||
if (ret)
|
||||
serror("Failed to set FW mark for outgoing packets");
|
||||
else
|
||||
debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", fd, n->mark);
|
||||
debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", fd, n->fwmark);
|
||||
}
|
||||
}
|
||||
#endif /* __linux__ */
|
||||
|
@ -421,7 +434,7 @@ int node_stop(struct node *n)
|
|||
{
|
||||
int ret;
|
||||
|
||||
if (n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT)
|
||||
if (n->state != STATE_STOPPING && n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT)
|
||||
return 0;
|
||||
|
||||
info("Stopping node %s", node_name(n));
|
||||
|
@ -548,12 +561,13 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
|
|||
{
|
||||
int readd, nread = 0;
|
||||
|
||||
if (n->state == STATE_PAUSED)
|
||||
return 0;
|
||||
|
||||
assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT);
|
||||
assert(node_type(n)->read);
|
||||
|
||||
if (n->state == STATE_PAUSED || n->state == STATE_PENDING_CONNECT)
|
||||
return 0;
|
||||
else if (n->state != STATE_STARTED && n->state != STATE_CONNECTED)
|
||||
return -1;
|
||||
|
||||
/* Send in parts if vector not supported */
|
||||
if (node_type(n)->vectorize > 0 && node_type(n)->vectorize < cnt) {
|
||||
while (cnt - nread > 0) {
|
||||
|
@ -576,7 +590,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
|
|||
int skipped = nread - rread;
|
||||
|
||||
if (skipped > 0 && n->stats != NULL) {
|
||||
stats_update(n->stats, STATS_SKIPPED, skipped);
|
||||
stats_update(n->stats, STATS_METRIC_SKIPPED, skipped);
|
||||
}
|
||||
|
||||
debug(LOG_NODE | 5, "Received %u samples from node %s of which %d have been skipped", nread, node_name(n), skipped);
|
||||
|
@ -593,9 +607,13 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
|
|||
{
|
||||
int tosend, sent, nsent = 0;
|
||||
|
||||
assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED);
|
||||
assert(node_type(n)->write);
|
||||
|
||||
if (n->state == STATE_PAUSED || n->state == STATE_PENDING_CONNECT)
|
||||
return 0;
|
||||
else if (n->state != STATE_STARTED && n->state != STATE_CONNECTED)
|
||||
return -1;
|
||||
|
||||
#ifdef WITH_HOOKS
|
||||
/* Run write hooks */
|
||||
cnt = hook_process_list(&n->out.hooks, smps, cnt);
|
||||
|
@ -608,12 +626,8 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
|
|||
while (cnt - nsent > 0) {
|
||||
tosend = MIN(cnt - nsent, node_type(n)->vectorize);
|
||||
sent = node_type(n)->write(n, &smps[nsent], tosend, release);
|
||||
if (sent < 0) {
|
||||
warning("Failed to send samples to node %s: reason=%d", node_name(n), sent);
|
||||
if (sent < 0)
|
||||
return sent;
|
||||
}
|
||||
else if (sent < tosend)
|
||||
warning("Failed to send %d out of %d samples to node %s", tosend-sent, tosend, node_name(n));
|
||||
|
||||
nsent += sent;
|
||||
debug(LOG_NODE | 5, "Sent %u samples to node %s", sent, node_name(n));
|
||||
|
@ -621,13 +635,8 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
|
|||
}
|
||||
else {
|
||||
nsent = node_type(n)->write(n, smps, cnt, release);
|
||||
if (nsent < 0) {
|
||||
warning("Failed to send samples to node %s: reason=%d", node_name(n), nsent);
|
||||
if (nsent < 0)
|
||||
return nsent;
|
||||
}
|
||||
else if (nsent < cnt)
|
||||
warning("Failed to send %d out of %d samples to node %s", cnt-nsent, cnt, node_name(n));
|
||||
|
||||
|
||||
debug(LOG_NODE | 5, "Sent %u samples to node %s", nsent, node_name(n));
|
||||
}
|
||||
|
@ -660,7 +669,7 @@ char * node_name_long(struct node *n)
|
|||
strcatf(&n->_name_long, ", out.netem=%s", n->tc_qdisc ? "yes" : "no");
|
||||
|
||||
if (n->tc_qdisc)
|
||||
strcatf(&n->_name_long, ", mark=%d", n->mark);
|
||||
strcatf(&n->_name_long, ", fwmark=%d", n->fwmark);
|
||||
#endif /* WITH_NETEM */
|
||||
|
||||
/* Append node-type specific details */
|
||||
|
@ -761,3 +770,15 @@ invalid2:
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int node_is_valid_name(const char *name)
|
||||
{
|
||||
for (const char *p = name; *p; p++) {
|
||||
if (isalnum(*p) || (*p == '_') || (*p == '-'))
|
||||
continue;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -370,10 +370,11 @@ retry: ret = io_scan(&f->io, smps, cnt);
|
|||
goto retry;
|
||||
|
||||
case FILE_EOF_STOP:
|
||||
info("Reached end-of-file. Stopping node %s", node_name(n));
|
||||
info("Reached end-of-file.");
|
||||
|
||||
killme(SIGTERM);
|
||||
pause();
|
||||
n->state = STATE_STOPPING;
|
||||
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
|
@ -101,55 +101,72 @@ int iec61850_parse_signals(json_t *json_signals, struct vlist *signals, struct v
|
|||
{
|
||||
int ret, total_size = 0;
|
||||
const char *iec_type;
|
||||
const struct iec61850_type_descriptor *td;
|
||||
struct signal *sig;
|
||||
|
||||
ret = vlist_init(signals);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
json_t *json_signal;
|
||||
size_t i;
|
||||
json_array_foreach(json_signals, i, json_signal) {
|
||||
const struct iec61850_type_descriptor *td;
|
||||
struct signal *sig;
|
||||
if (json_is_array(json_signals)) {
|
||||
|
||||
json_unpack(json_signal, "{ s?: s }",
|
||||
"iec_type", &iec_type
|
||||
);
|
||||
json_t *json_signal;
|
||||
size_t i;
|
||||
json_array_foreach(json_signals, i, json_signal) {
|
||||
json_unpack(json_signal, "{ s?: s }",
|
||||
"iec_type", &iec_type
|
||||
);
|
||||
|
||||
/* Try to deduct the IEC 61850 data type from VILLAS signal format */
|
||||
if (!iec_type) {
|
||||
if (!node_signals)
|
||||
return -1;
|
||||
|
||||
sig = vlist_at(node_signals, i);
|
||||
if (!sig)
|
||||
return -1;
|
||||
|
||||
switch (sig->type) {
|
||||
case SIGNAL_TYPE_BOOLEAN:
|
||||
iec_type = "boolean";
|
||||
break;
|
||||
|
||||
case SIGNAL_TYPE_FLOAT:
|
||||
iec_type = "float64";
|
||||
break;
|
||||
|
||||
case SIGNAL_TYPE_INTEGER:
|
||||
iec_type = "int64";
|
||||
break;
|
||||
|
||||
default:
|
||||
/* Try to deduct the IEC 61850 data type from VILLAS signal format */
|
||||
if (!iec_type) {
|
||||
if (!node_signals)
|
||||
return -1;
|
||||
|
||||
sig = vlist_at(node_signals, i);
|
||||
if (!sig)
|
||||
return -1;
|
||||
|
||||
switch (sig->type) {
|
||||
case SIGNAL_TYPE_BOOLEAN:
|
||||
iec_type = "boolean";
|
||||
break;
|
||||
|
||||
case SIGNAL_TYPE_FLOAT:
|
||||
iec_type = "float64";
|
||||
break;
|
||||
|
||||
case SIGNAL_TYPE_INTEGER:
|
||||
iec_type = "int64";
|
||||
break;
|
||||
|
||||
default:
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
td = iec61850_lookup_type(iec_type);
|
||||
if (!td)
|
||||
return -1;
|
||||
|
||||
vlist_push(signals, (void *) td);
|
||||
|
||||
total_size += td->size;
|
||||
}
|
||||
}
|
||||
else {
|
||||
ret = json_unpack(json_signals, "{ s: s }", "iec_type", &iec_type);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
td = iec61850_lookup_type(iec_type);
|
||||
if (!td)
|
||||
return -1;
|
||||
|
||||
vlist_push(signals, (void *) td);
|
||||
for (int i = 0; i < vlist_length(node_signals); i++) {
|
||||
vlist_push(signals, (void *) td);
|
||||
|
||||
total_size += td->size;
|
||||
total_size += td->size;
|
||||
}
|
||||
}
|
||||
|
||||
return total_size;
|
||||
|
|
|
@ -190,7 +190,7 @@ int iec61850_sv_parse(struct node *n, json_t *json)
|
|||
|
||||
i->out.svid = svid ? strdup(svid) : NULL;
|
||||
|
||||
ret = iec61850_parse_signals(json_signals, &i->out.signals, NULL);
|
||||
ret = iec61850_parse_signals(json_signals, &i->out.signals, &n->out.signals);
|
||||
if (ret <= 0)
|
||||
error("Failed to parse setting 'signals' of node %s", node_name(n));
|
||||
|
||||
|
@ -352,7 +352,7 @@ int iec61850_sv_read(struct node *n, struct sample *smps[], unsigned cnt, unsign
|
|||
struct sample *smpt[cnt];
|
||||
|
||||
if (!i->in.enabled)
|
||||
return 0;
|
||||
return -1;
|
||||
|
||||
pulled = queue_signalled_pull_many(&i->in.queue, (void **) smpt, cnt);
|
||||
|
||||
|
@ -367,7 +367,7 @@ int iec61850_sv_write(struct node *n, struct sample *smps[], unsigned cnt, unsig
|
|||
struct iec61850_sv *i = (struct iec61850_sv *) n->_vd;
|
||||
|
||||
if (!i->out.enabled)
|
||||
return 0;
|
||||
return -1;
|
||||
|
||||
for (unsigned j = 0; j < cnt; j++) {
|
||||
unsigned offset = 0;
|
||||
|
|
|
@ -63,7 +63,7 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result)
|
|||
if (m->subscribe) {
|
||||
ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos);
|
||||
if (ret)
|
||||
warning("MQTT: failed to subscribe to topic '%s' for node %s", m->subscribe, node_name(n));
|
||||
warning("MQTT: failed to subscribe to topic '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret));
|
||||
}
|
||||
else
|
||||
warning("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n));
|
||||
|
@ -207,11 +207,11 @@ int mqtt_parse(struct node *n, json_t *cfg)
|
|||
// Some checks
|
||||
ret = mosquitto_sub_topic_check(m->subscribe);
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
error("Invalid subscribe topic: '%s' for node %s", m->subscribe, node_name(n));
|
||||
error("Invalid subscribe topic: '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret));
|
||||
|
||||
ret = mosquitto_pub_topic_check(m->publish);
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
error("Invalid publish topic: '%s' for node %s", m->publish, node_name(n));
|
||||
error("Invalid publish topic: '%s' for node %s: %s", m->publish, node_name(n), mosquitto_strerror(ret));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -283,17 +283,17 @@ int mqtt_start(struct node *n)
|
|||
if (m->username && m->password) {
|
||||
ret = mosquitto_username_pw_set(m->client, m->username, m->password);
|
||||
if (ret)
|
||||
return ret;
|
||||
goto mosquitto_error;
|
||||
}
|
||||
|
||||
if (m->ssl.enabled) {
|
||||
ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, NULL);
|
||||
if (ret)
|
||||
return ret;
|
||||
goto mosquitto_error;
|
||||
|
||||
ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure);
|
||||
if (ret)
|
||||
return ret;
|
||||
goto mosquitto_error;
|
||||
}
|
||||
|
||||
mosquitto_log_callback_set(m->client, mqtt_log_cb);
|
||||
|
@ -320,13 +320,18 @@ int mqtt_start(struct node *n)
|
|||
|
||||
ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive);
|
||||
if (ret)
|
||||
return ret;
|
||||
goto mosquitto_error;
|
||||
|
||||
ret = mosquitto_loop_start(m->client);
|
||||
if (ret)
|
||||
return ret;
|
||||
goto mosquitto_error;
|
||||
|
||||
return 0;
|
||||
|
||||
mosquitto_error:
|
||||
warning("MQTT: %s", mosquitto_strerror(ret));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int mqtt_stop(struct node *n)
|
||||
|
@ -336,17 +341,22 @@ int mqtt_stop(struct node *n)
|
|||
|
||||
ret = mosquitto_disconnect(m->client);
|
||||
if (ret)
|
||||
return ret;
|
||||
goto mosquitto_error;
|
||||
|
||||
ret = mosquitto_loop_stop(m->client, 0);
|
||||
if (ret)
|
||||
return ret;
|
||||
goto mosquitto_error;
|
||||
|
||||
ret = io_destroy(&m->io);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
|
||||
mosquitto_error:
|
||||
warning("MQTT: %s", mosquitto_strerror(ret));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int mqtt_type_start(struct super_node *sn)
|
||||
|
@ -355,9 +365,14 @@ int mqtt_type_start(struct super_node *sn)
|
|||
|
||||
ret = mosquitto_lib_init();
|
||||
if (ret)
|
||||
return ret;
|
||||
goto mosquitto_error;
|
||||
|
||||
return 0;
|
||||
|
||||
mosquitto_error:
|
||||
warning("MQTT: %s", mosquitto_strerror(ret));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int mqtt_type_stop()
|
||||
|
@ -366,9 +381,14 @@ int mqtt_type_stop()
|
|||
|
||||
ret = mosquitto_lib_cleanup();
|
||||
if (ret)
|
||||
return ret;
|
||||
goto mosquitto_error;
|
||||
|
||||
return 0;
|
||||
|
||||
mosquitto_error:
|
||||
warning("MQTT: %s", mosquitto_strerror(ret));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
|
||||
|
@ -399,8 +419,7 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
|
|||
return ret;
|
||||
|
||||
if (m->publish) {
|
||||
ret = mosquitto_publish(m->client, NULL /* mid */, m->publish, wbytes, data, m->qos,
|
||||
m->retain);
|
||||
ret = mosquitto_publish(m->client, NULL /* mid */, m->publish, wbytes, data, m->qos, m->retain);
|
||||
if (ret != MOSQ_ERR_SUCCESS) {
|
||||
warning("MQTT: publish failed for node %s: %s", node_name(n), mosquitto_strerror(ret));
|
||||
return -abs(ret);
|
||||
|
|
|
@ -58,6 +58,7 @@ static struct plugin p;
|
|||
static int rtp_set_rate(struct node *n, double rate)
|
||||
{
|
||||
struct rtp *r = (struct rtp *) n->_vd;
|
||||
int ratio;
|
||||
|
||||
switch (r->rtcp.throttle_mode) {
|
||||
case RTCP_THROTTLE_HOOK_LIMIT_RATE:
|
||||
|
@ -65,7 +66,10 @@ static int rtp_set_rate(struct node *n, double rate)
|
|||
break;
|
||||
|
||||
case RTCP_THROTTLE_HOOK_DECIMATE:
|
||||
decimate_set_ratio(r->rtcp.throttle_hook, r->rate / rate);
|
||||
ratio = r->rate / rate;
|
||||
if (ratio == 0)
|
||||
ratio = 1;
|
||||
decimate_set_ratio(r->rtcp.throttle_hook, ratio);
|
||||
break;
|
||||
|
||||
case RTCP_THROTTLE_DISABLED:
|
||||
|
@ -75,7 +79,7 @@ static int rtp_set_rate(struct node *n, double rate)
|
|||
return -1;
|
||||
}
|
||||
|
||||
debug(5, "Set rate limiting for node %s to %f", node_name(n), rate);
|
||||
info("Set rate limiting for node %s to %f", node_name(n), rate);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -87,7 +91,10 @@ static int rtp_aimd(struct node *n, double loss_frac)
|
|||
int ret;
|
||||
double rate;
|
||||
|
||||
if (loss_frac < 1e-3)
|
||||
if (!r->rtcp.enabled)
|
||||
return -1;
|
||||
|
||||
if (loss_frac < 0.01)
|
||||
rate = r->aimd.last_rate + r->aimd.a;
|
||||
else
|
||||
rate = r->aimd.last_rate * r->aimd.b;
|
||||
|
@ -98,7 +105,10 @@ static int rtp_aimd(struct node *n, double loss_frac)
|
|||
if (ret)
|
||||
return ret;
|
||||
|
||||
fprintf(r->aimd.log, "%d\t%f\t%f\n", r->rtcp.num_rrs, loss_frac, rate);
|
||||
if (r->aimd.log)
|
||||
fprintf(r->aimd.log, "%d\t%f\t%f\n", r->rtcp.num_rrs, loss_frac, rate);
|
||||
|
||||
info("AIMD: %d\t%f\t%f", r->rtcp.num_rrs, loss_frac, rate);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -112,7 +122,8 @@ int rtp_init(struct node *n)
|
|||
|
||||
r->aimd.a = 10;
|
||||
r->aimd.b = 0.5;
|
||||
r->aimd.last_rate = 100;
|
||||
r->aimd.last_rate = 2000;
|
||||
r->aimd.log = NULL;
|
||||
|
||||
r->rtcp.enabled = false;
|
||||
r->rtcp.throttle_mode = RTCP_THROTTLE_DISABLED;
|
||||
|
@ -163,7 +174,7 @@ int rtp_parse(struct node *n, json_t *cfg)
|
|||
|
||||
/* AIMD */
|
||||
if (json_aimd) {
|
||||
ret = json_unpack_ex(json_rtcp, &err, 0, "{ s?: F, s?: F, s?: F }",
|
||||
ret = json_unpack_ex(json_aimd, &err, 0, "{ s?: F, s?: F, s?: F }",
|
||||
"a", &r->aimd.a,
|
||||
"b", &r->aimd.b,
|
||||
"start_rate", &r->aimd.last_rate
|
||||
|
@ -207,7 +218,7 @@ int rtp_parse(struct node *n, json_t *cfg)
|
|||
|
||||
/* Format */
|
||||
r->format = format_type_lookup(format);
|
||||
if(!r->format)
|
||||
if (!r->format)
|
||||
error("Invalid format '%s' for node %s", format, node_name(n));
|
||||
|
||||
/* Remote address */
|
||||
|
@ -280,6 +291,9 @@ char * rtp_print(struct node *n)
|
|||
}
|
||||
|
||||
strcatf(&buf, ", rtcp.mode=%s, rtcp.throttle_mode=%s", mode, throttle_mode);
|
||||
|
||||
if (r->rtcp.mode == RTCP_MODE_AIMD)
|
||||
strcatf(&buf, ", aimd.a=%f, aimd.b=%f, aimd.start_rate=%f", r->aimd.a, r->aimd.b, r->aimd.last_rate);
|
||||
}
|
||||
|
||||
free(local);
|
||||
|
@ -315,16 +329,16 @@ static void rtcp_handler(const struct sa *src, struct rtcp_msg *msg, void *arg)
|
|||
/* source not used */
|
||||
(void) src;
|
||||
|
||||
debug(5, "rtcp: recv %s", rtcp_type_name(msg->hdr.pt));
|
||||
debug(5, "RTCP: recv %s", rtcp_type_name(msg->hdr.pt));
|
||||
|
||||
if (msg->hdr.pt == RTCP_SR) {
|
||||
if(msg->hdr.count > 0) {
|
||||
if (msg->hdr.count > 0) {
|
||||
const struct rtcp_rr *rr = &msg->r.sr.rrv[0];
|
||||
debug(5, "rtp: fraction lost = %d", rr->fraction);
|
||||
rtp_aimd(n, rr->fraction);
|
||||
debug(5, "RTP: fraction lost = %d", rr->fraction);
|
||||
rtp_aimd(n, (double) rr->fraction / 256);
|
||||
}
|
||||
else
|
||||
warning("Received RTCP sender report with zero reception reports");
|
||||
debug(5, "RTCP: Received sender report with zero reception reports");
|
||||
}
|
||||
|
||||
r->rtcp.num_rrs++;
|
||||
|
@ -469,14 +483,14 @@ int rtp_type_start(struct super_node *sn)
|
|||
/* Initialize library */
|
||||
ret = libre_init();
|
||||
if (ret) {
|
||||
error("Error initializing libre");
|
||||
warning("Error initializing libre");
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Add worker thread */
|
||||
ret = pthread_create(&re_pthread, NULL, th_func, NULL);
|
||||
if (ret) {
|
||||
error("Error creating rtp node type pthread");
|
||||
warning("Error creating rtp node type pthread");
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -136,8 +136,11 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
|
|||
if (recv < 0) {
|
||||
/* This can only really mean that the other process has exited, so close
|
||||
* the interface to make sure the shared memory object is unlinked */
|
||||
shmem_int_close(&shm->intf);
|
||||
warning("Shared memory segment has been closed for node: %s", node_name(n));
|
||||
|
||||
info("Shared memory segment has been closed.");
|
||||
|
||||
n->state = STATE_STOPPING;
|
||||
|
||||
return recv;
|
||||
}
|
||||
|
||||
|
|
|
@ -264,9 +264,11 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, u
|
|||
}
|
||||
|
||||
if (s->limit > 0 && s->counter >= s->limit) {
|
||||
info("Reached limit of node %s", node_name(n));
|
||||
killme(SIGTERM);
|
||||
return 0;
|
||||
info("Reached limit.");
|
||||
|
||||
n->state = STATE_STOPPING;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
s->counter += steps;
|
||||
|
|
|
@ -38,55 +38,57 @@
|
|||
|
||||
static struct vlist *nodes; /** The global list of nodes */
|
||||
|
||||
static void stats_init_signals(struct node *n)
|
||||
int stats_node_signal_destroy(struct stats_node_signal *s)
|
||||
{
|
||||
struct stats_desc *desc;
|
||||
struct signal *sig;
|
||||
free(s->node_str);
|
||||
|
||||
for (int i = 0; i < STATS_COUNT; i++) {
|
||||
desc = &stats_metrics[i];
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Total */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "total");
|
||||
sig->type = SIGNAL_TYPE_INTEGER;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
int stats_node_signal_parse(struct stats_node_signal *s, json_t *cfg)
|
||||
{
|
||||
json_error_t err;
|
||||
|
||||
/* Last */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "last");
|
||||
sig->unit = strdup(desc->unit);
|
||||
sig->type = SIGNAL_TYPE_FLOAT;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
int ret;
|
||||
const char *stats;
|
||||
char *metric, *type, *node, *cpy;
|
||||
|
||||
/* Highest */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "highest");
|
||||
sig->unit = strdup(desc->unit);
|
||||
sig->type = SIGNAL_TYPE_FLOAT;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: s }",
|
||||
"stats", &stats
|
||||
);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
/* Lowest */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "lowest");
|
||||
sig->unit = strdup(desc->unit);
|
||||
sig->type = SIGNAL_TYPE_FLOAT;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
cpy = strdup(stats);
|
||||
|
||||
/* Mean */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "mean");
|
||||
sig->unit = strdup(desc->unit);
|
||||
sig->type = SIGNAL_TYPE_FLOAT;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
node = strtok(cpy, ".");
|
||||
if (!node)
|
||||
goto invalid_format;
|
||||
|
||||
/* Variance */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "var");
|
||||
sig->unit = strf("%s^2", desc->unit); // variance has squared unit of variable
|
||||
sig->type = SIGNAL_TYPE_FLOAT;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
}
|
||||
metric = strtok(NULL, ".");
|
||||
if (!metric)
|
||||
goto invalid_format;
|
||||
|
||||
type = strtok(NULL, ".");
|
||||
if (!type)
|
||||
goto invalid_format;
|
||||
|
||||
s->metric = stats_lookup_metric(metric);
|
||||
if (s->metric < 0)
|
||||
goto invalid_format;
|
||||
|
||||
s->type = stats_lookup_type(type);
|
||||
if (s->type < 0)
|
||||
goto invalid_format;
|
||||
|
||||
s->node_str = strdup(node);
|
||||
|
||||
free(cpy);
|
||||
return 0;
|
||||
|
||||
invalid_format:
|
||||
free(cpy);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int stats_node_type_start(struct super_node *sn)
|
||||
|
@ -105,9 +107,13 @@ int stats_node_start(struct node *n)
|
|||
if (ret)
|
||||
serror("Failed to create task");
|
||||
|
||||
s->node = vlist_lookup(nodes, s->node_str);
|
||||
if (!s->node)
|
||||
error("Invalid reference node %s for setting 'node' of node %s", s->node_str, node_name(n));
|
||||
for (size_t i = 0; i < vlist_length(&s->signals); i++) {
|
||||
struct stats_node_signal *stats_sig = (struct stats_node_signal *) vlist_at(&s->signals, i);
|
||||
|
||||
stats_sig->node = vlist_lookup(nodes, stats_sig->node_str);
|
||||
if (!stats_sig->node)
|
||||
error("Invalid reference node %s for setting 'node' of node %s", stats_sig->node_str, node_name(n));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -128,7 +134,31 @@ char * stats_node_print(struct node *n)
|
|||
{
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
return strf("node=%s, rate=%f", s->node_str, s->rate);
|
||||
return strf("rate=%f", s->rate);
|
||||
}
|
||||
|
||||
int stats_node_init(struct node *n)
|
||||
{
|
||||
int ret;
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
ret = vlist_init(&s->signals);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_node_destroy(struct node *n)
|
||||
{
|
||||
int ret;
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
ret = vlist_destroy(&s->signals, (dtor_cb_t) stats_node_signal_destroy, true);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_node_parse(struct node *n, json_t *cfg)
|
||||
|
@ -136,13 +166,14 @@ int stats_node_parse(struct node *n, json_t *cfg)
|
|||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
int ret;
|
||||
size_t i;
|
||||
json_error_t err;
|
||||
json_t *json_signals, *json_signal;
|
||||
|
||||
const char *node;
|
||||
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s: F }",
|
||||
"node", &node,
|
||||
"rate", &s->rate
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: F, s: { s: o } }",
|
||||
"rate", &s->rate,
|
||||
"in",
|
||||
"signals", &json_signals
|
||||
);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
|
||||
|
@ -150,50 +181,68 @@ int stats_node_parse(struct node *n, json_t *cfg)
|
|||
if (s->rate <= 0)
|
||||
error("Setting 'rate' of node %s must be positive", node_name(n));
|
||||
|
||||
s->node_str = strdup(node);
|
||||
if (!json_is_array(json_signals))
|
||||
error("Setting 'in.signals' of node %s must be an array", node_name(n));
|
||||
|
||||
stats_init_signals(n);
|
||||
json_array_foreach(json_signals, i, json_signal) {
|
||||
struct signal *sig = (struct signal *) vlist_at(&n->in.signals, i);
|
||||
struct stats_node_signal *stats_sig;
|
||||
|
||||
return 0;
|
||||
}
|
||||
stats_sig = alloc(sizeof(struct stats_node_signal));
|
||||
if (!stats_sig)
|
||||
return -1;
|
||||
|
||||
int stats_node_destroy(struct node *n)
|
||||
{
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
ret = stats_node_signal_parse(stats_sig, json_signal);
|
||||
if (ret)
|
||||
error("Failed to parse signal definition of node %s", node_name(n));
|
||||
|
||||
if (s->node_str)
|
||||
free(s->node_str);
|
||||
if (!sig->name) {
|
||||
const char *metric = stats_metrics[stats_sig->metric].name;
|
||||
const char *type = stats_types[stats_sig->type].name;
|
||||
|
||||
sig->name = strf("%s.%s.%s", stats_sig->node_str, metric, type);
|
||||
}
|
||||
|
||||
if (!sig->unit)
|
||||
sig->unit = strdup(stats_metrics[stats_sig->metric].unit);
|
||||
|
||||
if (sig->type == SIGNAL_TYPE_AUTO)
|
||||
sig->type = stats_types[stats_sig->type].signal_type;
|
||||
else if (sig->type != stats_types[stats_sig->type].signal_type)
|
||||
error("Invalid type for signal %zu in node %s", i, node_name(n));
|
||||
|
||||
vlist_push(&s->signals, stats_sig);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
|
||||
{
|
||||
struct stats_node *sn = (struct stats_node *) n->_vd;
|
||||
struct stats *s = sn->node->stats;
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
if (!cnt)
|
||||
return 0;
|
||||
|
||||
if (!sn->node->stats)
|
||||
return 0;
|
||||
task_wait(&s->task);
|
||||
|
||||
task_wait(&sn->task);
|
||||
int len = MIN(vlist_length(&s->signals), smps[0]->capacity);
|
||||
|
||||
smps[0]->length = MIN(STATS_COUNT * 6, smps[0]->capacity);
|
||||
smps[0]->flags = SAMPLE_HAS_DATA;
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
struct stats *st;
|
||||
struct stats_node_signal *sig = (struct stats_node_signal *) vlist_at(&s->signals, i);
|
||||
|
||||
for (int i = 0; i < 6 && (i+1)*STATS_METRICS <= smps[0]->length; i++) {
|
||||
int tot = hist_total(&s->histograms[i]);
|
||||
st = sig->node->stats;
|
||||
if (!st)
|
||||
return -1;
|
||||
|
||||
smps[0]->data[i*STATS_METRICS+0].f = tot ? hist_total(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*STATS_METRICS+1].f = tot ? hist_last(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*STATS_METRICS+2].f = tot ? hist_highest(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*STATS_METRICS+3].f = tot ? hist_lowest(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*STATS_METRICS+4].f = tot ? hist_mean(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*STATS_METRICS+5].f = tot ? hist_var(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i] = stats_get_value(st, sig->metric, sig->type);
|
||||
}
|
||||
|
||||
smps[0]->length = len;
|
||||
smps[0]->flags = SAMPLE_HAS_DATA;
|
||||
smps[0]->signals = &n->in.signals;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -212,10 +261,11 @@ static struct plugin p = {
|
|||
.type = PLUGIN_TYPE_NODE,
|
||||
.node = {
|
||||
.vectorize = 1,
|
||||
.flags = NODE_TYPE_PROVIDES_SIGNALS,
|
||||
.flags = 0,
|
||||
.size = sizeof(struct stats_node),
|
||||
.type.start = stats_node_type_start,
|
||||
.parse = stats_node_parse,
|
||||
.init = stats_node_init,
|
||||
.destroy = stats_node_destroy,
|
||||
.print = stats_node_print,
|
||||
.start = stats_node_start,
|
||||
|
|
|
@ -307,9 +307,11 @@ int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned
|
|||
}
|
||||
|
||||
if (t->current >= vlist_length(&t->cases)) {
|
||||
info("This was the last case. Terminating.");
|
||||
killme(SIGTERM);
|
||||
pause();
|
||||
info("This was the last case. Stopping node %s", node_name(n));
|
||||
|
||||
n->state = STATE_STOPPING;
|
||||
|
||||
return -1;
|
||||
}
|
||||
else {
|
||||
struct test_rtt_case *c = (struct test_rtt_case *) vlist_at(&t->cases, t->current);
|
||||
|
|
117
lib/path.c
117
lib/path.c
|
@ -73,9 +73,9 @@ static int path_source_destroy(struct path_source *ps)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void path_source_read(struct path_source *ps, struct path *p, int i)
|
||||
static int path_source_read(struct path_source *ps, struct path *p, int i)
|
||||
{
|
||||
int recv, tomux, allocated, cnt;
|
||||
int recv, tomux, allocated, cnt, toenqueue, enqueued = 0;
|
||||
unsigned release;
|
||||
|
||||
cnt = ps->node->in.vectorize;
|
||||
|
@ -93,10 +93,20 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
|
|||
release = allocated;
|
||||
|
||||
recv = node_read(ps->node, read_smps, allocated, &release);
|
||||
if (recv == 0)
|
||||
if (recv == 0) {
|
||||
enqueued = 0;
|
||||
goto out2;
|
||||
else if (recv < 0)
|
||||
error("Failed to read samples from node %s", node_name(ps->node));
|
||||
}
|
||||
else if (recv < 0) {
|
||||
if (ps->node->state == STATE_STOPPING) {
|
||||
p->state = STATE_STOPPING;
|
||||
|
||||
enqueued = -1;
|
||||
goto out2;
|
||||
}
|
||||
else
|
||||
error("Failed to read samples from node %s", node_name(ps->node));
|
||||
}
|
||||
else if (recv < allocated)
|
||||
warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated);
|
||||
|
||||
|
@ -134,30 +144,33 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
|
|||
debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received));
|
||||
|
||||
#ifdef WITH_HOOKS
|
||||
int toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux);
|
||||
toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux);
|
||||
if (toenqueue != tomux) {
|
||||
int skipped = tomux - toenqueue;
|
||||
|
||||
debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p));
|
||||
}
|
||||
#else
|
||||
int toenqueue = tomux;
|
||||
toenqueue = tomux;
|
||||
#endif
|
||||
|
||||
if (bitset_test(&p->mask, i)) {
|
||||
/* Check if we received an update from all nodes/ */
|
||||
if ((p->mode == PATH_MODE_ANY) ||
|
||||
(p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received)))
|
||||
{
|
||||
(p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) {
|
||||
path_destination_enqueue(p, muxed_smps, toenqueue);
|
||||
|
||||
/* Reset bitset of updated nodes */
|
||||
bitset_clear_all(&p->received);
|
||||
|
||||
enqueued = toenqueue;
|
||||
}
|
||||
}
|
||||
|
||||
sample_decref_many(muxed_smps, tomux);
|
||||
out2: sample_decref_many(read_smps, release);
|
||||
|
||||
return enqueued;
|
||||
}
|
||||
|
||||
static int path_destination_init(struct path_destination *pd, int queuelen)
|
||||
|
@ -244,21 +257,22 @@ static void path_destination_write(struct path_destination *pd, struct path *p)
|
|||
|
||||
static void * path_run_single(void *arg)
|
||||
{
|
||||
int ret;
|
||||
struct path *p = arg;
|
||||
struct path_source *ps = (struct path_source *) vlist_at(&p->sources, 0);
|
||||
|
||||
debug(1, "Started path %s in single mode", path_name(p));
|
||||
while (p->state == STATE_STARTED) {
|
||||
pthread_testcancel();
|
||||
|
||||
for (;;) {
|
||||
path_source_read(ps, p, 0);
|
||||
ret = path_source_read(ps, p, 0);
|
||||
if (ret <= 0)
|
||||
continue;
|
||||
|
||||
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
|
||||
struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
|
||||
|
||||
path_destination_write(pd, p);
|
||||
}
|
||||
|
||||
pthread_testcancel();
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -270,9 +284,7 @@ static void * path_run_poll(void *arg)
|
|||
int ret;
|
||||
struct path *p = arg;
|
||||
|
||||
debug(1, "Started path %s in polling mode", path_name(p));
|
||||
|
||||
for (;;) {
|
||||
while (p->state == STATE_STARTED) {
|
||||
ret = poll(p->reader.pfds, p->reader.nfds, -1);
|
||||
if (ret < 0)
|
||||
serror("Failed to poll");
|
||||
|
@ -292,9 +304,8 @@ static void * path_run_poll(void *arg)
|
|||
path_destination_enqueue(p, &p->last_sample, 1);
|
||||
}
|
||||
/* A source is ready to receive samples */
|
||||
else {
|
||||
else
|
||||
path_source_read(ps, p, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -521,7 +532,6 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
|
|||
"rate", &p->rate,
|
||||
"mask", &json_mask,
|
||||
"original_sequence_no", &p->original_sequence_no
|
||||
|
||||
);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to parse path configuration");
|
||||
|
@ -648,14 +658,8 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
|
|||
p->poll = 1;
|
||||
else if (vlist_length(&p->sources) > 1)
|
||||
p->poll = 1;
|
||||
else {
|
||||
struct path_source *ps = (struct path_source *) vlist_at(&p->sources, 0);
|
||||
|
||||
int fds[16];
|
||||
int num_fds = node_poll_fds(ps->node, fds);
|
||||
|
||||
p->poll = num_fds > 0;
|
||||
}
|
||||
else
|
||||
p->poll = 0;
|
||||
}
|
||||
|
||||
ret = vlist_destroy(&sources, NULL, false);
|
||||
|
@ -806,11 +810,15 @@ int path_stop(struct path *p)
|
|||
{
|
||||
int ret;
|
||||
|
||||
if (p->state != STATE_STARTED)
|
||||
if (p->state != STATE_STARTED && p->state != STATE_STOPPING)
|
||||
return 0;
|
||||
|
||||
info("Stopping path: %s", path_name(p));
|
||||
|
||||
if (p->state != STATE_STOPPING)
|
||||
p->state = STATE_STOPPING;
|
||||
|
||||
/* Cancel the thread in case is currently in a blocking syscall */
|
||||
ret = pthread_cancel(p->tid);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
@ -908,49 +916,22 @@ int path_uses_node(struct path *p, struct node *n)
|
|||
return -1;
|
||||
}
|
||||
|
||||
int path_reverse(struct path *p, struct path *r)
|
||||
int path_is_simple(struct path *p)
|
||||
{
|
||||
if (vlist_length(&p->destinations) != 1 || vlist_length(&p->sources) != 1)
|
||||
return -1;
|
||||
int ret;
|
||||
const char *in = NULL, *out = NULL;
|
||||
|
||||
/* General */
|
||||
r->enabled = p->enabled;
|
||||
ret = json_unpack(p->cfg, "{ s: s, s: s }", "in", &in, "out", &out);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
/* Source / Destinations */
|
||||
struct path_destination *orig_pd = vlist_first(&p->destinations);
|
||||
struct path_source *orig_ps = vlist_first(&p->sources);
|
||||
ret = node_is_valid_name(in);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
struct path_destination *new_pd = (struct path_destination *) alloc(sizeof(struct path_destination));
|
||||
struct path_source *new_ps = (struct path_source *) alloc(sizeof(struct path_source));
|
||||
struct mapping_entry *new_me = alloc(sizeof(struct mapping_entry));
|
||||
new_pd->node = orig_ps->node;
|
||||
new_ps->node = orig_pd->node;
|
||||
new_ps->masked = true;
|
||||
ret = node_is_valid_name(out);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
new_me->node = new_ps->node;
|
||||
new_me->type = MAPPING_TYPE_DATA;
|
||||
new_me->data.offset = 0;
|
||||
new_me->length = 0;
|
||||
|
||||
vlist_init(&new_ps->mappings);
|
||||
vlist_push(&new_ps->mappings, new_me);
|
||||
|
||||
vlist_push(&r->destinations, new_pd);
|
||||
vlist_push(&r->sources, new_ps);
|
||||
|
||||
#ifdef WITH_HOOKS
|
||||
for (size_t i = 0; i < vlist_length(&p->hooks); i++) {
|
||||
int ret;
|
||||
|
||||
struct hook *h = (struct hook *) vlist_at(&p->hooks, i);
|
||||
struct hook *g = (struct hook *) alloc(sizeof(struct hook));
|
||||
|
||||
ret = hook_init(g, h->_vt, r, NULL);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
vlist_push(&r->hooks, g);
|
||||
}
|
||||
#endif /* WITH_HOOKS */
|
||||
return 0;
|
||||
}
|
||||
|
|
15
lib/signal.c
15
lib/signal.c
|
@ -55,20 +55,7 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u
|
|||
|
||||
switch (me->type) {
|
||||
case MAPPING_TYPE_STATS:
|
||||
switch (me->stats.type) {
|
||||
case MAPPING_STATS_TYPE_TOTAL:
|
||||
s->type = SIGNAL_TYPE_INTEGER;
|
||||
break;
|
||||
|
||||
case MAPPING_STATS_TYPE_LAST:
|
||||
case MAPPING_STATS_TYPE_LOWEST:
|
||||
case MAPPING_STATS_TYPE_HIGHEST:
|
||||
case MAPPING_STATS_TYPE_MEAN:
|
||||
case MAPPING_STATS_TYPE_VAR:
|
||||
case MAPPING_STATS_TYPE_STDDEV:
|
||||
s->type = SIGNAL_TYPE_FLOAT;
|
||||
break;
|
||||
}
|
||||
s->type = stats_types[me->stats.type].signal_type;
|
||||
break;
|
||||
|
||||
case MAPPING_TYPE_HEADER:
|
||||
|
|
132
lib/stats.c
132
lib/stats.c
|
@ -31,12 +31,22 @@
|
|||
#include <villas/node.h>
|
||||
#include <villas/table.h>
|
||||
|
||||
struct stats_desc stats_metrics[] = {
|
||||
{ "skipped", "samples", "Skipped samples and the distance between them", 25 },
|
||||
{ "reordered", "samples", "Reordered samples and the distance between them", 25 },
|
||||
{ "gap_sent", "seconds", "Inter-message timestamps (as sent by remote)", 25 },
|
||||
{ "gap_received", "seconds", "Inter-message arrival time (as received by this instance)", 25 },
|
||||
{ "owd", "seconds", "One-way-delay (OWD) of received messages", 25 }
|
||||
struct stats_metric_description stats_metrics[] = {
|
||||
{ "skipped", STATS_METRIC_SKIPPED, "samples", "Skipped samples and the distance between them", 25 },
|
||||
{ "reordered", STATS_METRIC_REORDERED, "samples", "Reordered samples and the distance between them", 25 },
|
||||
{ "gap_sent", STATS_METRIC_GAP_SAMPLE, "seconds", "Inter-message timestamps (as sent by remote)", 25 },
|
||||
{ "gap_received", STATS_METRIC_GAP_RECEIVED, "seconds", "Inter-message arrival time (as received by this instance)", 25 },
|
||||
{ "owd", STATS_METRIC_OWD, "seconds", "One-way-delay (OWD) of received messages", 25 }
|
||||
};
|
||||
|
||||
struct stats_type_description stats_types[] = {
|
||||
{ "last", STATS_TYPE_LAST, SIGNAL_TYPE_FLOAT },
|
||||
{ "highest", STATS_TYPE_HIGHEST, SIGNAL_TYPE_FLOAT },
|
||||
{ "lowest", STATS_TYPE_LOWEST, SIGNAL_TYPE_FLOAT },
|
||||
{ "mean", STATS_TYPE_MEAN, SIGNAL_TYPE_FLOAT },
|
||||
{ "var", STATS_TYPE_VAR, SIGNAL_TYPE_FLOAT },
|
||||
{ "stddev", STATS_TYPE_STDDEV, SIGNAL_TYPE_FLOAT },
|
||||
{ "total", STATS_TYPE_TOTAL, SIGNAL_TYPE_INTEGER }
|
||||
};
|
||||
|
||||
int stats_lookup_format(const char *str)
|
||||
|
@ -51,9 +61,33 @@ int stats_lookup_format(const char *str)
|
|||
return -1;
|
||||
}
|
||||
|
||||
enum stats_metric stats_lookup_metric(const char *str)
|
||||
{
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
|
||||
struct stats_metric_description *d = &stats_metrics[i];
|
||||
|
||||
if (!strcmp(str, d->name))
|
||||
return d->metric;
|
||||
}
|
||||
|
||||
return STATS_METRIC_INVALID;
|
||||
}
|
||||
|
||||
enum stats_type stats_lookup_type(const char *str)
|
||||
{
|
||||
for (int i = 0; i < STATS_TYPE_COUNT; i++) {
|
||||
struct stats_type_description *d = &stats_types[i];
|
||||
|
||||
if (!strcmp(str, d->name))
|
||||
return d->type;
|
||||
}
|
||||
|
||||
return STATS_TYPE_INVALID;
|
||||
}
|
||||
|
||||
int stats_init(struct stats *s, int buckets, int warmup)
|
||||
{
|
||||
for (int i = 0; i < STATS_COUNT; i++)
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++)
|
||||
hist_init(&s->histograms[i], buckets, warmup);
|
||||
|
||||
s->delta = alloc(sizeof(struct stats_delta));
|
||||
|
@ -63,7 +97,7 @@ int stats_init(struct stats *s, int buckets, int warmup)
|
|||
|
||||
int stats_destroy(struct stats *s)
|
||||
{
|
||||
for (int i = 0; i < STATS_COUNT; i++)
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++)
|
||||
hist_destroy(&s->histograms[i]);
|
||||
|
||||
free(s->delta);
|
||||
|
@ -71,7 +105,7 @@ int stats_destroy(struct stats *s)
|
|||
return 0;
|
||||
}
|
||||
|
||||
void stats_update(struct stats *s, enum stats_id id, double val)
|
||||
void stats_update(struct stats *s, enum stats_metric id, double val)
|
||||
{
|
||||
s->delta->values[id] = val;
|
||||
s->delta->update |= 1 << id;
|
||||
|
@ -79,7 +113,7 @@ void stats_update(struct stats *s, enum stats_id id, double val)
|
|||
|
||||
int stats_commit(struct stats *s)
|
||||
{
|
||||
for (int i = 0; i < STATS_COUNT; i++) {
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
|
||||
if (s->delta->update & 1 << i) {
|
||||
hist_put(&s->histograms[i], s->delta->values[i]);
|
||||
s->delta->update &= ~(1 << i);
|
||||
|
@ -93,8 +127,8 @@ json_t * stats_json(struct stats *s)
|
|||
{
|
||||
json_t *obj = json_object();
|
||||
|
||||
for (int i = 0; i < STATS_COUNT; i++) {
|
||||
struct stats_desc *d = &stats_metrics[i];
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
|
||||
struct stats_metric_description *d = &stats_metrics[i];
|
||||
struct hist *h = &s->histograms[i];
|
||||
|
||||
json_object_set_new(obj, d->name, hist_json(h));
|
||||
|
@ -107,17 +141,17 @@ json_t * stats_json_periodic(struct stats *s, struct node *n)
|
|||
{
|
||||
return json_pack("{ s: s, s: i, s: f, s: f, s: i, s: i }",
|
||||
"node", node_name(n),
|
||||
"processed", hist_total(&s->histograms[STATS_OWD]),
|
||||
"owd", hist_last(&s->histograms[STATS_OWD]),
|
||||
"rate", 1.0 / hist_last(&s->histograms[STATS_GAP_SAMPLE]),
|
||||
"dropped", hist_total(&s->histograms[STATS_REORDERED]),
|
||||
"skipped", hist_total(&s->histograms[STATS_SKIPPED])
|
||||
"processed", hist_total(&s->histograms[STATS_METRIC_OWD]),
|
||||
"owd", hist_last(&s->histograms[STATS_METRIC_OWD]),
|
||||
"rate", 1.0 / hist_last(&s->histograms[STATS_METRIC_GAP_SAMPLE]),
|
||||
"dropped", hist_total(&s->histograms[STATS_METRIC_REORDERED]),
|
||||
"skipped", hist_total(&s->histograms[STATS_METRIC_SKIPPED])
|
||||
);
|
||||
}
|
||||
|
||||
void stats_reset(struct stats *s)
|
||||
{
|
||||
for (int i = 0; i < STATS_COUNT; i++)
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++)
|
||||
hist_reset(&s->histograms[i]);
|
||||
}
|
||||
|
||||
|
@ -155,13 +189,13 @@ void stats_print_periodic(struct stats *s, FILE *f, enum stats_format fmt, int v
|
|||
case STATS_FORMAT_HUMAN:
|
||||
table_row(&stats_table,
|
||||
node_name_short(n),
|
||||
hist_total(&s->histograms[STATS_OWD]),
|
||||
hist_last(&s->histograms[STATS_OWD]),
|
||||
hist_mean(&s->histograms[STATS_OWD]),
|
||||
1.0 / hist_last(&s->histograms[STATS_GAP_RECEIVED]),
|
||||
1.0 / hist_mean(&s->histograms[STATS_GAP_RECEIVED]),
|
||||
hist_total(&s->histograms[STATS_REORDERED]),
|
||||
hist_total(&s->histograms[STATS_SKIPPED])
|
||||
hist_total(&s->histograms[STATS_METRIC_OWD]),
|
||||
hist_last(&s->histograms[STATS_METRIC_OWD]),
|
||||
hist_mean(&s->histograms[STATS_METRIC_OWD]),
|
||||
1.0 / hist_last(&s->histograms[STATS_METRIC_GAP_RECEIVED]),
|
||||
1.0 / hist_mean(&s->histograms[STATS_METRIC_GAP_RECEIVED]),
|
||||
hist_total(&s->histograms[STATS_METRIC_REORDERED]),
|
||||
hist_total(&s->histograms[STATS_METRIC_SKIPPED])
|
||||
);
|
||||
break;
|
||||
|
||||
|
@ -179,10 +213,10 @@ void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose)
|
|||
{
|
||||
switch (fmt) {
|
||||
case STATS_FORMAT_HUMAN:
|
||||
for (int i = 0; i < STATS_COUNT; i++) {
|
||||
struct stats_desc *desc = &stats_metrics[i];
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
|
||||
struct stats_metric_description *d = &stats_metrics[i];
|
||||
|
||||
info("%s: %s", desc->name, desc->desc);
|
||||
info("%s: %s", d->name, d->desc);
|
||||
hist_print(&s->histograms[i], verbose);
|
||||
}
|
||||
break;
|
||||
|
@ -198,14 +232,44 @@ void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose)
|
|||
}
|
||||
}
|
||||
|
||||
enum stats_id stats_lookup_id(const char *name)
|
||||
union signal_data stats_get_value(const struct stats *s, enum stats_metric sm, enum stats_type st)
|
||||
{
|
||||
for (int i = 0; i < STATS_COUNT; i++) {
|
||||
struct stats_desc *desc = &stats_metrics[i];
|
||||
const struct hist *h = &s->histograms[sm];
|
||||
|
||||
if (!strcmp(desc->name, name))
|
||||
return i;
|
||||
union signal_data d;
|
||||
|
||||
switch (st) {
|
||||
case STATS_TYPE_TOTAL:
|
||||
d.i = h->total;
|
||||
break;
|
||||
|
||||
case STATS_TYPE_LAST:
|
||||
d.f = h->last;
|
||||
break;
|
||||
|
||||
case STATS_TYPE_HIGHEST:
|
||||
d.f = h->highest;
|
||||
break;
|
||||
|
||||
case STATS_TYPE_LOWEST:
|
||||
d.f = h->lowest;
|
||||
break;
|
||||
|
||||
case STATS_TYPE_MEAN:
|
||||
d.f = hist_mean(h);
|
||||
break;
|
||||
|
||||
case STATS_TYPE_STDDEV:
|
||||
d.f = hist_stddev(h);
|
||||
break;
|
||||
|
||||
case STATS_TYPE_VAR:
|
||||
d.f = hist_var(h);
|
||||
break;
|
||||
|
||||
default:
|
||||
d.f = -1;
|
||||
}
|
||||
|
||||
return -1;
|
||||
return d;
|
||||
}
|
||||
|
|
|
@ -49,10 +49,10 @@ using namespace villas::node;
|
|||
|
||||
SuperNode::SuperNode() :
|
||||
state(STATE_INITIALIZED),
|
||||
idleStop(false),
|
||||
priority(0),
|
||||
affinity(0),
|
||||
hugepages(DEFAULT_NR_HUGEPAGES),
|
||||
stats(0),
|
||||
#ifdef WITH_API
|
||||
api(this),
|
||||
#ifdef WITH_WEB
|
||||
|
@ -179,7 +179,9 @@ int SuperNode::parseJson(json_t *j)
|
|||
|
||||
json_error_t err;
|
||||
|
||||
ret = json_unpack_ex(j, &err, 0, "{ s?: o, s?: o, s?: o, s?: o, s?: i, s?: i, s?: i, s?: F, s?: s }",
|
||||
idleStop = true;
|
||||
|
||||
ret = json_unpack_ex(j, &err, 0, "{ s?: o, s?: o, s?: o, s?: o, s?: i, s?: i, s?: i, s?: s, s?: b }",
|
||||
"http", &json_web,
|
||||
"logging", &json_logging,
|
||||
"nodes", &json_nodes,
|
||||
|
@ -187,8 +189,8 @@ int SuperNode::parseJson(json_t *j)
|
|||
"hugepages", &hugepages,
|
||||
"affinity", &affinity,
|
||||
"priority", &priority,
|
||||
"stats", &stats,
|
||||
"name", &nme
|
||||
"name", &nme,
|
||||
"idle_stop", &idleStop
|
||||
);
|
||||
if (ret)
|
||||
throw JsonError(err, "Failed to parse global configuration");
|
||||
|
@ -215,6 +217,10 @@ int SuperNode::parseJson(json_t *j)
|
|||
struct node_type *nt;
|
||||
const char *type;
|
||||
|
||||
ret = node_is_valid_name(name);
|
||||
if (ret)
|
||||
throw RuntimeError("Invalid name for node: {}", name);
|
||||
|
||||
ret = json_unpack_ex(json_node, &err, 0, "{ s: s }", "type", &type);
|
||||
if (ret)
|
||||
throw JsonError(err, "Failed to parse node");
|
||||
|
@ -245,7 +251,7 @@ int SuperNode::parseJson(json_t *j)
|
|||
size_t i;
|
||||
json_t *json_path;
|
||||
json_array_foreach(json_paths, i, json_path) {
|
||||
path *p = (path *) alloc(sizeof(path));
|
||||
parse: path *p = (path *) alloc(sizeof(path));
|
||||
|
||||
ret = path_init(p);
|
||||
if (ret)
|
||||
|
@ -258,17 +264,25 @@ int SuperNode::parseJson(json_t *j)
|
|||
vlist_push(&paths, p);
|
||||
|
||||
if (p->reverse) {
|
||||
path *r = (path *) alloc(sizeof(path));
|
||||
|
||||
ret = path_init(r);
|
||||
/* Only simple paths can be reversed */
|
||||
ret = path_is_simple(p);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to init path");
|
||||
throw RuntimeError("Complex paths can not be reversed!");
|
||||
|
||||
ret = path_reverse(p, r);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to reverse path {}", path_name(p));
|
||||
/* Parse a second time with in/out reversed */
|
||||
json_path = json_copy(json_path);
|
||||
|
||||
vlist_push(&paths, r);
|
||||
json_t *json_in = json_object_get(json_path, "in");
|
||||
json_t *json_out = json_object_get(json_path, "out");
|
||||
|
||||
if (json_equal(json_in, json_out))
|
||||
throw RuntimeError("Can not reverse path with identical in/out nodes!");
|
||||
|
||||
json_object_set(json_path, "reverse", json_false());
|
||||
json_object_set(json_path, "in", json_out);
|
||||
json_object_set(json_path, "out", json_in);
|
||||
|
||||
goto parse;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -380,9 +394,14 @@ void SuperNode::startPaths()
|
|||
|
||||
void SuperNode::start()
|
||||
{
|
||||
int ret;
|
||||
|
||||
assert(state == STATE_CHECKED);
|
||||
|
||||
memory_init(hugepages);
|
||||
ret = memory_init(hugepages);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to initialize memory system");
|
||||
|
||||
kernel::rt::init(priority, affinity);
|
||||
|
||||
#ifdef WITH_API
|
||||
|
@ -398,17 +417,11 @@ void SuperNode::start()
|
|||
startNodes();
|
||||
startPaths();
|
||||
|
||||
#ifdef WITH_HOOKS
|
||||
int ret;
|
||||
ret = task_init(&task, 1.0, CLOCK_REALTIME);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to create timer");
|
||||
|
||||
if (stats > 0) {
|
||||
stats_print_header(STATS_FORMAT_HUMAN);
|
||||
|
||||
ret = task_init(&task, 1.0 / stats, CLOCK_REALTIME);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to create stats timer");
|
||||
}
|
||||
#endif /* WITH_HOOKS */
|
||||
stats_print_header(STATS_FORMAT_HUMAN);
|
||||
|
||||
state = STATE_STARTED;
|
||||
}
|
||||
|
@ -471,15 +484,11 @@ void SuperNode::stopInterfaces()
|
|||
|
||||
void SuperNode::stop()
|
||||
{
|
||||
|
||||
#ifdef WITH_HOOKS
|
||||
int ret;
|
||||
if (stats > 0) {
|
||||
ret = task_destroy(&task);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to stop stats timer");
|
||||
}
|
||||
#endif /* WITH_HOOKS */
|
||||
|
||||
ret = task_destroy(&task);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to stop timer");
|
||||
|
||||
stopPaths();
|
||||
stopNodes();
|
||||
|
@ -498,12 +507,15 @@ void SuperNode::stop()
|
|||
|
||||
void SuperNode::run()
|
||||
{
|
||||
#ifdef WITH_HOOKS
|
||||
task_wait(&task);
|
||||
periodic();
|
||||
#else
|
||||
pause();
|
||||
#endif /* WITH_HOOKS */
|
||||
int ret;
|
||||
|
||||
while (state == STATE_STARTED) {
|
||||
task_wait(&task);
|
||||
|
||||
ret = periodic();
|
||||
if (ret)
|
||||
state = STATE_STOPPING;
|
||||
}
|
||||
}
|
||||
|
||||
SuperNode::~SuperNode()
|
||||
|
@ -520,21 +532,25 @@ SuperNode::~SuperNode()
|
|||
|
||||
int SuperNode::periodic()
|
||||
{
|
||||
#ifdef WITH_HOOKS
|
||||
int ret;
|
||||
|
||||
int started = 0;
|
||||
|
||||
for (size_t i = 0; i < vlist_length(&paths); i++) {
|
||||
auto *p = (struct path *) vlist_at(&paths, i);
|
||||
|
||||
if (p->state != STATE_STARTED)
|
||||
continue;
|
||||
if (p->state == STATE_STARTED) {
|
||||
started++;
|
||||
|
||||
for (size_t j = 0; j < vlist_length(&p->hooks); j++) {
|
||||
hook *h = (struct hook *) vlist_at(&p->hooks, j);
|
||||
#ifdef WITH_HOOKS
|
||||
for (size_t j = 0; j < vlist_length(&p->hooks); j++) {
|
||||
hook *h = (struct hook *) vlist_at(&p->hooks, j);
|
||||
|
||||
ret = hook_periodic(h);
|
||||
if (ret)
|
||||
return ret;
|
||||
ret = hook_periodic(h);
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
#endif /* WITH_HOOKS */
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -544,6 +560,7 @@ int SuperNode::periodic()
|
|||
if (n->state != STATE_STARTED)
|
||||
continue;
|
||||
|
||||
#ifdef WITH_HOOKS
|
||||
for (size_t j = 0; j < vlist_length(&n->in.hooks); j++) {
|
||||
auto *h = (struct hook *) vlist_at(&n->in.hooks, j);
|
||||
|
||||
|
@ -559,8 +576,15 @@ int SuperNode::periodic()
|
|||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
#endif /* WITH_HOOKS */
|
||||
}
|
||||
#endif
|
||||
|
||||
if (idleStop && state == STATE_STARTED && started == 0) {
|
||||
info("No more active paths. Stopping super-node");
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -55,11 +55,22 @@ using namespace villas;
|
|||
using namespace villas::node;
|
||||
using namespace villas::plugin;
|
||||
|
||||
static std::atomic<bool> stop(false);
|
||||
SuperNode sn;
|
||||
|
||||
static void quit(int signal, siginfo_t *sinfo, void *ctx)
|
||||
{
|
||||
stop = true;
|
||||
Logger logger = logging.get("node");
|
||||
|
||||
switch (signal) {
|
||||
case SIGALRM:
|
||||
logger->info("Reached timeout. Terminating...");
|
||||
break;
|
||||
|
||||
default:
|
||||
logger->info("Received {} signal. Terminating...", strsignal(signal));
|
||||
}
|
||||
|
||||
sn.setState(STATE_STOPPING);
|
||||
}
|
||||
|
||||
static void usage()
|
||||
|
@ -108,7 +119,6 @@ int main(int argc, char *argv[])
|
|||
{
|
||||
int ret;
|
||||
|
||||
SuperNode sn;
|
||||
Logger logger = logging.get("node");
|
||||
|
||||
try {
|
||||
|
@ -175,10 +185,7 @@ int main(int argc, char *argv[])
|
|||
throw RuntimeError("Failed to verify configuration");
|
||||
|
||||
sn.start();
|
||||
|
||||
while (!stop)
|
||||
sn.run();
|
||||
|
||||
sn.run();
|
||||
sn.stop();
|
||||
|
||||
logger->info(CLR_GRN("Goodbye!"));
|
||||
|
@ -186,7 +193,7 @@ int main(int argc, char *argv[])
|
|||
return 0;
|
||||
|
||||
}
|
||||
catch (std::exception *e) {
|
||||
logger->error(e->what());
|
||||
catch (RuntimeError &e) {
|
||||
logger->error("{}", e.what());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -107,8 +107,15 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
|
|||
{
|
||||
Logger logger = logging.get("pipe");
|
||||
|
||||
if (signal == SIGALRM)
|
||||
logger->info("Reached timeout. Terminating...");
|
||||
switch (signal) {
|
||||
case SIGALRM:
|
||||
logger->info("Reached timeout. Terminating...");
|
||||
break;
|
||||
|
||||
default:
|
||||
logger->info("Received {} signal. Terminating...", strsignal(signal));
|
||||
break;
|
||||
}
|
||||
|
||||
stop = true;
|
||||
}
|
||||
|
@ -145,7 +152,7 @@ static void * send_loop(void *ctx)
|
|||
struct node *node = dirs->send.node;
|
||||
struct sample *smps[node->out.vectorize];
|
||||
|
||||
while (!io_eof(dirs->send.io)) {
|
||||
while (node->state == STATE_STARTED && !io_eof(dirs->send.io)) {
|
||||
allocated = sample_alloc_many(&dirs->send.pool, smps, node->out.vectorize);
|
||||
if (allocated < 0)
|
||||
throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize);
|
||||
|
@ -184,14 +191,14 @@ static void * send_loop(void *ctx)
|
|||
leave: if (io_eof(dirs->send.io)) {
|
||||
if (dirs->recv.limit < 0) {
|
||||
logger->info("Reached end-of-file. Terminating...");
|
||||
killme(SIGTERM);
|
||||
stop = true;
|
||||
}
|
||||
else
|
||||
logger->info("Reached end-of-file. Wait for receive side...");
|
||||
}
|
||||
else {
|
||||
logger->info("Reached send limit. Terminating...");
|
||||
killme(SIGTERM);
|
||||
stop = true;
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
|
@ -207,7 +214,7 @@ static void * recv_loop(void *ctx)
|
|||
struct node *node = dirs->recv.node;
|
||||
struct sample *smps[node->in.vectorize];
|
||||
|
||||
for (;;) {
|
||||
while (node->state == STATE_STARTED) {
|
||||
allocated = sample_alloc_many(&dirs->recv.pool, smps, node->in.vectorize);
|
||||
if (allocated < 0)
|
||||
throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize);
|
||||
|
@ -217,8 +224,12 @@ static void * recv_loop(void *ctx)
|
|||
release = allocated;
|
||||
|
||||
recv = node_read(node, smps, allocated, &release);
|
||||
if (recv < 0)
|
||||
logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv);
|
||||
if (recv < 0) {
|
||||
if (node->state == STATE_STOPPING)
|
||||
goto leave2;
|
||||
else
|
||||
logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv);
|
||||
}
|
||||
else {
|
||||
io_print(dirs->recv.io, smps, recv);
|
||||
|
||||
|
@ -232,7 +243,7 @@ static void * recv_loop(void *ctx)
|
|||
}
|
||||
|
||||
leave: logger->info("Reached receive limit. Terminating...");
|
||||
killme(SIGTERM);
|
||||
leave2: stop = true;
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
@ -411,7 +422,7 @@ check: if (optarg == endptr)
|
|||
alarm(timeout);
|
||||
|
||||
while (!stop)
|
||||
pause();
|
||||
sleep(1);
|
||||
|
||||
if (dirs.recv.enabled) {
|
||||
pthread_cancel(dirs.recv.thread);
|
||||
|
|
|
@ -258,7 +258,7 @@ int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in
|
|||
try {
|
||||
new (c) Connection(wsi);
|
||||
}
|
||||
catch (InvalidUrlException e) {
|
||||
catch (InvalidUrlException &e) {
|
||||
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *) "Invalid URL", strlen("Invalid URL"));
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -160,6 +160,17 @@ void usage()
|
|||
|
||||
static void quit(int signal, siginfo_t *sinfo, void *ctx)
|
||||
{
|
||||
Logger logger = logging.get("signal");
|
||||
|
||||
switch (signal) {
|
||||
case SIGALRM:
|
||||
logger->info("Reached timeout. Terminating...");
|
||||
break;
|
||||
|
||||
default:
|
||||
logger->info("Received {} signal. Terminating...", strsignal(signal));
|
||||
}
|
||||
|
||||
stop = true;
|
||||
}
|
||||
|
||||
|
@ -246,16 +257,20 @@ int main(int argc, char *argv[])
|
|||
if (ret)
|
||||
throw RuntimeError("Failed to open output");
|
||||
|
||||
while (!stop) {
|
||||
while (!stop && n.state == STATE_STARTED) {
|
||||
t = sample_alloc(&q);
|
||||
|
||||
unsigned release = 1; // release = allocated
|
||||
|
||||
ret = node_read(&n, &t, 1, &release);
|
||||
if (ret > 0)
|
||||
io_print(&io, &t, 1);
|
||||
retry: ret = node_read(&n, &t, 1, &release);
|
||||
if (ret == 0)
|
||||
goto retry;
|
||||
else if (ret < 0)
|
||||
goto out;
|
||||
|
||||
sample_decref(t);
|
||||
io_print(&io, &t, 1);
|
||||
|
||||
out: sample_decref(t);
|
||||
}
|
||||
|
||||
ret = node_stop(&n);
|
||||
|
|
|
@ -94,7 +94,7 @@ fi
|
|||
# Set paths
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
# Declare location of config files
|
||||
CONFIG=$(mktemp /tmp/nodetype-benchmark-config-XXXX.conf)
|
||||
|
|
20
tests/integration/README.md
Normal file
20
tests/integration/README.md
Normal file
|
@ -0,0 +1,20 @@
|
|||
# Integration Tests
|
||||
|
||||
Run tests:
|
||||
|
||||
```
|
||||
$ BUILDDIR=/VILLASnode/build/ /VILLASnode/tools/integration-tests.sh
|
||||
```
|
||||
|
||||
There are two options for the test script:
|
||||
|
||||
```
|
||||
-v Show full test output
|
||||
-f FILTER Filter test cases
|
||||
```
|
||||
|
||||
Example:
|
||||
|
||||
```
|
||||
$ BUILDDIR=/VILLASnode/build/ /VILLASnode/tools/integration-tests.sh -f pipe-loopback-socket -v
|
||||
```
|
|
@ -43,7 +43,7 @@ fi
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp /tmp/ib-configuration-XXXX.conf)
|
||||
CONFIG_FILE_TARGET=$(mktemp /tmp/ib-configuration-target-XXXX.conf)
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
OUTPUT_FILE=$(mktemp)
|
||||
|
|
|
@ -24,42 +24,51 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
STATS_LOG=$(mktemp)
|
||||
|
||||
RATE="33.0"
|
||||
|
||||
cat > ${CONFIG_FILE} <<EOF
|
||||
{
|
||||
"stats": 1.0,
|
||||
"nodes": {
|
||||
"stats_1": {
|
||||
"type": "stats",
|
||||
"node": "signal_1",
|
||||
"rate": 1.0
|
||||
"rate": 10.0,
|
||||
"in": {
|
||||
"signals": [
|
||||
{ "name": "gap", "stats": "signal_1.gap_sent.mean" },
|
||||
{ "name": "total", "stats": "signal_1.owd.total" }
|
||||
]
|
||||
}
|
||||
},
|
||||
"signal_1": {
|
||||
"type": "signal",
|
||||
"limit": 100,
|
||||
"rate": 10.0,
|
||||
"rate": ${RATE},
|
||||
"in" : {
|
||||
"hooks": [
|
||||
{ "type": "stats", "verbose": true }
|
||||
]
|
||||
}
|
||||
},
|
||||
"stats_log_1": {
|
||||
"type": "file",
|
||||
"format": "json",
|
||||
|
||||
"uri": "${STATS_LOG}"
|
||||
}
|
||||
},
|
||||
"paths": [
|
||||
{
|
||||
"in": "signal_1",
|
||||
"hooks" : [
|
||||
{ "type" : "print", "enabled" : false }
|
||||
]
|
||||
"in": "signal_1"
|
||||
},
|
||||
{
|
||||
"in": "stats_1",
|
||||
"hooks" : [
|
||||
{ "type" : "print" }
|
||||
]
|
||||
"out": "stats_log_1"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
@ -67,4 +76,16 @@ EOF
|
|||
|
||||
# Start node
|
||||
VILLAS_LOG_PREFIX=$(colorize "[Node] ") \
|
||||
villas-node ${CONFIG_FILE}
|
||||
villas-node ${CONFIG_FILE} &
|
||||
PID=$!
|
||||
|
||||
sleep 5
|
||||
|
||||
kill ${PID}
|
||||
|
||||
tail -n1 ${STATS_LOG} | jq -e "(.data[0] - 1/${RATE} | length) < 1e-4 and .data[1] == 99" > /dev/null
|
||||
RC=$?
|
||||
|
||||
rm ${STATS_LOG} ${CONFIG_FILE}
|
||||
|
||||
exit ${RC}
|
||||
|
|
|
@ -28,7 +28,7 @@ exit 99
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
|
|
|
@ -23,9 +23,14 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
##################################################################################
|
||||
|
||||
if [ -n "${CI}" ]; then
|
||||
echo "RTP tests are not ready yet"
|
||||
exit 99
|
||||
fi
|
||||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE_SRC=$(mktemp)
|
||||
CONFIG_FILE_DEST=$(mktemp)
|
||||
|
@ -35,11 +40,14 @@ OUTPUT_FILE=$(mktemp)
|
|||
FORMAT="villas.binary"
|
||||
VECTORIZE="1"
|
||||
|
||||
RATE=10000
|
||||
RATE=100
|
||||
NUM_SAMPLES=100
|
||||
|
||||
cat > ${CONFIG_FILE_SRC} << EOF
|
||||
{
|
||||
"logging" : {
|
||||
"level" : "debug"
|
||||
},
|
||||
"nodes" : {
|
||||
"rtp_node" : {
|
||||
"type" : "rtp",
|
||||
|
@ -47,7 +55,7 @@ cat > ${CONFIG_FILE_SRC} << EOF
|
|||
"vectorize" : ${VECTORIZE},
|
||||
"rate" : ${RATE},
|
||||
"rtcp" : {
|
||||
"enabled" : false,
|
||||
"enabled" : true,
|
||||
"mode" : "aimd",
|
||||
"throttle_mode" : "decimate"
|
||||
},
|
||||
|
@ -72,6 +80,9 @@ EOF
|
|||
|
||||
cat > ${CONFIG_FILE_DEST} << EOF
|
||||
{
|
||||
"logging" : {
|
||||
"level" : "debug"
|
||||
},
|
||||
"nodes" : {
|
||||
"rtp_node" : {
|
||||
"type" : "rtp",
|
||||
|
@ -79,7 +90,7 @@ cat > ${CONFIG_FILE_DEST} << EOF
|
|||
"vectorize" : ${VECTORIZE},
|
||||
"rate" : ${RATE},
|
||||
"rtcp": {
|
||||
"enabled" : false,
|
||||
"enabled" : true,
|
||||
"mode" : "aimd",
|
||||
"throttle_mode" : "decimate"
|
||||
},
|
||||
|
@ -102,11 +113,16 @@ cat > ${CONFIG_FILE_DEST} << EOF
|
|||
}
|
||||
EOF
|
||||
|
||||
villas-signal mixed -v 5 -l ${NUM_SAMPLES} > ${INPUT_FILE}
|
||||
|
||||
VILLAS_LOG_PREFIX="[DEST] " \
|
||||
villas-pipe -l ${NUM_SAMPLES} ${CONFIG_FILE_DEST} rtp_node > ${OUTPUT_FILE} &
|
||||
PID=$!
|
||||
|
||||
villas-pipe ${CONFIG_FILE_SRC} rtp_node < ${INPUT_FILE}
|
||||
sleep 1
|
||||
|
||||
VILLAS_LOG_PREFIX="[SIGN] " \
|
||||
villas-signal mixed -v 5 -r ${RATE} -l ${NUM_SAMPLES} | tee ${INPUT_FILE} | \
|
||||
VILLAS_LOG_PREFIX="[SRC] " \
|
||||
villas-pipe ${CONFIG_FILE_SRC} rtp_node > ${OUTPUT_FILE}
|
||||
|
||||
# Compare data
|
||||
villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE}
|
||||
|
@ -114,4 +130,6 @@ RC=$?
|
|||
|
||||
rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE}
|
||||
|
||||
kill $PID
|
||||
|
||||
exit $RC
|
||||
|
|
140
tests/integration/pipe-loopback-rtp-tbf.sh
Executable file
140
tests/integration/pipe-loopback-rtp-tbf.sh
Executable file
|
@ -0,0 +1,140 @@
|
|||
#!/bin/bash
|
||||
#
|
||||
# Integration loopback test for villas-pipe.
|
||||
#
|
||||
# @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
# @author Marvin Klimke <marvin.klimke@rwth-aachen.de>
|
||||
# @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
|
||||
# @license GNU General Public License (version 3)
|
||||
#
|
||||
# VILLASnode
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
##################################################################################
|
||||
|
||||
if [ -n "${CI}" ]; then
|
||||
echo "RTP tests are not ready yet"
|
||||
exit 99
|
||||
fi
|
||||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE_SRC=$(mktemp)
|
||||
CONFIG_FILE_DEST=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
OUTPUT_FILE=$(mktemp)
|
||||
|
||||
FORMAT="villas.binary"
|
||||
VECTORIZE="1"
|
||||
|
||||
RATE=500
|
||||
NUM_SAMPLES=10000000
|
||||
NUM_VALUES=5
|
||||
|
||||
cat > ${CONFIG_FILE_SRC} << EOF
|
||||
{
|
||||
"logging" : {
|
||||
"level" : "info"
|
||||
},
|
||||
"nodes" : {
|
||||
"rtp_node" : {
|
||||
"type" : "rtp",
|
||||
"format" : "${FORMAT}",
|
||||
"vectorize" : ${VECTORIZE},
|
||||
"rate" : ${RATE},
|
||||
"rtcp" : {
|
||||
"enabled" : true,
|
||||
"mode" : "aimd",
|
||||
"throttle_mode" : "decimate"
|
||||
},
|
||||
"aimd" : {
|
||||
"a" : 10,
|
||||
"b" : 0.75,
|
||||
"start_rate" : ${RATE}
|
||||
},
|
||||
"in" : {
|
||||
"address" : "0.0.0.0:12002",
|
||||
"signals" : {
|
||||
"count" : ${NUM_VALUES},
|
||||
"type" : "float"
|
||||
}
|
||||
},
|
||||
"out" : {
|
||||
"address" : "127.0.0.1:12000",
|
||||
"fwmark" : 123
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
cat > ${CONFIG_FILE_DEST} << EOF
|
||||
{
|
||||
"logging" : {
|
||||
"level" : "info"
|
||||
},
|
||||
"nodes" : {
|
||||
"rtp_node" : {
|
||||
"type" : "rtp",
|
||||
"format" : "${FORMAT}",
|
||||
"vectorize" : ${VECTORIZE},
|
||||
"rate" : ${RATE},
|
||||
"rtcp": {
|
||||
"enabled" : true,
|
||||
"mode" : "aimd",
|
||||
"throttle_mode" : "decimate"
|
||||
},
|
||||
"in" : {
|
||||
"address" : "0.0.0.0:12000",
|
||||
"signals" : {
|
||||
"count" : ${NUM_VALUES},
|
||||
"type" : "float"
|
||||
}
|
||||
},
|
||||
"out" : {
|
||||
"address" : "127.0.0.1:12002"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
EOF
|
||||
|
||||
tc qdisc del dev lo root
|
||||
tc qdisc add dev lo root handle 4000 prio bands 4 priomap 1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1
|
||||
tc qdisc add dev lo parent 4000:3 tbf rate 40kbps burst 32kbit latency 200ms #peakrate 40kbps mtu 1000 minburst 1520
|
||||
tc filter add dev lo protocol ip handle 123 fw flowid 4000:3
|
||||
|
||||
#exit
|
||||
|
||||
VILLAS_LOG_PREFIX="[DEST] " \
|
||||
villas-pipe -l ${NUM_SAMPLES} ${CONFIG_FILE_DEST} rtp_node > ${OUTPUT_FILE} &
|
||||
PID=$!
|
||||
|
||||
sleep 1
|
||||
|
||||
VILLAS_LOG_PREFIX="[SIGN] " \
|
||||
villas-signal mixed -v ${NUM_VALUES} -r ${RATE} -l ${NUM_SAMPLES} | tee ${INPUT_FILE} | \
|
||||
VILLAS_LOG_PREFIX="[SRC] " \
|
||||
villas-pipe ${CONFIG_FILE_SRC} rtp_node > ${OUTPUT_FILE}
|
||||
|
||||
# Compare data
|
||||
villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE}
|
||||
RC=$?
|
||||
|
||||
rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE}
|
||||
|
||||
kill $PID
|
||||
exit $RC
|
|
@ -22,9 +22,14 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
##################################################################################
|
||||
|
||||
if [ -n "${CI}" ]; then
|
||||
echo "RTP tests are not ready yet"
|
||||
exit 99
|
||||
fi
|
||||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
|
@ -37,9 +37,8 @@ NUM_VALUES=${NUM_VALUES:-4}
|
|||
# Generate test data
|
||||
villas-signal -v ${NUM_VALUES} -l ${NUM_SAMPLES} -n random > ${INPUT_FILE}
|
||||
|
||||
for FORMAT in villas.human villas.binary villas.web csv json gtnet.fake raw.32.le raw.64.be protobuf; do
|
||||
for FORMAT in villas.human gtnet.fake protobuf; do
|
||||
for LAYER in udp ip eth unix; do
|
||||
for VERIFY_SOURCE in true false; do
|
||||
|
||||
VECTORIZES="1"
|
||||
|
||||
|
@ -88,7 +87,6 @@ cat > ${CONFIG_FILE} << EOF
|
|||
},
|
||||
"in" : {
|
||||
"address" : "${LOCAL}",
|
||||
"verify_source" : ${VERIFY_SOURCE},
|
||||
"signals" : {
|
||||
"count" : ${NUM_VALUES},
|
||||
"type" : "float"
|
||||
|
@ -111,7 +109,7 @@ villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE}
|
|||
RC=$?
|
||||
|
||||
if (( ${RC} != 0 )); then
|
||||
echo "=========== Sub-test failed for: format=${FORMAT}, layer=${LAYER}, verify_source=${VERIFY_SOURCE}, vectorize=${VECTORIZE}"
|
||||
echo "=========== Sub-test failed for: format=${FORMAT}, layer=${LAYER}, vectorize=${VECTORIZE}"
|
||||
echo "Config:"
|
||||
cat ${CONFIG_FILE}
|
||||
echo
|
||||
|
@ -122,10 +120,10 @@ if (( ${RC} != 0 )); then
|
|||
cat ${OUTPUT_FILE}
|
||||
exit ${RC}
|
||||
else
|
||||
echo "=========== Sub-test succeeded for: format=${FORMAT}, layer=${LAYER}, verify_source=${VERIFY_SOURCE}, vectorize=${VECTORIZE}"
|
||||
echo "=========== Sub-test succeeded for: format=${FORMAT}, layer=${LAYER}, vectorize=${VECTORIZE}"
|
||||
fi
|
||||
|
||||
done; done; done; done
|
||||
done; done; done
|
||||
|
||||
rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} ${THEORIES}
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
CONFIG_FILE2=$(mktemp)
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
|
||||
source ${SCRIPTPATH}/../../tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
SCRIPT=$(realpath $0)
|
||||
SCRIPTPATH=$(dirname ${SCRIPT})
|
||||
SRCDIR=$(realpath ${SCRIPTPATH}/../..)
|
||||
source ${SRCDIR}/tools/integration-tests-helper.sh
|
||||
source ${SRCDIR}/tools/villas-helper.sh
|
||||
|
||||
CONFIG_FILE=$(mktemp)
|
||||
INPUT_FILE=$(mktemp)
|
||||
|
|
|
@ -73,8 +73,8 @@ Test(mapping, parse_nodes)
|
|||
cr_assert_eq(ret, 0);
|
||||
cr_assert_eq(m.node, vlist_lookup(&nodes, "cherry"));
|
||||
cr_assert_eq(m.type, MAPPING_TYPE_STATS);
|
||||
cr_assert_eq(m.stats.id, STATS_OWD);
|
||||
cr_assert_eq(m.stats.type, MAPPING_STATS_TYPE_MEAN);
|
||||
cr_assert_eq(m.stats.metric, STATS_METRIC_OWD);
|
||||
cr_assert_eq(m.stats.type, STATS_TYPE_MEAN);
|
||||
|
||||
ret = mapping_parse_str(&m, "carrot.data[1-2]", &nodes);
|
||||
cr_assert_eq(ret, 0);
|
||||
|
@ -126,8 +126,8 @@ Test(mapping, parse)
|
|||
ret = mapping_parse_str(&m, "stats.owd.mean", nullptr);
|
||||
cr_assert_eq(ret, 0);
|
||||
cr_assert_eq(m.type, MAPPING_TYPE_STATS);
|
||||
cr_assert_eq(m.stats.id, STATS_OWD);
|
||||
cr_assert_eq(m.stats.type, MAPPING_STATS_TYPE_MEAN);
|
||||
cr_assert_eq(m.stats.metric, STATS_METRIC_OWD);
|
||||
cr_assert_eq(m.stats.type, STATS_TYPE_MEAN);
|
||||
|
||||
ret = mapping_parse_str(&m, "data[1-2]", nullptr);
|
||||
cr_assert_eq(ret, 0);
|
||||
|
@ -171,10 +171,10 @@ Test(mapping, parse)
|
|||
cr_assert_neq(ret, 0);
|
||||
|
||||
/* Check for superfluous chars at the end */
|
||||
ret = mapping_parse_str(&m, "stats.ts.origin.bla", nullptr);
|
||||
ret = mapping_parse_str(&m, "hdr.ts.origin.bla", nullptr);
|
||||
cr_assert_neq(ret, 0);
|
||||
|
||||
ret = mapping_parse_str(&m, "stats.ts.origin.", nullptr);
|
||||
ret = mapping_parse_str(&m, "hdr.ts.origin.", nullptr);
|
||||
cr_assert_neq(ret, 0);
|
||||
|
||||
ret = mapping_parse_str(&m, "data[1-2]bla", nullptr);
|
||||
|
|
27
tools/plots/plot_aimd.py
Normal file
27
tools/plots/plot_aimd.py
Normal file
|
@ -0,0 +1,27 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import matplotlib as mpl
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
def read_datafile(file_name):
|
||||
# the skiprows keyword is for heading, but I don't know if trailing lines
|
||||
# can be specified
|
||||
data = np.loadtxt(file_name, delimiter='\t', skiprows=1)
|
||||
return data
|
||||
|
||||
filename = sys.argv[1]
|
||||
|
||||
data = read_datafile(filename)
|
||||
|
||||
print(data[:,0])
|
||||
|
||||
fig, ax1 = plt.subplots()
|
||||
ax1.plot(data[:,0], data[:,1])
|
||||
|
||||
ax2 = ax1.twinx()
|
||||
ax2.plot(data[:,0], data[:,2], c='red')
|
||||
|
||||
fig.tight_layout()
|
||||
plt.show()
|
Loading…
Add table
Reference in a new issue