mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
improved epoch related settings for file node
This commit is contained in:
parent
9b5cf26bac
commit
c9a3461ffe
5 changed files with 142 additions and 72 deletions
|
@ -13,6 +13,8 @@ S2SS is used in distributed- and co-simulation scenarios and developed for the f
|
|||
|
||||
The project consists of a server daemon and several client modules which are documented here.
|
||||
|
||||
[TOC]
|
||||
|
||||
### Server
|
||||
|
||||
The server simply acts as a gateway to forward simulation data from one client to another.
|
||||
|
|
|
@ -31,16 +31,31 @@ Specifies the mode which should be used to open the output file.
|
|||
See [open(2)](http://man7.org/linux/man-pages/man2/open.2.html) for an explanation of allowed values.
|
||||
The default value is `w+` which will start writing at the beginning of the file and create it in case it does not exist yet.
|
||||
|
||||
#### `epoch_mode` *("now"|"relative"|"absolute")*
|
||||
#### `epoch_mode` *("direct"|"wait" | "relative"|"absolute")*
|
||||
|
||||
The *epoch* describes the point in time when the first message will be read from the file.
|
||||
This setting allows to select the behaviour of the following `epoch` setting.
|
||||
It can be used to adjust the point in time when the first value should be read.
|
||||
|
||||
The behaviour of `epoch` is depending on the value of `epoch_mode`.
|
||||
|
||||
- `epoch_mode = now`: The first value is read at *now* + `epoch` seconds.
|
||||
- `epoch_mode = relative`: The first value is read at *start* + `epoch` seconds.
|
||||
- `epoch_mode = absolute`: The first value is read at `epoch` seconds after 1970-01-01 00:00:00.
|
||||
To facilitate the following description of supported `epoch_mode`'s, we will introduce some intermediate variables (timestamps).
|
||||
Those variables will also been displayed during the startup phase of the server to simplify debugging.
|
||||
|
||||
- `epoch` is the value of the `epoch` setting.
|
||||
- `first` is the timestamp of the first message / line in the input file.
|
||||
- `offset` will be added to the timestamps in the file to obtain the real time when the message will be sent.
|
||||
- `start` is the point in time when the first message will be sent (`first + offset`).
|
||||
- `eta` the time to wait until the first message will be send (`start - now`)
|
||||
|
||||
The supported values for `epoch_mode`:
|
||||
|
||||
| `epoch_mode` | `offset` | `start = first + offset` |
|
||||
| -----------: | :-------------------: | :----------------------: |
|
||||
| `direct` | `now - first + epoch` | `now + epoch` |
|
||||
| `wait` | `now + epoch` | `now + first` |
|
||||
| `relative` | `epoch` | `first + epoch` |
|
||||
| `absolute` | `epoch - first` | `epoch` |
|
||||
|
||||
#### `send_rate` *(float)*
|
||||
|
||||
|
@ -60,8 +75,9 @@ If this setting has a non-zero value, the default behaviour is overwritten with
|
|||
in = "logs/file_input.log", # These options specify the path prefix where the the files are stored
|
||||
out = "logs/file_output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3))
|
||||
|
||||
epoch_mode = "now" # One of:
|
||||
# now (default)
|
||||
epoch_mode = "direct" # One of:
|
||||
# direct (default)
|
||||
# wait
|
||||
# relative
|
||||
# absolute
|
||||
|
||||
|
|
|
@ -90,8 +90,9 @@ nodes = {
|
|||
in = "logs/file_input.log", # These options specify the path prefix where the the files are stored
|
||||
out = "logs/file_output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3))
|
||||
|
||||
epoch_mode = "now" # One of:
|
||||
# now (default)
|
||||
epoch_mode = "direct" # One of:
|
||||
# direct (default)
|
||||
# wait
|
||||
# relative
|
||||
# absolute
|
||||
|
||||
|
|
|
@ -32,17 +32,19 @@ struct file {
|
|||
const char *file_mode; /**< The mode for fopen() which is used for the out file. */
|
||||
|
||||
enum epoch_mode {
|
||||
EPOCH_NOW,
|
||||
EPOCH_DIRECT,
|
||||
EPOCH_WAIT,
|
||||
EPOCH_RELATIVE,
|
||||
EPOCH_ABSOLUTE
|
||||
} epoch_mode; /**< Specifies how file::offset is calculated. */
|
||||
|
||||
struct timespec start; /**< The first timestamp of the input file. */
|
||||
struct timespec first; /**< The first timestamp in the file file::path_in */
|
||||
struct timespec epoch; /**< The epoch timestamp from the configuration. */
|
||||
struct timespec offset; /**< An offset between the timestamp in the input file and the current time */
|
||||
|
||||
double rate; /**< The sending rate. */
|
||||
int tfd; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */
|
||||
int sequence; /**< Last sequence of this node */
|
||||
};
|
||||
|
||||
/** @see node_vtable::init */
|
||||
|
|
|
@ -30,9 +30,55 @@ char * file_print(struct node *n)
|
|||
{
|
||||
struct file *f = n->file;
|
||||
char *buf = NULL;
|
||||
|
||||
if (f->path_in) {
|
||||
const char *epoch_mode_str = NULL;
|
||||
switch (f->epoch_mode) {
|
||||
case EPOCH_DIRECT: epoch_mode_str = "direct"; break;
|
||||
case EPOCH_WAIT: epoch_mode_str = "wait"; break;
|
||||
case EPOCH_RELATIVE: epoch_mode_str = "relative"; break;
|
||||
case EPOCH_ABSOLUTE: epoch_mode_str = "absolute"; break;
|
||||
}
|
||||
|
||||
strcatf(&buf, "in=%s, epoch_mode=%s, epoch=%.2f, ",
|
||||
f->path_in,
|
||||
epoch_mode_str,
|
||||
time_to_double(&f->epoch)
|
||||
);
|
||||
|
||||
if (f->rate)
|
||||
strcatf(&buf, "rate=%.1f, ", f->rate);
|
||||
}
|
||||
|
||||
if (f->path_out) {
|
||||
strcatf(&buf, "out=%s, mode=%s, ",
|
||||
f->path_out,
|
||||
f->file_mode
|
||||
);
|
||||
}
|
||||
|
||||
if (f->first.tv_sec || f->first.tv_nsec)
|
||||
strcatf(&buf, "first=%.2f, ", time_to_double(&f->first));
|
||||
|
||||
if (f->offset.tv_sec || f->offset.tv_nsec)
|
||||
strcatf(&buf, "offset=%.2f, ", time_to_double(&f->offset));
|
||||
|
||||
if ((f->first.tv_sec || f->first.tv_nsec) &&
|
||||
(f->offset.tv_sec || f->offset.tv_nsec)) {
|
||||
struct timespec eta, now;
|
||||
clock_gettime(CLOCK_REALTIME, &now);
|
||||
|
||||
return strcatf(&buf, "in=%s, out=%s, mode=%s, rate=%.1f, epoch_mode=%u, epoch=%.0f",
|
||||
f->path_in, f->path_out, f->file_mode, f->rate, f->epoch_mode, time_to_double(&f->epoch));
|
||||
eta = time_add(&f->first, &f->offset);
|
||||
eta = time_diff(&now, &eta);
|
||||
|
||||
if (eta.tv_sec || eta.tv_nsec)
|
||||
strcatf(&buf, "eta=%.2f sec, ", time_to_double(&eta));
|
||||
}
|
||||
|
||||
if (strlen(buf) > 2)
|
||||
buf[strlen(buf) - 2] = 0;
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
int file_parse(config_setting_t *cfg, struct node *n)
|
||||
|
@ -65,10 +111,12 @@ int file_parse(config_setting_t *cfg, struct node *n)
|
|||
f->epoch = time_from_double(epoch_flt);
|
||||
|
||||
if (!config_setting_lookup_string(n->cfg, "epoch_mode", &epoch_mode))
|
||||
epoch_mode = "now";
|
||||
epoch_mode = "direct";
|
||||
|
||||
if (!strcmp(epoch_mode, "now"))
|
||||
f->epoch_mode = EPOCH_NOW;
|
||||
if (!strcmp(epoch_mode, "direct"))
|
||||
f->epoch_mode = EPOCH_DIRECT;
|
||||
else if (!strcmp(epoch_mode, "wait"))
|
||||
f->epoch_mode = EPOCH_WAIT;
|
||||
else if (!strcmp(epoch_mode, "relative"))
|
||||
f->epoch_mode = EPOCH_RELATIVE;
|
||||
else if (!strcmp(epoch_mode, "absolute"))
|
||||
|
@ -86,17 +134,51 @@ int file_open(struct node *n)
|
|||
struct file *f = n->file;
|
||||
|
||||
if (f->path_in) {
|
||||
f->in = fopen(f->path_in, "r");
|
||||
/* Open file */
|
||||
f->in = fopen(f->path_in, "r");
|
||||
if (!f->in)
|
||||
serror("Failed to open file for reading: '%s'", f->path_in);
|
||||
|
||||
/* Create timer */
|
||||
f->tfd = timerfd_create(CLOCK_REALTIME, 0);
|
||||
if (f->tfd < 0)
|
||||
serror("Failed to create timer");
|
||||
|
||||
/* Get current time */
|
||||
struct timespec now, eta;
|
||||
clock_gettime(CLOCK_REALTIME, &now);
|
||||
|
||||
/* Arm the timer */
|
||||
/* Get timestamp of first line */
|
||||
struct msg m;
|
||||
int ret = msg_fscan(f->in, &m, NULL, NULL); rewind(f->in);
|
||||
if (ret < 0)
|
||||
error("Failed to read first timestamp of node '%s'", n->name);
|
||||
|
||||
f->first = MSG_TS(&m);
|
||||
|
||||
/* Set offset depending on epoch_mode */
|
||||
switch (f->epoch_mode) {
|
||||
case EPOCH_DIRECT: /* read first value at now + epoch */
|
||||
f->offset = time_diff(&f->first, &now);
|
||||
f->offset = time_add(&f->offset, &f->epoch);
|
||||
break;
|
||||
|
||||
case EPOCH_WAIT: /* read first value at now + first + epoch */
|
||||
f->offset = now;
|
||||
f->offset = time_add(&f->offset, &f->epoch);
|
||||
break;
|
||||
|
||||
case EPOCH_RELATIVE: /* read first value at first + epoch */
|
||||
f->offset = f->epoch;
|
||||
break;
|
||||
|
||||
case EPOCH_ABSOLUTE: /* read first value at f->epoch */
|
||||
f->offset = time_diff(&f->first, &f->epoch);
|
||||
break;
|
||||
}
|
||||
|
||||
/* Arm the timer with a fixed rate */
|
||||
if (f->rate) {
|
||||
/* Send with fixed rate */
|
||||
struct itimerspec its = {
|
||||
.it_interval = time_from_double(1 / f->rate),
|
||||
.it_value = { 1, 0 },
|
||||
|
@ -106,43 +188,14 @@ int file_open(struct node *n)
|
|||
if (ret)
|
||||
serror("Failed to start timer");
|
||||
}
|
||||
else {
|
||||
struct msg m;
|
||||
/* Get current time */
|
||||
struct timespec now, first, eta;
|
||||
clock_gettime(CLOCK_REALTIME, &now);
|
||||
|
||||
/* Get timestamp of first sample */
|
||||
msg_fscan(f->in, &m, NULL, NULL);
|
||||
first = MSG_TS(&m);
|
||||
|
||||
/* Set offset depending on epoch_mode */
|
||||
switch (f->epoch_mode) {
|
||||
case EPOCH_NOW: /* read first value at f->now + f->epoch */
|
||||
first = time_diff(&f->start, &now);
|
||||
f->offset = time_add(&first, &f->epoch);
|
||||
break;
|
||||
case EPOCH_RELATIVE: /* read first value at f->start + f->epoch */
|
||||
f->offset = f->epoch;
|
||||
break;
|
||||
case EPOCH_ABSOLUTE: /* read first value at f->epoch */
|
||||
f->offset = time_diff(&f->start, &f->epoch);
|
||||
break;
|
||||
}
|
||||
|
||||
eta = time_add(&f->start, &f->offset);
|
||||
eta = time_diff(&now, &eta);
|
||||
|
||||
debug(5, "Opened file '%s' as input for node '%s': start=%.2f, offset=%.2f, eta=%.2f sec",
|
||||
f->path_in, n->name,
|
||||
time_to_double(&f->start),
|
||||
time_to_double(&f->offset),
|
||||
time_to_double(&eta)
|
||||
);
|
||||
}
|
||||
|
||||
char *buf = file_print(n);
|
||||
debug(4, "Opened file node '%s': %s", n->name, buf);
|
||||
free(buf);
|
||||
}
|
||||
|
||||
if (f->path_out) {
|
||||
/* Open output file */
|
||||
f->out = fopen(f->path_out, f->file_mode);
|
||||
if (!f->out)
|
||||
serror("Failed to open file for writing: '%s'", f->path_out);
|
||||
|
@ -167,39 +220,35 @@ int file_close(struct node *n)
|
|||
|
||||
int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
|
||||
{
|
||||
int i = 0;
|
||||
int values, flags, i = 0;
|
||||
struct file *f = n->file;
|
||||
|
||||
if (f->in) {
|
||||
for (i = 0; i < cnt; i++) {
|
||||
struct msg *cur = &pool[(first+i) % poolsize];
|
||||
|
||||
if (f->rate) {
|
||||
/* Wait until epoch for the first time only */
|
||||
if (ftell(f->in) == 0) {
|
||||
struct timespec until = time_add(&f->start, &f->offset);
|
||||
if (timerfd_wait_until(f->tfd, &until))
|
||||
serror("Failed to wait for timer");
|
||||
}
|
||||
/* Wait with fixed rate delay */
|
||||
else {
|
||||
if (timerfd_wait(f->tfd) < 0)
|
||||
serror("Failed to wait for timer");
|
||||
}
|
||||
/* Get message and timestamp */
|
||||
values = msg_fscan(f->in, cur, &flags, NULL);
|
||||
if (values < 0) {
|
||||
if (!feof(f->in))
|
||||
warn("Failed to parse file of node '%s", n->name);
|
||||
|
||||
msg_fscan(f->in, cur, NULL, NULL);
|
||||
return 0;
|
||||
}
|
||||
else {
|
||||
struct timespec until;
|
||||
|
||||
/* Get message and timestamp */
|
||||
msg_fscan(f->in, cur, NULL, NULL);
|
||||
/* Fix missing sequence no */
|
||||
cur->sequence = f->sequence = (flags & MSG_PRINT_SEQUENCE) ? cur->sequence : f->sequence + 1;
|
||||
|
||||
if (!f->rate || ftell(f->in) == 0) {
|
||||
struct timespec until = time_add(&MSG_TS(cur), &f->offset);
|
||||
|
||||
/* Wait for next message / sampe */
|
||||
until = time_add(&MSG_TS(cur), &f->offset);
|
||||
if (timerfd_wait_until(f->tfd, &until) < 0)
|
||||
serror("Failed to wait for timer");
|
||||
}
|
||||
else { /* Wait with fixed rate delay */
|
||||
if (timerfd_wait(f->tfd) < 0)
|
||||
serror("Failed to wait for timer");
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
Loading…
Add table
Reference in a new issue