From 909bde089485d3895fed450cb88d775c3349cb70 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 8 Jun 2016 22:38:21 +0200 Subject: [PATCH] introduced new data structures --- include/msg_format.h | 21 +++- include/node.h | 107 ++++++++++---------- include/path.h | 35 +++---- lib/cfg.c | 61 +++++++----- lib/node.c | 99 ++++++++++--------- lib/path.c | 225 +++++++++++++++++++++++++++++-------------- src/pipe.c | 96 +++++++++++------- src/server.c | 15 ++- src/signal.c | 35 ++++--- src/test.c | 25 +++-- 10 files changed, 442 insertions(+), 277 deletions(-) diff --git a/include/msg_format.h b/include/msg_format.h index 9b9eb145a..58d3eb778 100644 --- a/include/msg_format.h +++ b/include/msg_format.h @@ -39,7 +39,22 @@ #endif /** The total size in bytes of a message */ -#define MSG_LEN(values) (4 * (values) + 16 /* header */) +#define MSG_LEN(values) (sizeof(struct msg) + MSG_DATA_LEN(values)) + +/** The length of \p values values in bytes. */ +#define MSG_DATA_LEN(values) (sizeof(float) * (values)) + +/** The offset to the first data value in a message. */ +#define MSG_DATA_OFFSET(msg) ((char *) (msg) + offsetof(struct msg, data)) + +/** Initialize a message with default values */ +#define MSG_INIT(val, seq) (struct msg) {\ + .version = MSG_VERSION, \ + .type = MSG_TYPE_DATA, \ + .endian = MSG_ENDIAN_HOST, \ + .values = val, \ + .sequence = seq \ +} /** The timestamp of a message in struct timespec format */ #define MSG_TS(msg) (struct timespec) { \ @@ -71,7 +86,7 @@ struct msg /** A timestamp per message. Endianess is specified in msg::endian. */ struct { - uint32_t sec; /**< Seconds since 1970-01-01 00:00:00 */ + uint32_t sec; /**< Seconds since 1970-01-01 00:00:00 */ uint32_t nsec; /**< Nanoseconds of the current second. */ } ts; @@ -80,6 +95,6 @@ struct msg float f; /**< Floating point values (note msg::endian) */ uint32_t i; /**< Integer values (note msg::endian) */ } data[]; -} __attribute__((aligned(64), packed)); +} __attribute__((packed)); #endif /* _MSG_FORMAT_H_ */ diff --git a/include/node.h b/include/node.h index 19d2656f8..3e72eec05 100644 --- a/include/node.h +++ b/include/node.h @@ -23,9 +23,9 @@ #include #include -#include "msg.h" +#include "sample.h" #include "list.h" -#include "pool.h" +#include "queue.h" /* Helper macros for virtual node type */ #define REGISTER_NODE_TYPE(vt) \ @@ -36,7 +36,41 @@ __attribute__((constructor)) static void __register() { \ extern struct list node_types; /**< Vtable for virtual node sub types */ /* Forward declarations */ -struct config_setting_t *cfg; +struct config_setting_t cfg; +struct node_type; + +/** The data structure for a node. + * + * Every entity which exchanges messages is represented by a node. + * Nodes can be remote machines and simulators or locally running processes. + */ +struct node +{ + const char *name; /**< A short identifier of the node, only used for configuration and logging */ + + char *_name; /**< Singleton: A string used to print to screen. */ + char *_name_long; /**< Singleton: A string used to print to screen. */ + + int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */ + int affinity; /**< CPU Affinity of this node */ + + qptr_t sent; /**< Number of samples sent / written to this node. */ + qptr_t received; /**< Number of samples received / read from this node. */ + + enum node_state { + NODE_INVALID, /**< This node object is not in a valid state. */ + NODE_CREATED, /**< This node has been parsed from the configuration. */ + NODE_STARTING, /**< This node is currently being started. */ + NODE_RUNNING, /**< This node has been started by calling node_open() */ + NODE_STOPPING, /**< This node is currently shutting down. */ + NODE_STOPPED /**< Node was running, but has been stopped by calling node_close() */ + } state; /**< Node state */ + + struct node_type *_vt; /**< Virtual functions (C++ OOP style) */ + void *_vd; /**< Virtual data (used by struct node::_vt functions) */ + + config_setting_t *cfg; /**< A pointer to the libconfig object which instantiated this node */ +}; /** C++ like vtable construct for node_types */ struct node_type { @@ -47,6 +81,11 @@ struct node_type { struct list instances; /**< A list of all existing nodes of this type. */ size_t size; /**< Size of private data bock. @see node::_vd */ + enum node_type_state { + NODE_TYPE_UNINITIALIZED = 0, + NODE_TYPE_INITIALIZED + } state; + /** Global initialization per node type. * * This callback is invoked once per node-type. @@ -121,11 +160,11 @@ struct node_type { * Some node types might only support to receive one message at a time. * * @param n A pointer to the node object. - * @param pool A pointer to circular buffer containing the messages. - * @param cnt The number of messages which should be sent. + * @param smps An array of pointers to memory blocks where the function should store received samples. + * @param cnt The number of messages which should be received. * @return The number of messages actually received. */ - int (*read) (struct node *n, struct pool *pool, int cnt); + int (*read) (struct node *n, struct sample *smps[], unsigned cnt); /** Send multiple messages in a single datagram / packet. * @@ -135,11 +174,11 @@ struct node_type { * So the indexes will wrap around after len. * * @param n A pointer to the node object. - * @param pool A pointer to circular buffer containing the messages. + * @param smps An array of pointers to memory blocks where samples read from. * @param cnt The number of messages which should be sent. * @return The number of messages actually sent. */ - int (*write)(struct node *n, struct pool *pool, int cnt); + int (*write)(struct node *n, struct sample *smps[], unsigned cnt); /** Reverse source and destination of a node. * @@ -150,36 +189,6 @@ struct node_type { int (*reverse)(struct node *n); }; -/** The data structure for a node. - * - * Every entity which exchanges messages is represented by a node. - * Nodes can be remote machines and simulators or locally running processes. - */ -struct node -{ - const char *name; /**< A short identifier of the node, only used for configuration and logging */ - - char *_name; /**< Singleton: A string used to print to screen. */ - char *_name_long; /**< Singleton: A string used to print to screen. */ - - int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */ - int affinity; /**< CPU Affinity of this node */ - - enum node_state { - NODE_INVALID, /**< This node object is not in a valid state. */ - NODE_CREATED, /**< This node has been parsed from the configuration. */ - NODE_STARTING, /**< This node is currently being started. */ - NODE_RUNNING, /**< This node has been started by calling node_open() */ - NODE_STOPPING, /**< This node is currently shutting down. */ - NODE_STOPPED /**< Node was running, but has been stopped by calling node_close() */ - } state; /**< Node state */ - - struct node_type *_vt; /**< Virtual functions (C++ OOP style) */ - void *_vd; /**< Virtual data (used by struct node::_vt functions) */ - - config_setting_t *cfg; /**< A pointer to the libconfig object which instantiated this node */ -}; - /** Initialize all registered node type subsystems. * * @see node_type::init @@ -244,28 +253,14 @@ char * node_name_long(struct node *n); /** Return a pointer to a string which describes the node type */ const char * node_name_type(struct node *n); -/** Receive multiple messages at once. - * - * @see node_type::read - */ -int node_read(struct node *n, struct pool *pool, int cnt); - -/** Send multiple messages in a single datagram / packet. - * - * @see node_type::write - */ -int node_write(struct node *n, struct pool *pool, int cnt); - -/** Read a single message without using a message pool. */ -int node_read_single(struct node *n, struct msg *m); - -/** Send a single message without using a message pool. */ -int node_write_single(struct node *n, struct msg *m); - /** Reverse local and remote socket address. * * @see node_type::reverse */ int node_reverse(struct node *n); +int node_read(struct node *n, struct sample *smps[], unsigned cnt); + +int node_write(struct node *n, struct sample *smps[], unsigned cnt); + #endif /** _NODE_H_ @} */ diff --git a/include/path.h b/include/path.h index 1615689f8..b975ef925 100644 --- a/include/path.h +++ b/include/path.h @@ -24,6 +24,7 @@ #include "node.h" #include "msg.h" #include "hooks.h" +#include "queue.h" #include "pool.h" /** The datastructure for a path. @@ -40,16 +41,18 @@ struct path } state; /**< Path state */ struct node *in; /**< Pointer to the incoming node */ - struct node *out; /**< Pointer to the first outgoing node ( path::out == list_first(path::destinations) */ + struct queue queue; /**< A ring buffer for all received messages (unmodified) */ + struct pool pool; /**< Memory pool for messages / samples. */ + struct list destinations; /**< List of all outgoing nodes */ struct list hooks; /**< List of function pointers to hooks */ + int values; /**< Maximum number of values per sample for this path. */ + int queuelen; /**< Size of sample queue for this path. */ int enabled; /**< Is this path enabled */ int tfd; /**< Timer file descriptor for fixed rate sending */ double rate; /**< Send messages with a fixed rate over this path */ - - struct pool pool; /**< A circular buffer of past messages */ pthread_t recv_tid; /**< The thread id for this path */ pthread_t sent_tid; /**< A second thread id for fixed rate sending thread */ @@ -67,25 +70,17 @@ struct path struct hist gap_seq; /**< Histogram of sequence number displacement of received messages */ } hist; - struct { - struct timespec recv; /**< Last message received */ - struct timespec sent; /**< Last message sent */ - struct timespec last; /**< Previous message received (old value of path::ts__recv) */ - } ts; - - /* Statistics */ - unsigned int sent; /**< Counter for sent messages to all outgoing nodes */ - unsigned int received; /**< Counter for received messages from all incoming nodes */ - unsigned int invalid; /**< Counter for invalid messages */ - unsigned int skipped; /**< Counter for skipped messages due to hooks */ - unsigned int dropped; /**< Counter for dropped messages due to reordering */ - unsigned int overrun; /**< Counter of overruns for fixed-rate sending */ + /* Statistic counters */ + uintmax_t invalid; /**< Counter for invalid messages */ + uintmax_t skipped; /**< Counter for skipped messages due to hooks */ + uintmax_t dropped; /**< Counter for dropped messages due to reordering */ + uintmax_t overrun; /**< Counter of overruns for fixed-rate sending */ /** @} */ }; /** Create a path by allocating dynamic memory. */ -struct path * path_create(size_t poolsize, size_t values); +void path_init(struct path *p); /** Destroy path by freeing dynamically allocated memory. * @@ -93,6 +88,12 @@ struct path * path_create(size_t poolsize, size_t values); */ void path_destroy(struct path *p); +/** Initialize pool queue and hooks. + * + * Should be called after path_init() and before path_start(). + */ +int path_prepare(struct path *p); + /** Start a path. * * Start a new pthread for receiving/sending messages over this path. diff --git a/lib/cfg.c b/lib/cfg.c index 3624aacdd..8f5bcfc31 100644 --- a/lib/cfg.c +++ b/lib/cfg.c @@ -93,15 +93,12 @@ int config_parse_path(config_setting_t *cfg, { config_setting_t *cfg_out, *cfg_hook; const char *in; - int reverse, poolsize, values; + int ret, reverse; + struct path *p; - /* Pool settings */ - if (!config_setting_lookup_int(cfg, "poolsize", &poolsize)) - poolsize = DEFAULT_POOLSIZE; - if (!config_setting_lookup_int(cfg, "values", &values)) - values = DEFAULT_MSGVALUES; - - struct path *p = path_create(poolsize, values); + /* Allocate memory and intialize path structure */ + p = alloc(sizeof(struct path)); + path_init(p); /* Input node */ if (!config_setting_lookup_string(cfg, "in", &in)) @@ -109,20 +106,35 @@ int config_parse_path(config_setting_t *cfg, p->in = list_lookup(nodes, in); if (!p->in) - error("Invalid input node '%s'", in); + cerror(cfg, "Invalid input node '%s'", in); /* Output node(s) */ cfg_out = config_setting_get_member(cfg, "out"); - if (cfg_out) - config_parse_nodelist(cfg_out, &p->destinations, nodes); + if (!cfg_out) + cerror(cfg, "Missing output nodes for path"); - p->out = (struct node *) list_first(&p->destinations); + ret = config_parse_nodelist(cfg_out, &p->destinations, nodes); + if (ret <= 0) + cerror(cfg_out, "Invalid output nodes"); + + /* Check if nodes are suitable */ + if (p->in->_vt->read == NULL) + cerror(cfg, "Input node '%s' is not supported as a source.", node_name(p->in)); + + list_foreach(struct node *n, &p->destinations) { + if (n->_vt->write == NULL) + cerror(cfg_out, "Output node '%s' is not supported as a destination.", node_name(n)); + } /* Optional settings */ cfg_hook = config_setting_get_member(cfg, "hook"); if (cfg_hook) config_parse_hooklist(cfg_hook, &p->hooks); + if (!config_setting_lookup_int(cfg, "values", &p->values)) + p->values = DEFAULT_VALUES; + if (!config_setting_lookup_int(cfg, "queuelen", &p->queuelen)) + p->queuelen = DEFAULT_QUEUELEN; if (!config_setting_lookup_bool(cfg, "reverse", &reverse)) reverse = 0; if (!config_setting_lookup_bool(cfg, "enabled", &p->enabled)) @@ -136,15 +148,14 @@ int config_parse_path(config_setting_t *cfg, if (reverse) { if (list_length(&p->destinations) > 1) - error("Can't reverse path with multiple destination nodes"); + cerror(cfg, "Can't reverse path with multiple destination nodes"); - struct path *r = path_create(poolsize, values); + struct path *r = memdup(p, sizeof(struct path)); + path_init(r); - r->in = p->out; /* Swap in/out */ - r->out = p->in; - r->rate = p->rate; - - list_push(&r->destinations, r->out); + /* Swap source and destination node */ + r->in = list_first(&p->destinations); + list_push(&r->destinations, p->in); if (cfg_hook) config_parse_hooklist(cfg_hook, &r->hooks); @@ -180,10 +191,12 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *list, struct list str = config_setting_get_string(elm); if (str) { node = list_lookup(all, str); - if (node) - list_push(list, node); - else + if (!node) cerror(elm, "Unknown outgoing node '%s'", str); + else if (node->_vt->write == NULL) + cerror(cfg, "Output node '%s' is not supported as a sink.", node_name(node)); + + list_push(list, node); } else cerror(cfg, "Invalid outgoing node"); @@ -194,7 +207,7 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *list, struct list cerror(cfg, "Invalid output node(s)"); } - return 0; + return list_length(list); } int config_parse_node(config_setting_t *cfg, struct list *nodes, struct settings *set) @@ -259,7 +272,7 @@ int config_parse_hooklist(config_setting_t *cfg, struct list *list) { cerror(cfg, "Invalid hook functions"); } - return 0; + return list_length(list); } int config_parse_hook(config_setting_t *cfg, struct list *list) diff --git a/lib/node.c b/lib/node.c index 34cc46f70..4a7396b52 100644 --- a/lib/node.c +++ b/lib/node.c @@ -8,79 +8,92 @@ #include +#include "sample.h" #include "node.h" #include "cfg.h" #include "utils.h" -struct list node_types = LIST_INIT(NULL); /**< Vtable for virtual node sub types */ +/** List of registered node-types */ +struct list node_types = LIST_INIT(); int node_parse(struct node *n, config_setting_t *cfg) { return n->_vt->parse ? n->_vt->parse(n, cfg) : 0; } -int node_read(struct node *n, struct pool *p, int c) +int node_read(struct node *n, struct sample *smps[], unsigned cnt) { - return n->_vt->read ? n->_vt->read(n, p, c) : -1; -} + int nread = 0; -int node_write(struct node *n, struct pool *p, int c) -{ - return n->_vt->write ? n->_vt->write(n, p, c) : -1; -} + if (!n->_vt->read) + return -1; -int node_read_single(struct node *n, struct msg *m) -{ - struct pool p = { - .buffer = m, - .previous = -1, - .last = 0, - .length = 1, - .stride = MSG_LEN(m->values) - }; + /* Send in parts if vector not supported */ + if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { + while (cnt - nread > 0) { + nread += n->_vt->read(n, &smps[nread], MIN(cnt - nread, n->_vt->vectorize)); + } + } + else { + nread = n->_vt->read(n, smps, cnt); + } - return node_read(n, &p, 1); + return nread; } -int node_write_single(struct node *n, struct msg *m) +int node_write(struct node *n, struct sample *smps[], unsigned cnt) { - struct pool p = { - .buffer = m, - .previous = -1, - .last = 0, - .length = 1, - .stride = MSG_LEN(m->values) - }; + int nsent = 0; + + if (!n->_vt->write) + return -1; + + /* Send in parts if vector not supported */ + if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { + while (cnt - nsent > 0) + nsent += n->_vt->write(n, &smps[nsent], MIN(cnt - nsent, n->_vt->vectorize)); + } + else { + nsent = n->_vt->write(n, smps, cnt); + } - return node_write(n, &p, 1); + return nsent; } int node_init(struct node_type *vt, int argc, char *argv[], config_setting_t *cfg) { - if (list_length(&vt->instances) > 0) { - info("Initializing " YEL("%s") " node type", vt->name); - - if (vt->init) { INDENT - vt->init(argc, argv, cfg); - } - } - else - warn("No node is using the " YEL("%s") " node type. Skipping...", vt->name); + int ret; + + if (vt->state != NODE_TYPE_UNINITIALIZED) + return -1; - return 0; + info("Initializing " YEL("%s") " node type", vt->name); + { INDENT + ret = vt->init ? vt->init(argc, argv, cfg) : -1; + } + + if (ret == 0) + vt->state = NODE_TYPE_INITIALIZED; + + return ret; } int node_deinit(struct node_type *vt) { - if (list_length(&vt->instances) > 0) { - info("De-initializing " YEL("%s") " node type", vt->name); + int ret; + + if (vt->state != NODE_TYPE_INITIALIZED) + return -1; - if (vt->deinit) { INDENT - vt->deinit(); - } + info("De-initializing " YEL("%s") " node type", vt->name); + { INDENT + ret = vt->deinit ? vt->deinit() : -1; } + + if (ret == 0) + vt->state = NODE_TYPE_UNINITIALIZED; - return 0; + return ret; } int node_start(struct node *n) diff --git a/lib/path.c b/lib/path.c index d5af12bdf..4aabbde86 100644 --- a/lib/path.c +++ b/lib/path.c @@ -6,28 +6,64 @@ * Unauthorized copying of this file, via any medium is strictly prohibited. *********************************************************************************/ +#include +#include #include +#include #include +#include "config.h" #include "utils.h" #include "path.h" #include "timing.h" -#include "config.h" #include "pool.h" +#include "queue.h" -static void path_write(struct path *p) +static void path_write(struct path *p, bool resend) { list_foreach(struct node *n, &p->destinations) { - int sent = node_write( - n, /* Destination node */ - &p->pool, /* Pool of received messages */ - n->vectorize /* Number of messages which should be sent */ - ); + int cnt = n->vectorize; + int sent, tosend, base, available, release, released; + struct sample *smps[n->vectorize]; - debug(15, "Sent %u messages to node %s", sent, node_name(n)); - p->sent += sent; + /* The first message in the chunk which we want to send */ + if (resend) + base = p->in->received - cnt; /* we simply resent the last vector of samples */ + else { + base = n->sent; + } - p->ts.sent = time_now(); /** @todo use hardware timestamps for socket node type */ + available = queue_get_many(&p->queue, (void **) smps, cnt, base); + if (available < cnt) + warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); + + if (available == 0) + continue; + + tosend = hook_run(p, smps, available, HOOK_WRITE); + if (tosend == 0) + continue; + + sent = node_write(n, smps, tosend); + if (sent < 0) + error("Failed to sent %u samples to node %s", cnt, node_name(n)); + else if (sent < tosend) + warn("Partial write to node %s", node_name(n)); + + debug(DBG_PATH | 15, "Sent %u messages to node %s", sent, node_name(n)); + + /* Release samples from queue in case they are not sent periodically. */ + if (resend) + continue; + + /* Decrement reference count and release samples back to pool if we had the last reference */ + release = queue_pull_many(&p->queue, (void **) smps, sent, &n->sent); + if (release > 0) + debug(DBG_PATH | 3, "Releasing %u samples to pool for path %s", release, path_name(p)); + + released = pool_put_many(&p->pool, (void **) smps, release); + if (release != released) + warn("Failed to release %u samples to pool for path %s", release - released, path_name(p)); } } @@ -47,13 +83,13 @@ static void * path_run_async(void *arg) warn("Overrun detected for path: overruns=%" PRIu64, expir); } - if (p->received == 0) + if (p->in->received == 0) continue; - if (hook_run(p, HOOK_ASYNC)) + if (hook_run(p, NULL, 0, HOOK_ASYNC)) continue; - path_write(p); + path_write(p, true); } return NULL; @@ -63,51 +99,66 @@ static void * path_run_async(void *arg) static void * path_run(void *arg) { struct path *p = arg; + unsigned cnt = p->in->vectorize; + int recv, enqueue, enqueued; + int ready = 0; /**< Number of blocks in smps[] which are allocated and ready to be used by node_read(). */ + struct sample *smps[cnt]; /* Main thread loop */ for (;;) { - /* Receive message */ - int recv = node_read(p->in, &p->pool, p->in->vectorize); + struct node *out = (struct node *) list_first(&p->destinations); + debug(DBG_PATH | 5, "Current queue status for path %s: ready=%u write=%ju read[0]=%ju", path_name(p), ready, p->in->received, out->sent); + debug(DBG_PATH | 5, "Current pool status for path %s: used=%zu avail=%zu", path_name(p), p->pool.stack.size, p->pool.stack.avail); + + /* Fill smps[] free sample blocks from the pool */ + ready += pool_get_many(&p->pool, (void **) smps, cnt - ready); + if (ready != cnt) + warn("Pool underrun for path %s", path_name(p)); + + /* Read ready samples and store them to blocks pointed by smps[] */ + recv = p->in->_vt->read(p->in, smps, ready); if (recv < 0) error("Failed to receive message from node %s", node_name(p->in)); - else if (recv == 0) - continue; + else if (recv < ready) + warn("Partial read for path %s: read=%u expected=%u", path_name(p), recv, ready); - /** @todo Replace this timestamp by hardware timestamping for node type which support it. */ - p->ts.last = p->ts.recv; - p->ts.recv = time_now(); - - debug(15, "Received %u messages from node %s", recv, node_name(p->in)); + debug(DBG_PATH | 15, "Received %u messages from node %s", recv, node_name(p->in)); - /* Run preprocessing hooks */ - if (hook_run(p, HOOK_PRE)) { - p->skipped += recv; - continue; + /* Run preprocessing hooks for vector of samples */ + enqueue = hook_run(p, smps, recv, HOOK_READ); + if (enqueue != recv) { + info("Hooks skipped %u out of %u samples for path %s", recv - enqueue, recv, path_name(p)); + p->skipped += recv - enqueue; } - /* For each received message... */ - for (int i = 0; i < recv; i++) { - /* Update tail pointer of message pool by the amount of actually received messages. */ - pool_push(&p->pool, 1); - - p->received++; + enqueued = queue_push_many(&p->queue, (void **) smps, enqueue, &p->in->received); + if (enqueue != enqueued) + warn("Failed to enqueue %u samples for path %s", enqueue - enqueued, path_name(p)); - /* Run hooks for filtering, stats collection and manipulation */ - if (hook_run(p, HOOK_MSG)) { - p->skipped++; - continue; + ready -= enqueued; + + list_foreach(struct hook *h, &p->hooks) { + int pull, release, released; + + pull = p->in->received - h->head - h->history; + if (pull > 0) { + struct sample *smps[pull]; + + release = queue_pull_many(&p->queue, (void **) smps, pull, &h->head); + if (release > 0) + debug(DBG_PATH | 3, "Releasing %u samples from queue of path %s", release, path_name(p)); + + released = pool_put_many(&p->pool, (void **) smps, release); + if (release != released) + warn("Failed to release %u samples to pool of path %s", release - released, path_name(p)); } } - - /* Run post processing hooks */ - if (hook_run(p, HOOK_POST)) { - p->skipped += recv; - continue; - } + + debug(DBG_PATH | 3, "Enqueuing %u samples to queue of path %s", enqueue, path_name(p)); /* At fixed rate mode, messages are send by another (asynchronous) thread */ - if (!p->rate) - path_write(p); + if (p->rate == 0) + path_write(p, false); } return NULL; @@ -115,13 +166,13 @@ static void * path_run(void *arg) int path_start(struct path *p) { - info("Starting path: %s (poollen=%zu, msgsize=%zu, #hooks=%zu, rate=%.1f)", - path_name(p), pool_length(&p->pool), pool_stride(&p->pool), list_length(&p->hooks), p->rate); - - /* We sort the hooks according to their priority before starting the path */ - list_sort(&p->hooks, hooks_sort_priority); + int ret; - if (hook_run(p, HOOK_PATH_START)) + info("Starting path: %s (#hooks=%zu, rate=%.1f)", + path_name(p), list_length(&p->hooks), p->rate); + + ret = hook_run(p, NULL, 0, HOOK_PATH_START); + if (ret) return -1; /* At fixed rate mode, we start another thread for sending */ @@ -154,7 +205,7 @@ int path_stop(struct path *p) p->state = PATH_STOPPED; - if (hook_run(p, HOOK_PATH_STOP)) + if (hook_run(p, NULL, 0, HOOK_PATH_STOP)) return -1; return 0; @@ -163,43 +214,77 @@ int path_stop(struct path *p) const char * path_name(struct path *p) { if (!p->_name) { - strcatf(&p->_name, "%s " MAG("=>"), p->in->name); + strcatf(&p->_name, "%s " MAG("=>"), node_name_short(p->in)); - list_foreach(struct node *n, &p->destinations) { - strcatf(&p->_name, " %s", n->name); - } + list_foreach(struct node *n, &p->destinations) + strcatf(&p->_name, " %s", node_name_short(n)); } return p->_name; } -struct path * path_create(size_t poolsize, size_t values) +void path_init(struct path *p) { - struct path *p = alloc(sizeof(struct path)); - - list_init(&p->destinations, NULL); - list_init(&p->hooks, free); - - pool_create(&p->pool, poolsize, 16 + values * sizeof(float)); /** @todo */ - + list_init(&p->destinations); + list_init(&p->hooks); + + /* Initialize hook system */ list_foreach(struct hook *h, &hooks) { if (h->type & HOOK_INTERNAL) list_push(&p->hooks, memdup(h, sizeof(*h))); } - - p->state = PATH_CREATED; - return p; + p->state = PATH_CREATED; +} + +int path_prepare(struct path *p) +{ + int ret; + + /* We sort the hooks according to their priority before starting the path */ + list_sort(&p->hooks, hooks_sort_priority); + + /* Allocate hook private memory */ + ret = hook_run(p, NULL, 0, HOOK_INIT); + if (ret) + error("Failed to initialize hooks of path: %s", path_name(p)); + + /* Parse hook arguments */ + ret = hook_run(p, NULL, 0, HOOK_PARSE); + if (ret) + error("Failed to parse arguments for hooks of path: %s", path_name(p)); + + /* Initialize queue */ + ret = pool_init_mmap(&p->pool, SAMPLE_LEN(p->values), p->queuelen); + if (ret) + error("Failed to allocate memory pool for path"); + + ret = queue_init(&p->queue, p->queuelen); + if (ret) + error("Failed to initialize queue for path"); + + /* Add a head pointer for each hook to the queue */ + list_foreach(struct hook *h, &p->hooks) + queue_reader_add(&p->queue, h->head, p->in->received); + + /* Add a head pointer for each destination node to the queue. */ + list_foreach(struct node *out, &p->destinations) + queue_reader_add(&p->queue, out->sent, p->in->received); + + return 0; } void path_destroy(struct path *p) -{ - list_destroy(&p->destinations); - list_destroy(&p->hooks); +{ + hook_run(p, NULL, 0, HOOK_DEINIT); /* Release memory */ + + list_destroy(&p->destinations, NULL, false); + list_destroy(&p->hooks, NULL, true); + + queue_destroy(&p->queue); pool_destroy(&p->pool); free(p->_name); - free(p); } int path_uses_node(struct path *p, struct node *n) { diff --git a/src/pipe.c b/src/pipe.c index d08f9155b..1359ea3c2 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -11,6 +11,7 @@ *********************************************************************************/ #include +#include #include #include #include @@ -32,7 +33,6 @@ struct pool recv_pool, send_pool; pthread_t recv_thread, send_thread; struct node *node; -int reverse; static void quit(int signal, siginfo_t *sinfo, void *ctx) { @@ -59,7 +59,10 @@ static void usage(char *name) printf("Usage: %s CONFIG [-r] NODE\n", name); printf(" CONFIG path to a configuration file\n"); printf(" NODE the name of the node to which samples are sent and received from\n"); - printf(" -r swap read / write endpoints)\n\n"); + printf(" -d LVL set debug log level to LVL\n"); + printf(" -x swap read / write endpoints\n"); + printf(" -s only read data from stdin and send it to node\n"); + printf(" -r only read data from node and write it to stdout\n\n"); print_copyright(); @@ -67,13 +70,25 @@ static void usage(char *name) } void * send_loop(void *ctx) -{ +{ + int ret; + struct sample *smps[node->vectorize]; + + /* Initialize memory */ + ret = pool_init_mmap(&send_pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize); + if (ret < 0) + error("Failed to allocate memory for receive pool."); + + ret = pool_get_many(&send_pool, (void **) smps, node->vectorize); + if (ret < 0) + error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret); + for (;;) { for (int i = 0; i < node->vectorize; i++) { - struct msg *m = pool_getrel(&send_pool, i); + struct sample *s = smps[i]; int reason; -retry: reason = msg_fscan(stdin, m, NULL, NULL); +retry: reason = sample_fscan(stdin, s, NULL); if (reason < 0) { if (feof(stdin)) return NULL; @@ -84,7 +99,7 @@ retry: reason = msg_fscan(stdin, m, NULL, NULL); } } - node_write(node, &send_pool, node->vectorize); + node_write(node, smps, node->vectorize); } return NULL; @@ -92,23 +107,27 @@ retry: reason = msg_fscan(stdin, m, NULL, NULL); void * recv_loop(void *ctx) { + int ret; + struct sample *smps[node->vectorize]; + + /* Initialize memory */ + ret = pool_init_mmap(&recv_pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize); + if (ret < 0) + error("Failed to allocate memory for receive pool."); + + ret = pool_get_many(&recv_pool, (void **) smps, node->vectorize); + if (ret < 0) + error("Failed to get %u samples out of receive pool (%d).", node->vectorize, ret); + /* Print header */ - fprintf(stdout, "# %-20s\t\t%s\n", "sec.nsec+offset(seq)", "data[]"); + fprintf(stdout, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]"); for (;;) { - struct timespec ts = time_now(); - - int recv = node_read(node, &recv_pool, node->vectorize); + int recv = node_read(node, smps, node->vectorize); for (int i = 0; i < recv; i++) { - struct msg *m = pool_getrel(&recv_pool, i); - - int ret = msg_verify(m); - if (ret) - warn("Failed to verify message: %d", ret); - - /** @todo should we drop reordered / delayed packets here? */ + struct sample *s = smps[i]; - msg_fprint(stdout, m, MSG_PRINT_ALL, time_delta(&MSG_TS(m), &ts)); + sample_fprint(stdout, s, SAMPLE_ALL); fflush(stdout); } } @@ -118,17 +137,29 @@ void * recv_loop(void *ctx) int main(int argc, char *argv[]) { + bool send = true, recv = true, reverse = false; + /* Parse command line arguments */ if (argc < 3) usage(argv[0]); + log_init(); + char c; - while ((c = getopt(argc-2, argv+2, "hr")) != -1) { + while ((c = getopt(argc-2, argv+2, "hxrsd:")) != -1) { switch (c) { - case 'r': - reverse = 1; + case 'x': + reverse = true; + break; + case 's': + recv = false; + break; + case 'r': + send = false; + break; + case 'd': + log_setlevel(atoi(optarg), -1); break; - case 'h': case '?': usage(argv[0]); @@ -151,7 +182,6 @@ int main(int argc, char *argv[]) /* Create lists */ list_init(&nodes); - log_init(); config_init(&config); config_parse(argv[1], &config, &settings, &nodes, NULL); @@ -159,22 +189,22 @@ int main(int argc, char *argv[]) node = list_lookup(&nodes, argv[2]); if (!node) error("Node '%s' does not exist!", argv[2]); - - node_init(node->_vt, argc-optind, argv+optind, config_root_setting(&config)); - - pool_create(&recv_pool, node->vectorize, sizeof(struct msg)); - pool_create(&send_pool, node->vectorize, sizeof(struct msg)); - + if (reverse) node_reverse(node); + + node_init(node->_vt, argc-optind, argv+optind, config_root_setting(&config)); node_start(node); - + /* Start threads */ - pthread_create(&recv_thread, NULL, recv_loop, NULL); - pthread_create(&send_thread, NULL, send_loop, NULL); + if (recv) + pthread_create(&recv_thread, NULL, recv_loop, NULL); + if (send) + pthread_create(&send_thread, NULL, send_loop, NULL); - for (;;) pause(); + for (;;) + pause(); return 0; } diff --git a/src/server.c b/src/server.c index b9e13b92f..17688786e 100644 --- a/src/server.c +++ b/src/server.c @@ -120,6 +120,11 @@ static void usage(const char *name) list_foreach(struct node_type *vt, &node_types) printf(" - %s: %s\n", vt->name, vt->description); printf("\n"); + + printf("Supported hooks:\n"); + list_foreach(struct hook *h, &hooks) + printf(" - %s: %s\n", h->name, h->description); + printf("\n"); print_copyright(); @@ -167,7 +172,9 @@ int main(int argc, char *argv[]) info("Initialize node types"); list_foreach(struct node_type *vt, &node_types) { INDENT - node_init(vt, argc, argv, config_root_setting(&config)); + int refs = list_length(&vt->instances); + if (refs > 0) + node_init(vt, argc, argv, config_root_setting(&config)); } info("Starting nodes"); @@ -181,8 +188,10 @@ int main(int argc, char *argv[]) info("Starting paths"); list_foreach(struct path *p, &paths) { INDENT - if (p->enabled) + if (p->enabled) { + path_prepare(p); path_start(p); + } else warn("Path %s is disabled. Skipping...", path_name(p)); } @@ -193,7 +202,7 @@ int main(int argc, char *argv[]) for (;;) { list_foreach(struct path *p, &paths) - hook_run(p, HOOK_PERIODIC); + hook_run(p, NULL, 0, HOOK_PERIODIC); usleep(settings.stats * 1e6); } } diff --git a/src/signal.c b/src/signal.c index 0c3e28841..8e5178e96 100644 --- a/src/signal.c +++ b/src/signal.c @@ -16,7 +16,7 @@ #include "config.h" #include "utils.h" -#include "msg.h" +#include "sample.h" #include "timing.h" #define CLOCKID CLOCK_REALTIME @@ -109,10 +109,10 @@ check: if (optarg == endptr) } /* Allocate memory for message buffer */ - struct msg *m = msg_create(values); + struct sample *s = alloc(SAMPLE_LEN(values)); /* Print header */ - printf("# S2SS signal params: type=%s, values=%u, rate=%f, limit=%u, amplitude=%f, freq=%f\n", + printf("# S2SS signal params: type=%s, values=%u, rate=%f, limit=%d, amplitude=%f, freq=%f\n", argv[1], values, rate, limit, ampl, freq); printf("# %-20s\t\t%s\n", "sec.nsec(seq)", "data[]"); @@ -128,30 +128,35 @@ check: if (optarg == endptr) struct timespec now = time_now(); double running = time_delta(&start, &now); - m->ts.sec = now.tv_sec; - m->ts.nsec = now.tv_nsec; - m->sequence = counter; + s->ts.origin = now; + s->sequence = counter; + s->length = values; - for (int i = 0; i < m->values; i++) { + for (int i = 0; i < values; i++) { int rtype = (type != TYPE_MIXED) ? type : i % 4; switch (rtype) { - case TYPE_RANDOM: m->data[i].f += box_muller(0, stddev); break; - case TYPE_SINE: m->data[i].f = ampl * sin(running * freq * 2 * M_PI); break; - case TYPE_TRIANGLE: m->data[i].f = ampl * (fabs(fmod(running * freq, 1) - .5) - 0.25) * 4; break; - case TYPE_SQUARE: m->data[i].f = ampl * ( (fmod(running * freq, 1) < .5) ? -1 : 1); break; - case TYPE_RAMP: m->data[i].f = fmod(counter, rate / freq); /** @todo send as integer? */ break; + case TYPE_RANDOM: s->values[i].f += box_muller(0, stddev); break; + case TYPE_SINE: s->values[i].f = ampl * sin(running * freq * 2 * M_PI); break; + case TYPE_TRIANGLE: s->values[i].f = ampl * (fabs(fmod(running * freq, 1) - .5) - 0.25) * 4; break; + case TYPE_SQUARE: s->values[i].f = ampl * ( (fmod(running * freq, 1) < .5) ? -1 : 1); break; + case TYPE_RAMP: s->values[i].f = fmod(counter, rate / freq); /** @todo send as integer? */ break; } } - msg_fprint(stdout, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0); + sample_fprint(stdout, s, SAMPLE_ALL & ~SAMPLE_OFFSET); fflush(stdout); /* Block until 1/p->rate seconds elapsed */ - counter += timerfd_wait(tfd); + int steps = timerfd_wait(tfd); + + if (steps > 1) + warn("Missed steps: %u", steps); + + counter += steps; } close(tfd); - free(m); + free(s); return 0; } diff --git a/src/test.c b/src/test.c index a5bb2946a..047c056ad 100644 --- a/src/test.c +++ b/src/test.c @@ -140,45 +140,44 @@ check: if (optarg == endptr) node_stop(node); node_deinit(node->_vt); - list_destroy(&nodes, node_destroy, false); + list_destroy(&nodes, (dtor_cb_t) node_destroy, false); config_destroy(&config); return 0; } void test_rtt() { - struct timespec sent, recv; - struct hist hist; - struct msg *m; + + struct timespec send, recv; - m = msg_create(0); + struct sample *smp_send = alloc(SAMPLE_LEN(2)); + struct sample *smp_recv = alloc(SAMPLE_LEN(2)); + hist_create(&hist, low, high, res); /* Print header */ fprintf(stdout, "%17s%5s%10s%10s%10s%10s%10s\n", "timestamp", "seq", "rtt", "min", "max", "mean", "stddev"); while (running && (count < 0 || count--)) { - clock_gettime(CLOCK_ID, &sent); - m->ts.sec = sent.tv_sec; - m->ts.nsec = sent.tv_nsec; + clock_gettime(CLOCK_ID, &send); - node_write_single(node, m); /* Ping */ - node_read_single(node, m); /* Pong */ + node_write(node, &smp_send, 1); /* Ping */ + node_read(node, &smp_recv, 1); /* Pong */ clock_gettime(CLOCK_ID, &recv); - double rtt = time_delta(&recv, &sent); + double rtt = time_delta(&recv, &send); if (rtt < 0) warn("Negative RTT: %f", rtt); hist_put(&hist, rtt); - m->sequence++; + smp_send->sequence++; fprintf(stdout, "%10lu.%06lu%5u%10.3f%10.3f%10.3f%10.3f%10.3f\n", - recv.tv_sec, recv.tv_nsec / 1000, m->sequence, + recv.tv_sec, recv.tv_nsec / 1000, smp_send->sequence, 1e3 * rtt, 1e3 * hist.lowest, 1e3 * hist.highest, 1e3 * hist_mean(&hist), 1e3 * hist_stddev(&hist)); }