1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

io: generalization of line based formats

This commit is contained in:
Steffen Vogel 2018-05-12 13:47:35 +02:00
parent 71b2ec3d51
commit 3438dc305d
4 changed files with 107 additions and 151 deletions

View file

@ -32,12 +32,14 @@ struct io_format;
enum io_flags {
IO_FLUSH = (1 << 8), /**< Flush the output stream after each chunk of samples. */
IO_NONBLOCK = (1 << 9) /**< Dont block io_read() while waiting for new samples. */
IO_NONBLOCK = (1 << 9), /**< Dont block io_read() while waiting for new samples. */
IO_NEWLINES = (1 << 10) /**< The samples of this format are newline delimited. */
};
struct io {
enum state state;
int flags;
char delimiter; /**< Newline delimiter. */
struct {
/** A format type can use this file handle or overwrite the
@ -50,6 +52,7 @@ struct io {
} stream;
char *buffer;
size_t buflen;
struct list *signals;
struct node *node;
@ -81,6 +84,10 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt);
int io_scan(struct io *io, struct sample *smps[], unsigned cnt);
int io_scan_lines(struct io *io, struct sample *smps[], unsigned cnt);
int io_print_lines(struct io *io, struct sample *smps[], unsigned cnt);
int io_eof(struct io *io);
void io_rewind(struct io *io);
@ -100,3 +107,6 @@ void io_stream_rewind(struct io *io);
int io_stream_fd(struct io *io);
int io_stream_flush(struct io *io);
FILE * io_stream_input(struct io *io);
FILE * io_stream_output(struct io *io);

120
lib/io.c
View file

@ -24,6 +24,7 @@
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <ctype.h>
#include <villas/io.h>
#include <villas/io_format.h>
@ -36,6 +37,13 @@ int io_init(struct io *io, struct io_format *fmt, int flags)
io->_vd = alloc(fmt->size);
io->flags = flags | io->_vt->flags;
io->delimiter = '\n';
io->input.buflen =
io->output.buflen = 4096;
io->input.buffer = alloc(io->input.buflen);
io->output.buffer = alloc(io->output.buflen);
return io->_vt->init ? io->_vt->init(io) : 0;
}
@ -49,6 +57,8 @@ int io_destroy(struct io *io)
return ret;
free(io->_vd);
free(io->input.buffer);
free(io->output.buffer);
return 0;
}
@ -61,7 +71,7 @@ int io_stream_open(struct io *io, const char *uri)
if (!strcmp(uri, "-")) {
goto stdio;
}
else if (aislocal(uri)) {
else if (aislocal(uri) == 1) {
io->mode = IO_MODE_STDIO;
io->output.stream.std = fopen(uri, "a+");
@ -194,9 +204,11 @@ void io_stream_rewind(struct io *io)
{
switch (io->mode) {
case IO_MODE_ADVIO:
return arewind(io->input.stream.adv);
arewind(io->input.stream.adv);
break;
case IO_MODE_STDIO:
return rewind(io->input.stream.std);
rewind(io->input.stream.std);
break;
case IO_MODE_CUSTOM: { }
}
}
@ -215,7 +227,6 @@ int io_stream_fd(struct io *io)
return -1;
}
int io_open(struct io *io, const char *uri)
{
int ret;
@ -260,9 +271,10 @@ int io_eof(struct io *io)
void io_rewind(struct io *io)
{
io->_vt->rewind
? io->_vt->rewind(io)
: io_stream_rewind(io);
if (io->_vt->rewind)
io->_vt->rewind(io);
else
io_stream_rewind(io);
}
int io_fd(struct io *io)
@ -290,27 +302,22 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt)
if (io->_vt->print)
ret = io->_vt->print(io, smps, cnt);
else if (io->flags & IO_NEWLINES)
ret = io_print_lines(io, smps, cnt);
else {
FILE *f = io->mode == IO_MODE_ADVIO
? io->output.stream.adv->file
: io->output.stream.std;
//flockfile(f);
FILE *f = io_stream_output(io);
if (io->_vt->fprint)
ret = io->_vt->fprint(f, smps, cnt, io->flags);
ret = io_format_fprint(io->_vt, f, smps, cnt, io->flags);
else if (io->_vt->sprint) {
char buf[4096];
size_t wbytes;
ret = io->_vt->sprint(buf, sizeof(buf), &wbytes, smps, cnt, io->flags);
ret = io_format_sprint(io->_vt, io->output.buffer, io->output.buflen, &wbytes, smps, cnt, io->flags);
fwrite(buf, wbytes, 1, f);
fwrite(io->output.buffer, wbytes, 1, f);
}
else
ret = -1;
//funlockfile(f);
}
if (io->flags & IO_FLUSH)
@ -325,28 +332,83 @@ int io_scan(struct io *io, struct sample *smps[], unsigned cnt)
if (io->_vt->scan)
ret = io->_vt->scan(io, smps, cnt);
else if (io->flags & IO_NEWLINES)
ret = io_scan_lines(io, smps, cnt);
else {
FILE *f = io->mode == IO_MODE_ADVIO
? io->input.stream.adv->file
: io->input.stream.std;
//flockfile(f);
FILE *f = io_stream_input(io);
if (io->_vt->fscan)
return io->_vt->fscan(f, smps, cnt, io->flags);
ret = io_format_fscan(io->_vt, f, smps, cnt, io->flags);
else if (io->_vt->sscan) {
size_t bytes, rbytes;
char buf[4096];
bytes = fread(buf, 1, sizeof(buf), f);
bytes = fread(io->input.buffer, 1, io->input.buflen, f);
ret = io->_vt->sscan(buf, bytes, &rbytes, smps, cnt, io->flags);
ret = io_format_sscan(io->_vt, io->input.buffer, bytes, &rbytes, smps, cnt, io->flags);
}
else
ret = -1;
//funlockfile(f);
}
return ret;
}
int io_print_lines(struct io *io, struct sample *smps[], unsigned cnt)
{
int ret, i;
FILE *f = io_stream_output(io);
for (i = 0; i < cnt; i++) {
size_t wbytes;
ret = io_format_sprint(io->_vt, io->output.buffer, io->output.buflen, &wbytes, &smps[i], 1, io->flags);
if (ret < 0)
return ret;
fwrite(io->output.buffer, wbytes, 1, f);
}
return i;
}
int io_scan_lines(struct io *io, struct sample *smps[], unsigned cnt)
{
int ret, i;
FILE *f = io_stream_input(io);
for (i = 0; i < cnt; i++) {
size_t rbytes;
ssize_t bytes;
char *ptr;
skip: bytes = getdelim(&io->input.buffer, &io->input.buflen, io->delimiter, f);
if (bytes < 0)
return -1; /* An error or eof occured */
/* Skip whitespaces, empty and comment lines */
for (ptr = io->input.buffer; isspace(*ptr); ptr++);
if (ptr[0] == '\0' || ptr[0] == '#')
goto skip;
ret = io_format_sscan(io->_vt, io->input.buffer, bytes, &rbytes, &smps[i], 1, io->flags);
if (ret < 0)
return ret;
}
return i;
}
FILE * io_stream_output(struct io *io) {
return io->mode == IO_MODE_ADVIO
? io->output.stream.adv->file
: io->output.stream.std;
}
FILE * io_stream_input(struct io *io) {
return io->mode == IO_MODE_ADVIO
? io->input.stream.adv->file
: io->input.stream.std;
}

View file

@ -24,6 +24,7 @@
#include <inttypes.h>
#include <string.h>
#include <villas/io.h>
#include <villas/io/csv.h>
#include <villas/plugin.h>
#include <villas/sample.h>
@ -154,66 +155,9 @@ int csv_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi
return i;
}
int csv_fprint_single(FILE *f, struct sample *s, int flags)
{
int ret;
char line[4096];
ret = csv_sprint_single(line, sizeof(line), s, flags);
if (ret < 0)
return ret;
fputs(line, f);
return 0;
}
int csv_fscan_single(FILE *f, struct sample *s, int flags)
{
char *ptr, line[4096];
skip: if (fgets(line, sizeof(line), f) == NULL)
return -1; /* An error occured */
/* Skip whitespaces, empty and comment lines */
for (ptr = line; isspace(*ptr); ptr++);
if (*ptr == '\0' || *ptr == '#')
goto skip;
return csv_sscan_single(line, strlen(line), s, flags);
}
int csv_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = csv_fprint_single(f, smps[i], flags);
if (ret < 0)
break;
}
return i;
}
int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = csv_fscan_single(f, smps[i], flags);
if (ret < 0) {
warn("Failed to read CSV line: %d", ret);
break;
}
}
return i;
}
void csv_header(struct io *io)
{
FILE *f = io->mode == IO_MODE_ADVIO
? io->output.stream.adv->file
: io->output.stream.std;
FILE *f = io_stream_output(io);
fprintf(f, "# secs%cnsecs%coffset%csequence%cdata[]\n", CSV_SEPARATOR, CSV_SEPARATOR, CSV_SEPARATOR, CSV_SEPARATOR);
}
@ -223,12 +167,11 @@ static struct plugin p = {
.description = "Tabulator-separated values",
.type = PLUGIN_TYPE_IO,
.io = {
.fprint = csv_fprint,
.fscan = csv_fscan,
.sprint = csv_sprint,
.sscan = csv_sscan,
.header = csv_header,
.size = 0,
.flags = IO_NEWLINES
}
};

View file

@ -21,7 +21,6 @@
*********************************************************************************/
#include <stdbool.h>
#include <ctype.h>
#include <inttypes.h>
#include <string.h>
@ -187,81 +186,23 @@ int villas_human_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smp
return i;
}
int villas_human_fscan_single(FILE *f, struct sample *s, int flags)
{
char *ptr, line[4096];
skip: if (fgets(line, sizeof(line), f) == NULL)
return -1; /* An error occured */
/* Skip whitespaces, empty and comment lines */
for (ptr = line; isspace(*ptr); ptr++);
if (*ptr == '\0' || *ptr == '#')
goto skip;
return villas_human_sscan_single(line, strlen(line), s, flags);
}
int villas_human_fprint_single(FILE *f, struct sample *s, int flags)
{
int ret;
char line[4096];
ret = villas_human_sprint_single(line, sizeof(line), s, flags);
if (ret < 0)
return ret;
fputs(line, f);
return 0;
}
int villas_human_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = villas_human_fprint_single(f, smps[i], flags);
if (ret < 0)
return ret;
}
return i;
}
void villas_human_header(struct io *io)
{
FILE *f = io->mode == IO_MODE_ADVIO
? io->output.stream.adv->file
: io->output.stream.std;
FILE *f = io_stream_output(io);
fprintf(f, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]");
}
int villas_human_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = villas_human_fscan_single(f, smps[i], flags);
if (ret < 0)
return ret;
}
return i;
}
static struct plugin p = {
.name = "villas.human",
.description = "VILLAS human readable format",
.type = PLUGIN_TYPE_IO,
.io = {
.fprint = villas_human_fprint,
.fscan = villas_human_fscan,
.sprint = villas_human_sprint,
.sscan = villas_human_sscan,
.header = villas_human_header,
.size = 0,
.flags = IO_NEWLINES
}
};