1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

support advio for file node-type

This commit is contained in:
Steffen Vogel 2017-03-07 08:04:37 -04:00
parent 7626c67616
commit 3573932df8
3 changed files with 20 additions and 17 deletions

View file

@ -52,7 +52,9 @@ int afflush(AFILE *file);
/* The remaining functions from stdio are just replaced macros */
#define afeof(af) feof(af->file)
#define afeof(af) feof((af)->file)
#define aftell(af) ftell((af)->file)
#define arewind(af) rewind((af)->file)
size_t afread(void *restrict ptr, size_t size, size_t nitems, AFILE *restrict stream);
size_t afwrite(const void *restrict ptr, size_t size, size_t nitems, AFILE *restrict stream);

View file

@ -15,6 +15,7 @@
#pragma once
#include "advio.h"
#include "node.h"
#define FILE_MAX_PATHLEN 512
@ -26,7 +27,7 @@ enum {
struct file {
struct file_direction {
FILE *handle; /**< libc: stdio file handle */
AFILE *handle; /**< libc: stdio file handle */
const char *mode; /**< libc: fopen() mode */
const char *fmt; /**< Format string for file name. */

View file

@ -38,7 +38,7 @@ static char * file_format_name(const char *format, struct timespec *ts)
return buf;
}
static FILE * file_reopen(struct file_direction *dir)
static AFILE * file_reopen(struct file_direction *dir)
{
char buf[FILE_MAX_PATHLEN];
const char *uri = buf;
@ -50,9 +50,9 @@ static FILE * file_reopen(struct file_direction *dir)
uri = dir->uri;
if (dir->handle)
fclose(dir->handle);
afclose(dir->handle);
return fopen(path, dir->mode);
return afopen(uri, dir->mode, /* ADVIO_MEM */ 0);
}
static int file_parse_direction(config_setting_t *cfg, struct file *f, int d)
@ -206,7 +206,7 @@ int file_open(struct node *n)
/* Get timestamp of first line */
struct sample s;
int ret = sample_fscan(f->read.handle, &s, NULL); rewind(f->read.handle);
int ret = sample_fscan(f->read.handle->file, &s, NULL); arewind(f->read.handle);
if (ret < 0)
error("Failed to read first timestamp of node %s", node_name(n));
@ -237,12 +237,12 @@ int file_open(struct node *n)
if (f->write.fmt) {
/* Prepare file name */
f->write.chunk = f->write.split ? 0 : -1;
f->write.path = file_format_name(f->write.fmt, &now);
f->write.uri = file_format_name(f->write.fmt, &now);
/* Open file */
f->write.handle = file_reopen(&f->write);
if (!f->write.handle)
serror("Failed to open file for writing: '%s'", f->write.path);
serror("Failed to open file for writing: '%s'", f->write.uri);
}
return 0;
@ -258,9 +258,9 @@ int file_close(struct node *n)
if (f->read_timer)
close(f->read_timer);
if (f->read.handle)
fclose(f->read.handle);
afclose(f->read.handle);
if (f->write.handle)
fclose(f->write.handle);
afclose(f->write.handle);
return 0;
}
@ -274,9 +274,9 @@ int file_read(struct node *n, struct sample *smps[], unsigned cnt)
assert(f->read.handle);
assert(cnt == 1);
retry: values = sample_fscan(f->read.handle, s, &flags); /* Get message and timestamp */
retry: values = sample_fscan(f->read.handle->file, s, &flags); /* Get message and timestamp */
if (values < 0) {
if (feof(f->read.handle)) {
if (afeof(f->read.handle)) {
if (f->read.split) {
f->read.chunk++;
f->read.handle = file_reopen(&f->read);
@ -287,7 +287,7 @@ retry: values = sample_fscan(f->read.handle, s, &flags); /* Get message and time
}
else {
info("Rewind input file of node %s", node_name(n));
rewind(f->read.handle);
arewind(f->read.handle);
goto retry;
}
}
@ -297,7 +297,7 @@ retry: values = sample_fscan(f->read.handle, s, &flags); /* Get message and time
return 0;
}
if (!f->read_rate || ftell(f->read.handle) == 0) {
if (!f->read_rate || aftell(f->read.handle) == 0) {
s->ts.origin = time_add(&s->ts.origin, &f->read_offset);
if (timerfd_wait_until(f->read_timer, &s->ts.origin) == 0)
serror("Failed to wait for timer");
@ -322,15 +322,15 @@ int file_write(struct node *n, struct sample *smps[], unsigned cnt)
assert(cnt == 1);
/* Split file if requested */
if (f->write.split > 0 && ftell(f->write.handle) > f->write.split) {
if (f->write.split > 0 && aftell(f->write.handle) > f->write.split) {
f->write.chunk++;
f->write.handle = file_reopen(&f->write);
info("Splitted output node %s: chunk=%u", node_name(n), f->write.chunk);
}
sample_fprint(f->write.handle, s, SAMPLE_ALL & ~SAMPLE_OFFSET);
fflush(f->write.handle);
sample_fprint(f->write.handle->file, s, SAMPLE_ALL & ~SAMPLE_OFFSET);
afflush(f->write.handle);
return 1;
}