diff --git a/documentation/Mainpage.md b/documentation/Mainpage.md index 35525edca..455c0e5b2 100644 --- a/documentation/Mainpage.md +++ b/documentation/Mainpage.md @@ -13,6 +13,8 @@ S2SS is used in distributed- and co-simulation scenarios and developed for the f The project consists of a server daemon and several client modules which are documented here. +[TOC] + ### Server The server simply acts as a gateway to forward simulation data from one client to another. diff --git a/documentation/clients/File.md b/documentation/clients/File.md index 28a14b112..75428cd18 100644 --- a/documentation/clients/File.md +++ b/documentation/clients/File.md @@ -31,16 +31,31 @@ Specifies the mode which should be used to open the output file. See [open(2)](http://man7.org/linux/man-pages/man2/open.2.html) for an explanation of allowed values. The default value is `w+` which will start writing at the beginning of the file and create it in case it does not exist yet. -#### `epoch_mode` *("now"|"relative"|"absolute")* +#### `epoch_mode` *("direct"|"wait" | "relative"|"absolute")* +The *epoch* describes the point in time when the first message will be read from the file. This setting allows to select the behaviour of the following `epoch` setting. It can be used to adjust the point in time when the first value should be read. The behaviour of `epoch` is depending on the value of `epoch_mode`. - - `epoch_mode = now`: The first value is read at *now* + `epoch` seconds. - - `epoch_mode = relative`: The first value is read at *start* + `epoch` seconds. - - `epoch_mode = absolute`: The first value is read at `epoch` seconds after 1970-01-01 00:00:00. +To facilitate the following description of supported `epoch_mode`'s, we will introduce some intermediate variables (timestamps). +Those variables will also been displayed during the startup phase of the server to simplify debugging. + +- `epoch` is the value of the `epoch` setting. +- `first` is the timestamp of the first message / line in the input file. +- `offset` will be added to the timestamps in the file to obtain the real time when the message will be sent. +- `start` is the point in time when the first message will be sent (`first + offset`). +- `eta` the time to wait until the first message will be send (`start - now`) + +The supported values for `epoch_mode`: + + | `epoch_mode` | `offset` | `start = first + offset` | + | -----------: | :-------------------: | :----------------------: | + | `direct` | `now - first + epoch` | `now + epoch` | + | `wait` | `now + epoch` | `now + first` | + | `relative` | `epoch` | `first + epoch` | + | `absolute` | `epoch - first` | `epoch` | #### `send_rate` *(float)* @@ -60,8 +75,9 @@ If this setting has a non-zero value, the default behaviour is overwritten with in = "logs/file_input.log", # These options specify the path prefix where the the files are stored out = "logs/file_output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3)) - epoch_mode = "now" # One of: - # now (default) + epoch_mode = "direct" # One of: + # direct (default) + # wait # relative # absolute diff --git a/server/etc/example.conf b/server/etc/example.conf index 22ce22dc1..d6e74e6d4 100644 --- a/server/etc/example.conf +++ b/server/etc/example.conf @@ -90,8 +90,9 @@ nodes = { in = "logs/file_input.log", # These options specify the path prefix where the the files are stored out = "logs/file_output_%F_%T.log" # The output path accepts all format tokens of (see strftime(3)) - epoch_mode = "now" # One of: - # now (default) + epoch_mode = "direct" # One of: + # direct (default) + # wait # relative # absolute diff --git a/server/include/file.h b/server/include/file.h index 51733cc32..ca633b749 100644 --- a/server/include/file.h +++ b/server/include/file.h @@ -32,17 +32,19 @@ struct file { const char *file_mode; /**< The mode for fopen() which is used for the out file. */ enum epoch_mode { - EPOCH_NOW, + EPOCH_DIRECT, + EPOCH_WAIT, EPOCH_RELATIVE, EPOCH_ABSOLUTE } epoch_mode; /**< Specifies how file::offset is calculated. */ - struct timespec start; /**< The first timestamp of the input file. */ + struct timespec first; /**< The first timestamp in the file file::path_in */ struct timespec epoch; /**< The epoch timestamp from the configuration. */ struct timespec offset; /**< An offset between the timestamp in the input file and the current time */ double rate; /**< The sending rate. */ int tfd; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ + int sequence; /**< Last sequence of this node */ }; /** @see node_vtable::init */ diff --git a/server/src/file.c b/server/src/file.c index bb06e2058..ab60bacf1 100644 --- a/server/src/file.c +++ b/server/src/file.c @@ -30,9 +30,55 @@ char * file_print(struct node *n) { struct file *f = n->file; char *buf = NULL; + + if (f->path_in) { + const char *epoch_mode_str = NULL; + switch (f->epoch_mode) { + case EPOCH_DIRECT: epoch_mode_str = "direct"; break; + case EPOCH_WAIT: epoch_mode_str = "wait"; break; + case EPOCH_RELATIVE: epoch_mode_str = "relative"; break; + case EPOCH_ABSOLUTE: epoch_mode_str = "absolute"; break; + } + + strcatf(&buf, "in=%s, epoch_mode=%s, epoch=%.2f, ", + f->path_in, + epoch_mode_str, + time_to_double(&f->epoch) + ); + + if (f->rate) + strcatf(&buf, "rate=%.1f, ", f->rate); + } + + if (f->path_out) { + strcatf(&buf, "out=%s, mode=%s, ", + f->path_out, + f->file_mode + ); + } + + if (f->first.tv_sec || f->first.tv_nsec) + strcatf(&buf, "first=%.2f, ", time_to_double(&f->first)); + + if (f->offset.tv_sec || f->offset.tv_nsec) + strcatf(&buf, "offset=%.2f, ", time_to_double(&f->offset)); + + if ((f->first.tv_sec || f->first.tv_nsec) && + (f->offset.tv_sec || f->offset.tv_nsec)) { + struct timespec eta, now; + clock_gettime(CLOCK_REALTIME, &now); - return strcatf(&buf, "in=%s, out=%s, mode=%s, rate=%.1f, epoch_mode=%u, epoch=%.0f", - f->path_in, f->path_out, f->file_mode, f->rate, f->epoch_mode, time_to_double(&f->epoch)); + eta = time_add(&f->first, &f->offset); + eta = time_diff(&now, &eta); + + if (eta.tv_sec || eta.tv_nsec) + strcatf(&buf, "eta=%.2f sec, ", time_to_double(&eta)); + } + + if (strlen(buf) > 2) + buf[strlen(buf) - 2] = 0; + + return buf; } int file_parse(config_setting_t *cfg, struct node *n) @@ -65,10 +111,12 @@ int file_parse(config_setting_t *cfg, struct node *n) f->epoch = time_from_double(epoch_flt); if (!config_setting_lookup_string(n->cfg, "epoch_mode", &epoch_mode)) - epoch_mode = "now"; + epoch_mode = "direct"; - if (!strcmp(epoch_mode, "now")) - f->epoch_mode = EPOCH_NOW; + if (!strcmp(epoch_mode, "direct")) + f->epoch_mode = EPOCH_DIRECT; + else if (!strcmp(epoch_mode, "wait")) + f->epoch_mode = EPOCH_WAIT; else if (!strcmp(epoch_mode, "relative")) f->epoch_mode = EPOCH_RELATIVE; else if (!strcmp(epoch_mode, "absolute")) @@ -86,17 +134,51 @@ int file_open(struct node *n) struct file *f = n->file; if (f->path_in) { - f->in = fopen(f->path_in, "r"); + /* Open file */ + f->in = fopen(f->path_in, "r"); if (!f->in) serror("Failed to open file for reading: '%s'", f->path_in); + /* Create timer */ f->tfd = timerfd_create(CLOCK_REALTIME, 0); if (f->tfd < 0) serror("Failed to create timer"); + + /* Get current time */ + struct timespec now, eta; + clock_gettime(CLOCK_REALTIME, &now); - /* Arm the timer */ + /* Get timestamp of first line */ + struct msg m; + int ret = msg_fscan(f->in, &m, NULL, NULL); rewind(f->in); + if (ret < 0) + error("Failed to read first timestamp of node '%s'", n->name); + + f->first = MSG_TS(&m); + + /* Set offset depending on epoch_mode */ + switch (f->epoch_mode) { + case EPOCH_DIRECT: /* read first value at now + epoch */ + f->offset = time_diff(&f->first, &now); + f->offset = time_add(&f->offset, &f->epoch); + break; + + case EPOCH_WAIT: /* read first value at now + first + epoch */ + f->offset = now; + f->offset = time_add(&f->offset, &f->epoch); + break; + + case EPOCH_RELATIVE: /* read first value at first + epoch */ + f->offset = f->epoch; + break; + + case EPOCH_ABSOLUTE: /* read first value at f->epoch */ + f->offset = time_diff(&f->first, &f->epoch); + break; + } + + /* Arm the timer with a fixed rate */ if (f->rate) { - /* Send with fixed rate */ struct itimerspec its = { .it_interval = time_from_double(1 / f->rate), .it_value = { 1, 0 }, @@ -106,43 +188,14 @@ int file_open(struct node *n) if (ret) serror("Failed to start timer"); } - else { - struct msg m; - /* Get current time */ - struct timespec now, first, eta; - clock_gettime(CLOCK_REALTIME, &now); - - /* Get timestamp of first sample */ - msg_fscan(f->in, &m, NULL, NULL); - first = MSG_TS(&m); - - /* Set offset depending on epoch_mode */ - switch (f->epoch_mode) { - case EPOCH_NOW: /* read first value at f->now + f->epoch */ - first = time_diff(&f->start, &now); - f->offset = time_add(&first, &f->epoch); - break; - case EPOCH_RELATIVE: /* read first value at f->start + f->epoch */ - f->offset = f->epoch; - break; - case EPOCH_ABSOLUTE: /* read first value at f->epoch */ - f->offset = time_diff(&f->start, &f->epoch); - break; - } - - eta = time_add(&f->start, &f->offset); - eta = time_diff(&now, &eta); - - debug(5, "Opened file '%s' as input for node '%s': start=%.2f, offset=%.2f, eta=%.2f sec", - f->path_in, n->name, - time_to_double(&f->start), - time_to_double(&f->offset), - time_to_double(&eta) - ); - } + + char *buf = file_print(n); + debug(4, "Opened file node '%s': %s", n->name, buf); + free(buf); } if (f->path_out) { + /* Open output file */ f->out = fopen(f->path_out, f->file_mode); if (!f->out) serror("Failed to open file for writing: '%s'", f->path_out); @@ -167,39 +220,35 @@ int file_close(struct node *n) int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt) { - int i = 0; + int values, flags, i = 0; struct file *f = n->file; if (f->in) { for (i = 0; i < cnt; i++) { struct msg *cur = &pool[(first+i) % poolsize]; - if (f->rate) { - /* Wait until epoch for the first time only */ - if (ftell(f->in) == 0) { - struct timespec until = time_add(&f->start, &f->offset); - if (timerfd_wait_until(f->tfd, &until)) - serror("Failed to wait for timer"); - } - /* Wait with fixed rate delay */ - else { - if (timerfd_wait(f->tfd) < 0) - serror("Failed to wait for timer"); - } + /* Get message and timestamp */ + values = msg_fscan(f->in, cur, &flags, NULL); + if (values < 0) { + if (!feof(f->in)) + warn("Failed to parse file of node '%s", n->name); - msg_fscan(f->in, cur, NULL, NULL); + return 0; } - else { - struct timespec until; - /* Get message and timestamp */ - msg_fscan(f->in, cur, NULL, NULL); + /* Fix missing sequence no */ + cur->sequence = f->sequence = (flags & MSG_PRINT_SEQUENCE) ? cur->sequence : f->sequence + 1; + + if (!f->rate || ftell(f->in) == 0) { + struct timespec until = time_add(&MSG_TS(cur), &f->offset); - /* Wait for next message / sampe */ - until = time_add(&MSG_TS(cur), &f->offset); if (timerfd_wait_until(f->tfd, &until) < 0) serror("Failed to wait for timer"); } + else { /* Wait with fixed rate delay */ + if (timerfd_wait(f->tfd) < 0) + serror("Failed to wait for timer"); + } } } else