diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index cf7939af5..82e17afdf 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -4,7 +4,7 @@ variables: PREFIX: /usr/ RSYNC_OPTS: --recursive --ignore-missing-args --chown ${DEPLOY_USER}:${DEPLOY_USER} CRITERION_OPTS: --ignore-warnings - DOCKER_TAG: $CI_COMMIT_TAG + DOCKER_TAG: ${CI_COMMIT_TAG} DOCKER_TAG_DEV: ${CI_COMMIT_REF_NAME} DOCKER_IMAGE: villas/node DOCKER_IMAGE_DEV: villas/node-dev @@ -28,7 +28,9 @@ before_script: docker-dev: stage: prepare script: - - docker build -f packaging/docker/Dockerfile.dev -t ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV} . + - docker build + --file packaging/docker/Dockerfile.dev + --tag ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV} . tags: - shell - linux @@ -105,18 +107,6 @@ test:unit: tags: - docker -test:unit-common: - stage: test - dependencies: - - build:source - script: - - mkdir -p build && cd build - - cmake .. && make unit-tests-common - - "common/tests/unit-tests-common || true" - image: ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV} - tags: - - docker - test:integration: stage: test dependencies: @@ -129,7 +119,7 @@ test:integration: name: ${CI_PROJECT_NAME}-integration-tests-${CI_BUILD_REF} when: always paths: - - build/release/tests/integration/ + - build/tests/integration/ image: ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV} tags: - docker @@ -170,8 +160,10 @@ deploy:packages: docker: stage: docker script: - - docker build -f packaging/docker/Dockerfile.app -t ${DOCKER_IMAGE}:${DOCKER_TAG} . - - docker push ${DOCKER_IMAGE}:${DOCKER_TAG} + - docker build + --build-arg BUILDER_IMAGE=${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV} + --file packaging/docker/Dockerfile.app + --tag ${DOCKER_IMAGE}:${DOCKER_TAG_DEV} . - docker push ${DOCKER_IMAGE_DEV}:${DOCKER_TAG_DEV} tags: - shell diff --git a/common b/common index 02122f404..277515487 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit 02122f404c833a90482e325cd8327fea36575e31 +Subproject commit 27751548785eebaeb160f32b923c7949de4faa5e diff --git a/doc/CMakeLists.txt b/doc/CMakeLists.txt index 75d931610..529119fb3 100644 --- a/doc/CMakeLists.txt +++ b/doc/CMakeLists.txt @@ -56,6 +56,13 @@ if(DOXYGEN_FOUND) WORKING_DIRECTORY ${PROJECT_DIR} ) + # Ensure that documentation is built before installing it + install(CODE "execute_process( + COMMAND ${CMAKE_COMMAND} --build \"${CMAKE_CURRENT_BINARY_DIR}\" --target doc + WORKING_DIRECTORY \"${CMAKE_CURRENT_BINARY_DIR}\" + )" + ) + install( DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/html DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/doc/villas/node diff --git a/etc/labs/lab10_nodes.conf b/etc/labs/lab10_nodes.conf new file mode 100644 index 000000000..c662f3fc0 --- /dev/null +++ b/etc/labs/lab10_nodes.conf @@ -0,0 +1,78 @@ +stats = 5.0; +hugepages = 200; + +nodes = { + # Node names can be any alphanumeric value + rpi-1 = { + type = "socket"; + layer = "udp"; + format = "gtnet" # pre-built format to communicate in RTDS GTNET-SKT payload + + in = { + address = "*:12005" # villas node machine IP and port number + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "192.168.0.5:12005" # remote machine IP and port number + }, + + hooks = ( + { + type = "stats", + warmup = 3000 + } + ) + }, + rpi-2 = { + type = "socket"; + layer = "udp"; + format = "gtnet" # pre-built format to communicate in RTDS GTNET-SKT payload + + in = { + address = "*:12006" # villas node machine IP and port number + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "192.168.0.6:12006" # remote machine IP and port number + }, + + hooks = ( + { + type = "stats", + warmup = 3000 + } + ) + }, + rtds-1 = { + type = "socket"; + layer = "udp"; + format = "gtnet"; + + in = { + address = "*:12083" # villas node machine IP and port number + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "192.168.0.4:12083" # remote machine IP and port number + }, + + hooks = ( + { + type = "stats", + warmup = 3000 + } + ) + } +} diff --git a/etc/labs/lab10_path_bidir.conf b/etc/labs/lab10_path_bidir.conf new file mode 100644 index 000000000..3305ec856 --- /dev/null +++ b/etc/labs/lab10_path_bidir.conf @@ -0,0 +1,22 @@ +@include "lab10_nodes.conf" + +paths = ( + # Each path dictionary corresponds to one way communnication + { + in = [ "rpi-1" ], + out = [ "rtds-1" ] + }, + { + in = [ "rtds-1" ], + out = [ "rpi-1" ] + } + + # Alternatively, you can use a single path specification + # and set reverse = true + # Example: + # { + # in = [ "rpi-1" ], + # out = [ "rtds-1" ], + # reverse = true + # } +) diff --git a/etc/labs/lab10_path_hook.conf b/etc/labs/lab10_path_hook.conf new file mode 100644 index 000000000..ac3aac655 --- /dev/null +++ b/etc/labs/lab10_path_hook.conf @@ -0,0 +1,12 @@ +@include "lab10_nodes.conf" + +paths = ( + { + in = [ "rpi-1" ], + out = [ "rtds-1" ], + + hooks = ( + { type = "print", output = "stdout" } + ) + } +) diff --git a/etc/labs/lab10_path_multiple_destinations.conf b/etc/labs/lab10_path_multiple_destinations.conf new file mode 100644 index 000000000..9a0c585e3 --- /dev/null +++ b/etc/labs/lab10_path_multiple_destinations.conf @@ -0,0 +1,8 @@ +@include "lab10_nodes.conf" + +paths = ( + { + in = [ "rtds-1" ], + out = [ "rpi-1", "rpi-2" ] + } +) diff --git a/etc/labs/lab10_path_uni.conf b/etc/labs/lab10_path_uni.conf new file mode 100644 index 000000000..db2b28756 --- /dev/null +++ b/etc/labs/lab10_path_uni.conf @@ -0,0 +1,8 @@ +@include "lab10_nodes.conf" + +paths = ( + { + in = [ "rpi-1" ], + out = [ "rtds-1" ] + } +) diff --git a/etc/labs/lab11.conf b/etc/labs/lab11.conf new file mode 100644 index 000000000..31f0173f6 --- /dev/null +++ b/etc/labs/lab11.conf @@ -0,0 +1,83 @@ +nodes = { + rtds_gtnet1 = { + type = "socket", + layer = "udp", + header = "gtnet-skt", + + in = { + address = "*:12000", + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "134.130.169.89:12000" + } + }, + rtds_gtnet2 = { + type = "socket", + layer = "udp", + header = "gtnet-skt", + + in = { + address = "*:12001", + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "134.130.169.90:12001" + } + }, + monitoring = { + type = "websocket" + }, + monitoring_log = { + type = "file", + + out = { + uri = "ftp://acs:fake@134.130.169.32/var/villas/log/monitoring_%Y-%m-%d_%H_%M_%S.dat" + } + } +} + +paths = [ + { + # Combine data from rtds_gtnet1 and rtds_gtnet2 + in = [ + "rtds_gtnet1.hdr.ts.origin", + "rtds_gtnet1.hdr.sequence", + "rtds_gtnet1.data[0-6]", + + "rtds_gtnet2.hdr.ts.origin", + "rtds_gtnet2.hdr.sequence", + "rtds_gtnet2.data[0-6]", + ], + + out = [ + "monitoring", + "monitoring_log" + ], + + reverse = false, + + # The mode of a path determines when the path is triggered + # and forwarding samples to its destintation nodes. + mode = "any", + + # List of nodes which trigger the path + mask = [ "rtds_gtnet_1", "rtds_gtnet_2" ], + + hooks = ( + # We dont want to overload the WebBrowsers + { + type = "decimate", + ratio = 10 + } + ) + } +] diff --git a/etc/labs/lab12.conf b/etc/labs/lab12.conf new file mode 100644 index 000000000..5b513a052 --- /dev/null +++ b/etc/labs/lab12.conf @@ -0,0 +1,48 @@ +nodes = { + udp_node1 = { + type = "socket", + layer = "udp", + + in = { + address = "*:12000" + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "127.0.0.1:12001" + } + }, + web_node1 = { + type = "websocket", + + vectorize = 2, + series = ( + { label = "Random walk", unit = "V" }, + { label = "Sine", unit = "A" }, + { label = "Rect", unit = "Var" }, + { label = "Ramp", unit = "°C" } + ) + } +} + +paths = ( + { + in = [ "udp_node1" ], + out = [ "web_node1" ], + + hooks = ( + # We dont want to overload the WebBrowsers + { type = "decimate", ratio = 2 } + ) + }, + + { + in = [ "web_node1" ], + out = [ "udp_node1" ] + + # Web -> UDP does not require decimation + } +) diff --git a/etc/labs/lab13.conf b/etc/labs/lab13.conf new file mode 100644 index 000000000..bf6b0d476 --- /dev/null +++ b/etc/labs/lab13.conf @@ -0,0 +1,47 @@ +affinity = 0x8, + +nodes = { + rtds_gtnet1 = { + type = "socket", + layer = "udp", + header = "gtnet-skt", + + in = { + address = "*:12000" + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "134.130.169.98:12000" + } + }, + rtds_gtnet2 = { + type = "socket", + layer = "udp", + header = "gtnet-skt", + + in = { + address = "*:12001" + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "134.130.169.99:12001" + } + } +} + +paths = ( + { + in = [ "rtds_gtnet1" ], + out = [ "rtds_gtnet2" ], + + reverse = true + } +) diff --git a/etc/labs/lab3.conf b/etc/labs/lab3.conf new file mode 100644 index 000000000..a6fd04e3b --- /dev/null +++ b/etc/labs/lab3.conf @@ -0,0 +1,18 @@ +nodes = { + udp_node1 = { + type = "socket", + layer = "udp", + + in = { + address = "*:12000" + + signals = { + count = 3 + type = "float" + } + }, + out = { + address = "127.0.0.1:12001" + } + } +} diff --git a/etc/labs/lab3.pcap b/etc/labs/lab3.pcap new file mode 100644 index 000000000..57d1e7482 Binary files /dev/null and b/etc/labs/lab3.pcap differ diff --git a/etc/labs/lab4.conf b/etc/labs/lab4.conf new file mode 100644 index 000000000..b92cdb600 --- /dev/null +++ b/etc/labs/lab4.conf @@ -0,0 +1,18 @@ +nodes = { + udp_node1 = { + type = "socket", + layer = "udp", + + in = { + address = "*:12000" + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "127.0.0.1:12001" + } + } +} diff --git a/etc/labs/lab5.conf b/etc/labs/lab5.conf new file mode 100644 index 000000000..bd8303f66 --- /dev/null +++ b/etc/labs/lab5.conf @@ -0,0 +1,19 @@ +nodes = { + rtds_gtnet1 = { + type = "socket", + layer = "udp", + header = "gtnet-skt", + + in = { + address = "*:12000" + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "134.130.169.89:12000" + } + } +} diff --git a/etc/labs/lab7.conf b/etc/labs/lab7.conf new file mode 100644 index 000000000..a80dab8bb --- /dev/null +++ b/etc/labs/lab7.conf @@ -0,0 +1,9 @@ +nodes = { + file_node1 = { + type = "file", + + in = { + uri = "file_send.dat" + } + } +} \ No newline at end of file diff --git a/etc/labs/lab8.conf b/etc/labs/lab8.conf new file mode 100644 index 000000000..c0a722a4f --- /dev/null +++ b/etc/labs/lab8.conf @@ -0,0 +1,41 @@ +nodes = { + udp_node1 = { + type = "socket", + layer = "udp", + + in = { + address = "*:12000" + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "127.0.0.1:12001" + } + } +} + +paths = [ + { + in = [ "udp_node1" ], + out = [ "udp_node1" ], + + hooks = [ + { + type = "decimate", + priority = 1, + + # Hook specific parameters follow + # [paramter1] = [value1] + ratio = 2 + }, + { + type = "map", + + mapping = [ "data[3]", "data[2]", "data[1]", "data[0]", "hdr.sequence", "ts.origin" ] + } + ] + } +] diff --git a/etc/labs/lab9_netem.conf b/etc/labs/lab9_netem.conf new file mode 100644 index 000000000..54f69dd28 --- /dev/null +++ b/etc/labs/lab9_netem.conf @@ -0,0 +1,28 @@ +nodes = { + udp_node1 = { + type = "socket", + layer = "udp", + + in = { + address = "*:12000" + + signals = { + count = 8, + type = "float" + } + }, + out = { + address = "127.0.0.1:12001", + + netem = { + enabled = true, + loss = 0, # in % + corrupt = 0, # in % + duplicate = 0, # in % + delay = 100000, # in uS + jitter = 5000, # in uS + distribution = "normal" + } + } + } +} diff --git a/etc/loopback.json b/etc/loopback.json index 2fb152a2f..54929849a 100644 --- a/etc/loopback.json +++ b/etc/loopback.json @@ -1,4 +1,6 @@ { + "idle_stop" : false, + "http" : { "port" : 8080 }, diff --git a/include/villas/api/server.hpp b/include/villas/api/server.hpp index 3b6c9c320..85bd4afc8 100644 --- a/include/villas/api/server.hpp +++ b/include/villas/api/server.hpp @@ -27,6 +27,8 @@ #include #include +#include +#include #include #include @@ -62,6 +64,8 @@ protected: void acceptNewSession(); void closeSession(sessions::Socket *s); + struct sockaddr_un getSocketAddress(); + public: Server(Api *a); ~Server(); diff --git a/include/villas/mapping.h b/include/villas/mapping.h index 8291cb2f2..6c9522af3 100644 --- a/include/villas/mapping.h +++ b/include/villas/mapping.h @@ -45,16 +45,6 @@ enum mapping_type { MAPPING_TYPE_TIMESTAMP }; -enum mapping_stats_type { - MAPPING_STATS_TYPE_LAST, - MAPPING_STATS_TYPE_HIGHEST, - MAPPING_STATS_TYPE_LOWEST, - MAPPING_STATS_TYPE_MEAN, - MAPPING_STATS_TYPE_VAR, - MAPPING_STATS_TYPE_STDDEV, - MAPPING_STATS_TYPE_TOTAL -}; - enum mapping_header_type { MAPPING_HEADER_TYPE_LENGTH, MAPPING_HEADER_TYPE_SEQUENCE @@ -84,8 +74,8 @@ struct mapping_entry { } data; struct { - enum stats_id id; - enum mapping_stats_type type; + enum stats_metric metric; + enum stats_type type; } stats; struct { diff --git a/include/villas/node.h b/include/villas/node.h index 38557ebb8..c97bf7914 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -84,12 +84,14 @@ struct node { struct node_direction in, out; -#ifdef WITH_NETEM - int mark; /**< Socket mark for netem, routing and filtering */ +#ifdef __linux__ + int fwmark; /**< Socket mark for netem, routing and filtering */ +#ifdef WITH_NETEM struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */ struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */ #endif /* WITH_NETEM */ +#endif /* __linux__ */ struct node_type *_vt; /**< Virtual functions (C++ OOP style) */ void *_vd; /**< Virtual data (used by struct node::_vt functions) */ @@ -201,6 +203,8 @@ struct node_type * node_type(struct node *n); struct memory_type * node_memory_type(struct node *n, struct memory_type *parent); +int node_is_valid_name(const char *name); + #ifdef __cplusplus } #endif diff --git a/include/villas/nodes/stats.h b/include/villas/nodes/stats.h index 455a27f1c..b4574d6b9 100644 --- a/include/villas/nodes/stats.h +++ b/include/villas/nodes/stats.h @@ -31,7 +31,9 @@ #include +#include #include +#include #ifdef __cplusplus extern "C" { @@ -42,13 +44,20 @@ struct node; struct sample; struct super_node; +struct stats_node_signal { + struct node *node; + char *node_str; + + enum stats_metric metric; + enum stats_type type; +}; + struct stats_node { double rate; - char *node_str; struct task task; - struct node *node; + struct vlist signals; /** List of type struct stats_node_signal */ }; /** @see node_type::print */ @@ -60,6 +69,8 @@ char *stats_node_print(struct node *n); /** @see node_type::parse */ int stats_node_parse(struct node *n, json_t *cfg); +int stats_node_parse_signal(struct stats_node_signal *s, json_t *cfg); + /** @see node_type::start */ int stats_node_start(struct node *n); diff --git a/include/villas/path.h b/include/villas/path.h index 8e47612cd..622c263a3 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -172,6 +172,8 @@ int path_uses_node(struct path *p, struct node *n); */ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes); +int path_is_simple(struct path *p); + /** @} */ #ifdef __cplusplus diff --git a/include/villas/stats.h b/include/villas/stats.h index 3fd65e32d..a7a75e9e5 100644 --- a/include/villas/stats.h +++ b/include/villas/stats.h @@ -27,6 +27,7 @@ #include #include +#include #ifdef __cplusplus extern "C" { @@ -42,44 +43,68 @@ enum stats_format { STATS_FORMAT_MATLAB }; -enum stats_id { - STATS_SKIPPED, /**< Counter for skipped samples due to hooks. */ - STATS_REORDERED, /**< Counter for reordered samples. */ - STATS_GAP_SAMPLE, /**< Histogram for inter sample timestamps (as sent by remote). */ - STATS_GAP_RECEIVED, /**< Histogram for inter sample arrival time (as seen by this instance). */ - STATS_OWD, /**< Histogram for one-way-delay (OWD) of received samples. */ - STATS_COUNT /**< Just here to have an updated number of statistics. */ +enum stats_metric { + STATS_METRIC_INVALID = -1, + STATS_METRIC_SKIPPED, /**< Counter for skipped samples due to hooks. */ + STATS_METRIC_REORDERED, /**< Counter for reordered samples. */ + STATS_METRIC_GAP_SAMPLE, /**< Histogram for inter sample timestamps (as sent by remote). */ + STATS_METRIC_GAP_RECEIVED, /**< Histogram for inter sample arrival time (as seen by this instance). */ + STATS_METRIC_OWD, /**< Histogram for one-way-delay (OWD) of received samples. */ + STATS_METRIC_COUNT /**< Just here to have an updated number of statistics. */ }; -struct stats_desc { +enum stats_type { + STATS_TYPE_INVALID = -1, + STATS_TYPE_LAST, + STATS_TYPE_HIGHEST, + STATS_TYPE_LOWEST, + STATS_TYPE_MEAN, + STATS_TYPE_VAR, + STATS_TYPE_STDDEV, + STATS_TYPE_TOTAL, + STATS_TYPE_COUNT +}; + +struct stats_metric_description { const char *name; + enum stats_metric metric; const char *unit; const char *desc; int hist_buckets; }; -struct stats_delta { - double values[STATS_COUNT]; +struct stats_type_description { + const char *name; + enum stats_type type; + enum signal_type signal_type; +}; - int update; /**< Bitmask of stats_id. Only those which are masked will be updated */ +struct stats_delta { + double values[STATS_METRIC_COUNT]; + + int update; /**< Bitmask of stats_metric. Only those which are masked will be updated */ }; struct stats { - struct hist histograms[STATS_COUNT]; + struct hist histograms[STATS_METRIC_COUNT]; struct stats_delta *delta; }; -extern -struct stats_desc stats_metrics[]; +extern struct stats_metric_description stats_metrics[]; +extern struct stats_type_description stats_types[]; int stats_lookup_format(const char *str); +enum stats_metric stats_lookup_metric(const char *str); + +enum stats_type stats_lookup_type(const char *str); + int stats_init(struct stats *s, int buckets, int warmup); int stats_destroy(struct stats *s); -void stats_update(struct stats *s, enum stats_id id, double val); +void stats_update(struct stats *s, enum stats_metric id, double val); void stats_collect(struct stats *s, struct sample *smps[], size_t *cnt); @@ -95,7 +120,7 @@ void stats_print_periodic(struct stats *s, FILE *f, enum stats_format fmt, int v void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose); -enum stats_id stats_lookup_id(const char *name); +union signal_data stats_get_value(const struct stats *s, enum stats_metric sm, enum stats_type st); #ifdef __cplusplus } diff --git a/include/villas/super_node.hpp b/include/villas/super_node.hpp index 370fcdbe1..805832d74 100644 --- a/include/villas/super_node.hpp +++ b/include/villas/super_node.hpp @@ -40,10 +40,11 @@ class SuperNode { protected: enum state state; + int idleStop; + int priority; /**< Process priority (lower is better) */ int affinity; /**< Process affinity of the server and all created threads */ int hugepages; /**< Number of hugepages to reserve. */ - double stats; /**< Interval for path statistics. Set to 0 to disable them. */ Logger logger; @@ -104,6 +105,11 @@ public: /** Run periodic hooks of this super node. */ int periodic(); + void setState(enum state st) + { + state = st; + } + struct node * getNode(const std::string &name) { return (struct node *) vlist_lookup(&nodes, name.c_str()); @@ -122,6 +128,10 @@ public: return &interfaces; } + enum state getState() { + return state; + } + #ifdef WITH_API Api * getApi() { return &api; diff --git a/lib/api/server.cpp b/lib/api/server.cpp index 5a743ed12..8f443a4a2 100644 --- a/lib/api/server.cpp +++ b/lib/api/server.cpp @@ -21,9 +21,8 @@ * along with this program. If not, see . *********************************************************************************/ -#include -#include #include +#include #include #include @@ -79,9 +78,36 @@ void Server::start() pfds.push_back(pfd); + struct sockaddr_un sun = getSocketAddress(); + + ret = bind(sd, (struct sockaddr *) &sun, sizeof(struct sockaddr_un)); + if (ret) + throw SystemError("Failed to bind API socket"); + + ret = listen(sd, 5); + if (ret) + throw SystemError("Failed to listen on API socket"); + + logger->info("Listening on UNIX socket: {}", sun.sun_path); + + state = STATE_STARTED; +} + +struct sockaddr_un Server::getSocketAddress() +{ struct sockaddr_un sun = { .sun_family = AF_UNIX }; - fs::path socketPath = PREFIX "/var/lib/villas"; + fs::path socketPath; + + if (getuid() == 0) { + socketPath = PREFIX "/var/lib/villas"; + } + else { + std::string homeDir = getenv("HOME"); + + socketPath = homeDir + "/.villas"; + } + if (!fs::exists(socketPath)) { logging.get("api")->info("Creating directory for API socket: {}", socketPath); fs::create_directories(socketPath); @@ -96,17 +122,7 @@ void Server::start() strncpy(sun.sun_path, socketPath.c_str(), sizeof(sun.sun_path) - 1); - ret = bind(sd, (struct sockaddr *) &sun, sizeof(struct sockaddr_un)); - if (ret) - throw SystemError("Failed to bind API socket"); - - ret = listen(sd, 5); - if (ret) - throw SystemError("Failed to listen on API socket"); - - logger->info("Listening on UNIX socket: {}", socketPath); - - state = STATE_STARTED; + return sun; } void Server::stop() diff --git a/lib/hooks/average.c b/lib/hooks/average.c index f50f8a982..010c76e2d 100644 --- a/lib/hooks/average.c +++ b/lib/hooks/average.c @@ -31,7 +31,7 @@ #include struct average { - int mask; + uint64_t mask; int offset; }; @@ -42,7 +42,7 @@ static int average_parse(struct hook *h, json_t *cfg) int ret; json_error_t err; - ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s: i }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s: I }", "offset", &p->offset, "mask", &p->mask ); @@ -62,7 +62,7 @@ static int average_process(struct hook *h, struct sample *smps[], unsigned *cnt) int n = 0; for (int k = 0; k < smp->length; k++) { - if (!(p->mask & (1 << k))) + if (!(p->mask & (1LL << k))) continue; switch (sample_format(smps[i], k)) { diff --git a/lib/hooks/stats.c b/lib/hooks/stats.c index 80897c089..b5ed90675 100644 --- a/lib/hooks/stats.c +++ b/lib/hooks/stats.c @@ -168,18 +168,18 @@ static int stats_collect_process(struct hook *h, struct sample *smps[], unsigned if (previous) { if (current->flags & previous->flags & SAMPLE_HAS_TS_RECEIVED) - stats_update(s, STATS_GAP_RECEIVED, time_delta(&previous->ts.received, ¤t->ts.received)); + stats_update(s, STATS_METRIC_GAP_RECEIVED, time_delta(&previous->ts.received, ¤t->ts.received)); if (current->flags & previous->flags & SAMPLE_HAS_TS_ORIGIN) - stats_update(s, STATS_GAP_SAMPLE, time_delta(&previous->ts.origin, ¤t->ts.origin)); + stats_update(s, STATS_METRIC_GAP_SAMPLE, time_delta(&previous->ts.origin, ¤t->ts.origin)); if ((current->flags & SAMPLE_HAS_TS_ORIGIN) && (current->flags & SAMPLE_HAS_TS_RECEIVED)) - stats_update(s, STATS_OWD, time_delta(¤t->ts.origin, ¤t->ts.received)); + stats_update(s, STATS_METRIC_OWD, time_delta(¤t->ts.origin, ¤t->ts.received)); if (current->flags & previous->flags & SAMPLE_HAS_SEQUENCE) { dist = current->sequence - (int32_t) previous->sequence; if (dist != 1) - stats_update(s, STATS_REORDERED, dist); + stats_update(s, STATS_METRIC_REORDERED, dist); } } diff --git a/lib/kernel/if.c b/lib/kernel/if.c index b200c0742..d78eced93 100644 --- a/lib/kernel/if.c +++ b/lib/kernel/if.c @@ -73,23 +73,23 @@ int if_start(struct interface *i) //if_set_affinity(i, i->affinity); /* Assign fwmark's to nodes which have netem options */ - int ret, mark = 0; + int ret, fwmark = 0; for (size_t j = 0; j < vlist_length(&i->nodes); j++) { struct node *n = (struct node *) vlist_at(&i->nodes, j); - if (n->tc_qdisc) - n->mark = 1 + mark++; + if (n->tc_qdisc && n->fwmark < 0) + n->fwmark = 1 + fwmark++; } /* Abort if no node is using netem */ - if (mark == 0) + if (fwmark == 0) return 0; if (getuid() != 0) error("Network emulation requires super-user privileges!"); /* Replace root qdisc */ - ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, mark); + ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, fwmark); if (ret) error("Failed to setup priority queuing discipline: %s", nl_geterror(ret)); @@ -98,16 +98,16 @@ int if_start(struct interface *i) struct node *n = (struct node *) vlist_at(&i->nodes, j); if (n->tc_qdisc) { - ret = tc_mark(i, &n->tc_classifier, TC_HANDLE(1, n->mark), n->mark); + ret = tc_mark(i, &n->tc_classifier, TC_HANDLE(1, n->fwmark), n->fwmark); if (ret) error("Failed to setup FW mark classifier: %s", nl_geterror(ret)); char *buf = tc_netem_print(n->tc_qdisc); debug(LOG_IF | 5, "Starting network emulation on interface '%s' for FW mark %u: %s", - if_name(i), n->mark, buf); + if_name(i), n->fwmark, buf); free(buf); - ret = tc_netem(i, &n->tc_qdisc, TC_HANDLE(0x1000+n->mark, 0), TC_HANDLE(1, n->mark)); + ret = tc_netem(i, &n->tc_qdisc, TC_HANDLE(0x1000+n->fwmark, 0), TC_HANDLE(1, n->fwmark)); if (ret) error("Failed to setup netem qdisc: %s", nl_geterror(ret)); } diff --git a/lib/mapping.c b/lib/mapping.c index e09a6758d..5c89fd4ee 100644 --- a/lib/mapping.c +++ b/lib/mapping.c @@ -32,8 +32,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *nodes) { - int id; - char *cpy, *node, *type, *field, *subfield, *end; + char *cpy, *node, *type, *field, *end; cpy = strdup(str); if (!cpy) @@ -68,44 +67,21 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n me->type = MAPPING_TYPE_STATS; me->length = 1; - field = strtok(NULL, "."); - if (!field) { - warning("Missing stats type"); + char *metric = strtok(NULL, "."); + if (!metric) goto invalid_format; - } - subfield = strtok(NULL, "."); - if (!subfield) { - warning("Missing stats sub-type"); + type = strtok(NULL, "."); + if (!type) goto invalid_format; - } - id = stats_lookup_id(field); - if (id < 0) { - warning("Invalid stats type"); + me->stats.metric = stats_lookup_metric(metric); + if (me->stats.metric < 0) goto invalid_format; - } - me->stats.id = id; - - if (!strcmp(subfield, "total")) - me->stats.type = MAPPING_STATS_TYPE_TOTAL; - else if (!strcmp(subfield, "last")) - me->stats.type = MAPPING_STATS_TYPE_LAST; - else if (!strcmp(subfield, "lowest")) - me->stats.type = MAPPING_STATS_TYPE_LOWEST; - else if (!strcmp(subfield, "highest")) - me->stats.type = MAPPING_STATS_TYPE_HIGHEST; - else if (!strcmp(subfield, "mean")) - me->stats.type = MAPPING_STATS_TYPE_MEAN; - else if (!strcmp(subfield, "var")) - me->stats.type = MAPPING_STATS_TYPE_VAR; - else if (!strcmp(subfield, "stddev")) - me->stats.type = MAPPING_STATS_TYPE_STDDEV; - else { - warning("Invalid stats sub-type"); + me->stats.type = stats_lookup_type(type); + if (me->stats.type < 0) goto invalid_format; - } } else if (!strcmp(type, "hdr")) { me->type = MAPPING_TYPE_HEADER; @@ -276,35 +252,9 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons return -1; switch (me->type) { - case MAPPING_TYPE_STATS: { - const struct hist *h = &s->histograms[me->stats.id]; - - switch (me->stats.type) { - case MAPPING_STATS_TYPE_TOTAL: - remapped->data[off++].i = h->total; - break; - case MAPPING_STATS_TYPE_LAST: - remapped->data[off++].f = h->last; - break; - case MAPPING_STATS_TYPE_HIGHEST: - remapped->data[off++].f = h->highest; - break; - case MAPPING_STATS_TYPE_LOWEST: - remapped->data[off++].f = h->lowest; - break; - case MAPPING_STATS_TYPE_MEAN: - remapped->data[off++].f = hist_mean(h); - break; - case MAPPING_STATS_TYPE_STDDEV: - remapped->data[off++].f = hist_stddev(h); - break; - case MAPPING_STATS_TYPE_VAR: - remapped->data[off++].f = hist_var(h); - break; - default: - return -1; - } - } + case MAPPING_TYPE_STATS: + remapped->data[off++] = stats_get_value(s, me->stats.metric, me->stats.type); + break; case MAPPING_TYPE_TIMESTAMP: { const struct timespec *ts; @@ -380,40 +330,10 @@ int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str) switch (me->type) { case MAPPING_TYPE_STATS: - switch (me->stats.type) { - case MAPPING_STATS_TYPE_TOTAL: - type = "total"; - break; - - case MAPPING_STATS_TYPE_LAST: - type = "last"; - break; - - case MAPPING_STATS_TYPE_LOWEST: - type = "lowest"; - break; - - case MAPPING_STATS_TYPE_HIGHEST: - type = "highest"; - break; - - case MAPPING_STATS_TYPE_MEAN: - type = "mean"; - break; - - case MAPPING_STATS_TYPE_VAR: - type = "var"; - break; - - case MAPPING_STATS_TYPE_STDDEV: - type = "stddev"; - break; - - default: - type = NULL; - } - - strcatf(str, "stats.%s", type); + strcatf(str, "stats.%s.%s", + stats_metrics[me->stats.metric].name, + stats_types[me->stats.type].name + ); break; case MAPPING_TYPE_HEADER: diff --git a/lib/memory.c b/lib/memory.c index 4da9df6c3..a2f0b58d6 100644 --- a/lib/memory.c +++ b/lib/memory.c @@ -27,6 +27,7 @@ #include #include +#include #include #include @@ -70,10 +71,14 @@ int memory_init(int hugepages) int memory_lock(size_t lock) { -#if defined(__linux__) && defined(__x86_64__) int ret; + +#ifdef __linux__ + +#ifndef __arm__ struct rlimit l; + /* Increase ressource limit for locked memory */ ret = getrlimit(RLIMIT_MEMLOCK, &l); if (ret) return ret; @@ -81,11 +86,10 @@ int memory_lock(size_t lock) if (l.rlim_cur < lock) { if (l.rlim_max < lock) { if (getuid() != 0) { - warning("Failed to in increase ressource limit of locked memory from %lu to %zu bytes", l.rlim_cur, lock); - warning("Please re-run as super-user or raise manually via:"); + warning("Failed to in increase ressource limit of locked memory. Please increase manually by running as root:"); warning(" $ ulimit -Hl %zu", lock); - return -1; + goto out; } l.rlim_max = lock; @@ -99,7 +103,16 @@ int memory_lock(size_t lock) debug(LOG_MEM | 2, "Increased ressource limit of locked memory to %zd bytes", lock); } -#endif +#endif /* __arm__ */ +out: +#ifdef _POSIX_MEMLOCK + /* Lock all current and future memory allocations */ + ret = mlockall(MCL_CURRENT | MCL_FUTURE); + if (ret) + return -1; +#endif /* _POSIX_MEMLOCK */ + +#endif /* __linux__ */ return 0; } diff --git a/lib/memory/hugepage.c b/lib/memory/hugepage.c index af100a17a..276cdca25 100644 --- a/lib/memory/hugepage.c +++ b/lib/memory/hugepage.c @@ -65,10 +65,8 @@ int memory_hugepage_init(int hugepages) debug(LOG_MEM | 2, "Increased number of reserved hugepages from %d to %d", pagecnt, hugepages); } else { - warning("Failed to reserved hugepages. Please re-run as super-user or reserve manually via:"); + warning("Failed to reserved hugepages. Please reserve manually by running as root:"); warning(" $ echo %d > /proc/sys/vm/nr_hugepages", hugepages); - - return -1; } } #endif @@ -81,7 +79,7 @@ static struct memory_allocation * memory_hugepage_alloc(struct memory_type *m, s { static bool use_huge = true; - int ret, flags, fd; + int flags, fd; size_t sz; struct memory_allocation *ma = alloc(sizeof(struct memory_allocation)); @@ -120,7 +118,7 @@ retry: if (use_huge) { ma->address = mmap(NULL, ma->length, PROT_READ | PROT_WRITE, flags, fd, 0); if (ma->address == MAP_FAILED) { if (use_huge) { - warning("Failed to map hugepages, try with normal pages instead"); + warning("Failed to map hugepages, try with normal pages instead!"); use_huge = false; goto retry; } @@ -130,12 +128,6 @@ retry: if (use_huge) { } } - if (getuid() == 0) { - ret = mlock(ma->address, ma->length); - if (ret) - return NULL; - } - return ma; } diff --git a/lib/node.c b/lib/node.c index 3ef00a568..cec2e5b3a 100644 --- a/lib/node.c +++ b/lib/node.c @@ -21,6 +21,7 @@ *********************************************************************************/ #include +#include #include #include @@ -224,6 +225,10 @@ int node_init(struct node *n, struct node_type *vt) n->_name = NULL; n->_name_long = NULL; +#ifdef __linux__ + n->fwmark = -1; +#endif /* __linux__ */ + #ifdef WITH_NETEM n->tc_qdisc = NULL; n->tc_classifier = NULL; @@ -279,16 +284,24 @@ int node_parse(struct node *n, json_t *json, const char *name) n->name = strdup(name); - ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o }, s?: { s?: o } }", + ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o } }", "type", &type, "in", - "signals", &json_signals, - "out", - "netem", &json_netem + "signals", &json_signals ); if (ret) jerror(&err, "Failed to parse node %s", node_name(n)); +#ifdef __linux__ + ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: o, s?: i } }", + "out", + "netem", &json_netem, + "fwmark", &n->fwmark + ); + if (ret) + jerror(&err, "Failed to parse node %s", node_name(n)); +#endif /* __linux__ */ + nt = node_type_lookup(type); assert(nt == node_type(n)); @@ -395,18 +408,18 @@ int node_start(struct node *n) #ifdef __linux__ /* Set fwmark for outgoing packets if netem is enabled for this node */ - if (n->mark) { + if (n->fwmark) { int fds[16]; int num_sds = node_netem_fds(n, fds); for (int i = 0; i < num_sds; i++) { int fd = fds[i]; - ret = setsockopt(fd, SOL_SOCKET, SO_MARK, &n->mark, sizeof(n->mark)); + ret = setsockopt(fd, SOL_SOCKET, SO_MARK, &n->fwmark, sizeof(n->fwmark)); if (ret) serror("Failed to set FW mark for outgoing packets"); else - debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", fd, n->mark); + debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", fd, n->fwmark); } } #endif /* __linux__ */ @@ -421,7 +434,7 @@ int node_stop(struct node *n) { int ret; - if (n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT) + if (n->state != STATE_STOPPING && n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT) return 0; info("Stopping node %s", node_name(n)); @@ -548,12 +561,13 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel { int readd, nread = 0; - if (n->state == STATE_PAUSED) - return 0; - - assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT); assert(node_type(n)->read); + if (n->state == STATE_PAUSED || n->state == STATE_PENDING_CONNECT) + return 0; + else if (n->state != STATE_STARTED && n->state != STATE_CONNECTED) + return -1; + /* Send in parts if vector not supported */ if (node_type(n)->vectorize > 0 && node_type(n)->vectorize < cnt) { while (cnt - nread > 0) { @@ -576,7 +590,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel int skipped = nread - rread; if (skipped > 0 && n->stats != NULL) { - stats_update(n->stats, STATS_SKIPPED, skipped); + stats_update(n->stats, STATS_METRIC_SKIPPED, skipped); } debug(LOG_NODE | 5, "Received %u samples from node %s of which %d have been skipped", nread, node_name(n), skipped); @@ -593,9 +607,13 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re { int tosend, sent, nsent = 0; - assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED); assert(node_type(n)->write); + if (n->state == STATE_PAUSED || n->state == STATE_PENDING_CONNECT) + return 0; + else if (n->state != STATE_STARTED && n->state != STATE_CONNECTED) + return -1; + #ifdef WITH_HOOKS /* Run write hooks */ cnt = hook_process_list(&n->out.hooks, smps, cnt); @@ -608,12 +626,8 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re while (cnt - nsent > 0) { tosend = MIN(cnt - nsent, node_type(n)->vectorize); sent = node_type(n)->write(n, &smps[nsent], tosend, release); - if (sent < 0) { - warning("Failed to send samples to node %s: reason=%d", node_name(n), sent); + if (sent < 0) return sent; - } - else if (sent < tosend) - warning("Failed to send %d out of %d samples to node %s", tosend-sent, tosend, node_name(n)); nsent += sent; debug(LOG_NODE | 5, "Sent %u samples to node %s", sent, node_name(n)); @@ -621,13 +635,8 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re } else { nsent = node_type(n)->write(n, smps, cnt, release); - if (nsent < 0) { - warning("Failed to send samples to node %s: reason=%d", node_name(n), nsent); + if (nsent < 0) return nsent; - } - else if (nsent < cnt) - warning("Failed to send %d out of %d samples to node %s", cnt-nsent, cnt, node_name(n)); - debug(LOG_NODE | 5, "Sent %u samples to node %s", nsent, node_name(n)); } @@ -660,7 +669,7 @@ char * node_name_long(struct node *n) strcatf(&n->_name_long, ", out.netem=%s", n->tc_qdisc ? "yes" : "no"); if (n->tc_qdisc) - strcatf(&n->_name_long, ", mark=%d", n->mark); + strcatf(&n->_name_long, ", fwmark=%d", n->fwmark); #endif /* WITH_NETEM */ /* Append node-type specific details */ @@ -761,3 +770,15 @@ invalid2: return 0; } + +int node_is_valid_name(const char *name) +{ + for (const char *p = name; *p; p++) { + if (isalnum(*p) || (*p == '_') || (*p == '-')) + continue; + + return -1; + } + + return 0; +} diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 682e6669e..0a0f80303 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -370,10 +370,11 @@ retry: ret = io_scan(&f->io, smps, cnt); goto retry; case FILE_EOF_STOP: - info("Reached end-of-file. Stopping node %s", node_name(n)); + info("Reached end-of-file."); - killme(SIGTERM); - pause(); + n->state = STATE_STOPPING; + + return -1; } } else diff --git a/lib/nodes/iec61850.c b/lib/nodes/iec61850.c index 165fa5960..5a79fbd4b 100644 --- a/lib/nodes/iec61850.c +++ b/lib/nodes/iec61850.c @@ -101,55 +101,72 @@ int iec61850_parse_signals(json_t *json_signals, struct vlist *signals, struct v { int ret, total_size = 0; const char *iec_type; + const struct iec61850_type_descriptor *td; + struct signal *sig; ret = vlist_init(signals); if (ret) return ret; - json_t *json_signal; - size_t i; - json_array_foreach(json_signals, i, json_signal) { - const struct iec61850_type_descriptor *td; - struct signal *sig; + if (json_is_array(json_signals)) { - json_unpack(json_signal, "{ s?: s }", - "iec_type", &iec_type - ); + json_t *json_signal; + size_t i; + json_array_foreach(json_signals, i, json_signal) { + json_unpack(json_signal, "{ s?: s }", + "iec_type", &iec_type + ); - /* Try to deduct the IEC 61850 data type from VILLAS signal format */ - if (!iec_type) { - if (!node_signals) - return -1; - - sig = vlist_at(node_signals, i); - if (!sig) - return -1; - - switch (sig->type) { - case SIGNAL_TYPE_BOOLEAN: - iec_type = "boolean"; - break; - - case SIGNAL_TYPE_FLOAT: - iec_type = "float64"; - break; - - case SIGNAL_TYPE_INTEGER: - iec_type = "int64"; - break; - - default: + /* Try to deduct the IEC 61850 data type from VILLAS signal format */ + if (!iec_type) { + if (!node_signals) return -1; + + sig = vlist_at(node_signals, i); + if (!sig) + return -1; + + switch (sig->type) { + case SIGNAL_TYPE_BOOLEAN: + iec_type = "boolean"; + break; + + case SIGNAL_TYPE_FLOAT: + iec_type = "float64"; + break; + + case SIGNAL_TYPE_INTEGER: + iec_type = "int64"; + break; + + default: + return -1; + } } + + td = iec61850_lookup_type(iec_type); + if (!td) + return -1; + + vlist_push(signals, (void *) td); + + total_size += td->size; } + } + else { + ret = json_unpack(json_signals, "{ s: s }", "iec_type", &iec_type); + if (ret) + return ret; td = iec61850_lookup_type(iec_type); if (!td) return -1; - vlist_push(signals, (void *) td); + for (int i = 0; i < vlist_length(node_signals); i++) { + vlist_push(signals, (void *) td); - total_size += td->size; + total_size += td->size; + } } return total_size; diff --git a/lib/nodes/iec61850_sv.c b/lib/nodes/iec61850_sv.c index 9cf2e193c..19aeb5ecd 100644 --- a/lib/nodes/iec61850_sv.c +++ b/lib/nodes/iec61850_sv.c @@ -190,7 +190,7 @@ int iec61850_sv_parse(struct node *n, json_t *json) i->out.svid = svid ? strdup(svid) : NULL; - ret = iec61850_parse_signals(json_signals, &i->out.signals, NULL); + ret = iec61850_parse_signals(json_signals, &i->out.signals, &n->out.signals); if (ret <= 0) error("Failed to parse setting 'signals' of node %s", node_name(n)); @@ -352,7 +352,7 @@ int iec61850_sv_read(struct node *n, struct sample *smps[], unsigned cnt, unsign struct sample *smpt[cnt]; if (!i->in.enabled) - return 0; + return -1; pulled = queue_signalled_pull_many(&i->in.queue, (void **) smpt, cnt); @@ -367,7 +367,7 @@ int iec61850_sv_write(struct node *n, struct sample *smps[], unsigned cnt, unsig struct iec61850_sv *i = (struct iec61850_sv *) n->_vd; if (!i->out.enabled) - return 0; + return -1; for (unsigned j = 0; j < cnt; j++) { unsigned offset = 0; diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index 8125b1e1f..d48681823 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -63,7 +63,7 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result) if (m->subscribe) { ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos); if (ret) - warning("MQTT: failed to subscribe to topic '%s' for node %s", m->subscribe, node_name(n)); + warning("MQTT: failed to subscribe to topic '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); } else warning("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n)); @@ -207,11 +207,11 @@ int mqtt_parse(struct node *n, json_t *cfg) // Some checks ret = mosquitto_sub_topic_check(m->subscribe); if (ret != MOSQ_ERR_SUCCESS) - error("Invalid subscribe topic: '%s' for node %s", m->subscribe, node_name(n)); + error("Invalid subscribe topic: '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); ret = mosquitto_pub_topic_check(m->publish); if (ret != MOSQ_ERR_SUCCESS) - error("Invalid publish topic: '%s' for node %s", m->publish, node_name(n)); + error("Invalid publish topic: '%s' for node %s: %s", m->publish, node_name(n), mosquitto_strerror(ret)); return 0; } @@ -283,17 +283,17 @@ int mqtt_start(struct node *n) if (m->username && m->password) { ret = mosquitto_username_pw_set(m->client, m->username, m->password); if (ret) - return ret; + goto mosquitto_error; } if (m->ssl.enabled) { ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, NULL); if (ret) - return ret; + goto mosquitto_error; ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure); if (ret) - return ret; + goto mosquitto_error; } mosquitto_log_callback_set(m->client, mqtt_log_cb); @@ -320,13 +320,18 @@ int mqtt_start(struct node *n) ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); if (ret) - return ret; + goto mosquitto_error; ret = mosquitto_loop_start(m->client); if (ret) - return ret; + goto mosquitto_error; return 0; + +mosquitto_error: + warning("MQTT: %s", mosquitto_strerror(ret)); + + return ret; } int mqtt_stop(struct node *n) @@ -336,17 +341,22 @@ int mqtt_stop(struct node *n) ret = mosquitto_disconnect(m->client); if (ret) - return ret; + goto mosquitto_error; ret = mosquitto_loop_stop(m->client, 0); if (ret) - return ret; + goto mosquitto_error; ret = io_destroy(&m->io); if (ret) return ret; return 0; + +mosquitto_error: + warning("MQTT: %s", mosquitto_strerror(ret)); + + return ret; } int mqtt_type_start(struct super_node *sn) @@ -355,9 +365,14 @@ int mqtt_type_start(struct super_node *sn) ret = mosquitto_lib_init(); if (ret) - return ret; + goto mosquitto_error; return 0; + +mosquitto_error: + warning("MQTT: %s", mosquitto_strerror(ret)); + + return ret; } int mqtt_type_stop() @@ -366,9 +381,14 @@ int mqtt_type_stop() ret = mosquitto_lib_cleanup(); if (ret) - return ret; + goto mosquitto_error; return 0; + +mosquitto_error: + warning("MQTT: %s", mosquitto_strerror(ret)); + + return ret; } int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) @@ -399,8 +419,7 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re return ret; if (m->publish) { - ret = mosquitto_publish(m->client, NULL /* mid */, m->publish, wbytes, data, m->qos, - m->retain); + ret = mosquitto_publish(m->client, NULL /* mid */, m->publish, wbytes, data, m->qos, m->retain); if (ret != MOSQ_ERR_SUCCESS) { warning("MQTT: publish failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); return -abs(ret); diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 55cdf9565..2bc368320 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -58,6 +58,7 @@ static struct plugin p; static int rtp_set_rate(struct node *n, double rate) { struct rtp *r = (struct rtp *) n->_vd; + int ratio; switch (r->rtcp.throttle_mode) { case RTCP_THROTTLE_HOOK_LIMIT_RATE: @@ -65,7 +66,10 @@ static int rtp_set_rate(struct node *n, double rate) break; case RTCP_THROTTLE_HOOK_DECIMATE: - decimate_set_ratio(r->rtcp.throttle_hook, r->rate / rate); + ratio = r->rate / rate; + if (ratio == 0) + ratio = 1; + decimate_set_ratio(r->rtcp.throttle_hook, ratio); break; case RTCP_THROTTLE_DISABLED: @@ -75,7 +79,7 @@ static int rtp_set_rate(struct node *n, double rate) return -1; } - debug(5, "Set rate limiting for node %s to %f", node_name(n), rate); + info("Set rate limiting for node %s to %f", node_name(n), rate); return 0; } @@ -87,7 +91,10 @@ static int rtp_aimd(struct node *n, double loss_frac) int ret; double rate; - if (loss_frac < 1e-3) + if (!r->rtcp.enabled) + return -1; + + if (loss_frac < 0.01) rate = r->aimd.last_rate + r->aimd.a; else rate = r->aimd.last_rate * r->aimd.b; @@ -98,7 +105,10 @@ static int rtp_aimd(struct node *n, double loss_frac) if (ret) return ret; - fprintf(r->aimd.log, "%d\t%f\t%f\n", r->rtcp.num_rrs, loss_frac, rate); + if (r->aimd.log) + fprintf(r->aimd.log, "%d\t%f\t%f\n", r->rtcp.num_rrs, loss_frac, rate); + + info("AIMD: %d\t%f\t%f", r->rtcp.num_rrs, loss_frac, rate); return 0; } @@ -112,7 +122,8 @@ int rtp_init(struct node *n) r->aimd.a = 10; r->aimd.b = 0.5; - r->aimd.last_rate = 100; + r->aimd.last_rate = 2000; + r->aimd.log = NULL; r->rtcp.enabled = false; r->rtcp.throttle_mode = RTCP_THROTTLE_DISABLED; @@ -163,7 +174,7 @@ int rtp_parse(struct node *n, json_t *cfg) /* AIMD */ if (json_aimd) { - ret = json_unpack_ex(json_rtcp, &err, 0, "{ s?: F, s?: F, s?: F }", + ret = json_unpack_ex(json_aimd, &err, 0, "{ s?: F, s?: F, s?: F }", "a", &r->aimd.a, "b", &r->aimd.b, "start_rate", &r->aimd.last_rate @@ -207,7 +218,7 @@ int rtp_parse(struct node *n, json_t *cfg) /* Format */ r->format = format_type_lookup(format); - if(!r->format) + if (!r->format) error("Invalid format '%s' for node %s", format, node_name(n)); /* Remote address */ @@ -280,6 +291,9 @@ char * rtp_print(struct node *n) } strcatf(&buf, ", rtcp.mode=%s, rtcp.throttle_mode=%s", mode, throttle_mode); + + if (r->rtcp.mode == RTCP_MODE_AIMD) + strcatf(&buf, ", aimd.a=%f, aimd.b=%f, aimd.start_rate=%f", r->aimd.a, r->aimd.b, r->aimd.last_rate); } free(local); @@ -315,16 +329,16 @@ static void rtcp_handler(const struct sa *src, struct rtcp_msg *msg, void *arg) /* source not used */ (void) src; - debug(5, "rtcp: recv %s", rtcp_type_name(msg->hdr.pt)); + debug(5, "RTCP: recv %s", rtcp_type_name(msg->hdr.pt)); if (msg->hdr.pt == RTCP_SR) { - if(msg->hdr.count > 0) { + if (msg->hdr.count > 0) { const struct rtcp_rr *rr = &msg->r.sr.rrv[0]; - debug(5, "rtp: fraction lost = %d", rr->fraction); - rtp_aimd(n, rr->fraction); + debug(5, "RTP: fraction lost = %d", rr->fraction); + rtp_aimd(n, (double) rr->fraction / 256); } else - warning("Received RTCP sender report with zero reception reports"); + debug(5, "RTCP: Received sender report with zero reception reports"); } r->rtcp.num_rrs++; @@ -469,14 +483,14 @@ int rtp_type_start(struct super_node *sn) /* Initialize library */ ret = libre_init(); if (ret) { - error("Error initializing libre"); + warning("Error initializing libre"); return ret; } /* Add worker thread */ ret = pthread_create(&re_pthread, NULL, th_func, NULL); if (ret) { - error("Error creating rtp node type pthread"); + warning("Error creating rtp node type pthread"); return ret; } diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 2588db0c9..260a46ab0 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -136,8 +136,11 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re if (recv < 0) { /* This can only really mean that the other process has exited, so close * the interface to make sure the shared memory object is unlinked */ - shmem_int_close(&shm->intf); - warning("Shared memory segment has been closed for node: %s", node_name(n)); + + info("Shared memory segment has been closed."); + + n->state = STATE_STOPPING; + return recv; } diff --git a/lib/nodes/signal_generator.c b/lib/nodes/signal_generator.c index 4fdff2263..3c0d7ee77 100644 --- a/lib/nodes/signal_generator.c +++ b/lib/nodes/signal_generator.c @@ -264,9 +264,11 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, u } if (s->limit > 0 && s->counter >= s->limit) { - info("Reached limit of node %s", node_name(n)); - killme(SIGTERM); - return 0; + info("Reached limit."); + + n->state = STATE_STOPPING; + + return -1; } s->counter += steps; diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c index c3135bcbc..540c33abe 100644 --- a/lib/nodes/stats.c +++ b/lib/nodes/stats.c @@ -38,55 +38,57 @@ static struct vlist *nodes; /** The global list of nodes */ -static void stats_init_signals(struct node *n) +int stats_node_signal_destroy(struct stats_node_signal *s) { - struct stats_desc *desc; - struct signal *sig; + free(s->node_str); - for (int i = 0; i < STATS_COUNT; i++) { - desc = &stats_metrics[i]; + return 0; +} - /* Total */ - sig = alloc(sizeof(struct signal)); - sig->name = strf("%s.%s", desc->name, "total"); - sig->type = SIGNAL_TYPE_INTEGER; - vlist_push(&n->in.signals, sig); +int stats_node_signal_parse(struct stats_node_signal *s, json_t *cfg) +{ + json_error_t err; - /* Last */ - sig = alloc(sizeof(struct signal)); - sig->name = strf("%s.%s", desc->name, "last"); - sig->unit = strdup(desc->unit); - sig->type = SIGNAL_TYPE_FLOAT; - vlist_push(&n->in.signals, sig); + int ret; + const char *stats; + char *metric, *type, *node, *cpy; - /* Highest */ - sig = alloc(sizeof(struct signal)); - sig->name = strf("%s.%s", desc->name, "highest"); - sig->unit = strdup(desc->unit); - sig->type = SIGNAL_TYPE_FLOAT; - vlist_push(&n->in.signals, sig); + ret = json_unpack_ex(cfg, &err, 0, "{ s: s }", + "stats", &stats + ); + if (ret) + return -1; - /* Lowest */ - sig = alloc(sizeof(struct signal)); - sig->name = strf("%s.%s", desc->name, "lowest"); - sig->unit = strdup(desc->unit); - sig->type = SIGNAL_TYPE_FLOAT; - vlist_push(&n->in.signals, sig); + cpy = strdup(stats); - /* Mean */ - sig = alloc(sizeof(struct signal)); - sig->name = strf("%s.%s", desc->name, "mean"); - sig->unit = strdup(desc->unit); - sig->type = SIGNAL_TYPE_FLOAT; - vlist_push(&n->in.signals, sig); + node = strtok(cpy, "."); + if (!node) + goto invalid_format; - /* Variance */ - sig = alloc(sizeof(struct signal)); - sig->name = strf("%s.%s", desc->name, "var"); - sig->unit = strf("%s^2", desc->unit); // variance has squared unit of variable - sig->type = SIGNAL_TYPE_FLOAT; - vlist_push(&n->in.signals, sig); - } + metric = strtok(NULL, "."); + if (!metric) + goto invalid_format; + + type = strtok(NULL, "."); + if (!type) + goto invalid_format; + + s->metric = stats_lookup_metric(metric); + if (s->metric < 0) + goto invalid_format; + + s->type = stats_lookup_type(type); + if (s->type < 0) + goto invalid_format; + + s->node_str = strdup(node); + + free(cpy); + return 0; + +invalid_format: + free(cpy); + return -1; } int stats_node_type_start(struct super_node *sn) @@ -105,9 +107,13 @@ int stats_node_start(struct node *n) if (ret) serror("Failed to create task"); - s->node = vlist_lookup(nodes, s->node_str); - if (!s->node) - error("Invalid reference node %s for setting 'node' of node %s", s->node_str, node_name(n)); + for (size_t i = 0; i < vlist_length(&s->signals); i++) { + struct stats_node_signal *stats_sig = (struct stats_node_signal *) vlist_at(&s->signals, i); + + stats_sig->node = vlist_lookup(nodes, stats_sig->node_str); + if (!stats_sig->node) + error("Invalid reference node %s for setting 'node' of node %s", stats_sig->node_str, node_name(n)); + } return 0; } @@ -128,7 +134,31 @@ char * stats_node_print(struct node *n) { struct stats_node *s = (struct stats_node *) n->_vd; - return strf("node=%s, rate=%f", s->node_str, s->rate); + return strf("rate=%f", s->rate); +} + +int stats_node_init(struct node *n) +{ + int ret; + struct stats_node *s = (struct stats_node *) n->_vd; + + ret = vlist_init(&s->signals); + if (ret) + return ret; + + return 0; +} + +int stats_node_destroy(struct node *n) +{ + int ret; + struct stats_node *s = (struct stats_node *) n->_vd; + + ret = vlist_destroy(&s->signals, (dtor_cb_t) stats_node_signal_destroy, true); + if (ret) + return ret; + + return 0; } int stats_node_parse(struct node *n, json_t *cfg) @@ -136,13 +166,14 @@ int stats_node_parse(struct node *n, json_t *cfg) struct stats_node *s = (struct stats_node *) n->_vd; int ret; + size_t i; json_error_t err; + json_t *json_signals, *json_signal; - const char *node; - - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s: F }", - "node", &node, - "rate", &s->rate + ret = json_unpack_ex(cfg, &err, 0, "{ s: F, s: { s: o } }", + "rate", &s->rate, + "in", + "signals", &json_signals ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); @@ -150,50 +181,68 @@ int stats_node_parse(struct node *n, json_t *cfg) if (s->rate <= 0) error("Setting 'rate' of node %s must be positive", node_name(n)); - s->node_str = strdup(node); + if (!json_is_array(json_signals)) + error("Setting 'in.signals' of node %s must be an array", node_name(n)); - stats_init_signals(n); + json_array_foreach(json_signals, i, json_signal) { + struct signal *sig = (struct signal *) vlist_at(&n->in.signals, i); + struct stats_node_signal *stats_sig; - return 0; -} + stats_sig = alloc(sizeof(struct stats_node_signal)); + if (!stats_sig) + return -1; -int stats_node_destroy(struct node *n) -{ - struct stats_node *s = (struct stats_node *) n->_vd; + ret = stats_node_signal_parse(stats_sig, json_signal); + if (ret) + error("Failed to parse signal definition of node %s", node_name(n)); - if (s->node_str) - free(s->node_str); + if (!sig->name) { + const char *metric = stats_metrics[stats_sig->metric].name; + const char *type = stats_types[stats_sig->type].name; + + sig->name = strf("%s.%s.%s", stats_sig->node_str, metric, type); + } + + if (!sig->unit) + sig->unit = strdup(stats_metrics[stats_sig->metric].unit); + + if (sig->type == SIGNAL_TYPE_AUTO) + sig->type = stats_types[stats_sig->type].signal_type; + else if (sig->type != stats_types[stats_sig->type].signal_type) + error("Invalid type for signal %zu in node %s", i, node_name(n)); + + vlist_push(&s->signals, stats_sig); + } return 0; } int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { - struct stats_node *sn = (struct stats_node *) n->_vd; - struct stats *s = sn->node->stats; + struct stats_node *s = (struct stats_node *) n->_vd; if (!cnt) return 0; - if (!sn->node->stats) - return 0; + task_wait(&s->task); - task_wait(&sn->task); + int len = MIN(vlist_length(&s->signals), smps[0]->capacity); - smps[0]->length = MIN(STATS_COUNT * 6, smps[0]->capacity); - smps[0]->flags = SAMPLE_HAS_DATA; + for (size_t i = 0; i < len; i++) { + struct stats *st; + struct stats_node_signal *sig = (struct stats_node_signal *) vlist_at(&s->signals, i); - for (int i = 0; i < 6 && (i+1)*STATS_METRICS <= smps[0]->length; i++) { - int tot = hist_total(&s->histograms[i]); + st = sig->node->stats; + if (!st) + return -1; - smps[0]->data[i*STATS_METRICS+0].f = tot ? hist_total(&s->histograms[i]) : 0; - smps[0]->data[i*STATS_METRICS+1].f = tot ? hist_last(&s->histograms[i]) : 0; - smps[0]->data[i*STATS_METRICS+2].f = tot ? hist_highest(&s->histograms[i]) : 0; - smps[0]->data[i*STATS_METRICS+3].f = tot ? hist_lowest(&s->histograms[i]) : 0; - smps[0]->data[i*STATS_METRICS+4].f = tot ? hist_mean(&s->histograms[i]) : 0; - smps[0]->data[i*STATS_METRICS+5].f = tot ? hist_var(&s->histograms[i]) : 0; + smps[0]->data[i] = stats_get_value(st, sig->metric, sig->type); } + smps[0]->length = len; + smps[0]->flags = SAMPLE_HAS_DATA; + smps[0]->signals = &n->in.signals; + return 1; } @@ -212,10 +261,11 @@ static struct plugin p = { .type = PLUGIN_TYPE_NODE, .node = { .vectorize = 1, - .flags = NODE_TYPE_PROVIDES_SIGNALS, + .flags = 0, .size = sizeof(struct stats_node), .type.start = stats_node_type_start, .parse = stats_node_parse, + .init = stats_node_init, .destroy = stats_node_destroy, .print = stats_node_print, .start = stats_node_start, diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c index 6cb89f99e..7fceb5c0d 100644 --- a/lib/nodes/test_rtt.c +++ b/lib/nodes/test_rtt.c @@ -307,9 +307,11 @@ int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned } if (t->current >= vlist_length(&t->cases)) { - info("This was the last case. Terminating."); - killme(SIGTERM); - pause(); + info("This was the last case. Stopping node %s", node_name(n)); + + n->state = STATE_STOPPING; + + return -1; } else { struct test_rtt_case *c = (struct test_rtt_case *) vlist_at(&t->cases, t->current); diff --git a/lib/path.c b/lib/path.c index aaa90ee72..97528c938 100644 --- a/lib/path.c +++ b/lib/path.c @@ -73,9 +73,9 @@ static int path_source_destroy(struct path_source *ps) return 0; } -static void path_source_read(struct path_source *ps, struct path *p, int i) +static int path_source_read(struct path_source *ps, struct path *p, int i) { - int recv, tomux, allocated, cnt; + int recv, tomux, allocated, cnt, toenqueue, enqueued = 0; unsigned release; cnt = ps->node->in.vectorize; @@ -93,10 +93,20 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) release = allocated; recv = node_read(ps->node, read_smps, allocated, &release); - if (recv == 0) + if (recv == 0) { + enqueued = 0; goto out2; - else if (recv < 0) - error("Failed to read samples from node %s", node_name(ps->node)); + } + else if (recv < 0) { + if (ps->node->state == STATE_STOPPING) { + p->state = STATE_STOPPING; + + enqueued = -1; + goto out2; + } + else + error("Failed to read samples from node %s", node_name(ps->node)); + } else if (recv < allocated) warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated); @@ -134,30 +144,33 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received)); #ifdef WITH_HOOKS - int toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux); + toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux); if (toenqueue != tomux) { int skipped = tomux - toenqueue; debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p)); } #else - int toenqueue = tomux; + toenqueue = tomux; #endif if (bitset_test(&p->mask, i)) { /* Check if we received an update from all nodes/ */ if ((p->mode == PATH_MODE_ANY) || - (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) - { + (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) { path_destination_enqueue(p, muxed_smps, toenqueue); /* Reset bitset of updated nodes */ bitset_clear_all(&p->received); + + enqueued = toenqueue; } } sample_decref_many(muxed_smps, tomux); out2: sample_decref_many(read_smps, release); + + return enqueued; } static int path_destination_init(struct path_destination *pd, int queuelen) @@ -244,21 +257,22 @@ static void path_destination_write(struct path_destination *pd, struct path *p) static void * path_run_single(void *arg) { + int ret; struct path *p = arg; struct path_source *ps = (struct path_source *) vlist_at(&p->sources, 0); - debug(1, "Started path %s in single mode", path_name(p)); + while (p->state == STATE_STARTED) { + pthread_testcancel(); - for (;;) { - path_source_read(ps, p, 0); + ret = path_source_read(ps, p, 0); + if (ret <= 0) + continue; for (size_t i = 0; i < vlist_length(&p->destinations); i++) { struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i); path_destination_write(pd, p); } - - pthread_testcancel(); } return NULL; @@ -270,9 +284,7 @@ static void * path_run_poll(void *arg) int ret; struct path *p = arg; - debug(1, "Started path %s in polling mode", path_name(p)); - - for (;;) { + while (p->state == STATE_STARTED) { ret = poll(p->reader.pfds, p->reader.nfds, -1); if (ret < 0) serror("Failed to poll"); @@ -292,9 +304,8 @@ static void * path_run_poll(void *arg) path_destination_enqueue(p, &p->last_sample, 1); } /* A source is ready to receive samples */ - else { + else path_source_read(ps, p, i); - } } } @@ -521,7 +532,6 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) "rate", &p->rate, "mask", &json_mask, "original_sequence_no", &p->original_sequence_no - ); if (ret) jerror(&err, "Failed to parse path configuration"); @@ -648,14 +658,8 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) p->poll = 1; else if (vlist_length(&p->sources) > 1) p->poll = 1; - else { - struct path_source *ps = (struct path_source *) vlist_at(&p->sources, 0); - - int fds[16]; - int num_fds = node_poll_fds(ps->node, fds); - - p->poll = num_fds > 0; - } + else + p->poll = 0; } ret = vlist_destroy(&sources, NULL, false); @@ -806,11 +810,15 @@ int path_stop(struct path *p) { int ret; - if (p->state != STATE_STARTED) + if (p->state != STATE_STARTED && p->state != STATE_STOPPING) return 0; info("Stopping path: %s", path_name(p)); + if (p->state != STATE_STOPPING) + p->state = STATE_STOPPING; + + /* Cancel the thread in case is currently in a blocking syscall */ ret = pthread_cancel(p->tid); if (ret) return ret; @@ -908,49 +916,22 @@ int path_uses_node(struct path *p, struct node *n) return -1; } -int path_reverse(struct path *p, struct path *r) +int path_is_simple(struct path *p) { - if (vlist_length(&p->destinations) != 1 || vlist_length(&p->sources) != 1) - return -1; + int ret; + const char *in = NULL, *out = NULL; - /* General */ - r->enabled = p->enabled; + ret = json_unpack(p->cfg, "{ s: s, s: s }", "in", &in, "out", &out); + if (ret) + return ret; - /* Source / Destinations */ - struct path_destination *orig_pd = vlist_first(&p->destinations); - struct path_source *orig_ps = vlist_first(&p->sources); + ret = node_is_valid_name(in); + if (ret) + return ret; - struct path_destination *new_pd = (struct path_destination *) alloc(sizeof(struct path_destination)); - struct path_source *new_ps = (struct path_source *) alloc(sizeof(struct path_source)); - struct mapping_entry *new_me = alloc(sizeof(struct mapping_entry)); - new_pd->node = orig_ps->node; - new_ps->node = orig_pd->node; - new_ps->masked = true; + ret = node_is_valid_name(out); + if (ret) + return ret; - new_me->node = new_ps->node; - new_me->type = MAPPING_TYPE_DATA; - new_me->data.offset = 0; - new_me->length = 0; - - vlist_init(&new_ps->mappings); - vlist_push(&new_ps->mappings, new_me); - - vlist_push(&r->destinations, new_pd); - vlist_push(&r->sources, new_ps); - -#ifdef WITH_HOOKS - for (size_t i = 0; i < vlist_length(&p->hooks); i++) { - int ret; - - struct hook *h = (struct hook *) vlist_at(&p->hooks, i); - struct hook *g = (struct hook *) alloc(sizeof(struct hook)); - - ret = hook_init(g, h->_vt, r, NULL); - if (ret) - return ret; - - vlist_push(&r->hooks, g); - } -#endif /* WITH_HOOKS */ return 0; } diff --git a/lib/signal.c b/lib/signal.c index 31b9258a9..4cd6e89d4 100644 --- a/lib/signal.c +++ b/lib/signal.c @@ -55,20 +55,7 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u switch (me->type) { case MAPPING_TYPE_STATS: - switch (me->stats.type) { - case MAPPING_STATS_TYPE_TOTAL: - s->type = SIGNAL_TYPE_INTEGER; - break; - - case MAPPING_STATS_TYPE_LAST: - case MAPPING_STATS_TYPE_LOWEST: - case MAPPING_STATS_TYPE_HIGHEST: - case MAPPING_STATS_TYPE_MEAN: - case MAPPING_STATS_TYPE_VAR: - case MAPPING_STATS_TYPE_STDDEV: - s->type = SIGNAL_TYPE_FLOAT; - break; - } + s->type = stats_types[me->stats.type].signal_type; break; case MAPPING_TYPE_HEADER: diff --git a/lib/stats.c b/lib/stats.c index fb1859d97..b48d429b9 100644 --- a/lib/stats.c +++ b/lib/stats.c @@ -31,12 +31,22 @@ #include #include -struct stats_desc stats_metrics[] = { - { "skipped", "samples", "Skipped samples and the distance between them", 25 }, - { "reordered", "samples", "Reordered samples and the distance between them", 25 }, - { "gap_sent", "seconds", "Inter-message timestamps (as sent by remote)", 25 }, - { "gap_received", "seconds", "Inter-message arrival time (as received by this instance)", 25 }, - { "owd", "seconds", "One-way-delay (OWD) of received messages", 25 } +struct stats_metric_description stats_metrics[] = { + { "skipped", STATS_METRIC_SKIPPED, "samples", "Skipped samples and the distance between them", 25 }, + { "reordered", STATS_METRIC_REORDERED, "samples", "Reordered samples and the distance between them", 25 }, + { "gap_sent", STATS_METRIC_GAP_SAMPLE, "seconds", "Inter-message timestamps (as sent by remote)", 25 }, + { "gap_received", STATS_METRIC_GAP_RECEIVED, "seconds", "Inter-message arrival time (as received by this instance)", 25 }, + { "owd", STATS_METRIC_OWD, "seconds", "One-way-delay (OWD) of received messages", 25 } +}; + +struct stats_type_description stats_types[] = { + { "last", STATS_TYPE_LAST, SIGNAL_TYPE_FLOAT }, + { "highest", STATS_TYPE_HIGHEST, SIGNAL_TYPE_FLOAT }, + { "lowest", STATS_TYPE_LOWEST, SIGNAL_TYPE_FLOAT }, + { "mean", STATS_TYPE_MEAN, SIGNAL_TYPE_FLOAT }, + { "var", STATS_TYPE_VAR, SIGNAL_TYPE_FLOAT }, + { "stddev", STATS_TYPE_STDDEV, SIGNAL_TYPE_FLOAT }, + { "total", STATS_TYPE_TOTAL, SIGNAL_TYPE_INTEGER } }; int stats_lookup_format(const char *str) @@ -51,9 +61,33 @@ int stats_lookup_format(const char *str) return -1; } +enum stats_metric stats_lookup_metric(const char *str) +{ + for (int i = 0; i < STATS_METRIC_COUNT; i++) { + struct stats_metric_description *d = &stats_metrics[i]; + + if (!strcmp(str, d->name)) + return d->metric; + } + + return STATS_METRIC_INVALID; +} + +enum stats_type stats_lookup_type(const char *str) +{ + for (int i = 0; i < STATS_TYPE_COUNT; i++) { + struct stats_type_description *d = &stats_types[i]; + + if (!strcmp(str, d->name)) + return d->type; + } + + return STATS_TYPE_INVALID; +} + int stats_init(struct stats *s, int buckets, int warmup) { - for (int i = 0; i < STATS_COUNT; i++) + for (int i = 0; i < STATS_METRIC_COUNT; i++) hist_init(&s->histograms[i], buckets, warmup); s->delta = alloc(sizeof(struct stats_delta)); @@ -63,7 +97,7 @@ int stats_init(struct stats *s, int buckets, int warmup) int stats_destroy(struct stats *s) { - for (int i = 0; i < STATS_COUNT; i++) + for (int i = 0; i < STATS_METRIC_COUNT; i++) hist_destroy(&s->histograms[i]); free(s->delta); @@ -71,7 +105,7 @@ int stats_destroy(struct stats *s) return 0; } -void stats_update(struct stats *s, enum stats_id id, double val) +void stats_update(struct stats *s, enum stats_metric id, double val) { s->delta->values[id] = val; s->delta->update |= 1 << id; @@ -79,7 +113,7 @@ void stats_update(struct stats *s, enum stats_id id, double val) int stats_commit(struct stats *s) { - for (int i = 0; i < STATS_COUNT; i++) { + for (int i = 0; i < STATS_METRIC_COUNT; i++) { if (s->delta->update & 1 << i) { hist_put(&s->histograms[i], s->delta->values[i]); s->delta->update &= ~(1 << i); @@ -93,8 +127,8 @@ json_t * stats_json(struct stats *s) { json_t *obj = json_object(); - for (int i = 0; i < STATS_COUNT; i++) { - struct stats_desc *d = &stats_metrics[i]; + for (int i = 0; i < STATS_METRIC_COUNT; i++) { + struct stats_metric_description *d = &stats_metrics[i]; struct hist *h = &s->histograms[i]; json_object_set_new(obj, d->name, hist_json(h)); @@ -107,17 +141,17 @@ json_t * stats_json_periodic(struct stats *s, struct node *n) { return json_pack("{ s: s, s: i, s: f, s: f, s: i, s: i }", "node", node_name(n), - "processed", hist_total(&s->histograms[STATS_OWD]), - "owd", hist_last(&s->histograms[STATS_OWD]), - "rate", 1.0 / hist_last(&s->histograms[STATS_GAP_SAMPLE]), - "dropped", hist_total(&s->histograms[STATS_REORDERED]), - "skipped", hist_total(&s->histograms[STATS_SKIPPED]) + "processed", hist_total(&s->histograms[STATS_METRIC_OWD]), + "owd", hist_last(&s->histograms[STATS_METRIC_OWD]), + "rate", 1.0 / hist_last(&s->histograms[STATS_METRIC_GAP_SAMPLE]), + "dropped", hist_total(&s->histograms[STATS_METRIC_REORDERED]), + "skipped", hist_total(&s->histograms[STATS_METRIC_SKIPPED]) ); } void stats_reset(struct stats *s) { - for (int i = 0; i < STATS_COUNT; i++) + for (int i = 0; i < STATS_METRIC_COUNT; i++) hist_reset(&s->histograms[i]); } @@ -155,13 +189,13 @@ void stats_print_periodic(struct stats *s, FILE *f, enum stats_format fmt, int v case STATS_FORMAT_HUMAN: table_row(&stats_table, node_name_short(n), - hist_total(&s->histograms[STATS_OWD]), - hist_last(&s->histograms[STATS_OWD]), - hist_mean(&s->histograms[STATS_OWD]), - 1.0 / hist_last(&s->histograms[STATS_GAP_RECEIVED]), - 1.0 / hist_mean(&s->histograms[STATS_GAP_RECEIVED]), - hist_total(&s->histograms[STATS_REORDERED]), - hist_total(&s->histograms[STATS_SKIPPED]) + hist_total(&s->histograms[STATS_METRIC_OWD]), + hist_last(&s->histograms[STATS_METRIC_OWD]), + hist_mean(&s->histograms[STATS_METRIC_OWD]), + 1.0 / hist_last(&s->histograms[STATS_METRIC_GAP_RECEIVED]), + 1.0 / hist_mean(&s->histograms[STATS_METRIC_GAP_RECEIVED]), + hist_total(&s->histograms[STATS_METRIC_REORDERED]), + hist_total(&s->histograms[STATS_METRIC_SKIPPED]) ); break; @@ -179,10 +213,10 @@ void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose) { switch (fmt) { case STATS_FORMAT_HUMAN: - for (int i = 0; i < STATS_COUNT; i++) { - struct stats_desc *desc = &stats_metrics[i]; + for (int i = 0; i < STATS_METRIC_COUNT; i++) { + struct stats_metric_description *d = &stats_metrics[i]; - info("%s: %s", desc->name, desc->desc); + info("%s: %s", d->name, d->desc); hist_print(&s->histograms[i], verbose); } break; @@ -198,14 +232,44 @@ void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose) } } -enum stats_id stats_lookup_id(const char *name) +union signal_data stats_get_value(const struct stats *s, enum stats_metric sm, enum stats_type st) { - for (int i = 0; i < STATS_COUNT; i++) { - struct stats_desc *desc = &stats_metrics[i]; + const struct hist *h = &s->histograms[sm]; - if (!strcmp(desc->name, name)) - return i; + union signal_data d; + + switch (st) { + case STATS_TYPE_TOTAL: + d.i = h->total; + break; + + case STATS_TYPE_LAST: + d.f = h->last; + break; + + case STATS_TYPE_HIGHEST: + d.f = h->highest; + break; + + case STATS_TYPE_LOWEST: + d.f = h->lowest; + break; + + case STATS_TYPE_MEAN: + d.f = hist_mean(h); + break; + + case STATS_TYPE_STDDEV: + d.f = hist_stddev(h); + break; + + case STATS_TYPE_VAR: + d.f = hist_var(h); + break; + + default: + d.f = -1; } - return -1; + return d; } diff --git a/lib/super_node.cpp b/lib/super_node.cpp index 011ce1f2c..180636f27 100644 --- a/lib/super_node.cpp +++ b/lib/super_node.cpp @@ -49,10 +49,10 @@ using namespace villas::node; SuperNode::SuperNode() : state(STATE_INITIALIZED), + idleStop(false), priority(0), affinity(0), hugepages(DEFAULT_NR_HUGEPAGES), - stats(0), #ifdef WITH_API api(this), #ifdef WITH_WEB @@ -179,7 +179,9 @@ int SuperNode::parseJson(json_t *j) json_error_t err; - ret = json_unpack_ex(j, &err, 0, "{ s?: o, s?: o, s?: o, s?: o, s?: i, s?: i, s?: i, s?: F, s?: s }", + idleStop = true; + + ret = json_unpack_ex(j, &err, 0, "{ s?: o, s?: o, s?: o, s?: o, s?: i, s?: i, s?: i, s?: s, s?: b }", "http", &json_web, "logging", &json_logging, "nodes", &json_nodes, @@ -187,8 +189,8 @@ int SuperNode::parseJson(json_t *j) "hugepages", &hugepages, "affinity", &affinity, "priority", &priority, - "stats", &stats, - "name", &nme + "name", &nme, + "idle_stop", &idleStop ); if (ret) throw JsonError(err, "Failed to parse global configuration"); @@ -215,6 +217,10 @@ int SuperNode::parseJson(json_t *j) struct node_type *nt; const char *type; + ret = node_is_valid_name(name); + if (ret) + throw RuntimeError("Invalid name for node: {}", name); + ret = json_unpack_ex(json_node, &err, 0, "{ s: s }", "type", &type); if (ret) throw JsonError(err, "Failed to parse node"); @@ -245,7 +251,7 @@ int SuperNode::parseJson(json_t *j) size_t i; json_t *json_path; json_array_foreach(json_paths, i, json_path) { - path *p = (path *) alloc(sizeof(path)); +parse: path *p = (path *) alloc(sizeof(path)); ret = path_init(p); if (ret) @@ -258,17 +264,25 @@ int SuperNode::parseJson(json_t *j) vlist_push(&paths, p); if (p->reverse) { - path *r = (path *) alloc(sizeof(path)); - - ret = path_init(r); + /* Only simple paths can be reversed */ + ret = path_is_simple(p); if (ret) - throw RuntimeError("Failed to init path"); + throw RuntimeError("Complex paths can not be reversed!"); - ret = path_reverse(p, r); - if (ret) - throw RuntimeError("Failed to reverse path {}", path_name(p)); + /* Parse a second time with in/out reversed */ + json_path = json_copy(json_path); - vlist_push(&paths, r); + json_t *json_in = json_object_get(json_path, "in"); + json_t *json_out = json_object_get(json_path, "out"); + + if (json_equal(json_in, json_out)) + throw RuntimeError("Can not reverse path with identical in/out nodes!"); + + json_object_set(json_path, "reverse", json_false()); + json_object_set(json_path, "in", json_out); + json_object_set(json_path, "out", json_in); + + goto parse; } } } @@ -380,9 +394,14 @@ void SuperNode::startPaths() void SuperNode::start() { + int ret; + assert(state == STATE_CHECKED); - memory_init(hugepages); + ret = memory_init(hugepages); + if (ret) + throw RuntimeError("Failed to initialize memory system"); + kernel::rt::init(priority, affinity); #ifdef WITH_API @@ -398,17 +417,11 @@ void SuperNode::start() startNodes(); startPaths(); -#ifdef WITH_HOOKS - int ret; + ret = task_init(&task, 1.0, CLOCK_REALTIME); + if (ret) + throw RuntimeError("Failed to create timer"); - if (stats > 0) { - stats_print_header(STATS_FORMAT_HUMAN); - - ret = task_init(&task, 1.0 / stats, CLOCK_REALTIME); - if (ret) - throw RuntimeError("Failed to create stats timer"); - } -#endif /* WITH_HOOKS */ + stats_print_header(STATS_FORMAT_HUMAN); state = STATE_STARTED; } @@ -471,15 +484,11 @@ void SuperNode::stopInterfaces() void SuperNode::stop() { - -#ifdef WITH_HOOKS int ret; - if (stats > 0) { - ret = task_destroy(&task); - if (ret) - throw RuntimeError("Failed to stop stats timer"); - } -#endif /* WITH_HOOKS */ + + ret = task_destroy(&task); + if (ret) + throw RuntimeError("Failed to stop timer"); stopPaths(); stopNodes(); @@ -498,12 +507,15 @@ void SuperNode::stop() void SuperNode::run() { -#ifdef WITH_HOOKS - task_wait(&task); - periodic(); -#else - pause(); -#endif /* WITH_HOOKS */ + int ret; + + while (state == STATE_STARTED) { + task_wait(&task); + + ret = periodic(); + if (ret) + state = STATE_STOPPING; + } } SuperNode::~SuperNode() @@ -520,21 +532,25 @@ SuperNode::~SuperNode() int SuperNode::periodic() { -#ifdef WITH_HOOKS int ret; + int started = 0; + for (size_t i = 0; i < vlist_length(&paths); i++) { auto *p = (struct path *) vlist_at(&paths, i); - if (p->state != STATE_STARTED) - continue; + if (p->state == STATE_STARTED) { + started++; - for (size_t j = 0; j < vlist_length(&p->hooks); j++) { - hook *h = (struct hook *) vlist_at(&p->hooks, j); +#ifdef WITH_HOOKS + for (size_t j = 0; j < vlist_length(&p->hooks); j++) { + hook *h = (struct hook *) vlist_at(&p->hooks, j); - ret = hook_periodic(h); - if (ret) - return ret; + ret = hook_periodic(h); + if (ret) + return ret; + } +#endif /* WITH_HOOKS */ } } @@ -544,6 +560,7 @@ int SuperNode::periodic() if (n->state != STATE_STARTED) continue; +#ifdef WITH_HOOKS for (size_t j = 0; j < vlist_length(&n->in.hooks); j++) { auto *h = (struct hook *) vlist_at(&n->in.hooks, j); @@ -559,8 +576,15 @@ int SuperNode::periodic() if (ret) return ret; } +#endif /* WITH_HOOKS */ } -#endif + + if (idleStop && state == STATE_STARTED && started == 0) { + info("No more active paths. Stopping super-node"); + + return -1; + } + return 0; } diff --git a/src/villas-node.cpp b/src/villas-node.cpp index 8f2b3812a..91568425f 100644 --- a/src/villas-node.cpp +++ b/src/villas-node.cpp @@ -55,11 +55,22 @@ using namespace villas; using namespace villas::node; using namespace villas::plugin; -static std::atomic stop(false); +SuperNode sn; static void quit(int signal, siginfo_t *sinfo, void *ctx) { - stop = true; + Logger logger = logging.get("node"); + + switch (signal) { + case SIGALRM: + logger->info("Reached timeout. Terminating..."); + break; + + default: + logger->info("Received {} signal. Terminating...", strsignal(signal)); + } + + sn.setState(STATE_STOPPING); } static void usage() @@ -108,7 +119,6 @@ int main(int argc, char *argv[]) { int ret; - SuperNode sn; Logger logger = logging.get("node"); try { @@ -175,10 +185,7 @@ int main(int argc, char *argv[]) throw RuntimeError("Failed to verify configuration"); sn.start(); - - while (!stop) - sn.run(); - + sn.run(); sn.stop(); logger->info(CLR_GRN("Goodbye!")); @@ -186,7 +193,7 @@ int main(int argc, char *argv[]) return 0; } - catch (std::exception *e) { - logger->error(e->what()); + catch (RuntimeError &e) { + logger->error("{}", e.what()); } } diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index a96976308..51df16d56 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -107,8 +107,15 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) { Logger logger = logging.get("pipe"); - if (signal == SIGALRM) - logger->info("Reached timeout. Terminating..."); + switch (signal) { + case SIGALRM: + logger->info("Reached timeout. Terminating..."); + break; + + default: + logger->info("Received {} signal. Terminating...", strsignal(signal)); + break; + } stop = true; } @@ -145,7 +152,7 @@ static void * send_loop(void *ctx) struct node *node = dirs->send.node; struct sample *smps[node->out.vectorize]; - while (!io_eof(dirs->send.io)) { + while (node->state == STATE_STARTED && !io_eof(dirs->send.io)) { allocated = sample_alloc_many(&dirs->send.pool, smps, node->out.vectorize); if (allocated < 0) throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize); @@ -184,14 +191,14 @@ static void * send_loop(void *ctx) leave: if (io_eof(dirs->send.io)) { if (dirs->recv.limit < 0) { logger->info("Reached end-of-file. Terminating..."); - killme(SIGTERM); + stop = true; } else logger->info("Reached end-of-file. Wait for receive side..."); } else { logger->info("Reached send limit. Terminating..."); - killme(SIGTERM); + stop = true; } return nullptr; @@ -207,7 +214,7 @@ static void * recv_loop(void *ctx) struct node *node = dirs->recv.node; struct sample *smps[node->in.vectorize]; - for (;;) { + while (node->state == STATE_STARTED) { allocated = sample_alloc_many(&dirs->recv.pool, smps, node->in.vectorize); if (allocated < 0) throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize); @@ -217,8 +224,12 @@ static void * recv_loop(void *ctx) release = allocated; recv = node_read(node, smps, allocated, &release); - if (recv < 0) - logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv); + if (recv < 0) { + if (node->state == STATE_STOPPING) + goto leave2; + else + logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv); + } else { io_print(dirs->recv.io, smps, recv); @@ -232,7 +243,7 @@ static void * recv_loop(void *ctx) } leave: logger->info("Reached receive limit. Terminating..."); - killme(SIGTERM); +leave2: stop = true; return nullptr; } @@ -411,7 +422,7 @@ check: if (optarg == endptr) alarm(timeout); while (!stop) - pause(); + sleep(1); if (dirs.recv.enabled) { pthread_cancel(dirs.recv.thread); diff --git a/src/villas-relay.cpp b/src/villas-relay.cpp index 283b59a17..99ffd273a 100644 --- a/src/villas-relay.cpp +++ b/src/villas-relay.cpp @@ -258,7 +258,7 @@ int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in try { new (c) Connection(wsi); } - catch (InvalidUrlException e) { + catch (InvalidUrlException &e) { lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *) "Invalid URL", strlen("Invalid URL")); return -1; } diff --git a/src/villas-signal.cpp b/src/villas-signal.cpp index 9270e1beb..594b2cdb0 100644 --- a/src/villas-signal.cpp +++ b/src/villas-signal.cpp @@ -160,6 +160,17 @@ void usage() static void quit(int signal, siginfo_t *sinfo, void *ctx) { + Logger logger = logging.get("signal"); + + switch (signal) { + case SIGALRM: + logger->info("Reached timeout. Terminating..."); + break; + + default: + logger->info("Received {} signal. Terminating...", strsignal(signal)); + } + stop = true; } @@ -246,16 +257,20 @@ int main(int argc, char *argv[]) if (ret) throw RuntimeError("Failed to open output"); - while (!stop) { + while (!stop && n.state == STATE_STARTED) { t = sample_alloc(&q); unsigned release = 1; // release = allocated - ret = node_read(&n, &t, 1, &release); - if (ret > 0) - io_print(&io, &t, 1); +retry: ret = node_read(&n, &t, 1, &release); + if (ret == 0) + goto retry; + else if (ret < 0) + goto out; - sample_decref(t); + io_print(&io, &t, 1); + +out: sample_decref(t); } ret = node_stop(&n); diff --git a/tests/benchmarks/run-benchmark.sh b/tests/benchmarks/run-benchmark.sh index f0e3e2857..707f24ff0 100755 --- a/tests/benchmarks/run-benchmark.sh +++ b/tests/benchmarks/run-benchmark.sh @@ -94,7 +94,7 @@ fi # Set paths SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh # Declare location of config files CONFIG=$(mktemp /tmp/nodetype-benchmark-config-XXXX.conf) diff --git a/tests/integration/README.md b/tests/integration/README.md new file mode 100644 index 000000000..85c6cbfd7 --- /dev/null +++ b/tests/integration/README.md @@ -0,0 +1,20 @@ +# Integration Tests + +Run tests: + +``` +$ BUILDDIR=/VILLASnode/build/ /VILLASnode/tools/integration-tests.sh +``` + +There are two options for the test script: + +``` +-v Show full test output +-f FILTER Filter test cases +``` + +Example: + +``` +$ BUILDDIR=/VILLASnode/build/ /VILLASnode/tools/integration-tests.sh -f pipe-loopback-socket -v +``` diff --git a/tests/integration/node-infiniband.sh b/tests/integration/node-infiniband.sh index 411fbd699..5c0fed3ff 100755 --- a/tests/integration/node-infiniband.sh +++ b/tests/integration/node-infiniband.sh @@ -43,7 +43,7 @@ fi SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp /tmp/ib-configuration-XXXX.conf) CONFIG_FILE_TARGET=$(mktemp /tmp/ib-configuration-target-XXXX.conf) diff --git a/tests/integration/node-loopback-socket.sh b/tests/integration/node-loopback-socket.sh index cb933a884..ed4454d2d 100755 --- a/tests/integration/node-loopback-socket.sh +++ b/tests/integration/node-loopback-socket.sh @@ -24,7 +24,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) diff --git a/tests/integration/node-mux_demux.sh b/tests/integration/node-mux_demux.sh index 2ada0b9ab..cdf453ce6 100755 --- a/tests/integration/node-mux_demux.sh +++ b/tests/integration/node-mux_demux.sh @@ -24,7 +24,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) OUTPUT_FILE=$(mktemp) diff --git a/tests/integration/node-stats.sh b/tests/integration/node-stats.sh index b32c1617f..090f6e69a 100755 --- a/tests/integration/node-stats.sh +++ b/tests/integration/node-stats.sh @@ -24,42 +24,51 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) +STATS_LOG=$(mktemp) + +RATE="33.0" cat > ${CONFIG_FILE} < /dev/null +RC=$? + +rm ${STATS_LOG} ${CONFIG_FILE} + +exit ${RC} diff --git a/tests/integration/node-test_rtt.sh b/tests/integration/node-test_rtt.sh index 61d731e1f..6dd133fa5 100755 --- a/tests/integration/node-test_rtt.sh +++ b/tests/integration/node-test_rtt.sh @@ -28,7 +28,7 @@ exit 99 SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) diff --git a/tests/integration/pipe-file-advio.sh b/tests/integration/pipe-file-advio.sh index 720529b64..52b09a7ee 100755 --- a/tests/integration/pipe-file-advio.sh +++ b/tests/integration/pipe-file-advio.sh @@ -24,7 +24,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) diff --git a/tests/integration/pipe-loopback-amqp.sh b/tests/integration/pipe-loopback-amqp.sh index 5957e7e51..b2137286a 100755 --- a/tests/integration/pipe-loopback-amqp.sh +++ b/tests/integration/pipe-loopback-amqp.sh @@ -24,7 +24,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) diff --git a/tests/integration/pipe-loopback-iec61850-9-2.sh b/tests/integration/pipe-loopback-iec61850-9-2.sh index a9b625803..1d7dd2b48 100755 --- a/tests/integration/pipe-loopback-iec61850-9-2.sh +++ b/tests/integration/pipe-loopback-iec61850-9-2.sh @@ -24,7 +24,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) diff --git a/tests/integration/pipe-loopback-mqtt.sh b/tests/integration/pipe-loopback-mqtt.sh index 2059e51bf..0be1dfd78 100755 --- a/tests/integration/pipe-loopback-mqtt.sh +++ b/tests/integration/pipe-loopback-mqtt.sh @@ -24,7 +24,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) diff --git a/tests/integration/pipe-loopback-nanomsg.sh b/tests/integration/pipe-loopback-nanomsg.sh index 2f2ffe733..f6aded6bf 100755 --- a/tests/integration/pipe-loopback-nanomsg.sh +++ b/tests/integration/pipe-loopback-nanomsg.sh @@ -24,7 +24,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) diff --git a/tests/integration/pipe-loopback-rtp-dual.sh b/tests/integration/pipe-loopback-rtp-dual.sh index cda8dca58..d041a3587 100755 --- a/tests/integration/pipe-loopback-rtp-dual.sh +++ b/tests/integration/pipe-loopback-rtp-dual.sh @@ -23,9 +23,14 @@ # along with this program. If not, see . ################################################################################## +if [ -n "${CI}" ]; then + echo "RTP tests are not ready yet" + exit 99 +fi + SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE_SRC=$(mktemp) CONFIG_FILE_DEST=$(mktemp) @@ -35,11 +40,14 @@ OUTPUT_FILE=$(mktemp) FORMAT="villas.binary" VECTORIZE="1" -RATE=10000 +RATE=100 NUM_SAMPLES=100 cat > ${CONFIG_FILE_SRC} << EOF { + "logging" : { + "level" : "debug" + }, "nodes" : { "rtp_node" : { "type" : "rtp", @@ -47,7 +55,7 @@ cat > ${CONFIG_FILE_SRC} << EOF "vectorize" : ${VECTORIZE}, "rate" : ${RATE}, "rtcp" : { - "enabled" : false, + "enabled" : true, "mode" : "aimd", "throttle_mode" : "decimate" }, @@ -72,6 +80,9 @@ EOF cat > ${CONFIG_FILE_DEST} << EOF { + "logging" : { + "level" : "debug" + }, "nodes" : { "rtp_node" : { "type" : "rtp", @@ -79,7 +90,7 @@ cat > ${CONFIG_FILE_DEST} << EOF "vectorize" : ${VECTORIZE}, "rate" : ${RATE}, "rtcp": { - "enabled" : false, + "enabled" : true, "mode" : "aimd", "throttle_mode" : "decimate" }, @@ -102,11 +113,16 @@ cat > ${CONFIG_FILE_DEST} << EOF } EOF -villas-signal mixed -v 5 -l ${NUM_SAMPLES} > ${INPUT_FILE} - +VILLAS_LOG_PREFIX="[DEST] " \ villas-pipe -l ${NUM_SAMPLES} ${CONFIG_FILE_DEST} rtp_node > ${OUTPUT_FILE} & +PID=$! -villas-pipe ${CONFIG_FILE_SRC} rtp_node < ${INPUT_FILE} +sleep 1 + +VILLAS_LOG_PREFIX="[SIGN] " \ +villas-signal mixed -v 5 -r ${RATE} -l ${NUM_SAMPLES} | tee ${INPUT_FILE} | \ +VILLAS_LOG_PREFIX="[SRC] " \ +villas-pipe ${CONFIG_FILE_SRC} rtp_node > ${OUTPUT_FILE} # Compare data villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE} @@ -114,4 +130,6 @@ RC=$? rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} +kill $PID + exit $RC diff --git a/tests/integration/pipe-loopback-rtp-tbf.sh b/tests/integration/pipe-loopback-rtp-tbf.sh new file mode 100755 index 000000000..80ac24894 --- /dev/null +++ b/tests/integration/pipe-loopback-rtp-tbf.sh @@ -0,0 +1,140 @@ +#!/bin/bash +# +# Integration loopback test for villas-pipe. +# +# @author Steffen Vogel +# @author Marvin Klimke +# @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC +# @license GNU General Public License (version 3) +# +# VILLASnode +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +################################################################################## + +if [ -n "${CI}" ]; then + echo "RTP tests are not ready yet" + exit 99 +fi + +SCRIPT=$(realpath $0) +SCRIPTPATH=$(dirname ${SCRIPT}) +source ${SCRIPTPATH}/../../tools/villas-helper.sh + +CONFIG_FILE_SRC=$(mktemp) +CONFIG_FILE_DEST=$(mktemp) +INPUT_FILE=$(mktemp) +OUTPUT_FILE=$(mktemp) + +FORMAT="villas.binary" +VECTORIZE="1" + +RATE=500 +NUM_SAMPLES=10000000 +NUM_VALUES=5 + +cat > ${CONFIG_FILE_SRC} << EOF +{ + "logging" : { + "level" : "info" + }, + "nodes" : { + "rtp_node" : { + "type" : "rtp", + "format" : "${FORMAT}", + "vectorize" : ${VECTORIZE}, + "rate" : ${RATE}, + "rtcp" : { + "enabled" : true, + "mode" : "aimd", + "throttle_mode" : "decimate" + }, + "aimd" : { + "a" : 10, + "b" : 0.75, + "start_rate" : ${RATE} + }, + "in" : { + "address" : "0.0.0.0:12002", + "signals" : { + "count" : ${NUM_VALUES}, + "type" : "float" + } + }, + "out" : { + "address" : "127.0.0.1:12000", + "fwmark" : 123 + } + } + } +} +EOF + +cat > ${CONFIG_FILE_DEST} << EOF +{ + "logging" : { + "level" : "info" + }, + "nodes" : { + "rtp_node" : { + "type" : "rtp", + "format" : "${FORMAT}", + "vectorize" : ${VECTORIZE}, + "rate" : ${RATE}, + "rtcp": { + "enabled" : true, + "mode" : "aimd", + "throttle_mode" : "decimate" + }, + "in" : { + "address" : "0.0.0.0:12000", + "signals" : { + "count" : ${NUM_VALUES}, + "type" : "float" + } + }, + "out" : { + "address" : "127.0.0.1:12002" + } + } + } +} +EOF + +tc qdisc del dev lo root +tc qdisc add dev lo root handle 4000 prio bands 4 priomap 1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1 +tc qdisc add dev lo parent 4000:3 tbf rate 40kbps burst 32kbit latency 200ms #peakrate 40kbps mtu 1000 minburst 1520 +tc filter add dev lo protocol ip handle 123 fw flowid 4000:3 + +#exit + +VILLAS_LOG_PREFIX="[DEST] " \ +villas-pipe -l ${NUM_SAMPLES} ${CONFIG_FILE_DEST} rtp_node > ${OUTPUT_FILE} & +PID=$! + +sleep 1 + +VILLAS_LOG_PREFIX="[SIGN] " \ +villas-signal mixed -v ${NUM_VALUES} -r ${RATE} -l ${NUM_SAMPLES} | tee ${INPUT_FILE} | \ +VILLAS_LOG_PREFIX="[SRC] " \ +villas-pipe ${CONFIG_FILE_SRC} rtp_node > ${OUTPUT_FILE} + +# Compare data +villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE} +RC=$? + +rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} + +kill $PID +exit $RC diff --git a/tests/integration/pipe-loopback-rtp.sh b/tests/integration/pipe-loopback-rtp.sh index 6f5f3c4d3..4a3bb00f8 100755 --- a/tests/integration/pipe-loopback-rtp.sh +++ b/tests/integration/pipe-loopback-rtp.sh @@ -22,9 +22,14 @@ # along with this program. If not, see . ################################################################################## +if [ -n "${CI}" ]; then + echo "RTP tests are not ready yet" + exit 99 +fi + SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) diff --git a/tests/integration/pipe-loopback-socket.sh b/tests/integration/pipe-loopback-socket.sh index 9463379ce..9ea103c4b 100755 --- a/tests/integration/pipe-loopback-socket.sh +++ b/tests/integration/pipe-loopback-socket.sh @@ -24,7 +24,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) @@ -37,9 +37,8 @@ NUM_VALUES=${NUM_VALUES:-4} # Generate test data villas-signal -v ${NUM_VALUES} -l ${NUM_SAMPLES} -n random > ${INPUT_FILE} -for FORMAT in villas.human villas.binary villas.web csv json gtnet.fake raw.32.le raw.64.be protobuf; do +for FORMAT in villas.human gtnet.fake protobuf; do for LAYER in udp ip eth unix; do -for VERIFY_SOURCE in true false; do VECTORIZES="1" @@ -88,7 +87,6 @@ cat > ${CONFIG_FILE} << EOF }, "in" : { "address" : "${LOCAL}", - "verify_source" : ${VERIFY_SOURCE}, "signals" : { "count" : ${NUM_VALUES}, "type" : "float" @@ -111,7 +109,7 @@ villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE} RC=$? if (( ${RC} != 0 )); then - echo "=========== Sub-test failed for: format=${FORMAT}, layer=${LAYER}, verify_source=${VERIFY_SOURCE}, vectorize=${VECTORIZE}" + echo "=========== Sub-test failed for: format=${FORMAT}, layer=${LAYER}, vectorize=${VECTORIZE}" echo "Config:" cat ${CONFIG_FILE} echo @@ -122,10 +120,10 @@ if (( ${RC} != 0 )); then cat ${OUTPUT_FILE} exit ${RC} else - echo "=========== Sub-test succeeded for: format=${FORMAT}, layer=${LAYER}, verify_source=${VERIFY_SOURCE}, vectorize=${VECTORIZE}" + echo "=========== Sub-test succeeded for: format=${FORMAT}, layer=${LAYER}, vectorize=${VECTORIZE}" fi -done; done; done; done +done; done; done rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} ${THEORIES} diff --git a/tests/integration/pipe-loopback-websocket.sh b/tests/integration/pipe-loopback-websocket.sh index 2b47a5dba..e62af21e6 100755 --- a/tests/integration/pipe-loopback-websocket.sh +++ b/tests/integration/pipe-loopback-websocket.sh @@ -24,7 +24,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) CONFIG_FILE2=$(mktemp) diff --git a/tests/integration/pipe-loopback-zeromq.sh b/tests/integration/pipe-loopback-zeromq.sh index cd92683fc..537a0177a 100755 --- a/tests/integration/pipe-loopback-zeromq.sh +++ b/tests/integration/pipe-loopback-zeromq.sh @@ -24,7 +24,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) -source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh +source ${SCRIPTPATH}/../../tools/villas-helper.sh CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) diff --git a/tests/integration/pipe-python-protobuf.sh b/tests/integration/pipe-python-protobuf.sh index 7159e74ae..756322ff7 100755 --- a/tests/integration/pipe-python-protobuf.sh +++ b/tests/integration/pipe-python-protobuf.sh @@ -25,7 +25,7 @@ SCRIPT=$(realpath $0) SCRIPTPATH=$(dirname ${SCRIPT}) SRCDIR=$(realpath ${SCRIPTPATH}/../..) -source ${SRCDIR}/tools/integration-tests-helper.sh +source ${SRCDIR}/tools/villas-helper.sh CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) diff --git a/tests/unit/mapping.cpp b/tests/unit/mapping.cpp index 91d2a290c..37c92bda0 100644 --- a/tests/unit/mapping.cpp +++ b/tests/unit/mapping.cpp @@ -73,8 +73,8 @@ Test(mapping, parse_nodes) cr_assert_eq(ret, 0); cr_assert_eq(m.node, vlist_lookup(&nodes, "cherry")); cr_assert_eq(m.type, MAPPING_TYPE_STATS); - cr_assert_eq(m.stats.id, STATS_OWD); - cr_assert_eq(m.stats.type, MAPPING_STATS_TYPE_MEAN); + cr_assert_eq(m.stats.metric, STATS_METRIC_OWD); + cr_assert_eq(m.stats.type, STATS_TYPE_MEAN); ret = mapping_parse_str(&m, "carrot.data[1-2]", &nodes); cr_assert_eq(ret, 0); @@ -126,8 +126,8 @@ Test(mapping, parse) ret = mapping_parse_str(&m, "stats.owd.mean", nullptr); cr_assert_eq(ret, 0); cr_assert_eq(m.type, MAPPING_TYPE_STATS); - cr_assert_eq(m.stats.id, STATS_OWD); - cr_assert_eq(m.stats.type, MAPPING_STATS_TYPE_MEAN); + cr_assert_eq(m.stats.metric, STATS_METRIC_OWD); + cr_assert_eq(m.stats.type, STATS_TYPE_MEAN); ret = mapping_parse_str(&m, "data[1-2]", nullptr); cr_assert_eq(ret, 0); @@ -171,10 +171,10 @@ Test(mapping, parse) cr_assert_neq(ret, 0); /* Check for superfluous chars at the end */ - ret = mapping_parse_str(&m, "stats.ts.origin.bla", nullptr); + ret = mapping_parse_str(&m, "hdr.ts.origin.bla", nullptr); cr_assert_neq(ret, 0); - ret = mapping_parse_str(&m, "stats.ts.origin.", nullptr); + ret = mapping_parse_str(&m, "hdr.ts.origin.", nullptr); cr_assert_neq(ret, 0); ret = mapping_parse_str(&m, "data[1-2]bla", nullptr); diff --git a/tools/plots/plot_aimd.py b/tools/plots/plot_aimd.py new file mode 100644 index 000000000..ce8344428 --- /dev/null +++ b/tools/plots/plot_aimd.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 + +import sys +import matplotlib as mpl +import numpy as np +import matplotlib.pyplot as plt + +def read_datafile(file_name): + # the skiprows keyword is for heading, but I don't know if trailing lines + # can be specified + data = np.loadtxt(file_name, delimiter='\t', skiprows=1) + return data + +filename = sys.argv[1] + +data = read_datafile(filename) + +print(data[:,0]) + +fig, ax1 = plt.subplots() +ax1.plot(data[:,0], data[:,1]) + +ax2 = ax1.twinx() +ax2.plot(data[:,0], data[:,2], c='red') + +fig.tight_layout() +plt.show() diff --git a/tools/integration-tests-helper.sh b/tools/villas-helper.sh similarity index 100% rename from tools/integration-tests-helper.sh rename to tools/villas-helper.sh