From cdd5a2ca90006f440add54777b8450e73e88be94 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 11 Mar 2017 23:50:30 -0300 Subject: [PATCH] refactoring: unified states of common objects: nodes, paths, node-types, plugins, hooks, etc.. --- include/villas/api.h | 14 +- include/villas/cfg.h | 9 +- include/villas/common.h | 20 +++ include/villas/fpga/card.h | 10 +- include/villas/fpga/ip.h | 9 +- include/villas/log.h | 9 +- include/villas/node.h | 14 +- include/villas/node_type.h | 6 +- include/villas/path.h | 9 +- include/villas/plugin.h | 11 +- include/villas/pool.h | 7 +- include/villas/queue.h | 12 +- include/villas/web.h | 14 +- lib/api.c | 23 ++- lib/cfg.c | 97 ++++-------- lib/fpga/card.c | 11 +- lib/fpga/ip.c | 2 +- lib/kernel/pci.c | 1 + lib/log.c | 100 ++++++++----- lib/node.c | 211 +++++++++++++++------------ lib/node_type.c | 8 +- lib/nodes/cbuilder.c | 1 + lib/nodes/websocket.c | 6 +- lib/path.c | 292 +++++++++++++++++++------------------ lib/plugin.c | 10 +- lib/pool.c | 19 ++- lib/queue.c | 16 +- lib/stats.c | 1 + lib/web.c | 122 ++++++++++------ 29 files changed, 568 insertions(+), 496 deletions(-) create mode 100644 include/villas/common.h diff --git a/include/villas/api.h b/include/villas/api.h index 9ae344e42..fcc9bdefb 100644 --- a/include/villas/api.h +++ b/include/villas/api.h @@ -7,10 +7,11 @@ #pragma once -#include "list.h" - #include +#include "list.h" +#include "common.h" + /* Forward declarations */ enum lws_callback_reasons; struct lws; @@ -43,10 +44,7 @@ enum api_mode { struct api { struct list sessions; /**< List of currently active connections */ - enum { - API_STATE_DESTROYED, - API_STATE_INITIALIZED - } state; + enum state state; struct cfg *cfg; }; @@ -94,7 +92,9 @@ int api_init(struct api *a, struct cfg *cfg); int api_destroy(struct api *a); -int api_deinit(struct api *a); +int api_start(struct api *a); + +int api_stop(struct api *a); int api_session_init(struct api_session *s, struct api *a, enum api_mode m); diff --git a/include/villas/cfg.h b/include/villas/cfg.h index 05df454aa..4474ef0ff 100644 --- a/include/villas/cfg.h +++ b/include/villas/cfg.h @@ -17,6 +17,7 @@ #include "api.h" #include "web.h" #include "log.h" +#include "common.h" /** Global configuration */ struct cfg { @@ -37,6 +38,8 @@ struct cfg { int argc; char **argv; } cli; + + enum state state; config_t cfg; /**< Pointer to configuration file */ json_t *json; /**< JSON representation of the same config. */ @@ -48,12 +51,12 @@ struct cfg { #endif /** Inititalize configuration object before parsing the configuration. */ -int cfg_init_pre(struct cfg *cfg); +int cfg_init(struct cfg *cfg); /** Initialize after parsing the configuration file. */ -int cfg_init_post(struct cfg *cfg); +int cfg_start(struct cfg *cfg); -int cfg_deinit(struct cfg *cfg); +int cfg_stop(struct cfg *cfg); /** Desctroy configuration object. */ int cfg_destroy(struct cfg *cfg); diff --git a/include/villas/common.h b/include/villas/common.h new file mode 100644 index 000000000..d1f3a210e --- /dev/null +++ b/include/villas/common.h @@ -0,0 +1,20 @@ +/** Some common defines, enums and datastructures. + * + * @file + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + *********************************************************************************/ + +#pragma once + +/* Common states for most objects in VILLASnode (paths, nodes, hooks, plugins) */ +enum state { + STATE_DESTROYED = 0, + STATE_INITIALIZED = 1, + STATE_PARSED = 2, + STATE_CHECKED = 3, + STATE_STARTED = 4, + STATE_LOADED = 4, /* alias for STATE_STARTED used by plugins */ + STATE_STOPPED = 5, + STATE_UNLOADED = 5 /* alias for STATE_STARTED used by plugins */ +}; \ No newline at end of file diff --git a/include/villas/fpga/card.h b/include/villas/fpga/card.h index ecb8cbaa3..54e55e0da 100644 --- a/include/villas/fpga/card.h +++ b/include/villas/fpga/card.h @@ -13,20 +13,16 @@ #include +#include "common.h" + /* Forward declarations */ struct fpga_ip; struct vfio_container; -enum fpga_card_state { - FPGA_CARD_STATE_UNKOWN, - FPGA_CARD_STATE_RESETTED, - FPGA_CARD_STATE_INITIALIZED -}; - struct fpga_card { char *name; /**< The name of the FPGA card */ - enum fpga_card_state state; /**< The state of this FPGA card. */ + enum state state; /**< The state of this FPGA card. */ struct pci_dev filter; /**< Filter for PCI device. */ struct vfio_dev vd; /**< VFIO device handle. */ diff --git a/include/villas/fpga/ip.h b/include/villas/fpga/ip.h index 38069ee49..5b7593136 100644 --- a/include/villas/fpga/ip.h +++ b/include/villas/fpga/ip.h @@ -13,7 +13,7 @@ #include -#include "utils.h" +#include "common.h" #include "nodes/fpga.h" @@ -28,11 +28,6 @@ #include "fpga/ips/dft.h" #include "fpga/ips/intc.h" -enum fpga_ip_state { - IP_STATE_UNKNOWN, - IP_STATE_INITIALIZED -}; - struct fpga_ip_type { struct fpga_vlnv vlnv; @@ -55,7 +50,7 @@ struct fpga_ip { char *name; /**< Name of the FPGA IP component. */ struct fpga_vlnv vlnv; /**< The Vendor, Library, Name, Version tag of the FPGA IP component. */ - enum fpga_ip_state state; /**< The current state of the FPGA IP component. */ + enum state state; /**< The current state of the FPGA IP component. */ struct fpga_ip_type *_vt; /**< Vtable containing FPGA IP type function pointers. */ diff --git a/include/villas/log.h b/include/villas/log.h index 0189983ef..9a50bca8f 100644 --- a/include/villas/log.h +++ b/include/villas/log.h @@ -8,9 +8,10 @@ #pragma once #include +#include #include -#include "utils.h" +#include "common.h" #ifdef __GNUC__ #define INDENT int __attribute__ ((__cleanup__(log_outdent), unused)) _old_indent = log_indent(1); @@ -59,6 +60,8 @@ enum log_facilities { }; struct log { + enum state state; + struct timespec epoch; /**< A global clock used to prefix the log messages. */ /** Debug level used by the debug() macro. @@ -73,7 +76,9 @@ struct log { /** Initialize log object */ int log_init(struct log *l, int level, long faciltities); -int log_deinit(struct log *l); +int log_start(struct log *l); + +int log_stop(struct log *l); /** Destroy log object */ int log_destroy(struct log *l); diff --git a/include/villas/node.h b/include/villas/node.h index 70f91543c..e14ad3cd9 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -18,6 +18,7 @@ #include "sample.h" #include "list.h" #include "queue.h" +#include "common.h" /** The data structure for a node. * @@ -35,16 +36,9 @@ struct node int affinity; /**< CPU Affinity of this node */ unsigned long sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */ - - enum node_state { - NODE_INVALID, /**< This node object is not in a valid state. */ - NODE_CREATED, /**< This node has been parsed from the configuration. */ - NODE_STARTING, /**< This node is currently being started. */ - NODE_RUNNING, /**< This node has been started by calling node_open() */ - NODE_STOPPING, /**< This node is currently shutting down. */ - NODE_STOPPED /**< Node was running, but has been stopped by calling node_close() */ - } state; /**< Node state */ - + + enum state state; + struct node_type *_vt; /**< Virtual functions (C++ OOP style) */ void *_vd; /**< Virtual data (used by struct node::_vt functions) */ diff --git a/include/villas/node_type.h b/include/villas/node_type.h index 929f0403f..b0b5f400b 100644 --- a/include/villas/node_type.h +++ b/include/villas/node_type.h @@ -13,6 +13,7 @@ #include #include "list.h" +#include "common.h" /* Forward declarations */ struct node; @@ -25,10 +26,7 @@ struct node_type { struct list instances; /**< A list of all existing nodes of this type. */ size_t size; /**< Size of private data bock. @see node::_vd */ - enum node_type_state { - NODE_TYPE_DEINITIALIZED = 0, - NODE_TYPE_INITIALIZED - } state; + enum state state; /** Global initialization per node type. * diff --git a/include/villas/path.h b/include/villas/path.h index f2d197096..393bdabc8 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -24,6 +24,7 @@ #include "queue.h" #include "pool.h" #include "stats.h" +#include "common.h" /* Forward declarations */ struct cfg; @@ -47,13 +48,7 @@ struct path_destination /** The datastructure for a path. */ struct path { - enum { - PATH_INVALID, /**< Path is invalid. */ - PATH_INITIALIZED, /**< Path queues, memory pools & hook system initialized. */ - PATH_RUNNING, /**< Path is currently running. */ - PATH_STOPPED, /**< Path has been stopped. */ - PATH_DESTROYED /**< Path is destroyed. */ - } state; /**< Path state */ + enum state state; /**< Path state. */ /* Each path has a single source and multiple destinations */ struct path_source *source; /**< Pointer to the incoming node */ diff --git a/include/villas/plugin.h b/include/villas/plugin.h index dded61cd1..67a2fb833 100644 --- a/include/villas/plugin.h +++ b/include/villas/plugin.h @@ -9,6 +9,8 @@ #include "hook.h" #include "api.h" +#include "common.h" +#include "utils.h" #include "fpga/ip.h" @@ -32,12 +34,6 @@ enum plugin_type { PLUGIN_TYPE_MODEL_CBUILDER }; -enum plugin_state { - PLUGIN_STATE_DESTROYED, - PLUGIN_STATE_UNLOADED, - PLUGIN_STATE_LOADED -}; - struct plugin { char *name; char *description; @@ -45,7 +41,8 @@ struct plugin { char *path; enum plugin_type type; - enum plugin_state state; + + enum state state; int (*load)(struct plugin *p); int (*unload)(struct plugin *p); diff --git a/include/villas/pool.h b/include/villas/pool.h index 5285b2a6e..9112bdf30 100644 --- a/include/villas/pool.h +++ b/include/villas/pool.h @@ -9,9 +9,11 @@ #pragma once +#include #include #include "queue.h" +#include "common.h" #include "memory.h" /** A thread-safe memory pool */ @@ -19,10 +21,7 @@ struct pool { void *buffer; /**< Address of the underlying memory area */ const struct memtype *mem; - enum { - POOL_STATE_DESTROYED, - POOL_STATE_INITIALIZED - } state; + enum state state; size_t len; /**< Length of the underlying memory area */ diff --git a/include/villas/queue.h b/include/villas/queue.h index 3137e874b..07d94c7e2 100644 --- a/include/villas/queue.h +++ b/include/villas/queue.h @@ -33,21 +33,23 @@ #pragma once +#include #include #include -#include "memory.h" +#include "common.h" + +/* Forward declarations */ +struct memtype; #define CACHELINE_SIZE 64 typedef char cacheline_pad_t[CACHELINE_SIZE]; +/** A lock-free multiple-producer, multiple-consumer (MPMC) queue. */ struct queue { cacheline_pad_t _pad0; /**< Shared area: all threads read */ - enum { - QUEUE_STATE_DESTROYED, - QUEUE_STATE_INITIALIZED - } state; + enum state state; struct memtype const * mem; size_t buffer_mask; diff --git a/include/villas/web.h b/include/villas/web.h index 42c3f62dd..072186946 100644 --- a/include/villas/web.h +++ b/include/villas/web.h @@ -7,16 +7,15 @@ #pragma once +#include "common.h" + /* Forward declarations */ struct api; struct web { struct api *api; - enum { - WEB_STATE_DESTROYED, - WEB_STATE_INITIALIZED - } state; + enum state state; struct lws_context *context; /**< The libwebsockets server context. */ struct lws_vhost *vhost; /**< The libwebsockets vhost. */ @@ -35,11 +34,12 @@ int web_init(struct web *w, struct api *a); int web_destroy(struct web *w); +int web_start(struct web *w); + +int web_stop(struct web *w); + /** Parse HTTPd and WebSocket related options */ int web_parse(struct web *w, config_setting_t *lcs); -/** De-initializes the web interface. */ -int web_deinit(struct web *w); - /** libwebsockets service routine. Call periodically */ int web_service(struct web *w); \ No newline at end of file diff --git a/lib/api.c b/lib/api.c index 9caf9294f..622f5c9d5 100644 --- a/lib/api.c +++ b/lib/api.c @@ -226,25 +226,38 @@ int api_init(struct api *a, struct cfg *cfg) list_init(&a->sessions); a->cfg = cfg; - a->state = API_STATE_INITIALIZED; + a->state = STATE_INITIALIZED; return 0; } int api_destroy(struct api *a) { - // if (a->state = API_STATE_INITIALIZED) - // do something + if (a->state == STATE_STARTED) + return -1; - a->state = API_STATE_DESTROYED; + a->state = STATE_DESTROYED; return 0; } -int api_deinit(struct api *a) +int api_start(struct api *a) { + info("Starting API sub-system"); + + a->state = STATE_STARTED; + + return 0; +} + +int api_stop(struct api *a) +{ + info("Stopping API sub-system"); + list_destroy(&a->sessions, (dtor_cb_t) api_session_destroy, false); + a->state = STATE_STOPPED; + return 0; } diff --git a/lib/cfg.c b/lib/cfg.c index 2c43f45b2..8a628a007 100644 --- a/lib/cfg.c +++ b/lib/cfg.c @@ -21,75 +21,46 @@ #include "api.h" #include "plugin.h" #include "node.h" +#include "memory.h" #include "kernel/rt.h" -int cfg_init_pre(struct cfg *cfg) +static int cfg_parse_global(struct cfg *cfg, config_setting_t *cfg_root) +{ + if (!config_setting_is_group(cfg_root)) + cerror(cfg_root, "Global section must be a dictionary."); + + config_setting_lookup_int(cfg_root, "hugepages", &cfg->hugepages); + config_setting_lookup_int(cfg_root, "affinity", &cfg->affinity); + config_setting_lookup_int(cfg_root, "priority", &cfg->priority); + config_setting_lookup_float(cfg_root, "stats", &cfg->stats); + + return 0; +} + +int cfg_init(struct cfg *cfg) { config_init(&cfg->cfg); - - info("Inititliaze logging sub-system"); + log_init(&cfg->log, V, LOG_ALL); + api_init(&cfg->api, cfg); + web_init(&cfg->web, &cfg->api); list_init(&cfg->nodes); list_init(&cfg->paths); list_init(&cfg->plugins); + + /* Default values */ + cfg->affinity = -1; + cfg->stats = 0; + cfg->priority = 50; + cfg->hugepages = DEFAULT_NR_HUGEPAGES; + + cfg->state = STATE_INITIALIZED; return 0; } -int cfg_init_post(struct cfg *cfg) -{ - memory_init(); - rt_init(cfg->priority, cfg->affinity); - api_init(&cfg->api, cfg); - web_init(&cfg->web, &cfg->api); - - info("Initialize node types"); - list_foreach(struct node *n, &cfg->nodes) { INDENT - config_setting_t *cfg_root = config_root_setting(&cfg->cfg); - - node_type_init(n->_vt, cfg->cli.argc, cfg->cli.argv, cfg_root); - } - - return 0; -} - -int cfg_deinit(struct cfg *cfg) -{ - info("De-initializing node types"); - list_foreach(struct plugin *p, &plugins) { INDENT - if (p->type == PLUGIN_TYPE_NODE) - node_type_deinit(&p->node); - } - - info("De-initializing web interface"); - web_deinit(&cfg->web); - - info("De-initialize API"); - api_deinit(&cfg->api); - - info("De-initialize log sub-system"); - log_deinit(&cfg->log); - - return 0; -} - -int cfg_destroy(struct cfg *cfg) -{ - config_destroy(&cfg->cfg); - - web_destroy(&cfg->web); - log_destroy(&cfg->log); - api_destroy(&cfg->api); - - list_destroy(&cfg->plugins, (dtor_cb_t) plugin_destroy, false); - list_destroy(&cfg->paths, (dtor_cb_t) path_destroy, true); - list_destroy(&cfg->nodes, (dtor_cb_t) node_destroy, true); - - return 0; -} - int cfg_parse_cli(struct cfg *cfg, int argc, char *argv[]) { cfg->cli.argc = argc; @@ -164,21 +135,9 @@ int cfg_parse(struct cfg *cfg, const char *uri) cfg->web.htdocs = "/villas/web/socket/"; } - /* Parse global settings */ cfg_root = config_root_setting(&cfg->cfg); - if (cfg_root) { - if (!config_setting_is_group(cfg_root)) - warn("Missing global section in config file."); - - if (!config_setting_lookup_int(cfg_root, "affinity", &cfg->affinity)) - cfg->affinity = 0; - - if (!config_setting_lookup_int(cfg_root, "priority", &cfg->priority)) - cfg->priority = 0; - - if (!config_setting_lookup_float(cfg_root, "stats", &cfg->stats)) - cfg->stats = 0; - } + if (cfg_root) + cfg_parse_global(cfg, cfg_root); cfg_web = config_setting_get_member(cfg_root, "http"); if (cfg_web) diff --git a/lib/fpga/card.c b/lib/fpga/card.c index 8af72cff7..39b28bbae 100644 --- a/lib/fpga/card.c +++ b/lib/fpga/card.c @@ -7,6 +7,9 @@ #include #include "config.h" +#include "log.h" +#include "list.h" +#include "utils.h" #include "kernel/pci.h" #include "kernel/vfio.h" @@ -22,7 +25,7 @@ int fpga_card_init(struct fpga_card *c, struct pci *pci, struct vfio_container * fpga_card_check(c); - if (c->state == FPGA_CARD_STATE_INITIALIZED) + if (c->state == STATE_INITIALIZED) return 0; /* Search for FPGA card */ @@ -78,8 +81,7 @@ int fpga_card_parse(struct fpga_card *c, config_setting_t *cfg) c->filter.id.device = FPGA_PCI_PID_VFPGA; c->name = config_setting_name(cfg); - c->state = FPGA_CARD_STATE_UNKOWN; - + if (!config_setting_lookup_int(cfg, "affinity", &c->affinity)) c->affinity = 0; @@ -129,6 +131,7 @@ int fpga_card_parse(struct fpga_card *c, config_setting_t *cfg) } c->cfg = cfg; + c->state = STATE_PARSED; return 0; } @@ -204,7 +207,7 @@ int fpga_card_reset(struct fpga_card *c) if (rst_reg[0]) return -2; - c->state = FPGA_CARD_STATE_RESETTED; + c->state = STATE_INITIALIZED; return 0; } \ No newline at end of file diff --git a/lib/fpga/ip.c b/lib/fpga/ip.c index 42dfb56de..95db9336d 100644 --- a/lib/fpga/ip.c +++ b/lib/fpga/ip.c @@ -18,7 +18,7 @@ int fpga_ip_init(struct fpga_ip *c) error("Failed to intialize IP core: %s", c->name); if (ret == 0) - c->state = IP_STATE_INITIALIZED; + c->state = STATE_INITIALIZED; debug(8, "IP Core %s initalized (%u)", c->name, ret); diff --git a/lib/kernel/pci.c b/lib/kernel/pci.c index 1dc164627..11aa7e59f 100644 --- a/lib/kernel/pci.c +++ b/lib/kernel/pci.c @@ -10,6 +10,7 @@ #include #include "log.h" +#include "utils.h" #include "kernel/pci.h" #include "config.h" diff --git a/lib/log.c b/lib/log.c index 892b59b51..de554f686 100644 --- a/lib/log.c +++ b/lib/log.c @@ -54,6 +54,67 @@ static const char *facilities_strs[] = { /** The current log indention level (per thread!). */ static __thread int indent = 0; +int log_init(struct log *l, int level, long facilitites) +{ + /* Register this log instance globally */ + log = l; + + l->level = level; + l->facilities = facilitites; + + debug(LOG_LOG | 5, "Log sub-system intialized: level=%d, faciltities=%#lx", level, facilitites); + + l->state = STATE_INITIALIZED; + + return 0; +} + +int log_parse(struct log *l, config_setting_t *cfg) +{ + const char *facilities; + + if (!config_setting_is_group(cfg)) + cerror(cfg, "Setting 'log' must be a group."); + + config_setting_lookup_int(cfg, "level", &l->level); + + if (config_setting_lookup_string(cfg, "facilities", &facilities)) + log_set_facility_expression(l, facilities); + + l->state = STATE_PARSED; + + return 0; +} + +int log_start(struct log *l) +{ + l->epoch = time_now(); + + l->state = STATE_STARTED; + + return 0; +} + +int log_stop(struct log *l) +{ + if (l->state != STATE_STARTED) + return -1; + + l->state = STATE_STOPPED; + + return 0; +} + +int log_destroy(struct log *l) +{ + if (l->state == STATE_STARTED) + return -1; + + l->state = STATE_DESTROYED; + + return 0; +} + int log_indent(int levels) { int old = indent; @@ -117,30 +178,6 @@ found: if (negate) return l->facilities; } -int log_init(struct log *l, int level, long facilitites) -{ - l->epoch = time_now(); - l->level = level; - l->facilities = facilitites; - - /* Register this log instance globally */ - log = l; - - debug(LOG_LOG | 5, "Log sub-system intialized: level=%d, faciltities=%#lx", level, facilitites); - - return 0; -} - -int log_deinit(struct log *l) -{ - return 0; -} - -int log_destroy(struct log *l) -{ - return 0; -} - void log_print(struct log *l, const char *lvl, const char *fmt, ...) { va_list ap; @@ -180,21 +217,6 @@ void log_vprint(struct log *l, const char *lvl, const char *fmt, va_list ap) free(buf); } -int log_parse(struct log *l, config_setting_t *cfg) -{ - const char *facilities; - - if (!config_setting_is_group(cfg)) - cerror(cfg, "Setting 'log' must be a group."); - - config_setting_lookup_int(cfg, "level", &l->level); - - if (config_setting_lookup_string(cfg, "facilities", &facilities)) - log_set_facility_expression(l, facilities); - - return 0; -} - void line() { char buf[LOG_WIDTH]; diff --git a/lib/node.c b/lib/node.c index 06c59c87f..054d5b53f 100644 --- a/lib/node.c +++ b/lib/node.c @@ -14,6 +14,121 @@ #include "config.h" #include "plugin.h" +int node_init(struct node *n) +{ + if (n->state != STATE_DESTROYED) + return -1; + + n->state = STATE_INITIALIZED; + + return 0; +} + +int node_parse(struct node *n, config_setting_t *cfg) +{ + struct plugin *p; + const char *type, *name; + int ret; + + name = config_setting_name(cfg); + + if (!config_setting_lookup_string(cfg, "type", &type)) + cerror(cfg, "Missing node type"); + + p = plugin_lookup(PLUGIN_TYPE_NODE, type); + assert(&p->node == n->_vt); + + if (!config_setting_lookup_int(cfg, "vectorize", &n->vectorize)) + n->vectorize = 1; + + n->name = name; + n->cfg = cfg; + + ret = n->_vt->parse ? n->_vt->parse(n, cfg) : 0; + if (ret) + cerror(cfg, "Failed to parse node '%s'", node_name(n)); + + n->state = STATE_PARSED; + + return ret; +} + +int node_check(struct node *n) +{ + if (n->state != STATE_INITIALIZED || n->state != STATE_PARSED) + return -1; + + if (n->vectorize <= 0) + error("Invalid `vectorize` value %d for node %s. Must be natural number!", n->vectorize, node_name(n)); + + if (n->_vt->vectorize && n->_vt->vectorize < n->vectorize) + error("Invalid value for `vectorize`. Node type requires a number smaller than %d!", + n->_vt->vectorize); + + n->state = STATE_CHECKED; + + return 0; +} + +int node_start(struct node *n) +{ + int ret; + + if (n->state != STATE_CHECKED) + return -1; + + info("Starting node %s", node_name_long(n)); + { INDENT + ret = n->_vt->start ? n->_vt->start(n) : -1; + } + + if (ret == 0) + n->state = STATE_STARTED; + + n->sequence = 0; + + return ret; +} + +int node_stop(struct node *n) +{ + int ret; + + if (n->state != STATE_STARTED) + return -1; + + info("Stopping node %s", node_name(n)); + { INDENT + ret = n->_vt->stop ? n->_vt->stop(n) : -1; + } + + if (ret == 0) + n->state = STATE_STOPPED; + + return ret; +} + +int node_destroy(struct node *n) +{ + if (n->state == STATE_STARTED) + return -1; + + if (n->_vt->destroy) + n->_vt->destroy(n); + + list_remove(&n->_vt->instances, n); + + if (n->_vd) + free(n->_vd); + + if (n->_name) + free(n->_name); + + n->state = STATE_DESTROYED; + + return 0; +} + int node_read(struct node *n, struct sample *smps[], unsigned cnt) { int nread = 0; @@ -56,48 +171,6 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt) return nsent; } -int node_start(struct node *n) -{ - int ret; - - if (n->state != NODE_CREATED && n->state != NODE_STOPPED) - return -1; - - n->state = NODE_STARTING; - - info("Starting node %s", node_name_long(n)); - { INDENT - ret = n->_vt->open ? n->_vt->open(n) : -1; - } - - if (ret == 0) - n->state = NODE_RUNNING; - - n->sequence = 0; - - return ret; -} - -int node_stop(struct node *n) -{ - int ret; - - if (n->state != NODE_RUNNING) - return -1; - - n->state = NODE_STOPPING; - - info("Stopping node %s", node_name(n)); - { INDENT - ret = n->_vt->close ? n->_vt->close(n) : -1; - } - - if (ret == 0) - n->state = NODE_STOPPED; - - return ret; -} - char * node_name(struct node *n) { if (!n->_name) @@ -131,21 +204,8 @@ int node_reverse(struct node *n) return n->_vt->reverse ? n->_vt->reverse(n) : -1; } +int node_parse_list(struct list *list, config_setting_t *cfg, struct list *all) { -int node_destroy(struct node *n) -{ - if (n->_vt->destroy) - n->_vt->destroy(n); - - list_remove(&n->_vt->instances, n); - - free(n->_vd); - free(n->_name); - - return 0; -} - -int node_parse_list(struct list *list, config_setting_t *cfg, struct list *all) { const char *str; struct node *node; @@ -188,42 +248,3 @@ int node_parse_list(struct list *list, config_setting_t *cfg, struct list *all) return list_length(list); } - -int node_parse(struct node *n, config_setting_t *cfg) -{ - struct plugin *p; - const char *type, *name; - int ret; - - name = config_setting_name(cfg); - - if (!config_setting_lookup_string(cfg, "type", &type)) - cerror(cfg, "Missing node type"); - - p = plugin_lookup(PLUGIN_TYPE_NODE, type); - assert(&p->node == n->_vt); - - if (!config_setting_lookup_int(cfg, "vectorize", &n->vectorize)) - n->vectorize = 1; - - n->name = name; - n->cfg = cfg; - - ret = n->_vt->parse ? n->_vt->parse(n, cfg) : 0; - if (ret) - cerror(cfg, "Failed to parse node '%s'", node_name(n)); - - return ret; -} - -int node_check(struct node *n) -{ - if (n->vectorize <= 0) - error("Invalid `vectorize` value %d for node %s. Must be natural number!", n->vectorize, node_name(n)); - - if (n->_vt->vectorize && n->_vt->vectorize < n->vectorize) - error("Invalid value for `vectorize`. Node type requires a number smaller than %d!", - n->_vt->vectorize); - - return 0; -} diff --git a/lib/node_type.c b/lib/node_type.c index 919f03c39..f4e585980 100644 --- a/lib/node_type.c +++ b/lib/node_type.c @@ -18,7 +18,7 @@ int node_type_start(struct node_type *vt, int argc, char *argv[], config_setting { int ret; - if (vt->state != NODE_TYPE_DEINITIALIZED) + if (vt->state == STATE_STARTED) return -1; info("Initializing " YEL("%s") " node type", plugin_name(vt)); @@ -27,7 +27,7 @@ int node_type_start(struct node_type *vt, int argc, char *argv[], config_setting } if (ret == 0) - vt->state = NODE_TYPE_INITIALIZED; + vt->state = STATE_STARTED; return ret; } @@ -36,7 +36,7 @@ int node_type_stop(struct node_type *vt) { int ret; - if (vt->state != NODE_TYPE_INITIALIZED) + if (vt->state != STATE_STARTED) return -1; info("De-initializing " YEL("%s") " node type", plugin_name(vt)); @@ -45,7 +45,7 @@ int node_type_stop(struct node_type *vt) } if (ret == 0) - vt->state = NODE_TYPE_DEINITIALIZED; + vt->state = STATE_DESTROYED; return ret; } diff --git a/lib/nodes/cbuilder.c b/lib/nodes/cbuilder.c index 3447c6211..baaab528b 100644 --- a/lib/nodes/cbuilder.c +++ b/lib/nodes/cbuilder.c @@ -7,6 +7,7 @@ #include "node.h" #include "log.h" #include "plugin.h" +#include "utils.h" #include "nodes/cbuilder.h" diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index cabe6eb61..8b740759e 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -150,7 +150,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi } /* Check if node is running */ - if (c->node->state != NODE_RUNNING) + if (c->node->state != STATE_STARTED) return -1; } @@ -193,7 +193,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi case LWS_CALLBACK_SERVER_WRITEABLE: { w = (struct websocket *) c->node->_vd; - if (c->node && c->node->state != NODE_RUNNING) + if (c->node && c->node->state != STATE_STARTED) return -1; if (c->state == WEBSOCKET_SHUTDOWN) { @@ -226,7 +226,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi case LWS_CALLBACK_RECEIVE: { w = (struct websocket *) c->node->_vd; - if (c->node->state != NODE_RUNNING) + if (c->node->state != STATE_STARTED) return -1; if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0)) diff --git a/lib/path.c b/lib/path.c index 80033359a..f85f27c89 100644 --- a/lib/path.c +++ b/lib/path.c @@ -20,6 +20,7 @@ #include "hook.h" #include "plugin.h" #include "cfg.h" +#include "memory.h" static void path_read(struct path *p) { @@ -125,63 +126,6 @@ static void * path_run(void *arg) return NULL; } -int path_start(struct path *p) -{ - int ret; - - info("Starting path: %s (#hooks=%zu)", - path_name(p), list_length(&p->hooks)); - - ret = hook_run(p, NULL, 0, HOOK_PATH_START); - if (ret) - return -1; - - p->state = PATH_RUNNING; - - return pthread_create(&p->tid, NULL, &path_run, p); -} - -int path_stop(struct path *p) -{ - int ret; - - info("Stopping path: %s", path_name(p)); - - pthread_cancel(p->tid); - pthread_join(p->tid, NULL); - - ret = hook_run(p, NULL, 0, HOOK_PATH_STOP); - if (ret) - return -1; - - p->state = PATH_STOPPED; - - return 0; -} - -const char * path_name(struct path *p) -{ - if (!p->_name) { - if (list_length(&p->destinations) == 1) { - struct path_destination *pd = (struct path_destination *) list_first(&p->destinations); - - strcatf(&p->_name, "%s " MAG("=>") " %s", - node_name_short(p->source->node), - node_name_short(pd->node)); - } - else { - strcatf(&p->_name, "%s " MAG("=>") " [", node_name_short(p->source->node)); - - list_foreach(struct path_destination *pd, &p->destinations) - strcatf(&p->_name, " %s", node_name_short(pd->node)); - - strcatf(&p->_name, " ]"); - } - } - - return p->_name; -} - static int path_source_destroy(struct path_source *ps) { pool_destroy(&ps->pool); @@ -196,41 +140,13 @@ static int path_destination_destroy(struct path_destination *pd) return 0; } -int path_destroy(struct path *p) -{ - list_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true); - list_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true); - - path_source_destroy(p->source); - - if (p->_name) - free(p->_name); - - if (p->source) - free(p->source); - - p->state = PATH_DESTROYED; - - return 0; -} - -int path_check(struct path *p) -{ - list_foreach (struct node *n, &p->destinations) { - if (!n->_vt->write) - error("Destiation node '%s' is not supported as a sink for path '%s'", node_name(n), path_name(p)); - } - - if (!p->source->node->_vt->read) - error("Source node '%s' is not supported as source for path '%s'", node_name(p->source->node), path_name(p)); - - return 0; -} - int path_init(struct path *p, struct cfg *cfg) { int ret, max_queuelen = 0; + if (p->state != STATE_DESTROYED) + return -1; + /* Add internal hooks if they are not already in the list*/ list_foreach(struct plugin *pl, &plugins) { if (pl->type == PLUGIN_TYPE_HOOK) { @@ -238,7 +154,7 @@ int path_init(struct path *p, struct cfg *cfg) if ((h->type & HOOK_AUTO) && /* should this hook be added implicitely? */ (list_lookup(&p->hooks, pl->name) == NULL)) /* is not already in list? */ - list_push(&p->hooks, memdup(h, sizeof(struct hook))); + list_push(&p->hooks, memdup(h, sizeof(h))); } } @@ -268,63 +184,11 @@ int path_init(struct path *p, struct cfg *cfg) if (ret) error("Failed to allocate memory pool for path"); - p->state = PATH_INITIALIZED; + p->state = STATE_INITIALIZED; return 0; } -int path_uses_node(struct path *p, struct node *n) { - list_foreach(struct path_destination *pd, &p->destinations) { - if (pd->node == n) - return 0; - } - - return p->source->node == n ? 0 : -1; -} - -int path_reverse(struct path *p, struct path *r) -{ - int ret; - - if (list_length(&p->destinations) > 1) - return -1; - - struct path_destination *first_pd = list_first(&p->destinations); - - list_init(&r->destinations); - list_init(&r->hooks); - - /* General */ - r->enabled = p->enabled; - r->cfg = p->cfg; - - struct path_destination *pd = alloc(sizeof(struct path_destination)); - - pd->node = p->source->node; - pd->queuelen = first_pd->queuelen; - - list_push(&r->destinations, pd); - - struct path_source *ps = alloc(sizeof(struct path_source)); - - ps->node = first_pd->node; - ps->samplelen = p->source->samplelen; - - r->source = ps; - - list_foreach(struct hook *h, &p->hooks) { - struct hook *hc = alloc(sizeof(struct hook)); - - ret = hook_copy(h, hc); - if (ret) - return ret; - - list_push(&r->hooks, hc); - } - - return 0; -} - int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes) { config_setting_t *cfg_out, *cfg_hook; @@ -402,5 +266,149 @@ int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes) list_destroy(&destinations, NULL, false); + return 0; +} + +int path_check(struct path *p) +{ + list_foreach (struct node *n, &p->destinations) { + if (!n->_vt->write) + error("Destiation node '%s' is not supported as a sink for path '%s'", node_name(n), path_name(p)); + } + + if (!p->source->node->_vt->read) + error("Source node '%s' is not supported as source for path '%s'", node_name(p->source->node), path_name(p)); + + return 0; +} + +int path_start(struct path *p) +{ + int ret; + + info("Starting path: %s (#hooks=%zu)", + path_name(p), list_length(&p->hooks)); + + ret = hook_run(p, NULL, 0, HOOK_PATH_START); + if (ret) + return ret; + + ret = pthread_create(&p->tid, NULL, &path_run, p); + if (ret) + return ret; + + p->state = STATE_STARTED; + + return 0; +} + +int path_stop(struct path *p) +{ + int ret; + + info("Stopping path: %s", path_name(p)); + + pthread_cancel(p->tid); + pthread_join(p->tid, NULL); + + ret = hook_run(p, NULL, 0, HOOK_PATH_STOP); + if (ret) + return -1; + + p->state = STATE_STOPPED; + + return 0; +} + +int path_destroy(struct path *p) +{ + list_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true); + list_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true); + + path_source_destroy(p->source); + + if (p->_name) + free(p->_name); + + if (p->source) + free(p->source); + + p->state = STATE_DESTROYED; + + return 0; +} + +const char * path_name(struct path *p) +{ + if (!p->_name) { + if (list_length(&p->destinations) == 1) { + struct path_destination *pd = (struct path_destination *) list_first(&p->destinations); + + strcatf(&p->_name, "%s " MAG("=>") " %s", + node_name_short(p->source->node), + node_name_short(pd->node)); + } + else { + strcatf(&p->_name, "%s " MAG("=>") " [", node_name_short(p->source->node)); + + list_foreach(struct path_destination *pd, &p->destinations) + strcatf(&p->_name, " %s", node_name_short(pd->node)); + + strcatf(&p->_name, " ]"); + } + } + + return p->_name; +} + +int path_uses_node(struct path *p, struct node *n) { + list_foreach(struct path_destination *pd, &p->destinations) { + if (pd->node == n) + return 0; + } + + return p->source->node == n ? 0 : -1; +} + +int path_reverse(struct path *p, struct path *r) +{ + int ret; + + if (list_length(&p->destinations) > 1) + return -1; + + struct path_destination *first_pd = list_first(&p->destinations); + + list_init(&r->destinations); + list_init(&r->hooks); + + /* General */ + r->enabled = p->enabled; + r->cfg = p->cfg; + + struct path_destination *pd = alloc(sizeof(struct path_destination)); + + pd->node = p->source->node; + pd->queuelen = first_pd->queuelen; + + list_push(&r->destinations, pd); + + struct path_source *ps = alloc(sizeof(struct path_source)); + + ps->node = first_pd->node; + ps->samplelen = p->source->samplelen; + + r->source = ps; + + list_foreach(struct hook *h, &p->hooks) { + struct hook *hc = alloc(sizeof(struct hook)); + + ret = hook_copy(h, hc); + if (ret) + return ret; + + list_push(&r->hooks, hc); + } + return 0; } \ No newline at end of file diff --git a/lib/plugin.c b/lib/plugin.c index 4ac431457..4341cbbb1 100644 --- a/lib/plugin.c +++ b/lib/plugin.c @@ -16,7 +16,7 @@ int plugin_init(struct plugin *p, char *name, char *path) p->name = strdup(name); p->path = strdup(path); - p->state = PLUGIN_STATE_UNLOADED; + p->state = STATE_INITIALIZED; return 0; } @@ -27,7 +27,7 @@ int plugin_load(struct plugin *p) if (!p->path) return -1; - p->state = PLUGIN_STATE_LOADED; + p->state = STATE_LOADED; return 0; } @@ -36,21 +36,21 @@ int plugin_unload(struct plugin *p) { int ret; - if (p->state != PLUGIN_STATE_LOADED) + if (p->state != STATE_LOADED) return -1; ret = dlclose(p->handle); if (ret) return -1; - p->state = PLUGIN_STATE_UNLOADED; + p->state = STATE_UNLOADED; return 0; } int plugin_destroy(struct plugin *p) { - if (p->state == PLUGIN_STATE_LOADED) + if (p->state == STATE_LOADED) plugin_unload(p); if (p->path) diff --git a/lib/pool.c b/lib/pool.c index e80b5bff5..5ec74a95c 100644 --- a/lib/pool.c +++ b/lib/pool.c @@ -13,6 +13,9 @@ int pool_init(struct pool *p, size_t cnt, size_t blocksz, const struct memtype *m) { int ret; + + if (p->state != STATE_DESTROYED) + return -1; /* Make sure that we use a block size that is aligned to the size of a cache line */ p->alignment = kernel_get_cacheline_size(); @@ -33,19 +36,23 @@ int pool_init(struct pool *p, size_t cnt, size_t blocksz, const struct memtype * for (int i = 0; i < cnt; i++) queue_push(&p->queue, (char *) p->buffer + i * p->blocksz); - p->state = POOL_STATE_INITIALIZED; + p->state = STATE_INITIALIZED; return 0; } int pool_destroy(struct pool *p) { + int ret; + + if (p->state != STATE_INITIALIZED) + return -1; + queue_destroy(&p->queue); - if (p->state == POOL_STATE_INITIALIZED) - return memory_free(p->mem, p->buffer, p->len); - - p->state = POOL_STATE_DESTROYED; + ret = memory_free(p->mem, p->buffer, p->len); + if (ret == 0) + p->state = STATE_DESTROYED; - return 0; + return ret; } \ No newline at end of file diff --git a/lib/queue.c b/lib/queue.c index a2cc3bcc6..408c63d83 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -33,11 +33,14 @@ #include "queue.h" #include "utils.h" +#include "memory.h" /** Initialize MPMC queue */ int queue_init(struct queue *q, size_t size, const struct memtype *mem) { - + if (q->state != STATE_DESTROYED) + return -1; + /* Queue size must be 2 exponent */ if (!IS_POW2(size)) { size_t old_size = size; @@ -57,7 +60,7 @@ int queue_init(struct queue *q, size_t size, const struct memtype *mem) atomic_store_explicit(&q->tail, 0, memory_order_relaxed); atomic_store_explicit(&q->head, 0, memory_order_relaxed); - q->state = QUEUE_STATE_INITIALIZED; + q->state = STATE_INITIALIZED; return 0; } @@ -66,10 +69,13 @@ int queue_destroy(struct queue *q) { int ret = 0; - if (q->state == QUEUE_STATE_INITIALIZED) - ret = memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(q->buffer[0])); + if (q->state != STATE_INITIALIZED) + return -1; + + ret = memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(q->buffer[0])); - q->state = QUEUE_STATE_DESTROYED; + if (ret == 0) + q->state = STATE_DESTROYED; return ret; } diff --git a/lib/stats.c b/lib/stats.c index fd137f8ca..c5b995cab 100644 --- a/lib/stats.c +++ b/lib/stats.c @@ -11,6 +11,7 @@ #include "timing.h" #include "path.h" #include "sample.h" +#include "utils.h" #include "log.h" static struct stats_desc { diff --git a/lib/web.c b/lib/web.c index 042651230..f3f7bb4df 100644 --- a/lib/web.c +++ b/lib/web.c @@ -17,32 +17,44 @@ #include "nodes/websocket.h" /* Forward declarations */ -lws_callback_function api_protocol_cb; +lws_callback_function api_ws_protocol_cb; +lws_callback_function api_http_protocol_cb; lws_callback_function websocket_protocol_cb; -/** Path to the directory which should be served by build in HTTP server */ -static char htdocs[PATH_MAX] = "/usr/local/share/villas/node/htdocs"; - /** List of libwebsockets protocols. */ static struct lws_protocols protocols[] = { { - .name = "http-only", - .callback = api_protocol_cb, + .name = "http-api", + .callback = api_http_protocol_cb, .per_session_data_size = sizeof(struct api_session), .rx_buffer_size = 0 }, - { - .name = "live", - .callback = websocket_protocol_cb, - .per_session_data_size = sizeof(struct websocket_connection), - .rx_buffer_size = 0 - }, { .name = "api", - .callback = api_protocol_cb, + .callback = api_ws_protocol_cb, .per_session_data_size = sizeof(struct api_session), .rx_buffer_size = 0 }, +#if 0 /* not supported yet */ + { + .name = "log", + .callback = log_ws_protocol_cb, + .per_session_data_size = 0, + .rx_buffer_size = 0 + }, + { + .name = "stats", + .callback = stats_ws_protocol_cb, + .per_session_data_size = sizeof(struct api_session), + .rx_buffer_size = 0 + }, +#endif + { + .name = "live", + .callback = websocket_protocol_cb, + .per_session_data_size = sizeof(struct websocket_connection), + .rx_buffer_size = 0 + }, { NULL /* terminator */ } }; @@ -50,22 +62,8 @@ static struct lws_protocols protocols[] = { static struct lws_http_mount mounts[] = { { .mount_next = &mounts[1], - .mountpoint = "/api/v1/", - .origin = "cmd", - .def = NULL, - .cgienv = NULL, - .cgi_timeout = 0, - .cache_max_age = 0, - .cache_reusable = 0, - .cache_revalidate = 0, - .cache_intermediaries = 0, - .origin_protocol = LWSMPRO_CALLBACK, - .mountpoint_len = 8 - }, - { - .mount_next = NULL, .mountpoint = "/", - .origin = htdocs, + .origin = NULL, .def = "/index.html", .cgienv = NULL, .cgi_timeout = 0, @@ -75,6 +73,20 @@ static struct lws_http_mount mounts[] = { .cache_intermediaries = 0, .origin_protocol = LWSMPRO_FILE, .mountpoint_len = 1 + }, + { + .mount_next = NULL, + .mountpoint = "/api/v1/", + .origin = "http-api", + .def = NULL, + .cgienv = NULL, + .cgi_timeout = 0, + .cache_max_age = 0, + .cache_reusable = 0, + .cache_revalidate = 0, + .cache_intermediaries = 0, + .origin_protocol = LWSMPRO_CALLBACK, + .mountpoint_len = 8 } }; @@ -110,9 +122,15 @@ static void logger(int level, const char *msg) { } } -int web_service(struct web *w) +int web_init(struct web *w, struct api *a) { - return lws_service(w->context, 10); + info("Initialize web sub-system"); + + w->api = a; + + w->state = STATE_INITIALIZED; + + return 0; } int web_parse(struct web *w, config_setting_t *cfg) @@ -120,23 +138,24 @@ int web_parse(struct web *w, config_setting_t *cfg) if (!config_setting_is_group(cfg)) cerror(cfg, "Setting 'http' must be a group."); - /* Parse global config */ config_setting_lookup_string(cfg, "ssl_cert", &w->ssl_cert); config_setting_lookup_string(cfg, "ssl_private_key", &w->ssl_private_key); - config_setting_lookup_int(cfg, "port", &w->port); - config_setting_lookup_string(cfg, "htdocs", &w->htdocs); + if (!config_setting_lookup_int(cfg, "port", &w->port)) + w->port = 80; + + if (!config_setting_lookup_string(cfg, "htdocs", &w->htdocs)) + w->htdocs = "/usr/share/villas/htdocs"; + + w->state = STATE_PARSED; + return 0; } -int web_init(struct web *w, struct api *a) +int web_start(struct web *w) { - info("Initialize web sub-system"); - - w->api = a; - - /** @todo this is a hack */ - strncpy(htdocs, w->htdocs, sizeof(htdocs)); + /* update web root of mount point */ + mounts[0].origin = w->htdocs; lws_set_log_level((1 << LLL_COUNT) - 1, logger); @@ -165,24 +184,31 @@ int web_init(struct web *w, struct api *a) if (w->vhost == NULL) error("WebSocket: failed to initialize server"); - w->state = WEB_STATE_INITIALIZED; + w->state = STATE_STARTED; + + return 0; +} + +int web_stop(struct web *w) +{ + lws_cancel_service(w->context); return 0; } int web_destroy(struct web *w) { - if (w->state == WEB_STATE_INITIALIZED) - lws_context_destroy(w->context); + if (w->state == STATE_STARTED) + return -1; + + lws_context_destroy(w->context); - w->state = WEB_STATE_DESTROYED; + w->state = STATE_DESTROYED; return 0; } -int web_deinit(struct web *w) +int web_service(struct web *w) { - lws_cancel_service(w->context); - - return 0; + return lws_service(w->context, 10); }