diff --git a/include/villas/io.h b/include/villas/io.h index 51b9d95eb..9a0d2b688 100644 --- a/include/villas/io.h +++ b/include/villas/io.h @@ -32,12 +32,14 @@ struct io_format; enum io_flags { IO_FLUSH = (1 << 8), /**< Flush the output stream after each chunk of samples. */ - IO_NONBLOCK = (1 << 9) /**< Dont block io_read() while waiting for new samples. */ + IO_NONBLOCK = (1 << 9), /**< Dont block io_read() while waiting for new samples. */ + IO_NEWLINES = (1 << 10) /**< The samples of this format are newline delimited. */ }; struct io { enum state state; int flags; + char delimiter; /**< Newline delimiter. */ struct { /** A format type can use this file handle or overwrite the @@ -50,6 +52,7 @@ struct io { } stream; char *buffer; + size_t buflen; struct list *signals; struct node *node; @@ -81,6 +84,10 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt); int io_scan(struct io *io, struct sample *smps[], unsigned cnt); +int io_scan_lines(struct io *io, struct sample *smps[], unsigned cnt); + +int io_print_lines(struct io *io, struct sample *smps[], unsigned cnt); + int io_eof(struct io *io); void io_rewind(struct io *io); @@ -100,3 +107,6 @@ void io_stream_rewind(struct io *io); int io_stream_fd(struct io *io); int io_stream_flush(struct io *io); + +FILE * io_stream_input(struct io *io); +FILE * io_stream_output(struct io *io); diff --git a/lib/io.c b/lib/io.c index dfcdba997..6c4f80248 100644 --- a/lib/io.c +++ b/lib/io.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -36,6 +37,13 @@ int io_init(struct io *io, struct io_format *fmt, int flags) io->_vd = alloc(fmt->size); io->flags = flags | io->_vt->flags; + io->delimiter = '\n'; + + io->input.buflen = + io->output.buflen = 4096; + + io->input.buffer = alloc(io->input.buflen); + io->output.buffer = alloc(io->output.buflen); return io->_vt->init ? io->_vt->init(io) : 0; } @@ -49,6 +57,8 @@ int io_destroy(struct io *io) return ret; free(io->_vd); + free(io->input.buffer); + free(io->output.buffer); return 0; } @@ -61,7 +71,7 @@ int io_stream_open(struct io *io, const char *uri) if (!strcmp(uri, "-")) { goto stdio; } - else if (aislocal(uri)) { + else if (aislocal(uri) == 1) { io->mode = IO_MODE_STDIO; io->output.stream.std = fopen(uri, "a+"); @@ -194,9 +204,11 @@ void io_stream_rewind(struct io *io) { switch (io->mode) { case IO_MODE_ADVIO: - return arewind(io->input.stream.adv); + arewind(io->input.stream.adv); + break; case IO_MODE_STDIO: - return rewind(io->input.stream.std); + rewind(io->input.stream.std); + break; case IO_MODE_CUSTOM: { } } } @@ -215,7 +227,6 @@ int io_stream_fd(struct io *io) return -1; } - int io_open(struct io *io, const char *uri) { int ret; @@ -260,9 +271,10 @@ int io_eof(struct io *io) void io_rewind(struct io *io) { - io->_vt->rewind - ? io->_vt->rewind(io) - : io_stream_rewind(io); + if (io->_vt->rewind) + io->_vt->rewind(io); + else + io_stream_rewind(io); } int io_fd(struct io *io) @@ -290,27 +302,22 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt) if (io->_vt->print) ret = io->_vt->print(io, smps, cnt); + else if (io->flags & IO_NEWLINES) + ret = io_print_lines(io, smps, cnt); else { - FILE *f = io->mode == IO_MODE_ADVIO - ? io->output.stream.adv->file - : io->output.stream.std; - - //flockfile(f); + FILE *f = io_stream_output(io); if (io->_vt->fprint) - ret = io->_vt->fprint(f, smps, cnt, io->flags); + ret = io_format_fprint(io->_vt, f, smps, cnt, io->flags); else if (io->_vt->sprint) { - char buf[4096]; size_t wbytes; - ret = io->_vt->sprint(buf, sizeof(buf), &wbytes, smps, cnt, io->flags); + ret = io_format_sprint(io->_vt, io->output.buffer, io->output.buflen, &wbytes, smps, cnt, io->flags); - fwrite(buf, wbytes, 1, f); + fwrite(io->output.buffer, wbytes, 1, f); } else ret = -1; - - //funlockfile(f); } if (io->flags & IO_FLUSH) @@ -325,28 +332,83 @@ int io_scan(struct io *io, struct sample *smps[], unsigned cnt) if (io->_vt->scan) ret = io->_vt->scan(io, smps, cnt); + else if (io->flags & IO_NEWLINES) + ret = io_scan_lines(io, smps, cnt); else { - FILE *f = io->mode == IO_MODE_ADVIO - ? io->input.stream.adv->file - : io->input.stream.std; - - //flockfile(f); + FILE *f = io_stream_input(io); if (io->_vt->fscan) - return io->_vt->fscan(f, smps, cnt, io->flags); + ret = io_format_fscan(io->_vt, f, smps, cnt, io->flags); else if (io->_vt->sscan) { size_t bytes, rbytes; - char buf[4096]; - bytes = fread(buf, 1, sizeof(buf), f); + bytes = fread(io->input.buffer, 1, io->input.buflen, f); - ret = io->_vt->sscan(buf, bytes, &rbytes, smps, cnt, io->flags); + ret = io_format_sscan(io->_vt, io->input.buffer, bytes, &rbytes, smps, cnt, io->flags); } else ret = -1; - - //funlockfile(f); } return ret; } + +int io_print_lines(struct io *io, struct sample *smps[], unsigned cnt) +{ + int ret, i; + + FILE *f = io_stream_output(io); + + for (i = 0; i < cnt; i++) { + size_t wbytes; + + ret = io_format_sprint(io->_vt, io->output.buffer, io->output.buflen, &wbytes, &smps[i], 1, io->flags); + if (ret < 0) + return ret; + + fwrite(io->output.buffer, wbytes, 1, f); + } + + return i; +} + +int io_scan_lines(struct io *io, struct sample *smps[], unsigned cnt) +{ + int ret, i; + + FILE *f = io_stream_input(io); + + for (i = 0; i < cnt; i++) { + size_t rbytes; + ssize_t bytes; + char *ptr; + +skip: bytes = getdelim(&io->input.buffer, &io->input.buflen, io->delimiter, f); + if (bytes < 0) + return -1; /* An error or eof occured */ + + /* Skip whitespaces, empty and comment lines */ + for (ptr = io->input.buffer; isspace(*ptr); ptr++); + + if (ptr[0] == '\0' || ptr[0] == '#') + goto skip; + + ret = io_format_sscan(io->_vt, io->input.buffer, bytes, &rbytes, &smps[i], 1, io->flags); + if (ret < 0) + return ret; + } + + return i; +} + +FILE * io_stream_output(struct io *io) { + return io->mode == IO_MODE_ADVIO + ? io->output.stream.adv->file + : io->output.stream.std; +} + +FILE * io_stream_input(struct io *io) { + return io->mode == IO_MODE_ADVIO + ? io->input.stream.adv->file + : io->input.stream.std; +} diff --git a/lib/io/csv.c b/lib/io/csv.c index c7bdc4d1c..a88b9721c 100644 --- a/lib/io/csv.c +++ b/lib/io/csv.c @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -154,66 +155,9 @@ int csv_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsi return i; } -int csv_fprint_single(FILE *f, struct sample *s, int flags) -{ - int ret; - char line[4096]; - - ret = csv_sprint_single(line, sizeof(line), s, flags); - if (ret < 0) - return ret; - - fputs(line, f); - - return 0; -} - -int csv_fscan_single(FILE *f, struct sample *s, int flags) -{ - char *ptr, line[4096]; - -skip: if (fgets(line, sizeof(line), f) == NULL) - return -1; /* An error occured */ - - /* Skip whitespaces, empty and comment lines */ - for (ptr = line; isspace(*ptr); ptr++); - if (*ptr == '\0' || *ptr == '#') - goto skip; - - return csv_sscan_single(line, strlen(line), s, flags); -} - -int csv_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags) -{ - int ret, i; - for (i = 0; i < cnt; i++) { - ret = csv_fprint_single(f, smps[i], flags); - if (ret < 0) - break; - } - - return i; -} - -int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags) -{ - int ret, i; - for (i = 0; i < cnt; i++) { - ret = csv_fscan_single(f, smps[i], flags); - if (ret < 0) { - warn("Failed to read CSV line: %d", ret); - break; - } - } - - return i; -} - void csv_header(struct io *io) { - FILE *f = io->mode == IO_MODE_ADVIO - ? io->output.stream.adv->file - : io->output.stream.std; + FILE *f = io_stream_output(io); fprintf(f, "# secs%cnsecs%coffset%csequence%cdata[]\n", CSV_SEPARATOR, CSV_SEPARATOR, CSV_SEPARATOR, CSV_SEPARATOR); } @@ -223,12 +167,11 @@ static struct plugin p = { .description = "Tabulator-separated values", .type = PLUGIN_TYPE_IO, .io = { - .fprint = csv_fprint, - .fscan = csv_fscan, .sprint = csv_sprint, .sscan = csv_sscan, .header = csv_header, .size = 0, + .flags = IO_NEWLINES } }; diff --git a/lib/io/villas_human.c b/lib/io/villas_human.c index 357026c6a..6adc04242 100644 --- a/lib/io/villas_human.c +++ b/lib/io/villas_human.c @@ -21,7 +21,6 @@ *********************************************************************************/ #include -#include #include #include @@ -187,81 +186,23 @@ int villas_human_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smp return i; } -int villas_human_fscan_single(FILE *f, struct sample *s, int flags) -{ - char *ptr, line[4096]; - -skip: if (fgets(line, sizeof(line), f) == NULL) - return -1; /* An error occured */ - - /* Skip whitespaces, empty and comment lines */ - for (ptr = line; isspace(*ptr); ptr++); - if (*ptr == '\0' || *ptr == '#') - goto skip; - - return villas_human_sscan_single(line, strlen(line), s, flags); -} - -int villas_human_fprint_single(FILE *f, struct sample *s, int flags) -{ - int ret; - char line[4096]; - - ret = villas_human_sprint_single(line, sizeof(line), s, flags); - if (ret < 0) - return ret; - - fputs(line, f); - - return 0; -} - -int villas_human_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags) -{ - int ret, i; - - for (i = 0; i < cnt; i++) { - ret = villas_human_fprint_single(f, smps[i], flags); - if (ret < 0) - return ret; - } - - return i; -} - void villas_human_header(struct io *io) { - FILE *f = io->mode == IO_MODE_ADVIO - ? io->output.stream.adv->file - : io->output.stream.std; + FILE *f = io_stream_output(io); fprintf(f, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]"); } -int villas_human_fscan(FILE *f, struct sample *smps[], unsigned cnt, int flags) -{ - int ret, i; - - for (i = 0; i < cnt; i++) { - ret = villas_human_fscan_single(f, smps[i], flags); - if (ret < 0) - return ret; - } - - return i; -} - static struct plugin p = { .name = "villas.human", .description = "VILLAS human readable format", .type = PLUGIN_TYPE_IO, .io = { - .fprint = villas_human_fprint, - .fscan = villas_human_fscan, .sprint = villas_human_sprint, .sscan = villas_human_sscan, .header = villas_human_header, .size = 0, + .flags = IO_NEWLINES } };