diff --git a/documentation/clients/File.md b/documentation/clients/File.md index 75428cd18..3015ea6f9 100644 --- a/documentation/clients/File.md +++ b/documentation/clients/File.md @@ -4,16 +4,14 @@ The `file` node-type can be used to log or replay samples to / from disk. ## Configuration -Every `file` node supports the following special settings: +Every `file` node can be configured to only read or write or to do both at the same time. +The node configuration is splitted in to groups: `in` and `out`. -#### `in` *(string: filesystem path)* +#### `path` *(string: filesystem path)* + +Specifies the path to a file from which is written to or read from (depending in which group is used). -Specifies the path to a file which contains data for replaying. See below for a description of the file format. - -#### `out` *(string: filesystem path)* - -Specifies the path to a file where samples will be written to. This setting allows to add special paceholders for time and date values. See [strftime(3)](http://man7.org/linux/man-pages/man3/strftime.3.html) for a list of supported placeholder. @@ -25,7 +23,7 @@ will create a file called: *path_of_working_directory*/logs/measurements_2015-08 See below for a description of the file format. -#### `file_mode` *(string)* +#### `mode` *(string)* Specifies the mode which should be used to open the output file. See [open(2)](http://man7.org/linux/man-pages/man2/open.2.html) for an explanation of allowed values. @@ -57,38 +55,53 @@ The supported values for `epoch_mode`: | `relative` | `epoch` | `first + epoch` | | `absolute` | `epoch - first` | `epoch` | -#### `send_rate` *(float)* +#### `rate` *(float)* By default `send_rate` has the value `0` which means that the time between consecutive samples is the same as in the `in` file based on the timestamps in the first column. If this setting has a non-zero value, the default behaviour is overwritten with a fixed rate. +#### `split` *(integer)* + +Only valid for the `out` group. + +Splits the output file every `split` mega-byte. This setting will append the chunk number to the `path` setting. + +Example: `data/my_measurements.log_001` + +#### `splitted` *(boolean)* + +Only valid for the `in` group. + +Expects the input data in splitted format. + ### Example file_node = { type = "file", ### The following settings are specific to the file node-type!! ### - mode = "w+", # The mode in which files should be opened (see open(2)) - # You might want to use "a+" to append to a file - - in = "logs/file_input.log", # These options specify the path prefix where the the files are stored - out = "logs/file_output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3)) - epoch_mode = "direct" # One of: - # direct (default) - # wait - # relative - # absolute + in = { + path = "logs/input.log", # These options specify the path prefix where the the files are stored + mode = "w+", # The mode in which files should be opened (see open(2)) + + epoch_mode = "direct" # One of: direct (default), wait, relative, absolute + epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0). + # Consult the documentation of a full explanation - epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0): - # - epoch_mode = now: The first value is read at: _now_ + epoch seconds. - # - epoch_mode = relative: The first value is read at _start_ + `epoch` seconds. - # - epoch_mode = absolute: The first value is read at epoch seconds after 1970-01-01 00:00:00. - - rate = 2.0 # A constant rate at which the lines of the input files should be read + rate = 2.0 # A constant rate at which the lines of the input files should be read # A missing or zero value will use the timestamp in the first column # of the file to determine the pause between consecutive lines. + + splitted = false + }, + out = { + path = "logs/output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3)) + mode = "a+" # You might want to use "a+" to append to a file + + split = 100, # Split output file every 100 MB + } } ## File Format diff --git a/server/etc/example.conf b/server/etc/example.conf index 8cd0e679e..916fc226a 100644 --- a/server/etc/example.conf +++ b/server/etc/example.conf @@ -87,20 +87,26 @@ nodes = { ### The following settings are specific to the file node-type!! ### - mode = "w+", # The mode in which files should be opened (see open(2)) - # You might want to use "a+" to append to a file - - in = "logs/file_input.log", # These options specify the path prefix where the the files are stored - out = "logs/file_output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3)) - - epoch_mode = "direct" # One of: direct (default), wait, relative, absolute - epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0). + in = { + path = "logs/input.log", # These options specify the path prefix where the the files are stored + mode = "w+", # The mode in which files should be opened (see open(2)) + + epoch_mode = "direct" # One of: direct (default), wait, relative, absolute + epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0). # Consult the documentation of a full explanation - - - rate = 2.0 # A constant rate at which the lines of the input files should be read + + rate = 2.0 # A constant rate at which the lines of the input files should be read # A missing or zero value will use the timestamp in the first column # of the file to determine the pause between consecutive lines. + + splitted = false + }, + out = { + path = "logs/output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3)) + mode = "a+" # You might want to use "a+" to append to a file + + split = 100, # Split output file every 100 MB + } }, gtfpga_node = { type = "gtfpga", diff --git a/server/include/file.h b/server/include/file.h index ca633b749..c414f9904 100644 --- a/server/include/file.h +++ b/server/include/file.h @@ -20,31 +20,40 @@ #include "node.h" -#define FILE_MAX_PATHLEN 128 +#define FILE_MAX_PATHLEN 512 + +enum { + FILE_READ, + FILE_WRITE +}; struct file { - FILE *in; - FILE *out; + struct file_direction { + FILE *handle; /**< libc: stdio file handle */ - char *path_in; - char *path_out; + const char *mode; /**< libc: fopen() mode */ + const char *fmt; /**< Format string for file name. */ - const char *file_mode; /**< The mode for fopen() which is used for the out file. */ + char *path; /**< Real file name */ + + int chunk; /**< Current chunk number. */ + int split; /**< Split file every file::split mega bytes. */ + } read, write; - enum epoch_mode { + enum read_epoch_mode { EPOCH_DIRECT, EPOCH_WAIT, EPOCH_RELATIVE, EPOCH_ABSOLUTE - } epoch_mode; /**< Specifies how file::offset is calculated. */ + } read_epoch_mode; /**< Specifies how file::offset is calculated. */ - struct timespec first; /**< The first timestamp in the file file::path_in */ - struct timespec epoch; /**< The epoch timestamp from the configuration. */ - struct timespec offset; /**< An offset between the timestamp in the input file and the current time */ + struct timespec read_first; /**< The first timestamp in the file file::path_in */ + struct timespec read_epoch; /**< The epoch timestamp from the configuration. */ + struct timespec read_offset; /**< An offset between the timestamp in the input file and the current time */ - double rate; /**< The sending rate. */ - int tfd; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ - int sequence; /**< Last sequence of this node */ + int read_sequence; /**< Sequence number of last message which has been written to file::path_out */ + int read_timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ + double read_rate; /**< The read rate. */ }; /** @see node_vtable::init */ diff --git a/server/src/file.c b/server/src/file.c index f3b626320..8fe33ad5a 100644 --- a/server/src/file.c +++ b/server/src/file.c @@ -26,49 +26,146 @@ int file_deinit() return 0; /* nothing todo here */ } +static char * file_format_name(const char *format, struct timespec *ts) +{ + struct tm tm; + char *buf = alloc(FILE_MAX_PATHLEN); + + /* Convert time */ + gmtime_r(&ts->tv_sec, &tm); + + strftime(buf, FILE_MAX_PATHLEN, format, &tm); + + return buf; +} + +static FILE * file_reopen(struct file_direction *dir) +{ + char buf[FILE_MAX_PATHLEN]; + const char *path = buf; + + /* Append chunk number to filename */ + if (dir->chunk >= 0) + snprintf(buf, FILE_MAX_PATHLEN, "%s_%03u", dir->path, dir->chunk); + else + path = dir->path; + + if (dir->handle) + fclose(dir->handle); + + return fopen(path, dir->mode); +} + +static int file_parse_direction(config_setting_t *cfg, struct file *f, int d) +{ + struct file_direction *dir = (d == FILE_READ) ? &f->read : &f->write; + + if (!config_setting_lookup_string(cfg, "path", &dir->fmt)) + return -1; + + if (!config_setting_lookup_string(cfg, "mode", &dir->mode)) + dir->mode = (d == FILE_READ) ? "r" : "w+"; + + return 0; +} + +int file_parse(config_setting_t *cfg, struct node *n) +{ + struct file *f = alloc(sizeof(struct file)); + + config_setting_t *cfg_in, *cfg_out; + + cfg_out = config_setting_get_member(cfg, "out"); + if (cfg_out) { + if (file_parse_direction(cfg_out, f, FILE_WRITE)) + cerror(cfg_out, "Failed to parse output file for node '%s'", n->name); + + /* More write specific settings */ + if (!config_setting_lookup_int(cfg_out, "split", &f->write.split)) + f->write.split = 0; /* Save all samples in a single file */ + } + + cfg_in = config_setting_get_member(cfg, "in"); + if (cfg_in) { + if (file_parse_direction(cfg_in, f, FILE_READ)) + cerror(cfg_in, "Failed to parse input file for node '%s'", n->name); + + /* More read specific settings */ + if (!config_setting_lookup_bool(cfg_in, "splitted", &f->read.split)) + f->read.split = 0; /* Save all samples in a single file */ + if (!config_setting_lookup_float(cfg_in, "rate", &f->read_rate)) + f->read_rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */ + + double epoch_flt; + if (!config_setting_lookup_float(cfg_in, "epoch", &epoch_flt)) + epoch_flt = 0; + + f->read_epoch = time_from_double(epoch_flt); + + const char *epoch_mode; + if (!config_setting_lookup_string(cfg_in, "epoch_mode", &epoch_mode)) + epoch_mode = "direct"; + + if (!strcmp(epoch_mode, "direct")) + f->read_epoch_mode = EPOCH_DIRECT; + else if (!strcmp(epoch_mode, "wait")) + f->read_epoch_mode = EPOCH_WAIT; + else if (!strcmp(epoch_mode, "relative")) + f->read_epoch_mode = EPOCH_RELATIVE; + else if (!strcmp(epoch_mode, "absolute")) + f->read_epoch_mode = EPOCH_ABSOLUTE; + else + cerror(cfg_in, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode); + } + + n->file = f; + + return 0; +} + char * file_print(struct node *n) { struct file *f = n->file; char *buf = NULL; - if (f->path_in) { - const char *epoch_mode_str = NULL; - switch (f->epoch_mode) { - case EPOCH_DIRECT: epoch_mode_str = "direct"; break; - case EPOCH_WAIT: epoch_mode_str = "wait"; break; - case EPOCH_RELATIVE: epoch_mode_str = "relative"; break; - case EPOCH_ABSOLUTE: epoch_mode_str = "absolute"; break; + if (f->read.fmt) { + const char *epoch_str = NULL; + switch (f->read_epoch_mode) { + case EPOCH_DIRECT: epoch_str = "direct"; break; + case EPOCH_WAIT: epoch_str = "wait"; break; + case EPOCH_RELATIVE: epoch_str = "relative"; break; + case EPOCH_ABSOLUTE: epoch_str = "absolute"; break; } strcatf(&buf, "in=%s, epoch_mode=%s, epoch=%.2f, ", - f->path_in, - epoch_mode_str, - time_to_double(&f->epoch) + f->read.path ? f->read.path : f->read.fmt, + epoch_str, + time_to_double(&f->read_epoch) ); - if (f->rate) - strcatf(&buf, "rate=%.1f, ", f->rate); + if (f->read_rate) + strcatf(&buf, "rate=%.1f, ", f->read_rate); } - if (f->path_out) { + if (f->write.fmt) { strcatf(&buf, "out=%s, mode=%s, ", - f->path_out, - f->file_mode + f->write.path ? f->write.path : f->write.fmt, + f->write.mode ); } - if (f->first.tv_sec || f->first.tv_nsec) - strcatf(&buf, "first=%.2f, ", time_to_double(&f->first)); + if (f->read_first.tv_sec || f->read_first.tv_nsec) + strcatf(&buf, "first=%.2f, ", time_to_double(&f->read_first)); - if (f->offset.tv_sec || f->offset.tv_nsec) - strcatf(&buf, "offset=%.2f, ", time_to_double(&f->offset)); + if (f->read_offset.tv_sec || f->read_offset.tv_nsec) + strcatf(&buf, "offset=%.2f, ", time_to_double(&f->read_offset)); - if ((f->first.tv_sec || f->first.tv_nsec) && - (f->offset.tv_sec || f->offset.tv_nsec)) { - struct timespec eta, now = time_now(); + if ((f->read_first.tv_sec || f->read_first.tv_nsec) && + (f->read_offset.tv_sec || f->read_offset.tv_nsec)) { + struct timespec eta, now = time_now(); - eta = time_add(&f->first, &f->offset); - eta = time_diff(&now, &eta); + eta = time_add(&f->read_first, &f->read_offset); + eta = time_diff(&now, &eta); if (eta.tv_sec || eta.tv_nsec) strcatf(&buf, "eta=%.2f sec, ", time_to_double(&eta)); @@ -80,123 +177,81 @@ char * file_print(struct node *n) return buf; } -int file_parse(config_setting_t *cfg, struct node *n) -{ - struct file *f = alloc(sizeof(struct file)); - - const char *out, *in; - const char *epoch_mode; - double epoch_flt; - - if (config_setting_lookup_string(cfg, "out", &out)) { - time_t t = time(NULL); - struct tm *tm = localtime(&t); - - f->path_out = alloc(FILE_MAX_PATHLEN); - if (strftime(f->path_out, FILE_MAX_PATHLEN, out, tm) == 0) - cerror(cfg, "Invalid path for output"); - - } - if (config_setting_lookup_string(cfg, "in", &in)) - f->path_in = strdup(in); - - if (!config_setting_lookup_string(cfg, "file_mode", &f->file_mode)) - f->file_mode = "w+"; - - if (!config_setting_lookup_float(cfg, "send_rate", &f->rate)) - f->rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */ - - if (config_setting_lookup_float(n->cfg, "epoch", &epoch_flt)) - f->epoch = time_from_double(epoch_flt); - - if (!config_setting_lookup_string(n->cfg, "epoch_mode", &epoch_mode)) - epoch_mode = "direct"; - - if (!strcmp(epoch_mode, "direct")) - f->epoch_mode = EPOCH_DIRECT; - else if (!strcmp(epoch_mode, "wait")) - f->epoch_mode = EPOCH_WAIT; - else if (!strcmp(epoch_mode, "relative")) - f->epoch_mode = EPOCH_RELATIVE; - else if (!strcmp(epoch_mode, "absolute")) - f->epoch_mode = EPOCH_ABSOLUTE; - else - cerror(n->cfg, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode); - - n->file = f; - - return 0; -} - int file_open(struct node *n) { struct file *f = n->file; + + struct timespec now = time_now(); - if (f->path_in) { + if (f->read.fmt) { + /* Prepare file name */ + f->read.chunk = f->read.split ? 0 : -1; + f->read.path = file_format_name(f->read.fmt, &now); + /* Open file */ - f->in = fopen(f->path_in, "r"); - if (!f->in) - serror("Failed to open file for reading: '%s'", f->path_in); + f->read.handle = file_reopen(&f->read); + if (!f->read.handle) + serror("Failed to open file for reading: '%s'", f->read.path); /* Create timer */ - f->tfd = timerfd_create(CLOCK_REALTIME, 0); - if (f->tfd < 0) + f->read_timer = timerfd_create(CLOCK_REALTIME, 0); + if (f->read_timer < 0) serror("Failed to create timer"); + /* Arm the timer with a fixed rate */ + if (f->read_rate) { + struct itimerspec its = { + .it_interval = time_from_double(1 / f->read_rate), + .it_value = { 0, 1 }, + }; + + int ret = timerfd_settime(f->read_timer, 0, &its, NULL); + if (ret) + serror("Failed to start timer"); + } + /* Get current time */ struct timespec now = time_now(); /* Get timestamp of first line */ struct msg m; - int ret = msg_fscan(f->in, &m, NULL, NULL); rewind(f->in); + int ret = msg_fscan(f->read.handle, &m, NULL, NULL); rewind(f->read.handle); if (ret < 0) error("Failed to read first timestamp of node '%s'", n->name); - f->first = MSG_TS(&m); + f->read_first = MSG_TS(&m); - /* Set offset depending on epoch_mode */ - switch (f->epoch_mode) { + /* Set read_offset depending on epoch_mode */ + switch (f->read_epoch_mode) { case EPOCH_DIRECT: /* read first value at now + epoch */ - f->offset = time_diff(&f->first, &now); - f->offset = time_add(&f->offset, &f->epoch); + f->read_offset = time_diff(&f->read_first, &now); + f->read_offset = time_add(&f->read_offset, &f->read_epoch); break; case EPOCH_WAIT: /* read first value at now + first + epoch */ - f->offset = now; - f->offset = time_add(&f->offset, &f->epoch); + f->read_offset = now; + f->read_offset = time_add(&f->read_offset, &f->read_epoch); break; case EPOCH_RELATIVE: /* read first value at first + epoch */ - f->offset = f->epoch; + f->read_offset = f->read_epoch; break; - case EPOCH_ABSOLUTE: /* read first value at f->epoch */ - f->offset = time_diff(&f->first, &f->epoch); + case EPOCH_ABSOLUTE: /* read first value at f->read_epoch */ + f->read_offset = time_diff(&f->read_first, &f->read_epoch); break; } - - /* Arm the timer with a fixed rate */ - if (f->rate) { - struct itimerspec its = { - .it_interval = time_from_double(1 / f->rate), - .it_value = { 1, 0 }, - }; - - int ret = timerfd_settime(f->tfd, 0, &its, NULL); - if (ret) - serror("Failed to start timer"); - } - - char *buf = file_print(n); - debug(4, "Opened file node '%s': %s", n->name, buf); - free(buf); } - if (f->path_out) { - /* Open output file */ - f->out = fopen(f->path_out, f->file_mode); - if (!f->out) - serror("Failed to open file for writing: '%s'", f->path_out); + if (f->write.fmt) { + /* Prepare file name */ + f->write.chunk = f->write.split ? 0 : -1; + f->write.path = file_format_name(f->write.fmt, &now); + + /* Open file */ + f->write.handle = file_reopen(&f->write); + if (!f->write.handle) + serror("Failed to open file for writing: '%s'", f->write.path); } return 0; @@ -205,13 +260,16 @@ int file_open(struct node *n) int file_close(struct node *n) { struct file *f = n->file; + + free(f->read.path); + free(f->write.path); - if (f->tfd) - close(f->tfd); - if (f->in) - fclose(f->in); - if (f->out) - fclose(f->out); + if (f->read_timer) + close(f->read_timer); + if (f->read.handle) + fclose(f->read.handle); + if (f->write.handle) + fclose(f->write.handle); return 0; } @@ -221,31 +279,54 @@ int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt int values, flags, i = 0; struct file *f = n->file; - if (f->in) { + if (f->read.handle) { for (i = 0; i < cnt; i++) { struct msg *cur = &pool[(first+i) % poolsize]; /* Get message and timestamp */ - values = msg_fscan(f->in, cur, &flags, NULL); +retry: values = msg_fscan(f->read.handle, cur, &flags, NULL); if (values < 0) { - if (!feof(f->in)) - warn("Failed to parse file of node '%s': reason=%d", n->name, values); + if (feof(f->read.handle)) { + if (f->read.split) { + f->read.chunk++; + f->read.handle = file_reopen(&f->read); + if (!f->read.handle) + return 0; + + info("Open new input chunk of node '%s': chunk=%u", n->name, f->read.chunk); + } + else { + info("Rewind input file of node '%s'", n->name); + rewind(f->read.handle); + goto retry; + } + } + else + warn("Failed to read messages from node '%s': reason=%d", n->name, values); return 0; } /* Fix missing sequence no */ - cur->sequence = f->sequence = (flags & MSG_PRINT_SEQUENCE) ? cur->sequence : f->sequence + 1; - - if (!f->rate || ftell(f->in) == 0) { - struct timespec until = time_add(&MSG_TS(cur), &f->offset); - - if (timerfd_wait_until(f->tfd, &until) < 0) + cur->sequence = f->read_sequence = (flags & MSG_PRINT_SEQUENCE) ? cur->sequence : f->read_sequence + 1; + + if (!f->read_rate || ftell(f->read.handle) == 0) { + struct timespec until = time_add(&MSG_TS(cur), &f->read_offset); + if (timerfd_wait_until(f->read_timer, &until) < 0) serror("Failed to wait for timer"); + + /* Update timestamp */ + cur->ts.sec = until.tv_sec; + cur->ts.nsec = until.tv_nsec; } else { /* Wait with fixed rate delay */ - if (timerfd_wait(f->tfd) < 0) + if (timerfd_wait(f->read_timer) < 0) serror("Failed to wait for timer"); + + /* Update timestamp */ + struct timespec now = time_now(); + cur->ts.sec = now.tv_sec; + cur->ts.nsec = now.tv_nsec; } } } @@ -260,17 +341,23 @@ int file_write(struct node *n, struct msg *pool, int poolsize, int first, int cn int i = 0; struct file *f = n->file; - if (f->out) { - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - + if (f->write.handle) { for (i = 0; i < cnt; i++) { + /* Split file if requested */ + if ((f->write.split > 0) && ftell(f->write.handle) > f->write.split * (1 << 20)) { + f->write.chunk++; + f->write.handle = file_reopen(&f->write); + + info("Splitted output file for node '%s': chunk=%u", n->name, f->write.chunk); + } + struct msg *m = &pool[(first+i) % poolsize]; - msg_fprint(f->out, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0); + msg_fprint(f->write.handle, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0); } + fflush(f->write.handle); } else - error("Can not write to node '%s", n->name); + error("Can not write to node '%s'", n->name); return i; } diff --git a/server/src/node.c b/server/src/node.c index 410a7c95b..a7b1a6f20 100644 --- a/server/src/node.c +++ b/server/src/node.c @@ -99,7 +99,7 @@ void node_reverse(struct node *n) break; #endif case LOG_FILE: - SWAP(n->file->path_in, n->file->path_out); + SWAP(n->file->read, n->file->write); break; default: { } } @@ -129,10 +129,6 @@ void node_destroy(struct node *n) rtnl_cls_put(n->socket->tc_classifier); break; #endif - case LOG_FILE: - free(n->file->path_in); - free(n->file->path_out); - break; default: { } }