1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

adding pluggable IO formats

This commit is contained in:
Steffen Vogel 2017-08-05 21:02:09 +02:00
parent 7eaa93cd36
commit c5fc72dd78
30 changed files with 900 additions and 389 deletions

View file

@ -23,13 +23,13 @@
#pragma once
#include "advio.h"
#include <stdio.h>
/* Forward declarations. */
struct sample;
#define CSV_SEPARATOR '\t'
int io_format_csv_fprint(AFILE *f, struct sample *smp, int flags);
int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags);
int io_format_csv_fscan(AFILE *f, struct sample *smp, int *flags);
int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags);

View file

@ -30,6 +30,6 @@ int io_format_json_pack(json_t **j, struct sample *s, int flags);
int io_format_json_unpack(json_t *j, struct sample *s, int *flags);
int io_format_json_fprint(AFILE *f, struct sample *s, int flags);
int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags);
int io_format_json_fscan(AFILE *f, struct sample *s, int *flags);
int io_format_json_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags);

View file

@ -25,12 +25,12 @@
#include <stdio.h>
/* VILLASnode human readable format */
#include "io.h"
int io_format_villas_print(char *buf, size_t len, struct sample *s, int flags);
int io_format_villas_print(struct io *io, struct sample *smps[], size_t cnt);
int io_format_villas_scan(const char *line, struct sample *s, int *fl);
int io_format_villas_scan(struct io *io, struct sample *smps[], size_t cnt);
int io_format_villas_fprint(FILE *f, struct sample *s, int flags);
int io_format_villas_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags);
int io_format_villas_fscan(FILE *f, struct sample *s, int *flags);
int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags);

View file

