1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

introduced new data structures

This commit is contained in:
Steffen Vogel 2016-06-08 22:38:21 +02:00
parent 0cebda19c8
commit 909bde0894
10 changed files with 442 additions and 277 deletions

View file

@ -39,7 +39,22 @@
#endif
/** The total size in bytes of a message */
#define MSG_LEN(values) (4 * (values) + 16 /* header */)
#define MSG_LEN(values) (sizeof(struct msg) + MSG_DATA_LEN(values))
/** The length of \p values values in bytes. */
#define MSG_DATA_LEN(values) (sizeof(float) * (values))
/** The offset to the first data value in a message. */
#define MSG_DATA_OFFSET(msg) ((char *) (msg) + offsetof(struct msg, data))
/** Initialize a message with default values */
#define MSG_INIT(val, seq) (struct msg) {\
.version = MSG_VERSION, \
.type = MSG_TYPE_DATA, \
.endian = MSG_ENDIAN_HOST, \
.values = val, \
.sequence = seq \
}
/** The timestamp of a message in struct timespec format */
#define MSG_TS(msg) (struct timespec) { \
@ -71,7 +86,7 @@ struct msg
/** A timestamp per message. Endianess is specified in msg::endian. */
struct {
uint32_t sec; /**< Seconds since 1970-01-01 00:00:00 */
uint32_t sec; /**< Seconds since 1970-01-01 00:00:00 */
uint32_t nsec; /**< Nanoseconds of the current second. */
} ts;
@ -80,6 +95,6 @@ struct msg
float f; /**< Floating point values (note msg::endian) */
uint32_t i; /**< Integer values (note msg::endian) */
} data[];
} __attribute__((aligned(64), packed));
} __attribute__((packed));
#endif /* _MSG_FORMAT_H_ */

View file

