1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

use new villas::Tool class as common top-level entrypoint

This commit is contained in:
Steffen Vogel 2019-04-19 14:52:52 +02:00
parent a5f5e02bc3
commit 492004bf34
15 changed files with 1461 additions and 1139 deletions

View file

@ -23,108 +23,123 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <string.h>
#include <iostream>
#include <atomic>
#include <villas/node/config.h>
#include <villas/log.hpp>
#include <villas/node/exceptions.hpp>
#include <villas/node.h>
#include <villas/pool.h>
#include <villas/sample.h>
#include <villas/shmem.h>
#include <villas/colors.hpp>
#include <villas/utils.h>
#include <villas/tool.hpp>
#include <villas/log.hpp>
#include <villas/utils.hpp>
#include <villas/copyright.hpp>
#include <villas/node/exceptions.hpp>
using namespace villas;
namespace villas {
namespace node {
namespace tools {
static std::atomic<bool> stop(false);
class Shmem : public Tool {
static void usage()
{
std::cout << "Usage: villas-test-shmem WNAME VECTORIZE" << std::endl
<< " WNAME name of the shared memory object for the output queue" << std::endl
<< " RNAME name of the shared memory object for the input queue" << std::endl
<< " VECTORIZE maximum number of samples to read/write at a time" << std::endl;
public:
Shmem(int argc, char *argv[]) :
Tool(argc, argv, "shmem"),
stop(false)
{ }
print_copyright();
}
protected:
std::atomic<bool> stop;
void quit(int, siginfo_t*, void*)
{
stop = true;
}
void usage()
{
std::cout << "Usage: villas-test-shmem WNAME VECTORIZE" << std::endl
<< " WNAME name of the shared memory object for the output queue" << std::endl
<< " RNAME name of the shared memory object for the input queue" << std::endl
<< " VECTORIZE maximum number of samples to read/write at a time" << std::endl;
int main(int argc, char* argv[])
{
int ret, readcnt, writecnt, avail;
Logger logger = logging.get("test-shmem");
struct shmem_int shm;
struct shmem_conf conf = {
.polling = 0,
.queuelen = DEFAULT_SHMEM_QUEUELEN,
.samplelen = DEFAULT_SHMEM_SAMPLELEN
};
if (argc != 4) {
usage();
return 1;
printCopyright();
}
ret = utils::signals_init(quit);
if (ret)
throw RuntimeError("Failed to initialize signals");
char *wname = argv[1];
char *rname = argv[2];
int vectorize = atoi(argv[3]);
ret = shmem_int_open(wname, rname, &shm, &conf);
if (ret < 0)
throw RuntimeError("Failed to open shared-memory interface");
struct sample *insmps[vectorize], *outsmps[vectorize];
while (!stop) {
readcnt = shmem_int_read(&shm, insmps, vectorize);
if (readcnt == -1) {
logger->info("Node stopped, exiting");
break;
}
avail = shmem_int_alloc(&shm, outsmps, readcnt);
if (avail < readcnt)
logger->warn("Pool underrun: %d / %d\n", avail, readcnt);
for (int i = 0; i < avail; i++) {
outsmps[i]->sequence = insmps[i]->sequence;
outsmps[i]->ts = insmps[i]->ts;
int len = MIN(insmps[i]->length, outsmps[i]->capacity);
memcpy(outsmps[i]->data, insmps[i]->data, SAMPLE_DATA_LENGTH(len));
outsmps[i]->length = len;
}
for (int i = 0; i < readcnt; i++)
sample_decref(insmps[i]);
writecnt = shmem_int_write(&shm, outsmps, avail);
if (writecnt < avail)
logger->warn("Short write");
logger->info("Read / Write: {}/{}", readcnt, writecnt);
void handler(int, siginfo_t *, void *)
{
stop = true;
}
ret = shmem_int_close(&shm);
if (ret)
throw RuntimeError("Failed to close shared-memory interface");
int main()
{
int ret, readcnt, writecnt, avail;
logger->info(CLR_GRN("Goodbye!"));
struct shmem_int shm;
struct shmem_conf conf = {
.polling = 0,
.queuelen = DEFAULT_SHMEM_QUEUELEN,
.samplelen = DEFAULT_SHMEM_SAMPLELEN
};
return 0;
if (argc != 4) {
usage();
return 1;
}
std::string wname = argv[1];
std::string rname = argv[2];
int vectorize = atoi(argv[3]);
ret = shmem_int_open(wname.c_str(), rname.c_str(), &shm, &conf);
if (ret < 0)
throw RuntimeError("Failed to open shared-memory interface");
struct sample *insmps[vectorize], *outsmps[vectorize];
while (!stop) {
readcnt = shmem_int_read(&shm, insmps, vectorize);
if (readcnt == -1) {
logger->info("Node stopped, exiting");
break;
}
avail = shmem_int_alloc(&shm, outsmps, readcnt);
if (avail < readcnt)
logger->warn("Pool underrun: %d / %d\n", avail, readcnt);
for (int i = 0; i < avail; i++) {
outsmps[i]->sequence = insmps[i]->sequence;
outsmps[i]->ts = insmps[i]->ts;
int len = MIN(insmps[i]->length, outsmps[i]->capacity);
memcpy(outsmps[i]->data, insmps[i]->data, SAMPLE_DATA_LENGTH(len));
outsmps[i]->length = len;
}
for (int i = 0; i < readcnt; i++)
sample_decref(insmps[i]);
writecnt = shmem_int_write(&shm, outsmps, avail);
if (writecnt < avail)
logger->warn("Short write");
logger->info("Read / Write: {}/{}", readcnt, writecnt);
}
ret = shmem_int_close(&shm);
if (ret)
throw RuntimeError("Failed to close shared-memory interface");
return 0;
}
};
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
auto t = villas::node::tools::Shmem(argc, argv);
return t.run();
}

View file

@ -20,6 +20,16 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
###################################################################################
set(SRCS
villas-node
villas-test-config
villas-test-rtt
villas-test-cmp
villas-convert
villas-pipe
villas-signal
)
add_executable(villas-node villas-node.cpp)
target_link_libraries(villas-node PUBLIC villas)
@ -29,24 +39,6 @@ target_link_libraries(villas-test-config PUBLIC villas)
add_executable(villas-test-rtt villas-test-rtt.cpp)
target_link_libraries(villas-test-rtt PUBLIC villas)
install(
TARGETS villas-node villas-test-rtt
COMPONENT bin
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
if(WITH_WEB)
add_executable(villas-relay villas-relay.cpp)
target_include_directories(villas-relay PRIVATE ${LIBWEBSOCKETS_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIR} ${PROJECT_SOURCE_DIR}/common/include)
target_link_libraries(villas-relay PRIVATE PkgConfig::LIBWEBSOCKETS PkgConfig::UUID villas-common spdlog)
install(
TARGETS villas-relay
COMPONENT bin
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
endif()
add_executable(villas-test-cmp villas-test-cmp.cpp)
target_link_libraries(villas-test-cmp PUBLIC villas)
@ -59,20 +51,37 @@ target_link_libraries(villas-pipe PUBLIC villas Threads::Threads)
add_executable(villas-signal villas-signal.cpp)
target_link_libraries(villas-signal PUBLIC villas)
install(
TARGETS villas-convert villas-pipe villas-signal villas-test-cmp
COMPONENT bin
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
if(WITH_WEB)
add_executable(villas-relay villas-relay.cpp)
target_include_directories(villas-relay PRIVATE ${LIBWEBSOCKETS_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIR})
target_link_libraries(villas-relay PRIVATE PkgConfig::UUID villas)
list(APPEND SRCS villas-relay)
endif()
if(WITH_CONFIG)
add_executable(villas-conf2json villas-conf2json.cpp)
target_link_libraries(villas-conf2json PUBLIC villas)
list(APPEND SRCS villas-conf2json)
endif()
if(LIBZMQ_FOUND)
add_executable(villas-zmq-keygen villas-zmq-keygen.cpp)
target_link_libraries(villas-zmq-keygen PUBLIC villas-common PkgConfig::LIBZMQ)
list(APPEND SRC villas-zmq-keygen)
endif()
if(WITH_HOOKS)
add_executable(villas-hook villas-hook.cpp)
target_link_libraries(villas-hook PUBLIC villas)
install(
TARGETS villas-hook
COMPONENT bin
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
list(APPEND SRCS villas-hook)
endif()
install(
TARGETS ${SRCS}
COMPONENT bin
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)

View file

@ -28,53 +28,78 @@
#include <villas/config.h>
#include <villas/config_helper.hpp>
#include <villas/utils.hpp>
#include <villas/copyright.hpp>
#include <villas/tool.hpp>
void usage()
{
std::cout << "Usage: conf2json input.conf > output.json" << std::endl << std::endl;
namespace villas {
namespace node {
namespace tools {
villas::print_copyright();
}
class Config2Json : public Tool {
public:
Config2Json(int argc, char *argv[]) :
Tool(argc, argv, "conf2json")
{ }
protected:
void usage()
{
std::cout << "Usage: conf2json input.conf > output.json" << std::endl << std::endl;
printCopyright();
}
int main()
{
int ret;
config_t cfg;
config_setting_t *cfg_root;
json_t *json;
if (argc != 2) {
usage();
exit(EXIT_FAILURE);
}
FILE *f = fopen(argv[1], "r");
if(f == nullptr)
return -1;
const char *confdir = dirname(argv[1]);
config_init(&cfg);
config_set_include_dir(&cfg, confdir);
ret = config_read(&cfg, f);
if (ret != CONFIG_TRUE)
return -2;
cfg_root = config_root_setting(&cfg);
json = config_to_json(cfg_root);
if (!json)
return -3;
ret = json_dumpf(json, stdout, JSON_INDENT(2)); fflush(stdout);
if (ret)
return ret;
json_decref(json);
config_destroy(&cfg);
return 0;
}
};
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
int ret;
config_t cfg;
config_setting_t *cfg_root;
json_t *json;
auto t = villas::node::tools::Config2Json(argc, argv);
if (argc != 2) {
usage();
exit(EXIT_FAILURE);
}
FILE *f = fopen(argv[1], "r");
if(f == nullptr)
return -1;
const char *confdir = dirname(argv[1]);
config_init(&cfg);
config_set_include_dir(&cfg, confdir);
ret = config_read(&cfg, f);
if (ret != CONFIG_TRUE)
return -2;
cfg_root = config_root_setting(&cfg);
json = config_to_json(cfg_root);
if (!json)
return -3;
ret = json_dumpf(json, stdout, JSON_INDENT(2)); fflush(stdout);
if (ret)
return ret;
json_decref(json);
config_destroy(&cfg);
return 0;
return t.run();
}

View file

@ -26,54 +26,78 @@
#include <iostream>
#include <villas/tool.hpp>
#include <villas/utils.hpp>
#include <villas/log.hpp>
#include <villas/io.h>
#include <villas/sample.h>
#include <villas/plugin.h>
#include <villas/exceptions.hpp>
#include <villas/copyright.hpp>
using namespace villas;
static void usage()
{
std::cout << "Usage: villas-convert [OPTIONS]" << std::endl
<< " OPTIONS are:" << std::endl
<< " -i FMT set the input format" << std::endl
<< " -o FMT set the output format" << std::endl
<< " -t DT the data-type format string" << std::endl
<< " -d LVL set debug log level to LVL" << std::endl
<< " -h show this usage information" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
namespace villas {
namespace node {
namespace tools {
print_copyright();
}
class Convert : public Tool {
int main(int argc, char *argv[])
{
Logger logger = logging.get("test-rtt");
try {
public:
Convert(int argc, char *argv[]) :
Tool(argc, argv, "convert"),
dtypes("64f")
{
int ret;
const char *input_format = "villas.human";
const char *output_format = "villas.human";
const char *dtypes = "64f";
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
for (unsigned i = 0; i < ARRAY_LEN(dirs); i++) {
dirs[i].format = "villas.human";
dirs[i].io.state = STATE_DESTROYED;
}
}
protected:
std::string dtypes;
struct {
std::string name;
std::string format;
struct io io;
} dirs[2];
void usage()
{
std::cout << "Usage: villas-convert [OPTIONS]" << std::endl
<< " OPTIONS are:" << std::endl
<< " -i FMT set the input format" << std::endl
<< " -o FMT set the output format" << std::endl
<< " -t DT the data-type format string" << std::endl
<< " -d LVL set debug log level to LVL" << std::endl
<< " -h show this usage information" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
printCopyright();
}
void parse()
{
/* Parse optional command line arguments */
int c;
while ((c = getopt(argc, argv, "Vhd:i:o:t:")) != -1) {
switch (c) {
case 'V':
print_version();
printVersion();
exit(EXIT_SUCCESS);
case 'i':
input_format = optarg;
dirs[0].format = optarg;
break;
case 'o':
output_format = optarg;
dirs[1].format = optarg;
break;
case 't':
@ -95,36 +119,28 @@ int main(int argc, char *argv[])
usage();
exit(EXIT_FAILURE);
}
}
struct format_type *ft;
struct io input;
struct io output;
input.state = STATE_DESTROYED;
output.state = STATE_DESTROYED;
struct {
const char *name;
struct io *io;
} dirs[] = {
{ input_format, &input },
{ output_format, &output },
};
int main()
{
int ret;
for (unsigned i = 0; i < ARRAY_LEN(dirs); i++) {
ft = format_type_lookup(dirs[i].name);
if (!ft)
throw RuntimeError("Invalid format: {}", dirs[i].name);
struct format_type *ft;
ret = io_init2(dirs[i].io, ft, dtypes, SAMPLE_HAS_ALL);
ft = format_type_lookup(dirs[i].format.c_str());
if (!ft)
throw RuntimeError("Invalid format: {}", dirs[i].format);
ret = io_init2(&dirs[i].io, ft, dtypes.c_str(), SAMPLE_HAS_ALL);
if (ret)
throw RuntimeError("Failed to initialize IO: {}", dirs[i].name);
ret = io_check(dirs[i].io);
ret = io_check(&dirs[i].io);
if (ret)
throw RuntimeError("Failed to validate IO configuration");
ret = io_open(dirs[i].io, nullptr);
ret = io_open(&dirs[i].io, nullptr);
if (ret)
throw RuntimeError("Failed to open IO");
}
@ -132,32 +148,38 @@ int main(int argc, char *argv[])
struct sample *smp = sample_alloc_mem(DEFAULT_SAMPLE_LENGTH);
for (;;) {
ret = io_scan(&input, &smp, 1);
ret = io_scan(&dirs[0].io, &smp, 1);
if (ret == 0)
continue;
if (ret < 0)
break;
io_print(&output, &smp, 1);
io_print(&dirs[1].io, &smp, 1);
}
for (unsigned i = 0; i < ARRAY_LEN(dirs); i++) {
ret = io_close(dirs[i].io);
ret = io_close(&dirs[i].io);
if (ret)
throw RuntimeError("Failed to close IO");
ret = io_destroy(dirs[i].io);
ret = io_destroy(&dirs[i].io);
if (ret)
throw RuntimeError("Failed to destroy IO");
}
return 0;
}
catch (std::runtime_error &e) {
logger->error("{}", e.what());
};
return -1;
}
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
auto t = villas::node::tools::Convert(argc, argv);
return t.run();
}
/** @} */

View file

@ -30,6 +30,7 @@
#include <atomic>
#include <unistd.h>
#include <villas/tool.hpp>
#include <villas/timing.h>
#include <villas/sample.h>
#include <villas/io.h>
@ -39,7 +40,6 @@
#include <villas/log.hpp>
#include <villas/colors.hpp>
#include <villas/exceptions.hpp>
#include <villas/copyright.hpp>
#include <villas/plugin.hpp>
#include <villas/plugin.h>
#include <villas/config_helper.hpp>
@ -50,66 +50,92 @@ using namespace villas;
using namespace villas::node;
using namespace villas::plugin;
static std::atomic<bool> stop(false);
namespace villas {
namespace node {
namespace tools {
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
stop = true;
}
class Hook : public Tool {
static void usage()
{
std::cout << "Usage: villas-hook [OPTIONS] NAME [[PARAM1] [PARAM2] ...]" << std::endl
<< " NAME the name of the hook function" << std::endl
<< " PARAM* a string of configuration settings for the hook" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -f FMT the data format" << std::endl
<< " -t DT the data-type format string" << std::endl
<< " -d LVL set debug level to LVL" << std::endl
<< " -v CNT process CNT smps at once" << std::endl
<< " -h show this help" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
public:
Hook(int argc, char *argv[]) :
Tool(argc, argv, "hook"),
stop(false),
format("villas.human"),
dtypes("64f"),
cnt(1)
{
int ret;
#ifdef WITH_HOOKS
std::cout << "Supported hooks:" << std::endl;
for (Plugin *p : Registry::lookup<HookFactory>())
std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl;
std::cout << std::endl;
#endif /* WITH_HOOKS */
std::cout << "Supported IO formats:" << std::endl;
plugin_dump(PLUGIN_TYPE_FORMAT);
std::cout << std::endl;
std::cout << "Example:" << std::endl
<< " villas-signal random | villas-hook skip_first seconds=10" << std::endl
<< std::endl;
print_copyright();
}
int main(int argc, char *argv[])
{
int ret, recv, sent, cnt;
const char *format = "villas.human";
const char *dtypes = "64f";
struct format_type *ft;
struct sample **smps;
Logger logger = logging.get("hook");
try {
struct pool p;
struct io io;
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
p.state = STATE_DESTROYED;
io.state = STATE_DESTROYED;
/* Default values */
cnt = 1;
cfg_cli = json_object();
}
json_t *cfg_cli = json_object();
~Hook()
{
json_decref(cfg_cli);
}
protected:
std::atomic<bool> stop;
std::string hook;
std::string format;
std::string dtypes;
struct pool p;
struct io io;
int cnt;
json_t *cfg_cli;
void handler(int signal, siginfo_t *sinfo, void *ctx)
{
stop = true;
}
void usage()
{
std::cout << "Usage: villas-hook [OPTIONS] NAME [[PARAM1] [PARAM2] ...]" << std::endl
<< " NAME the name of the hook function" << std::endl
<< " PARAM* a string of configuration settings for the hook" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -f FMT the data format" << std::endl
<< " -t DT the data-type format string" << std::endl
<< " -d LVL set debug level to LVL" << std::endl
<< " -v CNT process CNT smps at once" << std::endl
<< " -h show this help" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
#ifdef WITH_HOOKS
std::cout << "Supported hooks:" << std::endl;
for (Plugin *p : Registry::lookup<HookFactory>())
std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl;
std::cout << std::endl;
#endif /* WITH_HOOKS */
std::cout << "Supported IO formats:" << std::endl;
plugin_dump(PLUGIN_TYPE_FORMAT);
std::cout << std::endl;
std::cout << "Example:" << std::endl
<< " villas-signal random | villas-hook skip_first seconds=10" << std::endl
<< std::endl;
printCopyright();
}
void parse()
{
int ret;
/* Parse optional command line arguments */
int c;
@ -117,7 +143,7 @@ int main(int argc, char *argv[])
while ((c = getopt(argc, argv, "Vhv:d:f:t:o:")) != -1) {
switch (c) {
case 'V':
print_version();
printVersion();
exit(EXIT_SUCCESS);
case 'f':
@ -160,19 +186,19 @@ check: if (optarg == endptr)
exit(EXIT_FAILURE);
}
char *hook = argv[optind];
hook = argv[optind];
}
ret = utils::signals_init(quit);
if (ret)
throw RuntimeError("Failed to intialize signals");
int main()
{
int ret, recv, sent;
struct format_type *ft;
struct sample **smps;
if (cnt < 1)
throw RuntimeError("Vectorize option must be greater than 0");
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
smps = new struct sample*[cnt];
ret = pool_init(&p, 10 * cnt, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), &memory_hugepage);
@ -180,11 +206,11 @@ check: if (optarg == endptr)
throw RuntimeError("Failed to initilize memory pool");
/* Initialize IO */
ft = format_type_lookup(format);
ft = format_type_lookup(format.c_str());
if (!ft)
throw RuntimeError("Unknown IO format '{}'", format);
ret = io_init2(&io, ft, dtypes, SAMPLE_HAS_ALL);
ret = io_init2(&io, ft, dtypes.c_str(), SAMPLE_HAS_ALL);
if (ret)
throw RuntimeError("Failed to initialize IO");
@ -280,13 +306,17 @@ stop: sent = io_print(&io, smps, send);
if (ret)
throw RuntimeError("Failed to destroy memory pool");
logger->info(CLR_GRN("Goodbye!"));
return 0;
}
catch (std::runtime_error &e) {
logger->error("{}", e.what());
};
return -1;
}
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
auto t = villas::node::tools::Hook(argc, argv);
return t.run();
}

View file

@ -27,6 +27,7 @@
#include <exception>
#include <atomic>
#include <villas/tool.hpp>
#include <villas/node/config.h>
#include <villas/version.hpp>
#include <villas/utils.hpp>
@ -42,7 +43,6 @@
#include <villas/web.hpp>
#include <villas/log.hpp>
#include <villas/exceptions.hpp>
#include <villas/copyright.hpp>
#include <villas/plugin.h>
#include <villas/kernel/kernel.hpp>
#include <villas/kernel/rt.hpp>
@ -55,74 +55,86 @@ using namespace villas;
using namespace villas::node;
using namespace villas::plugin;
SuperNode sn;
namespace villas {
namespace node {
namespace tools {
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
Logger logger = logging.get("node");
class Node : public Tool {
switch (signal) {
case SIGALRM:
logger->info("Reached timeout. Terminating...");
break;
public:
Node(int argc, char *argv[]) :
Tool(argc, argv, "node")
{
int ret;
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
}
sn.setState(STATE_STOPPING);
}
protected:
SuperNode sn;
static void usage()
{
std::cout << "Usage: villas-node [OPTIONS] [CONFIG]" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -h show this usage information" << std::endl
<< " -d LVL set logging level" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl
<< " CONFIG is the path to an optional configuration file" << std::endl
<< " if omitted, VILLASnode will start without a configuration" << std::endl
<< " and wait for provisioning over the web interface." << std::endl << std::endl
std::string uri;
void handler(int signal, siginfo_t *sinfo, void *ctx)
{
switch (signal) {
case SIGALRM:
logger->info("Reached timeout. Terminating...");
break;
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
}
sn.setState(STATE_STOPPING);
}
void usage()
{
std::cout << "Usage: villas-node [OPTIONS] [CONFIG]" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -h show this usage information" << std::endl
<< " -d LVL set logging level" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl
<< " CONFIG is the path to an optional configuration file" << std::endl
<< " if omitted, VILLASnode will start without a configuration" << std::endl
<< " and wait for provisioning over the web interface." << std::endl << std::endl
#ifdef ENABLE_OPAL_ASYNC
<< "Usage: villas-node OPAL_ASYNC_SHMEM_NAME OPAL_ASYNC_SHMEM_SIZE OPAL_PRINT_SHMEM_NAME" << std::endl
<< " This type of invocation is used by OPAL-RT Asynchronous processes." << std::endl
<< " See in the RT-LAB User Guide for more information." << std::endl << std::endl
<< "Usage: villas-node OPAL_ASYNC_SHMEM_NAME OPAL_ASYNC_SHMEM_SIZE OPAL_PRINT_SHMEM_NAME" << std::endl
<< " This type of invocation is used by OPAL-RT Asynchronous processes." << std::endl
<< " See in the RT-LAB User Guide for more information." << std::endl << std::endl
#endif /* ENABLE_OPAL_ASYNC */
<< "Supported node-types:" << std::endl;
plugin_dump(PLUGIN_TYPE_NODE);
std::cout << std::endl;
<< "Supported node-types:" << std::endl;
plugin_dump(PLUGIN_TYPE_NODE);
std::cout << std::endl;
#ifdef WITH_HOOKS
std::cout << "Supported hooks:" << std::endl;
for (Plugin *p : Registry::lookup<HookFactory>())
std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl;
std::cout << std::endl;
std::cout << "Supported hooks:" << std::endl;
for (Plugin *p : Registry::lookup<HookFactory>())
std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl;
std::cout << std::endl;
#endif /* WITH_HOOKS */
#ifdef WITH_API
std::cout << "Supported API commands:" << std::endl;
for (Plugin *p : Registry::lookup<api::ActionFactory>())
std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl;
std::cout << std::endl;
std::cout << "Supported API commands:" << std::endl;
for (Plugin *p : Registry::lookup<api::ActionFactory>())
std::cout << " - " << p->getName() << ": " << p->getDescription() << std::endl;
std::cout << std::endl;
#endif /* WITH_API */
std::cout << "Supported IO formats:" << std::endl;
plugin_dump(PLUGIN_TYPE_FORMAT);
std::cout << std::endl;
std::cout << "Supported IO formats:" << std::endl;
plugin_dump(PLUGIN_TYPE_FORMAT);
std::cout << std::endl;
print_copyright();
}
printCopyright();
}
int main(int argc, char *argv[])
{
int ret;
const char *uri;
Logger logger = logging.get("node");
try {
/* Check arguments */
void parse()
{
/* Check arguments */
#ifdef ENABLE_OPAL_ASYNC
if (argc != 4) {
usage();
@ -139,7 +151,7 @@ int main(int argc, char *argv[])
while ((c = getopt(argc, argv, "hVd:")) != -1) {
switch (c) {
case 'V':
print_version();
printVersion();
exit(EXIT_SUCCESS);
case 'd':
@ -157,18 +169,15 @@ int main(int argc, char *argv[])
if (argc == optind + 1)
uri = argv[optind];
else if (argc == optind)
uri = nullptr;
else {
else if (argc != optind) {
usage();
exit(EXIT_FAILURE);
}
#endif /* ENABLE_OPAL_ASYNC */
}
logger->info("This is VILLASnode {} (built on {}, {})",
CLR_BLD(CLR_YEL(PROJECT_BUILD_ID)),
CLR_BLD(CLR_MAG(__DATE__)), CLR_BLD(CLR_MAG(__TIME__)));
int main()
{
#ifdef __linux__
/* Checks system requirements*/
auto required = utils::Version(KERNEL_VERSION_MAJ, KERNEL_VERSION_MIN);
@ -176,11 +185,7 @@ int main(int argc, char *argv[])
throw RuntimeError("Your kernel version is to old: required >= {}.{}", KERNEL_VERSION_MAJ, KERNEL_VERSION_MIN);
#endif /* __linux__ */
ret = utils::signals_init(quit);
if (ret)
throw RuntimeError("Failed to initialize signal subsystem");
if (uri)
if (!uri.empty())
sn.parse(uri);
else
logger->warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance.");
@ -191,13 +196,20 @@ int main(int argc, char *argv[])
sn.run();
sn.stop();
logger->info(CLR_GRN("Goodbye!"));
return 0;
}
catch (std::runtime_error &e) {
logger->error("{}", e.what());
};
return -1;
}
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
auto t = villas::node::tools::Node(argc, argv);
return t.run();
}
/** @} */

View file

@ -29,14 +29,14 @@
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <thread>
#include <iostream>
#include <atomic>
#include <villas/node/config.h>
#include <villas/config_helper.hpp>
#include <villas/super_node.hpp>
#include <villas/copyright.hpp>
#include <villas/utils.hpp>
#include <villas/utils.hpp>
#include <villas/log.hpp>
@ -49,14 +49,25 @@
#include <villas/exceptions.hpp>
#include <villas/format_type.h>
#include <villas/nodes/websocket.hpp>
#include <villas/tool.hpp>
using namespace villas;
using namespace villas::node;
namespace villas {
namespace node {
namespace tools {
class Direction {
class PipeDirection {
protected:
struct pool pool;
struct node *node;
struct io *io;
std::thread thread;
bool enabled;
int limit;
public:
Direction(struct node *n, struct io *i, bool en = true, int lim = -1) :
PipeDirection(struct node *n, struct io *i, bool en = true, int lim = -1) :
node(n),
io(i),
enabled(en),
@ -65,9 +76,6 @@ public:
pool.state = STATE_DESTROYED;
pool.queue.state = STATE_DESTROYED;
/* Initialize memory */
/* Initialize memory */
unsigned vec = LOG2_CEIL(MAX(node->out.vectorize, node->in.vectorize));
unsigned pool_size = node_type(node)->pool_size ? node_type(node)->pool_size : vec;
@ -77,206 +85,236 @@ public:
throw RuntimeError("Failed to allocate memory for pool.");
}
Direction(const Direction &c)
{
io = c.io;
}
~Direction()
~PipeDirection()
{
pool_destroy(&pool);
}
struct pool pool;
struct node *node;
struct io *io;
virtual void run()
{
pthread_t thread;
}
bool enabled;
int limit;
void startThread()
{
if (enabled)
thread = std::thread(&villas::node::tools::PipeDirection::run, this);
}
void stopThread()
{
thread.join();
}
};
struct Directions {
Direction send;
Direction recv;
};
class PipeSendDirection : public PipeDirection {
static std::atomic<bool> stop(false);
public:
PipeSendDirection(struct node *n, struct io *i, bool en = true, int lim = -1) :
PipeDirection(n, i, en, lim)
{ }
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
Logger logger = logging.get("pipe");
virtual void run()
{
Logger logger = logging.get("pipe:send");
switch (signal) {
case SIGALRM:
logger->info("Reached timeout. Terminating...");
break;
unsigned last_sequenceno = 0, release;
int scanned, sent, allocated, cnt = 0;
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
break;
}
struct sample *smps[node->out.vectorize];
stop = true;
}
while (node->state == STATE_STARTED && !io_eof(io)) {
allocated = sample_alloc_many(&pool, smps, node->out.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize);
else if (allocated < (int) node->out.vectorize)
logger->warn("Send pool underrun");
static void usage()
{
std::cout << "Usage: villas-pipe [OPTIONS] CONFIG NODE" << std::endl
<< " CONFIG path to a configuration file" << std::endl
<< " NODE the name of the node to which samples are sent and received from" << std::endl
<< " OPTIONS are:" << std::endl
<< " -f FMT set the format" << std::endl
<< " -t DT the data-type format string" << std::endl
<< " -o OPTION=VALUE overwrite options in config file" << std::endl
<< " -x swap read / write endpoints" << std::endl
<< " -s only read data from stdin and send it to node" << std::endl
<< " -r only read data from node and write it to stdout" << std::endl
<< " -T NUM terminate after NUM seconds" << std::endl
<< " -L NUM terminate after NUM samples sent" << std::endl
<< " -l NUM terminate after NUM samples received" << std::endl
<< " -h show this usage information" << std::endl
<< " -d set logging level" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
scanned = io_scan(io, smps, allocated);
if (scanned < 0) {
logger->warn("Failed to read samples from stdin");
continue;
}
else if (scanned == 0)
continue;
print_copyright();
}
/* Fill in missing sequence numbers */
for (int i = 0; i < scanned; i++) {
if (smps[i]->flags & SAMPLE_HAS_SEQUENCE)
last_sequenceno = smps[i]->sequence;
else
smps[i]->sequence = last_sequenceno++;
}
static void * send_loop(void *ctx)
{
Directions *dirs = static_cast<Directions*>(ctx);
Logger logger = logging.get("pipe");
release = allocated;
unsigned last_sequenceno = 0, release;
int scanned, sent, allocated, cnt = 0;
sent = node_write(node, smps, scanned, &release);
struct node *node = dirs->send.node;
struct sample *smps[node->out.vectorize];
sample_decref_many(smps, release);
while (node->state == STATE_STARTED && !io_eof(dirs->send.io)) {
allocated = sample_alloc_many(&dirs->send.pool, smps, node->out.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize);
else if (allocated < (int) node->out.vectorize)
logger->warn("Send pool underrun");
scanned = io_scan(dirs->send.io, smps, allocated);
if (scanned < 0) {
logger->warn("Failed to read samples from stdin");
continue;
}
else if (scanned == 0)
continue;
/* Fill in missing sequence numbers */
for (int i = 0; i < scanned; i++) {
if (smps[i]->flags & SAMPLE_HAS_SEQUENCE)
last_sequenceno = smps[i]->sequence;
else
smps[i]->sequence = last_sequenceno++;
}
release = allocated;
sent = node_write(node, smps, scanned, &release);
sample_decref_many(smps, release);
cnt += sent;
if (dirs->send.limit > 0 && cnt >= dirs->send.limit)
goto leave;
pthread_testcancel();
}
leave: if (io_eof(dirs->send.io)) {
if (dirs->recv.limit < 0) {
logger->info("Reached end-of-file. Terminating...");
stop = true;
}
else
logger->info("Reached end-of-file. Wait for receive side...");
}
else {
logger->info("Reached send limit. Terminating...");
stop = true;
}
return nullptr;
}
static void * recv_loop(void *ctx)
{
Directions *dirs = static_cast<Directions*>(ctx);
Logger logger = logging.get("pipe");
int recv, cnt = 0, allocated = 0;
unsigned release;
struct node *node = dirs->recv.node;
struct sample *smps[node->in.vectorize];
while (node->state == STATE_STARTED) {
allocated = sample_alloc_many(&dirs->recv.pool, smps, node->in.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize);
else if (allocated < (int) node->in.vectorize)
logger->warn("Receive pool underrun: allocated only {} of {} samples", allocated, node->in.vectorize);
release = allocated;
recv = node_read(node, smps, allocated, &release);
if (recv < 0) {
if (node->state == STATE_STOPPING)
goto leave2;
else
logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv);
}
else {
io_print(dirs->recv.io, smps, recv);
cnt += recv;
if (dirs->recv.limit > 0 && cnt >= dirs->recv.limit)
cnt += sent;
if (limit > 0 && cnt >= limit)
goto leave;
}
sample_decref_many(smps, release);
pthread_testcancel();
leave: if (io_eof(io)) {
if (limit < 0) {
logger->info("Reached end-of-file. Terminating...");
raise(SIGINT);
}
else
logger->info("Reached end-of-file. Wait for receive side...");
}
else {
logger->info("Reached send limit. Terminating...");
raise(SIGINT);
}
}
};
class PipeReceiveDirection : public PipeDirection {
public:
PipeReceiveDirection(struct node *n, struct io *i, bool en = true, int lim = -1) :
PipeDirection(n, i, en, lim)
{ }
virtual void run()
{
Logger logger = logging.get("pipe:recv");
int recv, cnt = 0, allocated = 0;
unsigned release;
struct sample *smps[node->in.vectorize];
while (node->state == STATE_STARTED) {
allocated = sample_alloc_many(&pool, smps, node->in.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize);
else if (allocated < (int) node->in.vectorize)
logger->warn("Receive pool underrun: allocated only {} of {} samples", allocated, node->in.vectorize);
release = allocated;
recv = node_read(node, smps, allocated, &release);
if (recv < 0) {
if (node->state == STATE_STOPPING)
goto leave2;
else
logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv);
}
else {
io_print(io, smps, recv);
cnt += recv;
if (limit > 0 && cnt >= limit)
goto leave;
}
sample_decref_many(smps, release);
}
leave: logger->info("Reached receive limit. Terminating...");
leave2: raise(SIGINT);
}
};
class Pipe : public Tool {
public:
Pipe(int argc, char *argv[]) :
Tool(argc, argv, "pipe"),
stop(false),
timeout(0),
reverse(false),
format("villas.human"),
dtypes("64f"),
enable_send(true),
enable_recv(true),
limit_send(-1),
limit_recv(-1)
{
int ret;
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
io.state = STATE_DESTROYED;
cfg_cli = json_object();
}
leave: logger->info("Reached receive limit. Terminating...");
leave2: stop = true;
~Pipe()
{
json_decref(cfg_cli);
}
return nullptr;
}
protected:
std::atomic<bool> stop;
int main(int argc, char *argv[])
{
Logger logger = logging.get("pipe");
SuperNode sn; /**< The global configuration */
struct io io;
try {
int ret, timeout = 0;
bool reverse = false;
const char *format = "villas.human";
const char *dtypes = "64f";
int timeout;
bool reverse;
std::string format;
std::string dtypes;
std::string uri;
std::string nodestr;
struct node *node;
static struct io io = { .state = STATE_DESTROYED };
json_t *cfg_cli;
SuperNode sn; /**< The global configuration */
bool enable_send = true;
bool enable_recv = true;
int limit_send = -1;
int limit_recv = -1;
json_t *cfg_cli = json_object();
void handler(int signal, siginfo_t *sinfo, void *ctx)
{
switch (signal) {
case SIGALRM:
logger->info("Reached timeout. Terminating...");
break;
bool enable_send = true, enable_recv = true;
int limit_send = -1, limit_recv = -1;
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
break;
}
/* Parse optional command line arguments */
int c;
stop = true;
}
void usage()
{
std::cout << "Usage: villas-pipe [OPTIONS] CONFIG NODE" << std::endl
<< " CONFIG path to a configuration file" << std::endl
<< " NODE the name of the node to which samples are sent and received from" << std::endl
<< " OPTIONS are:" << std::endl
<< " -f FMT set the format" << std::endl
<< " -t DT the data-type format string" << std::endl
<< " -o OPTION=VALUE overwrite options in config file" << std::endl
<< " -x swap read / write endpoints" << std::endl
<< " -s only read data from stdin and send it to node" << std::endl
<< " -r only read data from node and write it to stdout" << std::endl
<< " -T NUM terminate after NUM seconds" << std::endl
<< " -L NUM terminate after NUM samples sent" << std::endl
<< " -l NUM terminate after NUM samples received" << std::endl
<< " -h show this usage information" << std::endl
<< " -d set logging level" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
printCopyright();
}
void parse()
{
int c, ret;
char *endptr;
while ((c = getopt(argc, argv, "Vhxrsd:l:L:T:f:t:o:")) != -1) {
switch (c) {
case 'V':
print_version();
printVersion();
exit(EXIT_SUCCESS);
case 'f':
@ -338,30 +376,29 @@ check: if (optarg == endptr)
exit(EXIT_FAILURE);
}
logger->info("Logging level: {}", logging.getLevelName());
uri = argv[optind];
nodestr = argv[optind+1];
}
char *uri = argv[optind];
char *nodestr = argv[optind+1];
int main()
{
int ret;
struct node *node;
struct format_type *ft;
ret = memory_init(0);
if (ret)
throw RuntimeError("Failed to intialize memory");
logger->info("Logging level: {}", logging.getLevelName());
ret = utils::signals_init(quit);
if (ret)
throw RuntimeError("Failed to initialize signals");
if (uri)
if (!uri.empty())
sn.parse(uri);
else
logger->warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance.");
ft = format_type_lookup(format);
ft = format_type_lookup(format.c_str());
if (!ft)
throw RuntimeError("Invalid format: {}", format);
ret = io_init2(&io, ft, dtypes, SAMPLE_HAS_ALL);
ret = io_init2(&io, ft, dtypes.c_str(), SAMPLE_HAS_ALL);
if (ret)
throw RuntimeError("Failed to initialize IO");
@ -406,36 +443,19 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to start node {}: reason={}", node_name(node), ret);
/* Start threads */
Directions dirs = {
.send = Direction(node, &io, enable_send, limit_send),
.recv = Direction(node, &io, enable_recv, limit_recv)
};
PipeReceiveDirection recv_dir(node, &io, enable_recv, limit_recv);
PipeSendDirection send_dir(node, &io, enable_recv, limit_recv);
if (dirs.recv.enabled) {
dirs.recv.node = node;
pthread_create(&dirs.recv.thread, nullptr, recv_loop, &dirs);
}
if (dirs.send.enabled) {
dirs.send.node = node;
pthread_create(&dirs.send.thread, nullptr, send_loop, &dirs);
}
recv_dir.startThread();
send_dir.startThread();
alarm(timeout);
while (!stop)
sleep(1);
if (dirs.recv.enabled) {
pthread_cancel(dirs.recv.thread);
pthread_join(dirs.recv.thread, nullptr);
}
if (dirs.send.enabled) {
pthread_cancel(dirs.send.thread);
pthread_join(dirs.send.thread, nullptr);
}
recv_dir.stopThread();
send_dir.stopThread();
ret = node_stop(node);
if (ret)
@ -455,15 +475,20 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to destroy IO");
logger->info(CLR_GRN("Goodbye!"));
return 0;
}
catch (std::runtime_error &e) {
logger->error("{}", e.what());
};
return -1;
}
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
auto t = villas::node::tools::Pipe(argc, argv);
return t.run();
}
/** @} */

View file

@ -31,95 +31,26 @@
#include <spdlog/sinks/stdout_color_sinks.h>
#include <jansson.h>
#include <villas/config.h>
#include <villas/node/config.h>
#include <villas/compat.h>
#include <villas/memory.h>
#include <villas/tool.hpp>
#include <villas/log.hpp>
#include <villas/copyright.hpp>
#include "villas-relay.hpp"
/** The libwebsockets server context. */
static lws_context *context;
/** The libwebsockets vhost. */
static lws_vhost *vhost;
namespace villas {
namespace node {
namespace tools {
std::map<std::string, Session *> sessions;
using Logger = villas::Logger;
/* Default options */
struct Options opts = {
.loopback = false,
.port = 8088,
.protocol = "live"
};
/** List of libwebsockets protocols. */
lws_protocols protocols[] = {
{
.name = "http",
.callback = lws_callback_http_dummy,
.per_session_data_size = 0,
.rx_buffer_size = 1024
},
{
.name = "http-api",
.callback = http_protocol_cb,
.per_session_data_size = 0,
.rx_buffer_size = 1024
},
{
.name = "live",
.callback = protocol_cb,
.per_session_data_size = sizeof(Connection),
.rx_buffer_size = 0
},
{ nullptr /* terminator */ }
};
/** List of libwebsockets extensions. */
static const lws_extension extensions[] = {
{
"permessage-deflate",
lws_extension_callback_pm_deflate,
"permessage-deflate"
},
{
"deflate-frame",
lws_extension_callback_pm_deflate,
"deflate_frame"
},
{ nullptr /* terminator */ }
};
static const lws_http_mount mount = {
.mount_next = nullptr, /* linked-list "next" */
.mountpoint = "/api/v1", /* mountpoint URL */
.origin = nullptr, /* protocol */
.def = nullptr,
.protocol = "http-api",
.cgienv = nullptr,
.extra_mimetypes = nullptr,
.interpret = nullptr,
.cgi_timeout = 0,
.cache_max_age = 0,
.auth_mask = 0,
.cache_reusable = 0,
.cache_revalidate = 0,
.cache_intermediaries = 0,
.origin_protocol = LWSMPRO_CALLBACK, /* dynamic */
.mountpoint_len = 7, /* char count */
.basic_auth_login_file =nullptr,
};
Session::Session(Identifier sid) :
RelaySession::RelaySession(Identifier sid) :
identifier(sid),
connects(0)
{
Logger logger = villas::logging.get("console");
logger->info("Session created: {}", identifier);
logger->info("RelaySession created: {}", identifier);
sessions[sid] = this;
@ -128,16 +59,16 @@ Session::Session(Identifier sid) :
uuid_generate(uuid);
}
Session::~Session()
RelaySession::~RelaySession()
{
Logger logger = villas::logging.get("console");
logger->info("Session destroyed: {}", identifier);
logger->info("RelaySession destroyed: {}", identifier);
sessions.erase(identifier);
}
Session * Session::get(lws *wsi)
RelaySession * RelaySession::get(lws *wsi)
{
Logger logger = villas::logging.get("console");
@ -157,7 +88,7 @@ Session * Session::get(lws *wsi)
auto it = sessions.find(sid);
if (it == sessions.end()) {
return new Session(sid);
return new RelaySession(sid);
}
else {
logger->info("Found existing session: {}", sid);
@ -166,7 +97,7 @@ Session * Session::get(lws *wsi)
}
}
json_t * Session::toJson() const
json_t * RelaySession::toJson() const
{
json_t *json_connections = json_array();
@ -188,18 +119,21 @@ json_t * Session::toJson() const
);
}
Connection::Connection(lws *w) :
std::map<std::string, RelaySession *> RelaySession::sessions;
RelayConnection::RelayConnection(lws *w, bool lo) :
wsi(w),
currentFrame(std::make_shared<Frame>()),
outgoingFrames(),
bytes_recv(0),
bytes_sent(0),
frames_recv(0),
frames_sent(0)
frames_sent(0),
loopback(lo)
{
Logger logger = villas::logging.get("console");
session = Session::get(wsi);
session = RelaySession::get(wsi);
session->connections[wsi] = this;
session->connects++;
@ -210,11 +144,11 @@ Connection::Connection(lws *w) :
logger->info("New connection established: session={}, remote={} ({})", session->identifier, name, ip);
}
Connection::~Connection()
RelayConnection::~RelayConnection()
{
Logger logger = villas::logging.get("console");
logger->info("Connection closed: session={}, remote={} ({})", session->identifier, name, ip);
logger->info("RelayConnection closed: session={}, remote={} ({})", session->identifier, name, ip);
session->connections.erase(wsi);
@ -222,7 +156,7 @@ Connection::~Connection()
delete session;
}
json_t * Connection::toJson() const
json_t * RelayConnection::toJson() const
{
return json_pack("{ s: s, s: s, s: I, s: I, s: I, s: I, s: I }",
"name", name,
@ -235,7 +169,7 @@ json_t * Connection::toJson() const
);
}
void Connection::write()
void RelayConnection::write()
{
int ret;
@ -254,7 +188,7 @@ void Connection::write()
lws_callback_on_writable(wsi);
}
void Connection::read(void *in, size_t len)
void RelayConnection::read(void *in, size_t len)
{
Logger logger = villas::logging.get("console");
@ -264,14 +198,14 @@ void Connection::read(void *in, size_t len)
if (lws_is_final_fragment(wsi)) {
frames_recv++;
logger->debug("Received frame, relaying to {} connections", session->connections.size() - (opts.loopback ? 0 : 1));
logger->debug("Received frame, relaying to {} connections", session->connections.size() - (loopback ? 0 : 1));
for (auto p : session->connections) {
auto c = p.second;
/* We skip the current connection in order
* to avoid receiving our own data */
if (opts.loopback == false && c == this)
if (loopback == false && c == this)
continue;
c->outgoingFrames.push(currentFrame);
@ -283,7 +217,47 @@ void Connection::read(void *in, size_t len)
}
}
static void logger_cb(int level, const char *msg)
Relay::Relay(int argc, char *argv[]) :
Tool(argc, argv, "relay"),
stop(false),
loopback(false),
port(8088),
protocol("live")
{
int ret;
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
/* Initialize logging */
spdlog::stdout_color_mt("lws");
lws_set_log_level((1 << LLL_COUNT) - 1, logger_cb);
protocols = {
{
.name = "http",
.callback = lws_callback_http_dummy,
.per_session_data_size = 0,
.rx_buffer_size = 1024
},
{
.name = "http-api",
.callback = http_protocol_cb,
.per_session_data_size = 0,
.rx_buffer_size = 1024
},
{
.name = "live",
.callback = protocol_cb,
.per_session_data_size = sizeof(RelayConnection),
.rx_buffer_size = 0
},
{ nullptr /* terminator */ }
};
}
void Relay::logger_cb(int level, const char *msg)
{
auto log = spdlog::get("lws");
@ -296,20 +270,34 @@ static void logger_cb(int level, const char *msg)
level = LLL_WARN;
switch (level) {
case LLL_ERR: log->error("{}", msg); break;
case LLL_WARN: log->warn( "{}", msg); break;
case LLL_INFO: log->info( "{}", msg); break;
default: log->debug("{}", msg); break;
case LLL_ERR:
log->error("{}", msg);
break;
case LLL_WARN:
log->warn( "{}", msg);
break;
case LLL_INFO:
log->info( "{}", msg);
break;
default:
log->debug("{}", msg);
break;
}
}
int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
int Relay::http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int ret;
size_t json_len;
json_t *json_sessions, *json_body;
Logger logger = villas::logging.get("console");
lws_context *ctx = lws_get_context(wsi);
void *user_ctx = lws_context_user(ctx);
Relay *r = reinterpret_cast<Relay *>(user_ctx);
unsigned char buf[LWS_PRE + 2048], *start = &buf[LWS_PRE], *end = &buf[sizeof(buf) - LWS_PRE - 1], *p = start;
@ -320,6 +308,7 @@ int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, voi
LWS_ILLEGAL_HTTP_CONTENT_LEN, /* no content len */
&p, end))
return 1;
if (lws_finalize_write_http_header(wsi, start, &p, end))
return 1;
@ -331,7 +320,7 @@ int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, voi
case LWS_CALLBACK_HTTP_WRITEABLE:
json_sessions = json_array();
for (auto it : sessions) {
for (auto it : RelaySession::sessions) {
auto &session = it.second;
json_array_append(json_sessions, session->toJson());
@ -345,9 +334,9 @@ int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, voi
"version", PROJECT_VERSION_STR,
"hostname", hname,
"options",
"loopback", opts.loopback,
"port", opts.port,
"protocol", opts.protocol
"loopback", r->loopback,
"port", r->port,
"protocol", r->protocol.c_str()
);
json_len = json_dumpb(json_body, (char *) buf + LWS_PRE, sizeof(buf) - LWS_PRE, JSON_INDENT(4));
@ -356,7 +345,7 @@ int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, voi
if (ret < 0)
return ret;
logger->info("Handled API request");
r->logger->info("Handled API request");
//if (lws_http_transaction_completed(wsi))
return -1;
@ -368,15 +357,19 @@ int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, voi
return lws_callback_http_dummy(wsi, reason, user, in, len);
}
int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
int Relay::protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
Connection *c = reinterpret_cast<Connection *>(user);
lws_context *ctx = lws_get_context(wsi);
void *user_ctx = lws_context_user(ctx);
Relay *r = reinterpret_cast<Relay *>(user_ctx);
RelayConnection *c = reinterpret_cast<RelayConnection *>(user);
switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
try {
new (c) Connection(wsi);
new (c) RelayConnection(wsi, r->loopback);
}
catch (InvalidUrlException &e) {
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *) "Invalid URL", strlen("Invalid URL"));
@ -386,7 +379,7 @@ int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in
break;
case LWS_CALLBACK_CLOSED:
c->~Connection();
c->~RelayConnection();
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
@ -404,104 +397,141 @@ int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in
return 0;
}
static void usage()
void Relay::usage()
{
std::cout << "Usage: villas-relay [OPTIONS]" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -d LVL set debug level" << std::endl
<< " -p PORT the port number to listen on" << std::endl
<< " -P PROT the websocket protocol" << std::endl
<< " -l enable loopback of own data" << std::endl
<< " -V show version and exit" << std::endl
<< " -h show usage and exit" << std::endl << std::endl;
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -d LVL set debug level" << std::endl
<< " -p PORT the port number to listen on" << std::endl
<< " -P PROT the websocket protocol" << std::endl
<< " -l enable loopback of own data" << std::endl
<< " -V show version and exit" << std::endl
<< " -h show usage and exit" << std::endl << std::endl;
villas::print_copyright();
printCopyright();
}
void Relay::parse()
{
char c, *endptr;
while ((c = getopt (argc, argv, "hVp:P:ld:")) != -1) {
switch (c) {
case 'd':
spdlog::set_level(spdlog::level::from_str(optarg));
break;
case 'p':
port = strtoul(optarg, &endptr, 10);
goto check;
case 'P':
protocol = optarg;
break;
case 'l':
loopback = true;
break;
case 'V':
printVersion();
exit(EXIT_SUCCESS);
case 'h':
case '?':
usage();
exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS);
}
continue;
check: if (optarg == endptr) {
logger->error("Failed to parse parse option argument '-{} {}'", c, optarg);
exit(EXIT_FAILURE);
}
}
if (argc - optind < 0) {
usage();
exit(EXIT_FAILURE);
}
}
int Relay::main() {
/* Start server */
lws_context_creation_info ctx_info = { 0 };
protocols[2].name = protocol.c_str();
ctx_info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
ctx_info.gid = -1;
ctx_info.uid = -1;
ctx_info.protocols = protocols.data();
ctx_info.extensions = extensions.data();
ctx_info.port = port;
ctx_info.mounts = &mount;
ctx_info.user = (void *) this;
context = lws_create_context(&ctx_info);
if (context == nullptr) {
logger->error("WebSocket: failed to initialize server context");
exit(EXIT_FAILURE);
}
vhost = lws_create_vhost(context, &ctx_info);
if (vhost == nullptr) {
logger->error("WebSocket: failed to initialize virtual host");
exit(EXIT_FAILURE);
}
while (!stop)
lws_service(context, 100);
return 0;
}
const std::vector<lws_extension> Relay::extensions = {
{
"permessage-deflate",
lws_extension_callback_pm_deflate,
"permessage-deflate"
},
{
"deflate-frame",
lws_extension_callback_pm_deflate,
"deflate_frame"
},
{ nullptr /* terminator */ }
};
const lws_http_mount Relay::mount = {
.mount_next = nullptr, /* linked-list "next" */
.mountpoint = "/api/v1", /* mountpoint URL */
.origin = nullptr, /* protocol */
.def = nullptr,
.protocol = "http-api",
.cgienv = nullptr,
.extra_mimetypes = nullptr,
.interpret = nullptr,
.cgi_timeout = 0,
.cache_max_age = 0,
.auth_mask = 0,
.cache_reusable = 0,
.cache_revalidate = 0,
.cache_intermediaries = 0,
.origin_protocol = LWSMPRO_CALLBACK, /* dynamic */
.mountpoint_len = 7, /* char count */
.basic_auth_login_file =nullptr,
};
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
Logger logger = villas::logging.get("console");
auto t = villas::node::tools::Relay(argc, argv);
try {
/* Initialize logging */
spdlog::stdout_color_mt("lws");
lws_set_log_level((1 << LLL_COUNT) - 1, logger_cb);
/* Start server */
lws_context_creation_info ctx_info = { 0 };
char c, *endptr;
while ((c = getopt (argc, argv, "hVp:P:ld:")) != -1) {
switch (c) {
case 'd':
spdlog::set_level(spdlog::level::from_str(optarg));
break;
case 'p':
opts.port = strtoul(optarg, &endptr, 10);
goto check;
case 'P':
opts.protocol = strdup(optarg);
break;
case 'l':
opts.loopback = true;
break;
case 'V':
villas::print_version();
exit(EXIT_SUCCESS);
case 'h':
case '?':
usage();
exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS);
}
continue;
check: if (optarg == endptr) {
logger->error("Failed to parse parse option argument '-{} {}'", c, optarg);
exit(EXIT_FAILURE);
}
}
if (argc - optind < 0) {
usage();
exit(EXIT_FAILURE);
}
protocols[2].name = opts.protocol;
ctx_info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
ctx_info.gid = -1;
ctx_info.uid = -1;
ctx_info.protocols = protocols;
ctx_info.extensions = extensions;
ctx_info.port = opts.port;
ctx_info.mounts = &mount;
context = lws_create_context(&ctx_info);
if (context == nullptr) {
logger->error("WebSocket: failed to initialize server context");
exit(EXIT_FAILURE);
}
vhost = lws_create_vhost(context, &ctx_info);
if (vhost == nullptr) {
logger->error("WebSocket: failed to initialize virtual host");
exit(EXIT_FAILURE);
}
for (;;)
lws_service(context, 100);
return 0;
}
catch (std::runtime_error &e) {
logger->error("{}", e.what());
return -1;
}
return t.run();
}

View file

@ -33,19 +33,18 @@
#include <libwebsockets.h>
namespace villas {
namespace node {
namespace tools {
/* Forward declarations */
lws_callback_function protocol_cb, http_protocol_cb;
class Session;
class Connection;
class Relay;
class RelaySession;
class RelayConnection;
class InvalidUrlException { };
struct Options {
bool loopback;
int port;
const char *protocol;
};
class Frame : public std::vector<uint8_t> {
public:
Frame() {
@ -62,31 +61,38 @@ public:
}
};
class Session {
class RelaySession {
friend RelayConnection;
friend Relay;
public:
typedef std::string Identifier;
protected:
time_t created;
uuid_t uuid;
public:
typedef std::string Identifier;
static Session * get(lws *wsi);
Session(Identifier sid);
~Session();
json_t * toJson() const;
Identifier identifier;
std::map<lws *, Connection *> connections;
std::map<lws *, RelayConnection *> connections;
int connects;
static std::map<std::string, RelaySession *> sessions;
public:
static RelaySession * get(lws *wsi);
RelaySession(Identifier sid);
~RelaySession();
json_t * toJson() const;
};
class Connection {
class RelayConnection {
protected:
lws *wsi;
@ -95,7 +101,7 @@ protected:
std::queue<std::shared_ptr<Frame>> outgoingFrames;
Session *session;
RelaySession *session;
char name[128];
char ip[128];
@ -107,13 +113,62 @@ protected:
size_t frames_recv;
size_t frames_sent;
public:
Connection(lws *w);
bool loopback;
~Connection();
public:
RelayConnection(lws *w, bool lo);
~RelayConnection();
json_t * toJson() const;
void write();
void read(void *in, size_t len);
};
class Relay : public Tool {
public:
Relay(int argc, char *argv[]);
protected:
std::atomic<bool> stop;
/** The libwebsockets server context. */
lws_context *context;
/** The libwebsockets vhost. */
lws_vhost *vhost;
bool loopback;
int port;
std::string protocol;
/** List of libwebsockets protocols. */
std::vector<lws_protocols> protocols;
/** List of libwebsockets extensions. */
static const std::vector<lws_extension> extensions;
static const lws_http_mount mount;
static void logger_cb(int level, const char *msg);
static int http_protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
static int protocol_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
void usage();
void parse();
int main();
void handler(int signal, siginfo_t *sinfo, void *ctx)
{
stop = true;
}
};
} // namespace tools
} // namespace node
} // namespace villas

View file

@ -30,11 +30,11 @@
#include <iostream>
#include <atomic>
#include <villas/tool.hpp>
#include <villas/io.h>
#include <villas/utils.hpp>
#include <villas/colors.hpp>
#include <villas/exceptions.hpp>
#include <villas/copyright.hpp>
#include <villas/log.hpp>
#include <villas/sample.h>
#include <villas/timing.h>
@ -45,162 +45,19 @@
using namespace villas;
static std::atomic<bool> stop(false);
namespace villas {
namespace node {
namespace tools {
static void usage()
{
std::cout << "Usage: villas-signal [OPTIONS] SIGNAL" << std::endl
<< " SIGNAL is on of the following signal types:" << std::endl
<< " mixed" << std::endl
<< " random" << std::endl
<< " sine" << std::endl
<< " triangle" << std::endl
<< " square" << std::endl
<< " ramp" << std::endl
<< " constants" << std::endl
<< " counter" << std::endl << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -d LVL set debug level" << std::endl
<< " -f FMT set the format" << std::endl
<< " -v NUM specifies how many values a message should contain" << std::endl
<< " -r HZ how many messages per second" << std::endl
<< " -n non real-time mode. do not throttle output." << std::endl
<< " -F HZ the frequency of the signal" << std::endl
<< " -a FLT the amplitude" << std::endl
<< " -D FLT the standard deviation for 'random' signals" << std::endl
<< " -o OFF the DC bias" << std::endl
<< " -l NUM only send LIMIT messages and stop" << std::endl << std::endl;
class Signal : public Tool {
print_copyright();
}
json_t * parse_cli(int argc, char *argv[], char **format)
{
Logger logger = logging.get("signal");
/* Default values */
double rate = 10;
double frequency = 1;
double amplitude = 1;
double stddev = 0.02;
double offset = 0;
char *type;
int rt = 1;
int values = 1;
int limit = -1;
/* Parse optional command line arguments */
int c;
char *endptr;
while ((c = getopt(argc, argv, "v:r:F:f:l:a:D:no:d:hV")) != -1) {
switch (c) {
case 'n':
rt = 0;
break;
case 'f':
*format = optarg;
break;
case 'l':
limit = strtoul(optarg, &endptr, 10);
goto check;
case 'v':
values = strtoul(optarg, &endptr, 10);
goto check;
case 'r':
rate = strtof(optarg, &endptr);
goto check;
case 'o':
offset = strtof(optarg, &endptr);
goto check;
case 'F':
frequency = strtof(optarg, &endptr);
goto check;
case 'a':
amplitude = strtof(optarg, &endptr);
goto check;
case 'D':
stddev = strtof(optarg, &endptr);
goto check;
case 'd':
logging.setLevel(optarg);
break;
case 'V':
print_version();
exit(EXIT_SUCCESS);
case 'h':
case '?':
usage();
exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS);
}
continue;
check: if (optarg == endptr)
logger->warn("Failed to parse parse option argument '-{} {}'", c, optarg);
}
if (argc != optind + 1)
return nullptr;
type = argv[optind];
return json_pack("{ s: s, s: s, s: f, s: f, s: f, s: f, s: f, s: b, s: i, s: i }",
"type", "signal",
"signal", type,
"rate", rate,
"frequency", frequency,
"amplitude", amplitude,
"stddev", stddev,
"offset", offset,
"realtime", rt,
"values", values,
"limit", limit
);
}
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
Logger logger = logging.get("signal");
switch (signal) {
case SIGALRM:
logger->info("Reached timeout. Terminating...");
break;
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
}
stop = true;
}
int main(int argc, char *argv[])
{
Logger logger = logging.get("signal");
try {
public:
Signal(int argc, char *argv[]) :
Tool(argc, argv, "signal"),
stop(false),
format("villas.human")
{
int ret;
json_t *cfg;
struct node_type *nt;
struct format_type *ft;
char *format = (char *) "villas.human"; /** @todo hardcoded for now */
struct node n;
struct io io;
struct pool q;
struct sample *t;
n.state = STATE_DESTROYED;
n.in.state = STATE_DESTROYED;
@ -209,13 +66,164 @@ int main(int argc, char *argv[])
q.state = STATE_DESTROYED;
q.queue.state = STATE_DESTROYED;
ret = utils::signals_init(quit);
if (ret)
throw RuntimeError("Failed to intialize signals");
ret = memory_init(0);
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
}
protected:
std::atomic<bool> stop;
struct node n;
struct io io;
struct pool q;
std::string format;
void usage()
{
std::cout << "Usage: villas-signal [OPTIONS] SIGNAL" << std::endl
<< " SIGNAL is on of the following signal types:" << std::endl
<< " mixed" << std::endl
<< " random" << std::endl
<< " sine" << std::endl
<< " triangle" << std::endl
<< " square" << std::endl
<< " ramp" << std::endl
<< " constants" << std::endl
<< " counter" << std::endl << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -d LVL set debug level" << std::endl
<< " -f FMT set the format" << std::endl
<< " -v NUM specifies how many values a message should contain" << std::endl
<< " -r HZ how many messages per second" << std::endl
<< " -n non real-time mode. do not throttle output." << std::endl
<< " -F HZ the frequency of the signal" << std::endl
<< " -a FLT the amplitude" << std::endl
<< " -D FLT the standard deviation for 'random' signals" << std::endl
<< " -o OFF the DC bias" << std::endl
<< " -l NUM only send LIMIT messages and stop" << std::endl << std::endl;
printCopyright();
}
json_t * parse_cli(int argc, char *argv[])
{
/* Default values */
double rate = 10;
double frequency = 1;
double amplitude = 1;
double stddev = 0.02;
double offset = 0;
std::string type;
int rt = 1;
int values = 1;
int limit = -1;
/* Parse optional command line arguments */
int c;
char *endptr;
while ((c = getopt(argc, argv, "v:r:F:f:l:a:D:no:d:hV")) != -1) {
switch (c) {
case 'n':
rt = 0;
break;
case 'f':
format = optarg;
break;
case 'l':
limit = strtoul(optarg, &endptr, 10);
goto check;
case 'v':
values = strtoul(optarg, &endptr, 10);
goto check;
case 'r':
rate = strtof(optarg, &endptr);
goto check;
case 'o':
offset = strtof(optarg, &endptr);
goto check;
case 'F':
frequency = strtof(optarg, &endptr);
goto check;
case 'a':
amplitude = strtof(optarg, &endptr);
goto check;
case 'D':
stddev = strtof(optarg, &endptr);
goto check;
case 'd':
logging.setLevel(optarg);
break;
case 'V':
printVersion();
exit(EXIT_SUCCESS);
case 'h':
case '?':
usage();
exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS);
}
continue;
check: if (optarg == endptr)
logger->warn("Failed to parse parse option argument '-{} {}'", c, optarg);
}
if (argc != optind + 1)
return nullptr;
type = argv[optind];
return json_pack("{ s: s, s: s, s: f, s: f, s: f, s: f, s: f, s: b, s: i, s: i }",
"type", "signal",
"signal", type.c_str(),
"rate", rate,
"frequency", frequency,
"amplitude", amplitude,
"stddev", stddev,
"offset", offset,
"realtime", rt,
"values", values,
"limit", limit
);
}
void handler(int signal, siginfo_t *sinfo, void *ctx)
{
Logger logger = logging.get("signal");
switch (signal) {
case SIGALRM:
logger->info("Reached timeout. Terminating...");
break;
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
}
stop = true;
}
int main()
{
int ret;
json_t *cfg;
struct node_type *nt;
struct format_type *ft;
struct sample *t;
nt = node_type_lookup("signal");
if (!nt)
@ -225,7 +233,7 @@ int main(int argc, char *argv[])
if (ret)
throw RuntimeError("Failed to initialize node");
cfg = parse_cli(argc, argv, &format);
cfg = parse_cli(argc, argv);
if (!cfg) {
usage();
exit(EXIT_FAILURE);
@ -237,7 +245,7 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
ft = format_type_lookup(format);
ft = format_type_lookup(format.c_str());
if (!ft)
throw RuntimeError("Invalid output format '{}'", format);
@ -310,15 +318,19 @@ out: sample_decref(t);
if (ret)
throw RuntimeError("Failed to destroy pool");
logger->info(CLR_GRN("Goodbye!"));
return 0;
}
catch (std::runtime_error &e) {
logger->error("{}", e.what());
};
return -1;
}
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
auto t = villas::node::tools::Signal(argc, argv);
return t.run();
}
/** @} */

View file

@ -26,38 +26,42 @@
#include <jansson.h>
#include <villas/tool.hpp>
#include <villas/sample.h>
#include <villas/io.h>
#include <villas/format_type.h>
#include <villas/utils.hpp>
#include <villas/log.hpp>
#include <villas/copyright.hpp>
#include <villas/pool.h>
#include <villas/exceptions.hpp>
#include <villas/node/config.h>
using namespace villas;
class Side {
namespace villas {
namespace node {
namespace tools {
class TestCmpSide {
public:
std::string path;
std::string dtypes;
struct sample *sample;
struct io io;
struct format_type *format;
const char *dtypes;
Side(const std::string &pth, struct format_type *fmt, const char *dt, struct pool *p) :
TestCmpSide(const std::string &pth, struct format_type *fmt, const std::string &dt, struct pool *p) :
path(pth),
format(fmt),
dtypes(dt)
dtypes(dt),
format(fmt)
{
int ret;
io.state = STATE_DESTROYED;
ret = io_init2(&io, format, dtypes, 0);
ret = io_init2(&io, format, dtypes.c_str(), 0);
if (ret)
throw RuntimeError("Failed to initialize IO");
@ -74,7 +78,7 @@ public:
throw RuntimeError("Failed to allocate samples");
}
~Side() noexcept(false)
~TestCmpSide() noexcept(false)
{
int ret;
@ -90,46 +94,62 @@ public:
}
};
static void usage()
{
std::cout << "Usage: villas-test-cmp [OPTIONS] FILE1 FILE2 ... FILEn" << std::endl
<< " FILE a list of files to compare" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -d LVL adjust the debug level" << std::endl
<< " -e EPS set epsilon for floating point comparisons to EPS" << std::endl
<< " -v ignore data values" << std::endl
<< " -T ignore timestamp" << std::endl
<< " -s ignore sequence no" << std::endl
<< " -f FMT file format for all files" << std::endl
<< " -t DT the data-type format string" << std::endl
<< " -h show this usage information" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl
<< "Return codes:" << std::endl
<< " 0 files are equal" << std::endl
<< " 1 file length not equal" << std::endl
<< " 2 sequence no not equal" << std::endl
<< " 3 timestamp not equal" << std::endl
<< " 4 number of values is not equal" << std::endl
<< " 5 data is not equal" << std::endl << std::endl;
class TestCmp : public Tool {
print_copyright();
}
public:
TestCmp(int argc, char *argv[]) :
Tool(argc, argv, "test-cmp"),
epsilon(1e-9),
format("villas.human"),
dtypes("64f"),
flags(SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA | SAMPLE_HAS_TS_ORIGIN)
{
pool.state = STATE_DESTROYED;
int main(int argc, char *argv[])
{
Logger logger = logging.get("test-cmp");
int ret;
try {
int ret, rc = 0;
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
}
/* Default values */
double epsilon = 1e-9;
const char *format = "villas.human";
const char *dtypes = "64f";
int flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA | SAMPLE_HAS_TS_ORIGIN;
protected:
struct pool pool;
struct pool pool = { .state = STATE_DESTROYED };
double epsilon;
std::string format;
std::string dtypes;
int flags;
std::vector<std::string> filenames;
void usage()
{
std::cout << "Usage: villas-test-cmp [OPTIONS] FILE1 FILE2 ... FILEn" << std::endl
<< " FILE a list of files to compare" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -d LVL adjust the debug level" << std::endl
<< " -e EPS set epsilon for floating point comparisons to EPS" << std::endl
<< " -v ignore data values" << std::endl
<< " -T ignore timestamp" << std::endl
<< " -s ignore sequence no" << std::endl
<< " -f FMT file format for all files" << std::endl
<< " -t DT the data-type format string" << std::endl
<< " -h show this usage information" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl
<< "Return codes:" << std::endl
<< " 0 files are equal" << std::endl
<< " 1 file length not equal" << std::endl
<< " 2 sequence no not equal" << std::endl
<< " 3 timestamp not equal" << std::endl
<< " 4 number of values is not equal" << std::endl
<< " 5 data is not equal" << std::endl << std::endl;
printCopyright();
}
void parse()
{
/* Parse Arguments */
int c;
char *endptr;
@ -160,7 +180,7 @@ int main(int argc, char *argv[])
break;
case 'V':
print_version();
printVersion();
exit(EXIT_SUCCESS);
case 'd':
@ -184,38 +204,41 @@ check: if (optarg == endptr)
exit(EXIT_FAILURE);
}
int eofs, line, failed;
int n = argc - optind; /* The number of files which we compare */
Side *s[n];
/* Open files */
for (int i = 0; i < argc - optind; i++)
filenames.push_back(argv[optind + i]);
}
ret = memory_init(0);
if (ret)
throw RuntimeError("Failed to initialize memory system");
int main()
{
int ret, rc = 0, line, failed;
unsigned eofs;
ret = pool_init(&pool, n, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), &memory_heap);
if (ret)
throw RuntimeError("Failed to initialize pool");
struct format_type *fmt = format_type_lookup(format);
struct format_type *fmt = format_type_lookup(format.c_str());
if (!fmt)
throw RuntimeError("Invalid IO format: {}", format);
/* Open files */
for (int i = 0; i < n; i++)
s[i] = new Side(argv[optind + i], fmt, dtypes, &pool);
std::vector<TestCmpSide *> sides;
for (auto filename : filenames)
sides.push_back(new TestCmpSide(filename, fmt, dtypes, &pool));
ret = pool_init(&pool, sides.size(), SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), &memory_heap);
if (ret)
throw RuntimeError("Failed to initialize pool");
line = 0;
for (;;) {
/* Read next sample from all files */
retry: eofs = 0;
for (int i = 0; i < n; i++) {
ret = io_eof(&s[i]->io);
for (auto side : sides) {
ret = io_eof(&side->io);
if (ret)
eofs++;
}
if (eofs) {
if (eofs == n)
if (eofs == sides.size())
ret = 0;
else {
std::cout << "length unequal" << std::endl;
@ -226,8 +249,8 @@ retry: eofs = 0;
}
failed = 0;
for (int i = 0; i < n; i++) {
ret = io_scan(&s[i]->io, &s[i]->sample, 1);
for (auto side : sides) {
ret = io_scan(&side->io, &side->sample, 1);
if (ret <= 0)
failed++;
}
@ -235,8 +258,8 @@ retry: eofs = 0;
goto retry;
/* We compare all files against the first one */
for (int i = 1; i < n; i++) {
ret = sample_cmp(s[0]->sample, s[i]->sample, epsilon, flags);
for (auto side : sides) {
ret = sample_cmp(sides[0]->sample, side->sample, epsilon, flags);
if (ret) {
rc = ret;
goto out;
@ -246,8 +269,8 @@ retry: eofs = 0;
line++;
}
out: for (int i = 0; i < n; i++)
delete s[i];
out: for (auto side : sides)
delete side;
ret = pool_destroy(&pool);
if (ret)
@ -255,9 +278,15 @@ out: for (int i = 0; i < n; i++)
return rc;
}
catch (std::runtime_error &e) {
logger->error("{}", e.what());
};
return -1;
}
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
auto t = villas::node::tools::TestCmp(argc, argv);
return t.run();
}

View file

@ -22,8 +22,8 @@
#include <iostream>
#include <villas/tool.hpp>
#include <villas/node/config.h>
#include <villas/copyright.hpp>
#include <villas/log.hpp>
#include <villas/version.hpp>
#include <villas/utils.hpp>
@ -33,27 +33,43 @@
using namespace villas;
using namespace villas::node;
static void usage()
{
std::cout << "Usage: villas-test-config [OPTIONS] CONFIG" << std::endl
<< " CONFIG is the path to an optional configuration file" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -d LVL set debug level" << std::endl
<< " -V show version and exit" << std::endl
<< " -h show usage and exit" << std::endl << std::endl;
namespace villas {
namespace node {
namespace tools {
print_copyright();
}
class TestConfig : public Tool {
int main(int argc, char *argv[])
{
Logger logger = logging.get("test-config");
public:
TestConfig(int argc, char *argv[]) :
Tool(argc, argv, "test-config"),
check(false)
{
int ret;
try {
SuperNode sn;
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
}
bool check = false;
protected:
std::string uri;
bool check;
void usage()
{
std::cout << "Usage: villas-test-config [OPTIONS] CONFIG" << std::endl
<< " CONFIG is the path to an optional configuration file" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -d LVL set debug level" << std::endl
<< " -V show version and exit" << std::endl
<< " -h show usage and exit" << std::endl << std::endl;
printCopyright();
}
void parse()
{
int c;
while ((c = getopt (argc, argv, "hcV")) != -1) {
switch (c) {
@ -62,7 +78,7 @@ int main(int argc, char *argv[])
break;
case 'V':
print_version();
printVersion();
exit(EXIT_SUCCESS);
case 'h':
@ -77,26 +93,29 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
sn.parse(argv[optind]);
uri = argv[optind];
}
int main()
{
SuperNode sn;
sn.parse(uri);
if (check)
sn.check();
return 0;
}
catch (ParseError &e) {
logger->error("{}", e.what());
};
return -1;
}
catch (ConfigError &e) {
logger->error("{}", e.what());
} // namespace tools
} // namespace node
} // namespace villas
return -1;
}
catch (std::runtime_error &e) {
logger->error("{}", e.what());
int main(int argc, char *argv[])
{
auto t = villas::node::tools::TestConfig(argc, argv);
return -1;
}
return t.run();
}

View file

@ -30,10 +30,10 @@
#include <iostream>
#include <atomic>
#include <villas/tool.hpp>
#include <villas/node/config.h>
#include <villas/super_node.hpp>
#include <villas/exceptions.hpp>
#include <villas/copyright.hpp>
#include <villas/log.hpp>
#include <villas/node.h>
#include <villas/utils.hpp>
@ -47,59 +47,73 @@
using namespace villas;
using namespace villas::node;
static std::atomic<bool> stop(false);
namespace villas {
namespace node {
namespace tools {
void quit(int signal, siginfo_t *sinfo, void *ctx)
{
stop = true;
}
class TestRtt : public Tool {
static void usage()
{
std::cout << "Usage: villas-test-rtt [OPTIONS] CONFIG NODE" << std::endl
<< " CONFIG path to a configuration file" << std::endl
<< " NODE name of the node which shoud be used" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -c CNT send CNT messages" << std::endl
<< " -f FD use file descriptor FD for result output instead of stdout" << std::endl
<< " -b BKTS number of buckets for histogram" << std::endl
<< " -w WMUP duration of histogram warmup phase" << std::endl
<< " -h show this usage information" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
print_copyright();
}
int main(int argc, char *argv[])
{
Logger logger = logging.get("test-rtt");
try {
public:
TestRtt(int argc, char *argv[]) :
Tool(argc, argv, "test-rtt"),
stop(false),
fd(STDOUT_FILENO),
count(-1),
hist_warmup(100),
hist_buckets(20)
{
int ret;
struct hist hist;
struct timespec send, recv;
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
}
struct sample *smp_send = (struct sample *) new char[SAMPLE_LENGTH(2)];
struct sample *smp_recv = (struct sample *) new char[SAMPLE_LENGTH(2)];
protected:
std::atomic<bool> stop;
struct node *node;
std::string uri;
std::string nodestr;
SuperNode sn;
SuperNode sn;
/* Test options */
int count = -1; /**< Amount of messages which should be sent (default: -1 for unlimited) */
/** File descriptor for Matlab results.
* This allows you to write Matlab results in a seperate log file:
*
* ./test etc/example.conf rtt -f 3 3>> measurement_results.m
*/
int fd;
hist_cnt_t hist_warmup = 100;
int hist_buckets = 20;
/**< Amount of messages which should be sent (default: -1 for unlimited) */
int count;
/** File descriptor for Matlab results.
* This allows you to write Matlab results in a seperate log file:
*
* ./test etc/example.conf rtt -f 3 3>> measurement_results.m
*/
int fd = STDOUT_FILENO;
hist_cnt_t hist_warmup;
int hist_buckets;
void handler(int signal, siginfo_t *sinfo, void *ctx)
{
stop = true;
}
void usage()
{
std::cout << "Usage: villas-test-rtt [OPTIONS] CONFIG NODE" << std::endl
<< " CONFIG path to a configuration file" << std::endl
<< " NODE name of the node which shoud be used" << std::endl
<< " OPTIONS is one or more of the following options:" << std::endl
<< " -c CNT send CNT messages" << std::endl
<< " -f FD use file descriptor FD for result output instead of stdout" << std::endl
<< " -b BKTS number of buckets for histogram" << std::endl
<< " -w WMUP duration of histogram warmup phase" << std::endl
<< " -h show this usage information" << std::endl
<< " -V show the version of the tool" << std::endl << std::endl;
printCopyright();
}
void parse()
{
/* Parse Arguments */
int c;
char *endptr;
@ -122,7 +136,7 @@ int main(int argc, char *argv[])
goto check;
case 'V':
print_version();
printVersion();
exit(EXIT_SUCCESS);
case 'd':
@ -146,14 +160,23 @@ check: if (optarg == endptr)
exit(EXIT_FAILURE);
}
char *uri = argv[optind];
char *nodestr = argv[optind + 1];
uri = argv[optind];
nodestr = argv[optind + 1];
}
ret = utils::signals_init(quit);
if (ret)
throw RuntimeError("Failed to initialize signals subsystem");
int main()
{
int ret;
if (uri)
struct hist hist;
struct timespec send, recv;
struct sample *smp_send = (struct sample *) new char[SAMPLE_LENGTH(2)];
struct sample *smp_recv = (struct sample *) new char[SAMPLE_LENGTH(2)];
struct node *node;
if (!uri.empty())
sn.parse(uri);
else
logger->warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance.");
@ -237,9 +260,15 @@ check: if (optarg == endptr)
return 0;
}
catch (std::runtime_error &e) {
logger->error("{}", e.what());
};
return -1;
}
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
auto t = villas::node::tools::TestRtt(argc, argv);
return t.run();
}

View file

@ -31,30 +31,55 @@
#include <stdlib.h>
#include <assert.h>
#include <zmq.h>
#include <villas/tool.hpp>
#if ZMQ_VERSION_MAJOR < 4 || (ZMQ_VERSION_MAJOR == 4 && ZMQ_VERSION_MINOR <= 1)
#include <zmq_utils.h>
#endif
namespace villas {
namespace node {
namespace tools {
class ZmqKeygen : public Tool {
public:
ZmqKeygen(int argc, char *argv[]) :
Tool(argc, argv, "zmq-keygen")
{ }
protected:
int main()
{
int ret;
char public_key[41];
char secret_key[41];
ret = zmq_curve_keypair(public_key, secret_key);
if (ret) {
if (zmq_errno() == ENOTSUP)
std::cout << "To use " << argv[0] << ", please install libsodium and then rebuild libzmq." << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "# Copy these lines to your 'zeromq' node-configuration" << std::endl;
std::cout << "curve = {" << std::endl;
std::cout << "\tpublic_key = \"" << public_key << "\";" << std::endl;
std::cout << "\tsecret_key = \"" << secret_key << "\";" << std::endl;
std::cout << "}" << std::endl;
return 0;
}
};
} // namespace tools
} // namespace node
} // namespace villas
int main(int argc, char *argv[])
{
int ret;
char public_key[41];
char secret_key[41];
auto t = villas::node::tools::ZmqKeygen(argc, argv);
ret = zmq_curve_keypair(public_key, secret_key);
if (ret) {
if (zmq_errno() == ENOTSUP)
std::cout << "To use " << argv[0] << ", please install libsodium and then rebuild libzmq." << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "# Copy these lines to your 'zeromq' node-configuration" << std::endl;
std::cout << "curve = {" << std::endl;
std::cout << "\tpublic_key = \"" << public_key << "\";" << std::endl;
std::cout << "\tsecret_key = \"" << secret_key << "\";" << std::endl;
std::cout << "}" << std::endl;
return 0;
return t.run();
}

View file

@ -20,13 +20,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
###################################################################################
if(WITH_CONFIG)
add_executable(conf2json conf2json.cpp)
target_link_libraries(conf2json PUBLIC villas)
list(APPEND TOOLS conf2json)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL Linux)
add_executable(rmshm rmshm.cpp)
target_link_libraries(rmshm PUBLIC Threads::Threads rt)
@ -37,14 +30,6 @@ if(CMAKE_SYSTEM_NAME STREQUAL Linux)
list(APPEND TOOLS rmsem rmshm)
endif()
if(LIBZMQ_FOUND)
add_executable(zmq-keygen zmq-keygen.cpp)
target_include_directories(zmq-keygen PUBLIC ${LIBZMQ_INCLUDE_DIRS})
target_link_libraries(zmq-keygen PUBLIC PkgConfig::LIBZMQ)
list(APPEND TOOLS zmq-keygen)
endif()
install(
TARGETS ${TOOLS}
COMPONENT tools