diff --git a/include/villas/api.h b/include/villas/api.h index 35dfcd307..6d7c6c51d 100644 --- a/include/villas/api.h +++ b/include/villas/api.h @@ -7,13 +7,13 @@ #pragma once +#include #include #include "list.h" #include "common.h" /* Forward declarations */ -enum lws_callback_reasons; struct lws; struct super_node; @@ -103,7 +103,7 @@ int api_session_destroy(struct api_session *s); int api_session_run_command(struct api_session *s, json_t *req, json_t **resp); /** Send contents of buffer over libwebsockets connection */ -int api_buffer_send(struct api_buffer *b, struct lws *w); +int api_buffer_send(struct api_buffer *b, struct lws *w, enum lws_write_protocol prot); /** Append received data to buffer. */ int api_buffer_append(struct api_buffer *b, const char *in, size_t len); diff --git a/include/villas/fpga/card.h b/include/villas/fpga/card.h index 7b2396d3f..e5a69f19d 100644 --- a/include/villas/fpga/card.h +++ b/include/villas/fpga/card.h @@ -26,7 +26,10 @@ struct fpga_card { enum state state; /**< The state of this FPGA card. */ + struct pci *pci; struct pci_device filter; /**< Filter for PCI device. */ + + struct vfio_container *vfio_container; struct vfio_device vfio_device; /**< VFIO device handle. */ int do_reset; /**< Reset VILLASfpga during startup? */ @@ -47,18 +50,29 @@ struct fpga_card { config_setting_t *cfg; }; -int fpga_card_parse(struct fpga_card *c, config_setting_t *cfg); - -void fpga_card_dump(struct fpga_card *c); - /** Initialize FPGA card and its IP components. */ int fpga_card_init(struct fpga_card *c, struct pci *pci, struct vfio_container *vc); -int fpga_card_destroy(struct fpga_card *c); +/** Parse configuration of FPGA card including IP cores from config. */ +int fpga_card_parse(struct fpga_card *c, config_setting_t *cfg); + +int fpga_card_parse_list(struct list *l, config_setting_t *cfg); /** Check if the FPGA card configuration is plausible. */ int fpga_card_check(struct fpga_card *c); +/** Start FPGA card. */ +int fpga_card_start(struct fpga_card *c); + +/** Stop FPGA card. */ +int fpga_card_stop(struct fpga_card *c); + +/** Destroy FPGA card. */ +int fpga_card_destroy(struct fpga_card *c); + +/** Dump details of FPGA card to stdout. */ +void fpga_card_dump(struct fpga_card *c); + /** Reset the FPGA to a known state */ int fpga_card_reset(struct fpga_card *c); diff --git a/include/villas/fpga/ips/dft.h b/include/villas/fpga/ips/dft.h index ca7d16b0f..5157b1c26 100644 --- a/include/villas/fpga/ips/dft.h +++ b/include/villas/fpga/ips/dft.h @@ -27,7 +27,9 @@ struct dft { int dft_parse(struct fpga_ip *c); -int dft_init(struct fpga_ip *c); +int dft_start(struct fpga_ip *c); + +int dft_stop(struct fpga_ip *c); int dft_destroy(struct fpga_ip *c); diff --git a/include/villas/fpga/ips/dma.h b/include/villas/fpga/ips/dma.h index db05a9595..2e5df2f18 100644 --- a/include/villas/fpga/ips/dma.h +++ b/include/villas/fpga/ips/dma.h @@ -67,6 +67,6 @@ int dma_simple_write_complete(struct fpga_ip *c, char **buf, size_t *len); int dma_ping_pong(struct fpga_ip *c, char *src, char *dst, size_t len); -int dma_init(struct fpga_ip *c); +int dma_start(struct fpga_ip *c); /** @} */ \ No newline at end of file diff --git a/include/villas/fpga/ips/fifo.h b/include/villas/fpga/ips/fifo.h index d685fd663..148695e1a 100644 --- a/include/villas/fpga/ips/fifo.h +++ b/include/villas/fpga/ips/fifo.h @@ -24,10 +24,10 @@ struct fifo { uint32_t baseaddr_axi4; }; -/* Forward declaration */ +/* Forward declarations */ struct ip; -int fifo_init(struct fpga_ip *c); +int fifo_start(struct fpga_ip *c); ssize_t fifo_write(struct fpga_ip *c, char *buf, size_t len); diff --git a/include/villas/fpga/ips/switch.h b/include/villas/fpga/ips/switch.h index 1172ab9cb..6a2202d94 100644 --- a/include/villas/fpga/ips/switch.h +++ b/include/villas/fpga/ips/switch.h @@ -17,7 +17,7 @@ #include "list.h" -/* Forward declaration */ +/* Forward declarations */ struct ip; struct sw_path { @@ -34,7 +34,7 @@ struct sw { struct ip; -int switch_init(struct fpga_ip *c); +int switch_start(struct fpga_ip *c); /** Initialize paths which have been parsed by switch_parse() */ int switch_init_paths(struct fpga_ip *c); diff --git a/include/villas/fpga/ips/timer.h b/include/villas/fpga/ips/timer.h index f1ae91ecb..471e1cedb 100644 --- a/include/villas/fpga/ips/timer.h +++ b/include/villas/fpga/ips/timer.h @@ -15,8 +15,13 @@ #include +/* Forward declarations */ +struct fpga_ip; + struct timer { XTmrCtr inst; }; +int timer_start(struct fpga_ip *c); + /** @} */ \ No newline at end of file diff --git a/include/villas/kernel/if.h b/include/villas/kernel/if.h index 5df083efb..46f106408 100644 --- a/include/villas/kernel/if.h +++ b/include/villas/kernel/if.h @@ -33,6 +33,7 @@ struct interface { struct rtnl_qdisc *tc_qdisc; /**< libnl3: Root priority queuing discipline (qdisc). */ char irqs[IF_IRQ_MAX]; /**< List of IRQs of the NIC. */ + int affinity; /**< IRQ / Core Affinity of this interface. */ struct list sockets; /**< Linked list of associated sockets. */ }; @@ -58,11 +59,10 @@ int if_destroy(struct interface *i); * maps interface IRQs according to affinity. * * @param i A pointer to the interface structure. - * @param affinity Set the IRQ affinity of this interface. * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int if_start(struct interface *i, int affinity); +int if_start(struct interface *i); /** Stop interface * diff --git a/include/villas/kernel/kernel.h b/include/villas/kernel/kernel.h index ee0561c3c..cdb868be3 100644 --- a/include/villas/kernel/kernel.h +++ b/include/villas/kernel/kernel.h @@ -11,8 +11,6 @@ #include -#include "config.h" - //#include /** Check if current process has capability \p cap. diff --git a/include/villas/nodes/fpga.h b/include/villas/nodes/fpga.h index e201434b7..6c15c51ce 100644 --- a/include/villas/nodes/fpga.h +++ b/include/villas/nodes/fpga.h @@ -29,15 +29,12 @@ struct fpga_ip; /** The node type */ struct fpga { struct fpga_ip *ip; + + struct pci *pci; + struct vfio_container *vfio_container; int use_irqs; - struct dma_mem dma; - - enum { - FPGA_DM_DMA, - FPGA_DM_FIFO - } type; }; /** @see node_vtable::init */ @@ -49,9 +46,6 @@ int fpga_deinit(); /** @see node_vtable::parse */ int fpga_parse(struct node *n, config_setting_t *cfg); -/** Parse the 'fpga' section in the configuration file */ -int fpga_parse_cards(config_setting_t *cfg); - struct fpga_card * fpga_lookup_card(const char *name); /** @see node_vtable::print */ diff --git a/include/villas/nodes/ngsi.h b/include/villas/nodes/ngsi.h index 45c6be49d..aedb2d1b0 100644 --- a/include/villas/nodes/ngsi.h +++ b/include/villas/nodes/ngsi.h @@ -23,7 +23,6 @@ #include #include "list.h" -#include "config.h" #include "msg.h" #include "super_node.h" #include "node.h" diff --git a/include/villas/path.h b/include/villas/path.h index 640b6e2ee..75fb6942b 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -17,18 +17,15 @@ #include #include "list.h" -#include "config.h" -#include "hist.h" -#include "node.h" -#include "msg.h" #include "queue.h" #include "pool.h" -#include "stats.h" #include "common.h" +#include "hook.h" /* Forward declarations */ +struct stats; +struct node; struct super_node; -struct hook_info; struct path_source { @@ -59,6 +56,9 @@ struct path int enabled; /**< Is this path enabled. */ int reverse; /**< This path as a matching reverse path. */ + + int samplelen; + int queuelen; pthread_t tid; /**< The thread id for this path. */ @@ -133,15 +133,4 @@ int path_uses_node(struct path *p, struct node *n); */ int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes); -/** Conditionally execute the hooks - * - * @param p A pointer to the path structure. - * @param when Which type of hooks should be executed? - * @param smps An array to of (cnt) pointers to msgs. - * @param cnt The size of the sample array. - * @retval 0 All registred hooks for the specified type have been executed successfully. - * @retval <0 On of the hook functions signalized, that the processing should be aborted; message should be skipped. - */ -int path_run_hooks(struct path *p, int when, struct sample *smps[], size_t cnt); - /** @} */ \ No newline at end of file diff --git a/lib/api.c b/lib/api.c index 4722dd7db..3adb272b7 100644 --- a/lib/api.c +++ b/lib/api.c @@ -21,11 +21,11 @@ size_t json_dumpb(const json_t *json, char *buffer, size_t size, size_t flags) if (!str) return 0; - len = strlen(str) - 1; // not \0 terminated + len = strlen(str); // not \0 terminated if (buffer && len <= size) memcpy(buffer, str, len); - //free(str); + free(str); return len; } @@ -50,6 +50,8 @@ static int api_parse_request(struct api_buffer *b, json_t **req) b->len -= e.position; } + else + b->len = 0; return 1; } @@ -61,9 +63,11 @@ static int api_unparse_response(struct api_buffer *b, json_t *res) retry: len = json_dumpb(res, b->buf + b->len, b->size - b->len, 0); if (len > b->size - b->len) { b->buf = realloc(b->buf, b->len + len); - b->len += len; + b->size += len; goto retry; } + else + b->len += len; return 0; } @@ -90,8 +94,9 @@ int api_session_run_command(struct api_session *s, json_t *req, json_t **resp) "error", "command not found", "code", -2, "command", rstr); - - debug(LOG_API, "Running API request: %s with arguments: %s", p->name, json_dumps(args, 0)); + + + debug(LOG_API, "Running API request: %s", p->name); ret = p->api.cb(&p->api, args, resp, s); if (ret) @@ -99,7 +104,7 @@ int api_session_run_command(struct api_session *s, json_t *req, json_t **resp) "error", "command failed", "code", ret); - debug(LOG_API, "API request completed with code: %d and output: %s", ret, json_dumps(*resp, 0)); + debug(LOG_API, "API request completed with code: %d", ret); return 0; } @@ -150,10 +155,10 @@ int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void /* Prepare HTTP response header */ const char headers[] = "HTTP/1.1 200 OK\r\n" "Content-type: application/json\r\n" - "User-agent: " USER_AGENT + "User-agent: " USER_AGENT "\r\n" "\r\n"; - api_buffer_append(&s->response.headers, headers, sizeof(headers)); + api_buffer_append(&s->response.headers, headers, sizeof(headers)-1); /* book us a LWS_CALLBACK_HTTP_WRITEABLE callback */ lws_callback_on_writable(wsi); @@ -168,12 +173,8 @@ int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void json_t *req, *resp; while (api_parse_request(&s->request.body, &req) == 1) { - api_session_run_command(s, req, &resp); - api_unparse_response(&s->response.body, resp); - - debug(LOG_WEB, "Sending response: %s len=%zu", s->response.body.buf, s->response.body.len); lws_callback_on_writable(wsi); } @@ -189,9 +190,9 @@ int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void case LWS_CALLBACK_HTTP_WRITEABLE: /* We send headers only in HTTP mode */ if (s->mode == API_MODE_HTTP) - api_buffer_send(&s->response.headers, wsi); + api_buffer_send(&s->response.headers, wsi, LWS_WRITE_HTTP_HEADERS); - api_buffer_send(&s->response.body, wsi); + api_buffer_send(&s->response.body, wsi, LWS_WRITE_HTTP); if (s->completed && s->response.body.len == 0) return -1; @@ -205,14 +206,14 @@ int api_http_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void return 0; } -int api_buffer_send(struct api_buffer *b, struct lws *w) +int api_buffer_send(struct api_buffer *b, struct lws *w, enum lws_write_protocol prot) { int sent; if (b->len <= 0) return 0; - - sent = lws_write(w, (unsigned char *) b->buf, b->len, LWS_WRITE_HTTP_HEADERS); + + sent = lws_write(w, (unsigned char *) b->buf, b->len, prot); if (sent > 0) { memmove(b->buf, b->buf + sent, sent); b->len -= sent; @@ -284,7 +285,11 @@ int api_session_init(struct api_session *s, struct api *a, enum api_mode m) s->request.body = s->response.body = - s->response.headers = (struct api_buffer) { 0 }; + s->response.headers = (struct api_buffer) { + .buf = NULL, + .size = 0, + .len = 0 + }; return 0; } diff --git a/lib/apis/config.c b/lib/apis/config.c index ed1a78870..4c054e28b 100644 --- a/lib/apis/config.c +++ b/lib/apis/config.c @@ -16,8 +16,17 @@ static int api_config(struct api_ressource *h, json_t *args, json_t **resp, stru { config_setting_t *cfg_root = config_root_setting(&s->api->super_node->cfg); - *resp = config_to_json(cfg_root); - + if (cfg_root) { + *resp = json_pack("{ s: o }", + "config", config_to_json(cfg_root)); + } + /* No configuration has been loaded yet. */ + else { + *resp = json_pack("{ s: s, s: i }", + "error", "no configuration loaded", + "code", 2); + } + return 0; } diff --git a/lib/hist.c b/lib/hist.c index 1f28bf721..eec4fcf53 100644 --- a/lib/hist.c +++ b/lib/hist.c @@ -13,6 +13,7 @@ #include "utils.h" #include "hist.h" +#include "config.h" #define VAL(h, i) ((h)->low + (i) * (h)->resolution) #define INDEX(h, v) round((v - (h)->low) / (h)->resolution) diff --git a/lib/hooks/convert.c b/lib/hooks/convert.c index c801d1105..f0d5c589f 100644 --- a/lib/hooks/convert.c +++ b/lib/hooks/convert.c @@ -16,17 +16,34 @@ struct convert { TO_FIXED, TO_FLOAT } mode; + + double scale; + + long long mask; }; +static int convert_init(struct hook *h) +{ + struct convert *p = h->_vd; + + p->scale = 1; + p->mask = -1; + + return 0; +} + static int convert_parse(struct hook *h, config_setting_t *cfg) { struct convert *p = h->_vd; const char *mode; + config_setting_lookup_float(cfg, "scale", &p->scale); + config_setting_lookup_int64(cfg, "mask", &p->mask); + if (!config_setting_lookup_string(cfg, "mode", &mode)) cerror(cfg, "Missing setting 'mode' for hook '%s'", plugin_name(h->_vt)); - + if (!strcmp(mode, "fixed")) p->mode = TO_FIXED; else if (!strcmp(mode, "float")) @@ -43,9 +60,20 @@ static int convert_read(struct hook *h, struct sample *smps[], size_t *cnt) for (int i = 0; i < *cnt; i++) { for (int k = 0; k < smps[i]->length; k++) { + + /* Only convert values which are not masked */ + if ((k < sizeof(p->mask) * 8) && !(p->mask & (1LL << k))) + continue; + switch (p->mode) { - case TO_FIXED: smps[i]->data[k].i = smps[i]->data[k].f * 1e3; break; - case TO_FLOAT: smps[i]->data[k].f = smps[i]->data[k].i; break; + case TO_FIXED: + smps[i]->data[k].i = smps[i]->data[k].f * p->scale; + sample_set_data_format(smps[i], k, SAMPLE_DATA_FORMAT_INT); + break; + case TO_FLOAT: + smps[i]->data[k].f = smps[i]->data[k].i * p->scale; + sample_set_data_format(smps[i], k, SAMPLE_DATA_FORMAT_FLOAT); + break; } } } @@ -59,6 +87,7 @@ static struct plugin p = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 99, + .init = convert_init, .parse = convert_parse, .read = convert_read, .size = sizeof(struct convert) diff --git a/lib/hooks/map.c b/lib/hooks/map.c index 91af0d474..78375a161 100644 --- a/lib/hooks/map.c +++ b/lib/hooks/map.c @@ -36,15 +36,12 @@ static int map_destroy(struct hook *h) static int map_parse(struct hook *h, config_setting_t *cfg) { - struct map *p = h->_vd; - int ret; - + struct map *p = h->_vd; config_setting_t *cfg_mapping; cfg_mapping = config_setting_lookup(cfg, "mapping"); - - if (!config_setting_is_array(cfg_mapping)) + if (!cfg_mapping || !config_setting_is_array(cfg_mapping)) return -1; ret = mapping_parse(&p->mapping, cfg_mapping); @@ -80,7 +77,7 @@ static int map_read(struct hook *h, struct sample *smps[], size_t *cnt) static struct plugin p = { .name = "map", - .description = "Remap values and / or add data generated by this instace.", + .description = "Remap values and / or add header, timestamp values to the sample", .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 99, diff --git a/lib/hooks/print.c b/lib/hooks/print.c index a4eec0f1c..ca67a1f4b 100644 --- a/lib/hooks/print.c +++ b/lib/hooks/print.c @@ -11,6 +11,7 @@ #include "hook.h" #include "plugin.h" #include "sample.h" +#include "sample_io.h" struct print { FILE *output; @@ -64,7 +65,7 @@ static int print_read(struct hook *h, struct sample *smps[], size_t *cnt) struct print *p = h->_vd; for (int i = 0; i < *cnt; i++) - sample_fprint(p->output, smps[i], SAMPLE_ALL); + sample_io_villas_fprint(p->output, smps[i], SAMPLE_IO_ALL); return 0; } diff --git a/lib/hooks/skip_first.c b/lib/hooks/skip_first.c index 860acb8a7..e78fa3a60 100644 --- a/lib/hooks/skip_first.c +++ b/lib/hooks/skip_first.c @@ -68,6 +68,7 @@ static int skip_first_read(struct hook *h, struct sample *smps[], size_t *cnt) { struct skip_first *p = h->_vd; + /* Remember sequence no or timestamp of first sample. */ if (p->state == HOOK_SKIP_FIRST_STATE_STARTED) { switch (p->mode) { case HOOK_SKIP_MODE_SAMPLES: @@ -75,7 +76,7 @@ static int skip_first_read(struct hook *h, struct sample *smps[], size_t *cnt) break; case HOOK_SKIP_MODE_SECONDS: - p->seconds.until = time_add(&smps[0]->ts.received, &p->seconds.wait); + p->seconds.until = time_add(&smps[0]->ts.origin, &p->seconds.wait); break; } @@ -87,14 +88,15 @@ static int skip_first_read(struct hook *h, struct sample *smps[], size_t *cnt) bool skip; switch (p->mode) { case HOOK_SKIP_MODE_SAMPLES: - skip = p->samples.until >= smps[i]->sequence; + skip = p->samples.until > smps[i]->sequence; break; case HOOK_SKIP_MODE_SECONDS: - skip = time_delta(&p->seconds.until, &smps[i]->ts.received) < 0; + skip = time_delta(&p->seconds.until, &smps[i]->ts.origin) < 0; break; default: skip = false; + break; } if (!skip) { diff --git a/lib/hooks/stats.c b/lib/hooks/stats_collect.c similarity index 55% rename from lib/hooks/stats.c rename to lib/hooks/stats_collect.c index 4de4ebf8a..34a4373b1 100644 --- a/lib/hooks/stats.c +++ b/lib/hooks/stats_collect.c @@ -128,103 +128,7 @@ static int stats_collect_read(struct hook *h, struct sample *smps[], size_t *cnt return 0; } -struct stats_send { - struct node *dest; - - enum { - STATS_SEND_MODE_PERIODIC, - STATS_SEND_MODE_READ - } mode; - - int decimation; -}; - -static int stats_send_init(struct hook *h) -{ - struct stats_send *p = h->_vd; - - p->decimation = 1; - p->mode = STATS_SEND_MODE_PERIODIC; - - return 0; -} - -static int stats_send_parse(struct hook *h, config_setting_t *cfg) -{ - struct stats_send *p = h->_vd; - - assert(h->path && h->path->super_node); - - const char *dest, *mode; - - if (config_setting_lookup_string(cfg, "destination", &dest)) { - p->dest = list_lookup(&h->path->super_node->nodes, dest); - if (!p->dest) - cerror(cfg, "Invalid destination node '%s' for hook '%s'", dest, plugin_name(h->_vt)); - } - else - cerror(cfg, "Missing setting 'destination' for hook '%s'", plugin_name(h->_vt)); - - if (config_setting_lookup_string(cfg, "destination", &mode)) { - if (!strcmp(mode, "periodic")) - p->mode = STATS_SEND_MODE_PERIODIC; - else if (!strcmp(mode, "read")) - p->mode = STATS_SEND_MODE_READ; - else - cerror(cfg, "Invalid value '%s' for setting 'mode' of hook '%s'", mode, plugin_name(h->_vt)); - } - - config_setting_lookup_int(cfg, "decimation", &p->decimation); - - return 0; -} - -static int stats_send_start(struct hook *h) -{ - struct stats_send *p = h->_vd; - - if (p->dest->state != STATE_STOPPED) - node_start(p->dest); - - return 0; -} - -static int stats_send_stop(struct hook *h) -{ - struct stats_send *p = h->_vd; - - if (p->dest->state != STATE_STOPPED) - node_stop(p->dest); - - return 0; -} - -static int stats_send_periodic(struct hook *h) -{ - struct stats_send *p = h->_vd; - - if (p->mode == STATS_SEND_MODE_PERIODIC) - stats_send(h->path->stats, p->dest); - - return 0; -} - -static int stats_send_read(struct hook *h, struct sample *smps[], size_t *cnt) -{ - struct stats_send *p = h->_vd; - - assert(h->path->stats); - - if (p->mode == STATS_SEND_MODE_READ) { - size_t processed = h->path->stats->histograms[STATS_OWD].total; - if (processed % p->decimation == 0) - stats_send(h->path->stats, p->dest); - } - - return 0; -} - -static struct plugin p1 = { +static struct plugin p = { .name = "stats", .description = "Collect statistics for the current path", .type = PLUGIN_TYPE_HOOK, @@ -242,23 +146,6 @@ static struct plugin p1 = { } }; -static struct plugin p2 = { - .name = "stats_send", - .description = "Send path statistics to another node", - .type = PLUGIN_TYPE_HOOK, - .hook = { - .priority = 99, - .init = stats_send_init, - .parse = stats_send_parse, - .start = stats_send_start, - .stop = stats_send_stop, - .periodic= stats_send_periodic, - .read = stats_send_read, - .size = sizeof(struct stats_send) - } -}; - -REGISTER_PLUGIN(&p1) -REGISTER_PLUGIN(&p2) +REGISTER_PLUGIN(&p) /** @} */ \ No newline at end of file diff --git a/lib/hooks/stats_send.c b/lib/hooks/stats_send.c new file mode 100644 index 000000000..982515b43 --- /dev/null +++ b/lib/hooks/stats_send.c @@ -0,0 +1,130 @@ +/** Sending statistics to another node. + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + *********************************************************************************/ + +/** @addtogroup hooks Hook functions + * @{ + */ + +#include "hook.h" +#include "plugin.h" +#include "stats.h" +#include "path.h" + +struct stats_send { + struct node *dest; + + enum { + STATS_SEND_MODE_PERIODIC, + STATS_SEND_MODE_READ + } mode; + + int decimation; +}; + +static int stats_send_init(struct hook *h) +{ + struct stats_send *p = h->_vd; + + p->decimation = 1; + p->mode = STATS_SEND_MODE_PERIODIC; + + return 0; +} + +static int stats_send_parse(struct hook *h, config_setting_t *cfg) +{ + struct stats_send *p = h->_vd; + + assert(h->path && h->path->super_node); + + const char *dest, *mode; + + if (config_setting_lookup_string(cfg, "destination", &dest)) { + p->dest = list_lookup(&h->path->super_node->nodes, dest); + if (!p->dest) + cerror(cfg, "Invalid destination node '%s' for hook '%s'", dest, plugin_name(h->_vt)); + } + else + cerror(cfg, "Missing setting 'destination' for hook '%s'", plugin_name(h->_vt)); + + if (config_setting_lookup_string(cfg, "destination", &mode)) { + if (!strcmp(mode, "periodic")) + p->mode = STATS_SEND_MODE_PERIODIC; + else if (!strcmp(mode, "read")) + p->mode = STATS_SEND_MODE_READ; + else + cerror(cfg, "Invalid value '%s' for setting 'mode' of hook '%s'", mode, plugin_name(h->_vt)); + } + + config_setting_lookup_int(cfg, "decimation", &p->decimation); + + return 0; +} + +static int stats_send_start(struct hook *h) +{ + struct stats_send *p = h->_vd; + + if (p->dest->state != STATE_STOPPED) + node_start(p->dest); + + return 0; +} + +static int stats_send_stop(struct hook *h) +{ + struct stats_send *p = h->_vd; + + if (p->dest->state != STATE_STOPPED) + node_stop(p->dest); + + return 0; +} + +static int stats_send_periodic(struct hook *h) +{ + struct stats_send *p = h->_vd; + + if (p->mode == STATS_SEND_MODE_PERIODIC) + stats_send(h->path->stats, p->dest); + + return 0; +} + +static int stats_send_read(struct hook *h, struct sample *smps[], size_t *cnt) +{ + struct stats_send *p = h->_vd; + + assert(h->path->stats); + + if (p->mode == STATS_SEND_MODE_READ) { + size_t processed = h->path->stats->histograms[STATS_OWD].total; + if (processed % p->decimation == 0) + stats_send(h->path->stats, p->dest); + } + + return 0; +} + +static struct plugin p = { + .name = "stats_send", + .description = "Send path statistics to another node", + .type = PLUGIN_TYPE_HOOK, + .hook = { + .priority = 99, + .init = stats_send_init, + .parse = stats_send_parse, + .start = stats_send_start, + .stop = stats_send_stop, + .periodic= stats_send_periodic, + .read = stats_send_read, + .size = sizeof(struct stats_send) + } +}; + +REGISTER_PLUGIN(&p) + +/** @} */ \ No newline at end of file diff --git a/lib/kernel/if.c b/lib/kernel/if.c index 785596739..1d1622608 100644 --- a/lib/kernel/if.c +++ b/lib/kernel/if.c @@ -49,13 +49,13 @@ int if_destroy(struct interface *i) return 0; } -int if_start(struct interface *i, int affinity) +int if_start(struct interface *i) { info("Starting interface '%s' which is used by %zu sockets", rtnl_link_get_name(i->nl_link), list_length(&i->sockets)); { INDENT /* Set affinity for network interfaces (skip _loopback_ dev) */ - if_set_affinity(i, affinity); + //if_set_affinity(i, i->affinity); /* Assign fwmark's to socket nodes which have netem options */ int ret, mark = 0; diff --git a/lib/log.c b/lib/log.c index 7c670447b..8336a07a7 100644 --- a/lib/log.c +++ b/lib/log.c @@ -25,6 +25,14 @@ /** The global log instance. */ static struct log *log; +__attribute__((constructor)) static void setup_default_logger() +{ + static struct log l; + + log_init(&l, V, LOG_ALL); + log_start(&l); +} + /** List of debug facilities as strings */ static const char *facilities_strs[] = { "pool", /* LOG_POOL */ @@ -63,8 +71,8 @@ int log_init(struct log *l, int level, long facilitites) l->level = level; l->facilities = facilitites; - - debug(LOG_LOG | 5, "Log sub-system intialized: level=%d, faciltities=%#lx", level, facilitites); + l->file = stderr; + l->path = NULL; l->state = STATE_INITIALIZED; @@ -100,6 +108,8 @@ int log_start(struct log *l) } l->state = STATE_STARTED; + + debug(LOG_LOG | 5, "Log sub-system started: level=%d, faciltities=%#lx, path=%s", l->level, l->facilities, l->path); return 0; } @@ -242,7 +252,7 @@ void debug(long class, const char *fmt, ...) int lvl = class & 0xFF; int fac = class & ~0xFF; - assert(log != NULL); + assert(log && log->state != STATE_DESTROYED); if (((fac == 0) || (fac & log->facilities)) && (lvl <= log->level)) { va_start(ap, fmt); @@ -255,7 +265,7 @@ void info(const char *fmt, ...) { va_list ap; - assert(log != NULL); + assert(log && log->state != STATE_DESTROYED); va_start(ap, fmt); log_vprint(log, LOG_LVL_INFO, fmt, ap); @@ -266,7 +276,7 @@ void warn(const char *fmt, ...) { va_list ap; - assert(log != NULL); + assert(log && log->state != STATE_DESTROYED); va_start(ap, fmt); log_vprint(log, LOG_LVL_WARN, fmt, ap); @@ -277,7 +287,7 @@ void stats(const char *fmt, ...) { va_list ap; - assert(log != NULL); + assert(log && log->state != STATE_DESTROYED); va_start(ap, fmt); log_vprint(log, LOG_LVL_STATS, fmt, ap); @@ -288,7 +298,7 @@ void error(const char *fmt, ...) { va_list ap; - assert(log != NULL); + assert(log && log->state != STATE_DESTROYED); va_start(ap, fmt); log_vprint(log, LOG_LVL_ERROR, fmt, ap); @@ -302,7 +312,7 @@ void serror(const char *fmt, ...) va_list ap; char *buf = NULL; - assert(log != NULL); + assert(log && log->state != STATE_DESTROYED); va_start(ap, fmt); vstrcatf(&buf, fmt, ap); @@ -321,7 +331,7 @@ void cerror(config_setting_t *cfg, const char *fmt, ...) const char *file; int line; - assert(log != NULL); + assert(log && log->state != STATE_DESTROYED); va_start(ap, fmt); vstrcatf(&buf, fmt, ap); diff --git a/lib/node.c b/lib/node.c index 46396daaa..2cd76bdf5 100644 --- a/lib/node.c +++ b/lib/node.c @@ -98,7 +98,8 @@ int node_stop(struct node *n) { int ret; - assert(n->state == STATE_STARTED); + if (n->state != STATE_STARTED) + return 0; info("Stopping node %s", node_name(n)); { INDENT @@ -248,5 +249,5 @@ int node_parse_list(struct list *list, config_setting_t *cfg, struct list *all) cerror(cfg, "Invalid output node(s)"); } - return list_length(list); + return 0; } diff --git a/lib/nodes/fpga.c b/lib/nodes/fpga.c index 4d8b984e0..5065814f1 100644 --- a/lib/nodes/fpga.c +++ b/lib/nodes/fpga.c @@ -30,36 +30,10 @@ void fpga_dump(struct fpga *f) fpga_card_dump(c); } -int fpga_parse_cards(config_setting_t *cfg) -{ - int ret; - config_setting_t *cfg_fpgas; - - cfg_fpgas = config_setting_get_member(cfg, "fpgas"); - if (!cfg_fpgas) - cerror(cfg, "Config file is missing 'fpgas' section"); - - if (!config_setting_is_group(cfg_fpgas)) - cerror(cfg_fpgas, "FPGA configuration section must be a group"); - - for (int i = 0; i < config_setting_length(cfg_fpgas); i++) { - config_setting_t *cfg_fpga = config_setting_get_elem(cfg_fpgas, i); - - struct fpga_card *c = alloc(sizeof(struct fpga_card)); - - ret = fpga_card_parse(c, cfg_fpga); - if (ret) - cerror(cfg_fpga, "Failed to parse FPGA card configuration"); - - list_push(&cards, c); - } - - return 0; -} - int fpga_init(int argc, char *argv[], config_setting_t *cfg) { int ret; + config_setting_t *cfg_fpgas; ret = pci_init(&pci); if (ret) @@ -70,7 +44,11 @@ int fpga_init(int argc, char *argv[], config_setting_t *cfg) cerror(cfg, "Failed to initiliaze VFIO sub-system"); /* Parse FPGA configuration */ - ret = fpga_parse_cards(cfg); + cfg_fpgas = config_setting_lookup(cfg, "fpgas"); + if (!cfg_fpgas) + cerror(cfg, "Config file is missing 'fpgas' section"); + + ret = fpga_card_parse_list(&cards, cfg_fpgas); if (ret) cerror(cfg, "Failed to parse VILLASfpga config"); @@ -123,7 +101,7 @@ int fpga_parse(struct node *n, config_setting_t *cfg) ip = list_lookup(&card->ips, ip_name); if (!ip) cerror(cfg, "There is no datamover named '%s' on the FPGA card '%s'", ip_name, card_name); - if (!ip->_vt->type != FPGA_IP_TYPE_DATAMOVER) + if (ip->_vt->type != FPGA_IP_TYPE_DM_DMA && ip->_vt->type != FPGA_IP_TYPE_DM_FIFO) cerror(cfg, "The IP '%s' on FPGA card '%s' is not a datamover", ip_name, card_name); free(cpy); @@ -148,16 +126,6 @@ char * fpga_print(struct node *n) return strf("dm=%s", f->ip->name); } -int fpga_get_type(struct fpga_ip *c) -{ - if (!fpga_vlnv_cmp(&c->vlnv, &(struct fpga_vlnv) { "xilinx.com", "ip", "axi_dma", NULL })) - return FPGA_DM_DMA; - else if (!fpga_vlnv_cmp(&c->vlnv, &(struct fpga_vlnv) { "xilinx.com", "ip", "axi_fifo_mm_s", NULL })) - return FPGA_DM_FIFO; - else - return -1; -} - int fpga_start(struct node *n) { int ret; @@ -165,14 +133,14 @@ int fpga_start(struct node *n) struct fpga *f = n->_vd; struct fpga_card *c = f->ip->card; - fpga_card_init(c, &pci, &vc); + fpga_card_init(c, f->pci, f->vfio_container); int flags = 0; if (!f->use_irqs) flags |= INTC_POLLING; - switch (f->type) { - case FPGA_DM_DMA: + switch (f->ip->_vt->type) { + case FPGA_IP_TYPE_DM_DMA: /* Map DMA accessible memory */ ret = dma_alloc(f->ip, &f->dma, 0x1000, 0); if (ret) @@ -182,9 +150,11 @@ int fpga_start(struct node *n) intc_enable(c->intc, (1 << (f->ip->irq + 1)), flags); /* S2MM */ break; - case FPGA_DM_FIFO: + case FPGA_IP_TYPE_DM_FIFO: intc_enable(c->intc, (1 << f->ip->irq), flags); /* MM2S & S2MM */ break; + + default: { } } @@ -198,8 +168,8 @@ int fpga_stop(struct node *n) struct fpga *f = n->_vd; struct fpga_card *c = f->ip->card; - switch (f->type) { - case FPGA_DM_DMA: + switch (f->ip->_vt->type) { + case FPGA_IP_TYPE_DM_DMA: intc_disable(c->intc, f->ip->irq); /* MM2S */ intc_disable(c->intc, f->ip->irq + 1); /* S2MM */ @@ -207,9 +177,11 @@ int fpga_stop(struct node *n) if (ret) return ret; - case FPGA_DM_FIFO: + case FPGA_IP_TYPE_DM_FIFO: if (f->use_irqs) intc_disable(c->intc, f->ip->irq); /* MM2S & S2MM */ + + default: { } } return 0; @@ -231,8 +203,8 @@ int fpga_read(struct node *n, struct sample *smps[], unsigned cnt) smp->ts.origin = time_now(); /* Read data from RTDS */ - switch (f->type) { - case FPGA_DM_DMA: + switch (f->ip->_vt->type) { + case FPGA_IP_TYPE_DM_DMA: ret = dma_read(f->ip, f->dma.base_phys + 0x800, len); if (ret) return ret; @@ -245,11 +217,13 @@ int fpga_read(struct node *n, struct sample *smps[], unsigned cnt) smp->length = recvlen / 4; return 1; - case FPGA_DM_FIFO: + case FPGA_IP_TYPE_DM_FIFO: recvlen = fifo_read(f->ip, (char *) smp->data, len); smp->length = recvlen / 4; return 1; + + default: { } } return -1; @@ -265,8 +239,8 @@ int fpga_write(struct node *n, struct sample *smps[], unsigned cnt) size_t len = smp->length * sizeof(smp->data[0]); /* Send data to RTDS */ - switch (f->type) { - case FPGA_DM_DMA: + switch (f->ip->_vt->type) { + case FPGA_IP_TYPE_DM_DMA: memcpy(f->dma.base_virt, smp->data, len); ret = dma_write(f->ip, f->dma.base_phys, len); @@ -280,10 +254,12 @@ int fpga_write(struct node *n, struct sample *smps[], unsigned cnt) //info("Sent %u bytes to FPGA", sentlen); return 1; - case FPGA_DM_FIFO: + case FPGA_IP_TYPE_DM_FIFO: sentlen = fifo_write(f->ip, (char *) smp->data, len); return sentlen / sizeof(smp->data[0]); break; + + default: { } } return -1; diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.c index 9fb660c30..8dea91e6b 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.c @@ -17,6 +17,7 @@ #include "utils.h" #include "timing.h" #include "plugin.h" +#include "config.h" /* Some global settings */ static char *name = NULL; diff --git a/lib/nodes/opal.c b/lib/nodes/opal.c index 202198d4a..3672b4dca 100644 --- a/lib/nodes/opal.c +++ b/lib/nodes/opal.c @@ -72,7 +72,7 @@ int opal_init(int argc, char *argv[], config_setting_t *cfg) info("Started as OPAL Asynchronous process"); info("This is VILLASnode %s (built on %s, %s)", - VERSION_STR, __DATE__, __TIME__); + BUILDID, __DATE__, __TIME__); opal_print_global(); diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 87e7464db..678bc1be4 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -38,6 +38,8 @@ struct list interfaces; int socket_init(int argc, char *argv[], config_setting_t *cfg) { + int ret; + if (getuid() != 0) error("The 'socket' node-type requires super-user privileges!"); @@ -51,7 +53,8 @@ int socket_init(int argc, char *argv[], config_setting_t *cfg) struct rtnl_link *link; /* Determine outgoing interface */ - if (if_get_egress((struct sockaddr *) &s->remote, &link)) { + ret = if_get_egress((struct sockaddr *) &s->remote, &link); + if (ret) { char *buf = socket_print_addr((struct sockaddr *) &s->remote); error("Failed to get interface for socket address '%s'", buf); free(buf); @@ -70,7 +73,9 @@ int socket_init(int argc, char *argv[], config_setting_t *cfg) /* If not found, create a new interface */ struct interface j; - if_init(&j, link); + ret = if_init(&j, link); + if (ret) + continue; list_push(&interfaces, memdup(&j, sizeof(j))); i = &j; @@ -78,15 +83,10 @@ int socket_init(int argc, char *argv[], config_setting_t *cfg) found: list_push(&i->sockets, s); } - /** @todo Improve mapping of NIC IRQs per path */ - int affinity; - if (!config_setting_lookup_int(cfg, "affinity", &affinity)) - affinity = -1; - - for (size_t j = 0; list_length(&interfaces); j++) { + for (size_t j = 0; j < list_length(&interfaces); j++) { struct interface *i = list_at(&interfaces, j); - if_start(i, affinity); + if_start(i); } return 0; diff --git a/lib/path.c b/lib/path.c index 75ccc8f98..53065fc97 100644 --- a/lib/path.c +++ b/lib/path.c @@ -21,6 +21,7 @@ #include "plugin.h" #include "super_node.h" #include "memory.h" +#include "stats.h" static void path_read(struct path *p) { @@ -50,7 +51,7 @@ static void path_read(struct path *p) debug(LOG_PATH | 15, "Received %u messages from node %s", recv, node_name(ps->node)); /* Run preprocessing hooks for vector of samples */ - enqueue = path_run_hooks(p, HOOK_READ, smps, recv); + enqueue = hook_read_list(&p->hooks, smps, recv); if (enqueue != recv) { info("Hooks skipped %u out of %u samples for path %s", recv - enqueue, recv, path_name(p)); @@ -94,7 +95,7 @@ static void path_write(struct path *p) debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); - tosend = path_run_hooks(p, HOOK_WRITE, smps, available); + tosend = hook_write_list(&p->hooks, smps, available); if (tosend == 0) continue; @@ -269,11 +270,13 @@ int path_init2(struct path *p) struct plugin *q = list_at(&plugins, i); if (q->type == PLUGIN_TYPE_HOOK) { - struct hook h; + struct hook h = { .state = STATE_DESTROYED }; struct hook_type *vt = &q->hook; - if (vt->when & HOOK_AUTO) { - hook_init(&h, vt, p->super_node); + if (vt->builtin) { + ret = hook_init(&h, vt, p); + if (ret) + return ret; list_push(&p->hooks, memdup(&h, sizeof(h))); } @@ -314,6 +317,7 @@ int path_start(struct path *p) for (size_t i = 0; i < list_length(&p->hooks); i++) { struct hook *h = list_at(&p->hooks, i); + ret = hook_start(h); if (ret) return ret; } @@ -341,7 +345,7 @@ int path_stop(struct path *p) for (size_t i = 0; i < list_length(&p->hooks); i++) { struct hook *h = list_at(&p->hooks, i); - ret = hook_run(h, HOOK_STOP, NULL, 0); + ret = hook_stop(h); if (ret) return ret; } @@ -445,27 +449,4 @@ int path_reverse(struct path *p, struct path *r) } return 0; -} - -int path_run_hooks(struct path *p, int when, struct sample *smps[], size_t cnt) -{ - int ret = 0; - - struct hook_info i = { - .samples = smps, - .count = cnt - }; - - if (ret) - break; - - if (i.count == 0) - for (size_t i = 0; i < list_length(&p->hooks); i++) { - struct hook *h = list_at(&p->hooks, i); - - ret = hook_run(h, when, smps, &cnt); - break; - } - - return i.count; } \ No newline at end of file diff --git a/lib/super_node.c b/lib/super_node.c index ee9546a4d..3d4e4d037 100644 --- a/lib/super_node.c +++ b/lib/super_node.c @@ -21,6 +21,7 @@ #include "api.h" #include "plugin.h" #include "memory.h" +#include "config.h" #include "kernel/rt.h" @@ -80,9 +81,11 @@ int super_node_parse_uri(struct super_node *sn, const char *uri) config_setting_t *cfg_root; /* Via stdin */ - if (strcmp("-", uri) == 0) { + if (!strcmp("-", uri)) { af = NULL; f = stdin; + + info("Reading config from stdin"); } /* Local file? */ else if (access(uri, F_OK) != -1) { @@ -126,7 +129,7 @@ int super_node_parse_uri(struct super_node *sn, const char *uri) /* Close configuration file */ if (af) afclose(af); - else + else if (f != stdin) fclose(f); return super_node_parse(sn, cfg_root); @@ -272,6 +275,8 @@ int super_node_parse(struct super_node *sn, config_setting_t *cfg) int super_node_check(struct super_node *sn) { int ret; + + assert(sn->state != STATE_DESTROYED); for (size_t i = 0; i < list_length(&sn->nodes); i++) { struct node *n = list_at(&sn->nodes, i); @@ -301,6 +306,7 @@ int super_node_start(struct super_node *sn) memory_init(sn->hugepages); rt_init(sn->priority, sn->affinity); + log_start(&sn->log); api_start(&sn->api); web_start(&sn->web); @@ -308,7 +314,7 @@ int super_node_start(struct super_node *sn) for (size_t i = 0; i < list_length(&sn->nodes); i++) { INDENT struct node *n = list_at(&sn->nodes, i); - node_type_start(n->_vt, sn); + node_type_start(n->_vt, sn->cli.argc, sn->cli.argv, config_root_setting(&sn->cfg)); } info("Starting nodes"); @@ -327,7 +333,7 @@ int super_node_start(struct super_node *sn) struct path *p = list_at(&sn->paths, i); if (p->enabled) { - path_init(p, sn); + path_init2(p); path_start(p); } else diff --git a/lib/web.c b/lib/web.c index 484bd736d..d4f16e7a4 100644 --- a/lib/web.c +++ b/lib/web.c @@ -128,8 +128,12 @@ int web_init(struct web *w, struct api *a) w->api = a; - w->state = STATE_INITIALIZED; + /* Default values */ + w->port = 80; + w->htdocs = WEB_PATH; + w->state = STATE_INITIALIZED; + return 0; } @@ -140,12 +144,8 @@ int web_parse(struct web *w, config_setting_t *cfg) config_setting_lookup_string(cfg, "ssl_cert", &w->ssl_cert); config_setting_lookup_string(cfg, "ssl_private_key", &w->ssl_private_key); - - if (!config_setting_lookup_int(cfg, "port", &w->port)) - w->port = 80; - - if (!config_setting_lookup_string(cfg, "htdocs", &w->htdocs)) - w->htdocs = "/usr/share/villas/htdocs"; + config_setting_lookup_int(cfg, "port", &w->port); + config_setting_lookup_string(cfg, "htdocs", &w->htdocs); w->state = STATE_PARSED; @@ -171,7 +171,7 @@ int web_start(struct web *w) .ssl_private_key_filepath = w->ssl_private_key }; - info("Starting web sub-system"); + info("Starting Web sub-system: webroot=%s", w->htdocs); { INDENT /* update web root of mount point */ diff --git a/src/node.c b/src/node.c index 10bd2c645..06350cadc 100644 --- a/src/node.c +++ b/src/node.c @@ -25,6 +25,8 @@ #include #endif +#include "config.h" + struct super_node sn; static void quit(int signal, siginfo_t *sinfo, void *ctx) @@ -83,7 +85,7 @@ int main(int argc, char *argv[]) usage(); #endif - info("This is VILLASnode %s (built on %s, %s)", BLD(YEL(VERSION_STR)), + info("This is VILLASnode %s (built on %s, %s)", BLD(YEL(BUILDID)), BLD(MAG(__DATE__)), BLD(MAG(__TIME__))); /* Checks system requirements*/