1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

finally introduced new data structure for circular buffering of messages

This commit is contained in:
Steffen Vogel 2016-01-14 22:59:57 +01:00
parent 119d8b0846
commit b4f787b2c0
28 changed files with 439 additions and 317 deletions

View file

@ -5,7 +5,7 @@ TARGETS = server pipe signal test
LIBS = libs2ss.so
# Object files for libs2ss
LIB_OBJS = msg.o node.o checks.o list.o log.o utils.o cfg.o path.o hooks.o hist.o timing.o
LIB_OBJS = msg.o node.o checks.o list.o log.o utils.o cfg.o path.o hooks.o hist.o timing.o pool.o
# Source directories
VPATH = src lib

View file

@ -20,8 +20,8 @@
/** The version number of the s2ss server */
#define VERSION "v0.5-" _GIT_REV
/** Maximum number of float values in a message */
#define MAX_VALUES 64
/** Default number of values in a message */
#define DEFAULT_MSGVALUES 64
/** Maximum number of messages in the circular history buffer */
#define DEFAULT_POOLSIZE 32

View file

@ -69,9 +69,9 @@ int file_open(struct node *n);
int file_close(struct node *n);
/** @see node_vtable::read */
int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int file_read(struct node *n, struct pool *pool, int cnt);
/** @see node_vtable::write */
int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int file_write(struct node *n, struct pool *pool, int cnt);
#endif /** _FILE_H_ @} */

View file

@ -67,9 +67,9 @@ int gtfpga_open(struct node *n);
int gtfpga_close(struct node *n);
/** @see node_vtable::read */
int gtfpga_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int gtfpga_read(struct node *n, struct pool *pool, int cnt);
/** @see node_vtable::write */
int gtfpga_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int gtfpga_write(struct node *n, struct pool *pool, int cnt);
#endif /** _GTFPGA_H_ @} */

View file

@ -26,6 +26,12 @@ enum msg_flags {
MSG_PRINT_ALL = 0xFF
};
/** Allocate and initialize memory of a sinle message. */
struct msg * msg_create(size_t values);
/** Release memory allocated by msg_create(). */
void msg_destroy(struct msg *m);
/** Swaps message contents byte-order.
*
* Message can either be transmitted in little or big endian

View file

@ -19,11 +19,6 @@
#include <lwip/arch.h>
#endif
#include "config.h"
/** Maximum number of dword values in a message */
#define MSG_VALUES MAX_VALUES
/** The current version number for the message format */
#define MSG_VERSION 1
@ -31,7 +26,6 @@
#define MSG_TYPE_DATA 0 /**< Message contains float values */
#define MSG_TYPE_START 1 /**< Message marks the beginning of a new simulation case */
#define MSG_TYPE_STOP 2 /**< Message marks the end of a simulation case */
#define MSG_TYPE_EMPTY 3 /**< Message does not carry useful data */
#define MSG_ENDIAN_LITTLE 0 /**< Message values are in little endian format (float too!) */
#define MSG_ENDIAN_BIG 1 /**< Message values are in bit endian format */
@ -45,7 +39,7 @@
#endif
/** The total size in bytes of a message */
#define MSG_LEN(msg) (4 * ((msg)->length + 4))
#define MSG_LEN(values) (4 * (values) + 16 /* header */)
/** The timestamp of a message in struct timespec format */
#define MSG_TS(msg) (struct timespec) { \
@ -53,16 +47,6 @@
.tv_nsec = (msg)->ts.nsec \
}
/** Initialize a message */
#define MSG_INIT(i) { \
.version = MSG_VERSION, \
.type = MSG_TYPE_DATA, \
.endian = MSG_ENDIAN_HOST, \
.length = i, \
.sequence = 0, \
.rsvd1 = 0, .rsvd2 = 0 \
}
/** This message format is used by all clients
*
* @diafile msg_format.dia
@ -82,7 +66,7 @@ struct msg
#endif
unsigned rsvd2 : 8; /**< Reserved bits */
uint16_t length; /**< The number of values in msg::data[]. Endianess is specified in msg::endian. */
uint16_t values; /**< The number of values in msg::data[]. Endianess is specified in msg::endian. */
uint32_t sequence; /**< The sequence number is incremented by one for consecutive messages. Endianess is specified in msg::endian. */
/** A timestamp per message. Endianess is specified in msg::endian. */
@ -95,7 +79,7 @@ struct msg
union {
float f; /**< Floating point values (note msg::endian) */
uint32_t i; /**< Integer values (note msg::endian) */
} data[MSG_VALUES];
} data[];
} __attribute__((aligned(64), packed));
#endif /* _MSG_FORMAT_H_ */

View file

@ -84,9 +84,9 @@ int ngsi_open(struct node *n);
int ngsi_close(struct node *n);
/** @see node_vtable::read */
int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int ngsi_read(struct node *n, struct pool *pool, int cnt);
/** @see node_vtable::write */
int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int ngsi_write(struct node *n, struct pool *pool, int cnt);
#endif /** _NGSI_H_ @} */

View file

