diff --git a/doc/nodes/File.md b/doc/nodes/File.md index 42839398d..659ed2bf2 100644 --- a/doc/nodes/File.md +++ b/doc/nodes/File.md @@ -5,7 +5,7 @@ The `file` node-type can be used to log or replay samples to / from disk. ## Configuration Every `file` node can be configured to only read or write or to do both at the same time. -The node configuration is splitted in to groups: `in` and `out`. +The node configuration is divided into two sub-groups: `in` and `out`. #### `uri` *(string: libcurl URI)* @@ -19,7 +19,9 @@ See [strftime(3)](http://man7.org/linux/man-pages/man3/strftime.3.html) for a li uri = "logs/measurements_%Y-%m-%d_%H-%M-%S.log" -will create a file called: *path_of_working_directory*/logs/measurements_2015-08-09_22-20-50.log +will create a file called: + + ./logs/measurements_2015-08-09_22-20-50.log See below for a description of the file format. @@ -48,12 +50,13 @@ Those variables will also been displayed during the startup phase of the server The supported values for `epoch_mode`: - | `epoch_mode` | `offset` | `start = first + offset` | - | -----------: | :-------------------: | :----------------------: | - | `direct` | `now - first + epoch` | `now + epoch` | - | `wait` | `now + epoch` | `now + first` | - | `relative` | `epoch` | `first + epoch` | - | `absolute` | `epoch - first` | `epoch` | +| `epoch_mode` | `offset` | `start = first + offset` | +| -----------: | :-------------------: | :----------------------: | +| `direct` | `now - first + epoch` | `now + epoch` | +| `wait` | `now + epoch` | `now + first` | +| `relative` | `epoch` | `first + epoch` | +| `absolute` | `epoch - first` | `epoch` | +| `original` | `0` | immeadiatly | #### `rate` *(float)* @@ -61,20 +64,6 @@ By default `send_rate` has the value `0` which means that the time between conse If this setting has a non-zero value, the default behaviour is overwritten with a fixed rate. -#### `split` *(integer)* - -Only valid for the `out` group. - -Splits the output file every `split` mega-byte. This setting will append the chunk number to the `uri` setting. - -Example: `data/my_measurements.log_001` - -#### `splitted` *(boolean)* - -Only valid for the `in` group. - -Expects the input data in splitted format. - ### Example nodes = { @@ -94,14 +83,10 @@ Expects the input data in splitted format. rate = 2.0 # A constant rate at which the lines of the input files should be read # A missing or zero value will use the timestamp in the first column # of the file to determine the pause between consecutive lines. - - splitted = false }, out = { - URI = "logs/output_%F_%T.log" # The output URI accepts all format tokens of (see strftime(3)) + uri = "logs/output_%F_%T.log" # The output URI accepts all format tokens of (see strftime(3)) mode = "a+" # You might want to use "a+" to append to a file - - split = 100, # Split output file every 100 MB } } } @@ -132,10 +117,10 @@ The columns are defined as follows: This example shows a dump with three values per sample: - 1438959964.162102394 6 3.489760 -1.882725 0.860070 - 1438959964.261677582 7 2.375948 -2.204084 0.907518 - 1438959964.361622787 8 3.620115 -1.359236 -0.622333 - 1438959964.461907066 9 5.844254 -0.966527 -0.628751 - 1438959964.561499526 10 6.317059 -1.716363 0.351925 - 1438959964.661578339 11 6.471288 -0.159862 0.123948 - 1438959964.761956859 12 7.365932 -1.488268 -0.780568 + 1438959964.162102394(6) 3.489760 -1.882725 0.860070 + 1438959964.261677582(7) 2.375948 -2.204084 0.907518 + 1438959964.361622787(8) 3.620115 -1.359236 -0.622333 + 1438959964.461907066(9) 5.844254 -0.966527 -0.628751 + 1438959964.561499526(10) 6.317059 -1.716363 0.351925 + 1438959964.661578339(11) 6.471288 -0.159862 0.123948 + 1438959964.761956859(12) 7.365932 -1.488268 -0.780568 diff --git a/etc/example.conf b/etc/example.conf index acab53b2d..e5dc28f83 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -94,14 +94,10 @@ nodes = { rate = 2.0 # A constant rate at which the lines of the input files should be read # A missing or zero value will use the timestamp in the first column # of the file to determine the pause between consecutive lines. - - splitted = false }, out = { uri = "logs/output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3)) mode = "a+" # You might want to use "a+" to append to a file - - split = 100, # Split output file every 100 MB } }, gtfpga_node = { diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index d8355421b..54f00d91d 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -33,16 +33,16 @@ struct file { const char *fmt; /**< Format string for file name. */ char *uri; /**< Real file name */ - - int chunk; /**< Current chunk number. */ - int split; /**< Split file every file::split mega bytes. */ } read, write; + + int rewind; /**< Should we rewind the file when we reach EOF? */ enum read_epoch_mode { EPOCH_DIRECT, EPOCH_WAIT, EPOCH_RELATIVE, - EPOCH_ABSOLUTE + EPOCH_ABSOLUTE, + EPOCH_ORIGINAL } read_epoch_mode; /**< Specifies how file::offset is calculated. */ struct timespec read_first; /**< The first timestamp in the file file::{read,write}::uri */ diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 0b540f486..dd0d28993 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -41,19 +41,10 @@ static char * file_format_name(const char *format, struct timespec *ts) static AFILE * file_reopen(struct file_direction *dir) { - char buf[FILE_MAX_PATHLEN]; - const char *uri = buf; - - /* Append chunk number to filename */ - if (dir->chunk >= 0) - snprintf(buf, FILE_MAX_PATHLEN, "%s_%03u", dir->uri, dir->chunk); - else - uri = dir->uri; - if (dir->handle) afclose(dir->handle); - return afopen(uri, dir->mode); + return afopen(dir->uri, dir->mode); } static int file_parse_direction(config_setting_t *cfg, struct file *f, int d) @@ -79,12 +70,6 @@ int file_parse(struct node *n, config_setting_t *cfg) if (cfg_out) { if (file_parse_direction(cfg_out, f, FILE_WRITE)) cerror(cfg_out, "Failed to parse output file for node %s", node_name(n)); - - /* More write specific settings */ - if (config_setting_lookup_int(cfg_out, "split", &f->write.split)) - f->write.split <<= 20; /* in MiB */ - else - f->write.split = -1; /* Save all samples in a single file */ } cfg_in = config_setting_get_member(cfg, "in"); @@ -93,8 +78,8 @@ int file_parse(struct node *n, config_setting_t *cfg) cerror(cfg_in, "Failed to parse input file for node %s", node_name(n)); /* More read specific settings */ - if (!config_setting_lookup_bool(cfg_in, "splitted", &f->read.split)) - f->read.split = 0; /* Input files are suffixed with split indizes (.000, .001) */ + if (!config_setting_lookup_bool(cfg_in, "rewind", &f->rewind)) + f->rewind = 0; if (!config_setting_lookup_float(cfg_in, "rate", &f->read_rate)) f->read_rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */ @@ -116,6 +101,8 @@ int file_parse(struct node *n, config_setting_t *cfg) f->read_epoch_mode = EPOCH_RELATIVE; else if (!strcmp(epoch_mode, "absolute")) f->read_epoch_mode = EPOCH_ABSOLUTE; + else if (!strcmp(epoch_mode, "original")) + f->read_epoch_mode = EPOCH_ORIGINAL; else cerror(cfg_in, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode); } @@ -137,6 +124,7 @@ char * file_print(struct node *n) case EPOCH_WAIT: epoch_str = "wait"; break; case EPOCH_RELATIVE: epoch_str = "relative"; break; case EPOCH_ABSOLUTE: epoch_str = "absolute"; break; + case EPOCH_ORIGINAL: epoch_str = "original"; break; } strcatf(&buf, "in=%s, epoch_mode=%s, epoch=%.2f, ", @@ -187,7 +175,6 @@ int file_start(struct node *n) if (f->read.fmt) { /* Prepare file name */ - f->read.chunk = f->read.split ? 0 : -1; f->read.uri = file_format_name(f->read.fmt, &now); /* Open file */ @@ -232,12 +219,13 @@ int file_start(struct node *n) case EPOCH_ABSOLUTE: /* read first value at f->read_epoch */ f->read_offset = time_diff(&f->read_first, &f->read_epoch); break; + + default: { } } } if (f->write.fmt) { /* Prepare file name */ - f->write.chunk = f->write.split ? 0 : -1; f->write.uri = file_format_name(f->write.fmt, &now); /* Open file */ @@ -278,27 +266,26 @@ int file_read(struct node *n, struct sample *smps[], unsigned cnt) retry: values = sample_io_villas_fscan(f->read.handle->file, s, &flags); /* Get message and timestamp */ if (values < 0) { if (afeof(f->read.handle)) { - if (f->read.split) { - f->read.chunk++; - f->read.handle = file_reopen(&f->read); - if (!f->read.handle) - return 0; - - info("Open new input chunk of node %s: %d", node_name(n), f->read.chunk); - } - else { + if (f->rewind) { info("Rewind input file of node %s", node_name(n)); arewind(f->read.handle); goto retry; } + else { + info("Reached end-of-file"); + exit(EXIT_SUCCESS); + } } else warn("Failed to read messages from node %s: reason=%d", node_name(n), values); return 0; } - - if (!f->read_rate || aftell(f->read.handle) == 0) { + + if (f->read_epoch_mode == EPOCH_ORIGINAL) { + return 1; + } + else if (!f->read_rate || aftell(f->read.handle) == 0) { s->ts.origin = time_add(&s->ts.origin, &f->read_offset); if (timerfd_wait_until(f->read_timer, &s->ts.origin) == 0) serror("Failed to wait for timer"); @@ -322,14 +309,6 @@ int file_write(struct node *n, struct sample *smps[], unsigned cnt) assert(f->write.handle); assert(cnt == 1); - /* Split file if requested */ - if (f->write.split > 0 && aftell(f->write.handle) > f->write.split) { - f->write.chunk++; - f->write.handle = file_reopen(&f->write); - - info("Splitted output node %s: chunk=%u", node_name(n), f->write.chunk); - } - sample_io_villas_fprint(f->write.handle->file, s, SAMPLE_IO_ALL & ~SAMPLE_IO_OFFSET); afflush(f->write.handle);