diff --git a/include/villas/api.h b/include/villas/api.h index 53d0c1d9a..35dfcd307 100644 --- a/include/villas/api.h +++ b/include/villas/api.h @@ -15,7 +15,7 @@ /* Forward declarations */ enum lws_callback_reasons; struct lws; -struct cfg; +struct super_node; struct api; struct api_ressource; @@ -46,7 +46,7 @@ struct api { enum state state; - struct cfg *cfg; + struct super_node *super_node; }; struct api_buffer { @@ -88,7 +88,7 @@ struct api_ressource { * * Save references to list of paths / nodes for command execution. */ -int api_init(struct api *a, struct cfg *cfg); +int api_init(struct api *a, struct super_node *sn); int api_destroy(struct api *a); diff --git a/include/villas/hook.h b/include/villas/hook.h index 3f121b81b..cac1d91f0 100644 --- a/include/villas/hook.h +++ b/include/villas/hook.h @@ -23,13 +23,13 @@ #include "queue.h" #include "list.h" -#include "cfg.h" +#include "super_node.h" /* Forward declarations */ struct path; struct hook; struct sample; -struct cfg; +struct super_node; /** Optional parameters to hook callbacks */ struct hook_info { @@ -109,7 +109,7 @@ struct hook { }; /** Save references to global nodes, paths and settings */ -int hook_init(struct hook *h, struct cfg *cfg); +int hook_init(struct hook *h, struct super_node *sn); int hook_destroy(struct hook *h); diff --git a/include/villas/path.h b/include/villas/path.h index 393bdabc8..5c090fea8 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -27,7 +27,7 @@ #include "common.h" /* Forward declarations */ -struct cfg; +struct super_node; struct path_source { @@ -65,21 +65,18 @@ struct path struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */ + struct super_node *super_node; /**< The super node this path belongs to. */ config_setting_t *cfg; /**< A pointer to the libconfig object which instantiated this path */ }; /** Initialize internal data structures. */ -int path_init(struct path *p, struct cfg *cfg); +int path_init(struct path *p, struct super_node *sn); + +int path_init2(struct path *p); /** Check if path configuration is proper. */ int path_check(struct path *p); -/** Destroy path by freeing dynamically allocated memory. - * - * @param i A pointer to the path structure. - */ -int path_destroy(struct path *p); - /** Start a path. * * Start a new pthread for receiving/sending messages over this path. @@ -98,6 +95,12 @@ int path_start(struct path *p); */ int path_stop(struct path *p); +/** Destroy path by freeing dynamically allocated memory. + * + * @param i A pointer to the path structure. + */ +int path_destroy(struct path *p); + /** Show some basic statistics for a path. * * @param p A pointer to the path structure. diff --git a/include/villas/cfg.h b/include/villas/super_node.h similarity index 69% rename from include/villas/cfg.h rename to include/villas/super_node.h index 4474ef0ff..52026dd7b 100644 --- a/include/villas/cfg.h +++ b/include/villas/super_node.h @@ -20,7 +20,7 @@ #include "common.h" /** Global configuration */ -struct cfg { +struct super_node { int priority; /**< Process priority (lower is better) */ int affinity; /**< Process affinity of the server and all created threads */ int hugepages; /**< Number of hugepages to reserve. */ @@ -51,23 +51,30 @@ struct cfg { #endif /** Inititalize configuration object before parsing the configuration. */ -int cfg_init(struct cfg *cfg); +int super_node_init(struct super_node *sn); -/** Initialize after parsing the configuration file. */ -int cfg_start(struct cfg *cfg); +/** Wrapper for super_node_parse() */ +int super_node_parse_cli(struct super_node *sn, int argc, char *argv[]); -int cfg_stop(struct cfg *cfg); +/** Wrapper for super_node_parse() */ +int super_node_parse_uri(struct super_node *sn, const char *uri); -/** Desctroy configuration object. */ -int cfg_destroy(struct cfg *cfg); - -/** Parse config file and store settings in supplied struct cfg. +/** Parse super-node configuration. * - * @param cfg A configuration object. - * @param filename The path to the configration file (relative or absolute) + * @param sn The super-node datastructure to fill. + * @param cfg A libconfig setting object. * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int cfg_parse(struct cfg *cfg, const char *uri); +int super_node_parse(struct super_node *sn, config_setting_t *cfg); -int cfg_parse_cli(struct cfg *cfg, int argc, char *argv[]); \ No newline at end of file +/** Check validity of super node configuration. */ +int super_node_check(struct super_node *sn); + +/** Initialize after parsing the configuration file. */ +int super_node_start(struct super_node *sn); + +int super_node_stop(struct super_node *sn); + +/** Desctroy configuration object. */ +int super_node_destroy(struct super_node *sn); \ No newline at end of file diff --git a/include/villas/utils.h b/include/villas/utils.h index b916757fe..164ce0d96 100644 --- a/include/villas/utils.h +++ b/include/villas/utils.h @@ -92,7 +92,6 @@ #define BIT(nr) (1UL << (nr)) /* Forward declarations */ -struct cfg; struct timespec; /** Print copyright message to screen. */ diff --git a/lib/Makefile.inc b/lib/Makefile.inc index 43c0b1829..b35a12e26 100644 --- a/lib/Makefile.inc +++ b/lib/Makefile.inc @@ -5,9 +5,9 @@ LIBS = $(BUILDDIR)/libvillas.so LIB_SRCS = $(addprefix lib/nodes/, file.c cbuilder.c) \ $(addprefix lib/kernel/, kernel.c rt.c) \ $(addprefix lib/, sample.c path.c node.c hook.c \ - log.c utils.c cfg.c hist.c timing.c pool.c list.c \ - queue.c memory.c advio.c web.c api.c plugin.c \ - node_type.c stats.c \ + log.c utils.c super_node.c hist.c timing.c pool.c \ + list.c queue.c memory.c advio.c web.c api.c \ + plugin.c node_type.c stats.c \ ) LIB_CFLAGS = $(CFLAGS) -fPIC diff --git a/lib/api.c b/lib/api.c index 352f31189..86b5ad668 100644 --- a/lib/api.c +++ b/lib/api.c @@ -231,13 +231,13 @@ int api_buffer_append(struct api_buffer *b, const char *in, size_t len) return 0; } -int api_init(struct api *a, struct cfg *cfg) +int api_init(struct api *a, struct super_node *sn) { info("Initialize API sub-system"); list_init(&a->sessions); - a->cfg = cfg; + a->super_node = sn; a->state = STATE_INITIALIZED; return 0; diff --git a/lib/apis/config.c b/lib/apis/config.c index 8d12e8735..ed1a78870 100644 --- a/lib/apis/config.c +++ b/lib/apis/config.c @@ -14,7 +14,7 @@ static int api_config(struct api_ressource *h, json_t *args, json_t **resp, struct api_session *s) { - config_setting_t *cfg_root = config_root_setting(&s->api->cfg->cfg); + config_setting_t *cfg_root = config_root_setting(&s->api->super_node->cfg); *resp = config_to_json(cfg_root); diff --git a/lib/apis/nodes.c b/lib/apis/nodes.c index 0ba40760a..6567c8b10 100644 --- a/lib/apis/nodes.c +++ b/lib/apis/nodes.c @@ -18,8 +18,8 @@ extern struct list nodes; static int api_nodes(struct api_ressource *r, json_t *args, json_t **resp, struct api_session *s) { json_t *json_nodes = json_array(); - - list_foreach(struct node *n, &s->api->cfg->nodes) { + + list_foreach(struct node *n, &s->api->super_node->nodes) { json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }", "name", node_name_short(n), "state", n->state, diff --git a/lib/cfg.c b/lib/cfg.c deleted file mode 100644 index 8a628a007..000000000 --- a/lib/cfg.c +++ /dev/null @@ -1,317 +0,0 @@ -/** Configuration parser. - * - * @author Steffen Vogel - * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC - *********************************************************************************/ - -#include -#include -#include -#include - -#include "utils.h" -#include "list.h" -#include "cfg.h" -#include "node.h" -#include "path.h" -#include "hook.h" -#include "advio.h" -#include "web.h" -#include "log.h" -#include "api.h" -#include "plugin.h" -#include "node.h" -#include "memory.h" - -#include "kernel/rt.h" - -static int cfg_parse_global(struct cfg *cfg, config_setting_t *cfg_root) -{ - if (!config_setting_is_group(cfg_root)) - cerror(cfg_root, "Global section must be a dictionary."); - - config_setting_lookup_int(cfg_root, "hugepages", &cfg->hugepages); - config_setting_lookup_int(cfg_root, "affinity", &cfg->affinity); - config_setting_lookup_int(cfg_root, "priority", &cfg->priority); - config_setting_lookup_float(cfg_root, "stats", &cfg->stats); - - return 0; -} - -int cfg_init(struct cfg *cfg) -{ - config_init(&cfg->cfg); - - log_init(&cfg->log, V, LOG_ALL); - api_init(&cfg->api, cfg); - web_init(&cfg->web, &cfg->api); - - list_init(&cfg->nodes); - list_init(&cfg->paths); - list_init(&cfg->plugins); - - /* Default values */ - cfg->affinity = -1; - cfg->stats = 0; - cfg->priority = 50; - cfg->hugepages = DEFAULT_NR_HUGEPAGES; - - cfg->state = STATE_INITIALIZED; - - return 0; -} - -int cfg_parse_cli(struct cfg *cfg, int argc, char *argv[]) -{ - cfg->cli.argc = argc; - cfg->cli.argv = argv; - - char *uri = (argc == 2) ? argv[1] : NULL; - - return cfg_parse(cfg, uri); -} - -int cfg_parse(struct cfg *cfg, const char *uri) -{ - config_setting_t *cfg_root, *cfg_nodes, *cfg_paths, *cfg_plugins, *cfg_logging, *cfg_web; - - info("Parsing configuration: uri=%s", uri); - - { INDENT - - int ret = CONFIG_FALSE; - - if (uri) { - /* Setup libconfig */ - config_set_auto_convert(&cfg->cfg, 1); - - FILE *f; - AFILE *af; - - /* Via stdin */ - if (strcmp("-", uri) == 0) { - af = NULL; - f = stdin; - } - /* Local file? */ - else if (access(uri, F_OK) != -1) { - /* Setup libconfig include path. - * This is only supported for local files */ - char *uri_cpy = strdup(uri); - char *include_dir = dirname(uri_cpy); - - config_set_include_dir(&cfg->cfg, include_dir); - - free(uri_cpy); - - af = NULL; - f = fopen(uri, "r"); - } - /* Use advio (libcurl) to fetch the config from a remote */ - else { - af = afopen(uri, "r", ADVIO_MEM); - f = af ? af->file : NULL; - } - - /* Check if file could be loaded / opened */ - if (!f) - error("Failed to open configuration from: %s", uri); - - /* Parse config */ - ret = config_read(&cfg->cfg, f); - if (ret != CONFIG_TRUE) - error("Failed to parse configuration: %s in %s:%d", config_error_text(&cfg->cfg), uri, config_error_line(&cfg->cfg)); - - /* Close configuration file */ - if (af) - afclose(af); - else - fclose(f); - } - else { - warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance."); - - cfg->web.port = 80; - cfg->web.htdocs = "/villas/web/socket/"; - } - - cfg_root = config_root_setting(&cfg->cfg); - if (cfg_root) - cfg_parse_global(cfg, cfg_root); - - cfg_web = config_setting_get_member(cfg_root, "http"); - if (cfg_web) - web_parse(&cfg->web, cfg_web); - - /* Parse logging settings */ - cfg_logging = config_setting_get_member(cfg_root, "logging"); - if (cfg_logging) - log_parse(&cfg->log, cfg_logging); - - /* Parse plugins */ - cfg_plugins = config_setting_get_member(cfg_root, "plugins"); - if (cfg_plugins) { - if (!config_setting_is_array(cfg_plugins)) - cerror(cfg_plugins, "Setting 'plugins' must be a list of strings"); - - for (int i = 0; i < config_setting_length(cfg_plugins); i++) { - struct config_setting_t *cfg_plugin = config_setting_get_elem(cfg_plugins, i); - - struct plugin plugin; - - ret = plugin_parse(&plugin, cfg_plugin); - if (ret) - cerror(cfg_plugin, "Failed to parse plugin"); - - list_push(&cfg->plugins, memdup(&plugin, sizeof(plugin))); - } - } - - /* Parse nodes */ - cfg_nodes = config_setting_get_member(cfg_root, "nodes"); - if (cfg_nodes) { - if (!config_setting_is_group(cfg_nodes)) - warn("Setting 'nodes' must be a group with node name => group mappings."); - - for (int i = 0; i < config_setting_length(cfg_nodes); i++) { - config_setting_t *cfg_node = config_setting_get_elem(cfg_nodes, i); - - struct plugin *p; - const char *type; - - /* Required settings */ - if (!config_setting_lookup_string(cfg_node, "type", &type)) - cerror(cfg_node, "Missing node type"); - - - p = plugin_lookup(PLUGIN_TYPE_NODE, type); - if (!p) - cerror(cfg_node, "Invalid node type: %s", type); - - struct node n; - - ret = node_parse(&n, cfg_node); - if (ret) - cerror(cfg_node, "Failed to parse node"); - - list_push(&cfg->nodes, memdup(&n, sizeof(n))); - } - } - - /* Parse paths */ - cfg_paths = config_setting_get_member(cfg_root, "paths"); - if (cfg_paths) { - if (!config_setting_is_list(cfg_paths)) - warn("Setting 'paths' must be a list."); - - for (int i = 0; i < config_setting_length(cfg_paths); i++) { - config_setting_t *cfg_path = config_setting_get_elem(cfg_paths, i); - - struct path p; - - ret = path_parse(&p, cfg_path, &cfg->nodes); - if (ret) - cerror(cfg_path, "Failed to parse path"); - - list_push(&cfg->paths, memdup(&p, sizeof(p))); - - if (p.reverse) { - struct path r; - - ret = path_reverse(&p, &r); - if (ret) - cerror(cfg_path, "Failed to reverse path %s", path_name(&p)); - - list_push(&cfg->paths, memdup(&r, sizeof(p))); - } - } - } - } - - cfg->state = STATE_PARSED; - - return 0; -} - -int cfg_start(struct cfg *cfg) -{ - memory_init(cfg->hugepages); - rt_init(cfg->priority, cfg->affinity); - - api_start(&cfg->api); - web_start(&cfg->web); - - info("Start node types"); - list_foreach(struct node *n, &cfg->nodes) { INDENT - config_setting_t *cfg_root = config_root_setting(&cfg->cfg); - - node_type_start(n->_vt, cfg->cli.argc, cfg->cli.argv, cfg_root); - } - - info("Starting nodes"); - list_foreach(struct node *n, &cfg->nodes) { INDENT - int refs = list_count(&cfg->paths, (cmp_cb_t) path_uses_node, n); - if (refs > 0) - node_start(n); - else - warn("No path is using the node %s. Skipping...", node_name(n)); - } - - info("Starting paths"); - list_foreach(struct path *p, &cfg->paths) { INDENT - if (p->enabled) { - path_init(p, cfg); - path_start(p); - } - else - warn("Path %s is disabled. Skipping...", path_name(p)); - } - - cfg->state = STATE_STARTED; - - return 0; -} - -int cfg_stop(struct cfg *cfg) -{ - info("Stopping paths"); - list_foreach(struct path *p, &cfg->paths) { INDENT - path_stop(p); - } - - info("Stopping nodes"); - list_foreach(struct node *n, &cfg->nodes) { INDENT - node_stop(n); - } - - info("De-initializing node types"); - list_foreach(struct plugin *p, &plugins) { INDENT - if (p->type == PLUGIN_TYPE_NODE) - node_type_stop(&p->node); - } - - web_stop(&cfg->web); - api_stop(&cfg->api); - log_stop(&cfg->log); - - cfg->state = STATE_STOPPED; - - return 0; -} - -int cfg_destroy(struct cfg *cfg) -{ - config_destroy(&cfg->cfg); - - web_destroy(&cfg->web); - log_destroy(&cfg->log); - api_destroy(&cfg->api); - - list_destroy(&cfg->plugins, (dtor_cb_t) plugin_destroy, false); - list_destroy(&cfg->paths, (dtor_cb_t) path_destroy, true); - list_destroy(&cfg->nodes, (dtor_cb_t) node_destroy, true); - - cfg->state = STATE_DESTROYED; - - return 0; -} \ No newline at end of file diff --git a/lib/hook.c b/lib/hook.c index ef5dcbd98..dfa9d9ffb 100644 --- a/lib/hook.c +++ b/lib/hook.c @@ -19,11 +19,11 @@ #include "node.h" #include "plugin.h" -int hook_init(struct hook *h, struct cfg *cfg) +int hook_init(struct hook *h, struct super_node *sn) { struct hook_info i = { - .nodes = &cfg->nodes, - .paths = &cfg->paths + .nodes = &sn->nodes, + .paths = &sn->paths }; if (h->type & HOOK_INIT) diff --git a/lib/kernel/rt.c b/lib/kernel/rt.c index 4f6b4776f..f0507d03f 100644 --- a/lib/kernel/rt.c +++ b/lib/kernel/rt.c @@ -9,7 +9,7 @@ #include "config.h" #include "utils.h" -#include "cfg.h" +#include "super_node.h" #include "kernel/kernel.h" #include "kernel/rt.h" diff --git a/lib/node.c b/lib/node.c index 054d5b53f..5e365ed9f 100644 --- a/lib/node.c +++ b/lib/node.c @@ -9,7 +9,6 @@ #include "sample.h" #include "node.h" -#include "cfg.h" #include "utils.h" #include "config.h" #include "plugin.h" diff --git a/lib/node_type.c b/lib/node_type.c index f4e585980..bbbfd4f1e 100644 --- a/lib/node_type.c +++ b/lib/node_type.c @@ -9,7 +9,7 @@ #include "sample.h" #include "node.h" -#include "cfg.h" +#include "super_node.h" #include "utils.h" #include "config.h" #include "plugin.h" diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index af79b7a3c..2120fd9e3 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -13,11 +13,11 @@ #include #include "nodes/websocket.h" +#include "super_node.h" #include "webmsg_format.h" #include "timing.h" #include "utils.h" #include "msg.h" -#include "cfg.h" #include "config.h" #include "plugin.h" diff --git a/lib/path.c b/lib/path.c index f85f27c89..f9cb2b317 100644 --- a/lib/path.c +++ b/lib/path.c @@ -19,7 +19,7 @@ #include "queue.h" #include "hook.h" #include "plugin.h" -#include "cfg.h" +#include "super_node.h" #include "memory.h" static void path_read(struct path *p) @@ -140,52 +140,17 @@ static int path_destination_destroy(struct path_destination *pd) return 0; } -int path_init(struct path *p, struct cfg *cfg) +int path_init(struct path *p, struct super_node *sn) { - int ret, max_queuelen = 0; - - if (p->state != STATE_DESTROYED) - return -1; - - /* Add internal hooks if they are not already in the list*/ - list_foreach(struct plugin *pl, &plugins) { - if (pl->type == PLUGIN_TYPE_HOOK) { - struct hook *h = &pl->hook; + assert(p->state == STATE_DESTROYED); - if ((h->type & HOOK_AUTO) && /* should this hook be added implicitely? */ - (list_lookup(&p->hooks, pl->name) == NULL)) /* is not already in list? */ - list_push(&p->hooks, memdup(h, sizeof(h))); - } - } + list_init(&p->hooks); + list_init(&p->destinations); - /* We sort the hooks according to their priority before starting the path */ - list_sort(&p->hooks, hook_cmp_priority); - - list_foreach(struct hook *h, &p->hooks) - hook_init(h, cfg); - - /* Parse hook arguments */ - ret = hook_run(p, NULL, 0, HOOK_PARSE); - if (ret) - error("Failed to parse arguments for hooks of path: %s", path_name(p)); - - /* Initialize destinations */ - list_foreach(struct path_destination *pd, &p->destinations) { - ret = queue_init(&pd->queue, pd->queuelen, &memtype_hugepage); - if (ret) - error("Failed to initialize queue for path"); - - if (pd->queuelen > max_queuelen) - max_queuelen = pd->queuelen; - } - - /* Initialize source */ - ret = pool_init(&p->source->pool, max_queuelen, SAMPLE_LEN(p->source->samplelen), &memtype_hugepage); - if (ret) - error("Failed to allocate memory pool for path"); + p->super_node = sn; p->state = STATE_INITIALIZED; - + return 0; } @@ -196,7 +161,9 @@ int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes) int ret, samplelen, queuelen; struct node *source; - struct list destinations; + struct list destinations = { .state = STATE_DESTROYED }; + + list_init(&destinations); /* Input node */ if (!config_setting_lookup_string(cfg, "in", &in) && @@ -216,14 +183,12 @@ int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes) !(cfg_out = config_setting_get_member(cfg, "dest")) && !(cfg_out = config_setting_get_member(cfg, "sink"))) cerror(cfg, "Missing output nodes for path"); - - list_init(&destinations); + ret = node_parse_list(&destinations, cfg_out, nodes); if (ret <= 0) cerror(cfg_out, "Invalid output nodes"); /* Optional settings */ - list_init(&p->hooks); cfg_hook = config_setting_get_member(cfg, "hook"); if (cfg_hook) hook_parse_list(&p->hooks, cfg_hook); @@ -282,9 +247,54 @@ int path_check(struct path *p) return 0; } +int path_init2(struct path *p) +{ + int ret, max_queuelen = 0; + + assert(p->state == STATE_CHECKED); + + /* Add internal hooks if they are not already in the list*/ + list_foreach(struct plugin *pl, &plugins) { + if (pl->type == PLUGIN_TYPE_HOOK) { + struct hook c; + struct hook *h = &pl->hook; + + if (h->type & HOOK_AUTO) { + hook_copy(h, &c); + list_push(&p->hooks, memdup(&c, sizeof(c))); + } + } + } + + /* We sort the hooks according to their priority before starting the path */ + list_sort(&p->hooks, hook_cmp_priority); + + list_foreach(struct hook *h, &p->hooks) + hook_init(h, p->super_node); + + /* Initialize destinations */ + list_foreach(struct path_destination *pd, &p->destinations) { + ret = queue_init(&pd->queue, pd->queuelen, &memtype_hugepage); + if (ret) + error("Failed to initialize queue for path"); + + if (pd->queuelen > max_queuelen) + max_queuelen = pd->queuelen; + } + + /* Initialize source */ + ret = pool_init(&p->source->pool, max_queuelen, SAMPLE_LEN(p->source->samplelen), &memtype_hugepage); + if (ret) + error("Failed to allocate memory pool for path"); + + return 0; +} + int path_start(struct path *p) { int ret; + + assert(p->state == STATE_CHECKED); info("Starting path: %s (#hooks=%zu)", path_name(p), list_length(&p->hooks)); @@ -302,6 +312,7 @@ int path_start(struct path *p) return 0; } + int path_stop(struct path *p) { int ret; @@ -379,9 +390,6 @@ int path_reverse(struct path *p, struct path *r) struct path_destination *first_pd = list_first(&p->destinations); - list_init(&r->destinations); - list_init(&r->hooks); - /* General */ r->enabled = p->enabled; r->cfg = p->cfg; diff --git a/lib/super_node.c b/lib/super_node.c new file mode 100644 index 000000000..e66df1366 --- /dev/null +++ b/lib/super_node.c @@ -0,0 +1,380 @@ +/** Configuration parser. + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + *********************************************************************************/ + +#include +#include +#include +#include + +#include "super_node.h" +#include "node.h" +#include "path.h" +#include "utils.h" +#include "list.h" +#include "hook.h" +#include "advio.h" +#include "web.h" +#include "log.h" +#include "api.h" +#include "plugin.h" +#include "memory.h" + +#include "kernel/rt.h" + +static void config_dtor(void *data) +{ + if (data) + free(data); +} + +static int super_node_parse_global(struct super_node *sn, config_setting_t *cfg) +{ + if (!config_setting_is_group(cfg)) + cerror(cfg, "Global section must be a dictionary."); + + config_setting_lookup_int(cfg, "hugepages", &sn->hugepages); + config_setting_lookup_int(cfg, "affinity", &sn->affinity); + config_setting_lookup_int(cfg, "priority", &sn->priority); + config_setting_lookup_float(cfg, "stats", &sn->stats); + + return 0; +} + +int super_node_init(struct super_node *sn) +{ + assert(sn->state == STATE_DESTROYED); + + config_init(&sn->cfg); + + log_init(&sn->log, V, LOG_ALL); + api_init(&sn->api, sn); + web_init(&sn->web, &sn->api); + + list_init(&sn->nodes); + list_init(&sn->paths); + list_init(&sn->plugins); + + /* Default values */ + sn->affinity = 0; + sn->priority = 0; + sn->stats = 0; + sn->hugepages = DEFAULT_NR_HUGEPAGES; + + sn->web.port = 80; + sn->web.htdocs = "/villas/web/socket/"; + + sn->state = STATE_INITIALIZED; + + return 0; +} + +int super_node_parse_uri(struct super_node *sn, const char *uri) +{ + info("Parsing configuration: uri=%s", uri); + + int ret = CONFIG_FALSE; + + if (uri) { INDENT + FILE *f; + AFILE *af; + config_setting_t *cfg_root; + + /* Via stdin */ + if (strcmp("-", uri) == 0) { + af = NULL; + f = stdin; + } + /* Local file? */ + else if (access(uri, F_OK) != -1) { + /* Setup libconfig include path. + * This is only supported for local files */ + char *uri_cpy = strdup(uri); + char *include_dir = dirname(uri_cpy); + + config_set_include_dir(&sn->cfg, include_dir); + + free(uri_cpy); + + af = NULL; + f = fopen(uri, "r"); + } + /* Use advio (libcurl) to fetch the config from a remote */ + else { + af = afopen(uri, "r", ADVIO_MEM); + f = af ? af->file : NULL; + } + + /* Check if file could be loaded / opened */ + if (!f) + error("Failed to open configuration from: %s", uri); + + /* Parse config */ + config_set_auto_convert(&sn->cfg, 1); + ret = config_read(&sn->cfg, f); + if (ret != CONFIG_TRUE) + error("Failed to parse configuration: %s in %s:%d", config_error_text(&sn->cfg), uri, config_error_line(&sn->cfg)); + + cfg_root = config_root_setting(&sn->cfg); + + /* Little hack to properly report configuration filename in error messages + * We add the uri as a "hook" object to the root setting. + * See cerror() on how this info is used. + */ + config_setting_set_hook(cfg_root, strdup(uri)); + config_set_destructor(&sn->cfg, config_dtor); + + /* Close configuration file */ + if (af) + afclose(af); + else + fclose(f); + + + + return super_node_parse(sn, cfg_root); + } + else { INDENT + warn("No configuration file specified. Starting unconfigured. Use the API to configure this instance."); + } + + return 0; +} + +int super_node_parse_cli(struct super_node *sn, int argc, char *argv[]) +{ + char *uri = (argc == 2) ? argv[1] : NULL; + + sn->cli.argc = argc; + sn->cli.argv = argv; + + return super_node_parse_uri(sn, uri); +} + +int super_node_parse(struct super_node *sn, config_setting_t *cfg) +{ + int ret; + + assert(sn->state != STATE_STARTED); + assert(sn->state != STATE_DESTROYED); + + config_setting_t *cfg_nodes, *cfg_paths, *cfg_plugins, *cfg_logging, *cfg_web; + + super_node_parse_global(sn, cfg); + + cfg_web = config_setting_get_member(cfg, "http"); + if (cfg_web) + web_parse(&sn->web, cfg_web); + + /* Parse logging settings */ + cfg_logging = config_setting_get_member(cfg, "logging"); + if (cfg_logging) + log_parse(&sn->log, cfg_logging); + + /* Parse plugins */ + cfg_plugins = config_setting_get_member(cfg, "plugins"); + if (cfg_plugins) { + if (!config_setting_is_array(cfg_plugins)) + cerror(cfg_plugins, "Setting 'plugins' must be a list of strings"); + + for (int i = 0; i < config_setting_length(cfg_plugins); i++) { + struct config_setting_t *cfg_plugin = config_setting_get_elem(cfg_plugins, i); + + struct plugin plugin; + + ret = plugin_parse(&plugin, cfg_plugin); + if (ret) + cerror(cfg_plugin, "Failed to parse plugin"); + + list_push(&sn->plugins, memdup(&plugin, sizeof(plugin))); + } + } + + /* Parse nodes */ + cfg_nodes = config_setting_get_member(cfg, "nodes"); + if (cfg_nodes) { + if (!config_setting_is_group(cfg_nodes)) + warn("Setting 'nodes' must be a group with node name => group mappings."); + + for (int i = 0; i < config_setting_length(cfg_nodes); i++) { + config_setting_t *cfg_node = config_setting_get_elem(cfg_nodes, i); + + struct plugin *p; + const char *type; + + /* Required settings */ + if (!config_setting_lookup_string(cfg_node, "type", &type)) + cerror(cfg_node, "Missing node type"); + + + p = plugin_lookup(PLUGIN_TYPE_NODE, type); + if (!p) + cerror(cfg_node, "Invalid node type: %s", type); + + struct node n = { .state = STATE_DESTROYED }; + + ret = node_init(&n, &p->node); + if (ret) + cerror(cfg_node, "Failed to initialize node"); + + ret = node_parse(&n, cfg_node); + if (ret) + cerror(cfg_node, "Failed to parse node"); + + list_push(&sn->nodes, memdup(&n, sizeof(n))); + } + } + + /* Parse paths */ + cfg_paths = config_setting_get_member(cfg, "paths"); + if (cfg_paths) { + if (!config_setting_is_list(cfg_paths)) + warn("Setting 'paths' must be a list."); + + for (int i = 0; i < config_setting_length(cfg_paths); i++) { + config_setting_t *cfg_path = config_setting_get_elem(cfg_paths, i); + + struct path p = { .state = STATE_DESTROYED }; + + ret = path_init(&p, sn); + if (ret) + cerror(cfg_path, "Failed to init path"); + + ret = path_parse(&p, cfg_path, &sn->nodes); + if (ret) + cerror(cfg_path, "Failed to parse path"); + + list_push(&sn->paths, memdup(&p, sizeof(p))); + + if (p.reverse) { + struct path r; + + ret = path_init(&r, sn); + if (ret) + cerror(cfg_path, "Failed to init path"); + + ret = path_reverse(&p, &r); + if (ret) + cerror(cfg_path, "Failed to reverse path %s", path_name(&p)); + + list_push(&sn->paths, memdup(&r, sizeof(p))); + } + } + } + + sn->state = STATE_PARSED; + + return 0; +} + +int super_node_check(struct super_node *sn) +{ + int ret; + + list_foreach(struct node *n, &sn->nodes) { + ret = node_check(n); + if (ret) + error("Invalid configuration for node %s", node_name(n)); + } + + list_foreach(struct path *p, &sn->paths) { + ret = path_check(p); + if (ret) + error("Invalid configuration for path %s", path_name(p)); + } + + sn->state = STATE_CHECKED; + + return 0; +} + +int super_node_start(struct super_node *sn) +{ + assert(sn->state == STATE_CHECKED); + + memory_init(sn->hugepages); + rt_init(sn->priority, sn->affinity); + + api_start(&sn->api); + web_start(&sn->web); + + info("Start node types"); + list_foreach(struct node *n, &sn->nodes) { INDENT + config_setting_t *cfg = config_root_setting(&sn->cfg); + + node_type_start(n->_vt, sn->cli.argc, sn->cli.argv, cfg); + } + + info("Starting nodes"); + list_foreach(struct node *n, &sn->nodes) { INDENT + int refs = list_count(&sn->paths, (cmp_cb_t) path_uses_node, n); + if (refs > 0) + node_start(n); + else + warn("No path is using the node %s. Skipping...", node_name(n)); + } + + info("Starting paths"); + list_foreach(struct path *p, &sn->paths) { INDENT + if (p->enabled) { + path_init(p, sn); + path_start(p); + } + else + warn("Path %s is disabled. Skipping...", path_name(p)); + } + + sn->state = STATE_STARTED; + + return 0; +} + +int super_node_stop(struct super_node *sn) +{ + assert(sn->state == STATE_STARTED); + + info("Stopping paths"); + list_foreach(struct path *p, &sn->paths) { INDENT + path_stop(p); + } + + info("Stopping nodes"); + list_foreach(struct node *n, &sn->nodes) { INDENT + node_stop(n); + } + + info("De-initializing node types"); + list_foreach(struct plugin *p, &plugins) { INDENT + if (p->type == PLUGIN_TYPE_NODE) + node_type_stop(&p->node); + } + + web_stop(&sn->web); + api_stop(&sn->api); + log_stop(&sn->log); + + sn->state = STATE_STOPPED; + + return 0; +} + +int super_node_destroy(struct super_node *sn) +{ + assert(sn->state != STATE_DESTROYED); + + config_destroy(&sn->cfg); + + web_destroy(&sn->web); + log_destroy(&sn->log); + api_destroy(&sn->api); + + list_destroy(&sn->plugins, (dtor_cb_t) plugin_destroy, false); + list_destroy(&sn->paths, (dtor_cb_t) path_destroy, true); + list_destroy(&sn->nodes, (dtor_cb_t) node_destroy, true); + + sn->state = STATE_DESTROYED; + + return 0; +} \ No newline at end of file diff --git a/lib/utils.c b/lib/utils.c index 2d8f4c6d9..05848b065 100644 --- a/lib/utils.c +++ b/lib/utils.c @@ -16,7 +16,6 @@ #include #include "config.h" -#include "cfg.h" #include "utils.h" void print_copyright() diff --git a/src/fpga.c b/src/fpga.c index 9218f6c25..bb2a6c2a8 100644 --- a/src/fpga.c +++ b/src/fpga.c @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include #include @@ -41,7 +41,7 @@ int main(int argc, char *argv[]) { int ret; - struct cfg cfg; + struct super_node sn; struct fpga_card *card; if (argc < 3) @@ -52,7 +52,7 @@ int main(int argc, char *argv[]) while ((c = getopt(argc-1, argv+1, "d:")) != -1) { switch (c) { case 'd': - cfg.log.level = strtoul(optarg, &endptr, 10); + sn.log.level = strtoul(optarg, &endptr, 10); break; case '?': @@ -61,20 +61,15 @@ int main(int argc, char *argv[]) } } - info("Parsing configuration"); - cfg_parse(&cfg, argv[1]); + super_node_init(&sn); + super_node_parse_uri(&sn, argv[1]); - info("Initialize logging system"); - log_init(&cfg.log, cfg.log.level, cfg.log.facilities); - - info("Initialize real-time system"); - rt_init(cfg.priority, cfg.affinity); - - info("Initialize memory system"); - memory_init(cfg.hugepages); + log_init(&sn.log, sn.log.level, sn.log.facilities); + rt_init(sn.priority, sn.affinity); + memory_init(sn.hugepages); /* Initialize VILLASfpga card */ - ret = fpga_init(argc, argv, config_root_setting(&cfg.cfg)); + ret = fpga_init(argc, argv, config_root_setting(&sn.cfg)); if (ret) error("Failed to initialize FPGA card"); @@ -92,7 +87,7 @@ int main(int argc, char *argv[]) if (ret) error("Failed to de-initialize FPGA card"); - cfg_destroy(&cfg); + super_node_destroy(&sn); return 0; } diff --git a/src/node.c b/src/node.c index 8ed56c851..d5739e797 100644 --- a/src/node.c +++ b/src/node.c @@ -8,8 +8,7 @@ #include #include -#include -#include +#include #include #include #include @@ -19,27 +18,18 @@ #include #include #include +#include #ifdef ENABLE_OPAL_ASYNC #include #endif -struct cfg cfg; +struct super_node sn; static void quit(int signal, siginfo_t *sinfo, void *ctx) { - info("Stopping paths"); - list_foreach(struct path *p, &cfg.paths) { INDENT - path_stop(p); - } - - info("Stopping nodes"); - list_foreach(struct node *n, &cfg.nodes) { INDENT - node_stop(n); - } - - cfg_deinit(&cfg); - cfg_destroy(&cfg); + super_node_stop(&sn); + super_node_destroy(&sn); info(GRN("Goodbye!")); @@ -92,7 +82,7 @@ int main(int argc, char *argv[]) usage(); #endif - log_init(&cfg.log, V, LOG_ALL); + super_node_init(&sn); info("This is VILLASnode %s (built on %s, %s)", BLD(YEL(VERSION)), BLD(MAG(__DATE__)), BLD(MAG(__TIME__))); @@ -103,30 +93,11 @@ int main(int argc, char *argv[]) signals_init(quit); - cfg_init_pre(&cfg); - cfg_parse_cli(&cfg, argc, argv); - cfg_init_post(&cfg); + super_node_parse_cli(&sn, argc, argv); + super_node_check(&sn); + super_node_start(&sn); - info("Starting nodes"); - list_foreach(struct node *n, &cfg.nodes) { INDENT - int refs = list_count(&cfg.paths, (cmp_cb_t) path_uses_node, n); - if (refs > 0) - node_start(n); - else - warn("No path is using the node %s. Skipping...", node_name(n)); - } - - info("Starting paths"); - list_foreach(struct path *p, &cfg.paths) { INDENT - if (p->enabled) { - path_init(p, &cfg); - path_start(p); - } - else - warn("Path %s is disabled. Skipping...", path_name(p)); - } - - if (cfg.stats > 0) + if (sn.stats > 0) stats_print_header(); struct timespec now, last = time_now(); @@ -134,15 +105,15 @@ int main(int argc, char *argv[]) /* Run! Until signal handler is invoked */ while (1) { now = time_now(); - if (cfg.stats > 0 && time_delta(&last, &now) > cfg.stats) { - list_foreach(struct path *p, &cfg.paths) { + if (sn.stats > 0 && time_delta(&last, &now) > sn.stats) { + list_foreach(struct path *p, &sn.paths) { hook_run(p, NULL, 0, HOOK_PERIODIC); } last = time_now(); } - web_service(&cfg.web); /** @todo Maybe we should move this to another thread */ + web_service(&sn.web); /** @todo Maybe we should move this to another thread */ } return 0; diff --git a/src/pipe.c b/src/pipe.c index be4ee6048..bf3fcbeab 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include #include @@ -24,7 +24,7 @@ #include "config.h" -static struct cfg cfg; /**< The global configuration */ +static struct super_node sn = { .state = STATE_DESTROYED }; /**< The global configuration */ struct dir { struct pool pool; @@ -53,10 +53,11 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) pool_destroy(&sendd.pool); } + web_stop(&sn.web); node_stop(node); node_type_stop(node->_vt); - cfg_destroy(&cfg); + super_node_destroy(&sn); info(GRN("Goodbye!")); _exit(EXIT_SUCCESS); @@ -172,15 +173,11 @@ int main(int argc, char *argv[]) ptid = pthread_self(); - /* Parse command line arguments */ - if (argc < 3) - usage(); - /* Default values */ sendd.enabled = true; recvv.enabled = true; - while ((c = getopt(argc-2, argv+2, "hxrsd:")) != -1) { + while ((c = getopt(argc, argv, "hxrsd:")) != -1) { switch (c) { case 'x': reverse = true; @@ -200,24 +197,40 @@ int main(int argc, char *argv[]) exit(c == '?' ? EXIT_FAILURE : EXIT_SUCCESS); } } + + if (argc < optind + 2) { + usage(); + exit(EXIT_FAILURE); + } - log_init(&cfg.log, level, LOG_ALL); + log_init(&sn.log, level, LOG_ALL); + log_start(&sn.log); + + super_node_init(&sn); + super_node_parse_uri(&sn, argv[optind]); + + memory_init(sn.hugepages); signals_init(quit); - cfg_parse(&cfg, argv[1]); - rt_init(cfg.priority, cfg.affinity); - memory_init(cfg.hugepages); + rt_init(sn.priority, sn.affinity); + + web_init(&sn.web, NULL); /* API is disabled in villas-pipe */ + web_start(&sn.web); /* Initialize node */ - node = list_lookup(&cfg.nodes, argv[2]); + node = list_lookup(&sn.nodes, argv[optind+1]); if (!node) - error("Node '%s' does not exist!", argv[2]); + error("Node '%s' does not exist!", argv[optind+1]); if (reverse) node_reverse(node); - ret = node_type_start(node->_vt, argc-optind, argv+optind, config_root_setting(&cfg.cfg)); + ret = node_type_start(node->_vt, argc, argv, config_root_setting(&sn.cfg)); if (ret) error("Failed to intialize node type: %s", node_name(node)); + + ret = node_check(node); + if (ret) + error("Invalid node configuration"); ret = node_start(node); if (ret) @@ -227,8 +240,9 @@ int main(int argc, char *argv[]) pthread_create(&recvv.thread, NULL, recv_loop, NULL); pthread_create(&sendd.thread, NULL, send_loop, NULL); - for (;;) - pause(); + for (;;) { + web_service(&sn.web); + } return 0; }