1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

added file split functionality by rewriting most of the file stuff..

This commit is contained in:
Steffen Vogel 2015-10-14 12:11:33 +02:00
parent 314e08550a
commit d85db3f604
5 changed files with 299 additions and 188 deletions

View file

@ -4,16 +4,14 @@ The `file` node-type can be used to log or replay samples to / from disk.
## Configuration
Every `file` node supports the following special settings:
Every `file` node can be configured to only read or write or to do both at the same time.
The node configuration is splitted in to groups: `in` and `out`.
#### `in` *(string: filesystem path)*
#### `path` *(string: filesystem path)*
Specifies the path to a file from which is written to or read from (depending in which group is used).
Specifies the path to a file which contains data for replaying.
See below for a description of the file format.
#### `out` *(string: filesystem path)*
Specifies the path to a file where samples will be written to.
This setting allows to add special paceholders for time and date values.
See [strftime(3)](http://man7.org/linux/man-pages/man3/strftime.3.html) for a list of supported placeholder.
@ -25,7 +23,7 @@ will create a file called: *path_of_working_directory*/logs/measurements_2015-08
See below for a description of the file format.
#### `file_mode` *(string)*
#### `mode` *(string)*
Specifies the mode which should be used to open the output file.
See [open(2)](http://man7.org/linux/man-pages/man2/open.2.html) for an explanation of allowed values.
@ -57,38 +55,53 @@ The supported values for `epoch_mode`:
| `relative` | `epoch` | `first + epoch` |
| `absolute` | `epoch - first` | `epoch` |
#### `send_rate` *(float)*
#### `rate` *(float)*
By default `send_rate` has the value `0` which means that the time between consecutive samples is the same as in the `in` file based on the timestamps in the first column.
If this setting has a non-zero value, the default behaviour is overwritten with a fixed rate.
#### `split` *(integer)*
Only valid for the `out` group.
Splits the output file every `split` mega-byte. This setting will append the chunk number to the `path` setting.
Example: `data/my_measurements.log_001`
#### `splitted` *(boolean)*
Only valid for the `in` group.
Expects the input data in splitted format.
### Example
file_node = {
type = "file",
### The following settings are specific to the file node-type!! ###
mode = "w+", # The mode in which files should be opened (see open(2))
# You might want to use "a+" to append to a file
in = "logs/file_input.log", # These options specify the path prefix where the the files are stored
out = "logs/file_output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3))
epoch_mode = "direct" # One of:
# direct (default)
# wait
# relative
# absolute
in = {
path = "logs/input.log", # These options specify the path prefix where the the files are stored
mode = "w+", # The mode in which files should be opened (see open(2))
epoch_mode = "direct" # One of: direct (default), wait, relative, absolute
epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0).
# Consult the documentation of a full explanation
epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0):
# - epoch_mode = now: The first value is read at: _now_ + epoch seconds.
# - epoch_mode = relative: The first value is read at _start_ + `epoch` seconds.
# - epoch_mode = absolute: The first value is read at epoch seconds after 1970-01-01 00:00:00.
rate = 2.0 # A constant rate at which the lines of the input files should be read
rate = 2.0 # A constant rate at which the lines of the input files should be read
# A missing or zero value will use the timestamp in the first column
# of the file to determine the pause between consecutive lines.
splitted = false
},
out = {
path = "logs/output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3))
mode = "a+" # You might want to use "a+" to append to a file
split = 100, # Split output file every 100 MB
}
}
## File Format

View file