@ -24,6 +24,7 @@
#pragma once
#include "advio.h"
#include "common.h"
/* Forward declarations */
struct sample;
@ -31,34 +32,113 @@ struct io;
/** These flags define the format which is used by io_fscan() and io_fprint(). */
enum io_flags {
IO_FORMAT_NANOSECONDS = (1 << 0),
IO_FORMAT_OFFSET = (1 << 1),
IO_FORMAT_SEQUENCE = (1 << 2),
IO_FORMAT_VALUES = (1 << 3),
IO_FORMAT_ALL = 16-1
IO_FORMAT_NANOSECONDS = (1 << 0), /**< Include nanoseconds in output. */
IO_FORMAT_OFFSET = (1 << 1), /**< Include offset / delta between received and send timestamps. */
IO_FORMAT_SEQUENCE = (1 << 2), /**< Include sequence number in output. */
IO_FORMAT_VALUES = (1 << 3), /**< Include values in output. */
IO_FORMAT_ALL = 16-1, /**< Enable all output options. */
IO_FLAG_FLUSH = (1 << 8), /**< Flush the output stream after each chunk of samples. */
};
struct io_format {
int (*init)(struct io *io);
int (*destroy)(struct io *io);
/** @{
* High-level interface
*/
/** Open an IO stream.
*
* @see fopen()
*/
int (*open)(struct io *io, const char *uri, const char *mode);
/** Close an IO stream.
*
* @see fclose()
*/
int (*close)(struct io *io);
/** Check if end-of-file was reached.
*
* @see feof()
*/
int (*eof)(struct io *io);
/** Rewind an IO stream.
*
* @see rewind()
*/
void (*rewind)(struct io *io);
int (*print)(struct io *io, struct sample *s, int fl);
int (*scan)(struct io *io, struct sample *s, int *fl);
/** Flush buffered data to disk.
*
* @see fflush()
*/
int (*flush)(struct io *io);
int (*print)(struct io *io, struct sample *smps[], size_t cnt);
int (*scan)( struct io *io, struct sample *smps[], size_t cnt);
/** @} */
/** @{
* Low-level interface
*/
/** Parse samples from the buffer \p buf with a length of \p len bytes.
*
* @return The number of bytes consumed of \p buf.
*/
size_t (*sscan)( char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags);
/** Print \p cnt samples from \p smps into buffer \p buf of length \p len.
*
* @return The number of bytes written to \p buf.
*/
size_t (*sprint)(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags);
/** Parse up to \p cnt samples from stream \p f into array \p smps.
*
* @return The number of samples parsed.
*/
int (*fscan)( FILE *f, struct sample *smps[], size_t cnt, int *flags);
/** Print \p cnt samples from \p smps to stream \p f.
*
* @return The number of samples written to \p f.
*/
int (*fprint)(FILE *f, struct sample *smps[], size_t cnt, int flags);
/** @} */
size_t size; /**< Number of bytes to allocate for io::_vd */
};
struct io {
enum state state;
int flags;
enum {
IO_MODE_STDIO,
IO_MODE_ADVIO,
IO_MODE_CUSTOM
} mode;
/** A format type can use this file handle or overwrite the
* io_format::{open,close,eof,rewind} functions and the private
* data in io::_vd.
*/
AFILE *file;
union {
struct {
FILE *input;
FILE *output;
} stdio;
struct {
AFILE *input;
AFILE *output;
} advio;
};
void *_vd;
struct io_format *_vt;
@ -79,3 +159,16 @@ int io_scan(struct io *io, struct sample *smps[], size_t cnt);
int io_eof(struct io *io);
void io_rewind(struct io *io);
int io_flush(struct io *io);
int io_stream_open(struct io *io, const char *uri, const char *mode);
int io_stream_close(struct io *io);
int io_stream_eof(struct io *io);
void io_stream_rewind(struct io *io);
int io_stream_flush(struct io *io);

View file

@ -44,6 +44,8 @@ struct super_node {
struct api api;
struct web web;
char *name; /**< A name of this super node. Usually the hostname. */
struct {
int argc;
char **argv;

View file

@ -20,4 +20,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
###################################################################################
LIB_SRCS += $(addprefix lib/formats/,villas.c csv.c msg.c webmsg.c)
LIB_SRCS += $(addprefix lib/formats/,json.c villas.c csv.c)
WITH_HDF5 ?= 0
ifeq ($(PLATFORM),Darwin)
HDF5_PREFIX ?= /opt/local
else
HDF5_PREFIX ?= /usr
endif
ifeq ($(WITH_HDF5),1)
ifneq ($(wildcard $(HDF5_PREFIX)/include/hdf5_hl.h),)
LIB_SRCS += lib/formats/hdf5.c
endif
endif

View file

@ -25,50 +25,44 @@
#include "formats/csv.h"
#include "plugin.h"
#include "sample.h"
#include "timing.h"
int io_format_csv_fprint(AFILE *f, struct sample *s, int flags)
int io_format_csv_fprint_single(FILE *f, struct sample *s, int flags)
{
afprintf(f, "%ld %09ld %d", s->ts.origin.tv_sec, s->ts.origin.tv_nsec, s->sequence);
fprintf(f, "%ld %09ld %d", s->ts.origin.tv_sec, s->ts.origin.tv_nsec, s->sequence);
for (int i = 0; i < s->length; i++) {
switch ((s->format >> i) & 0x1) {
case SAMPLE_DATA_FORMAT_FLOAT:
afprintf(f, "%c%.6f", CSV_SEPARATOR, s->data[i].f);
fprintf(f, "%c%.6f", CSV_SEPARATOR, s->data[i].f);
break;
case SAMPLE_DATA_FORMAT_INT:
afprintf(f, "%c%d", CSV_SEPARATOR, s->data[i].i);
fprintf(f, "%c%d", CSV_SEPARATOR, s->data[i].i);
break;
}
}
fputc('\n', f);
return 0;
}
int io_format_csv_fscan(AFILE *f, struct sample *smp, int *flags)
size_t io_format_csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flags)
{
int ret, off;
char *ptr, line[4096];
skip: if (afgets(line, sizeof(line), f) == NULL)
return -1; /* An error occured */
/* Skip whitespaces, empty and comment lines */
for (ptr = line; isspace(*ptr); ptr++);
if (*ptr == '\0' || *ptr == '#')
goto skip;
ret = sscanf(line, "%ld %09ld %d %n", &smp->ts.origin.tv_sec, &smp->ts.origin.tv_nsec, &smp->sequence, &off);
if (ret != 4)
ret = sscanf(buf, "%ld %ld %d %n", &s->ts.origin.tv_sec, &s->ts.origin.tv_nsec, &s->sequence, &off);
if (ret != 3)
return -1;
int i;
for (i = 0; i < smp->capacity; i++) {
switch (smp->format & (1 << i)) {
for (i = 0; i < s->capacity; i++) {
switch (s->format & (1 << i)) {
case SAMPLE_DATA_FORMAT_FLOAT:
ret = sscanf(line + off, "%f %n", &smp->data[i].f, &off);
ret = sscanf(buf + off, "%f %n", &s->data[i].f, &off);
break;
case SAMPLE_DATA_FORMAT_INT:
ret = sscanf(line + off, "%d %n", &smp->data[i].i, &off);
ret = sscanf(buf + off, "%d %n", &s->data[i].i, &off);
break;
}
@ -76,19 +70,51 @@ skip: if (afgets(line, sizeof(line), f) == NULL)
break;
}
smp->length = i;
s->length = i;
s->ts.received = time_now();
return ret;
return 0;
}
int io_format_csv_print(struct io *io, struct sample *smp, int flags)
int io_format_csv_fscan_single(FILE *f, struct sample *s, int *flags)
{
return io_format_csv_fprint(io->file, smp, flags);
char *ptr, line[4096];
skip: if (fgets(line, sizeof(line), f) == NULL)
return -1; /* An error occured */
/* Skip whitespaces, empty and comment lines */
for (ptr = line; isspace(*ptr); ptr++);
if (*ptr == '\0' || *ptr == '#')
goto skip;
return io_format_csv_sscan_single(line, strlen(line), s, flags);
}
int io_format_csv_scan(struct io *io, struct sample *smp, int *flags)
int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
{
return io_format_csv_fscan(io->file, smp, flags);
int ret, i;
for (i = 0; i < cnt; i++) {
ret = io_format_csv_fprint_single(f, smps[i], flags);
if (ret < 0)
break;
}
return i;
}
int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = io_format_csv_fscan_single(f, smps[i], flags);
if (ret < 0) {
warn("Failed to read CSV line: %d", ret);
break;
}
}
return i;
}
static struct plugin p = {
@ -96,8 +122,8 @@ static struct plugin p = {
.description = "Tabulator-separated values",
.type = PLUGIN_TYPE_FORMAT,
.io = {
.scan = io_format_csv_scan,
.print = io_format_csv_print,
.fprint = io_format_csv_fprint,
.fscan = io_format_csv_fscan,
.size = 0
}
};

View file

@ -90,47 +90,93 @@ int io_format_json_unpack(json_t *j, struct sample *s, int *flags)
return 0;
}
int io_format_json_fprint(AFILE *f, struct sample *s, int flags)
size_t io_format_json_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags)
{
int ret;
int i, ret;
json_t *json;
size_t wr, off = 0;
ret = io_format_json_pack(&json, s, flags);
if (ret)
return ret;
for (i = 0; i < cnt; i++) {
ret = io_format_json_pack(&json, smps[i], flags);
if (ret)
return ret;
ret = json_dumpf(json, f->file, 0);
wr = json_dumpb(json, buf + off, len - off, 0);
json_decref(json);
json_decref(json);
return ret;
if (wr > len)
break;
off += wr;
}
return i;
}
int io_format_json_fscan(AFILE *f, struct sample *s, int *flags)
size_t io_format_json_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags)
{
int ret;
int i, ret;
json_t *json;
json_error_t err;
size_t off = 0;
for (i = 0; i < cnt; i++) {
json = json_loadb(buf + off, len - off, JSON_DISABLE_EOF_CHECK, &err);
if (!json)
break;
off += err.position;
ret = io_format_json_unpack(json, smps[i], flags);
json_decref(json);
if (ret)
break;
}
return i;
}
int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
{
int ret, i;
json_t *json;
for (i = 0; i < cnt; i++) {
ret = io_format_json_pack(&json, smps[i], flags);
if (ret)
return ret;
ret = json_dumpf(json, f, 0);
fputc('\n', f);
json_decref(json);
}
return i;
}
int io_format_json_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags)
{
int i, ret;
json_t *json;
json_error_t err;
json = json_loadf(f->file, JSON_DISABLE_EOF_CHECK, &err);
if (!json)
return -1;
for (i = 0; i < cnt; i++) {
skip: json = json_loadf(f, JSON_DISABLE_EOF_CHECK, &err);
if (!json)
break;
ret = io_format_json_unpack(json, s, flags);
ret = io_format_json_unpack(json, smps[i], flags);
if (ret)
goto skip;
json_decref(json);
json_decref(json);
}
return ret;
}
int io_format_json_print(struct io *io, struct sample *smp, int flags)
{
return io_format_json_fprint(io->file, smp, flags);
}
int io_format_json_scan(struct io *io, struct sample *smp, int *flags)
{
return io_format_json_fscan(io->file, smp, flags);
return i;
}
static struct plugin p = {
@ -138,8 +184,10 @@ static struct plugin p = {
.description = "Javascript Object Notation",
.type = PLUGIN_TYPE_FORMAT,
.io = {
.print = io_format_json_print,
.scan = io_format_json_scan,
.fscan = io_format_json_fscan,
.fprint = io_format_json_fprint,
.sscan = io_format_json_sscan,
.sprint = io_format_json_sprint,
.size = 0
},
};

View file

@ -23,10 +23,11 @@
#include <arpa/inet.h>
#include <string.h>
#include "msg.h"
#include "msg_format.h"
#include "formats/msg.h"
#include "formats/msg_format.h"
#include "sample.h"
#include "utils.h"
#include "plugin.h"
void msg_ntoh(struct msg *m)
{
@ -150,3 +151,14 @@ int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t
return i;
}
static struct plugin p = {
.name = "msg",
.description = "VILLAS binary network format",
.type = PLUGIN_TYPE_FORMAT,
.io = {
.size = 0
},
};
REGISTER_PLUGIN(&p);

View file

@ -28,8 +28,9 @@
#include "utils.h"
#include "timing.h"
#include "sample.h"
#include "formats/villas.h"
int io_format_villas_print(char *buf, size_t len, struct sample *s, int flags)
size_t io_format_villas_sprint_single(char *buf, size_t len, struct sample *s, int flags)
{
size_t off = snprintf(buf, len, "%llu", (unsigned long long) s->ts.origin.tv_sec);
@ -57,15 +58,15 @@ int io_format_villas_print(char *buf, size_t len, struct sample *s, int flags)
off += snprintf(buf + off, len - off, "\n");
return 0; /* trailing '\0' */
return off;
}
int io_format_villas_scan(const char *line, struct sample *s, int *fl)
size_t io_format_villas_sscan_single(const char *buf, size_t len, struct sample *s, int *flags)
{
char *end;
const char *ptr = line;
const char *ptr = buf;
int flags = 0;
int fl = 0;
double offset = 0;
/* Format: Seconds.NanoSeconds+Offset(SequenceNumber) Value1 Value2 ...
@ -85,7 +86,7 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl)
s->ts.origin.tv_nsec = (uint32_t) strtoul(ptr, &end, 10);
if (ptr != end)
flags |= IO_FORMAT_NANOSECONDS;
fl |= IO_FORMAT_NANOSECONDS;
else
return -3;
}
@ -98,7 +99,7 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl)
offset = strtof(ptr, &end); /* offset is ignored for now */
if (ptr != end)
flags |= IO_FORMAT_OFFSET;
fl |= IO_FORMAT_OFFSET;
else
return -4;
}
@ -109,7 +110,7 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl)
s->sequence = strtoul(ptr, &end, 10);
if (ptr != end)
flags |= IO_FORMAT_SEQUENCE;
fl |= IO_FORMAT_SEQUENCE;
else
return -5;
@ -135,35 +136,42 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl)
}
if (s->length > 0)
flags |= IO_FORMAT_VALUES;
fl |= IO_FORMAT_VALUES;
if (fl)
*fl = flags;
if (flags & IO_FORMAT_OFFSET) {
if (flags)
*flags = fl;
if (fl & IO_FORMAT_OFFSET) {
struct timespec off = time_from_double(offset);
s->ts.received = time_add(&s->ts.origin, &off);
}
else
s->ts.received = s->ts.origin;
s->ts.received = time_now();
return 0;
return end - buf;
}
int io_format_villas_fprint(FILE *f, struct sample *s, int flags)
size_t io_format_villas_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags)
{
char line[4096];
int ret;
size_t off = 0;
ret = io_format_villas_print(line, sizeof(line), s, flags);
if (ret)
return ret;
for (int i = 0; i < cnt && off < len; i++)
off += io_format_villas_sprint_single(buf + off, len - off, smps[i], flags);
fputs(line, f);
return 0;
return off;
}
int io_format_villas_fscan(FILE *f, struct sample *s, int *fl)
size_t io_format_villas_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags)
{
size_t off = 0;
for (int i = 0; i < cnt && off < len; i++)
off += io_format_villas_sscan_single(buf + off, len - off, smps[i], flags);
return off;
}
int io_format_villas_fscan_single(FILE *f, struct sample *s, int *flags)
{
char *ptr, line[4096];
@ -175,16 +183,68 @@ skip: if (fgets(line, sizeof(line), f) == NULL)
if (*ptr == '\0' || *ptr == '#')
goto skip;
return io_format_villas_scan(line, s, fl);
return io_format_villas_sscan_single(line, strlen(line), s, flags);
}
struct plugin p = {
int io_format_villas_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
{
char line[4096];
int ret, i;
for (i = 0; i < cnt; i++) {
ret = io_format_villas_sprint_single(line, sizeof(line), smps[i], flags);
if (ret < 0)
break;
fputs(line, f);
}
return i;
}
int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = io_format_villas_fscan_single(f, smps[i], flags);
if (ret < 0)
return ret;
}
return i;
}
int io_format_villas_open(struct io *io, const char *uri, const char *mode)
{
int ret;
ret = io_stream_open(io, uri, mode);
if (ret)
return ret;
FILE *f = io->mode == IO_MODE_ADVIO
? io->advio.output->file
: io->stdio.output;
fprintf(f, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]");
if (io->flags & IO_FLAG_FLUSH)
io_flush(io);
return 0;
}
static struct plugin p = {
.name = "villas",
.description = "Human readable VILLAS format",
.description = "VILLAS human readable format",
.type = PLUGIN_TYPE_FORMAT,
.io = {
.open = io_format_villas_open,
.fprint = io_format_villas_fprint,
.fscan = io_format_villas_fscan,
.sprint = io_format_villas_sprint,
.sscan = io_format_villas_sscan,
.size = 0
}
};

