diff --git a/clients/shmem/villas-shmem.cpp b/clients/shmem/villas-shmem.cpp index 52dd044c4..ec05694fc 100644 --- a/clients/shmem/villas-shmem.cpp +++ b/clients/shmem/villas-shmem.cpp @@ -23,108 +23,123 @@ * along with this program. If not, see . *********************************************************************************/ -#include #include #include #include -#include -#include #include #include #include #include #include +#include +#include +#include #include -#include +#include -using namespace villas; +namespace villas { +namespace node { +namespace tools { -static std::atomic stop(false); +class Shmem : public Tool { -static void usage() -{ - std::cout << "Usage: villas-test-shmem WNAME VECTORIZE" << std::endl - << " WNAME name of the shared memory object for the output queue" << std::endl - << " RNAME name of the shared memory object for the input queue" << std::endl - << " VECTORIZE maximum number of samples to read/write at a time" << std::endl; +public: + Shmem(int argc, char *argv[]) : + Tool(argc, argv, "shmem"), + stop(false) + { } - print_copyright(); -} +protected: + std::atomic stop; -void quit(int, siginfo_t*, void*) -{ - stop = true; -} + void usage() + { + std::cout << "Usage: villas-test-shmem WNAME VECTORIZE" << std::endl + << " WNAME name of the shared memory object for the output queue" << std::endl + << " RNAME name of the shared memory object for the input queue" << std::endl + << " VECTORIZE maximum number of samples to read/write at a time" << std::endl; -int main(int argc, char* argv[]) -{ - int ret, readcnt, writecnt, avail; - - Logger logger = logging.get("test-shmem"); - - struct shmem_int shm; - struct shmem_conf conf = { - .polling = 0, - .queuelen = DEFAULT_SHMEM_QUEUELEN, - .samplelen = DEFAULT_SHMEM_SAMPLELEN - }; - - if (argc != 4) { - usage(); - return 1; + printCopyright(); } - ret = utils::signals_init(quit); - if (ret) - throw RuntimeError("Failed to initialize signals"); - - char *wname = argv[1]; - char *rname = argv[2]; - int vectorize = atoi(argv[3]); - - ret = shmem_int_open(wname, rname, &shm, &conf); - if (ret < 0) - throw RuntimeError("Failed to open shared-memory interface"); - - struct sample *insmps[vectorize], *outsmps[vectorize]; - - while (!stop) { - readcnt = shmem_int_read(&shm, insmps, vectorize); - if (readcnt == -1) { - logger->info("Node stopped, exiting"); - break; - } - - avail = shmem_int_alloc(&shm, outsmps, readcnt); - if (avail < readcnt) - logger->warn("Pool underrun: %d / %d\n", avail, readcnt); - - for (int i = 0; i < avail; i++) { - outsmps[i]->sequence = insmps[i]->sequence; - outsmps[i]->ts = insmps[i]->ts; - - int len = MIN(insmps[i]->length, outsmps[i]->capacity); - memcpy(outsmps[i]->data, insmps[i]->data, SAMPLE_DATA_LENGTH(len)); - - outsmps[i]->length = len; - } - - for (int i = 0; i < readcnt; i++) - sample_decref(insmps[i]); - - writecnt = shmem_int_write(&shm, outsmps, avail); - if (writecnt < avail) - logger->warn("Short write"); - - logger->info("Read / Write: {}/{}", readcnt, writecnt); + void handler(int, siginfo_t *, void *) + { + stop = true; } - ret = shmem_int_close(&shm); - if (ret) - throw RuntimeError("Failed to close shared-memory interface"); + int main() + { + int ret, readcnt, writecnt, avail; - logger->info(CLR_GRN("Goodbye!")); + struct shmem_int shm; + struct shmem_conf conf = { + .polling = 0, + .queuelen = DEFAULT_SHMEM_QUEUELEN, + .samplelen = DEFAULT_SHMEM_SAMPLELEN + }; - return 0; + if (argc != 4) { + usage(); + return 1; + } + + std::string wname = argv[1]; + std::string rname = argv[2]; + int vectorize = atoi(argv[3]); + + ret = shmem_int_open(wname.c_str(), rname.c_str(), &shm, &conf); + if (ret < 0) + throw RuntimeError("Failed to open shared-memory interface"); + + struct sample *insmps[vectorize], *outsmps[vectorize]; + + while (!stop) { + readcnt = shmem_int_read(&shm, insmps, vectorize); + if (readcnt == -1) { + logger->info("Node stopped, exiting"); + break; + } + + avail = shmem_int_alloc(&shm, outsmps, readcnt); + if (avail < readcnt) + logger->warn("Pool underrun: %d / %d\n", avail, readcnt); + + for (int i = 0; i < avail; i++) { + outsmps[i]->sequence = insmps[i]->sequence; + outsmps[i]->ts = insmps[i]->ts; + + int len = MIN(insmps[i]->length, outsmps[i]->capacity); + memcpy(outsmps[i]->data, insmps[i]->data, SAMPLE_DATA_LENGTH(len)); + + outsmps[i]->length = len; + } + + for (int i = 0; i < readcnt; i++) + sample_decref(insmps[i]); + + writecnt = shmem_int_write(&shm, outsmps, avail); + if (writecnt < avail) + logger->warn("Short write"); + + logger->info("Read / Write: {}/{}", readcnt, writecnt); + } + + ret = shmem_int_close(&shm); + if (ret) + throw RuntimeError("Failed to close shared-memory interface"); + + return 0; + } +}; + +} // namespace tools +} // namespace node +} // namespace villas + +int main(int argc, char *argv[]) +{ + auto t = villas::node::tools::Shmem(argc, argv); + + return t.run(); } diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b9fa7cd82..457facd05 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -20,6 +20,16 @@ # along with this program. If not, see . ################################################################################### +set(SRCS + villas-node + villas-test-config + villas-test-rtt + villas-test-cmp + villas-convert + villas-pipe + villas-signal +) + add_executable(villas-node villas-node.cpp) target_link_libraries(villas-node PUBLIC villas) @@ -29,24 +39,6 @@ target_link_libraries(villas-test-config PUBLIC villas) add_executable(villas-test-rtt villas-test-rtt.cpp) target_link_libraries(villas-test-rtt PUBLIC villas) -install( - TARGETS villas-node villas-test-rtt - COMPONENT bin - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} -) - -if(WITH_WEB) - add_executable(villas-relay villas-relay.cpp) - target_include_directories(villas-relay PRIVATE ${LIBWEBSOCKETS_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIR} ${PROJECT_SOURCE_DIR}/common/include) - target_link_libraries(villas-relay PRIVATE PkgConfig::LIBWEBSOCKETS PkgConfig::UUID villas-common spdlog) - - install( - TARGETS villas-relay - COMPONENT bin - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - ) -endif() - add_executable(villas-test-cmp villas-test-cmp.cpp) target_link_libraries(villas-test-cmp PUBLIC villas) @@ -59,20 +51,37 @@ target_link_libraries(villas-pipe PUBLIC villas Threads::Threads) add_executable(villas-signal villas-signal.cpp) target_link_libraries(villas-signal PUBLIC villas) -install( - TARGETS villas-convert villas-pipe villas-signal villas-test-cmp - COMPONENT bin - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} -) +if(WITH_WEB) + add_executable(villas-relay villas-relay.cpp) + target_include_directories(villas-relay PRIVATE ${LIBWEBSOCKETS_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIR}) + target_link_libraries(villas-relay PRIVATE PkgConfig::UUID villas) + + list(APPEND SRCS villas-relay) +endif() + +if(WITH_CONFIG) + add_executable(villas-conf2json villas-conf2json.cpp) + target_link_libraries(villas-conf2json PUBLIC villas) + + list(APPEND SRCS villas-conf2json) +endif() + +if(LIBZMQ_FOUND) + add_executable(villas-zmq-keygen villas-zmq-keygen.cpp) + target_link_libraries(villas-zmq-keygen PUBLIC villas-common PkgConfig::LIBZMQ) + + list(APPEND SRC villas-zmq-keygen) +endif() if(WITH_HOOKS) add_executable(villas-hook villas-hook.cpp) target_link_libraries(villas-hook PUBLIC villas) - install( - TARGETS villas-hook - COMPONENT bin - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - ) + list(APPEND SRCS villas-hook) endif() +install( + TARGETS ${SRCS} + COMPONENT bin + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} +) diff --git a/tools/conf2json.cpp b/src/villas-conf2json.cpp similarity index 51% rename from tools/conf2json.cpp rename to src/villas-conf2json.cpp index 548bc32c8..49e040e75 100644 --- a/tools/conf2json.cpp +++ b/src/villas-conf2json.cpp @@ -28,53 +28,78 @@ #include #include #include -#include +#include -void usage() -{ - std::cout << "Usage: conf2json input.conf > output.json" << std::endl << std::endl; +namespace villas { +namespace node { +namespace tools { - villas::print_copyright(); -} +class Config2Json : public Tool { + +public: + Config2Json(int argc, char *argv[]) : + Tool(argc, argv, "conf2json") + { } + +protected: + + void usage() + { + std::cout << "Usage: conf2json input.conf > output.json" << std::endl << std::endl; + + printCopyright(); + } + + int main() + { + int ret; + config_t cfg; + config_setting_t *cfg_root; + json_t *json; + + if (argc != 2) { + usage(); + exit(EXIT_FAILURE); + } + + FILE *f = fopen(argv[1], "r"); + if(f == nullptr) + return -1; + + const char *confdir = dirname(argv[1]); + + config_init(&cfg); + + config_set_include_dir(&cfg, confdir); + + ret = config_read(&cfg, f); + if (ret != CONFIG_TRUE) + return -2; + + cfg_root = config_root_setting(&cfg); + + json = config_to_json(cfg_root); + if (!json) + return -3; + + ret = json_dumpf(json, stdout, JSON_INDENT(2)); fflush(stdout); + if (ret) + return ret; + + json_decref(json); + config_destroy(&cfg); + + return 0; + } +}; + +} // namespace tools +} // namespace node +} // namespace villas int main(int argc, char *argv[]) { - int ret; - config_t cfg; - config_setting_t *cfg_root; - json_t *json; + auto t = villas::node::tools::Config2Json(argc, argv); - if (argc != 2) { - usage(); - exit(EXIT_FAILURE); - } - - FILE *f = fopen(argv[1], "r"); - if(f == nullptr) - return -1; - - const char *confdir = dirname(argv[1]); - - config_init(&cfg); - - config_set_include_dir(&cfg, confdir); - - ret = config_read(&cfg, f); - if (ret != CONFIG_TRUE) - return -2; - - cfg_root = config_root_setting(&cfg); - - json = config_to_json(cfg_root); - if (!json) - return -3; - - ret = json_dumpf(json, stdout, JSON_INDENT(2)); fflush(stdout); - if (ret) - return ret; - - json_decref(json); - config_destroy(&cfg); - - return 0; + return t.run(); } diff --git a/src/villas-convert.cpp b/src/villas-convert.cpp index a9a7abe33..ee66a18b2 100644 --- a/src/villas-convert.cpp +++ b/src/villas-convert.cpp @@ -26,54 +26,78 @@ #include +#include #include #include #include #include #include #include -#include using namespace villas; -static void usage() -{ - std::cout << "Usage: villas-convert [OPTIONS]" << std::endl - << " OPTIONS are:" << std::endl - << " -i FMT set the input format" << std::endl - << " -o FMT set the output format" << std::endl - << " -t DT the data-type format string" << std::endl - << " -d LVL set debug log level to LVL" << std::endl - << " -h show this usage information" << std::endl - << " -V show the version of the tool" << std::endl << std::endl; +namespace villas { +namespace node { +namespace tools { - print_copyright(); -} +class Convert : public Tool { -int main(int argc, char *argv[]) -{ - Logger logger = logging.get("test-rtt"); - - try { +public: + Convert(int argc, char *argv[]) : + Tool(argc, argv, "convert"), + dtypes("64f") + { int ret; - const char *input_format = "villas.human"; - const char *output_format = "villas.human"; - const char *dtypes = "64f"; + ret = memory_init(DEFAULT_NR_HUGEPAGES); + if (ret) + throw RuntimeError("Failed to initialize memory"); + + for (unsigned i = 0; i < ARRAY_LEN(dirs); i++) { + dirs[i].format = "villas.human"; + dirs[i].io.state = STATE_DESTROYED; + } + } + +protected: + std::string dtypes; + + struct { + std::string name; + std::string format; + struct io io; + } dirs[2]; + + void usage() + { + std::cout << "Usage: villas-convert [OPTIONS]" << std::endl + << " OPTIONS are:" << std::endl + << " -i FMT set the input format" << std::endl + << " -o FMT set the output format" << std::endl + << " -t DT the data-type format string" << std::endl + << " -d LVL set debug log level to LVL" << std::endl + << " -h show this usage information" << std::endl + << " -V show the version of the tool" << std::endl << std::endl; + + printCopyright(); + } + + void parse() + { /* Parse optional command line arguments */ int c; while ((c = getopt(argc, argv, "Vhd:i:o:t:")) != -1) { switch (c) { case 'V': - print_version(); + printVersion(); exit(EXIT_SUCCESS); case 'i': - input_format = optarg; + dirs[0].format = optarg; break; case 'o': - output_format = optarg; + dirs[1].format = optarg; break; case 't': @@ -95,36 +119,28 @@ int main(int argc, char *argv[]) usage(); exit(EXIT_FAILURE); } + } - struct format_type *ft; - struct io input; - struct io output; - - input.state = STATE_DESTROYED; - output.state = STATE_DESTROYED; - - struct { - const char *name; - struct io *io; - } dirs[] = { - { input_format, &input }, - { output_format, &output }, - }; + int main() + { + int ret; for (unsigned i = 0; i < ARRAY_LEN(dirs); i++) { - ft = format_type_lookup(dirs[i].name); - if (!ft) - throw RuntimeError("Invalid format: {}", dirs[i].name); + struct format_type *ft; - ret = io_init2(dirs[i].io, ft, dtypes, SAMPLE_HAS_ALL); + ft = format_type_lookup(dirs[i].format.c_str()); + if (!ft) + throw RuntimeError("Invalid format: {}", dirs[i].format); + + ret = io_init2(&dirs[i].io, ft, dtypes.c_str(), SAMPLE_HAS_ALL); if (ret) throw RuntimeError("Failed to initialize IO: {}", dirs[i].name); - ret = io_check(dirs[i].io); + ret = io_check(&dirs[i].io); if (ret) throw RuntimeError("Failed to validate IO configuration"); - ret = io_open(dirs[i].io, nullptr); + ret = io_open(&dirs[i].io, nullptr); if (ret) throw RuntimeError("Failed to open IO"); } @@ -132,32 +148,38 @@ int main(int argc, char *argv[]) struct sample *smp = sample_alloc_mem(DEFAULT_SAMPLE_LENGTH); for (;;) { - ret = io_scan(&input, &smp, 1); + ret = io_scan(&dirs[0].io, &smp, 1); if (ret == 0) continue; if (ret < 0) break; - io_print(&output, &smp, 1); + io_print(&dirs[1].io, &smp, 1); } for (unsigned i = 0; i < ARRAY_LEN(dirs); i++) { - ret = io_close(dirs[i].io); + ret = io_close(&dirs[i].io); if (ret) throw RuntimeError("Failed to close IO"); - ret = io_destroy(dirs[i].io); + ret = io_destroy(&dirs[i].io); if (ret) throw RuntimeError("Failed to destroy IO"); } return 0; } - catch (std::runtime_error &e) { - logger->error("{}", e.what()); +}; - return -1; - } +} // namespace tools +} // namespace node +} // namespace villas + +int main(int argc, char *argv[]) +{ + auto t = villas::node::tools::Convert(argc, argv); + + return t.run(); } /** @} */ diff --git a/src/villas-hook.cpp b/src/villas-hook.cpp index 4ac3455b9..0967ba9e8 100644 --- a/src/villas-hook.cpp +++ b/src/villas-hook.cpp @@ -30,6 +30,7 @@ #include #include +#include #include #include #include @@ -39,7 +40,6 @@ #include #include #include -#include #include #include #include @@ -50,66 +50,92 @@ using namespace villas; using namespace villas::node; using namespace villas::plugin; -static std::atomic stop(false); +namespace villas { +namespace node { +namespace tools { -static void quit(int signal, siginfo_t *sinfo, void *ctx) -{ - stop = true; -} +class Hook : public Tool { -static void usage() -{ - std::cout << "Usage: villas-hook [OPTIONS] NAME [[PARAM1] [PARAM2] ...]" << std::endl - << " NAME the name of the hook function" << std::endl - << " PARAM* a string of configuration settings for the hook" << std::endl - << " OPTIONS is one or more of the following options:" << std::endl - << " -f FMT the data format" << std::endl - << " -t DT the data-type format string" << std::endl - << " -d LVL set debug level to LVL" << std::endl - << " -v CNT process CNT smps at once" << std::endl - << " -h show this help" << std::endl - << " -V show the version of the tool" << std::endl << std::endl; +public: + Hook(int argc, char *argv[]) : + Tool(argc, argv, "hook"), + stop(false), + format("villas.human"), + dtypes("64f"), + cnt(1) + { + int ret; -#ifdef WITH_HOOKS - std::cout << "Supported hooks:" << std::endl; - for (Plugin *p : Registry::lookup()) - std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl; - std::cout << std::endl; -#endif /* WITH_HOOKS */ - - std::cout << "Supported IO formats:" << std::endl; - plugin_dump(PLUGIN_TYPE_FORMAT); - std::cout << std::endl; - - std::cout << "Example:" << std::endl - << " villas-signal random | villas-hook skip_first seconds=10" << std::endl - << std::endl; - - print_copyright(); -} - -int main(int argc, char *argv[]) -{ - int ret, recv, sent, cnt; - const char *format = "villas.human"; - const char *dtypes = "64f"; - - struct format_type *ft; - struct sample **smps; - - Logger logger = logging.get("hook"); - - try { - struct pool p; - struct io io; + ret = memory_init(DEFAULT_NR_HUGEPAGES); + if (ret) + throw RuntimeError("Failed to initialize memory"); p.state = STATE_DESTROYED; io.state = STATE_DESTROYED; - /* Default values */ - cnt = 1; + cfg_cli = json_object(); + } - json_t *cfg_cli = json_object(); + ~Hook() + { + json_decref(cfg_cli); + } + +protected: + + std::atomic stop; + + std::string hook; + + std::string format; + std::string dtypes; + + struct pool p; + struct io io; + + int cnt; + + json_t *cfg_cli; + + void handler(int signal, siginfo_t *sinfo, void *ctx) + { + stop = true; + } + + void usage() + { + std::cout << "Usage: villas-hook [OPTIONS] NAME [[PARAM1] [PARAM2] ...]" << std::endl + << " NAME the name of the hook function" << std::endl + << " PARAM* a string of configuration settings for the hook" << std::endl + << " OPTIONS is one or more of the following options:" << std::endl + << " -f FMT the data format" << std::endl + << " -t DT the data-type format string" << std::endl + << " -d LVL set debug level to LVL" << std::endl + << " -v CNT process CNT smps at once" << std::endl + << " -h show this help" << std::endl + << " -V show the version of the tool" << std::endl << std::endl; + +#ifdef WITH_HOOKS + std::cout << "Supported hooks:" << std::endl; + for (Plugin *p : Registry::lookup()) + std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl; + std::cout << std::endl; +#endif /* WITH_HOOKS */ + + std::cout << "Supported IO formats:" << std::endl; + plugin_dump(PLUGIN_TYPE_FORMAT); + std::cout << std::endl; + + std::cout << "Example:" << std::endl + << " villas-signal random | villas-hook skip_first seconds=10" << std::endl + << std::endl; + + printCopyright(); + } + + void parse() + { + int ret; /* Parse optional command line arguments */ int c; @@ -117,7 +143,7 @@ int main(int argc, char *argv[]) while ((c = getopt(argc, argv, "Vhv:d:f:t:o:")) != -1) { switch (c) { case 'V': - print_version(); + printVersion(); exit(EXIT_SUCCESS); case 'f': @@ -160,19 +186,19 @@ check: if (optarg == endptr) exit(EXIT_FAILURE); } - char *hook = argv[optind]; + hook = argv[optind]; + } - ret = utils::signals_init(quit); - if (ret) - throw RuntimeError("Failed to intialize signals"); + int main() + { + int ret, recv, sent; + + struct format_type *ft; + struct sample **smps; if (cnt < 1) throw RuntimeError("Vectorize option must be greater than 0"); - ret = memory_init(DEFAULT_NR_HUGEPAGES); - if (ret) - throw RuntimeError("Failed to initialize memory"); - smps = new struct sample*[cnt]; ret = pool_init(&p, 10 * cnt, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), &memory_hugepage); @@ -180,11 +206,11 @@ check: if (optarg == endptr) throw RuntimeError("Failed to initilize memory pool"); /* Initialize IO */ - ft = format_type_lookup(format); + ft = format_type_lookup(format.c_str()); if (!ft) throw RuntimeError("Unknown IO format '{}'", format); - ret = io_init2(&io, ft, dtypes, SAMPLE_HAS_ALL); + ret = io_init2(&io, ft, dtypes.c_str(), SAMPLE_HAS_ALL); if (ret) throw RuntimeError("Failed to initialize IO"); @@ -280,13 +306,17 @@ stop: sent = io_print(&io, smps, send); if (ret) throw RuntimeError("Failed to destroy memory pool"); - logger->info(CLR_GRN("Goodbye!")); - return 0; } - catch (std::runtime_error &e) { - logger->error("{}", e.what()); +}; - return -1; - } +} // namespace tools +} // namespace node +} // namespace villas + +int main(int argc, char *argv[]) +{ + auto t = villas::node::tools::Hook(argc, argv); + + return t.run(); } diff --git a/src/villas-node.cpp b/src/villas-node.cpp index 4c6af8114..963223145 100644 --- a/src/villas-node.cpp +++ b/src/villas-node.cpp @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -42,7 +43,6 @@ #include #include #include -#include #include #include #include @@ -55,74 +55,86 @@ using namespace villas; using namespace villas::node; using namespace villas::plugin; -SuperNode sn; +namespace villas { +namespace node { +namespace tools { -static void quit(int signal, siginfo_t *sinfo, void *ctx) -{ - Logger logger = logging.get("node"); +class Node : public Tool { - switch (signal) { - case SIGALRM: - logger->info("Reached timeout. Terminating..."); - break; +public: + Node(int argc, char *argv[]) : + Tool(argc, argv, "node") + { + int ret; - default: - logger->info("Received {} signal. Terminating...", strsignal(signal)); + ret = memory_init(DEFAULT_NR_HUGEPAGES); + if (ret) + throw RuntimeError("Failed to initialize memory"); } - sn.setState(STATE_STOPPING); -} +protected: + SuperNode sn; -static void usage() -{ - std::cout << "Usage: villas-node [OPTIONS] [CONFIG]" << std::endl - << " OPTIONS is one or more of the following options:" << std::endl - << " -h show this usage information" << std::endl - << " -d LVL set logging level" << std::endl - << " -V show the version of the tool" << std::endl << std::endl - << " CONFIG is the path to an optional configuration file" << std::endl - << " if omitted, VILLASnode will start without a configuration" << std::endl - << " and wait for provisioning over the web interface." << std::endl << std::endl + std::string uri; + + void handler(int signal, siginfo_t *sinfo, void *ctx) + { + switch (signal) { + case SIGALRM: + logger->info("Reached timeout. Terminating..."); + break; + + default: + logger->info("Received {} signal. Terminating...", strsignal(signal)); + } + + sn.setState(STATE_STOPPING); + } + + void usage() + { + std::cout << "Usage: villas-node [OPTIONS] [CONFIG]" << std::endl + << " OPTIONS is one or more of the following options:" << std::endl + << " -h show this usage information" << std::endl + << " -d LVL set logging level" << std::endl + << " -V show the version of the tool" << std::endl << std::endl + << " CONFIG is the path to an optional configuration file" << std::endl + << " if omitted, VILLASnode will start without a configuration" << std::endl + << " and wait for provisioning over the web interface." << std::endl << std::endl #ifdef ENABLE_OPAL_ASYNC - << "Usage: villas-node OPAL_ASYNC_SHMEM_NAME OPAL_ASYNC_SHMEM_SIZE OPAL_PRINT_SHMEM_NAME" << std::endl - << " This type of invocation is used by OPAL-RT Asynchronous processes." << std::endl - << " See in the RT-LAB User Guide for more information." << std::endl << std::endl + << "Usage: villas-node OPAL_ASYNC_SHMEM_NAME OPAL_ASYNC_SHMEM_SIZE OPAL_PRINT_SHMEM_NAME" << std::endl + << " This type of invocation is used by OPAL-RT Asynchronous processes." << std::endl + << " See in the RT-LAB User Guide for more information." << std::endl << std::endl #endif /* ENABLE_OPAL_ASYNC */ - << "Supported node-types:" << std::endl; - plugin_dump(PLUGIN_TYPE_NODE); - std::cout << std::endl; + << "Supported node-types:" << std::endl; + plugin_dump(PLUGIN_TYPE_NODE); + std::cout << std::endl; #ifdef WITH_HOOKS - std::cout << "Supported hooks:" << std::endl; - for (Plugin *p : Registry::lookup()) - std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl; - std::cout << std::endl; + std::cout << "Supported hooks:" << std::endl; + for (Plugin *p : Registry::lookup()) + std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl; + std::cout << std::endl; #endif /* WITH_HOOKS */ #ifdef WITH_API - std::cout << "Supported API commands:" << std::endl; - for (Plugin *p : Registry::lookup()) - std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl; - std::cout << std::endl; + std::cout << "Supported API commands:" << std::endl; + for (Plugin *p : Registry::lookup()) + std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl; + std::cout << std::endl; #endif /* WITH_API */ - std::cout << "Supported IO formats:" << std::endl; - plugin_dump(PLUGIN_TYPE_FORMAT); - std::cout << std::endl; + std::cout << "Supported IO formats:" << std::endl; + plugin_dump(PLUGIN_TYPE_FORMAT); + std::cout << std::endl; - print_copyright(); -} + printCopyright(); + } -int main(int argc, char *argv[]) -{ - int ret; - const char *uri; - - Logger logger = logging.get("node"); - - try { - /* Check arguments */ + void parse() + { + /* Check arguments */ #ifdef ENABLE_OPAL_ASYNC if (argc != 4) { usage(); @@ -139,7 +151,7 @@ int main(int argc, char *argv[]) while ((c = getopt(argc, argv, "hVd:")) != -1) { switch (c) { case 'V': - print_version(); + printVersion(); exit(EXIT_SUCCESS); case 'd': @@ -157,18 +169,15 @@ int main(int argc, char *argv[]) if (argc == optind + 1) uri = argv[optind]; - else if (argc == optind) - uri = nullptr; - else { + else if (argc != optind) { usage(); exit(EXIT_FAILURE); } #endif /* ENABLE_OPAL_ASYNC */ + } - logger->info("This is VILLASnode {} (built on {}, {})", - CLR_BLD(CLR_YEL(PROJECT_BUILD_ID)), - CLR_BLD(CLR_MAG(__DATE__)), CLR_BLD(CLR_MAG(__TIME__))); - + int main() + { #ifdef __linux__ /* Checks system requirements*/ auto required = utils::Version(KERNEL_VERSION_MAJ, KERNEL_VERSION_MIN); @@ -176,11 +185,7 @@ int main(int argc, char *argv[]) throw RuntimeError("Your kernel version is to old: required >= {}.{}", KERNEL_VERSION_MAJ, KERNEL_VERSION_MIN); #endif /* __linux__ */ - ret = utils::signals_init(quit); - if (ret) - throw RuntimeError("Failed to initialize signal subsystem"); - - if (uri) + if (!uri.empty()) sn.parse(uri); else logger->warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance."); @@ -191,13 +196,20 @@ int main(int argc, char *argv[]) sn.run(); sn.stop(); - logger->info(CLR_GRN("Goodbye!")); - return 0; } - catch (std::runtime_error &e) { - logger->error("{}", e.what()); +}; - return -1; - } +} // namespace tools +} // namespace node +} // namespace villas + +int main(int argc, char *argv[]) +{ + auto t = villas::node::tools::Node(argc, argv); + + return t.run(); } + +/** @} */ + diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index ec29c7917..8fe37f2c3 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -29,14 +29,14 @@ #include #include #include -#include + +#include #include #include #include #include #include -#include #include #include #include @@ -49,14 +49,25 @@ #include #include #include +#include -using namespace villas; -using namespace villas::node; +namespace villas { +namespace node { +namespace tools { -class Direction { +class PipeDirection { +protected: + struct pool pool; + struct node *node; + struct io *io; + + std::thread thread; + + bool enabled; + int limit; public: - Direction(struct node *n, struct io *i, bool en = true, int lim = -1) : + PipeDirection(struct node *n, struct io *i, bool en = true, int lim = -1) : node(n), io(i), enabled(en), @@ -65,9 +76,6 @@ public: pool.state = STATE_DESTROYED; pool.queue.state = STATE_DESTROYED; - /* Initialize memory */ - - /* Initialize memory */ unsigned vec = LOG2_CEIL(MAX(node->out.vectorize, node->in.vectorize)); unsigned pool_size = node_type(node)->pool_size ? node_type(node)->pool_size : vec; @@ -77,206 +85,236 @@ public: throw RuntimeError("Failed to allocate memory for pool."); } - Direction(const Direction &c) - { - io = c.io; - } - - ~Direction() + ~PipeDirection() { pool_destroy(&pool); } - struct pool pool; - struct node *node; - struct io *io; + virtual void run() + { - pthread_t thread; + } - bool enabled; - int limit; + void startThread() + { + if (enabled) + thread = std::thread(&villas::node::tools::PipeDirection::run, this); + } + + void stopThread() + { + thread.join(); + } }; -struct Directions { - Direction send; - Direction recv; -}; +class PipeSendDirection : public PipeDirection { -static std::atomic stop(false); +public: + PipeSendDirection(struct node *n, struct io *i, bool en = true, int lim = -1) : + PipeDirection(n, i, en, lim) + { } -static void quit(int signal, siginfo_t *sinfo, void *ctx) -{ - Logger logger = logging.get("pipe"); + virtual void run() + { + Logger logger = logging.get("pipe:send"); - switch (signal) { - case SIGALRM: - logger->info("Reached timeout. Terminating..."); - break; + unsigned last_sequenceno = 0, release; + int scanned, sent, allocated, cnt = 0; - default: - logger->info("Received {} signal. Terminating...", strsignal(signal)); - break; - } + struct sample *smps[node->out.vectorize]; - stop = true; -} + while (node->state == STATE_STARTED && !io_eof(io)) { + allocated = sample_alloc_many(&pool, smps, node->out.vectorize); + if (allocated < 0) + throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize); + else if (allocated < (int) node->out.vectorize) + logger->warn("Send pool underrun"); -static void usage() -{ - std::cout << "Usage: villas-pipe [OPTIONS] CONFIG NODE" << std::endl - << " CONFIG path to a configuration file" << std::endl - << " NODE the name of the node to which samples are sent and received from" << std::endl - << " OPTIONS are:" << std::endl - << " -f FMT set the format" << std::endl - << " -t DT the data-type format string" << std::endl - << " -o OPTION=VALUE overwrite options in config file" << std::endl - << " -x swap read / write endpoints" << std::endl - << " -s only read data from stdin and send it to node" << std::endl - << " -r only read data from node and write it to stdout" << std::endl - << " -T NUM terminate after NUM seconds" << std::endl - << " -L NUM terminate after NUM samples sent" << std::endl - << " -l NUM terminate after NUM samples received" << std::endl - << " -h show this usage information" << std::endl - << " -d set logging level" << std::endl - << " -V show the version of the tool" << std::endl << std::endl; + scanned = io_scan(io, smps, allocated); + if (scanned < 0) { + logger->warn("Failed to read samples from stdin"); + continue; + } + else if (scanned == 0) + continue; - print_copyright(); -} + /* Fill in missing sequence numbers */ + for (int i = 0; i < scanned; i++) { + if (smps[i]->flags & SAMPLE_HAS_SEQUENCE) + last_sequenceno = smps[i]->sequence; + else + smps[i]->sequence = last_sequenceno++; + } -static void * send_loop(void *ctx) -{ - Directions *dirs = static_cast(ctx); - Logger logger = logging.get("pipe"); + release = allocated; - unsigned last_sequenceno = 0, release; - int scanned, sent, allocated, cnt = 0; + sent = node_write(node, smps, scanned, &release); - struct node *node = dirs->send.node; - struct sample *smps[node->out.vectorize]; + sample_decref_many(smps, release); - while (node->state == STATE_STARTED && !io_eof(dirs->send.io)) { - allocated = sample_alloc_many(&dirs->send.pool, smps, node->out.vectorize); - if (allocated < 0) - throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize); - else if (allocated < (int) node->out.vectorize) - logger->warn("Send pool underrun"); - - scanned = io_scan(dirs->send.io, smps, allocated); - if (scanned < 0) { - logger->warn("Failed to read samples from stdin"); - continue; - } - else if (scanned == 0) - continue; - - /* Fill in missing sequence numbers */ - for (int i = 0; i < scanned; i++) { - if (smps[i]->flags & SAMPLE_HAS_SEQUENCE) - last_sequenceno = smps[i]->sequence; - else - smps[i]->sequence = last_sequenceno++; - } - - release = allocated; - - sent = node_write(node, smps, scanned, &release); - - sample_decref_many(smps, release); - - cnt += sent; - if (dirs->send.limit > 0 && cnt >= dirs->send.limit) - goto leave; - - pthread_testcancel(); - } - -leave: if (io_eof(dirs->send.io)) { - if (dirs->recv.limit < 0) { - logger->info("Reached end-of-file. Terminating..."); - stop = true; - } - else - logger->info("Reached end-of-file. Wait for receive side..."); - } - else { - logger->info("Reached send limit. Terminating..."); - stop = true; - } - - return nullptr; -} - -static void * recv_loop(void *ctx) -{ - Directions *dirs = static_cast(ctx); - Logger logger = logging.get("pipe"); - - int recv, cnt = 0, allocated = 0; - unsigned release; - struct node *node = dirs->recv.node; - struct sample *smps[node->in.vectorize]; - - while (node->state == STATE_STARTED) { - allocated = sample_alloc_many(&dirs->recv.pool, smps, node->in.vectorize); - if (allocated < 0) - throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize); - else if (allocated < (int) node->in.vectorize) - logger->warn("Receive pool underrun: allocated only {} of {} samples", allocated, node->in.vectorize); - - release = allocated; - - recv = node_read(node, smps, allocated, &release); - if (recv < 0) { - if (node->state == STATE_STOPPING) - goto leave2; - else - logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv); - } - else { - io_print(dirs->recv.io, smps, recv); - - cnt += recv; - if (dirs->recv.limit > 0 && cnt >= dirs->recv.limit) + cnt += sent; + if (limit > 0 && cnt >= limit) goto leave; } - sample_decref_many(smps, release); - pthread_testcancel(); +leave: if (io_eof(io)) { + if (limit < 0) { + logger->info("Reached end-of-file. Terminating..."); + raise(SIGINT); + } + else + logger->info("Reached end-of-file. Wait for receive side..."); + } + else { + logger->info("Reached send limit. Terminating..."); + raise(SIGINT); + } + } +}; + +class PipeReceiveDirection : public PipeDirection { + +public: + PipeReceiveDirection(struct node *n, struct io *i, bool en = true, int lim = -1) : + PipeDirection(n, i, en, lim) + { } + + virtual void run() + { + Logger logger = logging.get("pipe:recv"); + + int recv, cnt = 0, allocated = 0; + unsigned release; + struct sample *smps[node->in.vectorize]; + + while (node->state == STATE_STARTED) { + allocated = sample_alloc_many(&pool, smps, node->in.vectorize); + if (allocated < 0) + throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize); + else if (allocated < (int) node->in.vectorize) + logger->warn("Receive pool underrun: allocated only {} of {} samples", allocated, node->in.vectorize); + + release = allocated; + + recv = node_read(node, smps, allocated, &release); + if (recv < 0) { + if (node->state == STATE_STOPPING) + goto leave2; + else + logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv); + } + else { + io_print(io, smps, recv); + + cnt += recv; + if (limit > 0 && cnt >= limit) + goto leave; + } + + sample_decref_many(smps, release); + } + +leave: logger->info("Reached receive limit. Terminating..."); +leave2: raise(SIGINT); + } +}; + +class Pipe : public Tool { + +public: + Pipe(int argc, char *argv[]) : + Tool(argc, argv, "pipe"), + stop(false), + timeout(0), + reverse(false), + format("villas.human"), + dtypes("64f"), + enable_send(true), + enable_recv(true), + limit_send(-1), + limit_recv(-1) + { + int ret; + + ret = memory_init(DEFAULT_NR_HUGEPAGES); + if (ret) + throw RuntimeError("Failed to initialize memory"); + + io.state = STATE_DESTROYED; + + cfg_cli = json_object(); } -leave: logger->info("Reached receive limit. Terminating..."); -leave2: stop = true; + ~Pipe() + { + json_decref(cfg_cli); + } - return nullptr; -} +protected: + std::atomic stop; -int main(int argc, char *argv[]) -{ - Logger logger = logging.get("pipe"); + SuperNode sn; /**< The global configuration */ + struct io io; - try { - int ret, timeout = 0; - bool reverse = false; - const char *format = "villas.human"; - const char *dtypes = "64f"; + int timeout; + bool reverse; + std::string format; + std::string dtypes; + std::string uri; + std::string nodestr; - struct node *node; - static struct io io = { .state = STATE_DESTROYED }; + json_t *cfg_cli; - SuperNode sn; /**< The global configuration */ + bool enable_send = true; + bool enable_recv = true; + int limit_send = -1; + int limit_recv = -1; - json_t *cfg_cli = json_object(); + void handler(int signal, siginfo_t *sinfo, void *ctx) + { + switch (signal) { + case SIGALRM: + logger->info("Reached timeout. Terminating..."); + break; - bool enable_send = true, enable_recv = true; - int limit_send = -1, limit_recv = -1; + default: + logger->info("Received {} signal. Terminating...", strsignal(signal)); + break; + } - /* Parse optional command line arguments */ - int c; + stop = true; + } + + void usage() + { + std::cout << "Usage: villas-pipe [OPTIONS] CONFIG NODE" << std::endl + << " CONFIG path to a configuration file" << std::endl + << " NODE the name of the node to which samples are sent and received from" << std::endl + << " OPTIONS are:" << std::endl + << " -f FMT set the format" << std::endl + << " -t DT the data-type format string" << std::endl + << " -o OPTION=VALUE overwrite options in config file" << std::endl + << " -x swap read / write endpoints" << std::endl + << " -s only read data from stdin and send it to node" << std::endl + << " -r only read data from node and write it to stdout" << std::endl + << " -T NUM terminate after NUM seconds" << std::endl + << " -L NUM terminate after NUM samples sent" << std::endl + << " -l NUM terminate after NUM samples received" << std::endl + << " -h show this usage information" << std::endl + << " -d set logging level" << std::endl + << " -V show the version of the tool" << std::endl << std::endl; + + printCopyright(); + } + + void parse() + { + int c, ret; char *endptr; while ((c = getopt(argc, argv, "Vhxrsd:l:L:T:f:t:o:")) != -1) { switch (c) { case 'V': - print_version(); + printVersion(); exit(EXIT_SUCCESS); case 'f': @@ -338,30 +376,29 @@ check: if (optarg == endptr) exit(EXIT_FAILURE); } - logger->info("Logging level: {}", logging.getLevelName()); + uri = argv[optind]; + nodestr = argv[optind+1]; + } - char *uri = argv[optind]; - char *nodestr = argv[optind+1]; + int main() + { + int ret; + + struct node *node; struct format_type *ft; - ret = memory_init(0); - if (ret) - throw RuntimeError("Failed to intialize memory"); + logger->info("Logging level: {}", logging.getLevelName()); - ret = utils::signals_init(quit); - if (ret) - throw RuntimeError("Failed to initialize signals"); - - if (uri) + if (!uri.empty()) sn.parse(uri); else logger->warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance."); - ft = format_type_lookup(format); + ft = format_type_lookup(format.c_str()); if (!ft) throw RuntimeError("Invalid format: {}", format); - ret = io_init2(&io, ft, dtypes, SAMPLE_HAS_ALL); + ret = io_init2(&io, ft, dtypes.c_str(), SAMPLE_HAS_ALL); if (ret) throw RuntimeError("Failed to initialize IO"); @@ -406,36 +443,19 @@ check: if (optarg == endptr) if (ret) throw RuntimeError("Failed to start node {}: reason={}", node_name(node), ret); - /* Start threads */ - Directions dirs = { - .send = Direction(node, &io, enable_send, limit_send), - .recv = Direction(node, &io, enable_recv, limit_recv) - }; + PipeReceiveDirection recv_dir(node, &io, enable_recv, limit_recv); + PipeSendDirection send_dir(node, &io, enable_recv, limit_recv); - if (dirs.recv.enabled) { - dirs.recv.node = node; - pthread_create(&dirs.recv.thread, nullptr, recv_loop, &dirs); - } - - if (dirs.send.enabled) { - dirs.send.node = node; - pthread_create(&dirs.send.thread, nullptr, send_loop, &dirs); - } + recv_dir.startThread(); + send_dir.startThread(); alarm(timeout); while (!stop) sleep(1); - if (dirs.recv.enabled) { - pthread_cancel(dirs.recv.thread); - pthread_join(dirs.recv.thread, nullptr); - } - - if (dirs.send.enabled) { - pthread_cancel(dirs.send.thread); - pthread_join(dirs.send.thread, nullptr); - } + recv_dir.stopThread(); + send_dir.stopThread(); ret = node_stop(node); if (ret) @@ -455,15 +475,20 @@ check: if (optarg == endptr) if (ret) throw RuntimeError("Failed to destroy IO"); - logger->info(CLR_GRN("Goodbye!")); - return 0; } - catch (std::runtime_error &e) { - logger->error("{}", e.what()); +}; - return -1; - } + +} // namespace tools +} // namespace node +} // namespace villas + +int main(int argc, char *argv[]) +{ + auto t = villas::node::tools::Pipe(argc, argv); + + return t.run(); } /** @} */ diff --git a/src/villas-relay.cpp b/src/villas-relay.cpp index aeba1137e..31cc660fd 100644 --- a/src/villas-relay.cpp +++ b/src/villas-relay.cpp @@ -31,95 +31,26 @@ #include #include -#include +#include #include +#include +#include #include -#include #include "villas-relay.hpp" -/** The libwebsockets server context. */ -static lws_context *context; -/** The libwebsockets vhost. */ -static lws_vhost *vhost; +namespace villas { +namespace node { +namespace tools { -std::map sessions; - -using Logger = villas::Logger; - -/* Default options */ -struct Options opts = { - .loopback = false, - .port = 8088, - .protocol = "live" -}; - -/** List of libwebsockets protocols. */ -lws_protocols protocols[] = { - { - .name = "http", - .callback = lws_callback_http_dummy, - .per_session_data_size = 0, - .rx_buffer_size = 1024 - }, - { - .name = "http-api", - .callback = http_protocol_cb, - .per_session_data_size = 0, - .rx_buffer_size = 1024 - }, - { - .name = "live", - .callback = protocol_cb, - .per_session_data_size = sizeof(Connection), - .rx_buffer_size = 0 - }, - { nullptr /* terminator */ } -}; - -/** List of libwebsockets extensions. */ -static const lws_extension extensions[] = { - { - "permessage-deflate", - lws_extension_callback_pm_deflate, - "permessage-deflate" - }, - { - "deflate-frame", - lws_extension_callback_pm_deflate, - "deflate_frame" - }, - { nullptr /* terminator */ } -}; - -static const lws_http_mount mount = { - .mount_next = nullptr, /* linked-list "next" */ - .mountpoint = "/api/v1", /* mountpoint URL */ - .origin = nullptr, /* protocol */ - .def = nullptr, - .protocol = "http-api", - .cgienv = nullptr, - .extra_mimetypes = nullptr, - .interpret = nullptr, - .cgi_timeout = 0, - .cache_max_age = 0, - .auth_mask = 0, - .cache_reusable = 0, - .cache_revalidate = 0, - .cache_intermediaries = 0, - .origin_protocol = LWSMPRO_CALLBACK, /* dynamic */ - .mountpoint_len = 7, /* char count */ - .basic_auth_login_file =nullptr, -}; - -Session::Session(Identifier sid) : +RelaySession::RelaySession(Identifier sid) : identifier(sid), connects(0) { Logger logger = villas::logging.get("console"); - logger->info("Session created: {}", identifier); + logger->info("RelaySession created: {}", identifier); sessions[sid] = this; @@ -128,16 +59,16 @@ Session::Session(Identifier sid) : uuid_generate(uuid); } -Session::~Session() +RelaySession::~RelaySession() { Logger logger = villas::logging.get("console"); - logger->info("Session destroyed: {}", identifier); + logger->info("RelaySession destroyed: {}", identifier); sessions.erase(identifier); } -Session * Session::get(lws *wsi) +RelaySession * RelaySession::get(lws *wsi) { Logger logger = villas::logging.get("console"); @@ -157,7 +88,7 @@ Session * Session::get(lws *wsi) auto it = sessions.find(sid); if (it == sessions.end()) { - return new Session(sid); + return new RelaySession(sid); } else { logger->info("Found existing session: {}", sid); @@ -166,7 +97,7 @@ Session * Session::get(lws *wsi) } } -json_t * Session::toJson() const +json_t * RelaySession::toJson() const { json_t *json_connections = json_array(); @@ -188,18 +119,21 @@ json_t * Session::toJson() const ); } -Connection::Connection(lws *w) : +std::map RelaySession::sessions; + +RelayConnection::RelayConnection(lws *w, bool lo) : wsi(w), currentFrame(std::make_shared()), outgoingFrames(), bytes_recv(0), bytes_sent(0), frames_recv(0), - frames_sent(0) + frames_sent(0), + loopback(lo) { Logger logger = villas::logging.get("console"); - session = Session::get(wsi); + session = RelaySession::get(wsi); session->connections[wsi] = this; session->connects++; @@ -210,11 +144,11 @@ Connection::Connection(lws *w) : logger->info("New connection established: session={}, remote={} ({})", session->identifier, name, ip); } -Connection::~Connection() +RelayConnection::~RelayConnection() { Logger logger = villas::logging.get("console"); - logger->info("Connection closed: session={}, remote={} ({})", session->identifier, name, ip); + logger->info("RelayConnection closed: session={}, remote={} ({})", session->identifier, name, ip); session->connections.erase(wsi); @@ -222,7 +156,7 @@ Connection::~Connection() delete session; } -json_t * Connection::toJson() const +json_t * RelayConnection::toJson() const { return json_pack("{ s: s, s: s, s: I, s: I, s: I, s: I, s: I }", "name", name, @@ -235,7 +169,7 @@ json_t * Connection::toJson() const ); } -void Connection::write() +void RelayConnection::write() { int ret; @@ -254,7 +188,7 @@ void Connection::write() lws_callback_on_writable(wsi); } -void Connection::read(void *in, size_t len) +void RelayConnection::read(void *in, size_t len) { Logger logger = villas::logging.get("console"); @@ -264,14 +198,14 @@ void Connection::read(void *in, size_t len) if (lws_is_final_fragment(wsi)) { frames_recv++; - logger->debug("Received frame, relaying to {} connections", session->connections.size() - (opts.loopback ? 0 : 1)); + logger->debug("Received frame, relaying to {} connections", session->connections.size() - (loopback ? 0 : 1)); for (auto p : session->connections) { auto c = p.second; /* We skip the current connection in order * to avoid receiving our own data */ - if (opts.loopback == false && c == this) + if (loopback == false && c == this) continue; c->outgoingFrames.push(currentFrame); @@ -283,7 +217,47 @@ void Connection::read(void *in, size_t len) } } -static void logger_cb(int level, const char *msg) +Relay::Relay(int argc, char *argv[]) : + Tool(argc, argv, "relay"), + stop(false), + loopback(false), + port(8088), + protocol("live") +{ + int ret; + + ret = memory_init(DEFAULT_NR_HUGEPAGES); + if (ret) + throw RuntimeError("Failed to initialize memory"); + + /* Initialize logging */ + spdlog::stdout_color_mt("lws"); + lws_set_log_level((1 << LLL_COUNT) - 1, logger_cb); + + protocols = { + { + .name = "http", + .callback = lws_callback_http_dummy, + .per_session_data_size = 0, + .rx_buffer_size = 1024 + }, + { + .name = "http-api", + .callback = http_protocol_cb, + .per_session_data_size = 0, + .rx_buffer_size = 1024 + }, + { + .name = "live", + .callback = protocol_cb, + .per_session_data_size = sizeof(RelayConnection), + .rx_buffer_size = 0 + }, + { nullptr /* terminator */ } + }; +} + +void Relay::logger_cb(int level, const char *msg) { auto log = spdlog::get("lws"); @@ -296,20 +270,34 @@ static void logger_cb(int level, const char *msg) level = LLL_WARN; switch (level) { - case LLL_ERR: log->error("{}", msg); break; - case LLL_WARN: log->warn( "{}", msg); break; - case LLL_INFO: log->info( "{}", msg); break; - default: log->debug("{}", msg); break; + case LLL_ERR: + log->error("{}", msg); + break; + + case LLL_WARN: + log->warn( "{}", msg); + break; + + case LLL_INFO: + log->info( "{}", msg); + break; + + default: + log->debug("{}", msg); + break; } } -int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +int Relay::http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { int ret; size_t json_len; json_t *json_sessions, *json_body; - Logger logger = villas::logging.get("console"); + lws_context *ctx = lws_get_context(wsi); + void *user_ctx = lws_context_user(ctx); + + Relay *r = reinterpret_cast(user_ctx); unsigned char buf[LWS_PRE + 2048], *start = &buf[LWS_PRE], *end = &buf[sizeof(buf) - LWS_PRE - 1], *p = start; @@ -320,6 +308,7 @@ int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, voi LWS_ILLEGAL_HTTP_CONTENT_LEN, /* no content len */ &p, end)) return 1; + if (lws_finalize_write_http_header(wsi, start, &p, end)) return 1; @@ -331,7 +320,7 @@ int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, voi case LWS_CALLBACK_HTTP_WRITEABLE: json_sessions = json_array(); - for (auto it : sessions) { + for (auto it : RelaySession::sessions) { auto &session = it.second; json_array_append(json_sessions, session->toJson()); @@ -345,9 +334,9 @@ int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, voi "version", PROJECT_VERSION_STR, "hostname", hname, "options", - "loopback", opts.loopback, - "port", opts.port, - "protocol", opts.protocol + "loopback", r->loopback, + "port", r->port, + "protocol", r->protocol.c_str() ); json_len = json_dumpb(json_body, (char *) buf + LWS_PRE, sizeof(buf) - LWS_PRE, JSON_INDENT(4)); @@ -356,7 +345,7 @@ int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, voi if (ret < 0) return ret; - logger->info("Handled API request"); + r->logger->info("Handled API request"); //if (lws_http_transaction_completed(wsi)) return -1; @@ -368,15 +357,19 @@ int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, voi return lws_callback_http_dummy(wsi, reason, user, in, len); } -int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +int Relay::protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { - Connection *c = reinterpret_cast(user); + lws_context *ctx = lws_get_context(wsi); + void *user_ctx = lws_context_user(ctx); + + Relay *r = reinterpret_cast(user_ctx); + RelayConnection *c = reinterpret_cast(user); switch (reason) { case LWS_CALLBACK_ESTABLISHED: try { - new (c) Connection(wsi); + new (c) RelayConnection(wsi, r->loopback); } catch (InvalidUrlException &e) { lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *) "Invalid URL", strlen("Invalid URL")); @@ -386,7 +379,7 @@ int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in break; case LWS_CALLBACK_CLOSED: - c->~Connection(); + c->~RelayConnection(); break; case LWS_CALLBACK_SERVER_WRITEABLE: @@ -404,104 +397,141 @@ int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in return 0; } -static void usage() +void Relay::usage() { std::cout << "Usage: villas-relay [OPTIONS]" << std::endl - << " OPTIONS is one or more of the following options:" << std::endl - << " -d LVL set debug level" << std::endl - << " -p PORT the port number to listen on" << std::endl - << " -P PROT the websocket protocol" << std::endl - << " -l enable loopback of own data" << std::endl - << " -V show version and exit" << std::endl - << " -h show usage and exit" << std::endl << std::endl; + << " OPTIONS is one or more of the following options:" << std::endl + << " -d LVL set debug level" << std::endl + << " -p PORT the port number to listen on" << std::endl + << " -P PROT the websocket protocol" << std::endl + << " -l enable loopback of own data" << std::endl + << " -V show version and exit" << std::endl + << " -h show usage and exit" << std::endl << std::endl; - villas::print_copyright(); + printCopyright(); } +void Relay::parse() +{ + char c, *endptr; + while ((c = getopt (argc, argv, "hVp:P:ld:")) != -1) { + switch (c) { + case 'd': + spdlog::set_level(spdlog::level::from_str(optarg)); + break; + + case 'p': + port = strtoul(optarg, &endptr, 10); + goto check; + + case 'P': + protocol = optarg; + break; + + case 'l': + loopback = true; + break; + + case 'V': + printVersion(); + exit(EXIT_SUCCESS); + + case 'h': + case '?': + usage(); + exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS); + } + + continue; + +check: if (optarg == endptr) { + logger->error("Failed to parse parse option argument '-{} {}'", c, optarg); + exit(EXIT_FAILURE); + } + } + + if (argc - optind < 0) { + usage(); + exit(EXIT_FAILURE); + } +} + +int Relay::main() { + /* Start server */ + lws_context_creation_info ctx_info = { 0 }; + + protocols[2].name = protocol.c_str(); + + ctx_info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + ctx_info.gid = -1; + ctx_info.uid = -1; + ctx_info.protocols = protocols.data(); + ctx_info.extensions = extensions.data(); + ctx_info.port = port; + ctx_info.mounts = &mount; + ctx_info.user = (void *) this; + + context = lws_create_context(&ctx_info); + if (context == nullptr) { + logger->error("WebSocket: failed to initialize server context"); + exit(EXIT_FAILURE); + } + + vhost = lws_create_vhost(context, &ctx_info); + if (vhost == nullptr) { + logger->error("WebSocket: failed to initialize virtual host"); + exit(EXIT_FAILURE); + } + + while (!stop) + lws_service(context, 100); + + return 0; +} + +const std::vector Relay::extensions = { + { + "permessage-deflate", + lws_extension_callback_pm_deflate, + "permessage-deflate" + }, + { + "deflate-frame", + lws_extension_callback_pm_deflate, + "deflate_frame" + }, + { nullptr /* terminator */ } +}; + +const lws_http_mount Relay::mount = { + .mount_next = nullptr, /* linked-list "next" */ + .mountpoint = "/api/v1", /* mountpoint URL */ + .origin = nullptr, /* protocol */ + .def = nullptr, + .protocol = "http-api", + .cgienv = nullptr, + .extra_mimetypes = nullptr, + .interpret = nullptr, + .cgi_timeout = 0, + .cache_max_age = 0, + .auth_mask = 0, + .cache_reusable = 0, + .cache_revalidate = 0, + .cache_intermediaries = 0, + .origin_protocol = LWSMPRO_CALLBACK, /* dynamic */ + .mountpoint_len = 7, /* char count */ + .basic_auth_login_file =nullptr, +}; + +} // namespace tools +} // namespace node +} // namespace villas + int main(int argc, char *argv[]) { - Logger logger = villas::logging.get("console"); + auto t = villas::node::tools::Relay(argc, argv); - try { - /* Initialize logging */ - spdlog::stdout_color_mt("lws"); - lws_set_log_level((1 << LLL_COUNT) - 1, logger_cb); - - /* Start server */ - lws_context_creation_info ctx_info = { 0 }; - - char c, *endptr; - while ((c = getopt (argc, argv, "hVp:P:ld:")) != -1) { - switch (c) { - case 'd': - spdlog::set_level(spdlog::level::from_str(optarg)); - break; - - case 'p': - opts.port = strtoul(optarg, &endptr, 10); - goto check; - - case 'P': - opts.protocol = strdup(optarg); - break; - - case 'l': - opts.loopback = true; - break; - - case 'V': - villas::print_version(); - exit(EXIT_SUCCESS); - - case 'h': - case '?': - usage(); - exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS); - } - - continue; - -check: if (optarg == endptr) { - logger->error("Failed to parse parse option argument '-{} {}'", c, optarg); - exit(EXIT_FAILURE); - } - } - - if (argc - optind < 0) { - usage(); - exit(EXIT_FAILURE); - } - - protocols[2].name = opts.protocol; - - ctx_info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; - ctx_info.gid = -1; - ctx_info.uid = -1; - ctx_info.protocols = protocols; - ctx_info.extensions = extensions; - ctx_info.port = opts.port; - ctx_info.mounts = &mount; - - context = lws_create_context(&ctx_info); - if (context == nullptr) { - logger->error("WebSocket: failed to initialize server context"); - exit(EXIT_FAILURE); - } - - vhost = lws_create_vhost(context, &ctx_info); - if (vhost == nullptr) { - logger->error("WebSocket: failed to initialize virtual host"); - exit(EXIT_FAILURE); - } - - for (;;) - lws_service(context, 100); - - return 0; - } - catch (std::runtime_error &e) { - logger->error("{}", e.what()); - - return -1; - } + return t.run(); } + + diff --git a/src/villas-relay.hpp b/src/villas-relay.hpp index 878a9bc78..0308cefb8 100644 --- a/src/villas-relay.hpp +++ b/src/villas-relay.hpp @@ -33,19 +33,18 @@ #include +namespace villas { +namespace node { +namespace tools { + /* Forward declarations */ lws_callback_function protocol_cb, http_protocol_cb; -class Session; -class Connection; +class Relay; +class RelaySession; +class RelayConnection; class InvalidUrlException { }; -struct Options { - bool loopback; - int port; - const char *protocol; -}; - class Frame : public std::vector { public: Frame() { @@ -62,31 +61,38 @@ public: } }; -class Session { +class RelaySession { + + friend RelayConnection; + friend Relay; + +public: + typedef std::string Identifier; protected: time_t created; uuid_t uuid; -public: - typedef std::string Identifier; - - static Session * get(lws *wsi); - - Session(Identifier sid); - - ~Session(); - - json_t * toJson() const; Identifier identifier; - std::map connections; + std::map connections; int connects; + + static std::map sessions; + +public: + static RelaySession * get(lws *wsi); + + RelaySession(Identifier sid); + + ~RelaySession(); + + json_t * toJson() const; }; -class Connection { +class RelayConnection { protected: lws *wsi; @@ -95,7 +101,7 @@ protected: std::queue> outgoingFrames; - Session *session; + RelaySession *session; char name[128]; char ip[128]; @@ -107,13 +113,62 @@ protected: size_t frames_recv; size_t frames_sent; -public: - Connection(lws *w); + bool loopback; - ~Connection(); +public: + RelayConnection(lws *w, bool lo); + ~RelayConnection(); json_t * toJson() const; void write(); void read(void *in, size_t len); }; + +class Relay : public Tool { + +public: + Relay(int argc, char *argv[]); + +protected: + std::atomic stop; + + /** The libwebsockets server context. */ + lws_context *context; + + /** The libwebsockets vhost. */ + lws_vhost *vhost; + + bool loopback; + int port; + std::string protocol; + + /** List of libwebsockets protocols. */ + std::vector protocols; + + /** List of libwebsockets extensions. */ + static const std::vector extensions; + + static const lws_http_mount mount; + + static void logger_cb(int level, const char *msg); + + static int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + + static int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); + + void usage(); + + void parse(); + + int main(); + + void handler(int signal, siginfo_t *sinfo, void *ctx) + { + stop = true; + } +}; + +} // namespace tools +} // namespace node +} // namespace villas diff --git a/src/villas-signal.cpp b/src/villas-signal.cpp index 8cf72ff84..4079c6a37 100644 --- a/src/villas-signal.cpp +++ b/src/villas-signal.cpp @@ -30,11 +30,11 @@ #include #include +#include #include #include #include #include -#include #include #include #include @@ -45,162 +45,19 @@ using namespace villas; -static std::atomic stop(false); +namespace villas { +namespace node { +namespace tools { -static void usage() -{ - std::cout << "Usage: villas-signal [OPTIONS] SIGNAL" << std::endl - << " SIGNAL is on of the following signal types:" << std::endl - << " mixed" << std::endl - << " random" << std::endl - << " sine" << std::endl - << " triangle" << std::endl - << " square" << std::endl - << " ramp" << std::endl - << " constants" << std::endl - << " counter" << std::endl << std::endl - << " OPTIONS is one or more of the following options:" << std::endl - << " -d LVL set debug level" << std::endl - << " -f FMT set the format" << std::endl - << " -v NUM specifies how many values a message should contain" << std::endl - << " -r HZ how many messages per second" << std::endl - << " -n non real-time mode. do not throttle output." << std::endl - << " -F HZ the frequency of the signal" << std::endl - << " -a FLT the amplitude" << std::endl - << " -D FLT the standard deviation for 'random' signals" << std::endl - << " -o OFF the DC bias" << std::endl - << " -l NUM only send LIMIT messages and stop" << std::endl << std::endl; +class Signal : public Tool { - print_copyright(); -} - -json_t * parse_cli(int argc, char *argv[], char **format) -{ - Logger logger = logging.get("signal"); - - /* Default values */ - double rate = 10; - double frequency = 1; - double amplitude = 1; - double stddev = 0.02; - double offset = 0; - char *type; - int rt = 1; - int values = 1; - int limit = -1; - - /* Parse optional command line arguments */ - int c; - char *endptr; - while ((c = getopt(argc, argv, "v:r:F:f:l:a:D:no:d:hV")) != -1) { - switch (c) { - case 'n': - rt = 0; - break; - - case 'f': - *format = optarg; - break; - - case 'l': - limit = strtoul(optarg, &endptr, 10); - goto check; - - case 'v': - values = strtoul(optarg, &endptr, 10); - goto check; - - case 'r': - rate = strtof(optarg, &endptr); - goto check; - - case 'o': - offset = strtof(optarg, &endptr); - goto check; - - case 'F': - frequency = strtof(optarg, &endptr); - goto check; - - case 'a': - amplitude = strtof(optarg, &endptr); - goto check; - - case 'D': - stddev = strtof(optarg, &endptr); - goto check; - - case 'd': - logging.setLevel(optarg); - break; - - case 'V': - print_version(); - exit(EXIT_SUCCESS); - - case 'h': - case '?': - usage(); - exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS); - } - - continue; - -check: if (optarg == endptr) - logger->warn("Failed to parse parse option argument '-{} {}'", c, optarg); - } - - if (argc != optind + 1) - return nullptr; - - type = argv[optind]; - - return json_pack("{ s: s, s: s, s: f, s: f, s: f, s: f, s: f, s: b, s: i, s: i }", - "type", "signal", - "signal", type, - "rate", rate, - "frequency", frequency, - "amplitude", amplitude, - "stddev", stddev, - "offset", offset, - "realtime", rt, - "values", values, - "limit", limit - ); -} - -static void quit(int signal, siginfo_t *sinfo, void *ctx) -{ - Logger logger = logging.get("signal"); - - switch (signal) { - case SIGALRM: - logger->info("Reached timeout. Terminating..."); - break; - - default: - logger->info("Received {} signal. Terminating...", strsignal(signal)); - } - - stop = true; -} - -int main(int argc, char *argv[]) -{ - Logger logger = logging.get("signal"); - - try { +public: + Signal(int argc, char *argv[]) : + Tool(argc, argv, "signal"), + stop(false), + format("villas.human") + { int ret; - json_t *cfg; - struct node_type *nt; - struct format_type *ft; - - char *format = (char *) "villas.human"; /** @todo hardcoded for now */ - - struct node n; - struct io io; - struct pool q; - struct sample *t; n.state = STATE_DESTROYED; n.in.state = STATE_DESTROYED; @@ -209,13 +66,164 @@ int main(int argc, char *argv[]) q.state = STATE_DESTROYED; q.queue.state = STATE_DESTROYED; - ret = utils::signals_init(quit); - if (ret) - throw RuntimeError("Failed to intialize signals"); - - ret = memory_init(0); + ret = memory_init(DEFAULT_NR_HUGEPAGES); if (ret) throw RuntimeError("Failed to initialize memory"); + } + +protected: + std::atomic stop; + + struct node n; + struct io io; + struct pool q; + + std::string format; + + void usage() + { + std::cout << "Usage: villas-signal [OPTIONS] SIGNAL" << std::endl + << " SIGNAL is on of the following signal types:" << std::endl + << " mixed" << std::endl + << " random" << std::endl + << " sine" << std::endl + << " triangle" << std::endl + << " square" << std::endl + << " ramp" << std::endl + << " constants" << std::endl + << " counter" << std::endl << std::endl + << " OPTIONS is one or more of the following options:" << std::endl + << " -d LVL set debug level" << std::endl + << " -f FMT set the format" << std::endl + << " -v NUM specifies how many values a message should contain" << std::endl + << " -r HZ how many messages per second" << std::endl + << " -n non real-time mode. do not throttle output." << std::endl + << " -F HZ the frequency of the signal" << std::endl + << " -a FLT the amplitude" << std::endl + << " -D FLT the standard deviation for 'random' signals" << std::endl + << " -o OFF the DC bias" << std::endl + << " -l NUM only send LIMIT messages and stop" << std::endl << std::endl; + + printCopyright(); + } + + json_t * parse_cli(int argc, char *argv[]) + { + /* Default values */ + double rate = 10; + double frequency = 1; + double amplitude = 1; + double stddev = 0.02; + double offset = 0; + std::string type; + int rt = 1; + int values = 1; + int limit = -1; + + /* Parse optional command line arguments */ + int c; + char *endptr; + while ((c = getopt(argc, argv, "v:r:F:f:l:a:D:no:d:hV")) != -1) { + switch (c) { + case 'n': + rt = 0; + break; + + case 'f': + format = optarg; + break; + + case 'l': + limit = strtoul(optarg, &endptr, 10); + goto check; + + case 'v': + values = strtoul(optarg, &endptr, 10); + goto check; + + case 'r': + rate = strtof(optarg, &endptr); + goto check; + + case 'o': + offset = strtof(optarg, &endptr); + goto check; + + case 'F': + frequency = strtof(optarg, &endptr); + goto check; + + case 'a': + amplitude = strtof(optarg, &endptr); + goto check; + + case 'D': + stddev = strtof(optarg, &endptr); + goto check; + + case 'd': + logging.setLevel(optarg); + break; + + case 'V': + printVersion(); + exit(EXIT_SUCCESS); + + case 'h': + case '?': + usage(); + exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS); + } + + continue; + +check: if (optarg == endptr) + logger->warn("Failed to parse parse option argument '-{} {}'", c, optarg); + } + + if (argc != optind + 1) + return nullptr; + + type = argv[optind]; + + return json_pack("{ s: s, s: s, s: f, s: f, s: f, s: f, s: f, s: b, s: i, s: i }", + "type", "signal", + "signal", type.c_str(), + "rate", rate, + "frequency", frequency, + "amplitude", amplitude, + "stddev", stddev, + "offset", offset, + "realtime", rt, + "values", values, + "limit", limit + ); + } + + void handler(int signal, siginfo_t *sinfo, void *ctx) + { + Logger logger = logging.get("signal"); + + switch (signal) { + case SIGALRM: + logger->info("Reached timeout. Terminating..."); + break; + + default: + logger->info("Received {} signal. Terminating...", strsignal(signal)); + } + + stop = true; + } + + int main() + { + int ret; + json_t *cfg; + struct node_type *nt; + struct format_type *ft; + + struct sample *t; nt = node_type_lookup("signal"); if (!nt) @@ -225,7 +233,7 @@ int main(int argc, char *argv[]) if (ret) throw RuntimeError("Failed to initialize node"); - cfg = parse_cli(argc, argv, &format); + cfg = parse_cli(argc, argv); if (!cfg) { usage(); exit(EXIT_FAILURE); @@ -237,7 +245,7 @@ int main(int argc, char *argv[]) exit(EXIT_FAILURE); } - ft = format_type_lookup(format); + ft = format_type_lookup(format.c_str()); if (!ft) throw RuntimeError("Invalid output format '{}'", format); @@ -310,15 +318,19 @@ out: sample_decref(t); if (ret) throw RuntimeError("Failed to destroy pool"); - logger->info(CLR_GRN("Goodbye!")); - return 0; } - catch (std::runtime_error &e) { - logger->error("{}", e.what()); +}; - return -1; - } +} // namespace tools +} // namespace node +} // namespace villas + +int main(int argc, char *argv[]) +{ + auto t = villas::node::tools::Signal(argc, argv); + + return t.run(); } /** @} */ diff --git a/src/villas-test-cmp.cpp b/src/villas-test-cmp.cpp index 63c09749d..f06bfc5fe 100644 --- a/src/villas-test-cmp.cpp +++ b/src/villas-test-cmp.cpp @@ -26,38 +26,42 @@ #include +#include #include #include #include #include #include -#include #include #include #include using namespace villas; -class Side { +namespace villas { +namespace node { +namespace tools { + +class TestCmpSide { public: std::string path; + std::string dtypes; struct sample *sample; struct io io; struct format_type *format; - const char *dtypes; - Side(const std::string &pth, struct format_type *fmt, const char *dt, struct pool *p) : + TestCmpSide(const std::string &pth, struct format_type *fmt, const std::string &dt, struct pool *p) : path(pth), - format(fmt), - dtypes(dt) + dtypes(dt), + format(fmt) { int ret; io.state = STATE_DESTROYED; - ret = io_init2(&io, format, dtypes, 0); + ret = io_init2(&io, format, dtypes.c_str(), 0); if (ret) throw RuntimeError("Failed to initialize IO"); @@ -74,7 +78,7 @@ public: throw RuntimeError("Failed to allocate samples"); } - ~Side() noexcept(false) + ~TestCmpSide() noexcept(false) { int ret; @@ -90,46 +94,62 @@ public: } }; -static void usage() -{ - std::cout << "Usage: villas-test-cmp [OPTIONS] FILE1 FILE2 ... FILEn" << std::endl - << " FILE a list of files to compare" << std::endl - << " OPTIONS is one or more of the following options:" << std::endl - << " -d LVL adjust the debug level" << std::endl - << " -e EPS set epsilon for floating point comparisons to EPS" << std::endl - << " -v ignore data values" << std::endl - << " -T ignore timestamp" << std::endl - << " -s ignore sequence no" << std::endl - << " -f FMT file format for all files" << std::endl - << " -t DT the data-type format string" << std::endl - << " -h show this usage information" << std::endl - << " -V show the version of the tool" << std::endl << std::endl - << "Return codes:" << std::endl - << " 0 files are equal" << std::endl - << " 1 file length not equal" << std::endl - << " 2 sequence no not equal" << std::endl - << " 3 timestamp not equal" << std::endl - << " 4 number of values is not equal" << std::endl - << " 5 data is not equal" << std::endl << std::endl; +class TestCmp : public Tool { - print_copyright(); -} +public: + TestCmp(int argc, char *argv[]) : + Tool(argc, argv, "test-cmp"), + epsilon(1e-9), + format("villas.human"), + dtypes("64f"), + flags(SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA | SAMPLE_HAS_TS_ORIGIN) + { + pool.state = STATE_DESTROYED; -int main(int argc, char *argv[]) -{ - Logger logger = logging.get("test-cmp"); + int ret; - try { - int ret, rc = 0; + ret = memory_init(DEFAULT_NR_HUGEPAGES); + if (ret) + throw RuntimeError("Failed to initialize memory"); + } - /* Default values */ - double epsilon = 1e-9; - const char *format = "villas.human"; - const char *dtypes = "64f"; - int flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA | SAMPLE_HAS_TS_ORIGIN; +protected: + struct pool pool; - struct pool pool = { .state = STATE_DESTROYED }; + double epsilon; + std::string format; + std::string dtypes; + int flags; + std::vector filenames; + + void usage() + { + std::cout << "Usage: villas-test-cmp [OPTIONS] FILE1 FILE2 ... FILEn" << std::endl + << " FILE a list of files to compare" << std::endl + << " OPTIONS is one or more of the following options:" << std::endl + << " -d LVL adjust the debug level" << std::endl + << " -e EPS set epsilon for floating point comparisons to EPS" << std::endl + << " -v ignore data values" << std::endl + << " -T ignore timestamp" << std::endl + << " -s ignore sequence no" << std::endl + << " -f FMT file format for all files" << std::endl + << " -t DT the data-type format string" << std::endl + << " -h show this usage information" << std::endl + << " -V show the version of the tool" << std::endl << std::endl + << "Return codes:" << std::endl + << " 0 files are equal" << std::endl + << " 1 file length not equal" << std::endl + << " 2 sequence no not equal" << std::endl + << " 3 timestamp not equal" << std::endl + << " 4 number of values is not equal" << std::endl + << " 5 data is not equal" << std::endl << std::endl; + + printCopyright(); + } + + void parse() + { /* Parse Arguments */ int c; char *endptr; @@ -160,7 +180,7 @@ int main(int argc, char *argv[]) break; case 'V': - print_version(); + printVersion(); exit(EXIT_SUCCESS); case 'd': @@ -184,38 +204,41 @@ check: if (optarg == endptr) exit(EXIT_FAILURE); } - int eofs, line, failed; - int n = argc - optind; /* The number of files which we compare */ - Side *s[n]; + /* Open files */ + for (int i = 0; i < argc - optind; i++) + filenames.push_back(argv[optind + i]); + } - ret = memory_init(0); - if (ret) - throw RuntimeError("Failed to initialize memory system"); + int main() + { + int ret, rc = 0, line, failed; + unsigned eofs; - ret = pool_init(&pool, n, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), &memory_heap); - if (ret) - throw RuntimeError("Failed to initialize pool"); - - struct format_type *fmt = format_type_lookup(format); + struct format_type *fmt = format_type_lookup(format.c_str()); if (!fmt) throw RuntimeError("Invalid IO format: {}", format); /* Open files */ - for (int i = 0; i < n; i++) - s[i] = new Side(argv[optind + i], fmt, dtypes, &pool); + std::vector sides; + for (auto filename : filenames) + sides.push_back(new TestCmpSide(filename, fmt, dtypes, &pool)); + + ret = pool_init(&pool, sides.size(), SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), &memory_heap); + if (ret) + throw RuntimeError("Failed to initialize pool"); line = 0; for (;;) { /* Read next sample from all files */ retry: eofs = 0; - for (int i = 0; i < n; i++) { - ret = io_eof(&s[i]->io); + for (auto side : sides) { + ret = io_eof(&side->io); if (ret) eofs++; } if (eofs) { - if (eofs == n) + if (eofs == sides.size()) ret = 0; else { std::cout << "length unequal" << std::endl; @@ -226,8 +249,8 @@ retry: eofs = 0; } failed = 0; - for (int i = 0; i < n; i++) { - ret = io_scan(&s[i]->io, &s[i]->sample, 1); + for (auto side : sides) { + ret = io_scan(&side->io, &side->sample, 1); if (ret <= 0) failed++; } @@ -235,8 +258,8 @@ retry: eofs = 0; goto retry; /* We compare all files against the first one */ - for (int i = 1; i < n; i++) { - ret = sample_cmp(s[0]->sample, s[i]->sample, epsilon, flags); + for (auto side : sides) { + ret = sample_cmp(sides[0]->sample, side->sample, epsilon, flags); if (ret) { rc = ret; goto out; @@ -246,8 +269,8 @@ retry: eofs = 0; line++; } -out: for (int i = 0; i < n; i++) - delete s[i]; +out: for (auto side : sides) + delete side; ret = pool_destroy(&pool); if (ret) @@ -255,9 +278,15 @@ out: for (int i = 0; i < n; i++) return rc; } - catch (std::runtime_error &e) { - logger->error("{}", e.what()); +}; - return -1; - } +} // namespace tools +} // namespace node +} // namespace villas + +int main(int argc, char *argv[]) +{ + auto t = villas::node::tools::TestCmp(argc, argv); + + return t.run(); } diff --git a/src/villas-test-config.cpp b/src/villas-test-config.cpp index 60ccf489a..86fb50e15 100644 --- a/src/villas-test-config.cpp +++ b/src/villas-test-config.cpp @@ -22,8 +22,8 @@ #include +#include #include -#include #include #include #include @@ -33,27 +33,43 @@ using namespace villas; using namespace villas::node; -static void usage() -{ - std::cout << "Usage: villas-test-config [OPTIONS] CONFIG" << std::endl - << " CONFIG is the path to an optional configuration file" << std::endl - << " OPTIONS is one or more of the following options:" << std::endl - << " -d LVL set debug level" << std::endl - << " -V show version and exit" << std::endl - << " -h show usage and exit" << std::endl << std::endl; +namespace villas { +namespace node { +namespace tools { - print_copyright(); -} +class TestConfig : public Tool { -int main(int argc, char *argv[]) -{ - Logger logger = logging.get("test-config"); +public: + TestConfig(int argc, char *argv[]) : + Tool(argc, argv, "test-config"), + check(false) + { + int ret; - try { - SuperNode sn; + ret = memory_init(DEFAULT_NR_HUGEPAGES); + if (ret) + throw RuntimeError("Failed to initialize memory"); + } - bool check = false; +protected: + std::string uri; + bool check; + + void usage() + { + std::cout << "Usage: villas-test-config [OPTIONS] CONFIG" << std::endl + << " CONFIG is the path to an optional configuration file" << std::endl + << " OPTIONS is one or more of the following options:" << std::endl + << " -d LVL set debug level" << std::endl + << " -V show version and exit" << std::endl + << " -h show usage and exit" << std::endl << std::endl; + + printCopyright(); + } + + void parse() + { int c; while ((c = getopt (argc, argv, "hcV")) != -1) { switch (c) { @@ -62,7 +78,7 @@ int main(int argc, char *argv[]) break; case 'V': - print_version(); + printVersion(); exit(EXIT_SUCCESS); case 'h': @@ -77,26 +93,29 @@ int main(int argc, char *argv[]) exit(EXIT_FAILURE); } - sn.parse(argv[optind]); + uri = argv[optind]; + } + + int main() + { + SuperNode sn; + + sn.parse(uri); if (check) sn.check(); return 0; } - catch (ParseError &e) { - logger->error("{}", e.what()); +}; - return -1; - } - catch (ConfigError &e) { - logger->error("{}", e.what()); +} // namespace tools +} // namespace node +} // namespace villas - return -1; - } - catch (std::runtime_error &e) { - logger->error("{}", e.what()); +int main(int argc, char *argv[]) +{ + auto t = villas::node::tools::TestConfig(argc, argv); - return -1; - } + return t.run(); } diff --git a/src/villas-test-rtt.cpp b/src/villas-test-rtt.cpp index be033d822..d848eab74 100644 --- a/src/villas-test-rtt.cpp +++ b/src/villas-test-rtt.cpp @@ -30,10 +30,10 @@ #include #include +#include #include #include #include -#include #include #include #include @@ -47,59 +47,73 @@ using namespace villas; using namespace villas::node; -static std::atomic stop(false); +namespace villas { +namespace node { +namespace tools { -void quit(int signal, siginfo_t *sinfo, void *ctx) -{ - stop = true; -} +class TestRtt : public Tool { -static void usage() -{ - std::cout << "Usage: villas-test-rtt [OPTIONS] CONFIG NODE" << std::endl - << " CONFIG path to a configuration file" << std::endl - << " NODE name of the node which shoud be used" << std::endl - << " OPTIONS is one or more of the following options:" << std::endl - << " -c CNT send CNT messages" << std::endl - << " -f FD use file descriptor FD for result output instead of stdout" << std::endl - << " -b BKTS number of buckets for histogram" << std::endl - << " -w WMUP duration of histogram warmup phase" << std::endl - << " -h show this usage information" << std::endl - << " -V show the version of the tool" << std::endl << std::endl; - - print_copyright(); -} - -int main(int argc, char *argv[]) -{ - Logger logger = logging.get("test-rtt"); - - try { +public: + TestRtt(int argc, char *argv[]) : + Tool(argc, argv, "test-rtt"), + stop(false), + fd(STDOUT_FILENO), + count(-1), + hist_warmup(100), + hist_buckets(20) + { int ret; - struct hist hist; - struct timespec send, recv; + ret = memory_init(DEFAULT_NR_HUGEPAGES); + if (ret) + throw RuntimeError("Failed to initialize memory"); + } - struct sample *smp_send = (struct sample *) new char[SAMPLE_LENGTH(2)]; - struct sample *smp_recv = (struct sample *) new char[SAMPLE_LENGTH(2)]; +protected: + std::atomic stop; - struct node *node; + std::string uri; + std::string nodestr; - SuperNode sn; + SuperNode sn; - /* Test options */ - int count = -1; /**< Amount of messages which should be sent (default: -1 for unlimited) */ + /** File descriptor for Matlab results. + * This allows you to write Matlab results in a seperate log file: + * + * ./test etc/example.conf rtt -f 3 3>> measurement_results.m + */ + int fd; - hist_cnt_t hist_warmup = 100; - int hist_buckets = 20; + /**< Amount of messages which should be sent (default: -1 for unlimited) */ + int count; - /** File descriptor for Matlab results. - * This allows you to write Matlab results in a seperate log file: - * - * ./test etc/example.conf rtt -f 3 3>> measurement_results.m - */ - int fd = STDOUT_FILENO; + hist_cnt_t hist_warmup; + int hist_buckets; + + void handler(int signal, siginfo_t *sinfo, void *ctx) + { + stop = true; + } + + void usage() + { + std::cout << "Usage: villas-test-rtt [OPTIONS] CONFIG NODE" << std::endl + << " CONFIG path to a configuration file" << std::endl + << " NODE name of the node which shoud be used" << std::endl + << " OPTIONS is one or more of the following options:" << std::endl + << " -c CNT send CNT messages" << std::endl + << " -f FD use file descriptor FD for result output instead of stdout" << std::endl + << " -b BKTS number of buckets for histogram" << std::endl + << " -w WMUP duration of histogram warmup phase" << std::endl + << " -h show this usage information" << std::endl + << " -V show the version of the tool" << std::endl << std::endl; + + printCopyright(); + } + + void parse() + { /* Parse Arguments */ int c; char *endptr; @@ -122,7 +136,7 @@ int main(int argc, char *argv[]) goto check; case 'V': - print_version(); + printVersion(); exit(EXIT_SUCCESS); case 'd': @@ -146,14 +160,23 @@ check: if (optarg == endptr) exit(EXIT_FAILURE); } - char *uri = argv[optind]; - char *nodestr = argv[optind + 1]; + uri = argv[optind]; + nodestr = argv[optind + 1]; + } - ret = utils::signals_init(quit); - if (ret) - throw RuntimeError("Failed to initialize signals subsystem"); + int main() + { + int ret; - if (uri) + struct hist hist; + struct timespec send, recv; + + struct sample *smp_send = (struct sample *) new char[SAMPLE_LENGTH(2)]; + struct sample *smp_recv = (struct sample *) new char[SAMPLE_LENGTH(2)]; + + struct node *node; + + if (!uri.empty()) sn.parse(uri); else logger->warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance."); @@ -237,9 +260,15 @@ check: if (optarg == endptr) return 0; } - catch (std::runtime_error &e) { - logger->error("{}", e.what()); +}; - return -1; - } +} // namespace tools +} // namespace node +} // namespace villas + +int main(int argc, char *argv[]) +{ + auto t = villas::node::tools::TestRtt(argc, argv); + + return t.run(); } diff --git a/tools/zmq-keygen.cpp b/src/villas-zmq-keygen.cpp similarity index 64% rename from tools/zmq-keygen.cpp rename to src/villas-zmq-keygen.cpp index 2d4ede2ca..2c0bb124e 100644 --- a/tools/zmq-keygen.cpp +++ b/src/villas-zmq-keygen.cpp @@ -31,30 +31,55 @@ #include #include #include +#include #if ZMQ_VERSION_MAJOR < 4 || (ZMQ_VERSION_MAJOR == 4 && ZMQ_VERSION_MINOR <= 1) #include #endif +namespace villas { +namespace node { +namespace tools { + +class ZmqKeygen : public Tool { + +public: + ZmqKeygen(int argc, char *argv[]) : + Tool(argc, argv, "zmq-keygen") + { } + +protected: + int main() + { + int ret; + char public_key[41]; + char secret_key[41]; + + ret = zmq_curve_keypair(public_key, secret_key); + if (ret) { + if (zmq_errno() == ENOTSUP) + std::cout << "To use " << argv[0] << ", please install libsodium and then rebuild libzmq." << std::endl; + + exit(EXIT_FAILURE); + } + + std::cout << "# Copy these lines to your 'zeromq' node-configuration" << std::endl; + std::cout << "curve = {" << std::endl; + std::cout << "\tpublic_key = \"" << public_key << "\";" << std::endl; + std::cout << "\tsecret_key = \"" << secret_key << "\";" << std::endl; + std::cout << "}" << std::endl; + + return 0; + } +}; + +} // namespace tools +} // namespace node +} // namespace villas + int main(int argc, char *argv[]) { - int ret; - char public_key[41]; - char secret_key[41]; + auto t = villas::node::tools::ZmqKeygen(argc, argv); - ret = zmq_curve_keypair(public_key, secret_key); - if (ret) { - if (zmq_errno() == ENOTSUP) - std::cout << "To use " << argv[0] << ", please install libsodium and then rebuild libzmq." << std::endl; - - exit(EXIT_FAILURE); - } - - std::cout << "# Copy these lines to your 'zeromq' node-configuration" << std::endl; - std::cout << "curve = {" << std::endl; - std::cout << "\tpublic_key = \"" << public_key << "\";" << std::endl; - std::cout << "\tsecret_key = \"" << secret_key << "\";" << std::endl; - std::cout << "}" << std::endl; - - return 0; + return t.run(); } diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 6f918e026..c23009ac4 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -20,13 +20,6 @@ # along with this program. If not, see . ################################################################################### -if(WITH_CONFIG) - add_executable(conf2json conf2json.cpp) - target_link_libraries(conf2json PUBLIC villas) - - list(APPEND TOOLS conf2json) -endif() - if(CMAKE_SYSTEM_NAME STREQUAL Linux) add_executable(rmshm rmshm.cpp) target_link_libraries(rmshm PUBLIC Threads::Threads rt) @@ -37,14 +30,6 @@ if(CMAKE_SYSTEM_NAME STREQUAL Linux) list(APPEND TOOLS rmsem rmshm) endif() -if(LIBZMQ_FOUND) - add_executable(zmq-keygen zmq-keygen.cpp) - target_include_directories(zmq-keygen PUBLIC ${LIBZMQ_INCLUDE_DIRS}) - target_link_libraries(zmq-keygen PUBLIC PkgConfig::LIBZMQ) - - list(APPEND TOOLS zmq-keygen) -endif() - install( TARGETS ${TOOLS} COMPONENT tools