mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
hook: from now on both paths and nodes can have hooks!
This commit is contained in:
parent
a123736fbe
commit
526be78ca5
24 changed files with 250 additions and 131 deletions
|
@ -47,6 +47,7 @@ struct hook {
|
|||
enum state state;
|
||||
|
||||
struct path *path;
|
||||
struct node *node;
|
||||
|
||||
struct hook_type *_vt; /**< C++ like Vtable pointer. */
|
||||
void *_vd; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */
|
||||
|
@ -56,7 +57,7 @@ struct hook {
|
|||
json_t *cfg; /**< A JSON object containing the configuration of the hook. */
|
||||
};
|
||||
|
||||
int hook_init(struct hook *h, struct hook_type *vt, struct path *p);
|
||||
int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node *n);
|
||||
|
||||
int hook_parse(struct hook *h, json_t *cfg);
|
||||
int hook_parse_cli(struct hook *h, int argc, char *argv[]);
|
||||
|
@ -94,4 +95,4 @@ int hook_cmp_priority(const void *a, const void *b);
|
|||
* hooks = [ "print" ]
|
||||
* }
|
||||
*/
|
||||
int hook_parse_list(struct list *list, json_t *cfg, struct path *p);
|
||||
int hook_parse_list(struct list *list, json_t *cfg, struct path *p, struct node *n);
|
||||
|
|
|
@ -43,9 +43,15 @@
|
|||
struct hook;
|
||||
struct sample;
|
||||
|
||||
enum hook_flags {
|
||||
HOOK_BUILTIN = (1 << 0), /**< Should we add this hook by default to every path?. */
|
||||
HOOK_PATH = (1 << 1), /**< This hook type is used by paths. */
|
||||
HOOK_NODE = (1 << 2) /**< This hook type is used by nodes. */
|
||||
};
|
||||
|
||||
struct hook_type {
|
||||
int priority; /**< Default priority of this hook type. */
|
||||
bool builtin; /**< Should we add this hook by default to every path?. */
|
||||
int flags;
|
||||
|
||||
size_t size; /**< Size of allocation for struct hook::_vd */
|
||||
|
||||
|
|
|
@ -53,7 +53,11 @@ struct node
|
|||
|
||||
int id; /**< An id of this node which is only unique in the scope of it's super-node (VILLASnode instance). */
|
||||
|
||||
unsigned long sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */
|
||||
unsigned sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */
|
||||
|
||||
struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */
|
||||
|
||||
struct list hooks; /**< List of write hooks (struct hook). */
|
||||
|
||||
enum state state;
|
||||
|
||||
|
|
|
@ -48,16 +48,13 @@ struct path_source {
|
|||
struct node *node;
|
||||
|
||||
struct pool pool;
|
||||
|
||||
struct list hooks; /**< Read Hooks. */
|
||||
struct list mappings; /**< List of struct mapping_entry */
|
||||
struct list mappings; /**< List of mappings (struct mapping_entry). */
|
||||
};
|
||||
|
||||
struct path_destination {
|
||||
struct node *node;
|
||||
struct queue queue;
|
||||
|
||||
struct list hooks; /**< Write Hooks. */
|
||||
struct queue queue;
|
||||
};
|
||||
|
||||
/** The datastructure for a path. */
|
||||
|
@ -74,7 +71,7 @@ struct path {
|
|||
|
||||
struct list sources; /**< List of all incoming nodes (struct path_source). */
|
||||
struct list destinations; /**< List of all outgoing nodes (struct path_destination). */
|
||||
struct list hooks; /**< Processing hooks. */
|
||||
struct list hooks; /**< List of processing hooks (struct hook). */
|
||||
|
||||
int enabled; /**< Is this path enabled. */
|
||||
int reverse; /**< This path as a matching reverse path. */
|
||||
|
@ -82,19 +79,14 @@ struct path {
|
|||
int samplelen; /**< Will be calculated based on path::sources.mappings */
|
||||
int sequence;
|
||||
|
||||
pthread_t tid; /**< The thread id for this path. */
|
||||
|
||||
char *_name; /**< Singleton: A string which is used to print this path to screen. */
|
||||
|
||||
struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */
|
||||
|
||||
struct super_node *super_node; /**< The super node this path belongs to. */
|
||||
|
||||
pthread_t tid; /**< The thread id for this path. */
|
||||
json_t *cfg; /**< A JSON object containing the configuration of the path. */
|
||||
};
|
||||
|
||||
/** Initialize internal data structures. */
|
||||
int path_init(struct path *p, struct super_node *sn);
|
||||
int path_init(struct path *p);
|
||||
|
||||
int path_init2(struct path *p);
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
#include "plugin.h"
|
||||
#include "config_helper.h"
|
||||
|
||||
int hook_init(struct hook *h, struct hook_type *vt, struct path *p)
|
||||
int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node *n)
|
||||
{
|
||||
int ret;
|
||||
|
||||
|
@ -41,6 +41,7 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p)
|
|||
h->priority = vt->priority;
|
||||
|
||||
h->path = p;
|
||||
h->node = n;
|
||||
|
||||
h->_vt = vt;
|
||||
h->_vd = alloc(vt->size);
|
||||
|
@ -206,7 +207,7 @@ int hook_cmp_priority(const void *a, const void *b)
|
|||
return ha->priority - hb->priority;
|
||||
}
|
||||
|
||||
int hook_parse_list(struct list *list, json_t *cfg, struct path *o)
|
||||
int hook_parse_list(struct list *list, json_t *cfg, struct path *o, struct node *n)
|
||||
{
|
||||
if (!json_is_array(cfg))
|
||||
error("Hooks must be configured as a list of objects");
|
||||
|
@ -229,7 +230,7 @@ int hook_parse_list(struct list *list, json_t *cfg, struct path *o)
|
|||
|
||||
struct hook *h = alloc(sizeof(struct hook));
|
||||
|
||||
ret = hook_init(h, &p->hook, o);
|
||||
ret = hook_init(h, &p->hook, o, n);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to initialize hook");
|
||||
|
||||
|
|
|
@ -107,6 +107,7 @@ static struct plugin p = {
|
|||
.description = "Convert message from / to floating-point / integer",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_PATH | HOOK_NODE,
|
||||
.priority = 99,
|
||||
.init = convert_init,
|
||||
.parse = convert_parse,
|
||||
|
|
|
@ -82,6 +82,7 @@ static struct plugin p = {
|
|||
.description = "Downsamping by integer factor",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_NODE | HOOK_PATH,
|
||||
.priority = 99,
|
||||
.init = decimate_init,
|
||||
.parse = decimate_parse,
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
#include "hook.h"
|
||||
#include "plugin.h"
|
||||
#include "stats.h"
|
||||
#include "path.h"
|
||||
#include "node.h"
|
||||
#include "sample.h"
|
||||
|
||||
struct drop {
|
||||
|
@ -67,8 +67,8 @@ static int drop_read(struct hook *h, struct sample *smps[], unsigned *cnt)
|
|||
dist = cur->sequence - (int32_t) prev->sequence;
|
||||
if (dist <= 0) {
|
||||
debug(10, "Reordered sample: sequence=%u, distance=%d", cur->sequence, dist);
|
||||
if (h->path && h->path->stats)
|
||||
stats_update(h->path->stats->delta, STATS_REORDERED, dist);
|
||||
if (h->node && h->node->stats)
|
||||
stats_update(h->node->stats->delta, STATS_REORDERED, dist);
|
||||
}
|
||||
else
|
||||
goto ok;
|
||||
|
@ -105,8 +105,8 @@ static struct plugin p = {
|
|||
.description = "Drop messages with reordered sequence numbers",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_BUILTIN | HOOK_NODE,
|
||||
.priority = 3,
|
||||
.builtin = true,
|
||||
.read = drop_read,
|
||||
.start = drop_start,
|
||||
.stop = drop_stop,
|
||||
|
|
|
@ -59,8 +59,8 @@ static struct plugin p = {
|
|||
.description = "Update timestamps of sample if not set",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_NODE | HOOK_BUILTIN,
|
||||
.priority = 0,
|
||||
.builtin = true,
|
||||
.read = fix_ts_read,
|
||||
}
|
||||
};
|
||||
|
|
|
@ -129,6 +129,7 @@ static struct plugin p = {
|
|||
.description = "Calc jitter, mean and variance of GPS vs NTP TS",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_NODE,
|
||||
.priority = 0,
|
||||
.init = jitter_calc_init,
|
||||
.destroy= jitter_calc_deinit,
|
||||
|
|
|
@ -72,7 +72,7 @@ static int map_parse(struct hook *h, json_t *cfg)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int map_read(struct hook *h, struct sample *smps[], unsigned *cnt)
|
||||
static int map_process(struct hook *h, struct sample *smps[], unsigned *cnt)
|
||||
{
|
||||
int ret;
|
||||
struct map *p = h->_vd;
|
||||
|
@ -104,11 +104,12 @@ static struct plugin p = {
|
|||
.description = "Remap values and / or add header, timestamp values to the sample",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_PATH,
|
||||
.priority = 99,
|
||||
.init = map_init,
|
||||
.destroy= map_destroy,
|
||||
.parse = map_parse,
|
||||
.read = map_read,
|
||||
.process= map_process,
|
||||
.size = sizeof(struct map)
|
||||
}
|
||||
};
|
||||
|
|
|
@ -101,7 +101,7 @@ static int print_parse(struct hook *h, json_t *cfg)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int print_read(struct hook *h, struct sample *smps[], unsigned *cnt)
|
||||
static int print_process(struct hook *h, struct sample *smps[], unsigned *cnt)
|
||||
{
|
||||
struct print *p = h->_vd;
|
||||
|
||||
|
@ -115,12 +115,13 @@ static struct plugin p = {
|
|||
.description = "Print the message to stdout",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_NODE | HOOK_PATH,
|
||||
.priority = 99,
|
||||
.init = print_init,
|
||||
.parse = print_parse,
|
||||
.start = print_start,
|
||||
.stop = print_stop,
|
||||
.read = print_read,
|
||||
.process= print_process,
|
||||
.size = sizeof(struct print)
|
||||
}
|
||||
};
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
|
||||
#include "hook.h"
|
||||
#include "plugin.h"
|
||||
#include "path.h"
|
||||
#include "node.h"
|
||||
#include "sample.h"
|
||||
|
||||
struct restart {
|
||||
|
@ -58,19 +58,19 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt)
|
|||
struct restart *r = h->_vd;
|
||||
struct sample *prev, *cur = NULL;
|
||||
|
||||
assert(h->path);
|
||||
assert(h->node);
|
||||
|
||||
for (i = 0, prev = r->prev; i < *cnt; i++, prev = cur) {
|
||||
cur = smps[i];
|
||||
|
||||
if (prev) {
|
||||
if (cur->sequence == 0 && prev->sequence <= UINT32_MAX - 32) {
|
||||
warn("Simulation for path %s restarted (previous->seq=%u, current->seq=%u)",
|
||||
path_name(h->path), prev->sequence, cur->sequence);
|
||||
warn("Simulation from node %s restarted (previous->seq=%u, current->seq=%u)",
|
||||
node_name(h->node), prev->sequence, cur->sequence);
|
||||
|
||||
/* Run restart hooks */
|
||||
for (size_t i = 0; i < list_length(&h->path->hooks); i++) {
|
||||
struct hook *k = list_at(&h->path->hooks, i);
|
||||
for (size_t i = 0; i < list_length(&h->node->hooks); i++) {
|
||||
struct hook *k = list_at(&h->node->hooks, i);
|
||||
|
||||
hook_restart(k);
|
||||
}
|
||||
|
@ -90,11 +90,11 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt)
|
|||
|
||||
static struct plugin p = {
|
||||
.name = "restart",
|
||||
.description = "Call restart hooks for current path",
|
||||
.description = "Call restart hooks for current node",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_NODE | HOOK_BUILTIN,
|
||||
.priority = 1,
|
||||
.builtin = true,
|
||||
.read = restart_read,
|
||||
.start = restart_start,
|
||||
.stop = restart_stop,
|
||||
|
|
|
@ -63,6 +63,7 @@ static struct plugin p = {
|
|||
.description = "Shift sequence number of samples",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_NODE | HOOK_PATH,
|
||||
.priority = 99,
|
||||
.parse = shift_seq_parse,
|
||||
.read = shift_seq_read,
|
||||
|
|
|
@ -104,6 +104,7 @@ static struct plugin p = {
|
|||
.description = "Shift timestamps of samples",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_NODE | HOOK_PATH,
|
||||
.priority = 99,
|
||||
.init = shift_ts_init,
|
||||
.parse = shift_ts_parse,
|
||||
|
|
|
@ -149,6 +149,7 @@ static struct plugin p = {
|
|||
.description = "Skip the first samples",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_NODE | HOOK_PATH,
|
||||
.priority = 99,
|
||||
.parse = skip_first_parse,
|
||||
.start = skip_first_restart,
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#include "hook.h"
|
||||
#include "plugin.h"
|
||||
#include "stats.h"
|
||||
#include "path.h"
|
||||
#include "node.h"
|
||||
|
||||
struct stats_collect {
|
||||
struct stats stats;
|
||||
|
@ -50,8 +50,8 @@ static int stats_collect_init(struct hook *h)
|
|||
/* Register statistic object to path.
|
||||
*
|
||||
* This allows the path code to update statistics. */
|
||||
if (h->path)
|
||||
h->path->stats = &p->stats;
|
||||
if (h->node)
|
||||
h->node->stats = &p->stats;
|
||||
|
||||
/* Set default values */
|
||||
p->format = STATS_FORMAT_HUMAN;
|
||||
|
@ -165,6 +165,7 @@ static struct plugin p = {
|
|||
.description = "Collect statistics for the current path",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_NODE,
|
||||
.priority = 2,
|
||||
.init = stats_collect_init,
|
||||
.destroy= stats_collect_destroy,
|
||||
|
|
|
@ -42,6 +42,7 @@ static struct plugin p = {
|
|||
.description = "Overwrite origin timestamp of samples with receive timestamp",
|
||||
.type = PLUGIN_TYPE_HOOK,
|
||||
.hook = {
|
||||
.flags = HOOK_NODE,
|
||||
.priority = 99,
|
||||
.read = ts_read,
|
||||
.size = 0
|
||||
|
|
|
@ -159,7 +159,7 @@ int mapping_parse_str(struct mapping_entry *e, const char *str, struct list *nod
|
|||
}
|
||||
else {
|
||||
e->data.offset = 0;
|
||||
e->length = e->node->samplelen;
|
||||
e->length = e->node ? e->node->samplelen : 0;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
84
lib/node.c
84
lib/node.c
|
@ -33,12 +33,14 @@
|
|||
int node_init(struct node *n, struct node_type *vt)
|
||||
{
|
||||
static int max_id;
|
||||
int ret;
|
||||
|
||||
assert(n->state == STATE_DESTROYED);
|
||||
|
||||
n->_vt = vt;
|
||||
n->_vd = alloc(vt->size);
|
||||
|
||||
|
||||
n->stats = NULL;
|
||||
n->name = NULL;
|
||||
n->_name = NULL;
|
||||
n->_name_long = NULL;
|
||||
|
@ -51,26 +53,58 @@ int node_init(struct node *n, struct node_type *vt)
|
|||
|
||||
list_push(&vt->instances, n);
|
||||
|
||||
list_init(&n->hooks);
|
||||
|
||||
/* Add internal hooks if they are not already in the list */
|
||||
for (size_t i = 0; i < list_length(&plugins); i++) {
|
||||
struct plugin *q = list_at(&plugins, i);
|
||||
|
||||
if (q->type != PLUGIN_TYPE_HOOK)
|
||||
continue;
|
||||
|
||||
struct hook_type *vt = &q->hook;
|
||||
|
||||
if ((vt->flags & HOOK_NODE) && (vt->flags & HOOK_BUILTIN)) {
|
||||
struct hook *h = alloc(sizeof(struct hook));
|
||||
|
||||
ret = hook_init(h, vt, NULL, n);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
list_push(&n->hooks, h);
|
||||
}
|
||||
}
|
||||
|
||||
n->state = STATE_INITIALIZED;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int node_init2(struct node *n)
|
||||
{
|
||||
/* We sort the hooks according to their priority before starting the path */
|
||||
list_sort(&n->hooks, hook_cmp_priority);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int node_parse(struct node *n, json_t *cfg, const char *name)
|
||||
{
|
||||
struct plugin *p;
|
||||
int ret;
|
||||
|
||||
json_error_t err;
|
||||
json_t *json_hooks = NULL;
|
||||
|
||||
const char *type;
|
||||
|
||||
n->name = strdup(name);
|
||||
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: i, s?: i }",
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: i, s?: i, s?: o }",
|
||||
"type", &type,
|
||||
"vectorize", &n->vectorize,
|
||||
"samplelen", &n->samplelen
|
||||
"samplelen", &n->samplelen,
|
||||
"hooks", &json_hooks
|
||||
);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to parse node '%s'", node_name(n));
|
||||
|
@ -78,6 +112,12 @@ int node_parse(struct node *n, json_t *cfg, const char *name)
|
|||
p = plugin_lookup(PLUGIN_TYPE_NODE, type);
|
||||
assert(&p->node == n->_vt);
|
||||
|
||||
if (json_hooks) {
|
||||
ret = hook_parse_list(&n->hooks, json_hooks, NULL, n);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = n->_vt->parse ? n->_vt->parse(n, cfg) : 0;
|
||||
if (ret)
|
||||
error("Failed to parse node '%s'", node_name(n));
|
||||
|
@ -138,6 +178,14 @@ int node_start(struct node *n)
|
|||
|
||||
info("Starting node %s", node_name_long(n));
|
||||
{ INDENT
|
||||
for (size_t i = 0; i < list_length(&n->hooks); i++) {
|
||||
struct hook *h = list_at(&n->hooks, i);
|
||||
|
||||
ret = hook_start(h);
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = n->_vt->start ? n->_vt->start(n) : 0;
|
||||
if (ret)
|
||||
return ret;
|
||||
|
@ -159,6 +207,14 @@ int node_stop(struct node *n)
|
|||
|
||||
info("Stopping node %s", node_name(n));
|
||||
{ INDENT
|
||||
for (size_t i = 0; i < list_length(&n->hooks); i++) {
|
||||
struct hook *h = list_at(&n->hooks, i);
|
||||
|
||||
ret = hook_stop(h);
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = n->_vt->stop ? n->_vt->stop(n) : 0;
|
||||
}
|
||||
|
||||
|
@ -172,6 +228,8 @@ int node_destroy(struct node *n)
|
|||
{
|
||||
assert(n->state != STATE_DESTROYED && n->state != STATE_STARTED);
|
||||
|
||||
list_destroy(&n->hooks, (dtor_cb_t) hook_destroy, true);
|
||||
|
||||
if (n->_vt->destroy)
|
||||
n->_vt->destroy(n);
|
||||
|
||||
|
@ -196,7 +254,7 @@ int node_destroy(struct node *n)
|
|||
|
||||
int node_read(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int readd, nread = 0;
|
||||
int readd, rread, nread = 0;
|
||||
|
||||
if (!n->_vt->read)
|
||||
return -1;
|
||||
|
@ -223,7 +281,17 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
for (int i = 0; i < nread; i++)
|
||||
smps[i]->source = n;
|
||||
|
||||
return nread;
|
||||
rread = hook_read_list(&n->hooks, smps, nread);
|
||||
if (nread != rread) {
|
||||
int skipped = nread - rread;
|
||||
|
||||
debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for node %s", skipped, nread, node_name(n));
|
||||
|
||||
if (n->stats)
|
||||
stats_update(n->stats->delta, STATS_SKIPPED, skipped);
|
||||
}
|
||||
|
||||
return rread;
|
||||
}
|
||||
|
||||
int node_write(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
|
@ -233,6 +301,10 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
if (!n->_vt->write)
|
||||
return -1;
|
||||
|
||||
cnt = hook_write_list(&n->hooks, smps, cnt);
|
||||
if (cnt <= 0)
|
||||
return cnt;
|
||||
|
||||
/* Send in parts if vector not supported */
|
||||
if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) {
|
||||
while (cnt - nsent > 0) {
|
||||
|
@ -269,7 +341,7 @@ char * node_name_long(struct node *n)
|
|||
if (n->_vt->print) {
|
||||
struct node_type *vt = n->_vt;
|
||||
char *name_long = vt->print(n);
|
||||
strcatf(&n->_name_long, "%s: id=%d, vectorize=%d, samplelen=%d, %s", node_name(n), n->id, n->vectorize, n->samplelen, name_long);
|
||||
strcatf(&n->_name_long, "%s: #hooks=%zu, id=%d, vectorize=%d, samplelen=%d, %s", node_name(n), list_length(&n->hooks), n->id, n->vectorize, n->samplelen, name_long);
|
||||
free(name_long);
|
||||
}
|
||||
else
|
||||
|
|
181
lib/path.c
181
lib/path.c
|
@ -35,12 +35,33 @@
|
|||
#include "queue.h"
|
||||
#include "hook.h"
|
||||
#include "plugin.h"
|
||||
#include "super_node.h"
|
||||
#include "memory.h"
|
||||
#include "stats.h"
|
||||
#include "node.h"
|
||||
|
||||
static void path_read_source(struct path *p, struct path_source *ps)
|
||||
static int path_source_init(struct path_source *ps)
|
||||
{
|
||||
int ret;
|
||||
|
||||
ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, ps->node->vectorize), SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int path_source_destroy(struct path_source *ps)
|
||||
{
|
||||
int ret;
|
||||
|
||||
ret = pool_destroy(&ps->pool);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void path_source_read(struct path *p, struct path_source *ps)
|
||||
{
|
||||
int ready, recv, mux, enqueue, enqueued;
|
||||
int cnt = ps->node->vectorize;
|
||||
|
@ -55,19 +76,12 @@ static void path_read_source(struct path *p, struct path_source *ps)
|
|||
|
||||
/* Read ready samples and store them to blocks pointed by smps[] */
|
||||
recv = node_read(ps->node, read_smps, ready);
|
||||
if (recv < 0)
|
||||
if (recv == 0)
|
||||
goto out2;
|
||||
else if (recv < 0)
|
||||
error("Failed to receive message from node %s", node_name(ps->node));
|
||||
else if (recv < ready)
|
||||
warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready);
|
||||
|
||||
/* Run read hooks */
|
||||
enqueue = hook_read_list(&p->hooks, read_smps, recv);
|
||||
if (enqueue != recv) {
|
||||
debug(LOG_PATH | 10, "Hooks skipped %u out of %u samples for path %s", recv - enqueue, recv, path_name(p));
|
||||
|
||||
if (p->stats)
|
||||
stats_update(p->stats->delta, STATS_SKIPPED, recv - enqueue);
|
||||
}
|
||||
|
||||
/* Mux samples */
|
||||
mux = sample_alloc(&p->pool, muxed_smps, recv);
|
||||
|
@ -84,12 +98,8 @@ static void path_read_source(struct path *p, struct path_source *ps)
|
|||
|
||||
/* Run processing hooks */
|
||||
enqueue = hook_process_list(&p->hooks, muxed_smps, mux);
|
||||
if (enqueue != mux) {
|
||||
debug(LOG_PATH | 10, "Hooks skipped %u out of %u samples for path %s", mux - enqueue, recv, path_name(p));
|
||||
|
||||
if (p->stats)
|
||||
stats_update(p->stats->delta, STATS_SKIPPED, mux - enqueue);
|
||||
}
|
||||
if (enqueue == 0)
|
||||
goto out1;
|
||||
|
||||
/* Keep track of the lowest index that wasn't enqueued;
|
||||
* all following samples must be freed here */
|
||||
|
@ -106,7 +116,30 @@ static void path_read_source(struct path *p, struct path_source *ps)
|
|||
debug(LOG_PATH | 15, "Enqueued %u samples from %s to queue of %s", enqueued, node_name(ps->node), node_name(pd->node));
|
||||
}
|
||||
|
||||
sample_put_many(muxed_smps, ready);
|
||||
out1: sample_put_many(muxed_smps, recv);
|
||||
out2: sample_put_many(read_smps, ready);
|
||||
}
|
||||
|
||||
static int path_destination_init(struct path_destination *pd, int queuelen)
|
||||
{
|
||||
int ret;
|
||||
|
||||
ret = queue_init(&pd->queue, queuelen, &memtype_hugepage);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int path_destination_destroy(struct path_destination *pd)
|
||||
{
|
||||
int ret;
|
||||
|
||||
ret = queue_destroy(&pd->queue);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void path_poll(struct path *p)
|
||||
|
@ -122,7 +155,7 @@ static void path_poll(struct path *p)
|
|||
struct path_source *ps = list_at(&p->sources, i);
|
||||
|
||||
if (p->reader.pfds[i].revents & POLLIN) {
|
||||
path_read_source(p, ps);
|
||||
path_source_read(p, ps);
|
||||
updates++;
|
||||
}
|
||||
}
|
||||
|
@ -135,7 +168,6 @@ static void path_write(struct path *p)
|
|||
|
||||
int cnt = pd->node->vectorize;
|
||||
int sent;
|
||||
int tosend;
|
||||
int available;
|
||||
int released;
|
||||
|
||||
|
@ -151,15 +183,11 @@ static void path_write(struct path *p)
|
|||
|
||||
debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p));
|
||||
|
||||
tosend = hook_write_list(&p->hooks, smps, available);
|
||||
if (tosend == 0)
|
||||
continue;
|
||||
|
||||
sent = node_write(pd->node, smps, tosend);
|
||||
sent = node_write(pd->node, smps, available);
|
||||
if (sent < 0)
|
||||
error("Failed to sent %u samples to node %s", cnt, node_name(pd->node));
|
||||
else if (sent < tosend)
|
||||
warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, tosend);
|
||||
else if (sent < available)
|
||||
warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, available);
|
||||
|
||||
released = sample_put_many(smps, sent);
|
||||
|
||||
|
@ -174,29 +202,20 @@ static void * path_run(void *arg)
|
|||
struct path *p = arg;
|
||||
|
||||
for (;;) {
|
||||
path_poll(p);
|
||||
/* We only need to poll in case there is more than one source */
|
||||
if (list_length(&p->sources) > 1)
|
||||
path_poll(p);
|
||||
|
||||
path_write(p);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int path_source_destroy(struct path_source *ps)
|
||||
int path_init(struct path *p)
|
||||
{
|
||||
pool_destroy(&ps->pool);
|
||||
int ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int path_destination_destroy(struct path_destination *pd)
|
||||
{
|
||||
queue_destroy(&pd->queue);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int path_init(struct path *p, struct super_node *sn)
|
||||
{
|
||||
assert(p->state == STATE_DESTROYED);
|
||||
|
||||
list_init(&p->hooks);
|
||||
|
@ -210,7 +229,25 @@ int path_init(struct path *p, struct super_node *sn)
|
|||
p->enabled = 1;
|
||||
p->queuelen = DEFAULT_QUEUELEN;
|
||||
|
||||
p->super_node = sn;
|
||||
/* Add internal hooks if they are not already in the list */
|
||||
for (size_t i = 0; i < list_length(&plugins); i++) {
|
||||
struct plugin *q = list_at(&plugins, i);
|
||||
|
||||
if (q->type != PLUGIN_TYPE_HOOK)
|
||||
continue;
|
||||
|
||||
struct hook_type *vt = &q->hook;
|
||||
|
||||
if ((vt->flags & HOOK_PATH) && (vt->flags & HOOK_BUILTIN)) {
|
||||
struct hook *h = alloc(sizeof(struct hook));
|
||||
|
||||
ret = hook_init(h, vt, p, NULL);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
list_push(&p->hooks, h);
|
||||
}
|
||||
}
|
||||
|
||||
p->state = STATE_INITIALIZED;
|
||||
|
||||
|
@ -223,27 +260,6 @@ int path_init2(struct path *p)
|
|||
|
||||
assert(p->state == STATE_CHECKED);
|
||||
|
||||
/* Add internal hooks if they are not already in the list*/
|
||||
for (size_t i = 0; i < list_length(&plugins); i++) {
|
||||
struct plugin *q = list_at(&plugins, i);
|
||||
|
||||
if (q->type == PLUGIN_TYPE_HOOK) {
|
||||
struct hook_type *vt = &q->hook;
|
||||
|
||||
if (vt->builtin) {
|
||||
struct hook *h = alloc(sizeof(struct hook));
|
||||
|
||||
ret = hook_init(h, vt, p);
|
||||
if (ret) {
|
||||
free(h);
|
||||
return ret;
|
||||
}
|
||||
|
||||
list_push(&p->hooks, h);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* We sort the hooks according to their priority before starting the path */
|
||||
list_sort(&p->hooks, hook_cmp_priority);
|
||||
|
||||
|
@ -251,30 +267,34 @@ int path_init2(struct path *p)
|
|||
for (size_t i = 0; i < list_length(&p->destinations); i++) {
|
||||
struct path_destination *pd = list_at(&p->destinations, i);
|
||||
|
||||
ret = queue_init(&pd->queue, p->queuelen, &memtype_hugepage);
|
||||
ret = path_destination_init(pd, p->queuelen);
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Initialize sources */
|
||||
for (size_t i = 0; i < list_length(&p->sources); i++) {
|
||||
struct path_source *ps = list_at(&p->sources, i);
|
||||
|
||||
ret = path_source_init(ps);
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Calc sample length of path */
|
||||
p->samplelen = 0;
|
||||
for (size_t i = 0; i < list_length(&p->sources); i++) {
|
||||
struct path_source *ps = list_at(&p->sources, i);
|
||||
|
||||
/** @todo replace p->queuelen with ps->node->vectorize ? */
|
||||
ret = pool_init(&ps->pool, p->queuelen, SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage);
|
||||
if (ret)
|
||||
error("Failed to allocate memory pool for path");
|
||||
|
||||
for (size_t i = 0; i < list_length(&ps->mappings); i++) {
|
||||
struct mapping_entry *me = list_at(&ps->mappings, i);
|
||||
|
||||
|
||||
if (me->offset + me->length > p->samplelen)
|
||||
p->samplelen = me->offset + me->length;
|
||||
}
|
||||
}
|
||||
|
||||
ret = pool_init(&p->pool, list_length(&p->destinations) * p->queuelen, SAMPLE_LEN(p->samplelen), &memtype_hugepage);
|
||||
|
||||
ret = pool_init(&p->pool, MAX(1, list_length(&p->destinations)) * p->queuelen, SAMPLE_LEN(p->samplelen), &memtype_hugepage);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
|
@ -353,10 +373,11 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes)
|
|||
ps = alloc(sizeof(struct path_source));
|
||||
|
||||
ps->node = me->node;
|
||||
|
||||
|
||||
ps->mappings.state = STATE_DESTROYED;
|
||||
|
||||
list_init(&ps->mappings);
|
||||
list_init(&ps->hooks);
|
||||
|
||||
|
||||
list_push(&p->sources, ps);
|
||||
}
|
||||
|
||||
|
@ -369,14 +390,12 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes)
|
|||
struct path_destination *pd = alloc(sizeof(struct path_destination));
|
||||
|
||||
pd->node = n;
|
||||
|
||||
list_init(&pd->hooks);
|
||||
|
||||
list_push(&p->destinations, pd);
|
||||
}
|
||||
|
||||
if (json_hooks) {
|
||||
ret = hook_parse_list(&p->hooks, json_hooks, p);
|
||||
ret = hook_parse_list(&p->hooks, json_hooks, p, NULL);
|
||||
if (ret)
|
||||
return ret;
|
||||
}
|
||||
|
@ -571,7 +590,7 @@ int path_reverse(struct path *p, struct path *r)
|
|||
struct hook *h = list_at(&p->hooks, i);
|
||||
struct hook *g = alloc(sizeof(struct hook));
|
||||
|
||||
ret = hook_init(g, h->_vt, r);
|
||||
ret = hook_init(g, h->_vt, r, NULL);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
|
|
|
@ -285,7 +285,7 @@ int super_node_parse_json(struct super_node *sn, json_t *cfg)
|
|||
json_array_foreach(cfg_paths, index, cfg_path) {
|
||||
struct path *p = alloc(sizeof(struct path));
|
||||
|
||||
ret = path_init(p, sn);
|
||||
ret = path_init(p);
|
||||
if (ret)
|
||||
error("Failed to initialize path");
|
||||
|
||||
|
@ -298,7 +298,7 @@ int super_node_parse_json(struct super_node *sn, json_t *cfg)
|
|||
if (p->reverse) {
|
||||
struct path *r = alloc(sizeof(struct path));
|
||||
|
||||
ret = path_init(r, sn);
|
||||
ret = path_init(r);
|
||||
if (ret)
|
||||
error("Failed to init path");
|
||||
|
||||
|
|
|
@ -193,7 +193,7 @@ check: if (optarg == endptr)
|
|||
if (!p)
|
||||
error("Unknown hook function '%s'", hook);
|
||||
|
||||
ret = hook_init(&h, &p->hook, NULL);
|
||||
ret = hook_init(&h, &p->hook, NULL, NULL);
|
||||
if (ret)
|
||||
error("Failed to initialize hook");
|
||||
|
||||
|
|
14
src/node.c
14
src/node.c
|
@ -170,6 +170,20 @@ int main(int argc, char *argv[])
|
|||
hook_periodic(h);
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < list_length(&sn.nodes); i++) {
|
||||
struct node *n = list_at(&sn.nodes, i);
|
||||
|
||||
if (n->state != STATE_STARTED)
|
||||
continue;
|
||||
|
||||
for (size_t j = 0; j < list_length(&n->hooks); j++) {
|
||||
struct hook *h = list_at(&n->hooks, j);
|
||||
|
||||
hook_periodic(h);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
|
Loading…
Add table
Reference in a new issue