1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

cpp: ported SuperNode

This commit is contained in:
Steffen Vogel 2018-07-03 21:51:48 +02:00
parent 640acafbce
commit 987a59a5aa
33 changed files with 292 additions and 328 deletions

View file

@ -37,8 +37,6 @@ extern "C" {
#endif
/* Forward declarations */
struct super_node;
struct api;
struct api_action;
@ -71,7 +69,7 @@ struct api_action {
*
* Save references to list of paths / nodes for command execution.
*/
int api_init(struct api *a, struct super_node *sn);
int api_init(struct api *a);//, struct super_node *sn); // @todo: port to C++
int api_destroy(struct api *a);

View file

@ -22,6 +22,7 @@
#pragma once
#include <libconfig.h>
#include <jansson.h>
#ifdef __cplusplus
@ -32,6 +33,11 @@ extern "C" {
size_t json_dumpb(const json_t *json, char *buffer, size_t size, size_t flags);
#endif
#if (LIBCONFIG_VER_MAJOR <= 1) && (LIBCONFIG_VER_MINOR < 5)
#define config_setting_lookup config_lookup_from
#endif
#ifdef __MACH__
#include <libkern/OSByteOrder.h>

View file

@ -38,7 +38,6 @@ extern "C" {
/* Forward declarations */
struct node;
struct super_node;
struct sample;
enum node_type_flags {
@ -66,7 +65,7 @@ struct node_type {
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int (*start)(struct super_node *sn);
int (*start)();
/** Global de-initialization per node type.
*
@ -79,12 +78,6 @@ struct node_type {
int (*stop)();
} type;
/** Allocate memory for an instance of this type.
*
* @return A pointer to the node-type specific private data.
*/
void * (*create)();
/** Initialize a new node instance.
*
* @retval 0 Success. Everything went well.
@ -229,7 +222,7 @@ struct node_type {
*
* @see node_type::init
*/
int node_type_start(struct node_type *vt, struct super_node *sn);
int node_type_start(struct node_type *vt);
/** De-initialize node type subsystems.
*

View file

@ -101,7 +101,7 @@ struct iec61850_receiver {
};
/** @see node_type::type_start */
int iec61850_type_start(struct super_node *sn);
int iec61850_type_start();
/** @see node_type::type_stop */
int iec61850_type_stop();

View file

@ -81,9 +81,6 @@ struct iec61850_sv {
} out;
};
/** @see node_type::type_start */
int iec61850_sv_type_start(struct super_node *sn);
/** @see node_type::type_stop */
int iec61850_sv_type_stop();

View file

@ -127,12 +127,6 @@ int ib_destroy(struct node *n);
/** @see node_type::close */
int ib_stop(struct node *n);
/** @see node_type::type_start */
int ib_type_start(struct super_node *n);
/** @see node_type::type_stop */
int ib_type_stop();
/** @see node_type::read */
int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);

View file

@ -39,7 +39,6 @@
#include <jansson.h>
#include <villas/list.h>
#include <villas/super_node.h>
#include <villas/node.h>
#include <villas/task.h>
@ -47,6 +46,7 @@
extern "C" {
#endif
/* Forward declarations */
struct node;
struct ngsi {
@ -72,7 +72,7 @@ struct ngsi {
*
* @see node_type::type_start
*/
int ngsi_type_start(struct super_node *sn);
int ngsi_type_start();
/** Free global NGSI settings and unmaps shared memory regions.
*

View file

@ -60,7 +60,7 @@ struct opal {
*
* @see node_type::type_start
*/
int opal_type_start(struct super_node *sn);
int opal_type_start();
/** Free global OPAL settings and unmaps shared memory regions.
*

View file

@ -113,7 +113,7 @@ struct socket {
/** @see node_vtable::type_start */
int socket_type_start(struct super_node *sn);
int socket_type_start();
/** @see node_type::type_stop */
int socket_type_stop();

View file

@ -40,7 +40,6 @@ extern "C" {
/* Forward declarations */
struct node;
struct sample;
struct super_node;
struct stats_node {
double rate;
@ -52,7 +51,7 @@ struct stats_node {
};
/** @see node_type::print */
int stats_node_type_start(struct super_node *sn);
int stats_node_type_start();
/** @see node_type::print */
char *stats_node_print(struct node *n);

View file

@ -95,7 +95,7 @@ struct websocket_destination {
int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
/** @see node_type::type_start */
int websocket_type_start(struct super_node *sn);
int websocket_type_start(); /// @todo: Port to C++
/** @see node_type::type_stop */
int websocket_type_stop();

View file

@ -48,7 +48,6 @@ extern "C" {
/* Forward declarations */
struct stats;
struct node;
struct super_node;
struct path_source {
struct node *node;

View file

@ -27,14 +27,16 @@
#include <villas/api.h>
#include <villas/web.h>
#include <villas/log.h>
#include <villas/node.h>
#include <villas/common.h>
#ifdef __cplusplus
extern "C" {
#endif
namespace villas {
namespace node {
/** Global configuration */
struct super_node {
class SuperNode {
protected:
enum state state;
int priority; /**< Process priority (lower is better) */
@ -54,43 +56,47 @@ struct super_node {
char *uri; /**< URI of configuration */
json_t *cfg; /**< JSON representation of the configuration. */
json_t *json; /**< JSON representation of the configuration. */
public:
/** Inititalize configuration object before parsing the configuration. */
SuperNode();
int init();
/** Wrapper for super_node_parse() */
int parseUri(const char *uri);
/** Parse super-node configuration.
*
* @param cfg A libjansson object which contains the configuration.
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int parseJson(json_t *cfg);
/** Check validity of super node configuration. */
int check();
/** Initialize after parsing the configuration file. */
int start();
int stop();
void run();
/** Run periodic hooks of this super node. */
int periodic();
struct node * getNode(const char *name) { return (struct node *) list_lookup(&nodes, name); }
struct list * getNodes() { return &nodes; }
struct list * getPaths() { return &paths; }
struct web * getWeb() { return &web; }
struct api * getApi() { return &api; }
struct log * getLog() { return &log; }
/** Desctroy configuration object. */
~SuperNode();
};
/* Compatibility with libconfig < 1.5 */
#if (LIBCONFIG_VER_MAJOR <= 1) && (LIBCONFIG_VER_MINOR < 5)
#define config_setting_lookup config_lookup_from
#endif
/** Inititalize configuration object before parsing the configuration. */
int super_node_init(struct super_node *sn);
/** Wrapper for super_node_parse() */
int super_node_parse_uri(struct super_node *sn, const char *uri);
/** Parse super-node configuration.
*
* @param sn The super-node datastructure to fill.
* @param cfg A libconfig setting object.
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int super_node_parse_json(struct super_node *sn, json_t *cfg);
/** Check validity of super node configuration. */
int super_node_check(struct super_node *sn);
/** Initialize after parsing the configuration file. */
int super_node_start(struct super_node *sn);
int super_node_stop(struct super_node *sn);
/** Desctroy configuration object. */
int super_node_destroy(struct super_node *sn);
/** Run periodic hooks of this super node. */
int super_node_periodic(struct super_node *sn);
#ifdef __cplusplus
}
#endif
} // node
} // villas

View file

@ -25,6 +25,8 @@
#include <pthread.h>
#include <jansson.h>
#include <villas/common.h>
#include <villas/queue.h>

View file

@ -38,6 +38,7 @@ set(LIBRARIES
)
set(LIB_SRC
super_node.cpp
kernel/kernel.c
kernel/rt.c
memory/heap.c
@ -49,7 +50,6 @@ set(LIB_SRC
log.c
log_config.c
utils.c
super_node.c
hist.c
timing.c
pool.c

View file

@ -262,7 +262,7 @@ try_to_reuse:
return 0;
}
int api_init(struct api *a, struct super_node *sn)
int api_init(struct api *a)//, struct super_node *sn) // @todo: port to C++
{
int ret;
@ -276,7 +276,7 @@ int api_init(struct api *a, struct super_node *sn)
if (ret)
return ret;
a->super_node = sn;
a->super_node = NULL; //sn; // @todo: port to C++
a->state = STATE_INITIALIZED;
return 0;

View file

@ -24,13 +24,14 @@
set(API_SRC
session.c
actions/capabiltities.c
actions/config.c
actions/nodes.c
actions/paths.c
actions/restart.c
actions/shutdown.c
actions/status.c
actions/node.c
# @todo: port to C++
# actions/node.c
# actions/config.c
# actions/nodes.c
# actions/paths.c
# actions/restart.c
)
add_library(api STATIC ${API_SRC})

View file

@ -26,7 +26,6 @@
#include <villas/config.h>
#include <villas/utils.h>
#include <villas/super_node.h>
#include <villas/kernel/kernel.h>
#include <villas/kernel/rt.h>

View file

@ -24,12 +24,11 @@
#include <villas/sample.h>
#include <villas/node.h>
#include <villas/super_node.h>
#include <villas/utils.h>
#include <villas/config.h>
#include <villas/plugin.h>
int node_type_start(struct node_type *vt, struct super_node *sn)
int node_type_start(struct node_type *vt)
{
int ret;
@ -37,10 +36,8 @@ int node_type_start(struct node_type *vt, struct super_node *sn)
return 0;
info("Initializing " CLR_YEL("%s") " node type which is used by %zu nodes", node_type_name(vt), list_length(&vt->instances));
{
ret = vt->type.start ? vt->type.start(sn) : 0;
}
ret = vt->type.start ? vt->type.start() : 0; // @todo: port to C++
if (ret == 0)
vt->state = STATE_STARTED;
@ -55,10 +52,8 @@ int node_type_stop(struct node_type *vt)
return 0;
info("De-initializing " CLR_YEL("%s") " node type", node_type_name(vt));
{
ret = vt->type.stop ? vt->type.stop() : 0;
}
ret = vt->type.stop ? vt->type.stop() : 0;
if (ret == 0)
vt->state = STATE_DESTROYED;

View file

@ -22,8 +22,8 @@
set(NODE_SRC
influxdb.c
stats.c
signal_generator.c
# stats.c
signal_generator.c
loopback.c
)

View file

@ -157,7 +157,7 @@ int iec61850_parse_signals(json_t *json_signals, struct list *signals, struct li
return total_size;
}
int iec61850_type_start(struct super_node *sn)
int iec61850_type_start()
{
int ret;

View file

@ -389,7 +389,7 @@ out: json_decref(request);
return ret;
}
int ngsi_type_start(struct super_node *sn)
int ngsi_type_start()
{
return curl_global_init(CURL_GLOBAL_ALL);
}

View file

@ -48,12 +48,13 @@ int opal_register_region(int argc, char *argv[])
print_shmem_name = argv[3];
}
int opal_init(struct super_node *sn)
int opal_type_start() /// @todo: Port to C++
{
int err;
if (sn->cli.argc != 4)
return -1;
/// @todo: Port to C++
//if (sn->cli.argc != 4)
// return -1;
pthread_mutex_init(&lock, NULL);
@ -98,7 +99,7 @@ int opal_init(struct super_node *sn)
return 0;
}
int opal_deinit()
int opal_type_stop()
{
int err;

View file

@ -54,7 +54,7 @@ static struct plugin p;
/* Private static storage */
struct list interfaces = { .state = STATE_DESTROYED };
int socket_type_start(struct super_node *sn)
int socket_type_start()
{
#ifdef WITH_NETEM
int ret;

View file

@ -89,14 +89,11 @@ static void stats_init_signals(struct node *n)
}
}
int stats_node_type_start(struct super_node *sn)
int stats_node_type_start() /// @todo: Port to C++
{
if (!sn)
return -1;
nodes = NULL;
nodes = &sn->nodes;
return 0;
return -1;
}
int stats_node_start(struct node *n)

View file

@ -28,7 +28,7 @@
#include <libwebsockets.h>
#include <villas/super_node.h>
#include <villas/web.h>
#include <villas/timing.h>
#include <villas/utils.h>
#include <villas/buffer.h>
@ -369,11 +369,13 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
return 0;
}
int websocket_type_start(struct super_node *sn)
int websocket_type_start() /// @todo: Port to C++
{
list_init(&connections);
web = &sn->web;
web = NULL; /// @todo: Port to C++ &sn->web;
return -1;
if (web->state != STATE_STARTED)
return -1;

View file

@ -240,7 +240,7 @@ char * zeromq_print(struct node *n)
return buf;
}
int zeromq_type_start(struct super_node *sn)
int zeromq_type_start() /// @todo: Port to C++
{
context = zmq_ctx_new();

View file

@ -43,83 +43,86 @@
#include <villas/kernel/rt.h>
int super_node_init(struct super_node *sn)
using namespace villas::node;
SuperNode::SuperNode() :
state(STATE_INITIALIZED),
priority(0),
affinity(0),
hugepages(DEFAULT_NR_HUGEPAGES),
stats(0)
{
list_init(&nodes);
list_init(&paths);
list_init(&plugins);
name = (char *) alloc(128);
gethostname(name, 128);
init();
}
int SuperNode::init()
{
int ret;
assert(sn->state == STATE_DESTROYED);
ret = log_init(&log, 2, LOG_ALL);
if (ret)
return ret;
ret = log_init(&sn->log, 2, LOG_ALL);
ret = rt_init(priority, affinity);
if (ret)
return ret;
ret = memory_init(hugepages);
if (ret)
return ret;
#ifdef WITH_API
ret = api_init(&sn->api, sn);
ret = api_init(&api);//, this); // @todo: port to C++
if (ret)
return ret;
#endif /* WITH_API */
#ifdef WITH_WEB
ret = web_init(&sn->web, &sn->api);
ret = web_init(&web, &api);
if (ret)
return ret;
#endif /* WITH_WEB */
ret = list_init(&sn->nodes);
if (ret)
return ret;
ret = list_init(&sn->paths);
if (ret)
return ret;
ret = list_init(&sn->plugins);
if (ret)
return ret;
/* Default values */
sn->affinity = 0;
sn->priority = 0;
sn->stats = 0;
sn->hugepages = DEFAULT_NR_HUGEPAGES;
sn->name = alloc(128); /** @todo missing free */
gethostname(sn->name, 128);
sn->state = STATE_INITIALIZED;
return 0;
}
int super_node_parse_uri(struct super_node *sn, const char *uri)
int SuperNode::parseUri(const char *u)
{
json_error_t err;
info("Parsing configuration");
if (uri) {
if (u) {
FILE *f;
AFILE *af;
/* Via stdin */
if (!strcmp("-", uri)) {
if (!strcmp("-", u)) {
info("Reading configuration from stdin");
af = NULL;
f = stdin;
}
else {
info("Reading configuration from URI: %s", uri);
info("Reading configuration from URI: %s", u);
af = afopen(uri, "r");
af = afopen(u, "r");
if (!af)
error("Failed to open configuration from: %s", uri);
error("Failed to open configuration from: %s", u);
f = af->file;
}
/* Parse config */
sn->cfg = json_loadf(f, 0, &err);
if (sn->cfg == NULL) {
json = json_loadf(f, 0, &err);
if (json == NULL) {
#ifdef LIBCONFIG_FOUND
int ret;
@ -160,8 +163,8 @@ int super_node_parse_uri(struct super_node *sn, const char *uri)
json_root = config_root_setting(&cfg);
sn->cfg = config_to_json(json_root);
if (sn->cfg == NULL)
json = config_to_json(json_root);
if (json == NULL)
error("Failed to convert JSON to configuration file");
config_destroy(&cfg);
@ -176,9 +179,9 @@ int super_node_parse_uri(struct super_node *sn, const char *uri)
else if (f != stdin)
fclose(f);
sn->uri = strdup(uri);
uri = strdup(u);
return super_node_parse_json(sn, sn->cfg);
return parseJson(json);
}
else {
warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance.");
@ -187,13 +190,12 @@ int super_node_parse_uri(struct super_node *sn, const char *uri)
return 0;
}
int super_node_parse_json(struct super_node *sn, json_t *cfg)
int SuperNode::parseJson(json_t *j)
{
int ret;
const char *name = NULL;
const char *nme = NULL;
assert(sn->state != STATE_STARTED);
assert(sn->state != STATE_DESTROYED);
assert(state != STATE_STARTED);
json_t *json_nodes = NULL;
json_t *json_paths = NULL;
@ -203,33 +205,33 @@ int super_node_parse_json(struct super_node *sn, json_t *cfg)
json_error_t err;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: o, s?: o, s?: o, s?: i, s?: i, s?: i, s?: F, s?: s }",
ret = json_unpack_ex(j, &err, 0, "{ s?: o, s?: o, s?: o, s?: o, s?: o, s?: i, s?: i, s?: i, s?: F, s?: s }",
"http", &json_web,
"logging", &json_logging,
"plugins", &json_plugins,
"nodes", &json_nodes,
"paths", &json_paths,
"hugepages", &sn->hugepages,
"affinity", &sn->affinity,
"priority", &sn->priority,
"stats", &sn->stats,
"name", &name
"hugepages", &hugepages,
"affinity", &affinity,
"priority", &priority,
"stats", &stats,
"name", &nme
);
if (ret)
jerror(&err, "Failed to parse global configuration");
if (name) {
sn->name = realloc(sn->name, strlen(name)+1);
sprintf(sn->name, "%s", name);
if (nme) {
name = (char *) realloc(name, strlen(nme)+1);
sprintf(name, "%s", nme);
}
#ifdef WITH_WEB
if (json_web)
web_parse(&sn->web, json_web);
web_parse(&web, json_web);
#endif /* WITH_WEB */
if (json_logging)
log_parse(&sn->log, json_logging);
log_parse(&log, json_logging);
/* Parse plugins */
if (json_plugins) {
@ -249,7 +251,7 @@ int super_node_parse_json(struct super_node *sn, json_t *cfg)
if (ret)
error("Failed to parse plugin");
list_push(&sn->plugins, p);
list_push(&plugins, p);
}
}
@ -282,7 +284,7 @@ int super_node_parse_json(struct super_node *sn, json_t *cfg)
if (ret)
error("Failed to parse node");
list_push(&sn->nodes, n);
list_push(&nodes, n);
}
}
@ -300,11 +302,11 @@ int super_node_parse_json(struct super_node *sn, json_t *cfg)
if (ret)
error("Failed to initialize path");
ret = path_parse(p, json_path, &sn->nodes);
ret = path_parse(p, json_path, &nodes);
if (ret)
error("Failed to parse path");
list_push(&sn->paths, p);
list_push(&paths, p);
if (p->reverse) {
struct path *r = (struct path *) alloc(sizeof(struct path));
@ -317,78 +319,80 @@ int super_node_parse_json(struct super_node *sn, json_t *cfg)
if (ret)
error("Failed to reverse path %s", path_name(p));
list_push(&sn->paths, r);
list_push(&paths, r);
}
}
}
sn->state = STATE_PARSED;
json = j;
state = STATE_PARSED;
return 0;
}
int super_node_check(struct super_node *sn)
int SuperNode::check()
{
int ret;
assert(sn->state != STATE_DESTROYED);
assert(state == STATE_PARSED || state == STATE_PARSED || state == STATE_CHECKED);
for (size_t i = 0; i < list_length(&sn->nodes); i++) {
struct node *n = (struct node *) list_at(&sn->nodes, i);
for (size_t i = 0; i < list_length(&nodes); i++) {
struct node *n = (struct node *) list_at(&nodes, i);
ret = node_check(n);
if (ret)
error("Invalid configuration for node %s", node_name(n));
}
for (size_t i = 0; i < list_length(&sn->paths); i++) {
struct path *p = (struct path *) list_at(&sn->paths, i);
for (size_t i = 0; i < list_length(&paths); i++) {
struct path *p = (struct path *) list_at(&paths, i);
ret = path_check(p);
if (ret)
error("Invalid configuration for path %s", path_name(p));
}
sn->state = STATE_CHECKED;
state = STATE_CHECKED;
return 0;
}
int super_node_start(struct super_node *sn)
int SuperNode::start()
{
int ret;
assert(sn->state == STATE_CHECKED);
assert(state == STATE_CHECKED);
memory_init(sn->hugepages);
rt_init(sn->priority, sn->affinity);
memory_init(hugepages);
rt_init(priority, affinity);
log_open(&sn->log);
log_open(&log);
#ifdef WITH_API
api_start(&sn->api);
api_start(&api);
#endif
#ifdef WITH_WEB
web_start(&sn->web);
web_start(&web);
#endif
info("Starting node-types");
for (size_t i = 0; i < list_length(&sn->nodes); i++) {
struct node *n = (struct node *) list_at(&sn->nodes, i);
for (size_t i = 0; i < list_length(&nodes); i++) {
struct node *n = (struct node *) list_at(&nodes, i);
ret = node_type_start(n->_vt, sn);
ret = node_type_start(n->_vt);//, this); // @todo: port to C++
if (ret)
error("Failed to start node-type: %s", node_type_name(n->_vt));
}
info("Starting nodes");
for (size_t i = 0; i < list_length(&sn->nodes); i++) {
struct node *n = (struct node *) list_at(&sn->nodes, i);
for (size_t i = 0; i < list_length(&nodes); i++) {
struct node *n = (struct node *) list_at(&nodes, i);
ret = node_init2(n);
if (ret)
error("Failed to prepare node: %s", node_name(n));
int refs = list_count(&sn->paths, (cmp_cb_t) path_uses_node, n);
int refs = list_count(&paths, (cmp_cb_t) path_uses_node, n);
if (refs > 0) {
ret = node_start(n);
if (ret)
@ -399,8 +403,8 @@ int super_node_start(struct super_node *sn)
}
info("Starting paths");
for (size_t i = 0; i < list_length(&sn->paths); i++) {
struct path *p = (struct path *) list_at(&sn->paths, i);
for (size_t i = 0; i < list_length(&paths); i++) {
struct path *p = (struct path *) list_at(&paths, i);
if (p->enabled) {
ret = path_init2(p);
@ -415,18 +419,21 @@ int super_node_start(struct super_node *sn)
warn("Path %s is disabled. Skipping...", path_name(p));
}
sn->state = STATE_STARTED;
state = STATE_STARTED;
return 0;
}
int super_node_stop(struct super_node *sn)
int SuperNode::stop()
{
int ret;
if (stats > 0)
stats_print_footer(STATS_FORMAT_HUMAN);
info("Stopping paths");
for (size_t i = 0; i < list_length(&sn->paths); i++) {
struct path *p = (struct path *) list_at(&sn->paths, i);
for (size_t i = 0; i < list_length(&paths); i++) {
struct path *p = (struct path *) list_at(&paths, i);
ret = path_stop(p);
if (ret)
@ -434,8 +441,8 @@ int super_node_stop(struct super_node *sn)
}
info("Stopping nodes");
for (size_t i = 0; i < list_length(&sn->nodes); i++) {
struct node *n = (struct node *) list_at(&sn->nodes, i);
for (size_t i = 0; i < list_length(&nodes); i++) {
struct node *n = (struct node *) list_at(&nodes, i);
ret = node_stop(n);
if (ret)
@ -454,51 +461,73 @@ int super_node_stop(struct super_node *sn)
}
#ifdef WITH_API
api_stop(&sn->api);
api_stop(&api);
#endif
#ifdef WITH_WEB
web_stop(&sn->web);
web_stop(&web);
#endif
log_close(&sn->log);
log_close(&log);
sn->state = STATE_STOPPED;
state = STATE_STOPPED;
return 0;
}
int super_node_destroy(struct super_node *sn)
{
assert(sn->state != STATE_DESTROYED);
list_destroy(&sn->plugins, (dtor_cb_t) plugin_destroy, false);
list_destroy(&sn->paths, (dtor_cb_t) path_destroy, true);
list_destroy(&sn->nodes, (dtor_cb_t) node_destroy, true);
#ifdef WITH_WEB
web_destroy(&sn->web);
#endif /* WITH_WEB */
#ifdef WITH_API
api_destroy(&sn->api);
#endif /* WITH_API */
json_decref(sn->cfg);
log_destroy(&sn->log);
if (sn->name)
free(sn->name);
sn->state = STATE_DESTROYED;
return 0;
}
int super_node_periodic(struct super_node *sn)
void SuperNode::run()
{
#ifdef WITH_HOOKS
int ret;
for (size_t i = 0; i < list_length(&sn->paths); i++) {
struct path *p = (struct path *) list_at(&sn->paths, i);
if (stats > 0) {
stats_print_header(STATS_FORMAT_HUMAN);
struct task t;
ret = task_init(&t, 1.0 / stats, CLOCK_REALTIME);
if (ret)
error("Failed to create stats timer");
for (;;) {
task_wait(&t);
periodic();
}
}
else
#endif /* WITH_HOOKS */
for (;;) pause();
}
SuperNode::~SuperNode()
{
assert(state == STATE_STOPPED);
list_destroy(&plugins, (dtor_cb_t) plugin_destroy, false);
list_destroy(&paths, (dtor_cb_t) path_destroy, true);
list_destroy(&nodes, (dtor_cb_t) node_destroy, true);
#ifdef WITH_WEB
web_destroy(&web);
#endif /* WITH_WEB */
#ifdef WITH_API
api_destroy(&api);
#endif /* WITH_API */
json_decref(json);
log_destroy(&log);
if (name)
free(name);
}
int SuperNode::periodic()
{
#ifdef WITH_HOOKS
int ret;
for (size_t i = 0; i < list_length(&paths); i++) {
struct path *p = (struct path *) list_at(&paths, i);
if (p->state != STATE_STARTED)
continue;
@ -512,8 +541,8 @@ int super_node_periodic(struct super_node *sn)
}
}
for (size_t i = 0; i < list_length(&sn->nodes); i++) {
struct node *n = (struct node *) list_at(&sn->nodes, i);
for (size_t i = 0; i < list_length(&nodes); i++) {
struct node *n = (struct node *) list_at(&nodes, i);
if (n->state != STATE_STARTED)
continue;

View file

@ -37,7 +37,7 @@ lws_callback_function api_http_protocol_cb;
lws_callback_function websocket_protocol_cb;
/** List of libwebsockets protocols. */
struct lws_protocols protocols[] = {
struct lws_protocols protocols[] = {
{
.name = "http",
.callback = lws_callback_http_dummy,

View file

@ -45,23 +45,18 @@
#include <villas/nodes/opal.h>
#endif
struct super_node sn;
using namespace villas::node;
SuperNode sn;
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
int ret;
if (sn.stats > 0)
stats_print_footer(STATS_FORMAT_HUMAN);
ret = super_node_stop(&sn);
ret = sn.stop();
if (ret)
error("Failed to stop super node");
ret = super_node_destroy(&sn);
if (ret)
error("Failed to destroy super node");
info(CLR_GRN("Goodbye!"));
exit(EXIT_SUCCESS);
}
@ -149,40 +144,23 @@ int main(int argc, char *argv[])
if (ret)
error("Failed to initialize signal subsystem");
ret = super_node_init(&sn);
if (ret)
error("Failed to initialize super node");
ret = super_node_parse_uri(&sn, uri);
ret = sn.parseUri(uri);
if (ret)
error("Failed to parse command line arguments");
ret = super_node_check(&sn);
ret = sn.init();
if (ret)
error("Failed to initialize super node");
ret = sn.check();
if (ret)
error("Failed to verify configuration");
ret = super_node_start(&sn);
ret = sn.start();
if (ret)
error("Failed to start super node");
#ifdef WITH_HOOKS
if (sn.stats > 0) {
stats_print_header(STATS_FORMAT_HUMAN);
struct task t;
ret = task_init(&t, 1.0 / sn.stats, CLOCK_REALTIME);
if (ret)
error("Failed to create stats timer");
for (;;) {
task_wait(&t);
super_node_periodic(&sn);
}
}
else
#endif /* WITH_HOOKS */
for (;;) pause();
sn.run();
return 0;
}

View file

@ -45,7 +45,9 @@
#include <villas/nodes/websocket.h>
static struct super_node sn = { .state = STATE_DESTROYED }; /**< The global configuration */
using namespace villas::node;
static SuperNode sn; /**< The global configuration */
static struct io io = { .state = STATE_DESTROYED };
static struct dir {
@ -74,7 +76,7 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
pthread_join(sendd.thread, NULL);
}
ret = super_node_stop(&sn);
ret = sn.stop();
if (ret)
error("Failed to stop super node");
@ -90,10 +92,6 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
error("Failed to destroy pool");
}
ret = super_node_destroy(&sn);
if (ret)
error("Failed to destroy super node");
ret = io_close(&io);
if (ret)
error("Failed to close IO");
@ -113,7 +111,6 @@ static void usage()
std::cout << " NODE the name of the node to which samples are sent and received from" << std::endl;
std::cout << " OPTIONS are:" << std::endl;
std::cout << " -f FMT set the format" << std::endl;
std::cout << " -d LVL set debug log level to LVL" << std::endl;
std::cout << " -o OPTION=VALUE overwrite options in config file" << std::endl;
std::cout << " -x swap read / write endpoints" << std::endl;
std::cout << " -s only read data from stdin and send it to node" << std::endl;
@ -243,7 +240,7 @@ leave: info("Reached receive limit. Terminating...");
int main(int argc, char *argv[])
{
int ret, level = 2, timeout = 0;
int ret, timeout = 0;
bool reverse = false;
const char *format = "villas.human";
@ -274,9 +271,6 @@ int main(int argc, char *argv[])
case 'r':
sendd.enabled = false; // receive only
break;
case 'd':
level = strtoul(optarg, &endptr, 10);
goto check;
case 'l':
recvv.limit = strtoul(optarg, &endptr, 10);
goto check;
@ -312,34 +306,22 @@ check: if (optarg == endptr)
char *nodestr = argv[optind+1];
struct format_type *fmt;
ret = log_init(&sn.log, level, LOG_ALL);
if (ret)
error("Failed to initialize log");
ret = signals_init(quit);
if (ret)
error("Failed to initialize signals");
ret = super_node_init(&sn);
if (ret)
error("Failed to initialize super-node");
ret = super_node_parse_uri(&sn, configfile);
ret = sn.parseUri(configfile);
if (ret)
error("Failed to parse configuration");
ret = log_open(&sn.log);
ret = sn.init();
if (ret)
error("Failed to initialize super-node");
ret = log_open(sn.getLog());
if (ret)
error("Failed to start log");
ret = memory_init(sn.hugepages);
if (ret)
error("Failed to initialize memory");
ret = rt_init(sn.priority, sn.affinity);
if (ret)
error("Failed to initalize real-time");
fmt = format_type_lookup(format);
if (!fmt)
error("Invalid format: %s", format);
@ -356,18 +338,18 @@ check: if (optarg == endptr)
if (ret)
error("Failed to open IO");
node = (struct node *) list_lookup(&sn.nodes, nodestr);
node = sn.getNode(nodestr);
if (!node)
error("Node %s does not exist!", nodestr);
#ifdef LIBWEBSOCKETS_FOUND
/* Only start web subsystem if villas-pipe is used with a websocket node */
if (node->_vt->start == websocket_start) {
ret = web_start(&sn.web);
if (node_type(node)->start == websocket_start) {
ret = web_start(sn.getWeb());
if (ret)
error("Failed to start web subsystem");
ret = api_start(&sn.api);
ret = api_start(sn.getApi());
if (ret)
error("Failed to start API subsystem");
}
@ -376,7 +358,7 @@ check: if (optarg == endptr)
if (reverse)
node_reverse(node);
ret = node_type_start(node->_vt, &sn);
ret = node_type_start(node->_vt);//, &sn); // @todo: port to C++
if (ret)
error("Failed to intialize node type %s: reason=%d", node_type_name(node->_vt), ret);

View file

@ -236,7 +236,7 @@ int main(int argc, char *argv[])
}
// nt == n._vt
ret = node_type_start(nt, NULL);
ret = node_type_start(nt); /// @todo: Port to C++
if (ret)
error("Failed to initialize node type: %s", node_type_name(nt));

View file

@ -38,7 +38,9 @@
#include <villas/pool.h>
#include <villas/kernel/rt.h>
struct super_node sn; /** <The global configuration */
using namespace villas::node;
SuperNode sn; /** <The global configuration */
static struct node *node;
@ -115,7 +117,7 @@ int main(int argc, char *argv[])
continue;
check: if (optarg == endptr)
error("Failed to parse parse option argument '-%c %s'", c, optarg);
error("Failed to parse parse option argument '-%c %s'", c, optarg);
}
if (argc != optind + 2) {
@ -128,37 +130,25 @@ check: if (optarg == endptr)
ret = signals_init(quit);
if (ret)
error("Failed to initialize signals");
error("Failed to initialize signals subsystem");
ret = log_init(&sn.log, 2, LOG_ALL);
if (ret)
return ret;
ret = super_node_init(&sn);
if (ret)
error("Failed to initialize super-node");
ret = super_node_parse_uri(&sn, configfile);
ret = sn.parseUri(configfile);
if (ret)
error("Failed to parse configuration");
ret = log_open(&sn.log);
ret = sn.init();
if (ret)
error("Failed to open log file");
error("Initialization failed!");
ret = rt_init(sn.priority, sn.affinity);
ret = log_open(sn.getLog());
if (ret)
return ret;
error("Failed to open log");
ret = memory_init(sn.hugepages);
if (ret)
error("Failed to stop node-type %s: reason=%d", node_type_name(node->_vt), ret);
node = (struct node *) list_lookup(&sn.nodes, nodestr);
node = sn.getNode(nodestr);
if (!node)
error("There's no node with the name '%s'", nodestr);
ret = node_type_start(node->_vt, &sn);
ret = node_type_start(node->_vt);//, &sn); // @todo: port to C++
if (ret)
error("Failed to start node-type %s: reason=%d", node_type_name(node->_vt), ret);
@ -180,10 +170,6 @@ check: if (optarg == endptr)
if (ret)
error("Failed to stop node-type %s: reason=%d", node_type_name(node->_vt), ret);
ret = super_node_destroy(&sn);
if (ret)
error("Failed to destroy super-node");
return 0;
}