View file

@ -26,8 +26,9 @@
#include <endian.h>
#endif
#include "webmsg.h"
#include "webmsg_format.h"
#include "plugin.h"
#include "formats/webmsg.h"
#include "formats/webmsg_format.h"
void webmsg_ntoh(struct webmsg *m)
{
@ -72,3 +73,14 @@ int webmsg_verify(struct webmsg *m)
else
return 0;
}
static struct plugin p = {
.name = "webmsg",
.description = "VILLAS binary format for websockets",
.type = PLUGIN_TYPE_FORMAT,
.io = {
.size = 0
},
};
REGISTER_PLUGIN(&p);

View file

@ -24,7 +24,7 @@
#include "timing.h"
#include "config.h"
#include "msg.h"
#include "formats/msg.h"
#include "hook.h"
#include "path.h"
#include "utils.h"

179
lib/io.c
View file

@ -21,9 +21,11 @@
*********************************************************************************/
#include <stdlib.h>
#include <stdio.h>
#include "io.h"
#include "utils.h"
#include "sample.h"
int io_init(struct io *io, struct io_format *fmt, int flags)
{
@ -48,55 +50,170 @@ int io_destroy(struct io *io)
return 0;
}
int io_open(struct io *io, const char *uri, const char *mode)
int io_stream_open(struct io *io, const char *uri, const char *mode)
{
if (io->_vt->open)
return io->_vt->open(io, uri, mode);
int ret;
if (uri) {
if (aislocal(uri)) {
io->mode = IO_MODE_STDIO;
io->stdio.input =
io->stdio.output = fopen(uri, mode);
if (io->stdio.output == NULL)
return -1;
ret = setvbuf(io->stdio.output, NULL, _IOLBF, BUFSIZ);
if (ret)
return -1;
}
else {
io->mode = IO_MODE_ADVIO;
io->advio.input =
io->advio.output = afopen(uri, mode);
if (io->advio.output == NULL)
return -1;
}
}
else {
io->file = afopen(uri, mode);
if (!io->file)
io->mode = IO_MODE_STDIO;
io->flags |= IO_FLAG_FLUSH;
io->stdio.input = stdin;
io->stdio.output = stdout;
ret = setvbuf(io->stdio.input, NULL, _IOLBF, BUFSIZ);
if (ret)
return -1;
return 0;
ret = setvbuf(io->stdio.output, NULL, _IOLBF, BUFSIZ);
if (ret)
return -1;
}
return 0;
}
int io_stream_close(struct io *io)
{
switch (io->mode) {
case IO_MODE_ADVIO:
return afclose(io->advio.input);
case IO_MODE_STDIO:
return io->stdio.input != stdin ? fclose(io->stdio.input) : 0;
case IO_MODE_CUSTOM:
return 0;
}
return -1;
}
int io_stream_flush(struct io *io)
{
switch (io->mode) {
case IO_MODE_ADVIO:
return afflush(io->advio.output);
case IO_MODE_STDIO:
return fflush(io->stdio.output);
case IO_MODE_CUSTOM:
return 0;
}
return -1;
}
int io_stream_eof(struct io *io)
{
switch (io->mode) {
case IO_MODE_ADVIO:
return afeof(io->advio.input);
case IO_MODE_STDIO:
return feof(io->stdio.input);
case IO_MODE_CUSTOM:
return 0;
}
return -1;
}
void io_stream_rewind(struct io *io)
{
switch (io->mode) {
case IO_MODE_ADVIO:
return arewind(io->advio.input);
case IO_MODE_STDIO:
return rewind(io->stdio.input);
case IO_MODE_CUSTOM: { }
}
}
int io_open(struct io *io, const char *uri, const char *mode)
{
return io->_vt->open
? io->_vt->open(io, uri, mode)
: io_stream_open(io, uri, mode);
}
int io_close(struct io *io)
{
return io->_vt->close ? io->_vt->close(io) : afclose(io->file);
return io->_vt->close
? io->_vt->close(io)
: io_stream_close(io);
}
int io_print(struct io *io, struct sample *smps[], size_t cnt)
int io_flush(struct io *io)
{
assert(io->_vt->print);
for (int i = 0; i < cnt; i++)
io->_vt->print(io, smps[i], io->flags);
return cnt;
}
int io_scan(struct io *io, struct sample *smps[], size_t cnt)
{
int ret;
assert(io->_vt->scan);
for (int i = 0; i < cnt && !io_eof(io); i++) {
ret = io->_vt->scan(io, smps[i], NULL);
if (ret < 0)
return i;
}
return cnt;
return io->_vt->flush
? io->_vt->flush(io)
: io_stream_flush(io);
}
int io_eof(struct io *io)
{
return io->_vt->eof ? io->_vt->eof(io) : afeof(io->file);
return io->_vt->eof
? io->_vt->eof(io)
: io_stream_eof(io);
}
void io_rewind(struct io *io)
{
io->_vt->rewind ? io->_vt->rewind(io) : arewind(io->file);
io->_vt->rewind
? io->_vt->rewind(io)
: io_stream_rewind(io);
}
int io_print(struct io *io, struct sample *smps[], size_t cnt)
{
int ret;
if (io->_vt->print)
ret = io->_vt->print(io, smps, cnt);
else {
FILE *f = io->mode == IO_MODE_ADVIO
? io->advio.output->file
: io->stdio.output;
ret = io->_vt->fprint(f, smps, cnt, io->flags);
}
if (io->flags & IO_FLAG_FLUSH)
io_flush(io);
return ret;
}
int io_scan(struct io *io, struct sample *smps[], size_t cnt)
{
if (io->_vt->scan)
return io->_vt->scan(io, smps, cnt);
else {
FILE *f = io->mode == IO_MODE_ADVIO
? io->advio.input->file
: io->stdio.input;
return io->_vt->fscan(f, smps, cnt, NULL);
}
}