@ -87,20 +87,26 @@ nodes = {
### The following settings are specific to the file node-type!! ###
mode = "w+", # The mode in which files should be opened (see open(2))
# You might want to use "a+" to append to a file
in = "logs/file_input.log", # These options specify the path prefix where the the files are stored
out = "logs/file_output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3))
epoch_mode = "direct" # One of: direct (default), wait, relative, absolute
epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0).
in = {
path = "logs/input.log", # These options specify the path prefix where the the files are stored
mode = "w+", # The mode in which files should be opened (see open(2))
epoch_mode = "direct" # One of: direct (default), wait, relative, absolute
epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0).
# Consult the documentation of a full explanation
rate = 2.0 # A constant rate at which the lines of the input files should be read
rate = 2.0 # A constant rate at which the lines of the input files should be read
# A missing or zero value will use the timestamp in the first column
# of the file to determine the pause between consecutive lines.
splitted = false
},
out = {
path = "logs/output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3))
mode = "a+" # You might want to use "a+" to append to a file
split = 100, # Split output file every 100 MB
}
},
gtfpga_node = {
type = "gtfpga",

View file

@ -20,31 +20,40 @@
#include "node.h"
#define FILE_MAX_PATHLEN 128
#define FILE_MAX_PATHLEN 512
enum {
FILE_READ,
FILE_WRITE
};
struct file {
FILE *in;
FILE *out;
struct file_direction {
FILE *handle; /**< libc: stdio file handle */
char *path_in;
char *path_out;
const char *mode; /**< libc: fopen() mode */
const char *fmt; /**< Format string for file name. */
const char *file_mode; /**< The mode for fopen() which is used for the out file. */
char *path; /**< Real file name */
int chunk; /**< Current chunk number. */
int split; /**< Split file every file::split mega bytes. */
} read, write;
enum epoch_mode {
enum read_epoch_mode {
EPOCH_DIRECT,
EPOCH_WAIT,
EPOCH_RELATIVE,
EPOCH_ABSOLUTE
} epoch_mode; /**< Specifies how file::offset is calculated. */
} read_epoch_mode; /**< Specifies how file::offset is calculated. */
struct timespec first; /**< The first timestamp in the file file::path_in */
struct timespec epoch; /**< The epoch timestamp from the configuration. */
struct timespec offset; /**< An offset between the timestamp in the input file and the current time */
struct timespec read_first; /**< The first timestamp in the file file::path_in */
struct timespec read_epoch; /**< The epoch timestamp from the configuration. */
struct timespec read_offset; /**< An offset between the timestamp in the input file and the current time */
double rate; /**< The sending rate. */
int tfd; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */
int sequence; /**< Last sequence of this node */
int read_sequence; /**< Sequence number of last message which has been written to file::path_out */
int read_timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */
double read_rate; /**< The read rate. */
};
/** @see node_vtable::init */

View file

@ -26,49 +26,146 @@ int file_deinit()
return 0; /* nothing todo here */
}
static char * file_format_name(const char *format, struct timespec *ts)
{
struct tm tm;
char *buf = alloc(FILE_MAX_PATHLEN);
/* Convert time */
gmtime_r(&ts->tv_sec, &tm);
strftime(buf, FILE_MAX_PATHLEN, format, &tm);
return buf;
}
static FILE * file_reopen(struct file_direction *dir)
{
char buf[FILE_MAX_PATHLEN];
const char *path = buf;
/* Append chunk number to filename */
if (dir->chunk >= 0)
snprintf(buf, FILE_MAX_PATHLEN, "%s_%03u", dir->path, dir->chunk);
else
path = dir->path;
if (dir->handle)
fclose(dir->handle);
return fopen(path, dir->mode);
}
static int file_parse_direction(config_setting_t *cfg, struct file *f, int d)
{
struct file_direction *dir = (d == FILE_READ) ? &f->read : &f->write;
if (!config_setting_lookup_string(cfg, "path", &dir->fmt))
return -1;
if (!config_setting_lookup_string(cfg, "mode", &dir->mode))
dir->mode = (d == FILE_READ) ? "r" : "w+";
return 0;
}
int file_parse(config_setting_t *cfg, struct node *n)
{
struct file *f = alloc(sizeof(struct file));
config_setting_t *cfg_in, *cfg_out;
cfg_out = config_setting_get_member(cfg, "out");
if (cfg_out) {
if (file_parse_direction(cfg_out, f, FILE_WRITE))
cerror(cfg_out, "Failed to parse output file for node '%s'", n->name);
/* More write specific settings */
if (!config_setting_lookup_int(cfg_out, "split", &f->write.split))
f->write.split = 0; /* Save all samples in a single file */
}
cfg_in = config_setting_get_member(cfg, "in");
if (cfg_in) {
if (file_parse_direction(cfg_in, f, FILE_READ))
cerror(cfg_in, "Failed to parse input file for node '%s'", n->name);
/* More read specific settings */
if (!config_setting_lookup_bool(cfg_in, "splitted", &f->read.split))
f->read.split = 0; /* Save all samples in a single file */
if (!config_setting_lookup_float(cfg_in, "rate", &f->read_rate))
f->read_rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */
double epoch_flt;
if (!config_setting_lookup_float(cfg_in, "epoch", &epoch_flt))
epoch_flt = 0;
f->read_epoch = time_from_double(epoch_flt);
const char *epoch_mode;
if (!config_setting_lookup_string(cfg_in, "epoch_mode", &epoch_mode))
epoch_mode = "direct";
if (!strcmp(epoch_mode, "direct"))
f->read_epoch_mode = EPOCH_DIRECT;
else if (!strcmp(epoch_mode, "wait"))
f->read_epoch_mode = EPOCH_WAIT;
else if (!strcmp(epoch_mode, "relative"))
f->read_epoch_mode = EPOCH_RELATIVE;
else if (!strcmp(epoch_mode, "absolute"))
f->read_epoch_mode = EPOCH_ABSOLUTE;
else
cerror(cfg_in, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode);
}
n->file = f;
return 0;
}
char * file_print(struct node *n)
{
struct file *f = n->file;
char *buf = NULL;
if (f->path_in) {
const char *epoch_mode_str = NULL;
switch (f->epoch_mode) {
case EPOCH_DIRECT: epoch_mode_str = "direct"; break;
case EPOCH_WAIT: epoch_mode_str = "wait"; break;
case EPOCH_RELATIVE: epoch_mode_str = "relative"; break;
case EPOCH_ABSOLUTE: epoch_mode_str = "absolute"; break;
if (f->read.fmt) {
const char *epoch_str = NULL;
switch (f->read_epoch_mode) {
case EPOCH_DIRECT: epoch_str = "direct"; break;
case EPOCH_WAIT: epoch_str = "wait"; break;
case EPOCH_RELATIVE: epoch_str = "relative"; break;
case EPOCH_ABSOLUTE: epoch_str = "absolute"; break;
}
strcatf(&buf, "in=%s, epoch_mode=%s, epoch=%.2f, ",
f->path_in,
epoch_mode_str,
time_to_double(&f->epoch)
f->read.path ? f->read.path : f->read.fmt,
epoch_str,
time_to_double(&f->read_epoch)
);
if (f->rate)
strcatf(&buf, "rate=%.1f, ", f->rate);
if (f->read_rate)
strcatf(&buf, "rate=%.1f, ", f->read_rate);
}
if (f->path_out) {
if (f->write.fmt) {
strcatf(&buf, "out=%s, mode=%s, ",
f->path_out,
f->file_mode
f->write.path ? f->write.path : f->write.fmt,
f->write.mode
);
}
if (f->first.tv_sec || f->first.tv_nsec)
strcatf(&buf, "first=%.2f, ", time_to_double(&f->first));
if (f->read_first.tv_sec || f->read_first.tv_nsec)
strcatf(&buf, "first=%.2f, ", time_to_double(&f->read_first));
if (f->offset.tv_sec || f->offset.tv_nsec)
strcatf(&buf, "offset=%.2f, ", time_to_double(&f->offset));
if (f->read_offset.tv_sec || f->read_offset.tv_nsec)
strcatf(&buf, "offset=%.2f, ", time_to_double(&f->read_offset));
if ((f->first.tv_sec || f->first.tv_nsec) &&
(f->offset.tv_sec || f->offset.tv_nsec)) {
struct timespec eta, now = time_now();
if ((f->read_first.tv_sec || f->read_first.tv_nsec) &&
(f->read_offset.tv_sec || f->read_offset.tv_nsec)) {
struct timespec eta, now = time_now();
eta = time_add(&f->first, &f->offset);
eta = time_diff(&now, &eta);
eta = time_add(&f->read_first, &f->read_offset);
eta = time_diff(&now, &eta);
if (eta.tv_sec || eta.tv_nsec)
strcatf(&buf, "eta=%.2f sec, ", time_to_double(&eta));
@ -80,123 +177,81 @@ char * file_print(struct node *n)
return buf;
}
int file_parse(config_setting_t *cfg, struct node *n)
{
struct file *f = alloc(sizeof(struct file));
const char *out, *in;
const char *epoch_mode;
double epoch_flt;
if (config_setting_lookup_string(cfg, "out", &out)) {
time_t t = time(NULL);
struct tm *tm = localtime(&t);
f->path_out = alloc(FILE_MAX_PATHLEN);
if (strftime(f->path_out, FILE_MAX_PATHLEN, out, tm) == 0)
cerror(cfg, "Invalid path for output");
}
if (config_setting_lookup_string(cfg, "in", &in))
f->path_in = strdup(in);
if (!config_setting_lookup_string(cfg, "file_mode", &f->file_mode))
f->file_mode = "w+";
if (!config_setting_lookup_float(cfg, "send_rate", &f->rate))
f->rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */
if (config_setting_lookup_float(n->cfg, "epoch", &epoch_flt))
f->epoch = time_from_double(epoch_flt);
if (!config_setting_lookup_string(n->cfg, "epoch_mode", &epoch_mode))
epoch_mode = "direct";
if (!strcmp(epoch_mode, "direct"))
f->epoch_mode = EPOCH_DIRECT;
else if (!strcmp(epoch_mode, "wait"))
f->epoch_mode = EPOCH_WAIT;
else if (!strcmp(epoch_mode, "relative"))
f->epoch_mode = EPOCH_RELATIVE;
else if (!strcmp(epoch_mode, "absolute"))
f->epoch_mode = EPOCH_ABSOLUTE;
else
cerror(n->cfg, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode);
n->file = f;
return 0;
}
int file_open(struct node *n)
{
struct file *f = n->file;
struct timespec now = time_now();
if (f->path_in) {
if (f->read.fmt) {
/* Prepare file name */
f->read.chunk = f->read.split ? 0 : -1;
f->read.path = file_format_name(f->read.fmt, &now);
/* Open file */
f->in = fopen(f->path_in, "r");
if (!f->in)
serror("Failed to open file for reading: '%s'", f->path_in);
f->read.handle = file_reopen(&f->read);
if (!f->read.handle)
serror("Failed to open file for reading: '%s'", f->read.path);
/* Create timer */
f->tfd = timerfd_create(CLOCK_REALTIME, 0);
if (f->tfd < 0)
f->read_timer = timerfd_create(CLOCK_REALTIME, 0);
if (f->read_timer < 0)
serror("Failed to create timer");
/* Arm the timer with a fixed rate */
if (f->read_rate) {
struct itimerspec its = {
.it_interval = time_from_double(1 / f->read_rate),
.it_value = { 0, 1 },
};
int ret = timerfd_settime(f->read_timer, 0, &its, NULL);
if (ret)
serror("Failed to start timer");
}
/* Get current time */
struct timespec now = time_now();
/* Get timestamp of first line */
struct msg m;
int ret = msg_fscan(f->in, &m, NULL, NULL); rewind(f->in);
int ret = msg_fscan(f->read.handle, &m, NULL, NULL); rewind(f->read.handle);
if (ret < 0)
error("Failed to read first timestamp of node '%s'", n->name);
f->first = MSG_TS(&m);
f->read_first = MSG_TS(&m);
/* Set offset depending on epoch_mode */
switch (f->epoch_mode) {
/* Set read_offset depending on epoch_mode */
switch (f->read_epoch_mode) {
case EPOCH_DIRECT: /* read first value at now + epoch */
f->offset = time_diff(&f->first, &now);
f->offset = time_add(&f->offset, &f->epoch);
f->read_offset = time_diff(&f->read_first, &now);
f->read_offset = time_add(&f->read_offset, &f->read_epoch);
break;
case EPOCH_WAIT: /* read first value at now + first + epoch */
f->offset = now;
f->offset = time_add(&f->offset, &f->epoch);
f->read_offset = now;
f->read_offset = time_add(&f->read_offset, &f->read_epoch);
break;
case EPOCH_RELATIVE: /* read first value at first + epoch */
f->offset = f->epoch;
f->read_offset = f->read_epoch;
break;
case EPOCH_ABSOLUTE: /* read first value at f->epoch */
f->offset = time_diff(&f->first, &f->epoch);
case EPOCH_ABSOLUTE: /* read first value at f->read_epoch */
f->read_offset = time_diff(&f->read_first, &f->read_epoch);
break;
}
/* Arm the timer with a fixed rate */
if (f->rate) {
struct itimerspec its = {
.it_interval = time_from_double(1 / f->rate),
.it_value = { 1, 0 },
};
int ret = timerfd_settime(f->tfd, 0, &its, NULL);
if (ret)
serror("Failed to start timer");
}
char *buf = file_print(n);
debug(4, "Opened file node '%s': %s", n->name, buf);
free(buf);
}
if (f->path_out) {
/* Open output file */
f->out = fopen(f->path_out, f->file_mode);
if (!f->out)
serror("Failed to open file for writing: '%s'", f->path_out);
if (f->write.fmt) {
/* Prepare file name */
f->write.chunk = f->write.split ? 0 : -1;
f->write.path = file_format_name(f->write.fmt, &now);
/* Open file */
f->write.handle = file_reopen(&f->write);
if (!f->write.handle)
serror("Failed to open file for writing: '%s'", f->write.path);
}
return 0;
@ -205,13 +260,16 @@ int file_open(struct node *n)
int file_close(struct node *n)
{
struct file *f = n->file;
free(f->read.path);
free(f->write.path);
if (f->tfd)
close(f->tfd);
if (f->in)
fclose(f->in);
if (f->out)
fclose(f->out);
if (f->read_timer)
close(f->read_timer);
if (f->read.handle)
fclose(f->read.handle);
if (f->write.handle)
fclose(f->write.handle);
return 0;
}
@ -221,31 +279,54 @@ int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt
int values, flags, i = 0;
struct file *f = n->file;
if (f->in) {
if (f->read.handle) {
for (i = 0; i < cnt; i++) {
struct msg *cur = &pool[(first+i) % poolsize];
/* Get message and timestamp */
values = msg_fscan(f->in, cur, &flags, NULL);
retry: values = msg_fscan(f->read.handle, cur, &flags, NULL);
if (values < 0) {
if (!feof(f->in))
warn("Failed to parse file of node '%s': reason=%d", n->name, values);
if (feof(f->read.handle)) {
if (f->read.split) {
f->read.chunk++;
f->read.handle = file_reopen(&f->read);
if (!f->read.handle)
return 0;
info("Open new input chunk of node '%s': chunk=%u", n->name, f->read.chunk);
}
else {
info("Rewind input file of node '%s'", n->name);
rewind(f->read.handle);
goto retry;
}
}
else
warn("Failed to read messages from node '%s': reason=%d", n->name, values);
return 0;
}
/* Fix missing sequence no */
cur->sequence = f->sequence = (flags & MSG_PRINT_SEQUENCE) ? cur->sequence : f->sequence + 1;
if (!f->rate || ftell(f->in) == 0) {
struct timespec until = time_add(&MSG_TS(cur), &f->offset);
if (timerfd_wait_until(f->tfd, &until) < 0)
cur->sequence = f->read_sequence = (flags & MSG_PRINT_SEQUENCE) ? cur->sequence : f->read_sequence + 1;
if (!f->read_rate || ftell(f->read.handle) == 0) {
struct timespec until = time_add(&MSG_TS(cur), &f->read_offset);
if (timerfd_wait_until(f->read_timer, &until) < 0)
serror("Failed to wait for timer");
/* Update timestamp */
cur->ts.sec = until.tv_sec;
cur->ts.nsec = until.tv_nsec;
}
else { /* Wait with fixed rate delay */
if (timerfd_wait(f->tfd) < 0)
if (timerfd_wait(f->read_timer) < 0)
serror("Failed to wait for timer");
/* Update timestamp */
struct timespec now = time_now();
cur->ts.sec = now.tv_sec;
cur->ts.nsec = now.tv_nsec;
}
}
}
@ -260,17 +341,23 @@ int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cn
int i = 0;
struct file *f = n->file;
if (f->out) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
if (f->write.handle) {
for (i = 0; i < cnt; i++) {
/* Split file if requested */
if ((f->write.split > 0) && ftell(f->write.handle) > f->write.split * (1 << 20)) {
f->write.chunk++;
f->write.handle = file_reopen(&f->write);
info("Splitted output file for node '%s': chunk=%u", n->name, f->write.chunk);
}
struct msg *m = &pool[(first+i) % poolsize];
msg_fprint(f->out, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0);
msg_fprint(f->write.handle, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0);
}
fflush(f->write.handle);
}
else
error("Can not write to node '%s", n->name);
error("Can not write to node '%s'", n->name);
return i;
}

View file

@ -99,7 +99,7 @@ void node_reverse(struct node *n)
break;
#endif
case LOG_FILE:
SWAP(n->file->path_in, n->file->path_out);
SWAP(n->file->read, n->file->write);
break;
default: { }
}
@ -129,10 +129,6 @@ void node_destroy(struct node *n)
rtnl_cls_put(n->socket->tc_classifier);
break;
#endif
case LOG_FILE:
free(n->file->path_in);
free(n->file->path_out);
break;
default: { }
}