@ -25,6 +25,7 @@
#include "msg.h"
#include "list.h"
#include "pool.h"
/* Helper macros for virtual node type */
#define REGISTER_NODE_TYPE(vt) \
@ -120,13 +121,11 @@ struct node_type {
* Some node types might only support to receive one message at a time.
*
* @param n A pointer to the node object.
* @param pool A pointer to an array of messages which should be sent.
* @param poolsize The length of the message array.
* @param first The index of the first message which should be sent.
* @param pool A pointer to circular buffer containing the messages.
* @param cnt The number of messages which should be sent.
* @return The number of messages actually received.
*/
int (*read) (struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int (*read) (struct node *n, struct pool *pool, int cnt);
/** Send multiple messages in a single datagram / packet.
*
@ -136,13 +135,11 @@ struct node_type {
* So the indexes will wrap around after len.
*
* @param n A pointer to the node object.
* @param pool A pointer to an array of messages which should be sent.
* @param poolsize The length of the message array.
* @param first The index of the first message which should be sent.
* @param pool A pointer to circular buffer containing the messages.
* @param cnt The number of messages which should be sent.
* @return The number of messages actually sent.
*/
int (*write)(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int (*write)(struct node *n, struct pool *pool, int cnt);
/** Reverse source and destination of a node.
*
@ -249,13 +246,19 @@ const char * node_name_type(struct node *n);
*
* @see node_type::read
*/
int node_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int node_read(struct node *n, struct pool *pool, int cnt);
/** Send multiple messages in a single datagram / packet.
*
* @see node_type::write
*/
int node_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int node_write(struct node *n, struct pool *pool, int cnt);
/** Read a single message without using a message pool. */
int node_read_single(struct node *n, struct msg *m);
/** Send a single message without using a message pool. */
int node_write_single(struct node *n, struct msg *m);
/** Reverse local and remote socket address.
*

View file

@ -70,9 +70,9 @@ int opal_open(struct node *n);
int opal_close(struct node *n);
/** @see node_vtable::read */
int opal_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int opal_read(struct node *n, struct pool *pool, int cnt);
/** @see node_vtable::write */
int opal_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int opal_write(struct node *n, struct pool *pool, int cnt);
#endif /** _OPAL_H_ @} */

View file

@ -24,6 +24,7 @@
#include "node.h"
#include "msg.h"
#include "hooks.h"
#include "pool.h"
/** The datastructure for a path.
*
@ -48,13 +49,7 @@ struct path
int tfd; /**< Timer file descriptor for fixed rate sending */
double rate; /**< Send messages with a fixed rate over this path */
int msgsize; /**< Maximum number of values per message for this path */
int poolsize; /**< Size of the history buffer in number of messages */
struct msg *pool; /**< A circular buffer of past messages */
struct msg *current; /**< A pointer to the last received message */
struct msg *previous; /**< A pointer to the previously received message */
struct pool pool; /**< A circular buffer of past messages */
pthread_t recv_tid; /**< The thread id for this path */
pthread_t sent_tid; /**< A second thread id for fixed rate sending thread */
@ -89,7 +84,7 @@ struct path
};
/** Create a path by allocating dynamic memory. */
struct path * path_create();
struct path * path_create(size_t poolsize, size_t values);
/** Destroy path by freeing dynamically allocated memory.
*

72
include/pool.h Normal file
View file

@ -0,0 +1,72 @@
/** Circular buffer
*
* Every path uses a circular buffer to save past messages.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*/
#ifndef _POOL_H_
#define _POOL_H_
#define pool_start(pool) pool_get(pool, 0)
#define pool_end(pool) pool_get(pool, pool->lenght)
/** Return pointer to last element which has been inserted. */
#define pool_current(pool) pool_getrel(pool, 0)
/** Return pointer to the element before the previously added one. */
#define pool_previous(pool) pool_getrel(pool, -1)
/** Iterate through the circuluar buffer. */
#define pool_foreach(it, pool, start, end) for (it = pool_get(pool, start); \
it != pool_get(pool, end); \
it = pool_get_next(pool, it))
/** Return the number of elements in the pool. */
#define pool_length(pool) ((pool)->length)
/** Return the stride between two consecutive elemements in the pool. */
#define pool_stride(pool) ((pool)->stride)
/** The data structure for a message pool.
*
* A message pool is a circular buffer used to store messages (samples)
*/
struct pool {
void *buffer; /**< Heap allocated memory of the circular buffer. */
int last; /**< Index of the message which has been added last. */
int previous; /**< Previous value of pool::last. */
size_t length; /**< Number of messages in the circular buffer */
size_t stride; /**< Size per block in bytes. */
};
/** Initiliaze a new pool and allocate memory. */
void pool_create(struct pool *p, size_t len, size_t blocklen);
/** Release memory of pool. */
void pool_destroy(struct pool *p);
/** Advance the internal pointer of the pool. */
void pool_push(struct pool *p, int blocks);
/** Return pointer to pool element. */
void * pool_get(struct pool *p, int index);
/** Return pointer relative to last inserted element. */
void * pool_getrel(struct pool *p, int offset);
/** Return next element after ptr */
void * pool_get_next(struct pool *p, void *ptr);
/** Return element before ptr */
void * pool_get_previous(struct pool *p, void *ptr);
#endif /* _POOL_H_ */

View file

@ -72,10 +72,10 @@ int socket_open(struct node *n);
int socket_close(struct node *n);
/** @see node_vtable::write */
int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int socket_write(struct node *n, struct pool *pool, int cnt);
/** @see node_vtable::read */
int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int socket_read(struct node *n, struct pool *pool, int cnt);
/** @see node_vtable::parse */
int socket_parse(struct node *n, config_setting_t *cfg);

View file

@ -29,7 +29,8 @@ struct websocket {
struct {
pthread_cond_t cond;
pthread_mutex_t mutex;
struct msg *m;
struct pool *pool;
size_t cnt;
} read, write;
struct list connections; /**< List of struct libwebsockets sockets */
@ -48,9 +49,9 @@ int websocket_open(struct node *n);
int websocket_close(struct node *n);
/** @see node_vtable::read */
int websocket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int websocket_read(struct node *n, struct pool *pool, int cnt);
/** @see node_vtable::write */
int websocket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt);
int websocket_write(struct node *n, struct pool *pool, int cnt);
#endif /* _WEBSOCKET_H_ */

View file

@ -86,9 +86,15 @@ int config_parse_path(config_setting_t *cfg,
{
config_setting_t *cfg_out, *cfg_hook;
const char *in;
int reverse;
int reverse, poolsize, values;
/* Pool settings */
if (!config_setting_lookup_int(cfg, "poolsize", &poolsize))
poolsize = DEFAULT_POOLSIZE;
if (!config_setting_lookup_int(cfg, "values", &values))
values = DEFAULT_MSGVALUES;
struct path *p = path_create();
struct path *p = path_create(poolsize, values);
/* Input node */
if (!config_setting_lookup_string(cfg, "in", &in))
@ -113,16 +119,12 @@ int config_parse_path(config_setting_t *cfg,
/* Initialize hooks and their private data / parameters */
path_run_hook(p, HOOK_INIT);
if (!config_setting_lookup_bool(cfg, "enabled", &p->enabled))
p->enabled = 1;
if (!config_setting_lookup_bool(cfg, "reverse", &reverse))
reverse = 0;
if (!config_setting_lookup_bool(cfg, "enabled", &p->enabled))
p->enabled = 1;
if (!config_setting_lookup_float(cfg, "rate", &p->rate))
p->rate = 0; /* disabled */
if (!config_setting_lookup_int(cfg, "poolsize", &p->poolsize))
p->poolsize = DEFAULT_POOLSIZE;
if (!config_setting_lookup_int(cfg, "msgsize", &p->msgsize))
p->msgsize = MAX_VALUES;
p->cfg = cfg;
@ -132,11 +134,11 @@ int config_parse_path(config_setting_t *cfg,
if (list_length(&p->destinations) > 1)
error("Can't reverse path with multiple destination nodes");
struct path *r = path_create();
struct path *r = path_create(poolsize, values);
/* Swap in/out */
r->in = p->out;
r->in = p->out; /* Swap in/out */
r->out = p->in;
r->rate = p->rate;
list_push(&r->destinations, r->out);
@ -145,10 +147,6 @@ int config_parse_path(config_setting_t *cfg,
/* Initialize hooks and their private data / parameters */
path_run_hook(r, HOOK_INIT);
r->rate = p->rate;
r->poolsize = p->poolsize;
r->msgsize = p->msgsize;
list_push(paths, r);
}

View file

@ -15,6 +15,7 @@
#include "file.h"
#include "utils.h"
#include "timing.h"
#include "pool.h"
int file_reverse(struct node *n)
{
@ -273,14 +274,14 @@ int file_close(struct node *n)
return 0;
}
int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int file_read(struct node *n, struct pool *pool, int cnt)
{
struct file *f = n->_vd;
int values, flags, i = 0;
if (f->read.handle) {
for (i = 0; i < cnt; i++) {
struct msg *cur = &pool[(first+i) % poolsize];
struct msg *cur = pool_getrel(pool, i);
/* Get message and timestamp */
retry: values = msg_fscan(f->read.handle, cur, &flags, NULL);
@ -335,13 +336,15 @@ retry: values = msg_fscan(f->read.handle, cur, &flags, NULL);
return i;
}
int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int file_write(struct node *n, struct pool *pool, int cnt)
{
int i = 0;
struct file *f = n->_vd;
if (f->write.handle) {
for (i = 0; i < cnt; i++) {
struct msg *m = pool_getrel(pool, i);
/* Split file if requested */
if ((f->write.split > 0) && ftell(f->write.handle) > f->write.split * (1 << 20)) {
f->write.chunk++;
@ -350,7 +353,6 @@ int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cn
info("Splitted output node %s: chunk=%u", node_name(n), f->write.chunk);
}
struct msg *m = &pool[(first+i) % poolsize];
msg_fprint(f->write.handle, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0);
}
fflush(f->write.handle);

View file

@ -234,11 +234,11 @@ int gtfpga_close(struct node *n)
}
/** @todo implement */
int gtfpga_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int gtfpga_read(struct node *n, struct pool *pool, int cnt)
{
struct gtfpga *g = n->_vd;
//struct msg *m = pool_getrel(pool, 0);
struct msg *m = &pool[first % poolsize];
if (cnt != 1)
error("The GTFPGA node type does not support combining!");
@ -254,10 +254,10 @@ int gtfpga_read(struct node *n, struct msg *pool, int poolsize, int first, int c
}
/** @todo implement */
int gtfpga_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int gtfpga_write(struct node *n, struct pool *pool, int cnt)
{
// struct gtfpga *g = n->_vd;
// struct msg *m = &pool[first % poolsize];
// struct msg *m = pool_getrel(pool, 0);
if (cnt != 1)
error("The GTFPGA node type does not support combining!");

View file

@ -25,10 +25,17 @@
struct list hooks;
int hooks_sort_priority(const void *a, const void *b) {
struct hook *ha = (struct hook *) a;
struct hook *hb = (struct hook *) b;
return ha->priority - hb->priority;
}
REGISTER_HOOK("print", 99, hook_print, HOOK_MSG)
int hook_print(struct path *p, struct hook *h, int when)
{
struct msg *m = p->current;
struct msg *m = pool_current(&p->pool);
double offset = time_delta(&MSG_TS(m), &p->ts.recv);
int flags = MSG_PRINT_ALL;
@ -44,7 +51,7 @@ int hook_print(struct path *p, struct hook *h, int when)
REGISTER_HOOK("ts", 99, hook_ts, HOOK_MSG)
int hook_ts(struct path *p, struct hook *h, int when)
{
struct msg *m = p->current;
struct msg *m = pool_current(&p->pool);
m->ts.sec = p->ts.recv.tv_sec;
m->ts.nsec = p->ts.recv.tv_nsec;
@ -55,7 +62,7 @@ int hook_ts(struct path *p, struct hook *h, int when)
REGISTER_HOOK("fix_ts", 0, hook_fix_ts, HOOK_INTERNAL | HOOK_MSG)
int hook_fix_ts(struct path *p, struct hook *h, int when)
{
struct msg *m = p->current;
struct msg *m = pool_current(&p->pool);
if ((m->ts.sec == 0 && m->ts.nsec == 0) ||
(m->ts.sec == -1 && m->ts.nsec == -1))
@ -70,32 +77,32 @@ int hook_skip_unchanged(struct path *p, struct hook *h, int when)
struct private {
double threshold;
struct msg previous;
} *private = h->private;
} *x = h->private;
switch (when) {
case HOOK_INIT:
private = h->private = alloc(sizeof(struct private));
x = h->private = alloc(sizeof(struct private));
if (!h->parameter)
error("Missing parameter for hook 'deduplication'");
private->threshold = strtof(h->parameter, NULL);
if (!private->threshold)
x->threshold = strtof(h->parameter, NULL);
if (!x->threshold)
error("Failed to parse parameter '%s' for hook 'deduplication'", h->parameter);
break;
case HOOK_DEINIT:
free(private);
free(x);
break;
case HOOK_ASYNC: {
int ret = 0;
struct msg *prev = &private->previous;
struct msg *cur = p->current;
struct msg *prev = &x->previous;
struct msg *cur = pool_current(&p->pool);
for (int i = 0; i < MIN(cur->length, prev->length); i++) {
if (fabs(cur->data[i].f - prev->data[i].f) > private->threshold)
for (int i = 0; i < MIN(cur->values, prev->values); i++) {
if (fabs(cur->data[i].f - prev->data[i].f) > x->threshold)
goto out;
}
@ -115,32 +122,32 @@ int hook_convert(struct path *p, struct hook *h, int when)
{
struct private {
enum { TO_FIXED, TO_FLOAT } mode;
} *private = h->private;
} *x = h->private;
switch (when) {
case HOOK_INIT:
private = h->private = alloc(sizeof(struct private));
x = h->private = alloc(sizeof(struct private));
if (!h->parameter)
error("Missing parameter for hook 'deduplication'");
if (!strcmp(h->parameter, "fixed"))
private->mode = TO_FIXED;
x->mode = TO_FIXED;
else if (!strcmp(h->parameter, "float"))
private->mode = TO_FLOAT;
x->mode = TO_FLOAT;
else
error("Invalid parameter '%s' for hook 'convert'", h->parameter);
break;
case HOOK_DEINIT:
free(private);
free(x);
break;
case HOOK_MSG: {
struct msg *m = p->current;
struct msg *m = pool_current(&p->pool);
for (int i = 0; i < m->length; i++) {
switch (private->mode) {
for (int i = 0; i < m->values; i++) {
switch (x->mode) {
/** @todo allow precission to be configured via parameter */
case TO_FIXED: m->data[i].i = m->data[i].f * 1e3; break;
case TO_FLOAT: m->data[i].f = m->data[i].i; break;
@ -161,45 +168,56 @@ int hook_fir(struct path *p, struct hook *h, int when)
char *end;
struct private {
double *coeffs;
double *history;
struct pool coeffs;
struct pool history;
int index;
} *private = h->private;
} *x = h->private;
switch (when) {
case HOOK_INIT:
if (!h->parameter)
error("Missing parameter for hook 'fir'");
private = h->private = alloc(sizeof(struct private));
x = h->private = alloc(sizeof(struct private));
private->coeffs = memdup(coeffs, sizeof(coeffs));
private->history = alloc(sizeof(coeffs));
pool_create(&x->coeffs, ARRAY_LEN(coeffs), sizeof(double));
pool_create(&x->history, ARRAY_LEN(coeffs), sizeof(double));
private->index = strtol(h->parameter, &end, 10);
/** Fill with static coefficients */
memcpy(x->coeffs.buffer, coeffs, sizeof(coeffs));
x->index = strtol(h->parameter, &end, 10);
if (h->parameter == end)
error("Invalid parameter '%s' for hook 'fir'", h->parameter);
break;
case HOOK_DEINIT:
free(private->coeffs);
free(private->history);
free(private);
pool_destroy(&x->coeffs);
pool_destroy(&x->history);
free(x);
break;
case HOOK_MSG: {
/* Current value of interest */
float *cur = &p->current->data[private->index].f;
struct msg *m = pool_current(&p->pool);
float *value = &m->data[x->index].f;
double *history = pool_current(&x->history);
/* Save last sample, unfiltered */
private->history[p->received % p->poolsize] = *cur;
*history = *value;
/* Reset accumulator */
*cur = 0;
*value = 0;
/* FIR loop */
for (int i = 0; i < MIN(ARRAY_LEN(coeffs), p->poolsize); i++)
*cur += private->coeffs[i] * private->history[p->received+p->poolsize-i];
for (int i = 0; i < pool_length(&x->coeffs); i++) {
double *coeff = pool_get(&x->coeffs, i);
double *hist = pool_getrel(&x->history, -i);
*value += *coeff * *hist;
}
break;
}
}
@ -212,26 +230,26 @@ int hook_decimate(struct path *p, struct hook *h, int when)
{
struct private {
long ratio;
} *private = h->private;
} *x = h->private;
switch (when) {
case HOOK_INIT:
if (!h->parameter)
error("Missing parameter for hook 'decimate'");
private = h->private = alloc(sizeof(struct private));
x = h->private = alloc(sizeof(struct private));
private->ratio = strtol(h->parameter, NULL, 10);
if (!private->ratio)
x->ratio = strtol(h->parameter, NULL, 10);
if (!x->ratio)
error("Invalid parameter '%s' for hook 'decimate'", h->parameter);
break;
case HOOK_DEINIT:
free(private);
free(x);
break;
case HOOK_POST:
return p->received % private->ratio;
return p->received % x->ratio;
}
return 0;
@ -243,35 +261,35 @@ int hook_skip_first(struct path *p, struct hook *h, int when)
struct private {
double wait; /**< Number of seconds to wait until first message is not skipped */
struct timespec started; /**< Timestamp of last simulation restart */
} *private = h->private;
} *x = h->private;
switch (when) {
case HOOK_INIT:
if (!h->parameter)
error("Missing parameter for hook 'skip_first'");
private = h->private = alloc(sizeof(struct private));
x = h->private = alloc(sizeof(struct private));
private->wait = strtof(h->parameter, NULL);
if (!private->wait)
x->wait = strtof(h->parameter, NULL);
if (!x->wait)
error("Invalid parameter '%s' for hook 'skip_first'", h->parameter);
break;
case HOOK_DEINIT:
free(private);
free(x);
break;
case HOOK_PATH_RESTART:
private->started = p->ts.recv;
x->started = p->ts.recv;
break;
case HOOK_PATH_START:
private->started = time_now();
x->started = time_now();
break;
case HOOK_POST: {
double delta = time_delta(&private->started, &p->ts.recv);
return delta < private->wait
double delta = time_delta(&x->started, &p->ts.recv);
return delta < x->wait
? -1 /* skip */
: 0; /* send */
}
@ -283,10 +301,13 @@ int hook_skip_first(struct path *p, struct hook *h, int when)
REGISTER_HOOK("restart", 1, hook_restart, HOOK_INTERNAL | HOOK_MSG)
int hook_restart(struct path *p, struct hook *h, int when)
{
if (p->current->sequence == 0 &&
p->previous->sequence <= UINT32_MAX - 32) {
struct msg *cur = pool_current(&p->pool);
struct msg *prev = pool_previous(&p->pool);
if (cur->sequence == 0 &&
prev->sequence <= UINT32_MAX - 32) {
warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u)",
path_name(p), p->previous->sequence, p->current->sequence);
path_name(p), prev->sequence, cur->sequence);
p->sent =
p->invalid =
@ -304,7 +325,9 @@ int hook_restart(struct path *p, struct hook *h, int when)
REGISTER_HOOK("verify", 2, hook_verify, HOOK_INTERNAL | HOOK_MSG)
int hook_verify(struct path *p, struct hook *h, int when)
{
int reason = msg_verify(p->current);
struct msg *cur = pool_current(&p->pool);
int reason = msg_verify(cur);
if (reason) {
p->invalid++;
warn("Received invalid message (reason = %d)", reason);
@ -317,7 +340,10 @@ int hook_verify(struct path *p, struct hook *h, int when)
REGISTER_HOOK("drop", 3, hook_drop, HOOK_INTERNAL | HOOK_MSG)
int hook_drop(struct path *p, struct hook *h, int when)
{
int dist = p->current->sequence - (int32_t) p->previous->sequence;
struct msg *cur = pool_current(&p->pool);
struct msg *prev = pool_previous(&p->pool);
int dist = cur->sequence - (int32_t) prev->sequence;
if (dist <= 0 && p->received > 1) {
p->dropped++;
return -1;
@ -354,7 +380,8 @@ int hook_stats(struct path *p, struct hook *h, int when)
}
case HOOK_MSG: {
struct msg *prev = p->previous, *cur = p->current;
struct msg *cur = pool_current(&p->pool);
struct msg *prev = pool_previous(&p->pool);
/* Exclude first message from statistics */
if (p->received > 0) {
@ -384,11 +411,14 @@ int hook_stats(struct path *p, struct hook *h, int when)
break;
case HOOK_PERIODIC: {
if (p->received > 1)
if (p->received > 1) {
struct msg *cur = pool_current(&p->pool);
stats("%-40.40s|%10.2g|%10.2f|%10u|%10u|%10u|%10u|%10u|%10u|%10u|", path_name(p),
p->hist.owd.last, 1 / p->hist.gap_msg.last,
p->sent, p->received, p->dropped, p->skipped, p->invalid, p->overrun, list_length(p->current)
p->sent, p->received, p->dropped, p->skipped, p->invalid, p->overrun, cur->values
);
}
else
stats("%-40.40s|%10s|%10s|%10u|%10u|%10u|%10u|%10u|%10u|%10s|", path_name(p), "", "",
p->sent, p->received, p->dropped, p->skipped, p->invalid, p->overrun, ""
@ -402,7 +432,7 @@ int hook_stats(struct path *p, struct hook *h, int when)
void hook_stats_header()
{
#define UNIT(u) "(" YEL(u) ")"
#define UNIT(u) "(" YEL(u) ")"
stats("%-40s|%19s|%19s|%19s|%19s|%19s|%19s|%19s|%10s|%10s|", "Source " MAG("=>") " Destination",
"OWD" UNIT("S") " ",
@ -418,48 +448,46 @@ void hook_stats_header()
line();
}
REGISTER_HOOK("stats_send", 99, hook_stats_send, HOOK_PRIVATE | HOOK_MSG)
REGISTER_HOOK("stats_send", 99, hook_stats_send, HOOK_PRIVATE | HOOK_PERIODIC)
int hook_stats_send(struct path *p, struct hook *h, int when)
{
struct private {
struct node *dest;
struct msg *msg;
int ratio;
} *private = h->private;
} *x = h->private;
switch (when) {
case HOOK_INIT:
if (!h->parameter)
error("Missing parameter for hook 'stats_send'");
private = h->private = alloc(sizeof(struct private));
x = h->private = alloc(sizeof(struct private));
private->dest = list_lookup(&nodes, h->parameter);
if (!private->dest)
x->msg = msg_create(9);
x->dest = list_lookup(&nodes, h->parameter);
if (!x->dest)
error("Invalid destination node '%s' for hook 'stats_send'", h->parameter);
break;
case HOOK_DEINIT:
free(private);
free(x->msg);
free(x);
break;
case HOOK_MSG: {
struct msg m = MSG_INIT(0);
m.data[m.length++].f = p->sent;
m.data[m.length++].f = p->received;
m.data[m.length++].f = p->invalid;
m.data[m.length++].f = p->skipped;
m.data[m.length++].f = p->dropped;
case HOOK_PERIODIC:
x->msg->data[0].f = p->sent;
x->msg->data[1].f = p->received;
x->msg->data[2].f = p->invalid;
x->msg->data[3].f = p->skipped;
x->msg->data[4].f = p->dropped;
m.data[m.length++].f = p->hist.owd.last,
m.data[m.length++].f = p->hist.gap_msg.last;
m.data[m.length++].f = p->hist.gap_recv.last;
/* Send single message with statistics to destination node */
node_write(private->dest, &m, 1, 0, 1);
x->msg->data[5].f = p->hist.owd.last,
x->msg->data[6].f = p->hist.gap_msg.last;
x->msg->data[7].f = p->hist.gap_recv.last;
node_write_single(x->dest, x->msg); /* Send single message with statistics to destination node */
break;
}
}
return 0;

View file

@ -22,14 +22,32 @@
#include "node.h"
#include "utils.h"
struct msg * msg_create(size_t values) {
struct msg m = {
.version = MSG_VERSION,
.type = MSG_TYPE_DATA,
.endian = MSG_ENDIAN_HOST,
.values = values,
.sequence = 0,
.rsvd1 = 0, .rsvd2 = 0
};
return memdup(&m, sizeof(struct msg) + sizeof(float) * values);
}
void msg_destroy(struct msg *m)
{
free(m);
}
void msg_swap(struct msg *m)
{
m->length = bswap_16(m->length);
m->values = bswap_16(m->values);
m->sequence = bswap_32(m->sequence);
m->ts.sec = bswap_32(m->ts.sec);
m->ts.nsec = bswap_32(m->ts.nsec);
for (int i = 0; i < m->length; i++)
for (int i = 0; i < m->values; i++)
m->data[i].i = bswap_32(m->data[i].i);
m->endian ^= 1;
@ -41,10 +59,8 @@ int msg_verify(struct msg *m)
return -1;
else if (m->type != MSG_TYPE_DATA)
return -2;
else if ((m->length <= 0) || (m->length > MSG_VALUES))
return -3;
else if ((m->rsvd1 != 0) || (m->rsvd2 != 0))
return -4;
return -3;
else
return 0;
}
@ -63,7 +79,7 @@ int msg_print(char *buf, size_t len, struct msg *m, int flags, double offset)
off += snprintf(buf + off, len - off, "(%u)", m->sequence);
if (flags & MSG_PRINT_VALUES) {
for (int i = 0; i < m->length; i++)
for (int i = 0; i < m->values; i++)
off += snprintf(buf + off, len - off, "\t%.6f", m->data[i].f);
}
@ -126,28 +142,25 @@ int msg_scan(const char *line, struct msg *m, int *fl, double *off)
m->sequence = (uint16_t) strtoul(ptr, &end, 10);
if (ptr != end && *end == ')')
flags |= MSG_PRINT_SEQUENCE;
else {
info(end);
else
return -5;
}
end = end + 1;
}
else
m->sequence = 0;
for ( m->length = 0, ptr = end;
m->length < MSG_VALUES;
m->length++, ptr = end) {
for (m->values = 0, ptr = end; ;
m->values++, ptr = end) {
/** @todo We only support floating point values at the moment */
m->data[m->length].f = strtod(ptr, &end);
m->data[m->values].f = strtod(ptr, &end);
if (end == ptr) /* there are no valid FP values anymore */
break;
}
if (m->length > 0)
if (m->values > 0)
flags |= MSG_PRINT_VALUES;
if (fl)
@ -155,7 +168,7 @@ int msg_scan(const char *line, struct msg *m, int *fl, double *off)
if (off && (flags & MSG_PRINT_OFFSET))
*off = offset;
return m->length;
return m->values;
}
int msg_fprint(FILE *f, struct msg *m, int flags, double offset)

View file

@ -26,6 +26,7 @@
#include "ngsi.h"
#include "utils.h"
#include "timing.h"
#include "pool.h"
/* Some global settings */
static char *name = NULL;
@ -86,7 +87,7 @@ struct ngsi_response {
size_t len;
};
static json_t* ngsi_build_entity(struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt, int flags)
static json_t* ngsi_build_entity(struct ngsi *i, struct pool *pool, int cnt, int flags)
{
json_t *entity = json_pack("{ s: s, s: s, s: b }",
"id", i->entity_id,
@ -105,7 +106,7 @@ static json_t* ngsi_build_entity(struct ngsi *i, struct msg *pool, int poolsize,
if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */
json_t *values = json_array();
for (int k = 0; k < cnt; k++) {
struct msg *m = &pool[(first + poolsize + k) % poolsize];
struct msg *m = pool_getrel(pool, k);
json_array_append_new(values, json_pack("[ f, f, i ]",
time_to_double(&MSG_TS(m)),
@ -139,7 +140,7 @@ static json_t* ngsi_build_entity(struct ngsi *i, struct msg *pool, int poolsize,
return entity;
}
static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct msg *pool, int poolsize, int first, int cnt)
static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct pool *pool, int cnt)
{
int ret;
const char *id, *name, *type;
@ -159,10 +160,10 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct msg *pool, i
return -2;
for (int j = 0; j < cnt; j++) {
struct msg *m = &pool[(first + j) % poolsize];
struct msg *m = pool_getrel(pool, j);
m->version = MSG_VERSION;
m->length = json_array_size(attributes);
m->values = json_array_size(attributes);
m->endian = MSG_ENDIAN_HOST;
}
@ -195,7 +196,7 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct msg *pool, i
size_t index2;
json_array_foreach(values, index2, tuple) {
struct msg *m = &pool[(first + index2) % poolsize];
struct msg *m = pool_getrel(pool, index2);
/* Check sample format */
if (!json_is_array(tuple) || json_array_size(tuple) != 3)
@ -527,7 +528,7 @@ int ngsi_open(struct node *n)
curl_easy_setopt(i->curl, CURLOPT_HTTPHEADER, i->headers);
/* Create entity and atributes */
json_t *entity = ngsi_build_entity(i, NULL, 0, 0, 0, NGSI_ENTITY_METADATA);
json_t *entity = ngsi_build_entity(i, NULL, 0, NGSI_ENTITY_METADATA);
ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", entity);
if (ret)
@ -544,7 +545,7 @@ int ngsi_close(struct node *n)
int ret;
/* Delete complete entity (not just attributes) */
json_t *entity = ngsi_build_entity(i, NULL, 0, 0, 0, 0);
json_t *entity = ngsi_build_entity(i, NULL, 0, 0);
ret = ngsi_request_context_update(i->curl, i->endpoint, "DELETE", entity);
@ -556,7 +557,7 @@ int ngsi_close(struct node *n)
return ret;
}
int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int ngsi_read(struct node *n, struct pool *pool, int cnt)
{
struct ngsi *i = n->_vd;
int ret;
@ -564,13 +565,13 @@ int ngsi_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt
timerfd_wait(i->tfd);
json_t *rentity;
json_t *entity = ngsi_build_entity(i, NULL, 0, 0, 0, 0);
json_t *entity = ngsi_build_entity(i, NULL, 0, 0);
ret = ngsi_request_context_query(i->curl, i->endpoint, entity, &rentity);
if (ret)
goto out;
ret = ngsi_parse_entity(rentity, i, pool, poolsize, first, cnt);
ret = ngsi_parse_entity(rentity, i, pool, cnt);
if (ret)
goto out2;
@ -580,12 +581,12 @@ out: json_decref(entity);
return ret;
}
int ngsi_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int ngsi_write(struct node *n, struct pool *pool, int cnt)
{
struct ngsi *i = n->_vd;
int ret;
json_t *entity = ngsi_build_entity(i, pool, poolsize, first, cnt, NGSI_ENTITY_VALUES);
json_t *entity = ngsi_build_entity(i, pool, cnt, NGSI_ENTITY_VALUES);
ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", entity);

View file

@ -20,15 +20,41 @@ int node_parse(struct node *n, config_setting_t *cfg)
return n->_vt->parse ? n->_vt->parse(n, cfg) : 0;
}
int node_read(struct node *n, struct msg *p, int ps, int f, int c)
int node_read(struct node *n, struct pool *p, int c)
{
return n->_vt->read ? n->_vt->read(n, p, ps, f, c) : -1;
return n->_vt->read ? n->_vt->read(n, p, c) : -1;
}
int node_write(struct node *n, struct msg *p, int ps, int f, int c)
int node_write(struct node *n, struct pool *p, int c)
{
return n->_vt->write ? n->_vt->write(n, p, ps, f, c) : -1;
}
return n->_vt->write ? n->_vt->write(n, p, c) : -1;
}
int node_read_single(struct node *n, struct msg *m)
{
struct pool p = {
.buffer = m,
.previous = -1,
.last = 0,
.length = 1,
.stride = MSG_LEN(m->values)
};
return node_read(n, &p, 1);
}
int node_write_single(struct node *n, struct msg *m)
{
struct pool p = {
.buffer = m,
.previous = -1,
.last = 0,
.length = 1,
.stride = MSG_LEN(m->values)
};
return node_write(n, &p, 1);
}
int node_init(struct node_type *vt, int argc, char *argv[], config_setting_t *cfg)
{

View file

@ -180,7 +180,7 @@ int opal_close(struct node *n)
return 0;
}
int opal_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int opal_read(struct node *n, struct pool *pool, int cnt)
{
struct opal *o = n->_vd;
@ -245,7 +245,7 @@ int opal_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt
return 1;
}
int opal_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int opal_write(struct node *n, struct pool *pool, int cnt)
{
struct opal *o = n->_vd;

View file

@ -14,15 +14,14 @@
#include "path.h"
#include "timing.h"
#include "config.h"
#include "pool.h"
static void path_write(struct path *p)
{
list_foreach(struct node *n, &p->destinations) {
int sent = node_write(
n, /* Destination node */
p->pool, /* Pool of received messages */
p->poolsize, /* Size of the pool */
p->received - n->vectorize,/* Index of the first message which should be sent */
&p->pool, /* Pool of received messages */
n->vectorize /* Number of messages which should be sent */
);
@ -80,18 +79,17 @@ static void * path_run(void *arg)
{
struct path *p = arg;
/* Allocate memory for message pool */
p->pool = alloc(p->poolsize * sizeof(struct msg));
p->previous = p->current = p->pool;
/* Main thread loop */
for (;;) {
/* Receive message */
int recv = node_read(p->in, p->pool, p->poolsize, p->received, p->in->vectorize);
int recv = node_read(p->in, &p->pool, p->in->vectorize);
if (recv < 0)
error("Failed to receive message from node %s", node_name(p->in));
else if (recv == 0)
continue;
/* Update tail pointer of message pool by the amount of actually received messages. */
pool_push(&p->pool, recv);
/** @todo Replace this timestamp by hardware timestamping */
p->ts.last = p->ts.recv;
@ -107,9 +105,6 @@ static void * path_run(void *arg)
/* For each received message... */
for (int i = 0; i < recv; i++) {
p->previous = p->current;
p->current = &p->pool[p->received % p->poolsize];
p->received++;
/* Run hooks for filtering, stats collection and manipulation */
@ -135,8 +130,8 @@ static void * path_run(void *arg)
int path_start(struct path *p)
{
info("Starting path: %s (poolsize=%u, msgsize=%u, #hooks=%zu, rate=%.1f)",
path_name(p), p->poolsize, p->msgsize, list_length(&p->hooks), p->rate);
info("Starting path: %s (poollen=%zu, msgsize=%zu, #hooks=%zu, rate=%.1f)",
path_name(p), pool_length(&p->pool), pool_stride(&p->pool), list_length(&p->hooks), p->rate);
/* We sort the hooks according to their priority before starting the path */
list_sort(&p->hooks, ({int cmp(const void *a, const void *b) {
@ -203,12 +198,13 @@ const char * path_name(struct path *p)
return p->_name;
}
struct path * path_create()
struct path * path_create(size_t poolsize, size_t values)
{
struct path *p = alloc(sizeof(struct path));
list_init(&p->destinations, NULL);
list_init(&p->hooks, free);
pool_create(&p->pool, poolsize, 16 + values * sizeof(float)); /** @todo */
list_foreach(struct hook *h, &hooks) {
if (h->type & HOOK_INTERNAL)
@ -226,8 +222,8 @@ void path_destroy(struct path *p)
list_destroy(&p->destinations);
list_destroy(&p->hooks);
pool_destroy(&p->pool);
free(p->_name);
free(p->pool);
free(p);
}

58
lib/pool.c Normal file
View file

@ -0,0 +1,58 @@
/** Circular buffer
*
* Every path uses a circular buffer to save past messages.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*/
#include <stddef.h>
#include "utils.h"
#include "pool.h"
void pool_create(struct pool *p, size_t length, size_t stride)
{
p->buffer = alloc(length * stride);
p->last = 0;
p->length = length;
p->stride = stride;
}
void pool_destroy(struct pool *p)
{
free(p->buffer);
p->length =
p->stride = 0;
}
void pool_push(struct pool *p, int blocks)
{
p->previous = p->last;
p->last += blocks;
}
void * pool_get(struct pool *p, int index)
{
return (char *) p->buffer + p->stride * (index % p->length);
}
void * pool_getrel(struct pool *p, int offset)
{
return pool_get(p, p->last + offset);
}
void * pool_get_next(struct pool *p, void *ptr)
{
ptr += p->stride;
if (ptr < p->buffer + p->length)
ptr -= p->length;
return ptr;
}

View file

@ -31,6 +31,7 @@
#include "utils.h"
#include "socket.h"
#include "checks.h"
#include "pool.h"
/* Forward declartions */
static struct node_type vt;
@ -198,7 +199,7 @@ int socket_destroy(struct node *n)
return 0;
}
int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int socket_read(struct node *n, struct pool *pool, int cnt)
{
struct socket *s = n->_vd;
@ -218,12 +219,12 @@ int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int c
/* Check if packet length is correct */
if (bytes % (cnt * 4) != 0)
error("Packet length not dividable by 4: received=%u, cnt=%u", bytes, cnt);
if (bytes / cnt > sizeof(struct msg))
error("Packet length is too large: received=%u, cnt=%u, max=%zu", bytes, cnt, sizeof(struct msg));
if (bytes / cnt > pool->stride)
error("Packet length is too large: received=%u, cnt=%u, max=%zu", bytes, cnt, pool->stride);
for (int i = 0; i < cnt; i++) {
/* All messages of a packet must have equal length! */
iov[i].iov_base = &pool[(first+poolsize+i) % poolsize];
iov[i].iov_base = pool_getrel(pool, i);
iov[i].iov_len = bytes / cnt;
}
@ -237,17 +238,17 @@ int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int c
debug(17, "Received packet of %u bytes: %u samples a %u values per sample", bytes, cnt, (bytes / cnt) / 4 - 4);
for (int i = 0; i < cnt; i++) {
struct msg *m = &pool[(first+poolsize+i) % poolsize];
struct msg *m = pool_getrel(pool, i);
/* Convert message to host endianess */
if (m->endian != MSG_ENDIAN_HOST)
msg_swap(m);
/* Check integrity of packet */
if (bytes / cnt != MSG_LEN(m))
error("Invalid message len: %u for node %s", MSG_LEN(m), node_name(n));
if (bytes / cnt != MSG_LEN(m->values))
error("Invalid message len: %u for node %s", MSG_LEN(m->values), node_name(n));
bytes -= MSG_LEN(m);
bytes -= MSG_LEN(m->values);
}
/* Check packet integrity */
@ -257,7 +258,7 @@ int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int c
return cnt;
}
int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int socket_write(struct node *n, struct pool *pool, int cnt)
{
struct socket *s = n->_vd;
int bytes, sent = 0;
@ -266,13 +267,10 @@ int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int
struct iovec iov[cnt];
for (int i = 0; i < cnt; i++) {
struct msg *m = &pool[(first+i) % poolsize];
if (m->type == MSG_TYPE_EMPTY)
continue;
struct msg *m = pool_getrel(pool, i);
iov[sent].iov_base = m;
iov[sent].iov_len = MSG_LEN(m);
iov[sent].iov_len = MSG_LEN(m->values);
sent++;
}

View file

@ -1,65 +0,0 @@
/** Circular buffer
*
* Every path uses a circular buffer to save past messages.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*/
#ifndef _POOL_H_
#define _POOL_H_
/* Forward declaration */
struct msg;
struct pool {
void *buffer;
int last;
size_t len;
size_t blocklen;
};
int pool_init(struct pool *p, size_t len, size_t blocklen)
{
p->buffer = alloc(len * blocklen);
p->first = NULL;
p->last = NULL;
p->len = len;
p->blocklen = len;
}
void pool_destroy(struct pool *p)
{
free(p->buffer);
p->len = 0;
p->blocklen = 0;
}
void * pool_get(struct pool *p, int index);
{
/* Negative indizes are relative to p->last */
if (index < 0)
index = p->last - index;
/* Check boundaries */
if (index > p->last || p->last - index > p->len)
return NULL;
return (char *) p->buffer + p->blocklen * (index % p->len);
}
void pool_put(struct pool *p, void *src)
{
memcpy(p->buffer + p->blocklen * (p->last++ % p->len), src, p->blocklen);
}
#endif /* _POOL_H_ */

View file

@ -22,6 +22,7 @@
#include "node.h"
#include "msg.h"
#include "timing.h"
#include "pool.h"
/** Linked list of nodes */
struct list nodes = LIST_INIT((dtor_cb_t) node_destroy);
@ -29,7 +30,7 @@ struct list nodes = LIST_INIT((dtor_cb_t) node_destroy);
/** The global configuration */
struct settings settings;
struct msg *recv_pool, *send_pool;
struct pool recv_pool, send_pool;
pthread_t recv_thread, send_thread;
struct node *node;
@ -42,9 +43,9 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
node_stop(node);
node_deinit(node->_vt);
free(recv_pool);
free(send_pool);
pool_destroy(&recv_pool);
pool_destroy(&send_pool);
list_destroy(&nodes);
@ -71,7 +72,7 @@ void * send_loop(void *ctx)
{
for (;;) {
for (int i = 0; i < node->vectorize; i++) {
struct msg *m = &send_pool[i];
struct msg *m = pool_getrel(&send_pool, i);
int reason;
retry: reason = msg_fscan(stdin, m, NULL, NULL);
@ -85,7 +86,7 @@ retry: reason = msg_fscan(stdin, m, NULL, NULL);
}
}
node_write(node, send_pool, node->vectorize, 0, node->vectorize);
node_write(node, &send_pool, node->vectorize);
}
return NULL;
@ -99,9 +100,9 @@ void * recv_loop(void *ctx)
for (;;) {
struct timespec ts = time_now();
int recv = node_read(node, recv_pool, node->vectorize, 0, node->vectorize);
int recv = node_read(node, &recv_pool, node->vectorize);
for (int i = 0; i < recv; i++) {
struct msg *m = &recv_pool[i];
struct msg *m = pool_getrel(&recv_pool, i);
int ret = msg_verify(m);
if (ret)
@ -159,10 +160,9 @@ int main(int argc, char *argv[])
error("Node '%s' does not exist!", argv[2]);
node_init(node->_vt, argc-optind, argv+optind, config_root_setting(&config));
recv_pool = alloc(sizeof(struct msg) * node->vectorize);
send_pool = alloc(sizeof(struct msg) * node->vectorize);
pool_create(&recv_pool, node->vectorize, sizeof(struct msg));
pool_create(&send_pool, node->vectorize, sizeof(struct msg));
if (reverse)
node_reverse(node);

View file

@ -46,7 +46,7 @@ int main(int argc, char *argv[])
exit(EXIT_FAILURE);
}
struct msg m = MSG_INIT(atoi(argv[2]));
int values = atoi(argv[2]);
double rate = atof(argv[3]);
int limit = argc >= 5 ? atoi(argv[4]) : -1;
int type = TYPE_MIXED;
@ -80,35 +80,38 @@ int main(int argc, char *argv[])
fprintf(stderr, "# %-20s\t\t%s\n", "sec.nsec(seq)", "data[]");
struct timespec start = time_now();
struct msg *m = msg_create(values);
/* Block until 1/p->rate seconds elapsed */
while (limit-- > 0 || argc < 5) {
struct timespec now = time_now();
double running = time_delta(&start, &now);
m.ts.sec = now.tv_sec;
m.ts.nsec = now.tv_nsec;
m->ts.sec = now.tv_sec;
m->ts.nsec = now.tv_nsec;
for (int i = 0; i < m.length; i++) {
int rtype = type != TYPE_MIXED ? type : i % 4;
for (int i = 0; i < m->values; i++) {
int rtype = (type != TYPE_MIXED) ? type : i % 4;
double ampl = i+1;
double freq = i+1;
switch (rtype) {
case TYPE_RANDOM: m.data[i].f += box_muller(0, 0.02); break;
case TYPE_SINE: m.data[i].f = ampl * sin(running * freq * M_PI); break;
case TYPE_TRIANGLE: m.data[i].f = ampl * (fabs(fmod(running * freq, 1) - .5) - 0.25) * 4; break;
case TYPE_SQUARE: m.data[i].f = ampl * ( (fmod(running * freq, 1) < .5) ? -1 : 1); break;
case TYPE_RANDOM: m->data[i].f += box_muller(0, 0.02); break;
case TYPE_SINE: m->data[i].f = ampl * sin(running * freq * M_PI); break;
case TYPE_TRIANGLE: m->data[i].f = ampl * (fabs(fmod(running * freq, 1) - .5) - 0.25) * 4; break;
case TYPE_SQUARE: m->data[i].f = ampl * ( (fmod(running * freq, 1) < .5) ? -1 : 1); break;
}
}
msg_fprint(stdout, &m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0);
msg_fprint(stdout, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0);
fflush(stdout);
m.sequence += timerfd_wait(tfd);
m->sequence += timerfd_wait(tfd);
}
close(tfd);
free(m);
return 0;
}

View file

@ -20,6 +20,7 @@
#include "utils.h"
#include "hist.h"
#include "timing.h"
#include "pool.h"
struct settings settings; /** <The global configuration */
@ -140,23 +141,24 @@ check:
}
void test_rtt() {
struct msg m = MSG_INIT(sizeof(struct timespec) / sizeof(float));
struct timespec sent, recv;
struct hist hist;
struct msg *m;
m = msg_create(0);
hist_create(&hist, low, high, res);
/* Print header */
fprintf(stdout, "%17s%5s%10s%10s%10s%10s%10s\n", "timestamp", "seq", "rtt", "min", "max", "mean", "stddev");
while (running && (count < 0 || count--)) {
while (running && (count < 0 || count--)) {
clock_gettime(CLOCK_ID, &sent);
m.ts.sec = sent.tv_sec;
m.ts.nsec = sent.tv_nsec;
m->ts.sec = sent.tv_sec;
m->ts.nsec = sent.tv_nsec;
node_write(node, &m, 1, 0, 1); /* Ping */
node_read(node, &m, 1, 0, 1); /* Pong */
node_write_single(node, m); /* Ping */
node_read_single(node, m); /* Pong */
clock_gettime(CLOCK_ID, &recv);
@ -167,10 +169,10 @@ void test_rtt() {
hist_put(&hist, rtt);
m.sequence++;
m->sequence++;
fprintf(stdout, "%10lu.%06lu%5u%10.3f%10.3f%10.3f%10.3f%10.3f\n",
recv.tv_sec, recv.tv_nsec / 1000, m.sequence,
recv.tv_sec, recv.tv_nsec / 1000, m->sequence,
1e3 * rtt, 1e3 * hist.lowest, 1e3 * hist.highest,
1e3 * hist_mean(&hist), 1e3 * hist_stddev(&hist));
}
@ -184,5 +186,6 @@ void test_rtt() {
error("Invalid file descriptor: %u", fd);
hist_print(&hist);
hist_destroy(&hist);
}