@ -23,9 +23,9 @@
#include <netinet/in.h>
#include <libconfig.h>
#include "msg.h"
#include "sample.h"
#include "list.h"
#include "pool.h"
#include "queue.h"
/* Helper macros for virtual node type */
#define REGISTER_NODE_TYPE(vt) \
@ -36,7 +36,41 @@ __attribute__((constructor)) static void __register() { \
extern struct list node_types; /**< Vtable for virtual node sub types */
/* Forward declarations */
struct config_setting_t *cfg;
struct config_setting_t cfg;
struct node_type;
/** The data structure for a node.
*
* Every entity which exchanges messages is represented by a node.
* Nodes can be remote machines and simulators or locally running processes.
*/
struct node
{
const char *name; /**< A short identifier of the node, only used for configuration and logging */
char *_name; /**< Singleton: A string used to print to screen. */
char *_name_long; /**< Singleton: A string used to print to screen. */
int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
int affinity; /**< CPU Affinity of this node */
qptr_t sent; /**< Number of samples sent / written to this node. */
qptr_t received; /**< Number of samples received / read from this node. */
enum node_state {
NODE_INVALID, /**< This node object is not in a valid state. */
NODE_CREATED, /**< This node has been parsed from the configuration. */
NODE_STARTING, /**< This node is currently being started. */
NODE_RUNNING, /**< This node has been started by calling node_open() */
NODE_STOPPING, /**< This node is currently shutting down. */
NODE_STOPPED /**< Node was running, but has been stopped by calling node_close() */
} state; /**< Node state */
struct node_type *_vt; /**< Virtual functions (C++ OOP style) */
void *_vd; /**< Virtual data (used by struct node::_vt functions) */
config_setting_t *cfg; /**< A pointer to the libconfig object which instantiated this node */
};
/** C++ like vtable construct for node_types */
struct node_type {
@ -47,6 +81,11 @@ struct node_type {
struct list instances; /**< A list of all existing nodes of this type. */
size_t size; /**< Size of private data bock. @see node::_vd */
enum node_type_state {
NODE_TYPE_UNINITIALIZED = 0,
NODE_TYPE_INITIALIZED
} state;
/** Global initialization per node type.
*
* This callback is invoked once per node-type.
@ -121,11 +160,11 @@ struct node_type {
* Some node types might only support to receive one message at a time.
*
* @param n A pointer to the node object.
* @param pool A pointer to circular buffer containing the messages.
* @param cnt The number of messages which should be sent.
* @param smps An array of pointers to memory blocks where the function should store received samples.
* @param cnt The number of messages which should be received.
* @return The number of messages actually received.
*/
int (*read) (struct node *n, struct pool *pool, int cnt);
int (*read) (struct node *n, struct sample *smps[], unsigned cnt);
/** Send multiple messages in a single datagram / packet.
*
@ -135,11 +174,11 @@ struct node_type {
* So the indexes will wrap around after len.
*
* @param n A pointer to the node object.
* @param pool A pointer to circular buffer containing the messages.
* @param smps An array of pointers to memory blocks where samples read from.
* @param cnt The number of messages which should be sent.
* @return The number of messages actually sent.
*/
int (*write)(struct node *n, struct pool *pool, int cnt);
int (*write)(struct node *n, struct sample *smps[], unsigned cnt);
/** Reverse source and destination of a node.
*
@ -150,36 +189,6 @@ struct node_type {
int (*reverse)(struct node *n);
};
/** The data structure for a node.
*
* Every entity which exchanges messages is represented by a node.
* Nodes can be remote machines and simulators or locally running processes.
*/
struct node
{
const char *name; /**< A short identifier of the node, only used for configuration and logging */
char *_name; /**< Singleton: A string used to print to screen. */
char *_name_long; /**< Singleton: A string used to print to screen. */
int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
int affinity; /**< CPU Affinity of this node */
enum node_state {
NODE_INVALID, /**< This node object is not in a valid state. */
NODE_CREATED, /**< This node has been parsed from the configuration. */
NODE_STARTING, /**< This node is currently being started. */
NODE_RUNNING, /**< This node has been started by calling node_open() */
NODE_STOPPING, /**< This node is currently shutting down. */
NODE_STOPPED /**< Node was running, but has been stopped by calling node_close() */
} state; /**< Node state */
struct node_type *_vt; /**< Virtual functions (C++ OOP style) */
void *_vd; /**< Virtual data (used by struct node::_vt functions) */
config_setting_t *cfg; /**< A pointer to the libconfig object which instantiated this node */
};
/** Initialize all registered node type subsystems.
*
* @see node_type::init
@ -244,28 +253,14 @@ char * node_name_long(struct node *n);
/** Return a pointer to a string which describes the node type */
const char * node_name_type(struct node *n);
/** Receive multiple messages at once.
*
* @see node_type::read
*/
int node_read(struct node *n, struct pool *pool, int cnt);
/** Send multiple messages in a single datagram / packet.
*
* @see node_type::write
*/
int node_write(struct node *n, struct pool *pool, int cnt);
/** Read a single message without using a message pool. */
int node_read_single(struct node *n, struct msg *m);
/** Send a single message without using a message pool. */
int node_write_single(struct node *n, struct msg *m);
/** Reverse local and remote socket address.
*
* @see node_type::reverse
*/
int node_reverse(struct node *n);
int node_read(struct node *n, struct sample *smps[], unsigned cnt);
int node_write(struct node *n, struct sample *smps[], unsigned cnt);
#endif /** _NODE_H_ @} */

View file

@ -24,6 +24,7 @@
#include "node.h"
#include "msg.h"
#include "hooks.h"
#include "queue.h"
#include "pool.h"
/** The datastructure for a path.
@ -40,16 +41,18 @@ struct path
} state; /**< Path state */
struct node *in; /**< Pointer to the incoming node */
struct node *out; /**< Pointer to the first outgoing node ( path::out == list_first(path::destinations) */
struct queue queue; /**< A ring buffer for all received messages (unmodified) */
struct pool pool; /**< Memory pool for messages / samples. */
struct list destinations; /**< List of all outgoing nodes */
struct list hooks; /**< List of function pointers to hooks */
int values; /**< Maximum number of values per sample for this path. */
int queuelen; /**< Size of sample queue for this path. */
int enabled; /**< Is this path enabled */
int tfd; /**< Timer file descriptor for fixed rate sending */
double rate; /**< Send messages with a fixed rate over this path */
struct pool pool; /**< A circular buffer of past messages */
pthread_t recv_tid; /**< The thread id for this path */
pthread_t sent_tid; /**< A second thread id for fixed rate sending thread */
@ -67,25 +70,17 @@ struct path
struct hist gap_seq; /**< Histogram of sequence number displacement of received messages */
} hist;
struct {
struct timespec recv; /**< Last message received */
struct timespec sent; /**< Last message sent */
struct timespec last; /**< Previous message received (old value of path::ts__recv) */
} ts;
/* Statistics */
unsigned int sent; /**< Counter for sent messages to all outgoing nodes */
unsigned int received; /**< Counter for received messages from all incoming nodes */
unsigned int invalid; /**< Counter for invalid messages */
unsigned int skipped; /**< Counter for skipped messages due to hooks */
unsigned int dropped; /**< Counter for dropped messages due to reordering */
unsigned int overrun; /**< Counter of overruns for fixed-rate sending */
/* Statistic counters */
uintmax_t invalid; /**< Counter for invalid messages */
uintmax_t skipped; /**< Counter for skipped messages due to hooks */
uintmax_t dropped; /**< Counter for dropped messages due to reordering */
uintmax_t overrun; /**< Counter of overruns for fixed-rate sending */
/** @} */
};
/** Create a path by allocating dynamic memory. */
struct path * path_create(size_t poolsize, size_t values);
void path_init(struct path *p);
/** Destroy path by freeing dynamically allocated memory.
*
@ -93,6 +88,12 @@ struct path * path_create(size_t poolsize, size_t values);
*/
void path_destroy(struct path *p);
/** Initialize pool queue and hooks.
*
* Should be called after path_init() and before path_start().
*/
int path_prepare(struct path *p);
/** Start a path.
*
* Start a new pthread for receiving/sending messages over this path.

View file

@ -93,15 +93,12 @@ int config_parse_path(config_setting_t *cfg,
{
config_setting_t *cfg_out, *cfg_hook;
const char *in;
int reverse, poolsize, values;
int ret, reverse;
struct path *p;
/* Pool settings */
if (!config_setting_lookup_int(cfg, "poolsize", &poolsize))
poolsize = DEFAULT_POOLSIZE;
if (!config_setting_lookup_int(cfg, "values", &values))
values = DEFAULT_MSGVALUES;
struct path *p = path_create(poolsize, values);
/* Allocate memory and intialize path structure */
p = alloc(sizeof(struct path));
path_init(p);
/* Input node */
if (!config_setting_lookup_string(cfg, "in", &in))
@ -109,20 +106,35 @@ int config_parse_path(config_setting_t *cfg,
p->in = list_lookup(nodes, in);
if (!p->in)
error("Invalid input node '%s'", in);
cerror(cfg, "Invalid input node '%s'", in);
/* Output node(s) */
cfg_out = config_setting_get_member(cfg, "out");
if (cfg_out)
config_parse_nodelist(cfg_out, &p->destinations, nodes);
if (!cfg_out)
cerror(cfg, "Missing output nodes for path");
p->out = (struct node *) list_first(&p->destinations);
ret = config_parse_nodelist(cfg_out, &p->destinations, nodes);
if (ret <= 0)
cerror(cfg_out, "Invalid output nodes");
/* Check if nodes are suitable */
if (p->in->_vt->read == NULL)
cerror(cfg, "Input node '%s' is not supported as a source.", node_name(p->in));
list_foreach(struct node *n, &p->destinations) {
if (n->_vt->write == NULL)
cerror(cfg_out, "Output node '%s' is not supported as a destination.", node_name(n));
}
/* Optional settings */
cfg_hook = config_setting_get_member(cfg, "hook");
if (cfg_hook)
config_parse_hooklist(cfg_hook, &p->hooks);
if (!config_setting_lookup_int(cfg, "values", &p->values))
p->values = DEFAULT_VALUES;
if (!config_setting_lookup_int(cfg, "queuelen", &p->queuelen))
p->queuelen = DEFAULT_QUEUELEN;
if (!config_setting_lookup_bool(cfg, "reverse", &reverse))
reverse = 0;
if (!config_setting_lookup_bool(cfg, "enabled", &p->enabled))
@ -136,15 +148,14 @@ int config_parse_path(config_setting_t *cfg,
if (reverse) {
if (list_length(&p->destinations) > 1)
error("Can't reverse path with multiple destination nodes");
cerror(cfg, "Can't reverse path with multiple destination nodes");
struct path *r = path_create(poolsize, values);
struct path *r = memdup(p, sizeof(struct path));
path_init(r);
r->in = p->out; /* Swap in/out */
r->out = p->in;
r->rate = p->rate;
list_push(&r->destinations, r->out);
/* Swap source and destination node */
r->in = list_first(&p->destinations);
list_push(&r->destinations, p->in);
if (cfg_hook)
config_parse_hooklist(cfg_hook, &r->hooks);
@ -180,10 +191,12 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *list, struct list
str = config_setting_get_string(elm);
if (str) {
node = list_lookup(all, str);
if (node)
list_push(list, node);
else
if (!node)
cerror(elm, "Unknown outgoing node '%s'", str);
else if (node->_vt->write == NULL)
cerror(cfg, "Output node '%s' is not supported as a sink.", node_name(node));
list_push(list, node);
}
else
cerror(cfg, "Invalid outgoing node");
@ -194,7 +207,7 @@ int config_parse_nodelist(config_setting_t *cfg, struct list *list, struct list
cerror(cfg, "Invalid output node(s)");
}
return 0;
return list_length(list);
}
int config_parse_node(config_setting_t *cfg, struct list *nodes, struct settings *set)
@ -259,7 +272,7 @@ int config_parse_hooklist(config_setting_t *cfg, struct list *list) {
cerror(cfg, "Invalid hook functions");
}
return 0;
return list_length(list);
}
int config_parse_hook(config_setting_t *cfg, struct list *list)

View file

@ -8,79 +8,92 @@
#include <string.h>
#include "sample.h"
#include "node.h"
#include "cfg.h"
#include "utils.h"
struct list node_types = LIST_INIT(NULL); /**< Vtable for virtual node sub types */
/** List of registered node-types */
struct list node_types = LIST_INIT();
int node_parse(struct node *n, config_setting_t *cfg)
{
return n->_vt->parse ? n->_vt->parse(n, cfg) : 0;
}
int node_read(struct node *n, struct pool *p, int c)
int node_read(struct node *n, struct sample *smps[], unsigned cnt)
{
return n->_vt->read ? n->_vt->read(n, p, c) : -1;
}
int nread = 0;
int node_write(struct node *n, struct pool *p, int c)
{
return n->_vt->write ? n->_vt->write(n, p, c) : -1;
}
if (!n->_vt->read)
return -1;
int node_read_single(struct node *n, struct msg *m)
{
struct pool p = {
.buffer = m,
.previous = -1,
.last = 0,
.length = 1,
.stride = MSG_LEN(m->values)
};
/* Send in parts if vector not supported */
if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) {
while (cnt - nread > 0) {
nread += n->_vt->read(n, &smps[nread], MIN(cnt - nread, n->_vt->vectorize));
}
}
else {
nread = n->_vt->read(n, smps, cnt);
}
return node_read(n, &p, 1);
return nread;
}
int node_write_single(struct node *n, struct msg *m)
int node_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct pool p = {
.buffer = m,
.previous = -1,
.last = 0,
.length = 1,
.stride = MSG_LEN(m->values)
};
int nsent = 0;
if (!n->_vt->write)
return -1;
/* Send in parts if vector not supported */
if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) {
while (cnt - nsent > 0)
nsent += n->_vt->write(n, &smps[nsent], MIN(cnt - nsent, n->_vt->vectorize));
}
else {
nsent = n->_vt->write(n, smps, cnt);
}
return node_write(n, &p, 1);
return nsent;
}
int node_init(struct node_type *vt, int argc, char *argv[], config_setting_t *cfg)
{
if (list_length(&vt->instances) > 0) {
info("Initializing " YEL("%s") " node type", vt->name);
if (vt->init) { INDENT
vt->init(argc, argv, cfg);
}
}
else
warn("No node is using the " YEL("%s") " node type. Skipping...", vt->name);
int ret;
if (vt->state != NODE_TYPE_UNINITIALIZED)
return -1;
return 0;
info("Initializing " YEL("%s") " node type", vt->name);
{ INDENT
ret = vt->init ? vt->init(argc, argv, cfg) : -1;
}
if (ret == 0)
vt->state = NODE_TYPE_INITIALIZED;
return ret;
}
int node_deinit(struct node_type *vt)
{
if (list_length(&vt->instances) > 0) {
info("De-initializing " YEL("%s") " node type", vt->name);
int ret;
if (vt->state != NODE_TYPE_INITIALIZED)
return -1;
if (vt->deinit) { INDENT
vt->deinit();
}
info("De-initializing " YEL("%s") " node type", vt->name);
{ INDENT
ret = vt->deinit ? vt->deinit() : -1;
}
if (ret == 0)
vt->state = NODE_TYPE_UNINITIALIZED;
return 0;
return ret;
}
int node_start(struct node *n)

View file

@ -6,28 +6,64 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
*********************************************************************************/
#include <stdbool.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <inttypes.h>
#include "config.h"
#include "utils.h"
#include "path.h"
#include "timing.h"
#include "config.h"
#include "pool.h"
#include "queue.h"
static void path_write(struct path *p)
static void path_write(struct path *p, bool resend)
{
list_foreach(struct node *n, &p->destinations) {
int sent = node_write(
n, /* Destination node */
&p->pool, /* Pool of received messages */
n->vectorize /* Number of messages which should be sent */
);
int cnt = n->vectorize;
int sent, tosend, base, available, release, released;
struct sample *smps[n->vectorize];
debug(15, "Sent %u messages to node %s", sent, node_name(n));
p->sent += sent;
/* The first message in the chunk which we want to send */
if (resend)
base = p->in->received - cnt; /* we simply resent the last vector of samples */
else {
base = n->sent;
}
p->ts.sent = time_now(); /** @todo use hardware timestamps for socket node type */
available = queue_get_many(&p->queue, (void **) smps, cnt, base);
if (available < cnt)
warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt);
if (available == 0)
continue;
tosend = hook_run(p, smps, available, HOOK_WRITE);
if (tosend == 0)
continue;
sent = node_write(n, smps, tosend);
if (sent < 0)
error("Failed to sent %u samples to node %s", cnt, node_name(n));
else if (sent < tosend)
warn("Partial write to node %s", node_name(n));
debug(DBG_PATH | 15, "Sent %u messages to node %s", sent, node_name(n));
/* Release samples from queue in case they are not sent periodically. */
if (resend)
continue;
/* Decrement reference count and release samples back to pool if we had the last reference */
release = queue_pull_many(&p->queue, (void **) smps, sent, &n->sent);
if (release > 0)
debug(DBG_PATH | 3, "Releasing %u samples to pool for path %s", release, path_name(p));
released = pool_put_many(&p->pool, (void **) smps, release);
if (release != released)
warn("Failed to release %u samples to pool for path %s", release - released, path_name(p));
}
}
@ -47,13 +83,13 @@ static void * path_run_async(void *arg)
warn("Overrun detected for path: overruns=%" PRIu64, expir);
}
if (p->received == 0)
if (p->in->received == 0)
continue;
if (hook_run(p, HOOK_ASYNC))
if (hook_run(p, NULL, 0, HOOK_ASYNC))
continue;
path_write(p);
path_write(p, true);
}
return NULL;
@ -63,51 +99,66 @@ static void * path_run_async(void *arg)
static void * path_run(void *arg)
{
struct path *p = arg;
unsigned cnt = p->in->vectorize;
int recv, enqueue, enqueued;
int ready = 0; /**< Number of blocks in smps[] which are allocated and ready to be used by node_read(). */
struct sample *smps[cnt];
/* Main thread loop */
for (;;) {
/* Receive message */
int recv = node_read(p->in, &p->pool, p->in->vectorize);
struct node *out = (struct node *) list_first(&p->destinations);
debug(DBG_PATH | 5, "Current queue status for path %s: ready=%u write=%ju read[0]=%ju", path_name(p), ready, p->in->received, out->sent);
debug(DBG_PATH | 5, "Current pool status for path %s: used=%zu avail=%zu", path_name(p), p->pool.stack.size, p->pool.stack.avail);
/* Fill smps[] free sample blocks from the pool */
ready += pool_get_many(&p->pool, (void **) smps, cnt - ready);
if (ready != cnt)
warn("Pool underrun for path %s", path_name(p));
/* Read ready samples and store them to blocks pointed by smps[] */
recv = p->in->_vt->read(p->in, smps, ready);
if (recv < 0)
error("Failed to receive message from node %s", node_name(p->in));
else if (recv == 0)
continue;
else if (recv < ready)
warn("Partial read for path %s: read=%u expected=%u", path_name(p), recv, ready);
/** @todo Replace this timestamp by hardware timestamping for node type which support it. */
p->ts.last = p->ts.recv;
p->ts.recv = time_now();
debug(15, "Received %u messages from node %s", recv, node_name(p->in));
debug(DBG_PATH | 15, "Received %u messages from node %s", recv, node_name(p->in));
/* Run preprocessing hooks */
if (hook_run(p, HOOK_PRE)) {
p->skipped += recv;
continue;
/* Run preprocessing hooks for vector of samples */
enqueue = hook_run(p, smps, recv, HOOK_READ);
if (enqueue != recv) {
info("Hooks skipped %u out of %u samples for path %s", recv - enqueue, recv, path_name(p));
p->skipped += recv - enqueue;
}
/* For each received message... */
for (int i = 0; i < recv; i++) {
/* Update tail pointer of message pool by the amount of actually received messages. */
pool_push(&p->pool, 1);
p->received++;
enqueued = queue_push_many(&p->queue, (void **) smps, enqueue, &p->in->received);
if (enqueue != enqueued)
warn("Failed to enqueue %u samples for path %s", enqueue - enqueued, path_name(p));
/* Run hooks for filtering, stats collection and manipulation */
if (hook_run(p, HOOK_MSG)) {
p->skipped++;
continue;
ready -= enqueued;
list_foreach(struct hook *h, &p->hooks) {
int pull, release, released;
pull = p->in->received - h->head - h->history;
if (pull > 0) {
struct sample *smps[pull];
release = queue_pull_many(&p->queue, (void **) smps, pull, &h->head);
if (release > 0)
debug(DBG_PATH | 3, "Releasing %u samples from queue of path %s", release, path_name(p));
released = pool_put_many(&p->pool, (void **) smps, release);
if (release != released)
warn("Failed to release %u samples to pool of path %s", release - released, path_name(p));
}
}
/* Run post processing hooks */
if (hook_run(p, HOOK_POST)) {
p->skipped += recv;
continue;
}
debug(DBG_PATH | 3, "Enqueuing %u samples to queue of path %s", enqueue, path_name(p));
/* At fixed rate mode, messages are send by another (asynchronous) thread */
if (!p->rate)
path_write(p);
if (p->rate == 0)
path_write(p, false);
}
return NULL;
@ -115,13 +166,13 @@ static void * path_run(void *arg)
int path_start(struct path *p)
{
info("Starting path: %s (poollen=%zu, msgsize=%zu, #hooks=%zu, rate=%.1f)",
path_name(p), pool_length(&p->pool), pool_stride(&p->pool), list_length(&p->hooks), p->rate);
/* We sort the hooks according to their priority before starting the path */
list_sort(&p->hooks, hooks_sort_priority);
int ret;
if (hook_run(p, HOOK_PATH_START))
info("Starting path: %s (#hooks=%zu, rate=%.1f)",
path_name(p), list_length(&p->hooks), p->rate);
ret = hook_run(p, NULL, 0, HOOK_PATH_START);
if (ret)
return -1;
/* At fixed rate mode, we start another thread for sending */
@ -154,7 +205,7 @@ int path_stop(struct path *p)
p->state = PATH_STOPPED;
if (hook_run(p, HOOK_PATH_STOP))
if (hook_run(p, NULL, 0, HOOK_PATH_STOP))
return -1;
return 0;
@ -163,43 +214,77 @@ int path_stop(struct path *p)
const char * path_name(struct path *p)
{
if (!p->_name) {
strcatf(&p->_name, "%s " MAG("=>"), p->in->name);
strcatf(&p->_name, "%s " MAG("=>"), node_name_short(p->in));
list_foreach(struct node *n, &p->destinations) {
strcatf(&p->_name, " %s", n->name);
}
list_foreach(struct node *n, &p->destinations)
strcatf(&p->_name, " %s", node_name_short(n));
}
return p->_name;
}
struct path * path_create(size_t poolsize, size_t values)
void path_init(struct path *p)
{
struct path *p = alloc(sizeof(struct path));
list_init(&p->destinations, NULL);
list_init(&p->hooks, free);
pool_create(&p->pool, poolsize, 16 + values * sizeof(float)); /** @todo */
list_init(&p->destinations);
list_init(&p->hooks);
/* Initialize hook system */
list_foreach(struct hook *h, &hooks) {
if (h->type & HOOK_INTERNAL)
list_push(&p->hooks, memdup(h, sizeof(*h)));
}
p->state = PATH_CREATED;
return p;
p->state = PATH_CREATED;
}
int path_prepare(struct path *p)
{
int ret;
/* We sort the hooks according to their priority before starting the path */
list_sort(&p->hooks, hooks_sort_priority);
/* Allocate hook private memory */
ret = hook_run(p, NULL, 0, HOOK_INIT);
if (ret)
error("Failed to initialize hooks of path: %s", path_name(p));
/* Parse hook arguments */
ret = hook_run(p, NULL, 0, HOOK_PARSE);
if (ret)
error("Failed to parse arguments for hooks of path: %s", path_name(p));
/* Initialize queue */
ret = pool_init_mmap(&p->pool, SAMPLE_LEN(p->values), p->queuelen);
if (ret)
error("Failed to allocate memory pool for path");
ret = queue_init(&p->queue, p->queuelen);
if (ret)
error("Failed to initialize queue for path");
/* Add a head pointer for each hook to the queue */
list_foreach(struct hook *h, &p->hooks)
queue_reader_add(&p->queue, h->head, p->in->received);
/* Add a head pointer for each destination node to the queue. */
list_foreach(struct node *out, &p->destinations)
queue_reader_add(&p->queue, out->sent, p->in->received);
return 0;
}
void path_destroy(struct path *p)
{
list_destroy(&p->destinations);
list_destroy(&p->hooks);
{
hook_run(p, NULL, 0, HOOK_DEINIT); /* Release memory */
list_destroy(&p->destinations, NULL, false);
list_destroy(&p->hooks, NULL, true);
queue_destroy(&p->queue);
pool_destroy(&p->pool);
free(p->_name);
free(p);
}
int path_uses_node(struct path *p, struct node *n) {

View file

@ -11,6 +11,7 @@
*********************************************************************************/
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
@ -32,7 +33,6 @@ struct pool recv_pool, send_pool;
pthread_t recv_thread, send_thread;
struct node *node;
int reverse;
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
@ -59,7 +59,10 @@ static void usage(char *name)
printf("Usage: %s CONFIG [-r] NODE\n", name);
printf(" CONFIG path to a configuration file\n");
printf(" NODE the name of the node to which samples are sent and received from\n");
printf(" -r swap read / write endpoints)\n\n");
printf(" -d LVL set debug log level to LVL\n");
printf(" -x swap read / write endpoints\n");
printf(" -s only read data from stdin and send it to node\n");
printf(" -r only read data from node and write it to stdout\n\n");
print_copyright();
@ -67,13 +70,25 @@ static void usage(char *name)
}
void * send_loop(void *ctx)
{
{
int ret;
struct sample *smps[node->vectorize];
/* Initialize memory */
ret = pool_init_mmap(&send_pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize);
if (ret < 0)
error("Failed to allocate memory for receive pool.");
ret = pool_get_many(&send_pool, (void **) smps, node->vectorize);
if (ret < 0)
error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret);
for (;;) {
for (int i = 0; i < node->vectorize; i++) {
struct msg *m = pool_getrel(&send_pool, i);
struct sample *s = smps[i];
int reason;
retry: reason = msg_fscan(stdin, m, NULL, NULL);
retry: reason = sample_fscan(stdin, s, NULL);
if (reason < 0) {
if (feof(stdin))
return NULL;
@ -84,7 +99,7 @@ retry: reason = msg_fscan(stdin, m, NULL, NULL);
}
}
node_write(node, &send_pool, node->vectorize);
node_write(node, smps, node->vectorize);
}
return NULL;
@ -92,23 +107,27 @@ retry: reason = msg_fscan(stdin, m, NULL, NULL);
void * recv_loop(void *ctx)
{
int ret;
struct sample *smps[node->vectorize];
/* Initialize memory */
ret = pool_init_mmap(&recv_pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize);
if (ret < 0)
error("Failed to allocate memory for receive pool.");
ret = pool_get_many(&recv_pool, (void **) smps, node->vectorize);
if (ret < 0)
error("Failed to get %u samples out of receive pool (%d).", node->vectorize, ret);
/* Print header */
fprintf(stdout, "# %-20s\t\t%s\n", "sec.nsec+offset(seq)", "data[]");
fprintf(stdout, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]");
for (;;) {
struct timespec ts = time_now();
int recv = node_read(node, &recv_pool, node->vectorize);
int recv = node_read(node, smps, node->vectorize);
for (int i = 0; i < recv; i++) {
struct msg *m = pool_getrel(&recv_pool, i);
int ret = msg_verify(m);
if (ret)
warn("Failed to verify message: %d", ret);
/** @todo should we drop reordered / delayed packets here? */
struct sample *s = smps[i];
msg_fprint(stdout, m, MSG_PRINT_ALL, time_delta(&MSG_TS(m), &ts));
sample_fprint(stdout, s, SAMPLE_ALL);
fflush(stdout);
}
}
@ -118,17 +137,29 @@ void * recv_loop(void *ctx)
int main(int argc, char *argv[])
{
bool send = true, recv = true, reverse = false;
/* Parse command line arguments */
if (argc < 3)
usage(argv[0]);
log_init();
char c;
while ((c = getopt(argc-2, argv+2, "hr")) != -1) {
while ((c = getopt(argc-2, argv+2, "hxrsd:")) != -1) {
switch (c) {
case 'r':
reverse = 1;
case 'x':
reverse = true;
break;
case 's':
recv = false;
break;
case 'r':
send = false;
break;
case 'd':
log_setlevel(atoi(optarg), -1);
break;
case 'h':
case '?':
usage(argv[0]);
@ -151,7 +182,6 @@ int main(int argc, char *argv[])
/* Create lists */
list_init(&nodes);
log_init();
config_init(&config);
config_parse(argv[1], &config, &settings, &nodes, NULL);
@ -159,22 +189,22 @@ int main(int argc, char *argv[])
node = list_lookup(&nodes, argv[2]);
if (!node)
error("Node '%s' does not exist!", argv[2]);
node_init(node->_vt, argc-optind, argv+optind, config_root_setting(&config));
pool_create(&recv_pool, node->vectorize, sizeof(struct msg));
pool_create(&send_pool, node->vectorize, sizeof(struct msg));
if (reverse)
node_reverse(node);
node_init(node->_vt, argc-optind, argv+optind, config_root_setting(&config));
node_start(node);
/* Start threads */
pthread_create(&recv_thread, NULL, recv_loop, NULL);
pthread_create(&send_thread, NULL, send_loop, NULL);
if (recv)
pthread_create(&recv_thread, NULL, recv_loop, NULL);
if (send)
pthread_create(&send_thread, NULL, send_loop, NULL);
for (;;) pause();
for (;;)
pause();
return 0;
}

View file

@ -120,6 +120,11 @@ static void usage(const char *name)
list_foreach(struct node_type *vt, &node_types)
printf(" - %s: %s\n", vt->name, vt->description);
printf("\n");
printf("Supported hooks:\n");
list_foreach(struct hook *h, &hooks)
printf(" - %s: %s\n", h->name, h->description);
printf("\n");
print_copyright();
@ -167,7 +172,9 @@ int main(int argc, char *argv[])
info("Initialize node types");
list_foreach(struct node_type *vt, &node_types) { INDENT
node_init(vt, argc, argv, config_root_setting(&config));
int refs = list_length(&vt->instances);
if (refs > 0)
node_init(vt, argc, argv, config_root_setting(&config));
}
info("Starting nodes");
@ -181,8 +188,10 @@ int main(int argc, char *argv[])
info("Starting paths");
list_foreach(struct path *p, &paths) { INDENT
if (p->enabled)
if (p->enabled) {
path_prepare(p);
path_start(p);
}
else
warn("Path %s is disabled. Skipping...", path_name(p));
}
@ -193,7 +202,7 @@ int main(int argc, char *argv[])
for (;;) {
list_foreach(struct path *p, &paths)
hook_run(p, HOOK_PERIODIC);
hook_run(p, NULL, 0, HOOK_PERIODIC);
usleep(settings.stats * 1e6);
}
}

View file

@ -16,7 +16,7 @@
#include "config.h"
#include "utils.h"
#include "msg.h"
#include "sample.h"
#include "timing.h"
#define CLOCKID CLOCK_REALTIME
@ -109,10 +109,10 @@ check: if (optarg == endptr)
}
/* Allocate memory for message buffer */
struct msg *m = msg_create(values);
struct sample *s = alloc(SAMPLE_LEN(values));
/* Print header */
printf("# S2SS signal params: type=%s, values=%u, rate=%f, limit=%u, amplitude=%f, freq=%f\n",
printf("# S2SS signal params: type=%s, values=%u, rate=%f, limit=%d, amplitude=%f, freq=%f\n",
argv[1], values, rate, limit, ampl, freq);
printf("# %-20s\t\t%s\n", "sec.nsec(seq)", "data[]");
@ -128,30 +128,35 @@ check: if (optarg == endptr)
struct timespec now = time_now();
double running = time_delta(&start, &now);
m->ts.sec = now.tv_sec;
m->ts.nsec = now.tv_nsec;
m->sequence = counter;
s->ts.origin = now;
s->sequence = counter;
s->length = values;
for (int i = 0; i < m->values; i++) {
for (int i = 0; i < values; i++) {
int rtype = (type != TYPE_MIXED) ? type : i % 4;
switch (rtype) {
case TYPE_RANDOM: m->data[i].f += box_muller(0, stddev); break;
case TYPE_SINE: m->data[i].f = ampl * sin(running * freq * 2 * M_PI); break;
case TYPE_TRIANGLE: m->data[i].f = ampl * (fabs(fmod(running * freq, 1) - .5) - 0.25) * 4; break;
case TYPE_SQUARE: m->data[i].f = ampl * ( (fmod(running * freq, 1) < .5) ? -1 : 1); break;
case TYPE_RAMP: m->data[i].f = fmod(counter, rate / freq); /** @todo send as integer? */ break;
case TYPE_RANDOM: s->values[i].f += box_muller(0, stddev); break;
case TYPE_SINE: s->values[i].f = ampl * sin(running * freq * 2 * M_PI); break;
case TYPE_TRIANGLE: s->values[i].f = ampl * (fabs(fmod(running * freq, 1) - .5) - 0.25) * 4; break;
case TYPE_SQUARE: s->values[i].f = ampl * ( (fmod(running * freq, 1) < .5) ? -1 : 1); break;
case TYPE_RAMP: s->values[i].f = fmod(counter, rate / freq); /** @todo send as integer? */ break;
}
}
msg_fprint(stdout, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0);
sample_fprint(stdout, s, SAMPLE_ALL & ~SAMPLE_OFFSET);
fflush(stdout);
/* Block until 1/p->rate seconds elapsed */
counter += timerfd_wait(tfd);
int steps = timerfd_wait(tfd);
if (steps > 1)
warn("Missed steps: %u", steps);
counter += steps;
}
close(tfd);
free(m);
free(s);
return 0;
}

View file

@ -140,45 +140,44 @@ check: if (optarg == endptr)
node_stop(node);
node_deinit(node->_vt);
list_destroy(&nodes, node_destroy, false);
list_destroy(&nodes, (dtor_cb_t) node_destroy, false);
config_destroy(&config);
return 0;
}
void test_rtt() {
struct timespec sent, recv;
struct hist hist;
struct msg *m;
struct timespec send, recv;
m = msg_create(0);
struct sample *smp_send = alloc(SAMPLE_LEN(2));
struct sample *smp_recv = alloc(SAMPLE_LEN(2));
hist_create(&hist, low, high, res);
/* Print header */
fprintf(stdout, "%17s%5s%10s%10s%10s%10s%10s\n", "timestamp", "seq", "rtt", "min", "max", "mean", "stddev");
while (running && (count < 0 || count--)) {
clock_gettime(CLOCK_ID, &sent);
m->ts.sec = sent.tv_sec;
m->ts.nsec = sent.tv_nsec;
clock_gettime(CLOCK_ID, &send);
node_write_single(node, m); /* Ping */
node_read_single(node, m); /* Pong */
node_write(node, &smp_send, 1); /* Ping */
node_read(node, &smp_recv, 1); /* Pong */
clock_gettime(CLOCK_ID, &recv);
double rtt = time_delta(&recv, &sent);
double rtt = time_delta(&recv, &send);
if (rtt < 0)
warn("Negative RTT: %f", rtt);
hist_put(&hist, rtt);
m->sequence++;
smp_send->sequence++;
fprintf(stdout, "%10lu.%06lu%5u%10.3f%10.3f%10.3f%10.3f%10.3f\n",
recv.tv_sec, recv.tv_nsec / 1000, m->sequence,
recv.tv_sec, recv.tv_nsec / 1000, smp_send->sequence,
1e3 * rtt, 1e3 * hist.lowest, 1e3 * hist.highest,
1e3 * hist_mean(&hist), 1e3 * hist_stddev(&hist));
}