diff --git a/server/etc/example.conf b/server/etc/example.conf index c15dc2915..52920e29b 100644 --- a/server/etc/example.conf +++ b/server/etc/example.conf @@ -88,13 +88,20 @@ nodes = { in = "logs/file_input.log", # These options specify the path prefix where the the files are stored out = "logs/file_output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3)) + + epoch_mode = "now" # One of: + # now (default) + # relative + # absolute + + epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0): + # - epoch_mode = now: The first value is read at: _now_ + epoch seconds. + # - epoch_mode = relative: The first value is read at _start_ + `epoch` seconds. + # - epoch_mode = absolute: The first value is read at epoch seconds after 1970-01-01 00:00:00. - timestamp = true # Prepend a Unix timestamp in front of every message - # The accuracy of this timestamp depends on the system clock which - # should be synchronized via NTP / PTP! - # The timestamp describes the point of time in which the value is - # written to the file. This does not necessarily have to coincide - # with the time in which the message was received!!! + rate = 2.0 # A constant rate at which the lines of the input files should be read + # A missing or zero value will use the timestamp in the first column + # of the file to determine the pause between consecutive lines. }, gtfpga_node = { type = "gtfpga", diff --git a/server/include/file.h b/server/include/file.h index f80cee969..4d0220199 100644 --- a/server/include/file.h +++ b/server/include/file.h @@ -6,12 +6,12 @@ * @author Steffen Vogel * @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. - * Unauthorized copying of this file, via any medium is strictly prohibited. + * Unauthorized copying of this file, via any medium is strictly prohibited. * * @addtogroup file File-IO node type * @{ *********************************************************************************/ - + #ifndef _FILE_H_ #define _FILE_H_ @@ -23,13 +23,22 @@ struct file { FILE *in; FILE *out; - - const char *path_in; + + char *path_in; char *path_out; - const char *mode; /**< The mode for fopen() which is used for the out file. */ - + const char *file_mode; /**< The mode for fopen() which is used for the out file. */ + + enum epoch_mode { + EPOCH_NOW, + EPOCH_RELATIVE, + EPOCH_ABSOLUTE + } epoch_mode; /**< Specifies how file::offset is calculated. */ + + struct timespec start; /**< The first timestamp of the input file. */ + struct timespec epoch; /**< The epoch timestamp from the configuration. */ struct timespec offset; /**< An offset between the timestamp in the input file and the current time */ + double rate; /**< The sending rate. */ int tfd; /**< Timer file descriptor. Blocks until 1/rate seconds are elapsed. */ }; @@ -58,4 +67,4 @@ int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt /** @see node_vtable::write */ int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt); -#endif /** _FILE_H_ @} */ \ No newline at end of file +#endif /** _FILE_H_ @} */ diff --git a/server/src/file.c b/server/src/file.c index 7e7fb02ab..992e92603 100644 --- a/server/src/file.c +++ b/server/src/file.c @@ -9,7 +9,9 @@ *********************************************************************************/ #include +#include +#include "msg.h" #include "file.h" #include "utils.h" #include "timing.h" @@ -28,15 +30,18 @@ int file_print(struct node *n, char *buf, int len) { struct file *f = n->file; - return snprintf(buf, len, "in=%s, out=%s, mode=%s, rate=%.1f", - f->path_in, f->path_out, f->mode, f->rate); + return snprintf(buf, len, "in=%s, out=%s, mode=%s, rate=%.1f, epoch_mode=%u, epoch=%.0f", + f->path_in, f->path_out, f->file_mode, f->rate, f->epoch_mode, time_to_double(&f->epoch)); } int file_parse(config_setting_t *cfg, struct node *n) { struct file *f = alloc(sizeof(struct file)); - const char *out; + const char *out, *in; + const char *epoch_mode; + double epoch_flt; + if (config_setting_lookup_string(cfg, "out", &out)) { time_t t = time(NULL); struct tm *tm = localtime(&t); @@ -46,13 +51,29 @@ int file_parse(config_setting_t *cfg, struct node *n) cerror(cfg, "Invalid path for output"); } - config_setting_lookup_string(cfg, "in", &f->path_in); + if (config_setting_lookup_string(cfg, "in", &in)) + f->path_in = strdup(in); - if (!config_setting_lookup_string(cfg, "mode", &f->mode)) - f->mode = "w+"; + if (!config_setting_lookup_string(cfg, "mode", &f->file_mode)) + f->file_mode = "w+"; if (!config_setting_lookup_float(cfg, "rate", &f->rate)) - f->rate = 0; + f->rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */ + + if (config_setting_lookup_float(n->cfg, "epoch", &epoch_flt)) + f->epoch = time_from_double(epoch_flt); + + if (!config_setting_lookup_string(n->cfg, "epoch_mode", &epoch_mode)) + epoch_mode = "now"; + + if (!strcmp(epoch_mode, "now")) + f->epoch_mode = EPOCH_NOW; + else if (!strcmp(epoch_mode, "relative")) + f->epoch_mode = EPOCH_RELATIVE; + else if (!strcmp(epoch_mode, "absolute")) + f->epoch_mode = EPOCH_ABSOLUTE; + else + cerror(n->cfg, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode); n->file = f; @@ -68,30 +89,58 @@ int file_open(struct node *n) if (!f->in) serror("Failed to open file for reading: '%s'", f->path_in); - f->tfd = timerfd_create(CLOCK_MONOTONIC, 0); + f->tfd = timerfd_create(CLOCK_REALTIME, 0); if (f->tfd < 0) serror("Failed to create timer"); /* Arm the timer */ - struct itimerspec its; if (f->rate) { /* Send with fixed rate */ - its.it_interval = time_from_double(1 / f->rate); - its.it_value = (struct timespec) { 1, 0 }; + struct itimerspec its = { + .it_interval = time_from_double(1 / f->rate), + .it_value = { 1, 0 }, + }; + + int ret = timerfd_settime(f->tfd, 0, &its, NULL); + if (ret) + serror("Failed to start timer"); } else { - /* Read timestamp from first line to get an epoch offset */ - time_fscan(f->in, &f->offset); - rewind(f->in); - } + /* Get current time */ + struct timespec now, tmp; + clock_gettime(CLOCK_REALTIME, &now); - int ret = timerfd_settime(f->tfd, 0, &its, NULL); - if (ret) - serror("Failed to start timer"); + /* Get timestamp of first sample */ + time_fscan(f->in, &f->start); rewind(f->in); + + /* Set offset depending on epoch_mode */ + switch (f->epoch_mode) { + case EPOCH_NOW: /* read first value at f->now + f->epoch */ + tmp = time_diff(&f->start, &now); + f->offset = time_add(&tmp, &f->epoch); + break; + case EPOCH_RELATIVE: /* read first value at f->start + f->epoch */ + f->offset = f->epoch; + break; + case EPOCH_ABSOLUTE: /* read first value at f->epoch */ + f->offset = time_diff(&f->start, &f->epoch); + break; + } + + tmp = time_add(&f->start, &f->offset); + tmp = time_diff(&now, &tmp); + + debug(5, "Opened file '%s' as input for node '%s': start=%.2f, offset=%.2f, eta=%.2f", + f->path_in, n->name, + time_to_double(&f->start), + time_to_double(&f->offset), + time_to_double(&tmp) + ); + } } if (f->path_out) { - f->out = fopen(f->path_out, f->mode); + f->out = fopen(f->path_out, f->file_mode); if (!f->out) serror("Failed to open file for writing: '%s'", f->path_out); } @@ -110,8 +159,6 @@ int file_close(struct node *n) if (f->out) fclose(f->out); - free(f->path_out); - return 0; } @@ -121,18 +168,23 @@ int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt struct file *f = n->file; if (f->in) { - struct timespec ts; + for (i = 0; i < cnt; i++) { + struct msg *cur = &pool[(first+i) % poolsize]; + msg_fscan(f->in, cur); - for (i = 0; i < cnt; i++) - msg_fscan(f->in, &pool[(first+i) % poolsize]); - - if (f->rate) - timerfd_wait(f->tfd); - else - timerfd_wait_until(f->tfd, &ts); + if (f->rate) { + if (timerfd_wait(f->tfd) < 0) + serror("Failed to wait for timer"); + } + else { + struct timespec until = time_add(&MSG_TS(cur), &f->offset); + if (timerfd_wait_until(f->tfd, &until) < 0) + serror("Failed to wait for timer"); + } + } } else - warn("Can not read from node '%s'", n->name); + error("Can not read from node '%s'", n->name); return i; } @@ -150,7 +202,7 @@ int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cn msg_fprint(f->out, &pool[(first+i) % poolsize]); } else - warn("Can not write to node '%s", n->name); + error("Can not write to node '%s", n->name); return i; } diff --git a/server/src/msg.c b/server/src/msg.c index 53dca4ad4..e737175ac 100644 --- a/server/src/msg.c +++ b/server/src/msg.c @@ -65,8 +65,12 @@ int msg_fscan(FILE *f, struct msg *m) char line[MSG_VALUES * 16]; char *next, *ptr = line; - if (!fgets(line, sizeof(line), f)) - return 0; +retry: if (fgets(line, sizeof(line), f) == NULL) { + if (feof(f)) + goto retry; + else + return -1; /* An error occured */ + } m->ts.sec = (uint32_t) strtoul(ptr, &ptr, 10); ptr++; m->ts.nsec = (uint32_t) strtoul(ptr, &ptr, 10); diff --git a/server/src/node.c b/server/src/node.c index 73ad319e1..94d3e9281 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -121,6 +121,9 @@ void node_reverse(struct node *n) case BSD_SOCKET: SWAP(n->socket->remote, n->socket->local); break; + case LOG_FILE: + SWAP(n->file->path_in, n->file->path_out); + break; default: { } } } @@ -136,6 +139,10 @@ void node_destroy(struct node *n) case BSD_SOCKET: free(n->socket->netem); break; + case LOG_FILE: + free(n->file->path_in); + free(n->file->path_out); + break; default: { } }