mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
removed split and splitted settings from file node-type
This commit is contained in:
parent
725a4e544e
commit
f6c204c441
4 changed files with 41 additions and 81 deletions
|
@ -5,7 +5,7 @@ The `file` node-type can be used to log or replay samples to / from disk.
|
|||
## Configuration
|
||||
|
||||
Every `file` node can be configured to only read or write or to do both at the same time.
|
||||
The node configuration is splitted in to groups: `in` and `out`.
|
||||
The node configuration is divided into two sub-groups: `in` and `out`.
|
||||
|
||||
#### `uri` *(string: libcurl URI)*
|
||||
|
||||
|
@ -19,7 +19,9 @@ See [strftime(3)](http://man7.org/linux/man-pages/man3/strftime.3.html) for a li
|
|||
|
||||
uri = "logs/measurements_%Y-%m-%d_%H-%M-%S.log"
|
||||
|
||||
will create a file called: *path_of_working_directory*/logs/measurements_2015-08-09_22-20-50.log
|
||||
will create a file called:
|
||||
|
||||
./logs/measurements_2015-08-09_22-20-50.log
|
||||
|
||||
See below for a description of the file format.
|
||||
|
||||
|
@ -48,12 +50,13 @@ Those variables will also been displayed during the startup phase of the server
|
|||
|
||||
The supported values for `epoch_mode`:
|
||||
|
||||
| `epoch_mode` | `offset` | `start = first + offset` |
|
||||
| -----------: | :-------------------: | :----------------------: |
|
||||
| `direct` | `now - first + epoch` | `now + epoch` |
|
||||
| `wait` | `now + epoch` | `now + first` |
|
||||
| `relative` | `epoch` | `first + epoch` |
|
||||
| `absolute` | `epoch - first` | `epoch` |
|
||||
| `epoch_mode` | `offset` | `start = first + offset` |
|
||||
| -----------: | :-------------------: | :----------------------: |
|
||||
| `direct` | `now - first + epoch` | `now + epoch` |
|
||||
| `wait` | `now + epoch` | `now + first` |
|
||||
| `relative` | `epoch` | `first + epoch` |
|
||||
| `absolute` | `epoch - first` | `epoch` |
|
||||
| `original` | `0` | immeadiatly |
|
||||
|
||||
#### `rate` *(float)*
|
||||
|
||||
|
@ -61,20 +64,6 @@ By default `send_rate` has the value `0` which means that the time between conse
|
|||
|
||||
If this setting has a non-zero value, the default behaviour is overwritten with a fixed rate.
|
||||
|
||||
#### `split` *(integer)*
|
||||
|
||||
Only valid for the `out` group.
|
||||
|
||||
Splits the output file every `split` mega-byte. This setting will append the chunk number to the `uri` setting.
|
||||
|
||||
Example: `data/my_measurements.log_001`
|
||||
|
||||
#### `splitted` *(boolean)*
|
||||
|
||||
Only valid for the `in` group.
|
||||
|
||||
Expects the input data in splitted format.
|
||||
|
||||
### Example
|
||||
|
||||
nodes = {
|
||||
|
@ -94,14 +83,10 @@ Expects the input data in splitted format.
|
|||
rate = 2.0 # A constant rate at which the lines of the input files should be read
|
||||
# A missing or zero value will use the timestamp in the first column
|
||||
# of the file to determine the pause between consecutive lines.
|
||||
|
||||
splitted = false
|
||||
},
|
||||
out = {
|
||||
URI = "logs/output_%F_%T.log" # The output URI accepts all format tokens of (see strftime(3))
|
||||
uri = "logs/output_%F_%T.log" # The output URI accepts all format tokens of (see strftime(3))
|
||||
mode = "a+" # You might want to use "a+" to append to a file
|
||||
|
||||
split = 100, # Split output file every 100 MB
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -132,10 +117,10 @@ The columns are defined as follows:
|
|||
|
||||
This example shows a dump with three values per sample:
|
||||
|
||||
1438959964.162102394 6 3.489760 -1.882725 0.860070
|
||||
1438959964.261677582 7 2.375948 -2.204084 0.907518
|
||||
1438959964.361622787 8 3.620115 -1.359236 -0.622333
|
||||
1438959964.461907066 9 5.844254 -0.966527 -0.628751
|
||||
1438959964.561499526 10 6.317059 -1.716363 0.351925
|
||||
1438959964.661578339 11 6.471288 -0.159862 0.123948
|
||||
1438959964.761956859 12 7.365932 -1.488268 -0.780568
|
||||
1438959964.162102394(6) 3.489760 -1.882725 0.860070
|
||||
1438959964.261677582(7) 2.375948 -2.204084 0.907518
|
||||
1438959964.361622787(8) 3.620115 -1.359236 -0.622333
|
||||
1438959964.461907066(9) 5.844254 -0.966527 -0.628751
|
||||
1438959964.561499526(10) 6.317059 -1.716363 0.351925
|
||||
1438959964.661578339(11) 6.471288 -0.159862 0.123948
|
||||
1438959964.761956859(12) 7.365932 -1.488268 -0.780568
|
||||
|
|
|
@ -94,14 +94,10 @@ nodes = {
|
|||
rate = 2.0 # A constant rate at which the lines of the input files should be read
|
||||
# A missing or zero value will use the timestamp in the first column
|
||||
# of the file to determine the pause between consecutive lines.
|
||||
|
||||
splitted = false
|
||||
},
|
||||
out = {
|
||||
uri = "logs/output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3))
|
||||
mode = "a+" # You might want to use "a+" to append to a file
|
||||
|
||||
split = 100, # Split output file every 100 MB
|
||||
}
|
||||
},
|
||||
gtfpga_node = {
|
||||
|
|
|
@ -33,16 +33,16 @@ struct file {
|
|||
const char *fmt; /**< Format string for file name. */
|
||||
|
||||
char *uri; /**< Real file name */
|
||||
|
||||
int chunk; /**< Current chunk number. */
|
||||
int split; /**< Split file every file::split mega bytes. */
|
||||
} read, write;
|
||||
|
||||
int rewind; /**< Should we rewind the file when we reach EOF? */
|
||||
|
||||
enum read_epoch_mode {
|
||||
EPOCH_DIRECT,
|
||||
EPOCH_WAIT,
|
||||
EPOCH_RELATIVE,
|
||||
EPOCH_ABSOLUTE
|
||||
EPOCH_ABSOLUTE,
|
||||
EPOCH_ORIGINAL
|
||||
} read_epoch_mode; /**< Specifies how file::offset is calculated. */
|
||||
|
||||
struct timespec read_first; /**< The first timestamp in the file file::{read,write}::uri */
|
||||
|
|
|
@ -41,19 +41,10 @@ static char * file_format_name(const char *format, struct timespec *ts)
|
|||
|
||||
static AFILE * file_reopen(struct file_direction *dir)
|
||||
{
|
||||
char buf[FILE_MAX_PATHLEN];
|
||||
const char *uri = buf;
|
||||
|
||||
/* Append chunk number to filename */
|
||||
if (dir->chunk >= 0)
|
||||
snprintf(buf, FILE_MAX_PATHLEN, "%s_%03u", dir->uri, dir->chunk);
|
||||
else
|
||||
uri = dir->uri;
|
||||
|
||||
if (dir->handle)
|
||||
afclose(dir->handle);
|
||||
|
||||
return afopen(uri, dir->mode);
|
||||
return afopen(dir->uri, dir->mode);
|
||||
}
|
||||
|
||||
static int file_parse_direction(config_setting_t *cfg, struct file *f, int d)
|
||||
|
@ -79,12 +70,6 @@ int file_parse(struct node *n, config_setting_t *cfg)
|
|||
if (cfg_out) {
|
||||
if (file_parse_direction(cfg_out, f, FILE_WRITE))
|
||||
cerror(cfg_out, "Failed to parse output file for node %s", node_name(n));
|
||||
|
||||
/* More write specific settings */
|
||||
if (config_setting_lookup_int(cfg_out, "split", &f->write.split))
|
||||
f->write.split <<= 20; /* in MiB */
|
||||
else
|
||||
f->write.split = -1; /* Save all samples in a single file */
|
||||
}
|
||||
|
||||
cfg_in = config_setting_get_member(cfg, "in");
|
||||
|
@ -93,8 +78,8 @@ int file_parse(struct node *n, config_setting_t *cfg)
|
|||
cerror(cfg_in, "Failed to parse input file for node %s", node_name(n));
|
||||
|
||||
/* More read specific settings */
|
||||
if (!config_setting_lookup_bool(cfg_in, "splitted", &f->read.split))
|
||||
f->read.split = 0; /* Input files are suffixed with split indizes (.000, .001) */
|
||||
if (!config_setting_lookup_bool(cfg_in, "rewind", &f->rewind))
|
||||
f->rewind = 0;
|
||||
if (!config_setting_lookup_float(cfg_in, "rate", &f->read_rate))
|
||||
f->read_rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */
|
||||
|
||||
|
@ -116,6 +101,8 @@ int file_parse(struct node *n, config_setting_t *cfg)
|
|||
f->read_epoch_mode = EPOCH_RELATIVE;
|
||||
else if (!strcmp(epoch_mode, "absolute"))
|
||||
f->read_epoch_mode = EPOCH_ABSOLUTE;
|
||||
else if (!strcmp(epoch_mode, "original"))
|
||||
f->read_epoch_mode = EPOCH_ORIGINAL;
|
||||
else
|
||||
cerror(cfg_in, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode);
|
||||
}
|
||||
|
@ -137,6 +124,7 @@ char * file_print(struct node *n)
|
|||
case EPOCH_WAIT: epoch_str = "wait"; break;
|
||||
case EPOCH_RELATIVE: epoch_str = "relative"; break;
|
||||
case EPOCH_ABSOLUTE: epoch_str = "absolute"; break;
|
||||
case EPOCH_ORIGINAL: epoch_str = "original"; break;
|
||||
}
|
||||
|
||||
strcatf(&buf, "in=%s, epoch_mode=%s, epoch=%.2f, ",
|
||||
|
@ -187,7 +175,6 @@ int file_start(struct node *n)
|
|||
|
||||
if (f->read.fmt) {
|
||||
/* Prepare file name */
|
||||
f->read.chunk = f->read.split ? 0 : -1;
|
||||
f->read.uri = file_format_name(f->read.fmt, &now);
|
||||
|
||||
/* Open file */
|
||||
|
@ -232,12 +219,13 @@ int file_start(struct node *n)
|
|||
case EPOCH_ABSOLUTE: /* read first value at f->read_epoch */
|
||||
f->read_offset = time_diff(&f->read_first, &f->read_epoch);
|
||||
break;
|
||||
|
||||
default: { }
|
||||
}
|
||||
}
|
||||
|
||||
if (f->write.fmt) {
|
||||
/* Prepare file name */
|
||||
f->write.chunk = f->write.split ? 0 : -1;
|
||||
f->write.uri = file_format_name(f->write.fmt, &now);
|
||||
|
||||
/* Open file */
|
||||
|
@ -278,27 +266,26 @@ int file_read(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
retry: values = sample_io_villas_fscan(f->read.handle->file, s, &flags); /* Get message and timestamp */
|
||||
if (values < 0) {
|
||||
if (afeof(f->read.handle)) {
|
||||
if (f->read.split) {
|
||||
f->read.chunk++;
|
||||
f->read.handle = file_reopen(&f->read);
|
||||
if (!f->read.handle)
|
||||
return 0;
|
||||
|
||||
info("Open new input chunk of node %s: %d", node_name(n), f->read.chunk);
|
||||
}
|
||||
else {
|
||||
if (f->rewind) {
|
||||
info("Rewind input file of node %s", node_name(n));
|
||||
arewind(f->read.handle);
|
||||
goto retry;
|
||||
}
|
||||
else {
|
||||
info("Reached end-of-file");
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
}
|
||||
else
|
||||
warn("Failed to read messages from node %s: reason=%d", node_name(n), values);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!f->read_rate || aftell(f->read.handle) == 0) {
|
||||
|
||||
if (f->read_epoch_mode == EPOCH_ORIGINAL) {
|
||||
return 1;
|
||||
}
|
||||
else if (!f->read_rate || aftell(f->read.handle) == 0) {
|
||||
s->ts.origin = time_add(&s->ts.origin, &f->read_offset);
|
||||
if (timerfd_wait_until(f->read_timer, &s->ts.origin) == 0)
|
||||
serror("Failed to wait for timer");
|
||||
|
@ -322,14 +309,6 @@ int file_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
assert(f->write.handle);
|
||||
assert(cnt == 1);
|
||||
|
||||
/* Split file if requested */
|
||||
if (f->write.split > 0 && aftell(f->write.handle) > f->write.split) {
|
||||
f->write.chunk++;
|
||||
f->write.handle = file_reopen(&f->write);
|
||||
|
||||
info("Splitted output node %s: chunk=%u", node_name(n), f->write.chunk);
|
||||
}
|
||||
|
||||
sample_io_villas_fprint(f->write.handle->file, s, SAMPLE_IO_ALL & ~SAMPLE_IO_OFFSET);
|
||||
afflush(f->write.handle);
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue