1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

refactoring: unified states of common objects: nodes, paths, node-types, plugins, hooks, etc..

This commit is contained in:
Steffen Vogel 2017-03-11 23:50:30 -03:00
parent c84bfbcc7c
commit cdd5a2ca90
29 changed files with 568 additions and 496 deletions

View file

@ -7,10 +7,11 @@
#pragma once
#include "list.h"
#include <jansson.h>
#include "list.h"
#include "common.h"
/* Forward declarations */
enum lws_callback_reasons;
struct lws;
@ -43,10 +44,7 @@ enum api_mode {
struct api {
struct list sessions; /**< List of currently active connections */
enum {
API_STATE_DESTROYED,
API_STATE_INITIALIZED
} state;
enum state state;
struct cfg *cfg;
};
@ -94,7 +92,9 @@ int api_init(struct api *a, struct cfg *cfg);
int api_destroy(struct api *a);
int api_deinit(struct api *a);
int api_start(struct api *a);
int api_stop(struct api *a);
int api_session_init(struct api_session *s, struct api *a, enum api_mode m);

View file

@ -17,6 +17,7 @@
#include "api.h"
#include "web.h"
#include "log.h"
#include "common.h"
/** Global configuration */
struct cfg {
@ -37,6 +38,8 @@ struct cfg {
int argc;
char **argv;
} cli;
enum state state;
config_t cfg; /**< Pointer to configuration file */
json_t *json; /**< JSON representation of the same config. */
@ -48,12 +51,12 @@ struct cfg {
#endif
/** Inititalize configuration object before parsing the configuration. */
int cfg_init_pre(struct cfg *cfg);
int cfg_init(struct cfg *cfg);
/** Initialize after parsing the configuration file. */
int cfg_init_post(struct cfg *cfg);
int cfg_start(struct cfg *cfg);
int cfg_deinit(struct cfg *cfg);
int cfg_stop(struct cfg *cfg);
/** Desctroy configuration object. */
int cfg_destroy(struct cfg *cfg);

20
include/villas/common.h Normal file
View file

@ -0,0 +1,20 @@
/** Some common defines, enums and datastructures.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
*********************************************************************************/
#pragma once
/* Common states for most objects in VILLASnode (paths, nodes, hooks, plugins) */
enum state {
STATE_DESTROYED = 0,
STATE_INITIALIZED = 1,
STATE_PARSED = 2,
STATE_CHECKED = 3,
STATE_STARTED = 4,
STATE_LOADED = 4, /* alias for STATE_STARTED used by plugins */
STATE_STOPPED = 5,
STATE_UNLOADED = 5 /* alias for STATE_STARTED used by plugins */
};

View file

@ -13,20 +13,16 @@
#include <libconfig.h>
#include "common.h"
/* Forward declarations */
struct fpga_ip;
struct vfio_container;
enum fpga_card_state {
FPGA_CARD_STATE_UNKOWN,
FPGA_CARD_STATE_RESETTED,
FPGA_CARD_STATE_INITIALIZED
};
struct fpga_card {
char *name; /**< The name of the FPGA card */
enum fpga_card_state state; /**< The state of this FPGA card. */
enum state state; /**< The state of this FPGA card. */
struct pci_dev filter; /**< Filter for PCI device. */
struct vfio_dev vd; /**< VFIO device handle. */

View file

@ -13,7 +13,7 @@
#include <stdint.h>
#include "utils.h"
#include "common.h"
#include "nodes/fpga.h"
@ -28,11 +28,6 @@
#include "fpga/ips/dft.h"
#include "fpga/ips/intc.h"
enum fpga_ip_state {
IP_STATE_UNKNOWN,
IP_STATE_INITIALIZED
};
struct fpga_ip_type {
struct fpga_vlnv vlnv;
@ -55,7 +50,7 @@ struct fpga_ip {
char *name; /**< Name of the FPGA IP component. */
struct fpga_vlnv vlnv; /**< The Vendor, Library, Name, Version tag of the FPGA IP component. */
enum fpga_ip_state state; /**< The current state of the FPGA IP component. */
enum state state; /**< The current state of the FPGA IP component. */
struct fpga_ip_type *_vt; /**< Vtable containing FPGA IP type function pointers. */

View file

@ -8,9 +8,10 @@
#pragma once
#include <stdarg.h>
#include <time.h>
#include <libconfig.h>
#include "utils.h"
#include "common.h"
#ifdef __GNUC__
#define INDENT int __attribute__ ((__cleanup__(log_outdent), unused)) _old_indent = log_indent(1);
@ -59,6 +60,8 @@ enum log_facilities {
};
struct log {
enum state state;
struct timespec epoch; /**< A global clock used to prefix the log messages. */
/** Debug level used by the debug() macro.
@ -73,7 +76,9 @@ struct log {
/** Initialize log object */
int log_init(struct log *l, int level, long faciltities);
int log_deinit(struct log *l);
int log_start(struct log *l);
int log_stop(struct log *l);
/** Destroy log object */
int log_destroy(struct log *l);

View file

@ -18,6 +18,7 @@
#include "sample.h"
#include "list.h"
#include "queue.h"
#include "common.h"
/** The data structure for a node.
*
@ -35,16 +36,9 @@ struct node
int affinity; /**< CPU Affinity of this node */
unsigned long sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */
enum node_state {
NODE_INVALID, /**< This node object is not in a valid state. */
NODE_CREATED, /**< This node has been parsed from the configuration. */
NODE_STARTING, /**< This node is currently being started. */
NODE_RUNNING, /**< This node has been started by calling node_open() */
NODE_STOPPING, /**< This node is currently shutting down. */
NODE_STOPPED /**< Node was running, but has been stopped by calling node_close() */
} state; /**< Node state */
enum state state;
struct node_type *_vt; /**< Virtual functions (C++ OOP style) */
void *_vd; /**< Virtual data (used by struct node::_vt functions) */

View file

@ -13,6 +13,7 @@
#include <libconfig.h>
#include "list.h"
#include "common.h"
/* Forward declarations */
struct node;
@ -25,10 +26,7 @@ struct node_type {
struct list instances; /**< A list of all existing nodes of this type. */
size_t size; /**< Size of private data bock. @see node::_vd */
enum node_type_state {
NODE_TYPE_DEINITIALIZED = 0,
NODE_TYPE_INITIALIZED
} state;
enum state state;
/** Global initialization per node type.
*

View file

@ -24,6 +24,7 @@
#include "queue.h"
#include "pool.h"
#include "stats.h"
#include "common.h"
/* Forward declarations */
struct cfg;
@ -47,13 +48,7 @@ struct path_destination
/** The datastructure for a path. */
struct path
{
enum {
PATH_INVALID, /**< Path is invalid. */
PATH_INITIALIZED, /**< Path queues, memory pools & hook system initialized. */
PATH_RUNNING, /**< Path is currently running. */
PATH_STOPPED, /**< Path has been stopped. */
PATH_DESTROYED /**< Path is destroyed. */
} state; /**< Path state */
enum state state; /**< Path state. */
/* Each path has a single source and multiple destinations */
struct path_source *source; /**< Pointer to the incoming node */

View file

@ -9,6 +9,8 @@
#include "hook.h"
#include "api.h"
#include "common.h"
#include "utils.h"
#include "fpga/ip.h"
@ -32,12 +34,6 @@ enum plugin_type {
PLUGIN_TYPE_MODEL_CBUILDER
};
enum plugin_state {
PLUGIN_STATE_DESTROYED,
PLUGIN_STATE_UNLOADED,
PLUGIN_STATE_LOADED
};
struct plugin {
char *name;
char *description;
@ -45,7 +41,8 @@ struct plugin {
char *path;
enum plugin_type type;
enum plugin_state state;
enum state state;
int (*load)(struct plugin *p);
int (*unload)(struct plugin *p);

View file

@ -9,9 +9,11 @@
#pragma once
#include <stddef.h>
#include <sys/types.h>
#include "queue.h"
#include "common.h"
#include "memory.h"
/** A thread-safe memory pool */
@ -19,10 +21,7 @@ struct pool {
void *buffer; /**< Address of the underlying memory area */
const struct memtype *mem;
enum {
POOL_STATE_DESTROYED,
POOL_STATE_INITIALIZED
} state;
enum state state;
size_t len; /**< Length of the underlying memory area */

View file

@ -33,21 +33,23 @@
#pragma once
#include <stddef.h>
#include <stdint.h>
#include <stdatomic.h>
#include "memory.h"
#include "common.h"
/* Forward declarations */
struct memtype;
#define CACHELINE_SIZE 64
typedef char cacheline_pad_t[CACHELINE_SIZE];
/** A lock-free multiple-producer, multiple-consumer (MPMC) queue. */
struct queue {
cacheline_pad_t _pad0; /**< Shared area: all threads read */
enum {
QUEUE_STATE_DESTROYED,
QUEUE_STATE_INITIALIZED
} state;
enum state state;
struct memtype const * mem;
size_t buffer_mask;

View file

@ -7,16 +7,15 @@
#pragma once
#include "common.h"
/* Forward declarations */
struct api;
struct web {
struct api *api;
enum {
WEB_STATE_DESTROYED,
WEB_STATE_INITIALIZED
} state;
enum state state;
struct lws_context *context; /**< The libwebsockets server context. */
struct lws_vhost *vhost; /**< The libwebsockets vhost. */
@ -35,11 +34,12 @@ int web_init(struct web *w, struct api *a);
int web_destroy(struct web *w);
int web_start(struct web *w);
int web_stop(struct web *w);
/** Parse HTTPd and WebSocket related options */
int web_parse(struct web *w, config_setting_t *lcs);
/** De-initializes the web interface. */
int web_deinit(struct web *w);
/** libwebsockets service routine. Call periodically */
int web_service(struct web *w);

View file

@ -226,25 +226,38 @@ int api_init(struct api *a, struct cfg *cfg)
list_init(&a->sessions);
a->cfg = cfg;
a->state = API_STATE_INITIALIZED;
a->state = STATE_INITIALIZED;
return 0;
}
int api_destroy(struct api *a)
{
// if (a->state = API_STATE_INITIALIZED)
// do something
if (a->state == STATE_STARTED)
return -1;
a->state = API_STATE_DESTROYED;
a->state = STATE_DESTROYED;
return 0;
}
int api_deinit(struct api *a)
int api_start(struct api *a)
{
info("Starting API sub-system");
a->state = STATE_STARTED;
return 0;
}
int api_stop(struct api *a)
{
info("Stopping API sub-system");
list_destroy(&a->sessions, (dtor_cb_t) api_session_destroy, false);
a->state = STATE_STOPPED;
return 0;
}

View file

@ -21,75 +21,46 @@
#include "api.h"
#include "plugin.h"
#include "node.h"
#include "memory.h"
#include "kernel/rt.h"
int cfg_init_pre(struct cfg *cfg)
static int cfg_parse_global(struct cfg *cfg, config_setting_t *cfg_root)
{
if (!config_setting_is_group(cfg_root))
cerror(cfg_root, "Global section must be a dictionary.");
config_setting_lookup_int(cfg_root, "hugepages", &cfg->hugepages);
config_setting_lookup_int(cfg_root, "affinity", &cfg->affinity);
config_setting_lookup_int(cfg_root, "priority", &cfg->priority);
config_setting_lookup_float(cfg_root, "stats", &cfg->stats);
return 0;
}
int cfg_init(struct cfg *cfg)
{
config_init(&cfg->cfg);
info("Inititliaze logging sub-system");
log_init(&cfg->log, V, LOG_ALL);
api_init(&cfg->api, cfg);
web_init(&cfg->web, &cfg->api);
list_init(&cfg->nodes);
list_init(&cfg->paths);
list_init(&cfg->plugins);
/* Default values */
cfg->affinity = -1;
cfg->stats = 0;
cfg->priority = 50;
cfg->hugepages = DEFAULT_NR_HUGEPAGES;
cfg->state = STATE_INITIALIZED;
return 0;
}
int cfg_init_post(struct cfg *cfg)
{
memory_init();
rt_init(cfg->priority, cfg->affinity);
api_init(&cfg->api, cfg);
web_init(&cfg->web, &cfg->api);
info("Initialize node types");
list_foreach(struct node *n, &cfg->nodes) { INDENT
config_setting_t *cfg_root = config_root_setting(&cfg->cfg);
node_type_init(n->_vt, cfg->cli.argc, cfg->cli.argv, cfg_root);
}
return 0;
}
int cfg_deinit(struct cfg *cfg)
{
info("De-initializing node types");
list_foreach(struct plugin *p, &plugins) { INDENT
if (p->type == PLUGIN_TYPE_NODE)
node_type_deinit(&p->node);
}
info("De-initializing web interface");
web_deinit(&cfg->web);
info("De-initialize API");
api_deinit(&cfg->api);
info("De-initialize log sub-system");
log_deinit(&cfg->log);
return 0;
}
int cfg_destroy(struct cfg *cfg)
{
config_destroy(&cfg->cfg);
web_destroy(&cfg->web);
log_destroy(&cfg->log);
api_destroy(&cfg->api);
list_destroy(&cfg->plugins, (dtor_cb_t) plugin_destroy, false);
list_destroy(&cfg->paths, (dtor_cb_t) path_destroy, true);
list_destroy(&cfg->nodes, (dtor_cb_t) node_destroy, true);
return 0;
}
int cfg_parse_cli(struct cfg *cfg, int argc, char *argv[])
{
cfg->cli.argc = argc;
@ -164,21 +135,9 @@ int cfg_parse(struct cfg *cfg, const char *uri)
cfg->web.htdocs = "/villas/web/socket/";
}
/* Parse global settings */
cfg_root = config_root_setting(&cfg->cfg);
if (cfg_root) {
if (!config_setting_is_group(cfg_root))
warn("Missing global section in config file.");
if (!config_setting_lookup_int(cfg_root, "affinity", &cfg->affinity))
cfg->affinity = 0;
if (!config_setting_lookup_int(cfg_root, "priority", &cfg->priority))
cfg->priority = 0;
if (!config_setting_lookup_float(cfg_root, "stats", &cfg->stats))
cfg->stats = 0;
}
if (cfg_root)
cfg_parse_global(cfg, cfg_root);
cfg_web = config_setting_get_member(cfg_root, "http");
if (cfg_web)

View file

@ -7,6 +7,9 @@
#include <unistd.h>
#include "config.h"
#include "log.h"
#include "list.h"
#include "utils.h"
#include "kernel/pci.h"
#include "kernel/vfio.h"
@ -22,7 +25,7 @@ int fpga_card_init(struct fpga_card *c, struct pci *pci, struct vfio_container *
fpga_card_check(c);
if (c->state == FPGA_CARD_STATE_INITIALIZED)
if (c->state == STATE_INITIALIZED)
return 0;
/* Search for FPGA card */
@ -78,8 +81,7 @@ int fpga_card_parse(struct fpga_card *c, config_setting_t *cfg)
c->filter.id.device = FPGA_PCI_PID_VFPGA;
c->name = config_setting_name(cfg);
c->state = FPGA_CARD_STATE_UNKOWN;
if (!config_setting_lookup_int(cfg, "affinity", &c->affinity))
c->affinity = 0;
@ -129,6 +131,7 @@ int fpga_card_parse(struct fpga_card *c, config_setting_t *cfg)
}
c->cfg = cfg;
c->state = STATE_PARSED;
return 0;
}
@ -204,7 +207,7 @@ int fpga_card_reset(struct fpga_card *c)
if (rst_reg[0])
return -2;
c->state = FPGA_CARD_STATE_RESETTED;
c->state = STATE_INITIALIZED;
return 0;
}

View file

@ -18,7 +18,7 @@ int fpga_ip_init(struct fpga_ip *c)
error("Failed to intialize IP core: %s", c->name);
if (ret == 0)
c->state = IP_STATE_INITIALIZED;
c->state = STATE_INITIALIZED;
debug(8, "IP Core %s initalized (%u)", c->name, ret);

View file

@ -10,6 +10,7 @@
#include <unistd.h>
#include "log.h"
#include "utils.h"
#include "kernel/pci.h"
#include "config.h"

100
lib/log.c
View file

@ -54,6 +54,67 @@ static const char *facilities_strs[] = {
/** The current log indention level (per thread!). */
static __thread int indent = 0;
int log_init(struct log *l, int level, long facilitites)
{
/* Register this log instance globally */
log = l;
l->level = level;
l->facilities = facilitites;
debug(LOG_LOG | 5, "Log sub-system intialized: level=%d, faciltities=%#lx", level, facilitites);
l->state = STATE_INITIALIZED;
return 0;
}
int log_parse(struct log *l, config_setting_t *cfg)
{
const char *facilities;
if (!config_setting_is_group(cfg))
cerror(cfg, "Setting 'log' must be a group.");
config_setting_lookup_int(cfg, "level", &l->level);
if (config_setting_lookup_string(cfg, "facilities", &facilities))
log_set_facility_expression(l, facilities);
l->state = STATE_PARSED;
return 0;
}
int log_start(struct log *l)
{
l->epoch = time_now();
l->state = STATE_STARTED;
return 0;
}
int log_stop(struct log *l)
{
if (l->state != STATE_STARTED)
return -1;
l->state = STATE_STOPPED;
return 0;
}
int log_destroy(struct log *l)
{
if (l->state == STATE_STARTED)
return -1;
l->state = STATE_DESTROYED;
return 0;
}
int log_indent(int levels)
{
int old = indent;
@ -117,30 +178,6 @@ found: if (negate)
return l->facilities;
}
int log_init(struct log *l, int level, long facilitites)
{
l->epoch = time_now();
l->level = level;
l->facilities = facilitites;
/* Register this log instance globally */
log = l;
debug(LOG_LOG | 5, "Log sub-system intialized: level=%d, faciltities=%#lx", level, facilitites);
return 0;
}
int log_deinit(struct log *l)
{
return 0;
}
int log_destroy(struct log *l)
{
return 0;
}
void log_print(struct log *l, const char *lvl, const char *fmt, ...)
{
va_list ap;
@ -180,21 +217,6 @@ void log_vprint(struct log *l, const char *lvl, const char *fmt, va_list ap)
free(buf);
}
int log_parse(struct log *l, config_setting_t *cfg)
{
const char *facilities;
if (!config_setting_is_group(cfg))
cerror(cfg, "Setting 'log' must be a group.");
config_setting_lookup_int(cfg, "level", &l->level);
if (config_setting_lookup_string(cfg, "facilities", &facilities))
log_set_facility_expression(l, facilities);
return 0;
}
void line()
{
char buf[LOG_WIDTH];

View file

@ -14,6 +14,121 @@
#include "config.h"
#include "plugin.h"
int node_init(struct node *n)
{
if (n->state != STATE_DESTROYED)
return -1;
n->state = STATE_INITIALIZED;
return 0;
}
int node_parse(struct node *n, config_setting_t *cfg)
{
struct plugin *p;
const char *type, *name;
int ret;
name = config_setting_name(cfg);
if (!config_setting_lookup_string(cfg, "type", &type))
cerror(cfg, "Missing node type");
p = plugin_lookup(PLUGIN_TYPE_NODE, type);
assert(&p->node == n->_vt);
if (!config_setting_lookup_int(cfg, "vectorize", &n->vectorize))
n->vectorize = 1;
n->name = name;
n->cfg = cfg;
ret = n->_vt->parse ? n->_vt->parse(n, cfg) : 0;
if (ret)
cerror(cfg, "Failed to parse node '%s'", node_name(n));
n->state = STATE_PARSED;
return ret;
}
int node_check(struct node *n)
{
if (n->state != STATE_INITIALIZED || n->state != STATE_PARSED)
return -1;
if (n->vectorize <= 0)
error("Invalid `vectorize` value %d for node %s. Must be natural number!", n->vectorize, node_name(n));
if (n->_vt->vectorize && n->_vt->vectorize < n->vectorize)
error("Invalid value for `vectorize`. Node type requires a number smaller than %d!",
n->_vt->vectorize);
n->state = STATE_CHECKED;
return 0;
}
int node_start(struct node *n)
{
int ret;
if (n->state != STATE_CHECKED)
return -1;
info("Starting node %s", node_name_long(n));
{ INDENT
ret = n->_vt->start ? n->_vt->start(n) : -1;
}
if (ret == 0)
n->state = STATE_STARTED;
n->sequence = 0;
return ret;
}
int node_stop(struct node *n)
{
int ret;
if (n->state != STATE_STARTED)
return -1;
info("Stopping node %s", node_name(n));
{ INDENT
ret = n->_vt->stop ? n->_vt->stop(n) : -1;
}
if (ret == 0)
n->state = STATE_STOPPED;
return ret;
}
int node_destroy(struct node *n)
{
if (n->state == STATE_STARTED)
return -1;
if (n->_vt->destroy)
n->_vt->destroy(n);
list_remove(&n->_vt->instances, n);
if (n->_vd)
free(n->_vd);
if (n->_name)
free(n->_name);
n->state = STATE_DESTROYED;
return 0;
}
int node_read(struct node *n, struct sample *smps[], unsigned cnt)
{
int nread = 0;
@ -56,48 +171,6 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt)
return nsent;
}
int node_start(struct node *n)
{
int ret;
if (n->state != NODE_CREATED && n->state != NODE_STOPPED)
return -1;
n->state = NODE_STARTING;
info("Starting node %s", node_name_long(n));
{ INDENT
ret = n->_vt->open ? n->_vt->open(n) : -1;
}
if (ret == 0)
n->state = NODE_RUNNING;
n->sequence = 0;
return ret;
}
int node_stop(struct node *n)
{
int ret;
if (n->state != NODE_RUNNING)
return -1;
n->state = NODE_STOPPING;
info("Stopping node %s", node_name(n));
{ INDENT
ret = n->_vt->close ? n->_vt->close(n) : -1;
}
if (ret == 0)
n->state = NODE_STOPPED;
return ret;
}
char * node_name(struct node *n)
{
if (!n->_name)
@ -131,21 +204,8 @@ int node_reverse(struct node *n)
return n->_vt->reverse ? n->_vt->reverse(n) : -1;
}
int node_parse_list(struct list *list, config_setting_t *cfg, struct list *all)
{
int node_destroy(struct node *n)
{
if (n->_vt->destroy)
n->_vt->destroy(n);
list_remove(&n->_vt->instances, n);
free(n->_vd);
free(n->_name);
return 0;
}
int node_parse_list(struct list *list, config_setting_t *cfg, struct list *all) {
const char *str;
struct node *node;
@ -188,42 +248,3 @@ int node_parse_list(struct list *list, config_setting_t *cfg, struct list *all)
return list_length(list);
}
int node_parse(struct node *n, config_setting_t *cfg)
{
struct plugin *p;
const char *type, *name;
int ret;
name = config_setting_name(cfg);
if (!config_setting_lookup_string(cfg, "type", &type))
cerror(cfg, "Missing node type");
p = plugin_lookup(PLUGIN_TYPE_NODE, type);
assert(&p->node == n->_vt);
if (!config_setting_lookup_int(cfg, "vectorize", &n->vectorize))
n->vectorize = 1;
n->name = name;
n->cfg = cfg;
ret = n->_vt->parse ? n->_vt->parse(n, cfg) : 0;
if (ret)
cerror(cfg, "Failed to parse node '%s'", node_name(n));
return ret;
}
int node_check(struct node *n)
{
if (n->vectorize <= 0)
error("Invalid `vectorize` value %d for node %s. Must be natural number!", n->vectorize, node_name(n));
if (n->_vt->vectorize && n->_vt->vectorize < n->vectorize)
error("Invalid value for `vectorize`. Node type requires a number smaller than %d!",
n->_vt->vectorize);
return 0;
}

View file

@ -18,7 +18,7 @@ int node_type_start(struct node_type *vt, int argc, char *argv[], config_setting
{
int ret;
if (vt->state != NODE_TYPE_DEINITIALIZED)
if (vt->state == STATE_STARTED)
return -1;
info("Initializing " YEL("%s") " node type", plugin_name(vt));
@ -27,7 +27,7 @@ int node_type_start(struct node_type *vt, int argc, char *argv[], config_setting
}
if (ret == 0)
vt->state = NODE_TYPE_INITIALIZED;
vt->state = STATE_STARTED;
return ret;
}
@ -36,7 +36,7 @@ int node_type_stop(struct node_type *vt)
{
int ret;
if (vt->state != NODE_TYPE_INITIALIZED)
if (vt->state != STATE_STARTED)
return -1;
info("De-initializing " YEL("%s") " node type", plugin_name(vt));
@ -45,7 +45,7 @@ int node_type_stop(struct node_type *vt)
}
if (ret == 0)
vt->state = NODE_TYPE_DEINITIALIZED;
vt->state = STATE_DESTROYED;
return ret;
}

View file

@ -7,6 +7,7 @@
#include "node.h"
#include "log.h"
#include "plugin.h"
#include "utils.h"
#include "nodes/cbuilder.h"

View file

@ -150,7 +150,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
}
/* Check if node is running */
if (c->node->state != NODE_RUNNING)
if (c->node->state != STATE_STARTED)
return -1;
}
@ -193,7 +193,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
case LWS_CALLBACK_SERVER_WRITEABLE: {
w = (struct websocket *) c->node->_vd;
if (c->node && c->node->state != NODE_RUNNING)
if (c->node && c->node->state != STATE_STARTED)
return -1;
if (c->state == WEBSOCKET_SHUTDOWN) {
@ -226,7 +226,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
case LWS_CALLBACK_RECEIVE: {
w = (struct websocket *) c->node->_vd;
if (c->node->state != NODE_RUNNING)
if (c->node->state != STATE_STARTED)
return -1;
if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0))

View file

@ -20,6 +20,7 @@
#include "hook.h"
#include "plugin.h"
#include "cfg.h"
#include "memory.h"
static void path_read(struct path *p)
{
@ -125,63 +126,6 @@ static void * path_run(void *arg)
return NULL;
}
int path_start(struct path *p)
{
int ret;
info("Starting path: %s (#hooks=%zu)",
path_name(p), list_length(&p->hooks));
ret = hook_run(p, NULL, 0, HOOK_PATH_START);
if (ret)
return -1;
p->state = PATH_RUNNING;
return pthread_create(&p->tid, NULL, &path_run, p);
}
int path_stop(struct path *p)
{
int ret;
info("Stopping path: %s", path_name(p));
pthread_cancel(p->tid);
pthread_join(p->tid, NULL);
ret = hook_run(p, NULL, 0, HOOK_PATH_STOP);
if (ret)
return -1;
p->state = PATH_STOPPED;
return 0;
}
const char * path_name(struct path *p)
{
if (!p->_name) {
if (list_length(&p->destinations) == 1) {
struct path_destination *pd = (struct path_destination *) list_first(&p->destinations);
strcatf(&p->_name, "%s " MAG("=>") " %s",
node_name_short(p->source->node),
node_name_short(pd->node));
}
else {
strcatf(&p->_name, "%s " MAG("=>") " [", node_name_short(p->source->node));
list_foreach(struct path_destination *pd, &p->destinations)
strcatf(&p->_name, " %s", node_name_short(pd->node));
strcatf(&p->_name, " ]");
}
}
return p->_name;
}
static int path_source_destroy(struct path_source *ps)
{
pool_destroy(&ps->pool);
@ -196,41 +140,13 @@ static int path_destination_destroy(struct path_destination *pd)
return 0;
}
int path_destroy(struct path *p)
{
list_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true);
list_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true);
path_source_destroy(p->source);
if (p->_name)
free(p->_name);
if (p->source)
free(p->source);
p->state = PATH_DESTROYED;
return 0;
}
int path_check(struct path *p)
{
list_foreach (struct node *n, &p->destinations) {
if (!n->_vt->write)
error("Destiation node '%s' is not supported as a sink for path '%s'", node_name(n), path_name(p));
}
if (!p->source->node->_vt->read)
error("Source node '%s' is not supported as source for path '%s'", node_name(p->source->node), path_name(p));
return 0;
}
int path_init(struct path *p, struct cfg *cfg)
{
int ret, max_queuelen = 0;
if (p->state != STATE_DESTROYED)
return -1;
/* Add internal hooks if they are not already in the list*/
list_foreach(struct plugin *pl, &plugins) {
if (pl->type == PLUGIN_TYPE_HOOK) {
@ -238,7 +154,7 @@ int path_init(struct path *p, struct cfg *cfg)
if ((h->type & HOOK_AUTO) && /* should this hook be added implicitely? */
(list_lookup(&p->hooks, pl->name) == NULL)) /* is not already in list? */
list_push(&p->hooks, memdup(h, sizeof(struct hook)));
list_push(&p->hooks, memdup(h, sizeof(h)));
}
}
@ -268,63 +184,11 @@ int path_init(struct path *p, struct cfg *cfg)
if (ret)
error("Failed to allocate memory pool for path");
p->state = PATH_INITIALIZED;
p->state = STATE_INITIALIZED;
return 0;
}
int path_uses_node(struct path *p, struct node *n) {
list_foreach(struct path_destination *pd, &p->destinations) {
if (pd->node == n)
return 0;
}
return p->source->node == n ? 0 : -1;
}
int path_reverse(struct path *p, struct path *r)
{
int ret;
if (list_length(&p->destinations) > 1)
return -1;
struct path_destination *first_pd = list_first(&p->destinations);
list_init(&r->destinations);
list_init(&r->hooks);
/* General */
r->enabled = p->enabled;
r->cfg = p->cfg;
struct path_destination *pd = alloc(sizeof(struct path_destination));
pd->node = p->source->node;
pd->queuelen = first_pd->queuelen;
list_push(&r->destinations, pd);
struct path_source *ps = alloc(sizeof(struct path_source));
ps->node = first_pd->node;
ps->samplelen = p->source->samplelen;
r->source = ps;
list_foreach(struct hook *h, &p->hooks) {
struct hook *hc = alloc(sizeof(struct hook));
ret = hook_copy(h, hc);
if (ret)
return ret;
list_push(&r->hooks, hc);
}
return 0;
}
int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes)
{
config_setting_t *cfg_out, *cfg_hook;
@ -402,5 +266,149 @@ int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes)
list_destroy(&destinations, NULL, false);
return 0;
}
int path_check(struct path *p)
{
list_foreach (struct node *n, &p->destinations) {
if (!n->_vt->write)
error("Destiation node '%s' is not supported as a sink for path '%s'", node_name(n), path_name(p));
}
if (!p->source->node->_vt->read)
error("Source node '%s' is not supported as source for path '%s'", node_name(p->source->node), path_name(p));
return 0;
}
int path_start(struct path *p)
{
int ret;
info("Starting path: %s (#hooks=%zu)",
path_name(p), list_length(&p->hooks));
ret = hook_run(p, NULL, 0, HOOK_PATH_START);
if (ret)
return ret;
ret = pthread_create(&p->tid, NULL, &path_run, p);
if (ret)
return ret;
p->state = STATE_STARTED;
return 0;
}
int path_stop(struct path *p)
{
int ret;
info("Stopping path: %s", path_name(p));
pthread_cancel(p->tid);
pthread_join(p->tid, NULL);
ret = hook_run(p, NULL, 0, HOOK_PATH_STOP);
if (ret)
return -1;
p->state = STATE_STOPPED;
return 0;
}
int path_destroy(struct path *p)
{
list_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true);
list_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true);
path_source_destroy(p->source);
if (p->_name)
free(p->_name);
if (p->source)
free(p->source);
p->state = STATE_DESTROYED;
return 0;
}
const char * path_name(struct path *p)
{
if (!p->_name) {
if (list_length(&p->destinations) == 1) {
struct path_destination *pd = (struct path_destination *) list_first(&p->destinations);
strcatf(&p->_name, "%s " MAG("=>") " %s",
node_name_short(p->source->node),
node_name_short(pd->node));
}
else {
strcatf(&p->_name, "%s " MAG("=>") " [", node_name_short(p->source->node));
list_foreach(struct path_destination *pd, &p->destinations)
strcatf(&p->_name, " %s", node_name_short(pd->node));
strcatf(&p->_name, " ]");
}
}
return p->_name;
}
int path_uses_node(struct path *p, struct node *n) {
list_foreach(struct path_destination *pd, &p->destinations) {
if (pd->node == n)
return 0;
}
return p->source->node == n ? 0 : -1;
}
int path_reverse(struct path *p, struct path *r)
{
int ret;
if (list_length(&p->destinations) > 1)
return -1;
struct path_destination *first_pd = list_first(&p->destinations);
list_init(&r->destinations);
list_init(&r->hooks);
/* General */
r->enabled = p->enabled;
r->cfg = p->cfg;
struct path_destination *pd = alloc(sizeof(struct path_destination));
pd->node = p->source->node;
pd->queuelen = first_pd->queuelen;
list_push(&r->destinations, pd);
struct path_source *ps = alloc(sizeof(struct path_source));
ps->node = first_pd->node;
ps->samplelen = p->source->samplelen;
r->source = ps;
list_foreach(struct hook *h, &p->hooks) {
struct hook *hc = alloc(sizeof(struct hook));
ret = hook_copy(h, hc);
if (ret)
return ret;
list_push(&r->hooks, hc);
}
return 0;
}

View file

@ -16,7 +16,7 @@ int plugin_init(struct plugin *p, char *name, char *path)
p->name = strdup(name);
p->path = strdup(path);
p->state = PLUGIN_STATE_UNLOADED;
p->state = STATE_INITIALIZED;
return 0;
}
@ -27,7 +27,7 @@ int plugin_load(struct plugin *p)
if (!p->path)
return -1;
p->state = PLUGIN_STATE_LOADED;
p->state = STATE_LOADED;
return 0;
}
@ -36,21 +36,21 @@ int plugin_unload(struct plugin *p)
{
int ret;
if (p->state != PLUGIN_STATE_LOADED)
if (p->state != STATE_LOADED)
return -1;
ret = dlclose(p->handle);
if (ret)
return -1;
p->state = PLUGIN_STATE_UNLOADED;
p->state = STATE_UNLOADED;
return 0;
}
int plugin_destroy(struct plugin *p)
{
if (p->state == PLUGIN_STATE_LOADED)
if (p->state == STATE_LOADED)
plugin_unload(p);
if (p->path)

View file

@ -13,6 +13,9 @@
int pool_init(struct pool *p, size_t cnt, size_t blocksz, const struct memtype *m)
{
int ret;
if (p->state != STATE_DESTROYED)
return -1;
/* Make sure that we use a block size that is aligned to the size of a cache line */
p->alignment = kernel_get_cacheline_size();
@ -33,19 +36,23 @@ int pool_init(struct pool *p, size_t cnt, size_t blocksz, const struct memtype *
for (int i = 0; i < cnt; i++)
queue_push(&p->queue, (char *) p->buffer + i * p->blocksz);
p->state = POOL_STATE_INITIALIZED;
p->state = STATE_INITIALIZED;
return 0;
}
int pool_destroy(struct pool *p)
{
int ret;
if (p->state != STATE_INITIALIZED)
return -1;
queue_destroy(&p->queue);
if (p->state == POOL_STATE_INITIALIZED)
return memory_free(p->mem, p->buffer, p->len);
p->state = POOL_STATE_DESTROYED;
ret = memory_free(p->mem, p->buffer, p->len);
if (ret == 0)
p->state = STATE_DESTROYED;
return 0;
return ret;
}

View file

@ -33,11 +33,14 @@
#include "queue.h"
#include "utils.h"
#include "memory.h"
/** Initialize MPMC queue */
int queue_init(struct queue *q, size_t size, const struct memtype *mem)
{
if (q->state != STATE_DESTROYED)
return -1;
/* Queue size must be 2 exponent */
if (!IS_POW2(size)) {
size_t old_size = size;
@ -57,7 +60,7 @@ int queue_init(struct queue *q, size_t size, const struct memtype *mem)
atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
atomic_store_explicit(&q->head, 0, memory_order_relaxed);
q->state = QUEUE_STATE_INITIALIZED;
q->state = STATE_INITIALIZED;
return 0;
}
@ -66,10 +69,13 @@ int queue_destroy(struct queue *q)
{
int ret = 0;
if (q->state == QUEUE_STATE_INITIALIZED)
ret = memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(q->buffer[0]));
if (q->state != STATE_INITIALIZED)
return -1;
ret = memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(q->buffer[0]));
q->state = QUEUE_STATE_DESTROYED;
if (ret == 0)
q->state = STATE_DESTROYED;
return ret;
}

View file

@ -11,6 +11,7 @@
#include "timing.h"
#include "path.h"
#include "sample.h"
#include "utils.h"
#include "log.h"
static struct stats_desc {

122
lib/web.c
View file

@ -17,32 +17,44 @@
#include "nodes/websocket.h"
/* Forward declarations */
lws_callback_function api_protocol_cb;
lws_callback_function api_ws_protocol_cb;
lws_callback_function api_http_protocol_cb;
lws_callback_function websocket_protocol_cb;
/** Path to the directory which should be served by build in HTTP server */
static char htdocs[PATH_MAX] = "/usr/local/share/villas/node/htdocs";
/** List of libwebsockets protocols. */
static struct lws_protocols protocols[] = {
{
.name = "http-only",
.callback = api_protocol_cb,
.name = "http-api",
.callback = api_http_protocol_cb,
.per_session_data_size = sizeof(struct api_session),
.rx_buffer_size = 0
},
{
.name = "live",
.callback = websocket_protocol_cb,
.per_session_data_size = sizeof(struct websocket_connection),
.rx_buffer_size = 0
},
{
.name = "api",
.callback = api_protocol_cb,
.callback = api_ws_protocol_cb,
.per_session_data_size = sizeof(struct api_session),
.rx_buffer_size = 0
},
#if 0 /* not supported yet */
{
.name = "log",
.callback = log_ws_protocol_cb,
.per_session_data_size = 0,
.rx_buffer_size = 0
},
{
.name = "stats",
.callback = stats_ws_protocol_cb,
.per_session_data_size = sizeof(struct api_session),
.rx_buffer_size = 0
},
#endif
{
.name = "live",
.callback = websocket_protocol_cb,
.per_session_data_size = sizeof(struct websocket_connection),
.rx_buffer_size = 0
},
{ NULL /* terminator */ }
};
@ -50,22 +62,8 @@ static struct lws_protocols protocols[] = {
static struct lws_http_mount mounts[] = {
{
.mount_next = &mounts[1],
.mountpoint = "/api/v1/",
.origin = "cmd",
.def = NULL,
.cgienv = NULL,
.cgi_timeout = 0,
.cache_max_age = 0,
.cache_reusable = 0,
.cache_revalidate = 0,
.cache_intermediaries = 0,
.origin_protocol = LWSMPRO_CALLBACK,
.mountpoint_len = 8
},
{
.mount_next = NULL,
.mountpoint = "/",
.origin = htdocs,
.origin = NULL,
.def = "/index.html",
.cgienv = NULL,
.cgi_timeout = 0,
@ -75,6 +73,20 @@ static struct lws_http_mount mounts[] = {
.cache_intermediaries = 0,
.origin_protocol = LWSMPRO_FILE,
.mountpoint_len = 1
},
{
.mount_next = NULL,
.mountpoint = "/api/v1/",
.origin = "http-api",
.def = NULL,
.cgienv = NULL,
.cgi_timeout = 0,
.cache_max_age = 0,
.cache_reusable = 0,
.cache_revalidate = 0,
.cache_intermediaries = 0,
.origin_protocol = LWSMPRO_CALLBACK,
.mountpoint_len = 8
}
};
@ -110,9 +122,15 @@ static void logger(int level, const char *msg) {
}
}
int web_service(struct web *w)
int web_init(struct web *w, struct api *a)
{
return lws_service(w->context, 10);
info("Initialize web sub-system");
w->api = a;
w->state = STATE_INITIALIZED;
return 0;
}
int web_parse(struct web *w, config_setting_t *cfg)
@ -120,23 +138,24 @@ int web_parse(struct web *w, config_setting_t *cfg)
if (!config_setting_is_group(cfg))
cerror(cfg, "Setting 'http' must be a group.");
/* Parse global config */
config_setting_lookup_string(cfg, "ssl_cert", &w->ssl_cert);
config_setting_lookup_string(cfg, "ssl_private_key", &w->ssl_private_key);
config_setting_lookup_int(cfg, "port", &w->port);
config_setting_lookup_string(cfg, "htdocs", &w->htdocs);
if (!config_setting_lookup_int(cfg, "port", &w->port))
w->port = 80;
if (!config_setting_lookup_string(cfg, "htdocs", &w->htdocs))
w->htdocs = "/usr/share/villas/htdocs";
w->state = STATE_PARSED;
return 0;
}
int web_init(struct web *w, struct api *a)
int web_start(struct web *w)
{
info("Initialize web sub-system");
w->api = a;
/** @todo this is a hack */
strncpy(htdocs, w->htdocs, sizeof(htdocs));
/* update web root of mount point */
mounts[0].origin = w->htdocs;
lws_set_log_level((1 << LLL_COUNT) - 1, logger);
@ -165,24 +184,31 @@ int web_init(struct web *w, struct api *a)
if (w->vhost == NULL)
error("WebSocket: failed to initialize server");
w->state = WEB_STATE_INITIALIZED;
w->state = STATE_STARTED;
return 0;
}
int web_stop(struct web *w)
{
lws_cancel_service(w->context);
return 0;
}
int web_destroy(struct web *w)
{
if (w->state == WEB_STATE_INITIALIZED)
lws_context_destroy(w->context);
if (w->state == STATE_STARTED)
return -1;
lws_context_destroy(w->context);
w->state = WEB_STATE_DESTROYED;
w->state = STATE_DESTROYED;
return 0;
}
int web_deinit(struct web *w)
int web_service(struct web *w)
{
lws_cancel_service(w->context);
return 0;
return lws_service(w->context, 10);
}