1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00
VILLASnode/lib/nodes/file.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

468 lines
11 KiB
C++
Raw Permalink Normal View History

/* Node type: File.
2015-03-31 13:28:11 +02:00
*
2022-03-15 09:18:01 -04:00
* Author: Steffen Vogel <post@steffenvogel.de>
2022-03-15 09:28:57 -04:00
* SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
2022-07-04 18:20:03 +02:00
* SPDX-License-Identifier: Apache-2.0
2015-06-02 21:53:04 +02:00
*/
2015-03-31 13:28:11 +02:00
#include <cerrno>
#include <cinttypes>
#include <cstring>
#include <libgen.h>
#include <sys/stat.h>
#include <unistd.h>
2015-03-31 13:28:11 +02:00
2021-02-16 14:15:14 +01:00
#include <villas/exceptions.hpp>
2021-05-10 00:12:30 +02:00
#include <villas/format.hpp>
#include <villas/node_compat.hpp>
#include <villas/nodes/file.hpp>
2017-12-09 02:19:28 +08:00
#include <villas/queue.h>
2021-05-10 00:12:30 +02:00
#include <villas/timing.hpp>
2021-02-16 14:15:14 +01:00
#include <villas/utils.hpp>
using namespace villas;
2021-05-10 00:12:30 +02:00
using namespace villas::node;
2019-06-04 16:55:38 +02:00
using namespace villas::utils;
static char *file_format_name(const char *format, struct timespec *ts) {
struct tm tm;
char *buf = new char[FILE_MAX_PATHLEN];
if (!buf)
throw MemoryAllocationError();
// Convert time
gmtime_r(&ts->tv_sec, &tm);
strftime(buf, FILE_MAX_PATHLEN, format, &tm);
return buf;
}
static struct timespec file_calc_offset(const struct timespec *first,
const struct timespec *epoch,
enum file::EpochMode mode) {
2017-05-08 00:46:48 +02:00
// Get current time
struct timespec now = time_now();
struct timespec offset;
2019-04-22 23:43:46 +02:00
// Set offset depending on epoch
2017-05-08 00:46:48 +02:00
switch (mode) {
2019-06-23 16:13:23 +02:00
case file::EpochMode::DIRECT: // read first value at now + epoch
2017-05-08 00:46:48 +02:00
offset = time_diff(first, &now);
return time_add(&offset, epoch);
2017-05-08 00:46:48 +02:00
2019-06-23 16:13:23 +02:00
case file::EpochMode::WAIT: // read first value at now + first + epoch
2017-05-08 00:46:48 +02:00
offset = now;
return time_add(&now, epoch);
2019-06-23 16:13:23 +02:00
case file::EpochMode::RELATIVE: // read first value at first + epoch
2017-05-08 00:46:48 +02:00
return *epoch;
2019-06-23 16:13:23 +02:00
case file::EpochMode::ABSOLUTE: // read first value at f->epoch
2017-05-08 00:46:48 +02:00
return time_diff(first, epoch);
default:
return (struct timespec){0};
2017-05-08 00:46:48 +02:00
}
}
int villas::node::file_parse(NodeCompat *n, json_t *json) {
auto *f = n->getData<struct file>();
int ret;
json_error_t err;
2021-06-18 14:32:03 -04:00
json_t *json_format = nullptr;
2019-04-22 23:45:38 +02:00
const char *uri_tmpl = nullptr;
const char *eof = nullptr;
const char *epoch = nullptr;
double epoch_flt = 0;
2021-05-10 00:12:30 +02:00
ret = json_unpack_ex(json, &err, 0,
"{ s: s, s?: o, s?: { s?: s, s?: F, s?: s, s?: F, s?: "
"i, s?: i }, s?: { s?: b, s?: i } }",
"uri", &uri_tmpl, "format", &json_format, "in", "eof",
2019-04-23 11:19:42 +02:00
&eof, "rate", &f->rate, "epoch_mode", &epoch, "epoch",
&epoch_flt, "buffer_size", &f->buffer_size_in, "skip",
&f->skip_lines, "out", "flush", &f->flush, "buffer_size",
&f->buffer_size_out);
if (ret)
2021-02-16 14:15:14 +01:00
throw ConfigError(json, err, "node-config-node-file");
f->epoch = time_from_double(epoch_flt);
2019-04-22 23:45:38 +02:00
f->uri_tmpl = uri_tmpl ? strdup(uri_tmpl) : nullptr;
2021-05-10 00:12:30 +02:00
// Format
if (f->formatter)
delete f->formatter;
2021-05-10 00:12:30 +02:00
f->formatter = json_format ? FormatFactory::make(json_format)
: FormatFactory::make("villas.human");
if (!f->formatter)
throw ConfigError(json_format, "node-config-node-file-format",
"Invalid format configuration");
if (eof) {
if (!strcmp(eof, "exit") || !strcmp(eof, "stop"))
2019-06-23 16:13:23 +02:00
f->eof_mode = file::EOFBehaviour::STOP;
else if (!strcmp(eof, "rewind"))
2019-06-23 16:13:23 +02:00
f->eof_mode = file::EOFBehaviour::REWIND;
else if (!strcmp(eof, "wait"))
2019-06-23 16:13:23 +02:00
f->eof_mode = file::EOFBehaviour::SUSPEND;
else
2021-02-16 14:15:14 +01:00
throw RuntimeError("Invalid mode '{}' for 'eof' setting", eof);
}
2019-04-22 23:43:46 +02:00
if (epoch) {
if (!strcmp(epoch, "direct"))
2019-06-23 16:13:23 +02:00
f->epoch_mode = file::EpochMode::DIRECT;
2019-04-22 23:43:46 +02:00
else if (!strcmp(epoch, "wait"))
2019-06-23 16:13:23 +02:00
f->epoch_mode = file::EpochMode::WAIT;
2019-04-22 23:43:46 +02:00
else if (!strcmp(epoch, "relative"))
2019-06-23 16:13:23 +02:00
f->epoch_mode = file::EpochMode::RELATIVE;
2019-04-22 23:43:46 +02:00
else if (!strcmp(epoch, "absolute"))
2019-06-23 16:13:23 +02:00
f->epoch_mode = file::EpochMode::ABSOLUTE;
2019-04-22 23:43:46 +02:00
else if (!strcmp(epoch, "original"))
2019-06-23 16:13:23 +02:00
f->epoch_mode = file::EpochMode::ORIGINAL;
else
2021-02-16 14:15:14 +01:00
throw RuntimeError("Invalid value '{}' for setting 'epoch'", epoch);
}
return 0;
}
char *villas::node::file_print(NodeCompat *n) {
auto *f = n->getData<struct file>();
2019-04-22 23:45:38 +02:00
char *buf = nullptr;
2019-04-22 23:45:38 +02:00
const char *epoch_str = nullptr;
const char *eof_str = nullptr;
switch (f->epoch_mode) {
2019-06-23 16:13:23 +02:00
case file::EpochMode::DIRECT:
2019-04-22 23:43:46 +02:00
epoch_str = "direct";
break;
2019-06-23 16:13:23 +02:00
case file::EpochMode::WAIT:
2019-04-22 23:43:46 +02:00
epoch_str = "wait";
break;
2019-06-23 16:13:23 +02:00
case file::EpochMode::RELATIVE:
2019-04-22 23:43:46 +02:00
epoch_str = "relative";
break;
2019-06-23 16:13:23 +02:00
case file::EpochMode::ABSOLUTE:
2019-04-22 23:43:46 +02:00
epoch_str = "absolute";
break;
2019-06-23 16:13:23 +02:00
case file::EpochMode::ORIGINAL:
2019-04-22 23:43:46 +02:00
epoch_str = "original";
break;
default:
epoch_str = "";
break;
}
2019-04-22 23:43:46 +02:00
switch (f->eof_mode) {
2019-06-23 16:13:23 +02:00
case file::EOFBehaviour::STOP:
2019-04-22 23:43:46 +02:00
eof_str = "stop";
break;
2019-06-23 16:13:23 +02:00
case file::EOFBehaviour::SUSPEND:
2019-04-22 23:43:46 +02:00
eof_str = "wait";
break;
2019-06-23 16:13:23 +02:00
case file::EOFBehaviour::REWIND:
2019-04-22 23:43:46 +02:00
eof_str = "rewind";
break;
default:
eof_str = "";
break;
}
2021-05-10 00:12:30 +02:00
strcatf(
&buf,
"uri=%s, out.flush=%s, in.skip=%d, in.eof=%s, in.epoch=%s, in.epoch=%.2f",
f->uri ? f->uri : f->uri_tmpl, f->flush ? "yes" : "no", f->skip_lines,
eof_str, epoch_str, time_to_double(&f->epoch));
if (f->rate)
strcatf(&buf, ", in.rate=%.1f", f->rate);
if (f->first.tv_sec || f->first.tv_nsec)
strcatf(&buf, ", first=%.2f", time_to_double(&f->first));
if (f->offset.tv_sec || f->offset.tv_nsec)
strcatf(&buf, ", offset=%.2f", time_to_double(&f->offset));
if ((f->first.tv_sec || f->first.tv_nsec) &&
(f->offset.tv_sec || f->offset.tv_nsec)) {
struct timespec eta, now = time_now();
eta = time_add(&f->first, &f->offset);
eta = time_diff(&now, &eta);
if (eta.tv_sec || eta.tv_nsec)
2017-06-17 18:54:03 +02:00
strcatf(&buf, ", eta=%.2f sec", time_to_double(&eta));
}
return buf;
2015-03-31 13:28:11 +02:00
}
int villas::node::file_start(NodeCompat *n) {
auto *f = n->getData<struct file>();
struct timespec now = time_now();
2021-05-10 00:12:30 +02:00
int ret;
// Prepare file name
if (f->uri)
delete[] f->uri;
f->uri = file_format_name(f->uri_tmpl, &now);
2021-02-16 14:15:14 +01:00
// Check if directory exists
struct stat sb;
char *cpy = strdup(f->uri);
char *dir = dirname(cpy);
2021-02-16 14:15:14 +01:00
ret = stat(dir, &sb);
if (ret) {
if (errno == ENOENT || errno == ENOTDIR) {
2018-10-21 10:32:09 +02:00
ret = mkdir(dir, 0644);
if (ret)
2021-02-16 14:15:14 +01:00
throw SystemError("Failed to create directory");
} else if (errno != EISDIR)
throw SystemError("Failed to stat");
} else if (!S_ISDIR(sb.st_mode)) {
ret = mkdir(dir, 0644);
if (ret)
throw SystemError("Failed to create directory");
}
2021-02-16 14:15:14 +01:00
free(cpy);
f->formatter->start(n->getInputSignals(false));
// Open file
f->stream_out = fopen(f->uri, "a+");
2021-05-10 00:12:30 +02:00
if (!f->stream_out)
return -1;
f->stream_in = fopen(f->uri, "r");
if (!f->stream_in)
return -1;
if (f->buffer_size_in) {
2021-05-10 00:12:30 +02:00
ret = setvbuf(f->stream_in, nullptr, _IOFBF, f->buffer_size_in);
if (ret)
return ret;
}
if (f->buffer_size_out) {
2021-05-10 00:12:30 +02:00
ret = setvbuf(f->stream_out, nullptr, _IOFBF, f->buffer_size_out);
if (ret)
return ret;
}
// Create timer
2020-03-04 13:06:28 +01:00
f->task.setRate(f->rate);
// Get timestamp of first line
2019-06-23 16:13:23 +02:00
if (f->epoch_mode != file::EpochMode::ORIGINAL) {
2021-05-10 00:12:30 +02:00
rewind(f->stream_in);
2021-05-10 00:12:30 +02:00
if (feof(f->stream_in)) {
2021-02-16 14:15:14 +01:00
n->logger->warn("Empty file");
2017-09-05 10:11:23 +02:00
} else {
struct Sample smp;
2021-05-10 00:12:30 +02:00
smp.capacity = 0;
2021-05-10 00:12:30 +02:00
ret = f->formatter->scan(f->stream_in, &smp);
2017-09-05 10:11:23 +02:00
if (ret == 1) {
2021-05-10 00:12:30 +02:00
f->first = smp.ts.origin;
2017-09-05 10:11:23 +02:00
f->offset = file_calc_offset(&f->first, &f->epoch, f->epoch_mode);
} else
2021-02-16 14:15:14 +01:00
n->logger->warn("Failed to read first timestamp");
}
2017-09-05 10:11:23 +02:00
}
2021-05-10 00:12:30 +02:00
rewind(f->stream_in);
// Fast-forward
struct Sample *smp = sample_alloc_mem(n->getInputSignals(false)->size());
for (unsigned i = 0; i < f->skip_lines; i++)
2021-05-10 00:12:30 +02:00
f->formatter->scan(f->stream_in, smp);
sample_free(smp);
2015-03-31 13:28:11 +02:00
return 0;
}
int villas::node::file_stop(NodeCompat *n) {
auto *f = n->getData<struct file>();
2020-03-04 13:06:28 +01:00
f->task.stop();
2015-08-07 01:11:43 +02:00
2021-05-10 00:12:30 +02:00
fclose(f->stream_in);
fclose(f->stream_out);
2015-03-31 13:28:11 +02:00
return 0;
}
int villas::node::file_read(NodeCompat *n, struct Sample *const smps[],
unsigned cnt) {
auto *f = n->getData<struct file>();
int ret;
uint64_t steps;
assert(cnt == 1);
2021-05-10 00:12:30 +02:00
retry:
ret = f->formatter->scan(f->stream_in, smps, cnt);
2018-08-20 18:48:30 +02:00
if (ret <= 0) {
2021-05-10 00:12:30 +02:00
if (feof(f->stream_in)) {
2019-04-22 23:43:46 +02:00
switch (f->eof_mode) {
2019-06-23 16:13:23 +02:00
case file::EOFBehaviour::REWIND:
2021-02-16 14:15:14 +01:00
n->logger->info("Rewind input file");
f->offset = file_calc_offset(&f->first, &f->epoch, f->epoch_mode);
2021-05-10 00:12:30 +02:00
rewind(f->stream_in);
goto retry;
2017-07-24 19:33:35 +02:00
2019-06-23 16:13:23 +02:00
case file::EOFBehaviour::SUSPEND:
// We wait 10ms before fetching again.
usleep(100000);
// Try to download more data if this is a remote file.
2021-05-10 00:12:30 +02:00
clearerr(f->stream_in);
goto retry;
2017-07-24 19:33:35 +02:00
2019-06-23 16:13:23 +02:00
case file::EOFBehaviour::STOP:
2021-02-16 14:15:14 +01:00
n->logger->info("Reached end-of-file.");
n->setState(State::STOPPING);
return -1;
2019-04-22 23:43:46 +02:00
default: {
}
2015-08-09 23:52:44 +02:00
}
} else
2021-02-16 14:15:14 +01:00
n->logger->warn("Failed to read messages: reason={}", ret);
return 0;
}
// We dont wait in FILE_EPOCH_ORIGINAL mode
2019-06-23 16:13:23 +02:00
if (f->epoch_mode == file::EpochMode::ORIGINAL)
return cnt;
2017-07-24 19:33:35 +02:00
if (f->rate) {
2020-03-04 13:06:28 +01:00
steps = f->task.wait();
2017-05-08 00:46:48 +02:00
smps[0]->ts.origin = time_now();
} else {
smps[0]->ts.origin = time_add(&smps[0]->ts.origin, &f->offset);
2020-03-04 13:06:28 +01:00
f->task.setNext(&smps[0]->ts.origin);
steps = f->task.wait();
}
2015-08-07 01:11:43 +02:00
// Check for overruns
if (steps == 0)
2021-02-16 14:15:14 +01:00
throw SystemError("Failed to wait for timer");
else if (steps != 1)
2021-02-16 14:15:14 +01:00
n->logger->warn("Missed steps: {}", steps - 1);
return cnt;
2015-03-31 13:28:11 +02:00
}
int villas::node::file_write(NodeCompat *n, struct Sample *const smps[],
unsigned cnt) {
int ret;
auto *f = n->getData<struct file>();
2015-08-07 01:11:43 +02:00
assert(cnt == 1);
2021-05-10 00:12:30 +02:00
ret = f->formatter->print(f->stream_out, smps, cnt);
if (ret < 0)
return ret;
2015-08-07 01:11:43 +02:00
2021-05-10 00:12:30 +02:00
if (f->flush)
fflush(f->stream_out);
return cnt;
2015-08-07 01:11:43 +02:00
}
int villas::node::file_poll_fds(NodeCompat *n, int fds[]) {
auto *f = n->getData<struct file>();
if (f->rate) {
2020-03-04 13:06:28 +01:00
fds[0] = f->task.getFD();
2019-04-22 23:43:46 +02:00
return 1;
2019-06-23 16:13:23 +02:00
} else if (f->epoch_mode == file::EpochMode::ORIGINAL) {
2021-05-10 00:12:30 +02:00
fds[0] = fileno(f->stream_in);
2019-04-22 23:43:46 +02:00
return 1;
}
2019-04-22 23:43:46 +02:00
return -1; // TODO: not supported yet
}
int villas::node::file_init(NodeCompat *n) {
auto *f = n->getData<struct file>();
2020-03-04 13:06:28 +01:00
new (&f->task) Task(CLOCK_REALTIME);
// Default values
f->rate = 0;
f->eof_mode = file::EOFBehaviour::STOP;
f->epoch_mode = file::EpochMode::DIRECT;
f->flush = 0;
f->buffer_size_in = 0;
f->buffer_size_out = 0;
f->skip_lines = 0;
f->formatter = nullptr;
2020-03-04 13:06:28 +01:00
return 0;
}
int villas::node::file_destroy(NodeCompat *n) {
auto *f = n->getData<struct file>();
2020-03-04 13:06:28 +01:00
f->task.~Task();
if (f->uri)
delete[] f->uri;
if (f->formatter)
delete f->formatter;
2020-03-04 13:06:28 +01:00
return 0;
}
static NodeCompatType p;
__attribute__((constructor(110))) static void register_plugin() {
2021-06-21 16:11:42 -04:00
p.name = "file";
p.description = "support for file log / replay node type";
p.vectorize = 1;
p.size = sizeof(struct file);
p.init = file_init;
p.destroy = file_destroy;
p.parse = file_parse;
p.print = file_print;
p.start = file_start;
p.stop = file_stop;
p.read = file_read;
p.write = file_write;
p.poll_fds = file_poll_fds;
static NodeCompatFactory ncp(&p);
}