diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index 35b873eeb..26dceca0a 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -51,8 +51,6 @@ struct file { char *uri; /**< Real file name. */ } read, write; - int rewind; /**< Should we rewind the file when we reach EOF? */ - enum read_epoch_mode { EPOCH_DIRECT, EPOCH_WAIT, @@ -64,7 +62,8 @@ struct file { struct timespec read_first; /**< The first timestamp in the file file::{read,write}::uri */ struct timespec read_epoch; /**< The epoch timestamp from the configuration. */ struct timespec read_offset; /**< An offset between the timestamp in the input file and the current time */ - + + int read_rewind; /**< Should we rewind the file when we reach EOF? */ int read_timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ double read_rate; /**< The read rate. */ }; diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 6b9245f7e..ed223a634 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -76,6 +76,38 @@ static int file_parse_direction(config_setting_t *cfg, struct file *f, int d) return 0; } +static struct timespec file_calc_read_offset(const struct timespec *first, const struct timespec *epoch, enum read_epoch_mode mode) +{ + /* Get current time */ + struct timespec now = time_now(); + struct timespec offset; + + /* Set read_offset depending on epoch_mode */ + switch (mode) { + case EPOCH_DIRECT: /* read first value at now + epoch */ + offset = time_diff(first, &now); + offset = time_add(&offset, epoch); + break; + + case EPOCH_WAIT: /* read first value at now + first + epoch */ + offset = now; + return time_add(&now, epoch); + break; + + case EPOCH_RELATIVE: /* read first value at first + epoch */ + return *epoch; + break; + + case EPOCH_ABSOLUTE: /* read first value at f->read_epoch */ + return time_diff(first, epoch); + break; + + default: { } + } + + return offset; +} + int file_parse(struct node *n, config_setting_t *cfg) { struct file *f = n->_vd; @@ -94,8 +126,8 @@ int file_parse(struct node *n, config_setting_t *cfg) cerror(cfg_in, "Failed to parse input file for node %s", node_name(n)); /* More read specific settings */ - if (!config_setting_lookup_bool(cfg_in, "rewind", &f->rewind)) - f->rewind = 0; + if (!config_setting_lookup_bool(cfg_in, "rewind", &f->read_rewind)) + f->read_rewind = 1; if (!config_setting_lookup_float(cfg_in, "rate", &f->read_rate)) f->read_rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */ @@ -106,21 +138,22 @@ int file_parse(struct node *n, config_setting_t *cfg) f->read_epoch = time_from_double(epoch_flt); const char *epoch_mode; - if (!config_setting_lookup_string(cfg_in, "epoch_mode", &epoch_mode)) - epoch_mode = "direct"; - - if (!strcmp(epoch_mode, "direct")) - f->read_epoch_mode = EPOCH_DIRECT; - else if (!strcmp(epoch_mode, "wait")) - f->read_epoch_mode = EPOCH_WAIT; - else if (!strcmp(epoch_mode, "relative")) - f->read_epoch_mode = EPOCH_RELATIVE; - else if (!strcmp(epoch_mode, "absolute")) - f->read_epoch_mode = EPOCH_ABSOLUTE; - else if (!strcmp(epoch_mode, "original")) - f->read_epoch_mode = EPOCH_ORIGINAL; + if (config_setting_lookup_string(cfg_in, "epoch_mode", &epoch_mode)) { + if (!strcmp(epoch_mode, "direct")) + f->read_epoch_mode = EPOCH_DIRECT; + else if (!strcmp(epoch_mode, "wait")) + f->read_epoch_mode = EPOCH_WAIT; + else if (!strcmp(epoch_mode, "relative")) + f->read_epoch_mode = EPOCH_RELATIVE; + else if (!strcmp(epoch_mode, "absolute")) + f->read_epoch_mode = EPOCH_ABSOLUTE; + else if (!strcmp(epoch_mode, "original")) + f->read_epoch_mode = EPOCH_ORIGINAL; + else + cerror(cfg_in, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode); + } else - cerror(cfg_in, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode); + f->read_epoch_mode = EPOCH_DIRECT; } n->_vd = f; @@ -143,9 +176,10 @@ char * file_print(struct node *n) case EPOCH_ORIGINAL: epoch_str = "original"; break; } - strcatf(&buf, "in=%s, mode=%s, epoch_mode=%s, epoch=%.2f", + strcatf(&buf, "in=%s, mode=%s, rewind=%u, epoch_mode=%s, epoch=%.2f", f->read.uri ? f->read.uri : f->read.fmt, f->read.mode, + f->read_rewind, epoch_str, time_to_double(&f->read_epoch) ); @@ -207,9 +241,6 @@ int file_start(struct node *n) if (f->read_timer < 0) serror("Failed to create timer"); - /* Get current time */ - struct timespec now = time_now(); - /* Get timestamp of first line */ struct sample s; @@ -217,33 +248,10 @@ int file_start(struct node *n) ret = sample_io_villas_fscan(f->read.handle->file, &s, NULL); if (ret < 0) error("Failed to read first timestamp of node %s", node_name(n)); - - arewind(f->read.handle); f->read_first = s.ts.origin; - - /* Set read_offset depending on epoch_mode */ - switch (f->read_epoch_mode) { - case EPOCH_DIRECT: /* read first value at now + epoch */ - f->read_offset = time_diff(&f->read_first, &now); - f->read_offset = time_add(&f->read_offset, &f->read_epoch); - break; - - case EPOCH_WAIT: /* read first value at now + first + epoch */ - f->read_offset = now; - f->read_offset = time_add(&f->read_offset, &f->read_epoch); - break; - - case EPOCH_RELATIVE: /* read first value at first + epoch */ - f->read_offset = f->read_epoch; - break; - - case EPOCH_ABSOLUTE: /* read first value at f->read_epoch */ - f->read_offset = time_diff(&f->read_first, &f->read_epoch); - break; - - default: { } - } + f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode); + arewind(f->read.handle); } if (f->write.fmt) { @@ -281,6 +289,7 @@ int file_read(struct node *n, struct sample *smps[], unsigned cnt) struct file *f = n->_vd; struct sample *s = smps[0]; int values, flags; + uint64_t ex; assert(f->read.handle); assert(cnt == 1); @@ -288,9 +297,12 @@ int file_read(struct node *n, struct sample *smps[], unsigned cnt) retry: values = sample_io_villas_fscan(f->read.handle->file, s, &flags); /* Get message and timestamp */ if (values < 0) { if (afeof(f->read.handle)) { - if (f->rewind) { + if (f->read_rewind) { info("Rewind input file of node %s", node_name(n)); + + f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode); arewind(f->read.handle); + goto retry; } else { @@ -307,17 +319,23 @@ retry: values = sample_io_villas_fscan(f->read.handle->file, s, &flags); /* Get if (f->read_epoch_mode == EPOCH_ORIGINAL) { return 1; } - else if (!f->read_rate || aftell(f->read.handle) == 0) { - s->ts.origin = time_add(&s->ts.origin, &f->read_offset); - if (timerfd_wait_until(f->read_timer, &s->ts.origin) == 0) - serror("Failed to wait for timer"); - } - else { /* Wait with fixed rate delay */ - if (timerfd_wait(f->read_timer) == 0) - serror("Failed to wait for timer"); + else { + if (!f->read_rate || aftell(f->read.handle) == 0) { + s->ts.origin = time_add(&s->ts.origin, &f->read_offset); + + ex = timerfd_wait_until(f->read_timer, &s->ts.origin); + } + else { /* Wait with fixed rate delay */ + ex = timerfd_wait(f->read_timer); - /* Update timestamp */ - s->ts.origin = time_now(); + s->ts.origin = time_now(); + } + + /* Check for overruns */ + if (ex == 0) + serror("Failed to wait for timer"); + else if (ex != 1) + warn("Overrun: %lu", ex - 1); } return 1;