1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

refactoring: struct cfg => struct super_node

This commit is contained in:
Steffen Vogel 2017-03-12 17:01:24 -03:00
parent 595604b6db
commit ccf2f28b9f
21 changed files with 542 additions and 484 deletions

View file

@ -15,7 +15,7 @@
/* Forward declarations */
enum lws_callback_reasons;
struct lws;
struct cfg;
struct super_node;
struct api;
struct api_ressource;
@ -46,7 +46,7 @@ struct api {
enum state state;
struct cfg *cfg;
struct super_node *super_node;
};
struct api_buffer {
@ -88,7 +88,7 @@ struct api_ressource {
*
* Save references to list of paths / nodes for command execution.
*/
int api_init(struct api *a, struct cfg *cfg);
int api_init(struct api *a, struct super_node *sn);
int api_destroy(struct api *a);

View file

@ -23,13 +23,13 @@
#include "queue.h"
#include "list.h"
#include "cfg.h"
#include "super_node.h"
/* Forward declarations */
struct path;
struct hook;
struct sample;
struct cfg;
struct super_node;
/** Optional parameters to hook callbacks */
struct hook_info {
@ -109,7 +109,7 @@ struct hook {
};
/** Save references to global nodes, paths and settings */
int hook_init(struct hook *h, struct cfg *cfg);
int hook_init(struct hook *h, struct super_node *sn);
int hook_destroy(struct hook *h);

View file

@ -27,7 +27,7 @@
#include "common.h"
/* Forward declarations */
struct cfg;
struct super_node;
struct path_source
{
@ -65,21 +65,18 @@ struct path
struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */
struct super_node *super_node; /**< The super node this path belongs to. */
config_setting_t *cfg; /**< A pointer to the libconfig object which instantiated this path */
};
/** Initialize internal data structures. */
int path_init(struct path *p, struct cfg *cfg);
int path_init(struct path *p, struct super_node *sn);
int path_init2(struct path *p);
/** Check if path configuration is proper. */
int path_check(struct path *p);
/** Destroy path by freeing dynamically allocated memory.
*
* @param i A pointer to the path structure.
*/
int path_destroy(struct path *p);
/** Start a path.
*
* Start a new pthread for receiving/sending messages over this path.
@ -98,6 +95,12 @@ int path_start(struct path *p);
*/
int path_stop(struct path *p);
/** Destroy path by freeing dynamically allocated memory.
*
* @param i A pointer to the path structure.
*/
int path_destroy(struct path *p);
/** Show some basic statistics for a path.
*
* @param p A pointer to the path structure.

View file

@ -20,7 +20,7 @@
#include "common.h"
/** Global configuration */
struct cfg {
struct super_node {
int priority; /**< Process priority (lower is better) */
int affinity; /**< Process affinity of the server and all created threads */
int hugepages; /**< Number of hugepages to reserve. */
@ -51,23 +51,30 @@ struct cfg {
#endif
/** Inititalize configuration object before parsing the configuration. */
int cfg_init(struct cfg *cfg);
int super_node_init(struct super_node *sn);
/** Initialize after parsing the configuration file. */
int cfg_start(struct cfg *cfg);
/** Wrapper for super_node_parse() */
int super_node_parse_cli(struct super_node *sn, int argc, char *argv[]);
int cfg_stop(struct cfg *cfg);
/** Wrapper for super_node_parse() */
int super_node_parse_uri(struct super_node *sn, const char *uri);
/** Desctroy configuration object. */
int cfg_destroy(struct cfg *cfg);
/** Parse config file and store settings in supplied struct cfg.
/** Parse super-node configuration.
*
* @param cfg A configuration object.
* @param filename The path to the configration file (relative or absolute)
* @param sn The super-node datastructure to fill.
* @param cfg A libconfig setting object.
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int cfg_parse(struct cfg *cfg, const char *uri);
int super_node_parse(struct super_node *sn, config_setting_t *cfg);
int cfg_parse_cli(struct cfg *cfg, int argc, char *argv[]);
/** Check validity of super node configuration. */
int super_node_check(struct super_node *sn);
/** Initialize after parsing the configuration file. */
int super_node_start(struct super_node *sn);
int super_node_stop(struct super_node *sn);
/** Desctroy configuration object. */
int super_node_destroy(struct super_node *sn);

View file

@ -92,7 +92,6 @@
#define BIT(nr) (1UL << (nr))
/* Forward declarations */
struct cfg;
struct timespec;
/** Print copyright message to screen. */

View file

@ -5,9 +5,9 @@ LIBS = $(BUILDDIR)/libvillas.so
LIB_SRCS = $(addprefix lib/nodes/, file.c cbuilder.c) \
$(addprefix lib/kernel/, kernel.c rt.c) \
$(addprefix lib/, sample.c path.c node.c hook.c \
log.c utils.c cfg.c hist.c timing.c pool.c list.c \
queue.c memory.c advio.c web.c api.c plugin.c \
node_type.c stats.c \
log.c utils.c super_node.c hist.c timing.c pool.c \
list.c queue.c memory.c advio.c web.c api.c \
plugin.c node_type.c stats.c \
)
LIB_CFLAGS = $(CFLAGS) -fPIC

View file

@ -231,13 +231,13 @@ int api_buffer_append(struct api_buffer *b, const char *in, size_t len)
return 0;
}
int api_init(struct api *a, struct cfg *cfg)
int api_init(struct api *a, struct super_node *sn)
{
info("Initialize API sub-system");
list_init(&a->sessions);
a->cfg = cfg;
a->super_node = sn;
a->state = STATE_INITIALIZED;
return 0;

View file

@ -14,7 +14,7 @@
static int api_config(struct api_ressource *h, json_t *args, json_t **resp, struct api_session *s)
{
config_setting_t *cfg_root = config_root_setting(&s->api->cfg->cfg);
config_setting_t *cfg_root = config_root_setting(&s->api->super_node->cfg);
*resp = config_to_json(cfg_root);

View file

@ -18,8 +18,8 @@ extern struct list nodes;
static int api_nodes(struct api_ressource *r, json_t *args, json_t **resp, struct api_session *s)
{
json_t *json_nodes = json_array();
list_foreach(struct node *n, &s->api->cfg->nodes) {
list_foreach(struct node *n, &s->api->super_node->nodes) {
json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }",
"name", node_name_short(n),
"state", n->state,

317
lib/cfg.c
View file

@ -1,317 +0,0 @@
/** Configuration parser.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
*********************************************************************************/
#include <stdlib.h>
#include <string.h>
#include <libgen.h>
#include <unistd.h>
#include "utils.h"
#include "list.h"
#include "cfg.h"
#include "node.h"
#include "path.h"
#include "hook.h"
#include "advio.h"
#include "web.h"
#include "log.h"
#include "api.h"
#include "plugin.h"
#include "node.h"
#include "memory.h"
#include "kernel/rt.h"
static int cfg_parse_global(struct cfg *cfg, config_setting_t *cfg_root)
{
if (!config_setting_is_group(cfg_root))
cerror(cfg_root, "Global section must be a dictionary.");
config_setting_lookup_int(cfg_root, "hugepages", &cfg->hugepages);
config_setting_lookup_int(cfg_root, "affinity", &cfg->affinity);
config_setting_lookup_int(cfg_root, "priority", &cfg->priority);
config_setting_lookup_float(cfg_root, "stats", &cfg->stats);
return 0;
}
int cfg_init(struct cfg *cfg)
{
config_init(&cfg->cfg);
log_init(&cfg->log, V, LOG_ALL);
api_init(&cfg->api, cfg);
web_init(&cfg->web, &cfg->api);
list_init(&cfg->nodes);
list_init(&cfg->paths);
list_init(&cfg->plugins);
/* Default values */
cfg->affinity = -1;
cfg->stats = 0;
cfg->priority = 50;
cfg->hugepages = DEFAULT_NR_HUGEPAGES;
cfg->state = STATE_INITIALIZED;
return 0;
}
int cfg_parse_cli(struct cfg *cfg, int argc, char *argv[])
{
cfg->cli.argc = argc;
cfg->cli.argv = argv;
char *uri = (argc == 2) ? argv[1] : NULL;
return cfg_parse(cfg, uri);
}
int cfg_parse(struct cfg *cfg, const char *uri)
{
config_setting_t *cfg_root, *cfg_nodes, *cfg_paths, *cfg_plugins, *cfg_logging, *cfg_web;
info("Parsing configuration: uri=%s", uri);
{ INDENT
int ret = CONFIG_FALSE;
if (uri) {
/* Setup libconfig */
config_set_auto_convert(&cfg->cfg, 1);
FILE *f;
AFILE *af;
/* Via stdin */
if (strcmp("-", uri) == 0) {
af = NULL;
f = stdin;
}
/* Local file? */
else if (access(uri, F_OK) != -1) {
/* Setup libconfig include path.
* This is only supported for local files */
char *uri_cpy = strdup(uri);
char *include_dir = dirname(uri_cpy);
config_set_include_dir(&cfg->cfg, include_dir);
free(uri_cpy);
af = NULL;
f = fopen(uri, "r");
}
/* Use advio (libcurl) to fetch the config from a remote */
else {
af = afopen(uri, "r", ADVIO_MEM);
f = af ? af->file : NULL;
}
/* Check if file could be loaded / opened */
if (!f)
error("Failed to open configuration from: %s", uri);
/* Parse config */
ret = config_read(&cfg->cfg, f);
if (ret != CONFIG_TRUE)
error("Failed to parse configuration: %s in %s:%d", config_error_text(&cfg->cfg), uri, config_error_line(&cfg->cfg));
/* Close configuration file */
if (af)
afclose(af);
else
fclose(f);
}
else {
warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance.");
cfg->web.port = 80;
cfg->web.htdocs = "/villas/web/socket/";
}
cfg_root = config_root_setting(&cfg->cfg);
if (cfg_root)
cfg_parse_global(cfg, cfg_root);
cfg_web = config_setting_get_member(cfg_root, "http");
if (cfg_web)
web_parse(&cfg->web, cfg_web);
/* Parse logging settings */
cfg_logging = config_setting_get_member(cfg_root, "logging");
if (cfg_logging)
log_parse(&cfg->log, cfg_logging);
/* Parse plugins */
cfg_plugins = config_setting_get_member(cfg_root, "plugins");
if (cfg_plugins) {
if (!config_setting_is_array(cfg_plugins))
cerror(cfg_plugins, "Setting 'plugins' must be a list of strings");
for (int i = 0; i < config_setting_length(cfg_plugins); i++) {
struct config_setting_t *cfg_plugin = config_setting_get_elem(cfg_plugins, i);
struct plugin plugin;
ret = plugin_parse(&plugin, cfg_plugin);
if (ret)
cerror(cfg_plugin, "Failed to parse plugin");
list_push(&cfg->plugins, memdup(&plugin, sizeof(plugin)));
}
}
/* Parse nodes */
cfg_nodes = config_setting_get_member(cfg_root, "nodes");
if (cfg_nodes) {
if (!config_setting_is_group(cfg_nodes))
warn("Setting 'nodes' must be a group with node name => group mappings.");
for (int i = 0; i < config_setting_length(cfg_nodes); i++) {
config_setting_t *cfg_node = config_setting_get_elem(cfg_nodes, i);
struct plugin *p;
const char *type;
/* Required settings */
if (!config_setting_lookup_string(cfg_node, "type", &type))
cerror(cfg_node, "Missing node type");
p = plugin_lookup(PLUGIN_TYPE_NODE, type);
if (!p)
cerror(cfg_node, "Invalid node type: %s", type);
struct node n;
ret = node_parse(&n, cfg_node);
if (ret)
cerror(cfg_node, "Failed to parse node");
list_push(&cfg->nodes, memdup(&n, sizeof(n)));
}
}
/* Parse paths */
cfg_paths = config_setting_get_member(cfg_root, "paths");
if (cfg_paths) {
if (!config_setting_is_list(cfg_paths))
warn("Setting 'paths' must be a list.");
for (int i = 0; i < config_setting_length(cfg_paths); i++) {
config_setting_t *cfg_path = config_setting_get_elem(cfg_paths, i);
struct path p;
ret = path_parse(&p, cfg_path, &cfg->nodes);
if (ret)
cerror(cfg_path, "Failed to parse path");
list_push(&cfg->paths, memdup(&p, sizeof(p)));
if (p.reverse) {
struct path r;
ret = path_reverse(&p, &r);
if (ret)
cerror(cfg_path, "Failed to reverse path %s", path_name(&p));
list_push(&cfg->paths, memdup(&r, sizeof(p)));
}
}
}
}
cfg->state = STATE_PARSED;
return 0;
}
int cfg_start(struct cfg *cfg)
{
memory_init(cfg->hugepages);
rt_init(cfg->priority, cfg->affinity);
api_start(&cfg->api);
web_start(&cfg->web);
info("Start node types");
list_foreach(struct node *n, &cfg->nodes) { INDENT
config_setting_t *cfg_root = config_root_setting(&cfg->cfg);
node_type_start(n->_vt, cfg->cli.argc, cfg->cli.argv, cfg_root);
}
info("Starting nodes");
list_foreach(struct node *n, &cfg->nodes) { INDENT
int refs = list_count(&cfg->paths, (cmp_cb_t) path_uses_node, n);
if (refs > 0)
node_start(n);
else
warn("No path is using the node %s. Skipping...", node_name(n));
}
info("Starting paths");
list_foreach(struct path *p, &cfg->paths) { INDENT
if (p->enabled) {
path_init(p, cfg);
path_start(p);
}
else
warn("Path %s is disabled. Skipping...", path_name(p));
}
cfg->state = STATE_STARTED;
return 0;
}
int cfg_stop(struct cfg *cfg)
{
info("Stopping paths");
list_foreach(struct path *p, &cfg->paths) { INDENT
path_stop(p);
}
info("Stopping nodes");
list_foreach(struct node *n, &cfg->nodes) { INDENT
node_stop(n);
}
info("De-initializing node types");
list_foreach(struct plugin *p, &plugins) { INDENT
if (p->type == PLUGIN_TYPE_NODE)
node_type_stop(&p->node);
}
web_stop(&cfg->web);
api_stop(&cfg->api);
log_stop(&cfg->log);
cfg->state = STATE_STOPPED;
return 0;
}
int cfg_destroy(struct cfg *cfg)
{
config_destroy(&cfg->cfg);
web_destroy(&cfg->web);
log_destroy(&cfg->log);
api_destroy(&cfg->api);
list_destroy(&cfg->plugins, (dtor_cb_t) plugin_destroy, false);
list_destroy(&cfg->paths, (dtor_cb_t) path_destroy, true);
list_destroy(&cfg->nodes, (dtor_cb_t) node_destroy, true);
cfg->state = STATE_DESTROYED;
return 0;
}

View file

@ -19,11 +19,11 @@
#include "node.h"
#include "plugin.h"
int hook_init(struct hook *h, struct cfg *cfg)
int hook_init(struct hook *h, struct super_node *sn)
{
struct hook_info i = {
.nodes = &cfg->nodes,
.paths = &cfg->paths
.nodes = &sn->nodes,
.paths = &sn->paths
};
if (h->type & HOOK_INIT)

View file

@ -9,7 +9,7 @@
#include "config.h"
#include "utils.h"
#include "cfg.h"
#include "super_node.h"
#include "kernel/kernel.h"
#include "kernel/rt.h"

View file

@ -9,7 +9,6 @@
#include "sample.h"
#include "node.h"
#include "cfg.h"
#include "utils.h"
#include "config.h"
#include "plugin.h"

View file

@ -9,7 +9,7 @@
#include "sample.h"
#include "node.h"
#include "cfg.h"
#include "super_node.h"
#include "utils.h"
#include "config.h"
#include "plugin.h"

View file

@ -13,11 +13,11 @@
#include <libconfig.h>
#include "nodes/websocket.h"
#include "super_node.h"
#include "webmsg_format.h"
#include "timing.h"
#include "utils.h"
#include "msg.h"
#include "cfg.h"
#include "config.h"
#include "plugin.h"

View file

@ -19,7 +19,7 @@
#include "queue.h"
#include "hook.h"
#include "plugin.h"
#include "cfg.h"
#include "super_node.h"
#include "memory.h"
static void path_read(struct path *p)
@ -140,52 +140,17 @@ static int path_destination_destroy(struct path_destination *pd)
return 0;
}
int path_init(struct path *p, struct cfg *cfg)
int path_init(struct path *p, struct super_node *sn)
{
int ret, max_queuelen = 0;
if (p->state != STATE_DESTROYED)
return -1;
/* Add internal hooks if they are not already in the list*/
list_foreach(struct plugin *pl, &plugins) {
if (pl->type == PLUGIN_TYPE_HOOK) {
struct hook *h = &pl->hook;
assert(p->state == STATE_DESTROYED);
if ((h->type & HOOK_AUTO) && /* should this hook be added implicitely? */
(list_lookup(&p->hooks, pl->name) == NULL)) /* is not already in list? */
list_push(&p->hooks, memdup(h, sizeof(h)));
}
}
list_init(&p->hooks);
list_init(&p->destinations);
/* We sort the hooks according to their priority before starting the path */
list_sort(&p->hooks, hook_cmp_priority);
list_foreach(struct hook *h, &p->hooks)
hook_init(h, cfg);
/* Parse hook arguments */
ret = hook_run(p, NULL, 0, HOOK_PARSE);
if (ret)
error("Failed to parse arguments for hooks of path: %s", path_name(p));
/* Initialize destinations */
list_foreach(struct path_destination *pd, &p->destinations) {
ret = queue_init(&pd->queue, pd->queuelen, &memtype_hugepage);
if (ret)
error("Failed to initialize queue for path");
if (pd->queuelen > max_queuelen)
max_queuelen = pd->queuelen;
}
/* Initialize source */
ret = pool_init(&p->source->pool, max_queuelen, SAMPLE_LEN(p->source->samplelen), &memtype_hugepage);
if (ret)
error("Failed to allocate memory pool for path");
p->super_node = sn;
p->state = STATE_INITIALIZED;
return 0;
}
@ -196,7 +161,9 @@ int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes)
int ret, samplelen, queuelen;
struct node *source;
struct list destinations;
struct list destinations = { .state = STATE_DESTROYED };
list_init(&destinations);
/* Input node */
if (!config_setting_lookup_string(cfg, "in", &in) &&
@ -216,14 +183,12 @@ int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes)
!(cfg_out = config_setting_get_member(cfg, "dest")) &&
!(cfg_out = config_setting_get_member(cfg, "sink")))
cerror(cfg, "Missing output nodes for path");
list_init(&destinations);
ret = node_parse_list(&destinations, cfg_out, nodes);
if (ret <= 0)
cerror(cfg_out, "Invalid output nodes");
/* Optional settings */
list_init(&p->hooks);
cfg_hook = config_setting_get_member(cfg, "hook");
if (cfg_hook)
hook_parse_list(&p->hooks, cfg_hook);
@ -282,9 +247,54 @@ int path_check(struct path *p)
return 0;
}
int path_init2(struct path *p)
{
int ret, max_queuelen = 0;
assert(p->state == STATE_CHECKED);
/* Add internal hooks if they are not already in the list*/
list_foreach(struct plugin *pl, &plugins) {
if (pl->type == PLUGIN_TYPE_HOOK) {
struct hook c;
struct hook *h = &pl->hook;
if (h->type & HOOK_AUTO) {
hook_copy(h, &c);
list_push(&p->hooks, memdup(&c, sizeof(c)));
}
}
}
/* We sort the hooks according to their priority before starting the path */
list_sort(&p->hooks, hook_cmp_priority);
list_foreach(struct hook *h, &p->hooks)
hook_init(h, p->super_node);
/* Initialize destinations */
list_foreach(struct path_destination *pd, &p->destinations) {
ret = queue_init(&pd->queue, pd->queuelen, &memtype_hugepage);
if (ret)
error("Failed to initialize queue for path");
if (pd->queuelen > max_queuelen)
max_queuelen = pd->queuelen;
}
/* Initialize source */
ret = pool_init(&p->source->pool, max_queuelen, SAMPLE_LEN(p->source->samplelen), &memtype_hugepage);
if (ret)
error("Failed to allocate memory pool for path");
return 0;
}
int path_start(struct path *p)
{
int ret;
assert(p->state == STATE_CHECKED);
info("Starting path: %s (#hooks=%zu)",
path_name(p), list_length(&p->hooks));
@ -302,6 +312,7 @@ int path_start(struct path *p)
return 0;
}
int path_stop(struct path *p)
{
int ret;
@ -379,9 +390,6 @@ int path_reverse(struct path *p, struct path *r)
struct path_destination *first_pd = list_first(&p->destinations);
list_init(&r->destinations);
list_init(&r->hooks);
/* General */
r->enabled = p->enabled;
r->cfg = p->cfg;

380
lib/super_node.c Normal file
View file

@ -0,0 +1,380 @@
/** Configuration parser.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
*********************************************************************************/
#include <stdlib.h>
#include <string.h>
#include <libgen.h>
#include <unistd.h>
#include "super_node.h"
#include "node.h"
#include "path.h"
#include "utils.h"
#include "list.h"
#include "hook.h"
#include "advio.h"
#include "web.h"
#include "log.h"
#include "api.h"
#include "plugin.h"
#include "memory.h"
#include "kernel/rt.h"
static void config_dtor(void *data)
{
if (data)
free(data);
}
static int super_node_parse_global(struct super_node *sn, config_setting_t *cfg)
{
if (!config_setting_is_group(cfg))
cerror(cfg, "Global section must be a dictionary.");
config_setting_lookup_int(cfg, "hugepages", &sn->hugepages);
config_setting_lookup_int(cfg, "affinity", &sn->affinity);
config_setting_lookup_int(cfg, "priority", &sn->priority);
config_setting_lookup_float(cfg, "stats", &sn->stats);
return 0;
}
int super_node_init(struct super_node *sn)
{
assert(sn->state == STATE_DESTROYED);
config_init(&sn->cfg);
log_init(&sn->log, V, LOG_ALL);
api_init(&sn->api, sn);
web_init(&sn->web, &sn->api);
list_init(&sn->nodes);
list_init(&sn->paths);
list_init(&sn->plugins);
/* Default values */
sn->affinity = 0;
sn->priority = 0;
sn->stats = 0;
sn->hugepages = DEFAULT_NR_HUGEPAGES;
sn->web.port = 80;
sn->web.htdocs = "/villas/web/socket/";
sn->state = STATE_INITIALIZED;
return 0;
}
int super_node_parse_uri(struct super_node *sn, const char *uri)
{
info("Parsing configuration: uri=%s", uri);
int ret = CONFIG_FALSE;
if (uri) { INDENT
FILE *f;
AFILE *af;
config_setting_t *cfg_root;
/* Via stdin */
if (strcmp("-", uri) == 0) {
af = NULL;
f = stdin;
}
/* Local file? */
else if (access(uri, F_OK) != -1) {
/* Setup libconfig include path.
* This is only supported for local files */
char *uri_cpy = strdup(uri);
char *include_dir = dirname(uri_cpy);
config_set_include_dir(&sn->cfg, include_dir);
free(uri_cpy);
af = NULL;
f = fopen(uri, "r");
}
/* Use advio (libcurl) to fetch the config from a remote */
else {
af = afopen(uri, "r", ADVIO_MEM);
f = af ? af->file : NULL;
}
/* Check if file could be loaded / opened */
if (!f)
error("Failed to open configuration from: %s", uri);
/* Parse config */
config_set_auto_convert(&sn->cfg, 1);
ret = config_read(&sn->cfg, f);
if (ret != CONFIG_TRUE)
error("Failed to parse configuration: %s in %s:%d", config_error_text(&sn->cfg), uri, config_error_line(&sn->cfg));
cfg_root = config_root_setting(&sn->cfg);
/* Little hack to properly report configuration filename in error messages
* We add the uri as a "hook" object to the root setting.
* See cerror() on how this info is used.
*/
config_setting_set_hook(cfg_root, strdup(uri));
config_set_destructor(&sn->cfg, config_dtor);
/* Close configuration file */
if (af)
afclose(af);
else
fclose(f);
return super_node_parse(sn, cfg_root);
}
else { INDENT
warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance.");
}
return 0;
}
int super_node_parse_cli(struct super_node *sn, int argc, char *argv[])
{
char *uri = (argc == 2) ? argv[1] : NULL;
sn->cli.argc = argc;
sn->cli.argv = argv;
return super_node_parse_uri(sn, uri);
}
int super_node_parse(struct super_node *sn, config_setting_t *cfg)
{
int ret;
assert(sn->state != STATE_STARTED);
assert(sn->state != STATE_DESTROYED);
config_setting_t *cfg_nodes, *cfg_paths, *cfg_plugins, *cfg_logging, *cfg_web;
super_node_parse_global(sn, cfg);
cfg_web = config_setting_get_member(cfg, "http");
if (cfg_web)
web_parse(&sn->web, cfg_web);
/* Parse logging settings */
cfg_logging = config_setting_get_member(cfg, "logging");
if (cfg_logging)
log_parse(&sn->log, cfg_logging);
/* Parse plugins */
cfg_plugins = config_setting_get_member(cfg, "plugins");
if (cfg_plugins) {
if (!config_setting_is_array(cfg_plugins))
cerror(cfg_plugins, "Setting 'plugins' must be a list of strings");
for (int i = 0; i < config_setting_length(cfg_plugins); i++) {
struct config_setting_t *cfg_plugin = config_setting_get_elem(cfg_plugins, i);
struct plugin plugin;
ret = plugin_parse(&plugin, cfg_plugin);
if (ret)
cerror(cfg_plugin, "Failed to parse plugin");
list_push(&sn->plugins, memdup(&plugin, sizeof(plugin)));
}
}
/* Parse nodes */
cfg_nodes = config_setting_get_member(cfg, "nodes");
if (cfg_nodes) {
if (!config_setting_is_group(cfg_nodes))
warn("Setting 'nodes' must be a group with node name => group mappings.");
for (int i = 0; i < config_setting_length(cfg_nodes); i++) {
config_setting_t *cfg_node = config_setting_get_elem(cfg_nodes, i);
struct plugin *p;
const char *type;
/* Required settings */
if (!config_setting_lookup_string(cfg_node, "type", &type))
cerror(cfg_node, "Missing node type");
p = plugin_lookup(PLUGIN_TYPE_NODE, type);
if (!p)
cerror(cfg_node, "Invalid node type: %s", type);
struct node n = { .state = STATE_DESTROYED };
ret = node_init(&n, &p->node);
if (ret)
cerror(cfg_node, "Failed to initialize node");
ret = node_parse(&n, cfg_node);
if (ret)
cerror(cfg_node, "Failed to parse node");
list_push(&sn->nodes, memdup(&n, sizeof(n)));
}
}
/* Parse paths */
cfg_paths = config_setting_get_member(cfg, "paths");
if (cfg_paths) {
if (!config_setting_is_list(cfg_paths))
warn("Setting 'paths' must be a list.");
for (int i = 0; i < config_setting_length(cfg_paths); i++) {
config_setting_t *cfg_path = config_setting_get_elem(cfg_paths, i);
struct path p = { .state = STATE_DESTROYED };
ret = path_init(&p, sn);
if (ret)
cerror(cfg_path, "Failed to init path");
ret = path_parse(&p, cfg_path, &sn->nodes);
if (ret)
cerror(cfg_path, "Failed to parse path");
list_push(&sn->paths, memdup(&p, sizeof(p)));
if (p.reverse) {
struct path r;
ret = path_init(&r, sn);
if (ret)
cerror(cfg_path, "Failed to init path");
ret = path_reverse(&p, &r);
if (ret)
cerror(cfg_path, "Failed to reverse path %s", path_name(&p));
list_push(&sn->paths, memdup(&r, sizeof(p)));
}
}
}
sn->state = STATE_PARSED;
return 0;
}
int super_node_check(struct super_node *sn)
{
int ret;
list_foreach(struct node *n, &sn->nodes) {
ret = node_check(n);
if (ret)
error("Invalid configuration for node %s", node_name(n));
}
list_foreach(struct path *p, &sn->paths) {
ret = path_check(p);
if (ret)
error("Invalid configuration for path %s", path_name(p));
}
sn->state = STATE_CHECKED;
return 0;
}
int super_node_start(struct super_node *sn)
{
assert(sn->state == STATE_CHECKED);
memory_init(sn->hugepages);
rt_init(sn->priority, sn->affinity);
api_start(&sn->api);
web_start(&sn->web);
info("Start node types");
list_foreach(struct node *n, &sn->nodes) { INDENT
config_setting_t *cfg = config_root_setting(&sn->cfg);
node_type_start(n->_vt, sn->cli.argc, sn->cli.argv, cfg);
}
info("Starting nodes");
list_foreach(struct node *n, &sn->nodes) { INDENT
int refs = list_count(&sn->paths, (cmp_cb_t) path_uses_node, n);
if (refs > 0)
node_start(n);
else
warn("No path is using the node %s. Skipping...", node_name(n));
}
info("Starting paths");
list_foreach(struct path *p, &sn->paths) { INDENT
if (p->enabled) {
path_init(p, sn);
path_start(p);
}
else
warn("Path %s is disabled. Skipping...", path_name(p));
}
sn->state = STATE_STARTED;
return 0;
}
int super_node_stop(struct super_node *sn)
{
assert(sn->state == STATE_STARTED);
info("Stopping paths");
list_foreach(struct path *p, &sn->paths) { INDENT
path_stop(p);
}
info("Stopping nodes");
list_foreach(struct node *n, &sn->nodes) { INDENT
node_stop(n);
}
info("De-initializing node types");
list_foreach(struct plugin *p, &plugins) { INDENT
if (p->type == PLUGIN_TYPE_NODE)
node_type_stop(&p->node);
}
web_stop(&sn->web);
api_stop(&sn->api);
log_stop(&sn->log);
sn->state = STATE_STOPPED;
return 0;
}
int super_node_destroy(struct super_node *sn)
{
assert(sn->state != STATE_DESTROYED);
config_destroy(&sn->cfg);
web_destroy(&sn->web);
log_destroy(&sn->log);
api_destroy(&sn->api);
list_destroy(&sn->plugins, (dtor_cb_t) plugin_destroy, false);
list_destroy(&sn->paths, (dtor_cb_t) path_destroy, true);
list_destroy(&sn->nodes, (dtor_cb_t) node_destroy, true);
sn->state = STATE_DESTROYED;
return 0;
}

View file

@ -16,7 +16,6 @@
#include <ctype.h>
#include "config.h"
#include "cfg.h"
#include "utils.h"
void print_copyright()

View file

@ -10,7 +10,7 @@
#include <getopt.h>
#include <villas/log.h>
#include <villas/cfg.h>
#include <villas/super_node.h>
#include <villas/timing.h>
#include <villas/utils.h>
#include <villas/memory.h>
@ -41,7 +41,7 @@ int main(int argc, char *argv[])
{
int ret;
struct cfg cfg;
struct super_node sn;
struct fpga_card *card;
if (argc < 3)
@ -52,7 +52,7 @@ int main(int argc, char *argv[])
while ((c = getopt(argc-1, argv+1, "d:")) != -1) {
switch (c) {
case 'd':
cfg.log.level = strtoul(optarg, &endptr, 10);
sn.log.level = strtoul(optarg, &endptr, 10);
break;
case '?':
@ -61,20 +61,15 @@ int main(int argc, char *argv[])
}
}
info("Parsing configuration");
cfg_parse(&cfg, argv[1]);
super_node_init(&sn);
super_node_parse_uri(&sn, argv[1]);
info("Initialize logging system");
log_init(&cfg.log, cfg.log.level, cfg.log.facilities);
info("Initialize real-time system");
rt_init(cfg.priority, cfg.affinity);
info("Initialize memory system");
memory_init(cfg.hugepages);
log_init(&sn.log, sn.log.level, sn.log.facilities);
rt_init(sn.priority, sn.affinity);
memory_init(sn.hugepages);
/* Initialize VILLASfpga card */
ret = fpga_init(argc, argv, config_root_setting(&cfg.cfg));
ret = fpga_init(argc, argv, config_root_setting(&sn.cfg));
if (ret)
error("Failed to initialize FPGA card");
@ -92,7 +87,7 @@ int main(int argc, char *argv[])
if (ret)
error("Failed to de-initialize FPGA card");
cfg_destroy(&cfg);
super_node_destroy(&sn);
return 0;
}

View file

@ -8,8 +8,7 @@
#include <unistd.h>
#include <villas/utils.h>
#include <villas/cfg.h>
#include <villas/path.h>
#include <villas/super_node.h>
#include <villas/memory.h>
#include <villas/node.h>
#include <villas/api.h>
@ -19,27 +18,18 @@
#include <villas/kernel/kernel.h>
#include <villas/kernel/rt.h>
#include <villas/hook.h>
#include <villas/stats.h>
#ifdef ENABLE_OPAL_ASYNC
#include <villas/nodes/opal.h>
#endif
struct cfg cfg;
struct super_node sn;
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
info("Stopping paths");
list_foreach(struct path *p, &cfg.paths) { INDENT
path_stop(p);
}
info("Stopping nodes");
list_foreach(struct node *n, &cfg.nodes) { INDENT
node_stop(n);
}
cfg_deinit(&cfg);
cfg_destroy(&cfg);
super_node_stop(&sn);
super_node_destroy(&sn);
info(GRN("Goodbye!"));
@ -92,7 +82,7 @@ int main(int argc, char *argv[])
usage();
#endif
log_init(&cfg.log, V, LOG_ALL);
super_node_init(&sn);
info("This is VILLASnode %s (built on %s, %s)", BLD(YEL(VERSION)),
BLD(MAG(__DATE__)), BLD(MAG(__TIME__)));
@ -103,30 +93,11 @@ int main(int argc, char *argv[])
signals_init(quit);
cfg_init_pre(&cfg);
cfg_parse_cli(&cfg, argc, argv);
cfg_init_post(&cfg);
super_node_parse_cli(&sn, argc, argv);
super_node_check(&sn);
super_node_start(&sn);
info("Starting nodes");
list_foreach(struct node *n, &cfg.nodes) { INDENT
int refs = list_count(&cfg.paths, (cmp_cb_t) path_uses_node, n);
if (refs > 0)
node_start(n);
else
warn("No path is using the node %s. Skipping...", node_name(n));
}
info("Starting paths");
list_foreach(struct path *p, &cfg.paths) { INDENT
if (p->enabled) {
path_init(p, &cfg);
path_start(p);
}
else
warn("Path %s is disabled. Skipping...", path_name(p));
}
if (cfg.stats > 0)
if (sn.stats > 0)
stats_print_header();
struct timespec now, last = time_now();
@ -134,15 +105,15 @@ int main(int argc, char *argv[])
/* Run! Until signal handler is invoked */
while (1) {
now = time_now();
if (cfg.stats > 0 && time_delta(&last, &now) > cfg.stats) {
list_foreach(struct path *p, &cfg.paths) {
if (sn.stats > 0 && time_delta(&last, &now) > sn.stats) {
list_foreach(struct path *p, &sn.paths) {
hook_run(p, NULL, 0, HOOK_PERIODIC);
}
last = time_now();
}
web_service(&cfg.web); /** @todo Maybe we should move this to another thread */
web_service(&sn.web); /** @todo Maybe we should move this to another thread */
}
return 0;

View file

@ -14,7 +14,7 @@
#include <signal.h>
#include <pthread.h>
#include <villas/cfg.h>
#include <villas/super_node.h>
#include <villas/utils.h>
#include <villas/node.h>
#include <villas/msg.h>
@ -24,7 +24,7 @@
#include "config.h"
static struct cfg cfg; /**< The global configuration */
static struct super_node sn = { .state = STATE_DESTROYED }; /**< The global configuration */
struct dir {
struct pool pool;
@ -53,10 +53,11 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
pool_destroy(&sendd.pool);
}
web_stop(&sn.web);
node_stop(node);
node_type_stop(node->_vt);
cfg_destroy(&cfg);
super_node_destroy(&sn);
info(GRN("Goodbye!"));
_exit(EXIT_SUCCESS);
@ -172,15 +173,11 @@ int main(int argc, char *argv[])
ptid = pthread_self();
/* Parse command line arguments */
if (argc < 3)
usage();
/* Default values */
sendd.enabled = true;
recvv.enabled = true;
while ((c = getopt(argc-2, argv+2, "hxrsd:")) != -1) {
while ((c = getopt(argc, argv, "hxrsd:")) != -1) {
switch (c) {
case 'x':
reverse = true;
@ -200,24 +197,40 @@ int main(int argc, char *argv[])
exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS);
}
}
if (argc < optind + 2) {
usage();
exit(EXIT_FAILURE);
}
log_init(&cfg.log, level, LOG_ALL);
log_init(&sn.log, level, LOG_ALL);
log_start(&sn.log);
super_node_init(&sn);
super_node_parse_uri(&sn, argv[optind]);
memory_init(sn.hugepages);
signals_init(quit);
cfg_parse(&cfg, argv[1]);
rt_init(cfg.priority, cfg.affinity);
memory_init(cfg.hugepages);
rt_init(sn.priority, sn.affinity);
web_init(&sn.web, NULL); /* API is disabled in villas-pipe */
web_start(&sn.web);
/* Initialize node */
node = list_lookup(&cfg.nodes, argv[2]);
node = list_lookup(&sn.nodes, argv[optind+1]);
if (!node)
error("Node '%s' does not exist!", argv[2]);
error("Node '%s' does not exist!", argv[optind+1]);
if (reverse)
node_reverse(node);
ret = node_type_start(node->_vt, argc-optind, argv+optind, config_root_setting(&cfg.cfg));
ret = node_type_start(node->_vt, argc, argv, config_root_setting(&sn.cfg));
if (ret)
error("Failed to intialize node type: %s", node_name(node));
ret = node_check(node);
if (ret)
error("Invalid node configuration");
ret = node_start(node);
if (ret)
@ -227,8 +240,9 @@ int main(int argc, char *argv[])
pthread_create(&recvv.thread, NULL, recv_loop, NULL);
pthread_create(&sendd.thread, NULL, send_loop, NULL);
for (;;)
pause();
for (;;) {
web_service(&sn.web);
}
return 0;
}