View file

@ -90,8 +90,8 @@ endif
# Enable Socket node type when libnl3 is available
ifeq ($(WITH_SOCKET),1)
LIB_SRCS += $(addprefix lib/nodes/, socket.c)
LIB_SRCS += $(addprefix lib/, msg.c)
LIB_SRCS += lib/nodes/socket.c
LIB_SRCS += lib/formats/msg.c
LIB_CFLAGS += -DWITH_SOCKET
# libnl3 is optional but required for network emulation and IRQ pinning
@ -135,7 +135,8 @@ endif
# Enable WebSocket support
ifeq ($(WITH_WEBSOCKET),1)
ifeq ($(shell $(PKGCONFIG) libwebsockets jansson; echo $$?),0)
LIB_SRCS += lib/nodes/websocket.c lib/webmsg.c
LIB_SRCS += lib/nodes/websocket.c
LIB_SRCS += lib/formats/webmsg.c
LIB_PKGS += libwebsockets jansson
LIB_CFLAGS += -DWITH_WEBSOCKET
endif

View file

@ -27,7 +27,7 @@
#include "plugin.h"
#include "nodes/nanomsg.h"
#include "utils.h"
#include "msg.h"
#include "formats/msg.h"
int nanomsg_reverse(struct node *n)
{

View file

@ -42,8 +42,8 @@
#define WITH_NETEM
#endif /* WITH_LIBNL_ROUTE_30 */
#include "msg.h"
#include "msg_format.h"
#include "formats/msg.h"
#include "formats/msg_format.h"
#include "sample.h"
#include "queue.h"
#include "plugin.h"

View file

@ -27,12 +27,12 @@
#include <signal.h>
#include "super_node.h"
#include "webmsg.h"
#include "webmsg_format.h"
#include "timing.h"
#include "utils.h"
#include "plugin.h"
#include "formats/webmsg.h"
#include "formats/webmsg_format.h"
#include "nodes/websocket.h"
/* Private static storage */

View file

@ -30,7 +30,7 @@
#include "utils.h"
#include "queue.h"
#include "plugin.h"
#include "msg.h"
#include "formats/msg.h"
static void *context;

View file

@ -31,7 +31,7 @@
#include <villas/timing.h>
#include <villas/sample.h>
#include <villas/sample_io.h>
#include <villas/io.h>
#include <villas/hook.h>
#include <villas/utils.h>
#include <villas/pool.h>
@ -45,12 +45,13 @@
int cnt;
struct sample **samples;
struct sample **smps;
struct plugin *p;
struct log l = { .state = STATE_DESTROYED };
struct pool q = { .state = STATE_DESTROYED };
struct hook h = { .state = STATE_DESTROYED };
struct io io = { .state = STATE_DESTROYED };
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
@ -64,7 +65,7 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
if (ret)
error("Failed to destroy hook");
sample_free(samples, cnt);
sample_free(smps, cnt);
ret = pool_destroy(&q);
if (ret)
@ -81,13 +82,20 @@ static void usage()
printf(" NAME the name of the hook function\n");
printf(" PARAM* a string of configuration settings for the hook\n");
printf(" OPTIONS is one or more of the following options:\n");
printf(" -f FMT the data format\n");
printf(" -h show this help\n");
printf(" -d LVL set debug level to LVL\n");
printf(" -v CNT process CNT samples at once\n");
printf(" -v CNT process CNT smps at once\n");
printf("\n");
printf("The following hook functions are supported:\n");
plugin_dump(PLUGIN_TYPE_HOOK);
printf("\n");
printf("Supported IO formats:\n");
plugin_dump(PLUGIN_TYPE_FORMAT);
printf("\n");
printf("Example:");
printf(" villas-signal random | villas-hook skip_first seconds=10\n");
printf("\n");
@ -98,6 +106,7 @@ static void usage()
int main(int argc, char *argv[])
{
int ret;
char *format = "villas";
size_t recv;
@ -109,6 +118,8 @@ int main(int argc, char *argv[])
char c, *endptr;
while ((c = getopt(argc, argv, "hv:d:f:o:")) != -1) {
switch (c) {
case 'f':
format = optarg;
break;
case 'v':
cnt = strtoul(optarg, &endptr, 0);
@ -139,34 +150,51 @@ check: if (optarg == endptr)
exit(EXIT_FAILURE);
}
char *hookstr = argv[optind];
ret = signals_init(quit);
if (ret)
error("Failed to intialize signals");
char *hook = argv[optind];
ret = log_init(&l, l.level, LOG_ALL);
if (ret)
error("Failed to initialize log");
log_start(&l);
ret = log_start(&l);
if (ret)
error("Failed to start log");
ret = signals_init(quit);
if (ret)
error("Failed to intialize signals");
if (cnt < 1)
error("Vectorize option must be greater than 0");
memory_init(DEFAULT_NR_HUGEPAGES);
ret = memory_init(DEFAULT_NR_HUGEPAGES);
if (ret)
error("Failed to initialize memory");
samples = alloc(cnt * sizeof(struct sample *));
smps = alloc(cnt * sizeof(struct sample *));
ret = pool_init(&q, 10 * cnt, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage);
if (ret)
error("Failed to initilize memory pool");
p = plugin_lookup(PLUGIN_TYPE_HOOK, hookstr);
/* Initialize IO */
p = plugin_lookup(PLUGIN_TYPE_FORMAT, format);
if (!p)
error("Unknown hook function '%s'", hookstr);
error("Unknown IO format '%s'", format);
ret = io_init(&io, &p->io, IO_FORMAT_ALL);
if (ret)
error("Failed to initialize IO");
ret = io_open(&io, NULL, NULL);
if (ret)
error("Failed to open IO");
/* Initialize hook */
p = plugin_lookup(PLUGIN_TYPE_HOOK, hook);
if (!p)
error("Unknown hook function '%s'", hook);
/** @todo villas-hook does not use the path structure */
ret = hook_init(&h, &p->hook, NULL);
if (ret)
error("Failed to initialize hook");
@ -180,35 +208,25 @@ check: if (optarg == endptr)
error("Failed to start hook");
for (;;) {
if (feof(stdin)) {
if (io_eof(&io)) {
killme(SIGTERM);
pause();
}
ret = sample_alloc(&q, samples, cnt);
ret = sample_alloc(&q, smps, cnt);
if (ret != cnt)
error("Failed to allocate %d samples from pool", cnt);
error("Failed to allocate %d smps from pool", cnt);
recv = 0;
for (int j = 0; j < cnt && !feof(stdin); j++) {
ret = sample_io_villas_fscan(stdin, samples[j], NULL);
if (ret < 0)
break;
recv = io_scan(&io, smps, cnt);
samples[j]->ts.received = time_now();
recv++;
}
debug(15, "Read %zu smps from stdin", recv);
debug(15, "Read %zu samples from stdin", recv);
hook_read(&h, smps, &recv);
hook_write(&h, smps, &recv);
hook_read(&h, samples, &recv);
hook_write(&h, samples, &recv);
io_print(&io, smps, recv);
for (int j = 0; j < recv; j++)
sample_io_villas_fprint(stdout, samples[j], SAMPLE_IO_ALL);
fflush(stdout);
sample_free(samples, cnt);
sample_free(smps, cnt);
}
return 0;

View file

@ -75,6 +75,7 @@ static void usage()
printf(" This type of invocation is used by OPAL-RT Asynchronous processes.\n");
printf(" See in the RT-LAB User Guide for more information.\n\n");
#endif
printf("Supported node-types:\n");
plugin_dump(PLUGIN_TYPE_NODE);
printf("\n");
@ -87,6 +88,10 @@ static void usage()
plugin_dump(PLUGIN_TYPE_API);
printf("\n");
printf("Supported IO formats:\n");
plugin_dump(PLUGIN_TYPE_FORMAT);
printf("\n");
print_copyright();
exit(EXIT_FAILURE);

View file

@ -37,6 +37,7 @@
#include <villas/pool.h>
#include <villas/io.h>
#include <villas/kernel/rt.h>
#include <villas/plugin.h>
#include <villas/config_helper.h>
#include <villas/nodes/websocket.h>
@ -44,8 +45,9 @@
#include "config.h"
static struct super_node sn = { .state = STATE_DESTROYED }; /**< The global configuration */
static struct io io = { .state = STATE_DESTROYED };
struct dir {
static struct dir {
struct pool pool;
pthread_t thread;
bool enabled;
@ -84,21 +86,22 @@ static void usage()
printf(" CONFIG path to a configuration file\n");
printf(" NODE the name of the node to which samples are sent and received from\n");
printf(" OPTIONS are:\n");
printf(" -f FMT set the format\n")
printf(" -d LVL set debug log level to LVL\n");
printf(" -x swap read / write endpoints\n");
printf(" -s only read data from stdin and send it to node\n");
printf(" -r only read data from node and write it to stdout\n");
printf(" -t NUM terminate after NUM seconds\n");
printf(" -L NUM terminate after NUM samples sent\n");
printf(" -l NUM terminate after NUM samples received\n\n");
printf(" -f FMT set the format\n");
printf(" -d LVL set debug log level to LVL\n");
printf(" -o OPTION=VALUE overwrite options in config file\n");
printf(" -x swap read / write endpoints\n");
printf(" -s only read data from stdin and send it to node\n");
printf(" -r only read data from node and write it to stdout\n");
printf(" -t NUM terminate after NUM seconds\n");
printf(" -L NUM terminate after NUM samples sent\n");
printf(" -l NUM terminate after NUM samples received\n\n");
print_copyright();
}
static void * send_loop(void *ctx)
{
int ret, cnt = 0;
int ret, len, sent, cnt = 0;
struct sample *smps[node->vectorize];
/* Initialize memory */
@ -110,42 +113,30 @@ static void * send_loop(void *ctx)
if (ret < 0)
error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret);
while (!feof(stdin)) {
int len;
for (len = 0; len < node->vectorize; len++) {
struct sample *s = smps[len];
int reason;
while (!io_eof(&io)) {
len = io_scan(&io, smps, node->vectorize);
if (len <= 0)
continue;
if (sendd.limit > 0 && cnt >= sendd.limit)
break;
retry: reason = sample_io_villas_fscan(stdin, s, NULL);
if (reason < 0) {
if (feof(stdin))
goto leave;
else {
warn("Skipped invalid message message: reason=%d", reason);
goto retry;
}
}
}
cnt += node_write(node, smps, len);
sent = node_write(node, smps, len);
cnt += sent;
if (sendd.limit > 0 && cnt >= sendd.limit)
goto leave2;
goto leave;
pthread_testcancel();
}
leave2: info("Reached send limit. Terminating...");
killme(SIGTERM);
return NULL;
/* We reached EOF on stdin here. Lets kill the process */
leave: if (recvv.limit < 0) {
info("Reached end-of-file. Terminating...");
leave: if (io_eof(&io)) {
if (recvv.limit < 0) {
info("Reached end-of-file. Terminating...");
killme(SIGTERM);
}
else
info("Reached end-of-file. Wait for receive side...");
}
else {
info("Reached send limit. Terminating...");
killme(SIGTERM);
}
@ -164,26 +155,22 @@ static void * recv_loop(void *ctx)
ret = sample_alloc(&recvv.pool, smps, node->vectorize);
if (ret < 0)
error("Failed to get %u samples out of receive pool (%d).", node->vectorize, ret);
/* Print header */
fprintf(stdout, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]");
fflush(stdout);
error("Failed to allocate %u samples from receive pool.", node->vectorize);
for (;;) {
int recv = node_read(node, smps, node->vectorize);
struct timespec now = time_now();
/* Fix timestamps */
for (int i = 0; i < recv; i++) {
struct sample *s = smps[i];
if (s->ts.received.tv_sec == -1 || s->ts.received.tv_sec == 0)
s->ts.received = now;
sample_io_villas_fprint(stdout, s, SAMPLE_IO_ALL);
fflush(stdout);
}
io_print(&io, smps, recv);
cnt += recv;
if (recvv.limit > 0 && cnt >= recvv.limit)
goto leave;
@ -201,17 +188,21 @@ int main(int argc, char *argv[])
{
int ret, level = V, timeout = 0;
bool reverse = false;
char *format = "villas";
sendd = recvv = (struct dir) {
.enabled = true,
.limit = -1
};
char c, *endptr;
while ((c = getopt(argc, argv, "hxrsd:l:L:t:f:")) != -1) {
switch (c) {
case 'f';
json_t *cfg_cli = json_object();
char c, *endptr;
while ((c = getopt(argc, argv, "hxrsd:l:L:t:f:o:")) != -1) {
switch (c) {
case 'f':
format = optarg;
break;
case 'x':
reverse = true;
break;
@ -233,6 +224,11 @@ int main(int argc, char *argv[])
case 't':
timeout = strtoul(optarg, &endptr, 10);
goto check;
case 'o':
ret = json_object_extend_str(cfg_cli, optarg);
if (ret)
error("Invalid option: %s", optarg);
break;
case 'h':
case '?':
usage();
@ -243,7 +239,6 @@ int main(int argc, char *argv[])
check: if (optarg == endptr)
error("Failed to parse parse option argument '-%c %s'", c, optarg);
}
if (argc != optind + 2) {
@ -253,18 +248,48 @@ check: if (optarg == endptr)
char *configfile = argv[optind];
char *nodestr = argv[optind+1];
struct plugin *p;
log_init(&sn.log, level, LOG_ALL);
log_start(&sn.log);
ret = log_init(&sn.log, level, LOG_ALL);
if (ret)
error("Failed to intialize log");
super_node_init(&sn);
super_node_parse_uri(&sn, configfile);
ret = log_start(&sn.log);
if (ret)
error("Failed to start log");
memory_init(sn.hugepages);
signals_init(quit);
rt_init(sn.priority, sn.affinity);
ret = signals_init(quit);
if (ret)
error("Failed to initialize signals");
ret = memory_init(sn.hugepages);
if (ret)
error("Failed to initialize memory");
ret = rt_init(sn.priority, sn.affinity);
if (ret)
error("Failed to initalize real-time");
p = plugin_lookup(PLUGIN_TYPE_FORMAT, format);
if (!p)
error("Invalid format: %s", format);
ret = io_init(&io, &p->io, IO_FORMAT_ALL);
if (ret)
error("Failed to initialize IO");
ret = io_open(&io, NULL, NULL);
if (ret)
error("Failed to open IO");
ret = super_node_init(&sn);
if (ret)
error("Failed to initialize super-node");
ret = super_node_parse_uri(&sn, configfile);
if (ret)
error("Failed to parse configuration");
/* Initialize node */
node = list_lookup(&sn.nodes, nodestr);
if (!node)
error("Node '%s' does not exist!", nodestr);

View file

@ -30,7 +30,7 @@
#include <villas/utils.h>
#include <villas/sample.h>
#include <villas/sample_io.h>
#include <villas/formats/villas.h>
#include <villas/timing.h>
#include <villas/node.h>
#include <villas/plugin.h>
@ -39,6 +39,7 @@
/* Some default values */
struct node n;
struct log l;
struct io io;
struct sample *t;
@ -54,14 +55,15 @@ void usage()
printf(" ramp\n");
printf("\n");
printf(" OPTIONS is one or more of the following options:\n");
printf(" -d LVL set debug level\n");
printf(" -v NUM specifies how many values a message should contain\n");
printf(" -r HZ how many messages per second\n");
printf(" -n non real-time mode. do not throttle output.\n");
printf(" -f HZ the frequency of the signal\n");
printf(" -a FLT the amplitude\n");
printf(" -D FLT the standard deviation for 'random' signals\n");
printf(" -l NUM only send LIMIT messages and stop\n\n");
printf(" -d LVL set debug level\n");
printf(" -f FMT set the format\n");
printf(" -v NUM specifies how many values a message should contain\n");
printf(" -r HZ how many messages per second\n");
printf(" -n non real-time mode. do not throttle output.\n");
printf(" -F HZ the frequency of the signal\n");
printf(" -a FLT the amplitude\n");
printf(" -D FLT the standard deviation for 'random' signals\n");
printf(" -l NUM only send LIMIT messages and stop\n\n");
print_copyright();
}
@ -78,6 +80,14 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
if (ret)
error("Failed to destroy node");
ret = io_close(&io);
if (ret)
error("Failed to close output");
ret = io_destroy(&io);
if (ret)
error("Failed to destroy output");
ret = log_stop(&l);
if (ret)
error("Failed to stop log");
@ -92,7 +102,8 @@ int main(int argc, char *argv[])
{
int ret;
struct plugin *p;
struct signal *s;
char *format = "villas"; /** @todo hardcoded for now */
ret = log_init(&l, l.level, LOG_ALL);
if (ret)
@ -114,6 +125,18 @@ int main(int argc, char *argv[])
if (ret)
error("Failed to initialize node");
p = plugin_lookup(PLUGIN_TYPE_FORMAT, format);
if (!p)
error("Invalid output format '%s'", format);
ret = io_init(&io, &p->io, IO_FLAG_FLUSH | (IO_FORMAT_ALL & ~IO_FORMAT_OFFSET));
if (ret)
error("Failed to initialize output");
ret = io_open(&io, NULL, NULL);
if (ret)
error("Failed to open output");
ret = node_parse_cli(&n, argc, argv);
if (ret)
error("Failed to parse command line options");
@ -122,29 +145,20 @@ int main(int argc, char *argv[])
if (ret)
error("Failed to verify node configuration");
info("Starting signal generation: %s", node_name(&n));
/* Allocate memory for message buffer */
s = n._vd;
struct signal *s = n._vd;
t = alloc(SAMPLE_LEN(s->values));
t->capacity = s->values;
/* Print header */
printf("# VILLASnode signal params: type=%s, values=%u, rate=%f, limit=%d, amplitude=%f, freq=%f\n",
argv[optind], s->values, s->rate, s->limit, s->amplitude, s->frequency);
printf("# %-20s\t\t%s\n", "sec.nsec(seq)", "data[]");
ret = node_start(&n);
if (ret)
serror("Failed to start node");
for (;;) {
node_read(&n, &t, 1);
sample_io_villas_fprint(stdout, t, SAMPLE_IO_ALL & ~SAMPLE_IO_OFFSET);
fflush(stdout);
io_print(&io, &t, 1);
}
return 0;

View file

@ -28,7 +28,8 @@
#include <jansson.h>
#include <villas/sample.h>
#include <villas/sample_io.h>
#include <villas/io.h>
#include <villas/formats/villas.h>
#include <villas/utils.h>
#include <villas/timing.h>
#include <villas/pool.h>
@ -103,11 +104,21 @@ check: if (optarg == endptr)
f1.path = argv[optind];
f2.path = argv[optind + 1];
log_init(&log, V, LOG_ALL);
log_start(&log);
ret = log_init(&log, V, LOG_ALL);
if (ret)
error("Failed to initialize log");
pool_init(&pool, 1024, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_heap);
sample_alloc(&pool, samples, 2);
ret = log_start(&log);
if (ret)
error("Failed to start log");
ret = pool_init(&pool, 1024, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_heap);
if (ret)
error("Failed to initialize pool");
ret = sample_alloc(&pool, samples, 2);
if (ret != 2)
error("Failed to allocate samples");
f1.sample = samples[0];
f2.sample = samples[1];
@ -121,11 +132,11 @@ check: if (optarg == endptr)
serror("Failed to open file: %s", f2.path);
while (!feof(f1.handle) && !feof(f2.handle)) {
ret = sample_io_villas_fscan(f1.handle, f1.sample, &f1.flags);
ret = io_format_villas_fscan(f1.handle, &f1.sample, 1, &f1.flags);
if (ret < 0 && !feof(f1.handle))
goto out;
ret = sample_io_villas_fscan(f2.handle, f2.sample, &f2.flags);
ret = io_format_villas_fscan(f2.handle, &f2.sample, 1, &f2.flags);
if (ret < 0 && !feof(f2.handle))
goto out;
@ -139,7 +150,7 @@ check: if (optarg == endptr)
}
/* Compare sequence no */
if ((f1.flags & SAMPLE_IO_SEQUENCE) && (f2.flags & SAMPLE_IO_SEQUENCE)) {
if ((f1.flags & IO_FORMAT_SEQUENCE) && (f2.flags & IO_FORMAT_SEQUENCE)) {
if (f1.sample->sequence != f2.sample->sequence) {
printf("sequence no: %d != %d\n", f1.sample->sequence, f2.sample->sequence);
ret = 2;

View file

@ -28,7 +28,7 @@
#include <villas/utils.h>
#include <villas/advio.h>
#include <villas/sample.h>
#include <villas/sample_io.h>
#include <villas/formats/villas.h>
/** This URI points to a Sciebo share which contains some test files.
* The Sciebo share is read/write accessible via WebDAV. */
@ -40,12 +40,12 @@ Test(advio, local)
int ret;
char *buf = NULL;
size_t buflen = 0;
/* We open this file and check the first line */
af = afopen(__FILE__, "r");
cr_assert(af, "Failed to open local file");
ret = getline(&buf, &buflen, af->file);
ret = getline(&buf, &buflen, af->file);
cr_assert_gt(ret, 1);
cr_assert_str_eq(buf, "/** Unit tests for advio\n");
}
@ -74,26 +74,26 @@ Test(advio, download_large)
{
AFILE *af;
int ret, len = 16;
struct sample *smp = alloc(SAMPLE_LEN(len));
smp->capacity = len;
af = afopen(BASE_URI "/download-large" , "r");
cr_assert(af, "Failed to download file");
ret = sample_io_villas_fscan(af->file, smp, NULL);
cr_assert_eq(ret, 0);
ret = io_format_villas_fscan(af->file, &smp, 1, NULL);
cr_assert_eq(ret, 1);
cr_assert_eq(smp->sequence, 0);
cr_assert_eq(smp->length, 4);
cr_assert_eq(smp->ts.origin.tv_sec, 1497710378);
cr_assert_eq(smp->ts.origin.tv_nsec, 863332240);
float data[] = { 0.022245, 0.000000, -1.000000, 1.000000 };
for (int i = 0; i < smp->length; i++)
cr_assert_float_eq(smp->data[i].f, data[i], 1e-5);
ret = afclose(af);
cr_assert_eq(ret, 0, "Failed to close file");
}
@ -106,44 +106,44 @@ Test(advio, resume)
char line1[32];
char *line2 = NULL;
size_t linelen = 0;
mkdtemp(dir);
ret = asprintf(&fn, "%s/file", dir);
cr_assert_gt(ret, 0);
af1 = afopen(fn, "w+");
cr_assert_not_null(af1);
/* We flush once the empty file in order to upload an empty file. */
aupload(af1, 0);
af2 = afopen(fn, "r");
cr_assert_not_null(af2);
for (int i = 0; i < 100; i++) {
snprintf(line1, sizeof(line1), "This is line %d\n", i);
afputs(line1, af1);
aupload(af1, 1);
adownload(af2, 1);
agetline(&line2, &linelen, af2);
cr_assert_str_eq(line1, line2);
}
ret = afclose(af1);
cr_assert_eq(ret, 0);
ret = afclose(af2);
cr_assert_eq(ret, 0);
ret = unlink(fn);
cr_assert_eq(ret, 0);
ret = rmdir(dir);
cr_assert_eq(ret, 0);
free(line2);
}
@ -236,4 +236,4 @@ Test(advio, append)
cr_assert(ret == 0, "Failed to close file");
cr_assert_arr_eq(buffer, expect, sizeof(expect));
}
}

148
tests/unit/io.c Normal file
View file

@ -0,0 +1,148 @@
/** Unit tests for IO formats.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <stdio.h>
#include <criterion/criterion.h>
#include <criterion/parameterized.h>
#include <criterion/logging.h>
#include "utils.h"
#include "timing.h"
#include "sample.h"
#include "plugin.h"
#include "pool.h"
#include "io.h"
#define NUM_SAMPLES 10
#define NUM_VALUES 10
void cr_assert_eq_sample(struct sample *s, struct sample *t)
{
cr_assert_eq(s->length, t->length);
cr_assert_eq(s->sequence, t->sequence);
cr_assert_eq(s->format, t->format);
cr_assert_eq(s->ts.origin.tv_sec, t->ts.origin.tv_sec);
cr_assert_eq(s->ts.origin.tv_nsec, t->ts.origin.tv_nsec);
cr_assert_eq(s->ts.received.tv_sec, t->ts.received.tv_sec);
cr_assert_eq(s->ts.received.tv_nsec, t->ts.received.tv_nsec);
for (int i = 0; i < MIN(s->length, t->length); i++)
cr_assert_float_eq(s->data[i].f, t->data[i].f, 1e-6);
}
ParameterizedTestParameters(io, highlevel)
{
static char formats[][32] = {
"villas",
"json",
"csv"
};
return cr_make_param_array(char[32], formats, ARRAY_LEN(formats));
}
ParameterizedTest(char *fmt, io, highlevel)
{
int ret;
char filename[64];
struct io io;
struct pool p = { .state = STATE_DESTROYED };
struct sample *smps[NUM_SAMPLES];
struct sample *smpt[NUM_SAMPLES];
ret = pool_init(&p, 2 * NUM_SAMPLES, SAMPLE_LEN(NUM_VALUES), &memtype_hugepage);
cr_assert_eq(ret, 0);
/* Prepare a sample with arbitrary data */
ret = sample_alloc(&p, smps, NUM_SAMPLES);
cr_assert_eq(ret, NUM_SAMPLES);
ret = sample_alloc(&p, smpt, NUM_SAMPLES);
cr_assert_eq(ret, NUM_SAMPLES);
for (int i = 0; i < NUM_SAMPLES; i++) {
smpt[i]->capacity =
smps[i]->capacity = NUM_VALUES;
smps[i]->length = NUM_VALUES;
smps[i]->sequence = 235 + i;
smps[i]->format = 0; /* all float */
smps[i]->ts.origin = time_now();
smps[i]->ts.received = (struct timespec) {
.tv_sec = smps[i]->ts.origin.tv_sec - 1,
.tv_nsec = smps[i]->ts.origin.tv_nsec
};
for (int j = 0; j < smps[i]->length; j++)
smps[i]->data[j].f = j * 1.2 + i * 100;
}
/* Open a file for IO */
strncpy(filename, "/tmp/villas-unit-test.XXXXXX", sizeof(filename));
mktemp(filename);
printf("Writing to file: %s\n", filename);
struct plugin *pl;
pl = plugin_lookup(PLUGIN_TYPE_FORMAT, fmt);
cr_assert_not_null(pl, "Format '%s' does not exist", fmt);
ret = io_init(&io, &pl->io, IO_FORMAT_ALL);
cr_assert_eq(ret, 0);
ret = io_open(&io, filename, "w+");
cr_assert_eq(ret, 0);
ret = io_print(&io, smps, NUM_SAMPLES);
cr_assert_eq(ret, NUM_SAMPLES);
io_rewind(&io);
io_flush(&io);
char cmd[128];
snprintf(cmd, sizeof(cmd), "cat %s", filename);
system(cmd);
ret = io_scan(&io, smpt, NUM_SAMPLES);
cr_assert_eq(ret, NUM_SAMPLES, "Read only %d of %d samples back", ret, NUM_SAMPLES);
for (int i = 0; i < 0; i++)
cr_assert_eq_sample(smps[i], smpt[i]);
ret = io_close(&io);
cr_assert_eq(ret, 0);
ret = io_destroy(&io);
cr_assert_eq(ret, 0);
ret = unlink(filename);
cr_assert_eq(ret, 0);
sample_free(smps, NUM_SAMPLES);
sample_free(smpt, NUM_SAMPLES);
ret = pool_destroy(&p);
cr_assert_eq(ret, 0);
}

View file

@ -1,95 +0,0 @@
/** Unit tests for the sample_io module.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <stdio.h>
#include <criterion/criterion.h>
#include <criterion/parameterized.h>
#include "utils.h"
#include "timing.h"
#include "sample.h"
#include "sample_io.h"
ParameterizedTestParameters(sample_io, read_write)
{
static enum sample_io_format formats[] = {
SAMPLE_IO_FORMAT_VILLAS,
SAMPLE_IO_FORMAT_JSON,
};
return cr_make_param_array(enum sample_io_format, formats, ARRAY_LEN(formats));
}
ParameterizedTest(enum sample_io_format *fmt, sample_io, read_write)
{
FILE *f;
int ret;
struct sample *s, *t;
/* Prepare a sample with arbitrary data */
s = malloc(SAMPLE_LEN(16));
cr_assert_not_null(s);
t = malloc(SAMPLE_LEN(16));
cr_assert_not_null(s);
t->capacity =
s->capacity = 16;
s->length = 12;
s->sequence = 235;
s->format = 0;
s->ts.origin = time_now();
s->ts.received = (struct timespec) { s->ts.origin.tv_sec - 1, s->ts.origin.tv_nsec };
for (int i = 0; i < s->length; i++)
s->data[i].f = i * 1.2;
/* Open a file for IO */
f = tmpfile();
cr_assert_not_null(f);
ret = sample_io_fprint(f, s, *fmt, SAMPLE_IO_ALL);
cr_assert_eq(ret, 0);
rewind(f);
ret = sample_io_fscan(f, t, *fmt, NULL);
cr_assert_eq(ret, 0);
cr_assert_eq(s->length, t->length);
cr_assert_eq(s->sequence, t->sequence);
cr_assert_eq(s->format, t->format);
cr_assert_eq(s->ts.origin.tv_sec, t->ts.origin.tv_sec);
cr_assert_eq(s->ts.origin.tv_nsec, t->ts.origin.tv_nsec);
cr_assert_eq(s->ts.received.tv_sec, t->ts.received.tv_sec);
cr_assert_eq(s->ts.received.tv_nsec, t->ts.received.tv_nsec);
for (int i = 0; i < MIN(s->length, t->length); i++)
cr_assert_float_eq(s->data[i].f, t->data[i].f, 1e-6);
fclose(f);
free(s);
free(t);
}