diff --git a/include/villas/formats/csv.h b/include/villas/formats/csv.h index 36ccb502f..77a0deed7 100644 --- a/include/villas/formats/csv.h +++ b/include/villas/formats/csv.h @@ -23,13 +23,13 @@ #pragma once -#include "advio.h" +#include /* Forward declarations. */ struct sample; #define CSV_SEPARATOR '\t' -int io_format_csv_fprint(AFILE *f, struct sample *smp, int flags); +int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags); -int io_format_csv_fscan(AFILE *f, struct sample *smp, int *flags); +int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags); diff --git a/include/villas/formats/json.h b/include/villas/formats/json.h index ea87d760f..6454cdef4 100644 --- a/include/villas/formats/json.h +++ b/include/villas/formats/json.h @@ -30,6 +30,6 @@ int io_format_json_pack(json_t **j, struct sample *s, int flags); int io_format_json_unpack(json_t *j, struct sample *s, int *flags); -int io_format_json_fprint(AFILE *f, struct sample *s, int flags); +int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags); -int io_format_json_fscan(AFILE *f, struct sample *s, int *flags); +int io_format_json_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags); diff --git a/include/villas/msg.h b/include/villas/formats/msg.h similarity index 100% rename from include/villas/msg.h rename to include/villas/formats/msg.h diff --git a/include/villas/msg_format.h b/include/villas/formats/msg_format.h similarity index 100% rename from include/villas/msg_format.h rename to include/villas/formats/msg_format.h diff --git a/include/villas/formats/villas.h b/include/villas/formats/villas.h index 5acb44888..798e289c5 100644 --- a/include/villas/formats/villas.h +++ b/include/villas/formats/villas.h @@ -25,12 +25,12 @@ #include -/* VILLASnode human readable format */ +#include "io.h" -int io_format_villas_print(char *buf, size_t len, struct sample *s, int flags); +int io_format_villas_print(struct io *io, struct sample *smps[], size_t cnt); -int io_format_villas_scan(const char *line, struct sample *s, int *fl); +int io_format_villas_scan(struct io *io, struct sample *smps[], size_t cnt); -int io_format_villas_fprint(FILE *f, struct sample *s, int flags); +int io_format_villas_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags); -int io_format_villas_fscan(FILE *f, struct sample *s, int *flags); +int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags); diff --git a/include/villas/webmsg.h b/include/villas/formats/webmsg.h similarity index 100% rename from include/villas/webmsg.h rename to include/villas/formats/webmsg.h diff --git a/include/villas/webmsg_format.h b/include/villas/formats/webmsg_format.h similarity index 100% rename from include/villas/webmsg_format.h rename to include/villas/formats/webmsg_format.h diff --git a/include/villas/io.h b/include/villas/io.h index 4b0bed7a6..e8668c895 100644 --- a/include/villas/io.h +++ b/include/villas/io.h @@ -24,6 +24,7 @@ #pragma once #include "advio.h" +#include "common.h" /* Forward declarations */ struct sample; @@ -31,34 +32,113 @@ struct io; /** These flags define the format which is used by io_fscan() and io_fprint(). */ enum io_flags { - IO_FORMAT_NANOSECONDS = (1 << 0), - IO_FORMAT_OFFSET = (1 << 1), - IO_FORMAT_SEQUENCE = (1 << 2), - IO_FORMAT_VALUES = (1 << 3), - IO_FORMAT_ALL = 16-1 + IO_FORMAT_NANOSECONDS = (1 << 0), /**< Include nanoseconds in output. */ + IO_FORMAT_OFFSET = (1 << 1), /**< Include offset / delta between received and send timestamps. */ + IO_FORMAT_SEQUENCE = (1 << 2), /**< Include sequence number in output. */ + IO_FORMAT_VALUES = (1 << 3), /**< Include values in output. */ + IO_FORMAT_ALL = 16-1, /**< Enable all output options. */ + IO_FLAG_FLUSH = (1 << 8), /**< Flush the output stream after each chunk of samples. */ }; struct io_format { int (*init)(struct io *io); int (*destroy)(struct io *io); + + /** @{ + * High-level interface + */ + + /** Open an IO stream. + * + * @see fopen() + */ int (*open)(struct io *io, const char *uri, const char *mode); + + /** Close an IO stream. + * + * @see fclose() + */ int (*close)(struct io *io); + + /** Check if end-of-file was reached. + * + * @see feof() + */ int (*eof)(struct io *io); + + /** Rewind an IO stream. + * + * @see rewind() + */ void (*rewind)(struct io *io); - int (*print)(struct io *io, struct sample *s, int fl); - int (*scan)(struct io *io, struct sample *s, int *fl); + + /** Flush buffered data to disk. + * + * @see fflush() + */ + int (*flush)(struct io *io); + + int (*print)(struct io *io, struct sample *smps[], size_t cnt); + int (*scan)( struct io *io, struct sample *smps[], size_t cnt); + /** @} */ + + /** @{ + * Low-level interface + */ + + /** Parse samples from the buffer \p buf with a length of \p len bytes. + * + * @return The number of bytes consumed of \p buf. + */ + size_t (*sscan)( char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags); + + /** Print \p cnt samples from \p smps into buffer \p buf of length \p len. + * + * @return The number of bytes written to \p buf. + */ + size_t (*sprint)(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags); + + /** Parse up to \p cnt samples from stream \p f into array \p smps. + * + * @return The number of samples parsed. + */ + int (*fscan)( FILE *f, struct sample *smps[], size_t cnt, int *flags); + + /** Print \p cnt samples from \p smps to stream \p f. + * + * @return The number of samples written to \p f. + */ + int (*fprint)(FILE *f, struct sample *smps[], size_t cnt, int flags); + + /** @} */ size_t size; /**< Number of bytes to allocate for io::_vd */ }; struct io { + enum state state; int flags; + enum { + IO_MODE_STDIO, + IO_MODE_ADVIO, + IO_MODE_CUSTOM + } mode; + /** A format type can use this file handle or overwrite the * io_format::{open,close,eof,rewind} functions and the private * data in io::_vd. */ - AFILE *file; + union { + struct { + FILE *input; + FILE *output; + } stdio; + struct { + AFILE *input; + AFILE *output; + } advio; + }; void *_vd; struct io_format *_vt; @@ -79,3 +159,16 @@ int io_scan(struct io *io, struct sample *smps[], size_t cnt); int io_eof(struct io *io); void io_rewind(struct io *io); + +int io_flush(struct io *io); + + +int io_stream_open(struct io *io, const char *uri, const char *mode); + +int io_stream_close(struct io *io); + +int io_stream_eof(struct io *io); + +void io_stream_rewind(struct io *io); + +int io_stream_flush(struct io *io); diff --git a/include/villas/super_node.h b/include/villas/super_node.h index 72a5f9b89..954966401 100644 --- a/include/villas/super_node.h +++ b/include/villas/super_node.h @@ -44,6 +44,8 @@ struct super_node { struct api api; struct web web; + char *name; /**< A name of this super node. Usually the hostname. */ + struct { int argc; char **argv; diff --git a/lib/formats/Makefile.inc b/lib/formats/Makefile.inc index e760b08ab..12022e536 100644 --- a/lib/formats/Makefile.inc +++ b/lib/formats/Makefile.inc @@ -20,4 +20,18 @@ # along with this program. If not, see . ################################################################################### -LIB_SRCS += $(addprefix lib/formats/,villas.c csv.c msg.c webmsg.c) +LIB_SRCS += $(addprefix lib/formats/,json.c villas.c csv.c) + +WITH_HDF5 ?= 0 + +ifeq ($(PLATFORM),Darwin) + HDF5_PREFIX ?= /opt/local +else + HDF5_PREFIX ?= /usr +endif + +ifeq ($(WITH_HDF5),1) +ifneq ($(wildcard $(HDF5_PREFIX)/include/hdf5_hl.h),) + LIB_SRCS += lib/formats/hdf5.c +endif +endif diff --git a/lib/formats/csv.c b/lib/formats/csv.c index b88da09d2..eba524b60 100644 --- a/lib/formats/csv.c +++ b/lib/formats/csv.c @@ -25,50 +25,44 @@ #include "formats/csv.h" #include "plugin.h" #include "sample.h" +#include "timing.h" -int io_format_csv_fprint(AFILE *f, struct sample *s, int flags) +int io_format_csv_fprint_single(FILE *f, struct sample *s, int flags) { - afprintf(f, "%ld %09ld %d", s->ts.origin.tv_sec, s->ts.origin.tv_nsec, s->sequence); + fprintf(f, "%ld %09ld %d", s->ts.origin.tv_sec, s->ts.origin.tv_nsec, s->sequence); for (int i = 0; i < s->length; i++) { switch ((s->format >> i) & 0x1) { case SAMPLE_DATA_FORMAT_FLOAT: - afprintf(f, "%c%.6f", CSV_SEPARATOR, s->data[i].f); + fprintf(f, "%c%.6f", CSV_SEPARATOR, s->data[i].f); break; case SAMPLE_DATA_FORMAT_INT: - afprintf(f, "%c%d", CSV_SEPARATOR, s->data[i].i); + fprintf(f, "%c%d", CSV_SEPARATOR, s->data[i].i); break; } } + fputc('\n', f); + return 0; } -int io_format_csv_fscan(AFILE *f, struct sample *smp, int *flags) +size_t io_format_csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flags) { int ret, off; - char *ptr, line[4096]; -skip: if (afgets(line, sizeof(line), f) == NULL) - return -1; /* An error occured */ - - /* Skip whitespaces, empty and comment lines */ - for (ptr = line; isspace(*ptr); ptr++); - if (*ptr == '\0' || *ptr == '#') - goto skip; - - ret = sscanf(line, "%ld %09ld %d %n", &smp->ts.origin.tv_sec, &smp->ts.origin.tv_nsec, &smp->sequence, &off); - if (ret != 4) + ret = sscanf(buf, "%ld %ld %d %n", &s->ts.origin.tv_sec, &s->ts.origin.tv_nsec, &s->sequence, &off); + if (ret != 3) return -1; int i; - for (i = 0; i < smp->capacity; i++) { - switch (smp->format & (1 << i)) { + for (i = 0; i < s->capacity; i++) { + switch (s->format & (1 << i)) { case SAMPLE_DATA_FORMAT_FLOAT: - ret = sscanf(line + off, "%f %n", &smp->data[i].f, &off); + ret = sscanf(buf + off, "%f %n", &s->data[i].f, &off); break; case SAMPLE_DATA_FORMAT_INT: - ret = sscanf(line + off, "%d %n", &smp->data[i].i, &off); + ret = sscanf(buf + off, "%d %n", &s->data[i].i, &off); break; } @@ -76,19 +70,51 @@ skip: if (afgets(line, sizeof(line), f) == NULL) break; } - smp->length = i; + s->length = i; + s->ts.received = time_now(); - return ret; + return 0; } -int io_format_csv_print(struct io *io, struct sample *smp, int flags) +int io_format_csv_fscan_single(FILE *f, struct sample *s, int *flags) { - return io_format_csv_fprint(io->file, smp, flags); + char *ptr, line[4096]; + +skip: if (fgets(line, sizeof(line), f) == NULL) + return -1; /* An error occured */ + + /* Skip whitespaces, empty and comment lines */ + for (ptr = line; isspace(*ptr); ptr++); + if (*ptr == '\0' || *ptr == '#') + goto skip; + + return io_format_csv_sscan_single(line, strlen(line), s, flags); } -int io_format_csv_scan(struct io *io, struct sample *smp, int *flags) +int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags) { - return io_format_csv_fscan(io->file, smp, flags); + int ret, i; + for (i = 0; i < cnt; i++) { + ret = io_format_csv_fprint_single(f, smps[i], flags); + if (ret < 0) + break; + } + + return i; +} + +int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags) +{ + int ret, i; + for (i = 0; i < cnt; i++) { + ret = io_format_csv_fscan_single(f, smps[i], flags); + if (ret < 0) { + warn("Failed to read CSV line: %d", ret); + break; + } + } + + return i; } static struct plugin p = { @@ -96,8 +122,8 @@ static struct plugin p = { .description = "Tabulator-separated values", .type = PLUGIN_TYPE_FORMAT, .io = { - .scan = io_format_csv_scan, - .print = io_format_csv_print, + .fprint = io_format_csv_fprint, + .fscan = io_format_csv_fscan, .size = 0 } }; diff --git a/lib/formats/json.c b/lib/formats/json.c index 650b9a8fc..00c79d19b 100644 --- a/lib/formats/json.c +++ b/lib/formats/json.c @@ -90,47 +90,93 @@ int io_format_json_unpack(json_t *j, struct sample *s, int *flags) return 0; } -int io_format_json_fprint(AFILE *f, struct sample *s, int flags) +size_t io_format_json_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags) { - int ret; + int i, ret; json_t *json; + size_t wr, off = 0; - ret = io_format_json_pack(&json, s, flags); - if (ret) - return ret; + for (i = 0; i < cnt; i++) { + ret = io_format_json_pack(&json, smps[i], flags); + if (ret) + return ret; - ret = json_dumpf(json, f->file, 0); + wr = json_dumpb(json, buf + off, len - off, 0); - json_decref(json); + json_decref(json); - return ret; + if (wr > len) + break; + + off += wr; + } + + return i; } -int io_format_json_fscan(AFILE *f, struct sample *s, int *flags) +size_t io_format_json_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags) { - int ret; + int i, ret; + json_t *json; + json_error_t err; + size_t off = 0; + + for (i = 0; i < cnt; i++) { + json = json_loadb(buf + off, len - off, JSON_DISABLE_EOF_CHECK, &err); + if (!json) + break; + + off += err.position; + + ret = io_format_json_unpack(json, smps[i], flags); + + json_decref(json); + + if (ret) + break; + } + + return i; +} + +int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags) +{ + int ret, i; + json_t *json; + + for (i = 0; i < cnt; i++) { + ret = io_format_json_pack(&json, smps[i], flags); + if (ret) + return ret; + + ret = json_dumpf(json, f, 0); + fputc('\n', f); + + json_decref(json); + } + + return i; +} + +int io_format_json_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags) +{ + int i, ret; json_t *json; json_error_t err; - json = json_loadf(f->file, JSON_DISABLE_EOF_CHECK, &err); - if (!json) - return -1; + for (i = 0; i < cnt; i++) { +skip: json = json_loadf(f, JSON_DISABLE_EOF_CHECK, &err); + if (!json) + break; - ret = io_format_json_unpack(json, s, flags); + ret = io_format_json_unpack(json, smps[i], flags); + if (ret) + goto skip; - json_decref(json); + json_decref(json); + } - return ret; -} - -int io_format_json_print(struct io *io, struct sample *smp, int flags) -{ - return io_format_json_fprint(io->file, smp, flags); -} - -int io_format_json_scan(struct io *io, struct sample *smp, int *flags) -{ - return io_format_json_fscan(io->file, smp, flags); + return i; } static struct plugin p = { @@ -138,8 +184,10 @@ static struct plugin p = { .description = "Javascript Object Notation", .type = PLUGIN_TYPE_FORMAT, .io = { - .print = io_format_json_print, - .scan = io_format_json_scan, + .fscan = io_format_json_fscan, + .fprint = io_format_json_fprint, + .sscan = io_format_json_sscan, + .sprint = io_format_json_sprint, .size = 0 }, }; diff --git a/lib/formats/msg.c b/lib/formats/msg.c index 08622f8ce..d61868a36 100644 --- a/lib/formats/msg.c +++ b/lib/formats/msg.c @@ -23,10 +23,11 @@ #include #include -#include "msg.h" -#include "msg_format.h" +#include "formats/msg.h" +#include "formats/msg_format.h" #include "sample.h" #include "utils.h" +#include "plugin.h" void msg_ntoh(struct msg *m) { @@ -150,3 +151,14 @@ int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t return i; } + +static struct plugin p = { + .name = "msg", + .description = "VILLAS binary network format", + .type = PLUGIN_TYPE_FORMAT, + .io = { + .size = 0 + }, +}; + +REGISTER_PLUGIN(&p); diff --git a/lib/formats/villas.c b/lib/formats/villas.c index 1c26f2609..f49d28a72 100644 --- a/lib/formats/villas.c +++ b/lib/formats/villas.c @@ -28,8 +28,9 @@ #include "utils.h" #include "timing.h" #include "sample.h" +#include "formats/villas.h" -int io_format_villas_print(char *buf, size_t len, struct sample *s, int flags) +size_t io_format_villas_sprint_single(char *buf, size_t len, struct sample *s, int flags) { size_t off = snprintf(buf, len, "%llu", (unsigned long long) s->ts.origin.tv_sec); @@ -57,15 +58,15 @@ int io_format_villas_print(char *buf, size_t len, struct sample *s, int flags) off += snprintf(buf + off, len - off, "\n"); - return 0; /* trailing '\0' */ + return off; } -int io_format_villas_scan(const char *line, struct sample *s, int *fl) +size_t io_format_villas_sscan_single(const char *buf, size_t len, struct sample *s, int *flags) { char *end; - const char *ptr = line; + const char *ptr = buf; - int flags = 0; + int fl = 0; double offset = 0; /* Format: Seconds.NanoSeconds+Offset(SequenceNumber) Value1 Value2 ... @@ -85,7 +86,7 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl) s->ts.origin.tv_nsec = (uint32_t) strtoul(ptr, &end, 10); if (ptr != end) - flags |= IO_FORMAT_NANOSECONDS; + fl |= IO_FORMAT_NANOSECONDS; else return -3; } @@ -98,7 +99,7 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl) offset = strtof(ptr, &end); /* offset is ignored for now */ if (ptr != end) - flags |= IO_FORMAT_OFFSET; + fl |= IO_FORMAT_OFFSET; else return -4; } @@ -109,7 +110,7 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl) s->sequence = strtoul(ptr, &end, 10); if (ptr != end) - flags |= IO_FORMAT_SEQUENCE; + fl |= IO_FORMAT_SEQUENCE; else return -5; @@ -135,35 +136,42 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl) } if (s->length > 0) - flags |= IO_FORMAT_VALUES; + fl |= IO_FORMAT_VALUES; - if (fl) - *fl = flags; - if (flags & IO_FORMAT_OFFSET) { + if (flags) + *flags = fl; + + if (fl & IO_FORMAT_OFFSET) { struct timespec off = time_from_double(offset); s->ts.received = time_add(&s->ts.origin, &off); } else - s->ts.received = s->ts.origin; + s->ts.received = time_now(); - return 0; + return end - buf; } -int io_format_villas_fprint(FILE *f, struct sample *s, int flags) +size_t io_format_villas_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags) { - char line[4096]; - int ret; + size_t off = 0; - ret = io_format_villas_print(line, sizeof(line), s, flags); - if (ret) - return ret; + for (int i = 0; i < cnt && off < len; i++) + off += io_format_villas_sprint_single(buf + off, len - off, smps[i], flags); - fputs(line, f); - - return 0; + return off; } -int io_format_villas_fscan(FILE *f, struct sample *s, int *fl) +size_t io_format_villas_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags) +{ + size_t off = 0; + + for (int i = 0; i < cnt && off < len; i++) + off += io_format_villas_sscan_single(buf + off, len - off, smps[i], flags); + + return off; +} + +int io_format_villas_fscan_single(FILE *f, struct sample *s, int *flags) { char *ptr, line[4096]; @@ -175,16 +183,68 @@ skip: if (fgets(line, sizeof(line), f) == NULL) if (*ptr == '\0' || *ptr == '#') goto skip; - return io_format_villas_scan(line, s, fl); + return io_format_villas_sscan_single(line, strlen(line), s, flags); } -struct plugin p = { +int io_format_villas_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags) +{ + char line[4096]; + int ret, i; + + for (i = 0; i < cnt; i++) { + ret = io_format_villas_sprint_single(line, sizeof(line), smps[i], flags); + if (ret < 0) + break; + + fputs(line, f); + } + + return i; +} + +int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags) +{ + int ret, i; + + for (i = 0; i < cnt; i++) { + ret = io_format_villas_fscan_single(f, smps[i], flags); + if (ret < 0) + return ret; + } + + return i; +} + +int io_format_villas_open(struct io *io, const char *uri, const char *mode) +{ + int ret; + + ret = io_stream_open(io, uri, mode); + if (ret) + return ret; + + FILE *f = io->mode == IO_MODE_ADVIO + ? io->advio.output->file + : io->stdio.output; + + fprintf(f, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]"); + + if (io->flags & IO_FLAG_FLUSH) + io_flush(io); + + return 0; +} + +static struct plugin p = { .name = "villas", - .description = "Human readable VILLAS format", + .description = "VILLAS human readable format", .type = PLUGIN_TYPE_FORMAT, .io = { + .open = io_format_villas_open, .fprint = io_format_villas_fprint, .fscan = io_format_villas_fscan, + .sprint = io_format_villas_sprint, + .sscan = io_format_villas_sscan, .size = 0 } }; diff --git a/lib/formats/webmsg.c b/lib/formats/webmsg.c index d0c1de283..1868d7b91 100644 --- a/lib/formats/webmsg.c +++ b/lib/formats/webmsg.c @@ -26,8 +26,9 @@ #include #endif -#include "webmsg.h" -#include "webmsg_format.h" +#include "plugin.h" +#include "formats/webmsg.h" +#include "formats/webmsg_format.h" void webmsg_ntoh(struct webmsg *m) { @@ -72,3 +73,14 @@ int webmsg_verify(struct webmsg *m) else return 0; } + +static struct plugin p = { + .name = "webmsg", + .description = "VILLAS binary format for websockets", + .type = PLUGIN_TYPE_FORMAT, + .io = { + .size = 0 + }, +}; + +REGISTER_PLUGIN(&p); diff --git a/lib/hook.c b/lib/hook.c index 44f0062de..7f261666d 100644 --- a/lib/hook.c +++ b/lib/hook.c @@ -24,7 +24,7 @@ #include "timing.h" #include "config.h" -#include "msg.h" +#include "formats/msg.h" #include "hook.h" #include "path.h" #include "utils.h" diff --git a/lib/io.c b/lib/io.c index 91e79451d..2d42b4e0d 100644 --- a/lib/io.c +++ b/lib/io.c @@ -21,9 +21,11 @@ *********************************************************************************/ #include +#include #include "io.h" #include "utils.h" +#include "sample.h" int io_init(struct io *io, struct io_format *fmt, int flags) { @@ -48,55 +50,170 @@ int io_destroy(struct io *io) return 0; } -int io_open(struct io *io, const char *uri, const char *mode) +int io_stream_open(struct io *io, const char *uri, const char *mode) { - if (io->_vt->open) - return io->_vt->open(io, uri, mode); + int ret; + + if (uri) { + if (aislocal(uri)) { + io->mode = IO_MODE_STDIO; + + io->stdio.input = + io->stdio.output = fopen(uri, mode); + + if (io->stdio.output == NULL) + return -1; + + ret = setvbuf(io->stdio.output, NULL, _IOLBF, BUFSIZ); + if (ret) + return -1; + } + else { + io->mode = IO_MODE_ADVIO; + + io->advio.input = + io->advio.output = afopen(uri, mode); + + if (io->advio.output == NULL) + return -1; + } + } else { - io->file = afopen(uri, mode); - if (!io->file) + io->mode = IO_MODE_STDIO; + io->flags |= IO_FLAG_FLUSH; + + io->stdio.input = stdin; + io->stdio.output = stdout; + + ret = setvbuf(io->stdio.input, NULL, _IOLBF, BUFSIZ); + if (ret) return -1; - return 0; + ret = setvbuf(io->stdio.output, NULL, _IOLBF, BUFSIZ); + if (ret) + return -1; } + + return 0; +} + +int io_stream_close(struct io *io) +{ + switch (io->mode) { + case IO_MODE_ADVIO: + return afclose(io->advio.input); + case IO_MODE_STDIO: + return io->stdio.input != stdin ? fclose(io->stdio.input) : 0; + case IO_MODE_CUSTOM: + return 0; + } + + return -1; +} + +int io_stream_flush(struct io *io) +{ + switch (io->mode) { + case IO_MODE_ADVIO: + return afflush(io->advio.output); + case IO_MODE_STDIO: + return fflush(io->stdio.output); + case IO_MODE_CUSTOM: + return 0; + } + + return -1; +} + +int io_stream_eof(struct io *io) +{ + switch (io->mode) { + case IO_MODE_ADVIO: + return afeof(io->advio.input); + case IO_MODE_STDIO: + return feof(io->stdio.input); + case IO_MODE_CUSTOM: + return 0; + } + + return -1; +} + +void io_stream_rewind(struct io *io) +{ + switch (io->mode) { + case IO_MODE_ADVIO: + return arewind(io->advio.input); + case IO_MODE_STDIO: + return rewind(io->stdio.input); + case IO_MODE_CUSTOM: { } + } +} + +int io_open(struct io *io, const char *uri, const char *mode) +{ + return io->_vt->open + ? io->_vt->open(io, uri, mode) + : io_stream_open(io, uri, mode); } int io_close(struct io *io) { - return io->_vt->close ? io->_vt->close(io) : afclose(io->file); + return io->_vt->close + ? io->_vt->close(io) + : io_stream_close(io); } -int io_print(struct io *io, struct sample *smps[], size_t cnt) +int io_flush(struct io *io) { - assert(io->_vt->print); - - for (int i = 0; i < cnt; i++) - io->_vt->print(io, smps[i], io->flags); - - return cnt; -} - -int io_scan(struct io *io, struct sample *smps[], size_t cnt) -{ - int ret; - - assert(io->_vt->scan); - - for (int i = 0; i < cnt && !io_eof(io); i++) { - ret = io->_vt->scan(io, smps[i], NULL); - if (ret < 0) - return i; - } - - return cnt; + return io->_vt->flush + ? io->_vt->flush(io) + : io_stream_flush(io); } int io_eof(struct io *io) { - return io->_vt->eof ? io->_vt->eof(io) : afeof(io->file); + return io->_vt->eof + ? io->_vt->eof(io) + : io_stream_eof(io); } void io_rewind(struct io *io) { - io->_vt->rewind ? io->_vt->rewind(io) : arewind(io->file); + io->_vt->rewind + ? io->_vt->rewind(io) + : io_stream_rewind(io); +} + +int io_print(struct io *io, struct sample *smps[], size_t cnt) +{ + int ret; + + if (io->_vt->print) + ret = io->_vt->print(io, smps, cnt); + else { + FILE *f = io->mode == IO_MODE_ADVIO + ? io->advio.output->file + : io->stdio.output; + + ret = io->_vt->fprint(f, smps, cnt, io->flags); + } + + if (io->flags & IO_FLAG_FLUSH) + io_flush(io); + + return ret; +} + +int io_scan(struct io *io, struct sample *smps[], size_t cnt) +{ + if (io->_vt->scan) + return io->_vt->scan(io, smps, cnt); + else { + FILE *f = io->mode == IO_MODE_ADVIO + ? io->advio.input->file + : io->stdio.input; + + return io->_vt->fscan(f, smps, cnt, NULL); + } } diff --git a/lib/nodes/Makefile.inc b/lib/nodes/Makefile.inc index d612bc48e..12e6bcf0e 100644 --- a/lib/nodes/Makefile.inc +++ b/lib/nodes/Makefile.inc @@ -90,8 +90,8 @@ endif # Enable Socket node type when libnl3 is available ifeq ($(WITH_SOCKET),1) - LIB_SRCS += $(addprefix lib/nodes/, socket.c) - LIB_SRCS += $(addprefix lib/, msg.c) + LIB_SRCS += lib/nodes/socket.c + LIB_SRCS += lib/formats/msg.c LIB_CFLAGS += -DWITH_SOCKET # libnl3 is optional but required for network emulation and IRQ pinning @@ -135,7 +135,8 @@ endif # Enable WebSocket support ifeq ($(WITH_WEBSOCKET),1) ifeq ($(shell $(PKGCONFIG) libwebsockets jansson; echo $$?),0) - LIB_SRCS += lib/nodes/websocket.c lib/webmsg.c + LIB_SRCS += lib/nodes/websocket.c + LIB_SRCS += lib/formats/webmsg.c LIB_PKGS += libwebsockets jansson LIB_CFLAGS += -DWITH_WEBSOCKET endif diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index 10d588819..14d60c0b0 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -27,7 +27,7 @@ #include "plugin.h" #include "nodes/nanomsg.h" #include "utils.h" -#include "msg.h" +#include "formats/msg.h" int nanomsg_reverse(struct node *n) { diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index f69f7db52..cb7c5c62e 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -42,8 +42,8 @@ #define WITH_NETEM #endif /* WITH_LIBNL_ROUTE_30 */ -#include "msg.h" -#include "msg_format.h" +#include "formats/msg.h" +#include "formats/msg_format.h" #include "sample.h" #include "queue.h" #include "plugin.h" diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 6aa153072..b4fd7b7c0 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -27,12 +27,12 @@ #include #include "super_node.h" -#include "webmsg.h" -#include "webmsg_format.h" #include "timing.h" #include "utils.h" #include "plugin.h" +#include "formats/webmsg.h" +#include "formats/webmsg_format.h" #include "nodes/websocket.h" /* Private static storage */ diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 0b84fe479..d309a1d7a 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -30,7 +30,7 @@ #include "utils.h" #include "queue.h" #include "plugin.h" -#include "msg.h" +#include "formats/msg.h" static void *context; diff --git a/src/hook.c b/src/hook.c index 1dfaf2812..ca7a29600 100644 --- a/src/hook.c +++ b/src/hook.c @@ -31,7 +31,7 @@ #include #include -#include +#include #include #include #include @@ -45,12 +45,13 @@ int cnt; -struct sample **samples; +struct sample **smps; struct plugin *p; struct log l = { .state = STATE_DESTROYED }; struct pool q = { .state = STATE_DESTROYED }; struct hook h = { .state = STATE_DESTROYED }; +struct io io = { .state = STATE_DESTROYED }; static void quit(int signal, siginfo_t *sinfo, void *ctx) { @@ -64,7 +65,7 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) if (ret) error("Failed to destroy hook"); - sample_free(samples, cnt); + sample_free(smps, cnt); ret = pool_destroy(&q); if (ret) @@ -81,13 +82,20 @@ static void usage() printf(" NAME the name of the hook function\n"); printf(" PARAM* a string of configuration settings for the hook\n"); printf(" OPTIONS is one or more of the following options:\n"); + printf(" -f FMT the data format\n"); printf(" -h show this help\n"); printf(" -d LVL set debug level to LVL\n"); - printf(" -v CNT process CNT samples at once\n"); + printf(" -v CNT process CNT smps at once\n"); printf("\n"); + printf("The following hook functions are supported:\n"); plugin_dump(PLUGIN_TYPE_HOOK); printf("\n"); + + printf("Supported IO formats:\n"); + plugin_dump(PLUGIN_TYPE_FORMAT); + printf("\n"); + printf("Example:"); printf(" villas-signal random | villas-hook skip_first seconds=10\n"); printf("\n"); @@ -98,6 +106,7 @@ static void usage() int main(int argc, char *argv[]) { int ret; + char *format = "villas"; size_t recv; @@ -109,6 +118,8 @@ int main(int argc, char *argv[]) char c, *endptr; while ((c = getopt(argc, argv, "hv:d:f:o:")) != -1) { switch (c) { + case 'f': + format = optarg; break; case 'v': cnt = strtoul(optarg, &endptr, 0); @@ -139,34 +150,51 @@ check: if (optarg == endptr) exit(EXIT_FAILURE); } - char *hookstr = argv[optind]; - - ret = signals_init(quit); - if (ret) - error("Failed to intialize signals"); + char *hook = argv[optind]; ret = log_init(&l, l.level, LOG_ALL); if (ret) error("Failed to initialize log"); - log_start(&l); + ret = log_start(&l); + if (ret) + error("Failed to start log"); + + ret = signals_init(quit); + if (ret) + error("Failed to intialize signals"); if (cnt < 1) error("Vectorize option must be greater than 0"); - memory_init(DEFAULT_NR_HUGEPAGES); + ret = memory_init(DEFAULT_NR_HUGEPAGES); + if (ret) + error("Failed to initialize memory"); - samples = alloc(cnt * sizeof(struct sample *)); + smps = alloc(cnt * sizeof(struct sample *)); ret = pool_init(&q, 10 * cnt, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage); if (ret) error("Failed to initilize memory pool"); - p = plugin_lookup(PLUGIN_TYPE_HOOK, hookstr); + /* Initialize IO */ + p = plugin_lookup(PLUGIN_TYPE_FORMAT, format); if (!p) - error("Unknown hook function '%s'", hookstr); + error("Unknown IO format '%s'", format); + + ret = io_init(&io, &p->io, IO_FORMAT_ALL); + if (ret) + error("Failed to initialize IO"); + + ret = io_open(&io, NULL, NULL); + if (ret) + error("Failed to open IO"); + + /* Initialize hook */ + p = plugin_lookup(PLUGIN_TYPE_HOOK, hook); + if (!p) + error("Unknown hook function '%s'", hook); - /** @todo villas-hook does not use the path structure */ ret = hook_init(&h, &p->hook, NULL); if (ret) error("Failed to initialize hook"); @@ -180,35 +208,25 @@ check: if (optarg == endptr) error("Failed to start hook"); for (;;) { - if (feof(stdin)) { + if (io_eof(&io)) { killme(SIGTERM); pause(); } - ret = sample_alloc(&q, samples, cnt); + ret = sample_alloc(&q, smps, cnt); if (ret != cnt) - error("Failed to allocate %d samples from pool", cnt); + error("Failed to allocate %d smps from pool", cnt); - recv = 0; - for (int j = 0; j < cnt && !feof(stdin); j++) { - ret = sample_io_villas_fscan(stdin, samples[j], NULL); - if (ret < 0) - break; + recv = io_scan(&io, smps, cnt); - samples[j]->ts.received = time_now(); - recv++; - } + debug(15, "Read %zu smps from stdin", recv); - debug(15, "Read %zu samples from stdin", recv); + hook_read(&h, smps, &recv); + hook_write(&h, smps, &recv); - hook_read(&h, samples, &recv); - hook_write(&h, samples, &recv); + io_print(&io, smps, recv); - for (int j = 0; j < recv; j++) - sample_io_villas_fprint(stdout, samples[j], SAMPLE_IO_ALL); - fflush(stdout); - - sample_free(samples, cnt); + sample_free(smps, cnt); } return 0; diff --git a/src/node.c b/src/node.c index acffd9f4a..9b73e5c35 100644 --- a/src/node.c +++ b/src/node.c @@ -75,6 +75,7 @@ static void usage() printf(" This type of invocation is used by OPAL-RT Asynchronous processes.\n"); printf(" See in the RT-LAB User Guide for more information.\n\n"); #endif + printf("Supported node-types:\n"); plugin_dump(PLUGIN_TYPE_NODE); printf("\n"); @@ -87,6 +88,10 @@ static void usage() plugin_dump(PLUGIN_TYPE_API); printf("\n"); + printf("Supported IO formats:\n"); + plugin_dump(PLUGIN_TYPE_FORMAT); + printf("\n"); + print_copyright(); exit(EXIT_FAILURE); diff --git a/src/pipe.c b/src/pipe.c index a8acb8251..2f1c014d6 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -44,8 +45,9 @@ #include "config.h" static struct super_node sn = { .state = STATE_DESTROYED }; /**< The global configuration */ +static struct io io = { .state = STATE_DESTROYED }; -struct dir { +static struct dir { struct pool pool; pthread_t thread; bool enabled; @@ -84,21 +86,22 @@ static void usage() printf(" CONFIG path to a configuration file\n"); printf(" NODE the name of the node to which samples are sent and received from\n"); printf(" OPTIONS are:\n"); - printf(" -f FMT set the format\n") - printf(" -d LVL set debug log level to LVL\n"); - printf(" -x swap read / write endpoints\n"); - printf(" -s only read data from stdin and send it to node\n"); - printf(" -r only read data from node and write it to stdout\n"); - printf(" -t NUM terminate after NUM seconds\n"); - printf(" -L NUM terminate after NUM samples sent\n"); - printf(" -l NUM terminate after NUM samples received\n\n"); + printf(" -f FMT set the format\n"); + printf(" -d LVL set debug log level to LVL\n"); + printf(" -o OPTION=VALUE overwrite options in config file\n"); + printf(" -x swap read / write endpoints\n"); + printf(" -s only read data from stdin and send it to node\n"); + printf(" -r only read data from node and write it to stdout\n"); + printf(" -t NUM terminate after NUM seconds\n"); + printf(" -L NUM terminate after NUM samples sent\n"); + printf(" -l NUM terminate after NUM samples received\n\n"); print_copyright(); } static void * send_loop(void *ctx) { - int ret, cnt = 0; + int ret, len, sent, cnt = 0; struct sample *smps[node->vectorize]; /* Initialize memory */ @@ -110,42 +113,30 @@ static void * send_loop(void *ctx) if (ret < 0) error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret); - while (!feof(stdin)) { - int len; - for (len = 0; len < node->vectorize; len++) { - struct sample *s = smps[len]; - int reason; + while (!io_eof(&io)) { + len = io_scan(&io, smps, node->vectorize); + if (len <= 0) + continue; - if (sendd.limit > 0 && cnt >= sendd.limit) - break; - -retry: reason = sample_io_villas_fscan(stdin, s, NULL); - if (reason < 0) { - if (feof(stdin)) - goto leave; - else { - warn("Skipped invalid message message: reason=%d", reason); - goto retry; - } - } - } - - cnt += node_write(node, smps, len); + sent = node_write(node, smps, len); + cnt += sent; if (sendd.limit > 0 && cnt >= sendd.limit) - goto leave2; + goto leave; pthread_testcancel(); } -leave2: info("Reached send limit. Terminating..."); - killme(SIGTERM); - - return NULL; - - /* We reached EOF on stdin here. Lets kill the process */ -leave: if (recvv.limit < 0) { - info("Reached end-of-file. Terminating..."); +leave: if (io_eof(&io)) { + if (recvv.limit < 0) { + info("Reached end-of-file. Terminating..."); + killme(SIGTERM); + } + else + info("Reached end-of-file. Wait for receive side..."); + } + else { + info("Reached send limit. Terminating..."); killme(SIGTERM); } @@ -164,26 +155,22 @@ static void * recv_loop(void *ctx) ret = sample_alloc(&recvv.pool, smps, node->vectorize); if (ret < 0) - error("Failed to get %u samples out of receive pool (%d).", node->vectorize, ret); - - /* Print header */ - fprintf(stdout, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]"); - fflush(stdout); + error("Failed to allocate %u samples from receive pool.", node->vectorize); for (;;) { int recv = node_read(node, smps, node->vectorize); struct timespec now = time_now(); + /* Fix timestamps */ for (int i = 0; i < recv; i++) { struct sample *s = smps[i]; if (s->ts.received.tv_sec == -1 || s->ts.received.tv_sec == 0) s->ts.received = now; - - sample_io_villas_fprint(stdout, s, SAMPLE_IO_ALL); - fflush(stdout); } + io_print(&io, smps, recv); + cnt += recv; if (recvv.limit > 0 && cnt >= recvv.limit) goto leave; @@ -201,17 +188,21 @@ int main(int argc, char *argv[]) { int ret, level = V, timeout = 0; bool reverse = false; + char *format = "villas"; sendd = recvv = (struct dir) { .enabled = true, .limit = -1 }; - char c, *endptr; - while ((c = getopt(argc, argv, "hxrsd:l:L:t:f:")) != -1) { - switch (c) { - case 'f'; + json_t *cfg_cli = json_object(); + char c, *endptr; + while ((c = getopt(argc, argv, "hxrsd:l:L:t:f:o:")) != -1) { + switch (c) { + case 'f': + format = optarg; + break; case 'x': reverse = true; break; @@ -233,6 +224,11 @@ int main(int argc, char *argv[]) case 't': timeout = strtoul(optarg, &endptr, 10); goto check; + case 'o': + ret = json_object_extend_str(cfg_cli, optarg); + if (ret) + error("Invalid option: %s", optarg); + break; case 'h': case '?': usage(); @@ -243,7 +239,6 @@ int main(int argc, char *argv[]) check: if (optarg == endptr) error("Failed to parse parse option argument '-%c %s'", c, optarg); - } if (argc != optind + 2) { @@ -253,18 +248,48 @@ check: if (optarg == endptr) char *configfile = argv[optind]; char *nodestr = argv[optind+1]; + struct plugin *p; - log_init(&sn.log, level, LOG_ALL); - log_start(&sn.log); + ret = log_init(&sn.log, level, LOG_ALL); + if (ret) + error("Failed to intialize log"); - super_node_init(&sn); - super_node_parse_uri(&sn, configfile); + ret = log_start(&sn.log); + if (ret) + error("Failed to start log"); - memory_init(sn.hugepages); - signals_init(quit); - rt_init(sn.priority, sn.affinity); + ret = signals_init(quit); + if (ret) + error("Failed to initialize signals"); + + ret = memory_init(sn.hugepages); + if (ret) + error("Failed to initialize memory"); + + ret = rt_init(sn.priority, sn.affinity); + if (ret) + error("Failed to initalize real-time"); + + p = plugin_lookup(PLUGIN_TYPE_FORMAT, format); + if (!p) + error("Invalid format: %s", format); + + ret = io_init(&io, &p->io, IO_FORMAT_ALL); + if (ret) + error("Failed to initialize IO"); + + ret = io_open(&io, NULL, NULL); + if (ret) + error("Failed to open IO"); + + ret = super_node_init(&sn); + if (ret) + error("Failed to initialize super-node"); + + ret = super_node_parse_uri(&sn, configfile); + if (ret) + error("Failed to parse configuration"); - /* Initialize node */ node = list_lookup(&sn.nodes, nodestr); if (!node) error("Node '%s' does not exist!", nodestr); diff --git a/src/signal.c b/src/signal.c index 23df871b7..b7e3fceb2 100644 --- a/src/signal.c +++ b/src/signal.c @@ -30,7 +30,7 @@ #include #include -#include +#include #include #include #include @@ -39,6 +39,7 @@ /* Some default values */ struct node n; struct log l; +struct io io; struct sample *t; @@ -54,14 +55,15 @@ void usage() printf(" ramp\n"); printf("\n"); printf(" OPTIONS is one or more of the following options:\n"); - printf(" -d LVL set debug level\n"); - printf(" -v NUM specifies how many values a message should contain\n"); - printf(" -r HZ how many messages per second\n"); - printf(" -n non real-time mode. do not throttle output.\n"); - printf(" -f HZ the frequency of the signal\n"); - printf(" -a FLT the amplitude\n"); - printf(" -D FLT the standard deviation for 'random' signals\n"); - printf(" -l NUM only send LIMIT messages and stop\n\n"); + printf(" -d LVL set debug level\n"); + printf(" -f FMT set the format\n"); + printf(" -v NUM specifies how many values a message should contain\n"); + printf(" -r HZ how many messages per second\n"); + printf(" -n non real-time mode. do not throttle output.\n"); + printf(" -F HZ the frequency of the signal\n"); + printf(" -a FLT the amplitude\n"); + printf(" -D FLT the standard deviation for 'random' signals\n"); + printf(" -l NUM only send LIMIT messages and stop\n\n"); print_copyright(); } @@ -78,6 +80,14 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) if (ret) error("Failed to destroy node"); + ret = io_close(&io); + if (ret) + error("Failed to close output"); + + ret = io_destroy(&io); + if (ret) + error("Failed to destroy output"); + ret = log_stop(&l); if (ret) error("Failed to stop log"); @@ -92,7 +102,8 @@ int main(int argc, char *argv[]) { int ret; struct plugin *p; - struct signal *s; + + char *format = "villas"; /** @todo hardcoded for now */ ret = log_init(&l, l.level, LOG_ALL); if (ret) @@ -114,6 +125,18 @@ int main(int argc, char *argv[]) if (ret) error("Failed to initialize node"); + p = plugin_lookup(PLUGIN_TYPE_FORMAT, format); + if (!p) + error("Invalid output format '%s'", format); + + ret = io_init(&io, &p->io, IO_FLAG_FLUSH | (IO_FORMAT_ALL & ~IO_FORMAT_OFFSET)); + if (ret) + error("Failed to initialize output"); + + ret = io_open(&io, NULL, NULL); + if (ret) + error("Failed to open output"); + ret = node_parse_cli(&n, argc, argv); if (ret) error("Failed to parse command line options"); @@ -122,29 +145,20 @@ int main(int argc, char *argv[]) if (ret) error("Failed to verify node configuration"); - info("Starting signal generation: %s", node_name(&n)); - /* Allocate memory for message buffer */ - s = n._vd; + struct signal *s = n._vd; t = alloc(SAMPLE_LEN(s->values)); t->capacity = s->values; - /* Print header */ - printf("# VILLASnode signal params: type=%s, values=%u, rate=%f, limit=%d, amplitude=%f, freq=%f\n", - argv[optind], s->values, s->rate, s->limit, s->amplitude, s->frequency); - printf("# %-20s\t\t%s\n", "sec.nsec(seq)", "data[]"); - ret = node_start(&n); if (ret) serror("Failed to start node"); for (;;) { node_read(&n, &t, 1); - - sample_io_villas_fprint(stdout, t, SAMPLE_IO_ALL & ~SAMPLE_IO_OFFSET); - fflush(stdout); + io_print(&io, &t, 1); } return 0; diff --git a/src/test-cmp.c b/src/test-cmp.c index eb122bea6..427f29be9 100644 --- a/src/test-cmp.c +++ b/src/test-cmp.c @@ -28,7 +28,8 @@ #include #include -#include +#include +#include #include #include #include @@ -103,11 +104,21 @@ check: if (optarg == endptr) f1.path = argv[optind]; f2.path = argv[optind + 1]; - log_init(&log, V, LOG_ALL); - log_start(&log); + ret = log_init(&log, V, LOG_ALL); + if (ret) + error("Failed to initialize log"); - pool_init(&pool, 1024, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_heap); - sample_alloc(&pool, samples, 2); + ret = log_start(&log); + if (ret) + error("Failed to start log"); + + ret = pool_init(&pool, 1024, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_heap); + if (ret) + error("Failed to initialize pool"); + + ret = sample_alloc(&pool, samples, 2); + if (ret != 2) + error("Failed to allocate samples"); f1.sample = samples[0]; f2.sample = samples[1]; @@ -121,11 +132,11 @@ check: if (optarg == endptr) serror("Failed to open file: %s", f2.path); while (!feof(f1.handle) && !feof(f2.handle)) { - ret = sample_io_villas_fscan(f1.handle, f1.sample, &f1.flags); + ret = io_format_villas_fscan(f1.handle, &f1.sample, 1, &f1.flags); if (ret < 0 && !feof(f1.handle)) goto out; - ret = sample_io_villas_fscan(f2.handle, f2.sample, &f2.flags); + ret = io_format_villas_fscan(f2.handle, &f2.sample, 1, &f2.flags); if (ret < 0 && !feof(f2.handle)) goto out; @@ -139,7 +150,7 @@ check: if (optarg == endptr) } /* Compare sequence no */ - if ((f1.flags & SAMPLE_IO_SEQUENCE) && (f2.flags & SAMPLE_IO_SEQUENCE)) { + if ((f1.flags & IO_FORMAT_SEQUENCE) && (f2.flags & IO_FORMAT_SEQUENCE)) { if (f1.sample->sequence != f2.sample->sequence) { printf("sequence no: %d != %d\n", f1.sample->sequence, f2.sample->sequence); ret = 2; diff --git a/tests/unit/advio.c b/tests/unit/advio.c index 96bbbd608..aa8790a3e 100644 --- a/tests/unit/advio.c +++ b/tests/unit/advio.c @@ -28,7 +28,7 @@ #include #include #include -#include +#include /** This URI points to a Sciebo share which contains some test files. * The Sciebo share is read/write accessible via WebDAV. */ @@ -40,12 +40,12 @@ Test(advio, local) int ret; char *buf = NULL; size_t buflen = 0; - + /* We open this file and check the first line */ af = afopen(__FILE__, "r"); cr_assert(af, "Failed to open local file"); - ret = getline(&buf, &buflen, af->file); + ret = getline(&buf, &buflen, af->file); cr_assert_gt(ret, 1); cr_assert_str_eq(buf, "/** Unit tests for advio\n"); } @@ -74,26 +74,26 @@ Test(advio, download_large) { AFILE *af; int ret, len = 16; - + struct sample *smp = alloc(SAMPLE_LEN(len)); smp->capacity = len; af = afopen(BASE_URI "/download-large" , "r"); cr_assert(af, "Failed to download file"); - ret = sample_io_villas_fscan(af->file, smp, NULL); - cr_assert_eq(ret, 0); - + ret = io_format_villas_fscan(af->file, &smp, 1, NULL); + cr_assert_eq(ret, 1); + cr_assert_eq(smp->sequence, 0); cr_assert_eq(smp->length, 4); cr_assert_eq(smp->ts.origin.tv_sec, 1497710378); cr_assert_eq(smp->ts.origin.tv_nsec, 863332240); - + float data[] = { 0.022245, 0.000000, -1.000000, 1.000000 }; - + for (int i = 0; i < smp->length; i++) cr_assert_float_eq(smp->data[i].f, data[i], 1e-5); - + ret = afclose(af); cr_assert_eq(ret, 0, "Failed to close file"); } @@ -106,44 +106,44 @@ Test(advio, resume) char line1[32]; char *line2 = NULL; size_t linelen = 0; - + mkdtemp(dir); ret = asprintf(&fn, "%s/file", dir); cr_assert_gt(ret, 0); - + af1 = afopen(fn, "w+"); cr_assert_not_null(af1); - + /* We flush once the empty file in order to upload an empty file. */ aupload(af1, 0); - + af2 = afopen(fn, "r"); cr_assert_not_null(af2); - + for (int i = 0; i < 100; i++) { snprintf(line1, sizeof(line1), "This is line %d\n", i); - + afputs(line1, af1); aupload(af1, 1); - + adownload(af2, 1); agetline(&line2, &linelen, af2); - + cr_assert_str_eq(line1, line2); } - + ret = afclose(af1); cr_assert_eq(ret, 0); - + ret = afclose(af2); cr_assert_eq(ret, 0); - + ret = unlink(fn); cr_assert_eq(ret, 0); - + ret = rmdir(dir); cr_assert_eq(ret, 0); - + free(line2); } @@ -236,4 +236,4 @@ Test(advio, append) cr_assert(ret == 0, "Failed to close file"); cr_assert_arr_eq(buffer, expect, sizeof(expect)); -} \ No newline at end of file +} diff --git a/tests/unit/io.c b/tests/unit/io.c new file mode 100644 index 000000000..431f7132d --- /dev/null +++ b/tests/unit/io.c @@ -0,0 +1,148 @@ +/** Unit tests for IO formats. + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include +#include +#include + +#include "utils.h" +#include "timing.h" +#include "sample.h" +#include "plugin.h" +#include "pool.h" +#include "io.h" + +#define NUM_SAMPLES 10 +#define NUM_VALUES 10 + +void cr_assert_eq_sample(struct sample *s, struct sample *t) +{ + cr_assert_eq(s->length, t->length); + cr_assert_eq(s->sequence, t->sequence); + cr_assert_eq(s->format, t->format); + + cr_assert_eq(s->ts.origin.tv_sec, t->ts.origin.tv_sec); + cr_assert_eq(s->ts.origin.tv_nsec, t->ts.origin.tv_nsec); + cr_assert_eq(s->ts.received.tv_sec, t->ts.received.tv_sec); + cr_assert_eq(s->ts.received.tv_nsec, t->ts.received.tv_nsec); + + for (int i = 0; i < MIN(s->length, t->length); i++) + cr_assert_float_eq(s->data[i].f, t->data[i].f, 1e-6); +} + +ParameterizedTestParameters(io, highlevel) +{ + static char formats[][32] = { + "villas", + "json", + "csv" + }; + + return cr_make_param_array(char[32], formats, ARRAY_LEN(formats)); +} + +ParameterizedTest(char *fmt, io, highlevel) +{ + int ret; + char filename[64]; + + struct io io; + struct pool p = { .state = STATE_DESTROYED }; + struct sample *smps[NUM_SAMPLES]; + struct sample *smpt[NUM_SAMPLES]; + + ret = pool_init(&p, 2 * NUM_SAMPLES, SAMPLE_LEN(NUM_VALUES), &memtype_hugepage); + cr_assert_eq(ret, 0); + + /* Prepare a sample with arbitrary data */ + ret = sample_alloc(&p, smps, NUM_SAMPLES); + cr_assert_eq(ret, NUM_SAMPLES); + + ret = sample_alloc(&p, smpt, NUM_SAMPLES); + cr_assert_eq(ret, NUM_SAMPLES); + + for (int i = 0; i < NUM_SAMPLES; i++) { + smpt[i]->capacity = + smps[i]->capacity = NUM_VALUES; + smps[i]->length = NUM_VALUES; + smps[i]->sequence = 235 + i; + smps[i]->format = 0; /* all float */ + smps[i]->ts.origin = time_now(); + smps[i]->ts.received = (struct timespec) { + .tv_sec = smps[i]->ts.origin.tv_sec - 1, + .tv_nsec = smps[i]->ts.origin.tv_nsec + }; + + for (int j = 0; j < smps[i]->length; j++) + smps[i]->data[j].f = j * 1.2 + i * 100; + } + + /* Open a file for IO */ + strncpy(filename, "/tmp/villas-unit-test.XXXXXX", sizeof(filename)); + mktemp(filename); + + printf("Writing to file: %s\n", filename); + + struct plugin *pl; + + pl = plugin_lookup(PLUGIN_TYPE_FORMAT, fmt); + cr_assert_not_null(pl, "Format '%s' does not exist", fmt); + + ret = io_init(&io, &pl->io, IO_FORMAT_ALL); + cr_assert_eq(ret, 0); + + ret = io_open(&io, filename, "w+"); + cr_assert_eq(ret, 0); + + ret = io_print(&io, smps, NUM_SAMPLES); + cr_assert_eq(ret, NUM_SAMPLES); + + io_rewind(&io); + io_flush(&io); + + char cmd[128]; + snprintf(cmd, sizeof(cmd), "cat %s", filename); + system(cmd); + + ret = io_scan(&io, smpt, NUM_SAMPLES); + cr_assert_eq(ret, NUM_SAMPLES, "Read only %d of %d samples back", ret, NUM_SAMPLES); + + for (int i = 0; i < 0; i++) + cr_assert_eq_sample(smps[i], smpt[i]); + + ret = io_close(&io); + cr_assert_eq(ret, 0); + + ret = io_destroy(&io); + cr_assert_eq(ret, 0); + + ret = unlink(filename); + cr_assert_eq(ret, 0); + + sample_free(smps, NUM_SAMPLES); + sample_free(smpt, NUM_SAMPLES); + + ret = pool_destroy(&p); + cr_assert_eq(ret, 0); +} diff --git a/tests/unit/sample_io.c b/tests/unit/sample_io.c deleted file mode 100644 index fee5a7b62..000000000 --- a/tests/unit/sample_io.c +++ /dev/null @@ -1,95 +0,0 @@ -/** Unit tests for the sample_io module. - * - * @author Steffen Vogel - * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include - -#include -#include - -#include "utils.h" -#include "timing.h" -#include "sample.h" -#include "sample_io.h" - -ParameterizedTestParameters(sample_io, read_write) -{ - static enum sample_io_format formats[] = { - SAMPLE_IO_FORMAT_VILLAS, - SAMPLE_IO_FORMAT_JSON, - }; - - return cr_make_param_array(enum sample_io_format, formats, ARRAY_LEN(formats)); -} - -ParameterizedTest(enum sample_io_format *fmt, sample_io, read_write) -{ - FILE *f; - int ret; - struct sample *s, *t; - - /* Prepare a sample with arbitrary data */ - s = malloc(SAMPLE_LEN(16)); - cr_assert_not_null(s); - - t = malloc(SAMPLE_LEN(16)); - cr_assert_not_null(s); - - t->capacity = - s->capacity = 16; - s->length = 12; - s->sequence = 235; - s->format = 0; - s->ts.origin = time_now(); - s->ts.received = (struct timespec) { s->ts.origin.tv_sec - 1, s->ts.origin.tv_nsec }; - - for (int i = 0; i < s->length; i++) - s->data[i].f = i * 1.2; - - /* Open a file for IO */ - f = tmpfile(); - cr_assert_not_null(f); - - ret = sample_io_fprint(f, s, *fmt, SAMPLE_IO_ALL); - cr_assert_eq(ret, 0); - - rewind(f); - - ret = sample_io_fscan(f, t, *fmt, NULL); - cr_assert_eq(ret, 0); - - cr_assert_eq(s->length, t->length); - cr_assert_eq(s->sequence, t->sequence); - cr_assert_eq(s->format, t->format); - - cr_assert_eq(s->ts.origin.tv_sec, t->ts.origin.tv_sec); - cr_assert_eq(s->ts.origin.tv_nsec, t->ts.origin.tv_nsec); - cr_assert_eq(s->ts.received.tv_sec, t->ts.received.tv_sec); - cr_assert_eq(s->ts.received.tv_nsec, t->ts.received.tv_nsec); - - for (int i = 0; i < MIN(s->length, t->length); i++) - cr_assert_float_eq(s->data[i].f, t->data[i].f, 1e-6); - - fclose(f); - - free(s); - free(t); -} \ No newline at end of file