From c5fc72dd78ae329df0d34f457e5525be1fba087d Mon Sep 17 00:00:00 2001
From: Steffen Vogel <post@steffenvogel.de>
Date: Sat, 5 Aug 2017 21:02:09 +0200
Subject: [PATCH] adding pluggable IO formats

---
 include/villas/formats/csv.h                 |   6 +-
 include/villas/formats/json.h                |   4 +-
 include/villas/{ => formats}/msg.h           |   0
 include/villas/{ => formats}/msg_format.h    |   0
 include/villas/formats/villas.h              |  10 +-
 include/villas/{ => formats}/webmsg.h        |   0
 include/villas/{ => formats}/webmsg_format.h |   0
 include/villas/io.h                          | 109 ++++++++++-
 include/villas/super_node.h                  |   2 +
 lib/formats/Makefile.inc                     |  16 +-
 lib/formats/csv.c                            |  82 ++++++---
 lib/formats/json.c                           | 104 ++++++++---
 lib/formats/msg.c                            |  16 +-
 lib/formats/villas.c                         | 114 +++++++++---
 lib/formats/webmsg.c                         |  16 +-
 lib/hook.c                                   |   2 +-
 lib/io.c                                     | 179 +++++++++++++++----
 lib/nodes/Makefile.inc                       |   7 +-
 lib/nodes/nanomsg.c                          |   2 +-
 lib/nodes/socket.c                           |   4 +-
 lib/nodes/websocket.c                        |   4 +-
 lib/nodes/zeromq.c                           |   2 +-
 src/hook.c                                   |  86 +++++----
 src/node.c                                   |   5 +
 src/pipe.c                                   | 145 ++++++++-------
 src/signal.c                                 |  56 +++---
 src/test-cmp.c                               |  27 ++-
 tests/unit/advio.c                           |  48 ++---
 tests/unit/io.c                              | 148 +++++++++++++++
 tests/unit/sample_io.c                       |  95 ----------
 30 files changed, 900 insertions(+), 389 deletions(-)
 rename include/villas/{ => formats}/msg.h (100%)
 rename include/villas/{ => formats}/msg_format.h (100%)
 rename include/villas/{ => formats}/webmsg.h (100%)
 rename include/villas/{ => formats}/webmsg_format.h (100%)
 create mode 100644 tests/unit/io.c
 delete mode 100644 tests/unit/sample_io.c

diff --git a/include/villas/formats/csv.h b/include/villas/formats/csv.h
index 36ccb502f..77a0deed7 100644
--- a/include/villas/formats/csv.h
+++ b/include/villas/formats/csv.h
@@ -23,13 +23,13 @@
 
 #pragma once
 
-#include "advio.h"
+#include <stdio.h>
 
 /* Forward declarations. */
 struct sample;
 
 #define CSV_SEPARATOR '\t'
 
-int io_format_csv_fprint(AFILE *f, struct sample *smp, int flags);
+int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags);
 
-int io_format_csv_fscan(AFILE *f, struct sample *smp, int *flags);
+int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags);
diff --git a/include/villas/formats/json.h b/include/villas/formats/json.h
index ea87d760f..6454cdef4 100644
--- a/include/villas/formats/json.h
+++ b/include/villas/formats/json.h
@@ -30,6 +30,6 @@ int io_format_json_pack(json_t **j, struct sample *s, int flags);
 
 int io_format_json_unpack(json_t *j, struct sample *s, int *flags);
 
-int io_format_json_fprint(AFILE *f, struct sample *s, int flags);
+int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags);
 
-int io_format_json_fscan(AFILE *f, struct sample *s, int *flags);
+int io_format_json_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags);
diff --git a/include/villas/msg.h b/include/villas/formats/msg.h
similarity index 100%
rename from include/villas/msg.h
rename to include/villas/formats/msg.h
diff --git a/include/villas/msg_format.h b/include/villas/formats/msg_format.h
similarity index 100%
rename from include/villas/msg_format.h
rename to include/villas/formats/msg_format.h
diff --git a/include/villas/formats/villas.h b/include/villas/formats/villas.h
index 5acb44888..798e289c5 100644
--- a/include/villas/formats/villas.h
+++ b/include/villas/formats/villas.h
@@ -25,12 +25,12 @@
 
 #include <stdio.h>
 
-/* VILLASnode human readable format */
+#include "io.h"
 
-int io_format_villas_print(char *buf, size_t len, struct sample *s, int flags);
+int io_format_villas_print(struct io *io, struct sample *smps[], size_t cnt);
 
-int io_format_villas_scan(const char *line, struct sample *s, int *fl);
+int io_format_villas_scan(struct io *io, struct sample *smps[], size_t cnt);
 
-int io_format_villas_fprint(FILE *f, struct sample *s, int flags);
+int io_format_villas_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags);
 
-int io_format_villas_fscan(FILE *f, struct sample *s, int *flags);
+int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags);
diff --git a/include/villas/webmsg.h b/include/villas/formats/webmsg.h
similarity index 100%
rename from include/villas/webmsg.h
rename to include/villas/formats/webmsg.h
diff --git a/include/villas/webmsg_format.h b/include/villas/formats/webmsg_format.h
similarity index 100%
rename from include/villas/webmsg_format.h
rename to include/villas/formats/webmsg_format.h
diff --git a/include/villas/io.h b/include/villas/io.h
index 4b0bed7a6..e8668c895 100644
--- a/include/villas/io.h
+++ b/include/villas/io.h
@@ -24,6 +24,7 @@
 #pragma once
 
 #include "advio.h"
+#include "common.h"
 
 /* Forward declarations */
 struct sample;
@@ -31,34 +32,113 @@ struct io;
 
 /** These flags define the format which is used by io_fscan() and io_fprint(). */
 enum io_flags {
-	IO_FORMAT_NANOSECONDS	= (1 << 0),
-	IO_FORMAT_OFFSET	= (1 << 1),
-	IO_FORMAT_SEQUENCE	= (1 << 2),
-	IO_FORMAT_VALUES	= (1 << 3),
-	IO_FORMAT_ALL		= 16-1
+	IO_FORMAT_NANOSECONDS	= (1 << 0), /**< Include nanoseconds in output. */
+	IO_FORMAT_OFFSET	= (1 << 1), /**< Include offset / delta between received and send timestamps. */
+	IO_FORMAT_SEQUENCE	= (1 << 2), /**< Include sequence number in output. */
+	IO_FORMAT_VALUES	= (1 << 3), /**< Include values in output. */
+	IO_FORMAT_ALL		= 16-1,     /**< Enable all output options. */
+	IO_FLAG_FLUSH		= (1 << 8), /**< Flush the output stream after each chunk of samples. */
 };
 
 struct io_format {
 	int (*init)(struct io *io);
 	int (*destroy)(struct io *io);
+
+	/** @{
+	 * High-level interface
+	 */
+
+	/** Open an IO stream.
+	 *
+	 * @see fopen()
+	 */
 	int (*open)(struct io *io, const char *uri, const char *mode);
+
+	/** Close an IO stream.
+	 *
+	 * @see fclose()
+	 */
 	int (*close)(struct io *io);
+
+	/** Check if end-of-file was reached.
+	 *
+	 * @see feof()
+	 */
 	int (*eof)(struct io *io);
+
+	/** Rewind an IO stream.
+	 *
+	 * @see rewind()
+	 */
 	void (*rewind)(struct io *io);
-	int (*print)(struct io *io, struct sample *s, int fl);
-	int (*scan)(struct io *io, struct sample *s, int *fl);
+
+	/** Flush buffered data to disk.
+	 *
+	 * @see fflush()
+	 */
+	int (*flush)(struct io *io);
+
+	int (*print)(struct io *io, struct sample *smps[], size_t cnt);
+	int (*scan)( struct io *io, struct sample *smps[], size_t cnt);
+	/** @} */
+
+	/** @{
+	 * Low-level interface
+	 */
+
+	/** Parse samples from the buffer \p buf with a length of \p len bytes.
+	 *
+	 * @return The number of bytes consumed of \p buf.
+	 */
+	size_t (*sscan)( char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags);
+
+	/** Print \p cnt samples from \p smps into buffer \p buf of length \p len.
+	 *
+	 * @return The number of bytes written to \p buf.
+	 */
+	size_t (*sprint)(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags);
+
+	/** Parse up to \p cnt samples from stream \p f into array \p smps.
+	 *
+	 * @return The number of samples parsed.
+	 */
+	int (*fscan)( FILE *f, struct sample *smps[], size_t cnt, int *flags);
+
+	/** Print \p cnt samples from \p smps to stream \p f.
+	 *
+	 * @return The number of samples written to \p f.
+	 */
+	int (*fprint)(FILE *f, struct sample *smps[], size_t cnt, int  flags);
+
+	/** @} */
 
 	size_t size; /**< Number of bytes to allocate for io::_vd */
 };
 
 struct io {
+	enum state state;
 	int flags;
 
+	enum {
+		IO_MODE_STDIO,
+		IO_MODE_ADVIO,
+		IO_MODE_CUSTOM
+	} mode;
+
 	/** A format type can use this file handle or overwrite the
 	 * io_format::{open,close,eof,rewind} functions and the private
 	 * data in io::_vd.
 	 */
-	AFILE *file;
+	union {
+		struct {
+			FILE *input;
+			FILE *output;
+		} stdio;
+		struct {
+			AFILE *input;
+			AFILE *output;
+		} advio;
+	};
 
 	void *_vd;
 	struct io_format *_vt;
@@ -79,3 +159,16 @@ int io_scan(struct io *io, struct sample *smps[], size_t cnt);
 int io_eof(struct io *io);
 
 void io_rewind(struct io *io);
+
+int io_flush(struct io *io);
+
+
+int io_stream_open(struct io *io, const char *uri, const char *mode);
+
+int io_stream_close(struct io *io);
+
+int io_stream_eof(struct io *io);
+
+void io_stream_rewind(struct io *io);
+
+int io_stream_flush(struct io *io);
diff --git a/include/villas/super_node.h b/include/villas/super_node.h
index 72a5f9b89..954966401 100644
--- a/include/villas/super_node.h
+++ b/include/villas/super_node.h
@@ -44,6 +44,8 @@ struct super_node {
 	struct api api;
 	struct web web;
 
+	char *name;		/**< A name of this super node. Usually the hostname. */
+
 	struct {
 		int argc;
 		char **argv;
diff --git a/lib/formats/Makefile.inc b/lib/formats/Makefile.inc
index e760b08ab..12022e536 100644
--- a/lib/formats/Makefile.inc
+++ b/lib/formats/Makefile.inc
@@ -20,4 +20,18 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 ###################################################################################
 
-LIB_SRCS += $(addprefix lib/formats/,villas.c csv.c msg.c webmsg.c)
+LIB_SRCS += $(addprefix lib/formats/,json.c villas.c csv.c)
+
+WITH_HDF5 ?= 0
+
+ifeq ($(PLATFORM),Darwin)
+	HDF5_PREFIX ?= /opt/local
+else
+	HDF5_PREFIX ?= /usr
+endif
+
+ifeq ($(WITH_HDF5),1)
+ifneq ($(wildcard $(HDF5_PREFIX)/include/hdf5_hl.h),)
+	LIB_SRCS += lib/formats/hdf5.c
+endif
+endif
diff --git a/lib/formats/csv.c b/lib/formats/csv.c
index b88da09d2..eba524b60 100644
--- a/lib/formats/csv.c
+++ b/lib/formats/csv.c
@@ -25,50 +25,44 @@
 #include "formats/csv.h"
 #include "plugin.h"
 #include "sample.h"
+#include "timing.h"
 
-int io_format_csv_fprint(AFILE *f, struct sample *s, int flags)
+int io_format_csv_fprint_single(FILE *f, struct sample *s, int flags)
 {
-	afprintf(f, "%ld %09ld %d", s->ts.origin.tv_sec, s->ts.origin.tv_nsec, s->sequence);
+	fprintf(f, "%ld %09ld %d", s->ts.origin.tv_sec, s->ts.origin.tv_nsec, s->sequence);
 
 	for (int i = 0; i < s->length; i++) {
 		switch ((s->format >> i) & 0x1) {
 			case SAMPLE_DATA_FORMAT_FLOAT:
-				afprintf(f, "%c%.6f", CSV_SEPARATOR, s->data[i].f);
+				fprintf(f, "%c%.6f", CSV_SEPARATOR, s->data[i].f);
 				break;
 			case SAMPLE_DATA_FORMAT_INT:
-				afprintf(f, "%c%d", CSV_SEPARATOR, s->data[i].i);
+				fprintf(f, "%c%d", CSV_SEPARATOR, s->data[i].i);
 				break;
 		}
 	}
 
+	fputc('\n', f);
+
 	return 0;
 }
 
-int io_format_csv_fscan(AFILE *f, struct sample *smp, int *flags)
+size_t io_format_csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flags)
 {
 	int ret, off;
-	char *ptr, line[4096];
 
-skip:	if (afgets(line, sizeof(line), f) == NULL)
-		return -1; /* An error occured */
-
-	/* Skip whitespaces, empty and comment lines */
-	for (ptr = line; isspace(*ptr); ptr++);
-	if (*ptr == '\0' || *ptr == '#')
-		goto skip;
-
-	ret = sscanf(line, "%ld %09ld %d %n", &smp->ts.origin.tv_sec, &smp->ts.origin.tv_nsec, &smp->sequence, &off);
-	if (ret != 4)
+	ret = sscanf(buf, "%ld %ld %d %n", &s->ts.origin.tv_sec, &s->ts.origin.tv_nsec, &s->sequence, &off);
+	if (ret != 3)
 		return -1;
 
 	int i;
-	for (i = 0; i < smp->capacity; i++) {
-		switch (smp->format & (1 << i)) {
+	for (i = 0; i < s->capacity; i++) {
+		switch (s->format & (1 << i)) {
 			case SAMPLE_DATA_FORMAT_FLOAT:
-				ret = sscanf(line + off, "%f %n", &smp->data[i].f, &off);
+				ret = sscanf(buf + off, "%f %n", &s->data[i].f, &off);
 				break;
 			case SAMPLE_DATA_FORMAT_INT:
-				ret = sscanf(line + off, "%d %n", &smp->data[i].i, &off);
+				ret = sscanf(buf + off, "%d %n", &s->data[i].i, &off);
 				break;
 		}
 
@@ -76,19 +70,51 @@ skip:	if (afgets(line, sizeof(line), f) == NULL)
 			break;
 	}
 
-	smp->length = i;
+	s->length = i;
+	s->ts.received = time_now();
 
-	return ret;
+	return 0;
 }
 
-int io_format_csv_print(struct io *io, struct sample *smp, int flags)
+int io_format_csv_fscan_single(FILE *f, struct sample *s, int *flags)
 {
-	return io_format_csv_fprint(io->file, smp, flags);
+	char *ptr, line[4096];
+
+skip:	if (fgets(line, sizeof(line), f) == NULL)
+		return -1; /* An error occured */
+
+	/* Skip whitespaces, empty and comment lines */
+	for (ptr = line; isspace(*ptr); ptr++);
+	if (*ptr == '\0' || *ptr == '#')
+		goto skip;
+
+	return io_format_csv_sscan_single(line, strlen(line), s, flags);
 }
 
-int io_format_csv_scan(struct io *io, struct sample *smp, int *flags)
+int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
 {
-	return io_format_csv_fscan(io->file, smp, flags);
+	int ret, i;
+	for (i = 0; i < cnt; i++) {
+		ret = io_format_csv_fprint_single(f, smps[i], flags);
+		if (ret < 0)
+			break;
+	}
+
+	return i;
+}
+
+int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags)
+{
+	int ret, i;
+	for (i = 0; i < cnt; i++) {
+		ret = io_format_csv_fscan_single(f, smps[i], flags);
+		if (ret < 0) {
+			warn("Failed to read CSV line: %d", ret);
+			break;
+		}
+	}
+
+	return i;
 }
 
 static struct plugin p = {
@@ -96,8 +122,8 @@ static struct plugin p = {
 	.description = "Tabulator-separated values",
 	.type = PLUGIN_TYPE_FORMAT,
 	.io = {
-		.scan	= io_format_csv_scan,
-		.print	= io_format_csv_print,
+		.fprint	= io_format_csv_fprint,
+		.fscan	= io_format_csv_fscan,
 		.size = 0
 	}
 };
diff --git a/lib/formats/json.c b/lib/formats/json.c
index 650b9a8fc..00c79d19b 100644
--- a/lib/formats/json.c
+++ b/lib/formats/json.c
@@ -90,47 +90,93 @@ int io_format_json_unpack(json_t *j, struct sample *s, int *flags)
 	return 0;
 }
 
-int io_format_json_fprint(AFILE *f, struct sample *s, int flags)
+size_t io_format_json_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags)
 {
-	int ret;
+	int i, ret;
 	json_t *json;
+	size_t wr, off = 0;
 
-	ret = io_format_json_pack(&json, s, flags);
-	if (ret)
-		return ret;
+	for (i = 0; i < cnt; i++) {
+		ret = io_format_json_pack(&json, smps[i], flags);
+		if (ret)
+			return ret;
 
-	ret = json_dumpf(json, f->file, 0);
+		wr = json_dumpb(json, buf + off, len - off, 0);
 
-	json_decref(json);
+		json_decref(json);
 
-	return ret;
+		if (wr > len)
+			break;
+
+		off += wr;
+	}
+
+	return i;
 }
 
-int io_format_json_fscan(AFILE *f, struct sample *s, int *flags)
+size_t io_format_json_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags)
 {
-	int ret;
+	int i, ret;
+	json_t *json;
+	json_error_t err;
+	size_t off = 0;
+
+	for (i = 0; i < cnt; i++) {
+		json = json_loadb(buf + off, len - off, JSON_DISABLE_EOF_CHECK, &err);
+		if (!json)
+			break;
+
+		off += err.position;
+
+		ret = io_format_json_unpack(json, smps[i], flags);
+
+		json_decref(json);
+
+		if (ret)
+			break;
+	}
+
+	return i;
+}
+
+int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
+{
+	int ret, i;
+	json_t *json;
+
+	for (i = 0; i < cnt; i++) {
+		ret = io_format_json_pack(&json, smps[i], flags);
+		if (ret)
+			return ret;
+
+		ret = json_dumpf(json, f, 0);
+		fputc('\n', f);
+
+		json_decref(json);
+	}
+
+	return i;
+}
+
+int io_format_json_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags)
+{
+	int i, ret;
 	json_t *json;
 	json_error_t err;
 
-	json = json_loadf(f->file, JSON_DISABLE_EOF_CHECK, &err);
-	if (!json)
-		return -1;
+	for (i = 0; i < cnt; i++) {
+skip:		json = json_loadf(f, JSON_DISABLE_EOF_CHECK, &err);
+		if (!json)
+			break;
 
-	ret = io_format_json_unpack(json, s, flags);
+		ret = io_format_json_unpack(json, smps[i], flags);
+		if (ret)
+			goto skip;
 
-	json_decref(json);
+		json_decref(json);
+	}
 
-	return ret;
-}
-
-int io_format_json_print(struct io *io, struct sample *smp, int flags)
-{
-	return io_format_json_fprint(io->file, smp, flags);
-}
-
-int io_format_json_scan(struct io *io, struct sample *smp, int *flags)
-{
-	return io_format_json_fscan(io->file, smp, flags);
+	return i;
 }
 
 static struct plugin p = {
@@ -138,8 +184,10 @@ static struct plugin p = {
 	.description = "Javascript Object Notation",
 	.type = PLUGIN_TYPE_FORMAT,
 	.io = {
-		.print	= io_format_json_print,
-		.scan	= io_format_json_scan,
+		.fscan	= io_format_json_fscan,
+		.fprint	= io_format_json_fprint,
+		.sscan	= io_format_json_sscan,
+		.sprint	= io_format_json_sprint,
 		.size = 0
 	},
 };
diff --git a/lib/formats/msg.c b/lib/formats/msg.c
index 08622f8ce..d61868a36 100644
--- a/lib/formats/msg.c
+++ b/lib/formats/msg.c
@@ -23,10 +23,11 @@
 #include <arpa/inet.h>
 #include <string.h>
 
-#include "msg.h"
-#include "msg_format.h"
+#include "formats/msg.h"
+#include "formats/msg_format.h"
 #include "sample.h"
 #include "utils.h"
+#include "plugin.h"
 
 void msg_ntoh(struct msg *m)
 {
@@ -150,3 +151,14 @@ int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t
 
 	return i;
 }
+
+static struct plugin p = {
+	.name = "msg",
+	.description = "VILLAS binary network format",
+	.type = PLUGIN_TYPE_FORMAT,
+	.io = {
+		.size = 0
+	},
+};
+
+REGISTER_PLUGIN(&p);
diff --git a/lib/formats/villas.c b/lib/formats/villas.c
index 1c26f2609..f49d28a72 100644
--- a/lib/formats/villas.c
+++ b/lib/formats/villas.c
@@ -28,8 +28,9 @@
 #include "utils.h"
 #include "timing.h"
 #include "sample.h"
+#include "formats/villas.h"
 
-int io_format_villas_print(char *buf, size_t len, struct sample *s, int flags)
+size_t io_format_villas_sprint_single(char *buf, size_t len, struct sample *s, int flags)
 {
 	size_t off = snprintf(buf, len, "%llu", (unsigned long long) s->ts.origin.tv_sec);
 
@@ -57,15 +58,15 @@ int io_format_villas_print(char *buf, size_t len, struct sample *s, int flags)
 
 	off += snprintf(buf + off, len - off, "\n");
 
-	return 0; /* trailing '\0' */
+	return off;
 }
 
-int io_format_villas_scan(const char *line, struct sample *s, int *fl)
+size_t io_format_villas_sscan_single(const char *buf, size_t len, struct sample *s, int *flags)
 {
 	char *end;
-	const char *ptr = line;
+	const char *ptr = buf;
 
-	int flags = 0;
+	int fl = 0;
 	double offset = 0;
 
 	/* Format: Seconds.NanoSeconds+Offset(SequenceNumber) Value1 Value2 ...
@@ -85,7 +86,7 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl)
 
 		s->ts.origin.tv_nsec = (uint32_t) strtoul(ptr, &end, 10);
 		if (ptr != end)
-			flags |= IO_FORMAT_NANOSECONDS;
+			fl |= IO_FORMAT_NANOSECONDS;
 		else
 			return -3;
 	}
@@ -98,7 +99,7 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl)
 
 		offset = strtof(ptr, &end); /* offset is ignored for now */
 		if (ptr != end)
-			flags |= IO_FORMAT_OFFSET;
+			fl |= IO_FORMAT_OFFSET;
 		else
 			return -4;
 	}
@@ -109,7 +110,7 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl)
 
 		s->sequence = strtoul(ptr, &end, 10);
 		if (ptr != end)
-			flags |= IO_FORMAT_SEQUENCE;
+			fl |= IO_FORMAT_SEQUENCE;
 		else
 			return -5;
 
@@ -135,35 +136,42 @@ int io_format_villas_scan(const char *line, struct sample *s, int *fl)
 	}
 
 	if (s->length > 0)
-		flags |= IO_FORMAT_VALUES;
+		fl |= IO_FORMAT_VALUES;
 
-	if (fl)
-		*fl = flags;
-	if (flags & IO_FORMAT_OFFSET) {
+	if (flags)
+		*flags = fl;
+
+	if (fl & IO_FORMAT_OFFSET) {
 		struct timespec off = time_from_double(offset);
 		s->ts.received = time_add(&s->ts.origin, &off);
 	}
 	else
-		s->ts.received = s->ts.origin;
+		s->ts.received = time_now();
 
-	return 0;
+	return end - buf;
 }
 
-int io_format_villas_fprint(FILE *f, struct sample *s, int flags)
+size_t io_format_villas_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags)
 {
-	char line[4096];
-	int ret;
+	size_t off = 0;
 
-	ret = io_format_villas_print(line, sizeof(line), s, flags);
-	if (ret)
-		return ret;
+	for (int i = 0; i < cnt && off < len; i++)
+		off += io_format_villas_sprint_single(buf + off, len - off, smps[i], flags);
 
-	fputs(line, f);
-
-	return 0;
+	return off;
 }
 
-int io_format_villas_fscan(FILE *f, struct sample *s, int *fl)
+size_t io_format_villas_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags)
+{
+	size_t off = 0;
+
+	for (int i = 0; i < cnt && off < len; i++)
+		off += io_format_villas_sscan_single(buf + off, len - off, smps[i], flags);
+
+	return off;
+}
+
+int io_format_villas_fscan_single(FILE *f, struct sample *s, int *flags)
 {
 	char *ptr, line[4096];
 
@@ -175,16 +183,68 @@ skip:	if (fgets(line, sizeof(line), f) == NULL)
 	if (*ptr == '\0' || *ptr == '#')
 		goto skip;
 
-	return io_format_villas_scan(line, s, fl);
+	return io_format_villas_sscan_single(line, strlen(line), s, flags);
 }
 
-struct plugin p = {
+int io_format_villas_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
+{
+	char line[4096];
+	int ret, i;
+
+	for (i = 0; i < cnt; i++) {
+		ret = io_format_villas_sprint_single(line, sizeof(line), smps[i], flags);
+		if (ret < 0)
+			break;
+
+		fputs(line, f);
+	}
+
+	return i;
+}
+
+int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags)
+{
+	int ret, i;
+
+	for (i = 0; i < cnt; i++) {
+		ret = io_format_villas_fscan_single(f, smps[i], flags);
+		if (ret < 0)
+			return ret;
+	}
+
+	return i;
+}
+
+int io_format_villas_open(struct io *io, const char *uri, const char *mode)
+{
+	int ret;
+
+	ret = io_stream_open(io, uri, mode);
+	if (ret)
+		return ret;
+
+	FILE *f = io->mode == IO_MODE_ADVIO
+			? io->advio.output->file
+			: io->stdio.output;
+
+	fprintf(f, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]");
+
+	if (io->flags & IO_FLAG_FLUSH)
+		io_flush(io);
+
+	return 0;
+}
+
+static struct plugin p = {
 	.name = "villas",
-	.description = "Human readable VILLAS format",
+	.description = "VILLAS human readable format",
 	.type = PLUGIN_TYPE_FORMAT,
 	.io = {
+		.open	= io_format_villas_open,
 		.fprint	= io_format_villas_fprint,
 		.fscan	= io_format_villas_fscan,
+		.sprint	= io_format_villas_sprint,
+		.sscan	= io_format_villas_sscan,
 		.size = 0
 	}
 };
diff --git a/lib/formats/webmsg.c b/lib/formats/webmsg.c
index d0c1de283..1868d7b91 100644
--- a/lib/formats/webmsg.c
+++ b/lib/formats/webmsg.c
@@ -26,8 +26,9 @@
   #include <endian.h>
 #endif
 
-#include "webmsg.h"
-#include "webmsg_format.h"
+#include "plugin.h"
+#include "formats/webmsg.h"
+#include "formats/webmsg_format.h"
 
 void webmsg_ntoh(struct webmsg *m)
 {
@@ -72,3 +73,14 @@ int webmsg_verify(struct webmsg *m)
 	else
 		return 0;
 }
+
+static struct plugin p = {
+	.name = "webmsg",
+	.description = "VILLAS binary format for websockets",
+	.type = PLUGIN_TYPE_FORMAT,
+	.io = {
+		.size = 0
+	},
+};
+
+REGISTER_PLUGIN(&p);
diff --git a/lib/hook.c b/lib/hook.c
index 44f0062de..7f261666d 100644
--- a/lib/hook.c
+++ b/lib/hook.c
@@ -24,7 +24,7 @@
 
 #include "timing.h"
 #include "config.h"
-#include "msg.h"
+#include "formats/msg.h"
 #include "hook.h"
 #include "path.h"
 #include "utils.h"
diff --git a/lib/io.c b/lib/io.c
index 91e79451d..2d42b4e0d 100644
--- a/lib/io.c
+++ b/lib/io.c
@@ -21,9 +21,11 @@
  *********************************************************************************/
 
 #include <stdlib.h>
+#include <stdio.h>
 
 #include "io.h"
 #include "utils.h"
+#include "sample.h"
 
 int io_init(struct io *io, struct io_format *fmt, int flags)
 {
@@ -48,55 +50,170 @@ int io_destroy(struct io *io)
 	return 0;
 }
 
-int io_open(struct io *io, const char *uri, const char *mode)
+int io_stream_open(struct io *io, const char *uri, const char *mode)
 {
-	if (io->_vt->open)
-		return io->_vt->open(io, uri, mode);
+	int ret;
+
+	if (uri) {
+		if (aislocal(uri)) {
+			io->mode = IO_MODE_STDIO;
+
+			io->stdio.input  =
+			io->stdio.output = fopen(uri, mode);
+
+			if (io->stdio.output == NULL)
+				return -1;
+
+			ret = setvbuf(io->stdio.output, NULL, _IOLBF, BUFSIZ);
+			if (ret)
+				return -1;
+		}
+		else {
+			io->mode = IO_MODE_ADVIO;
+
+			io->advio.input  =
+			io->advio.output = afopen(uri, mode);
+
+			if (io->advio.output == NULL)
+				return -1;
+		}
+	}
 	else {
-		io->file = afopen(uri, mode);
-		if (!io->file)
+		io->mode = IO_MODE_STDIO;
+		io->flags |= IO_FLAG_FLUSH;
+
+		io->stdio.input  = stdin;
+		io->stdio.output = stdout;
+
+		ret = setvbuf(io->stdio.input, NULL, _IOLBF, BUFSIZ);
+		if (ret)
 			return -1;
 
-		return 0;
+		ret = setvbuf(io->stdio.output, NULL, _IOLBF, BUFSIZ);
+		if (ret)
+			return -1;
 	}
+
+	return 0;
+}
+
+int io_stream_close(struct io *io)
+{
+	switch (io->mode) {
+		case IO_MODE_ADVIO:
+			return afclose(io->advio.input);
+		case IO_MODE_STDIO:
+			return io->stdio.input != stdin ? fclose(io->stdio.input) : 0;
+		case IO_MODE_CUSTOM:
+			return 0;
+	}
+
+	return -1;
+}
+
+int io_stream_flush(struct io *io)
+{
+	switch (io->mode) {
+		case IO_MODE_ADVIO:
+			return afflush(io->advio.output);
+		case IO_MODE_STDIO:
+			return fflush(io->stdio.output);
+		case IO_MODE_CUSTOM:
+			return 0;
+	}
+
+	return -1;
+}
+
+int io_stream_eof(struct io *io)
+{
+	switch (io->mode) {
+		case IO_MODE_ADVIO:
+			return afeof(io->advio.input);
+		case IO_MODE_STDIO:
+			return feof(io->stdio.input);
+		case IO_MODE_CUSTOM:
+			return 0;
+	}
+
+	return -1;
+}
+
+void io_stream_rewind(struct io *io)
+{
+	switch (io->mode) {
+		case IO_MODE_ADVIO:
+			return arewind(io->advio.input);
+		case IO_MODE_STDIO:
+			return rewind(io->stdio.input);
+		case IO_MODE_CUSTOM: { }
+	}
+}
+
+int io_open(struct io *io, const char *uri, const char *mode)
+{
+	return io->_vt->open
+		? io->_vt->open(io, uri, mode)
+		: io_stream_open(io, uri, mode);
 }
 
 int io_close(struct io *io)
 {
-	return io->_vt->close ? io->_vt->close(io) : afclose(io->file);
+	return io->_vt->close
+		? io->_vt->close(io)
+	 	: io_stream_close(io);
 }
 
-int io_print(struct io *io, struct sample *smps[], size_t cnt)
+int io_flush(struct io *io)
 {
-	assert(io->_vt->print);
-
-	for (int i = 0; i < cnt; i++)
-		io->_vt->print(io, smps[i], io->flags);
-
-	return cnt;
-}
-
-int io_scan(struct io *io, struct sample *smps[], size_t cnt)
-{
-	int ret;
-
-	assert(io->_vt->scan);
-
-	for (int i = 0; i < cnt && !io_eof(io); i++) {
-		ret = io->_vt->scan(io, smps[i], NULL);
-		if (ret < 0)
-			return i;
-	}
-
-	return cnt;
+	return io->_vt->flush
+		? io->_vt->flush(io)
+		: io_stream_flush(io);
 }
 
 int io_eof(struct io *io)
 {
-	return io->_vt->eof ? io->_vt->eof(io) : afeof(io->file);
+	return io->_vt->eof
+		? io->_vt->eof(io)
+		: io_stream_eof(io);
 }
 
 void io_rewind(struct io *io)
 {
-	io->_vt->rewind ? io->_vt->rewind(io) : arewind(io->file);
+	io->_vt->rewind
+		? io->_vt->rewind(io)
+		: io_stream_rewind(io);
+}
+
+int io_print(struct io *io, struct sample *smps[], size_t cnt)
+{
+	int ret;
+
+	if (io->_vt->print)
+		ret = io->_vt->print(io, smps, cnt);
+	else {
+		FILE *f = io->mode == IO_MODE_ADVIO
+				? io->advio.output->file
+				: io->stdio.output;
+
+		ret = io->_vt->fprint(f, smps, cnt, io->flags);
+	}
+
+	if (io->flags & IO_FLAG_FLUSH)
+		io_flush(io);
+
+	return ret;
+}
+
+int io_scan(struct io *io, struct sample *smps[], size_t cnt)
+{
+	if (io->_vt->scan)
+		return io->_vt->scan(io, smps, cnt);
+	else {
+		FILE *f = io->mode == IO_MODE_ADVIO
+				? io->advio.input->file
+				: io->stdio.input;
+
+		return io->_vt->fscan(f, smps, cnt, NULL);
+	}
 }
diff --git a/lib/nodes/Makefile.inc b/lib/nodes/Makefile.inc
index d612bc48e..12e6bcf0e 100644
--- a/lib/nodes/Makefile.inc
+++ b/lib/nodes/Makefile.inc
@@ -90,8 +90,8 @@ endif
 
 # Enable Socket node type when libnl3 is available
 ifeq ($(WITH_SOCKET),1)
-	LIB_SRCS    += $(addprefix lib/nodes/, socket.c)
-	LIB_SRCS    += $(addprefix lib/, msg.c)
+	LIB_SRCS    += lib/nodes/socket.c
+	LIB_SRCS    += lib/formats/msg.c
 	LIB_CFLAGS  += -DWITH_SOCKET
 
 	# libnl3 is optional but required for network emulation and IRQ pinning
@@ -135,7 +135,8 @@ endif
 # Enable WebSocket support
 ifeq ($(WITH_WEBSOCKET),1)
 ifeq ($(shell $(PKGCONFIG) libwebsockets jansson; echo $$?),0)
-	LIB_SRCS    += lib/nodes/websocket.c lib/webmsg.c
+	LIB_SRCS    += lib/nodes/websocket.c
+	LIB_SRCS    += lib/formats/webmsg.c
 	LIB_PKGS    += libwebsockets jansson
 	LIB_CFLAGS  += -DWITH_WEBSOCKET
 endif
diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c
index 10d588819..14d60c0b0 100644
--- a/lib/nodes/nanomsg.c
+++ b/lib/nodes/nanomsg.c
@@ -27,7 +27,7 @@
 #include "plugin.h"
 #include "nodes/nanomsg.h"
 #include "utils.h"
-#include "msg.h"
+#include "formats/msg.h"
 
 int nanomsg_reverse(struct node *n)
 {
diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c
index f69f7db52..cb7c5c62e 100644
--- a/lib/nodes/socket.c
+++ b/lib/nodes/socket.c
@@ -42,8 +42,8 @@
   #define WITH_NETEM
 #endif /* WITH_LIBNL_ROUTE_30 */
 
-#include "msg.h"
-#include "msg_format.h"
+#include "formats/msg.h"
+#include "formats/msg_format.h"
 #include "sample.h"
 #include "queue.h"
 #include "plugin.h"
diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c
index 6aa153072..b4fd7b7c0 100644
--- a/lib/nodes/websocket.c
+++ b/lib/nodes/websocket.c
@@ -27,12 +27,12 @@
 #include <signal.h>
 
 #include "super_node.h"
-#include "webmsg.h"
-#include "webmsg_format.h"
 #include "timing.h"
 #include "utils.h"
 #include "plugin.h"
 
+#include "formats/webmsg.h"
+#include "formats/webmsg_format.h"
 #include "nodes/websocket.h"
 
 /* Private static storage */
diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c
index 0b84fe479..d309a1d7a 100644
--- a/lib/nodes/zeromq.c
+++ b/lib/nodes/zeromq.c
@@ -30,7 +30,7 @@
 #include "utils.h"
 #include "queue.h"
 #include "plugin.h"
-#include "msg.h"
+#include "formats/msg.h"
 
 static void *context;
 
diff --git a/src/hook.c b/src/hook.c
index 1dfaf2812..ca7a29600 100644
--- a/src/hook.c
+++ b/src/hook.c
@@ -31,7 +31,7 @@
 
 #include <villas/timing.h>
 #include <villas/sample.h>
-#include <villas/sample_io.h>
+#include <villas/io.h>
 #include <villas/hook.h>
 #include <villas/utils.h>
 #include <villas/pool.h>
@@ -45,12 +45,13 @@
 
 int cnt;
 
-struct sample **samples;
+struct sample **smps;
 struct plugin *p;
 
 struct log  l = { .state = STATE_DESTROYED };
 struct pool q = { .state = STATE_DESTROYED };
 struct hook h = { .state = STATE_DESTROYED };
+struct io  io = { .state = STATE_DESTROYED };
 
 static void quit(int signal, siginfo_t *sinfo, void *ctx)
 {
@@ -64,7 +65,7 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
 	if (ret)
 		error("Failed to destroy hook");
 
-	sample_free(samples, cnt);
+	sample_free(smps, cnt);
 
 	ret = pool_destroy(&q);
 	if (ret)
@@ -81,13 +82,20 @@ static void usage()
 	printf("  NAME      the name of the hook function\n");
 	printf("  PARAM*    a string of configuration settings for the hook\n");
 	printf("  OPTIONS is one or more of the following options:\n");
+	printf("    -f FMT  the data format\n");
 	printf("    -h      show this help\n");
 	printf("    -d LVL  set debug level to LVL\n");
-	printf("    -v CNT  process CNT samples at once\n");
+	printf("    -v CNT  process CNT smps at once\n");
 	printf("\n");
+
 	printf("The following hook functions are supported:\n");
 	plugin_dump(PLUGIN_TYPE_HOOK);
 	printf("\n");
+
+	printf("Supported IO formats:\n");
+	plugin_dump(PLUGIN_TYPE_FORMAT);
+	printf("\n");
+
 	printf("Example:");
 	printf("  villas-signal random | villas-hook skip_first seconds=10\n");
 	printf("\n");
@@ -98,6 +106,7 @@ static void usage()
 int main(int argc, char *argv[])
 {
 	int ret;
+	char *format = "villas";
 
 	size_t recv;
 
@@ -109,6 +118,8 @@ int main(int argc, char *argv[])
 	char c, *endptr;
 	while ((c = getopt(argc, argv, "hv:d:f:o:")) != -1) {
 		switch (c) {
+			case 'f':
+				format = optarg;
 				break;
 			case 'v':
 				cnt = strtoul(optarg, &endptr, 0);
@@ -139,34 +150,51 @@ check:		if (optarg == endptr)
 		exit(EXIT_FAILURE);
 	}
 
-	char *hookstr = argv[optind];
-
-	ret = signals_init(quit);
-	if (ret)
-		error("Failed to intialize signals");
+	char *hook = argv[optind];
 
 	ret = log_init(&l, l.level, LOG_ALL);
 	if (ret)
 		error("Failed to initialize log");
 
-	log_start(&l);
+	ret = log_start(&l);
+	if (ret)
+		error("Failed to start log");
+
+	ret = signals_init(quit);
+	if (ret)
+		error("Failed to intialize signals");
 
 	if (cnt < 1)
 		error("Vectorize option must be greater than 0");
 
-	memory_init(DEFAULT_NR_HUGEPAGES);
+	ret = memory_init(DEFAULT_NR_HUGEPAGES);
+	if (ret)
+		error("Failed to initialize memory");
 
-	samples = alloc(cnt * sizeof(struct sample *));
+	smps = alloc(cnt * sizeof(struct sample *));
 
 	ret = pool_init(&q, 10 * cnt, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage);
 	if (ret)
 		error("Failed to initilize memory pool");
 
-	p = plugin_lookup(PLUGIN_TYPE_HOOK, hookstr);
+	/* Initialize IO */
+	p = plugin_lookup(PLUGIN_TYPE_FORMAT, format);
 	if (!p)
-		error("Unknown hook function '%s'", hookstr);
+		error("Unknown IO format '%s'", format);
+
+	ret = io_init(&io, &p->io, IO_FORMAT_ALL);
+	if (ret)
+		error("Failed to initialize IO");
+
+	ret = io_open(&io, NULL, NULL);
+	if (ret)
+		error("Failed to open IO");
+
+	/* Initialize hook */
+	p = plugin_lookup(PLUGIN_TYPE_HOOK, hook);
+	if (!p)
+		error("Unknown hook function '%s'", hook);
 
-	/** @todo villas-hook does not use the path structure */
 	ret = hook_init(&h, &p->hook, NULL);
 	if (ret)
 		error("Failed to initialize hook");
@@ -180,35 +208,25 @@ check:		if (optarg == endptr)
 		error("Failed to start hook");
 
 	for (;;) {
-		if (feof(stdin)) {
+		if (io_eof(&io)) {
 			killme(SIGTERM);
 			pause();
 		}
 
-		ret = sample_alloc(&q, samples, cnt);
+		ret = sample_alloc(&q, smps, cnt);
 		if (ret != cnt)
-			error("Failed to allocate %d samples from pool", cnt);
+			error("Failed to allocate %d smps from pool", cnt);
 
-		recv = 0;
-		for (int j = 0; j < cnt && !feof(stdin); j++) {
-			ret = sample_io_villas_fscan(stdin, samples[j], NULL);
-			if (ret < 0)
-				break;
+		recv = io_scan(&io, smps, cnt);
 
-			samples[j]->ts.received = time_now();
-			recv++;
-		}
+		debug(15, "Read %zu smps from stdin", recv);
 
-		debug(15, "Read %zu samples from stdin", recv);
+		hook_read(&h, smps, &recv);
+		hook_write(&h, smps, &recv);
 
-		hook_read(&h, samples, &recv);
-		hook_write(&h, samples, &recv);
+		io_print(&io, smps, recv);
 
-		for (int j = 0; j < recv; j++)
-			sample_io_villas_fprint(stdout, samples[j], SAMPLE_IO_ALL);
-		fflush(stdout);
-
-		sample_free(samples, cnt);
+		sample_free(smps, cnt);
 	}
 
 	return 0;
diff --git a/src/node.c b/src/node.c
index acffd9f4a..9b73e5c35 100644
--- a/src/node.c
+++ b/src/node.c
@@ -75,6 +75,7 @@ static void usage()
 	printf("  This type of invocation is used by OPAL-RT Asynchronous processes.\n");
 	printf("  See in the RT-LAB User Guide for more information.\n\n");
 #endif
+
 	printf("Supported node-types:\n");
 	plugin_dump(PLUGIN_TYPE_NODE);
 	printf("\n");
@@ -87,6 +88,10 @@ static void usage()
 	plugin_dump(PLUGIN_TYPE_API);
 	printf("\n");
 
+	printf("Supported IO formats:\n");
+	plugin_dump(PLUGIN_TYPE_FORMAT);
+	printf("\n");
+
 	print_copyright();
 
 	exit(EXIT_FAILURE);
diff --git a/src/pipe.c b/src/pipe.c
index a8acb8251..2f1c014d6 100644
--- a/src/pipe.c
+++ b/src/pipe.c
@@ -37,6 +37,7 @@
 #include <villas/pool.h>
 #include <villas/io.h>
 #include <villas/kernel/rt.h>
+#include <villas/plugin.h>
 #include <villas/config_helper.h>
 
 #include <villas/nodes/websocket.h>
@@ -44,8 +45,9 @@
 #include "config.h"
 
 static struct super_node sn = { .state = STATE_DESTROYED }; /**< The global configuration */
+static struct io io = { .state = STATE_DESTROYED };
 
-struct dir {
+static struct dir {
 	struct pool pool;
 	pthread_t thread;
 	bool enabled;
@@ -84,21 +86,22 @@ static void usage()
 	printf("  CONFIG  path to a configuration file\n");
 	printf("  NODE    the name of the node to which samples are sent and received from\n");
 	printf("  OPTIONS are:\n");
-	printf("    -f FMT  set the format\n")
-	printf("    -d LVL  set debug log level to LVL\n");
-	printf("    -x      swap read / write endpoints\n");
-	printf("    -s      only read data from stdin and send it to node\n");
-	printf("    -r      only read data from node and write it to stdout\n");
-	printf("    -t NUM  terminate after NUM seconds\n");
-	printf("    -L NUM  terminate after NUM samples sent\n");
-	printf("    -l NUM  terminate after NUM samples received\n\n");
+	printf("    -f FMT           set the format\n");
+	printf("    -d LVL           set debug log level to LVL\n");
+	printf("    -o OPTION=VALUE  overwrite options in config file\n");
+	printf("    -x               swap read / write endpoints\n");
+	printf("    -s               only read data from stdin and send it to node\n");
+	printf("    -r               only read data from node and write it to stdout\n");
+	printf("    -t NUM           terminate after NUM seconds\n");
+	printf("    -L NUM           terminate after NUM samples sent\n");
+	printf("    -l NUM           terminate after NUM samples received\n\n");
 
 	print_copyright();
 }
 
 static void * send_loop(void *ctx)
 {
-	int ret, cnt = 0;
+	int ret, len, sent, cnt = 0;
 	struct sample *smps[node->vectorize];
 
 	/* Initialize memory */
@@ -110,42 +113,30 @@ static void * send_loop(void *ctx)
 	if (ret < 0)
 		error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret);
 
-	while (!feof(stdin)) {
-		int len;
-		for (len = 0; len < node->vectorize; len++) {
-			struct sample *s = smps[len];
-			int reason;
+	while (!io_eof(&io)) {
+		len = io_scan(&io, smps, node->vectorize);
+		if (len <= 0)
+			continue;
 
-			if (sendd.limit > 0 && cnt >= sendd.limit)
-				break;
-
-retry:			reason = sample_io_villas_fscan(stdin, s, NULL);
-			if (reason < 0) {
-				if (feof(stdin))
-					goto leave;
-				else {
-					warn("Skipped invalid message message: reason=%d", reason);
-					goto retry;
-				}
-			}
-		}
-
-		cnt += node_write(node, smps, len);
+		sent = node_write(node, smps, len);
 
+		cnt += sent;
 		if (sendd.limit > 0 && cnt >= sendd.limit)
-			goto leave2;
+			goto leave;
 
 		pthread_testcancel();
 	}
 
-leave2:	info("Reached send limit. Terminating...");
-	killme(SIGTERM);
-
-	return NULL;
-
-	/* We reached EOF on stdin here. Lets kill the process */
-leave:	if (recvv.limit < 0) {
-		info("Reached end-of-file. Terminating...");
+leave:	if (io_eof(&io)) {
+		if (recvv.limit < 0) {
+			info("Reached end-of-file. Terminating...");
+			killme(SIGTERM);
+		}
+		else
+			info("Reached end-of-file. Wait for receive side...");
+	}
+	else {
+		info("Reached send limit. Terminating...");
 		killme(SIGTERM);
 	}
 
@@ -164,26 +155,22 @@ static void * recv_loop(void *ctx)
 
 	ret = sample_alloc(&recvv.pool, smps, node->vectorize);
 	if (ret  < 0)
-		error("Failed to get %u samples out of receive pool (%d).", node->vectorize, ret);
-
-	/* Print header */
-	fprintf(stdout, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]");
-	fflush(stdout);
+		error("Failed to allocate %u samples from receive pool.", node->vectorize);
 
 	for (;;) {
 		int recv = node_read(node, smps, node->vectorize);
 		struct timespec now = time_now();
 
+		/* Fix timestamps */
 		for (int i = 0; i < recv; i++) {
 			struct sample *s = smps[i];
 
 			if (s->ts.received.tv_sec == -1 || s->ts.received.tv_sec == 0)
 				s->ts.received = now;
-
-			sample_io_villas_fprint(stdout, s, SAMPLE_IO_ALL);
-			fflush(stdout);
 		}
 
+		io_print(&io, smps, recv);
+
 		cnt += recv;
 		if (recvv.limit > 0 && cnt >= recvv.limit)
 			goto leave;
@@ -201,17 +188,21 @@ int main(int argc, char *argv[])
 {
 	int ret, level = V, timeout = 0;
 	bool reverse = false;
+	char *format = "villas";
 
 	sendd = recvv = (struct dir) {
 		.enabled = true,
 		.limit = -1
 	};
 
-	char c, *endptr;
-	while ((c = getopt(argc, argv, "hxrsd:l:L:t:f:")) != -1) {
-		switch (c) {
-			case 'f';
+	json_t *cfg_cli = json_object();
 
+	char c, *endptr;
+	while ((c = getopt(argc, argv, "hxrsd:l:L:t:f:o:")) != -1) {
+		switch (c) {
+			case 'f':
+				format = optarg;
+				break;
 			case 'x':
 				reverse = true;
 				break;
@@ -233,6 +224,11 @@ int main(int argc, char *argv[])
 			case 't':
 				timeout = strtoul(optarg, &endptr, 10);
 				goto check;
+			case 'o':
+				ret = json_object_extend_str(cfg_cli, optarg);
+				if (ret)
+					error("Invalid option: %s", optarg);
+				break;
 			case 'h':
 			case '?':
 				usage();
@@ -243,7 +239,6 @@ int main(int argc, char *argv[])
 
 check:		if (optarg == endptr)
 			error("Failed to parse parse option argument '-%c %s'", c, optarg);
-
 	}
 
 	if (argc != optind + 2) {
@@ -253,18 +248,48 @@ check:		if (optarg == endptr)
 
 	char *configfile = argv[optind];
 	char *nodestr    = argv[optind+1];
+	struct plugin *p;
 
-	log_init(&sn.log, level, LOG_ALL);
-	log_start(&sn.log);
+	ret = log_init(&sn.log, level, LOG_ALL);
+	if (ret)
+		error("Failed to intialize log");
 
-	super_node_init(&sn);
-	super_node_parse_uri(&sn, configfile);
+	ret = log_start(&sn.log);
+	if (ret)
+		error("Failed to start log");
 
-	memory_init(sn.hugepages);
-	signals_init(quit);
-	rt_init(sn.priority, sn.affinity);
+	ret = signals_init(quit);
+	if (ret)
+		error("Failed to initialize signals");
+
+	ret = memory_init(sn.hugepages);
+	if (ret)
+		error("Failed to initialize memory");
+
+	ret = rt_init(sn.priority, sn.affinity);
+	if (ret)
+		error("Failed to initalize real-time");
+
+	p = plugin_lookup(PLUGIN_TYPE_FORMAT, format);
+	if (!p)
+		error("Invalid format: %s", format);
+
+	ret = io_init(&io, &p->io, IO_FORMAT_ALL);
+	if (ret)
+		error("Failed to initialize IO");
+
+	ret = io_open(&io, NULL, NULL);
+	if (ret)
+		error("Failed to open IO");
+
+	ret = super_node_init(&sn);
+	if (ret)
+		error("Failed to initialize super-node");
+
+	ret = super_node_parse_uri(&sn, configfile);
+	if (ret)
+		error("Failed to parse configuration");
 
-	/* Initialize node */
 	node = list_lookup(&sn.nodes, nodestr);
 	if (!node)
 		error("Node '%s' does not exist!", nodestr);
diff --git a/src/signal.c b/src/signal.c
index 23df871b7..b7e3fceb2 100644
--- a/src/signal.c
+++ b/src/signal.c
@@ -30,7 +30,7 @@
 
 #include <villas/utils.h>
 #include <villas/sample.h>
-#include <villas/sample_io.h>
+#include <villas/formats/villas.h>
 #include <villas/timing.h>
 #include <villas/node.h>
 #include <villas/plugin.h>
@@ -39,6 +39,7 @@
 /* Some default values */
 struct node n;
 struct log l;
+struct io io;
 
 struct sample *t;
 
@@ -54,14 +55,15 @@ void usage()
 	printf("    ramp\n");
 	printf("\n");
 	printf("  OPTIONS is one or more of the following options:\n");
-	printf("    -d LVL   set debug level\n");
-	printf("    -v NUM   specifies how many values a message should contain\n");
-	printf("    -r HZ    how many messages per second\n");
-	printf("    -n       non real-time mode. do not throttle output.\n");
-	printf("    -f HZ    the frequency of the signal\n");
-	printf("    -a FLT   the amplitude\n");
-	printf("    -D FLT   the standard deviation for 'random' signals\n");
-	printf("    -l NUM   only send LIMIT messages and stop\n\n");
+	printf("    -d LVL  set debug level\n");
+	printf("    -f FMT  set the format\n");
+	printf("    -v NUM  specifies how many values a message should contain\n");
+	printf("    -r HZ   how many messages per second\n");
+	printf("    -n      non real-time mode. do not throttle output.\n");
+	printf("    -F HZ   the frequency of the signal\n");
+	printf("    -a FLT  the amplitude\n");
+	printf("    -D FLT  the standard deviation for 'random' signals\n");
+	printf("    -l NUM  only send LIMIT messages and stop\n\n");
 
 	print_copyright();
 }
@@ -78,6 +80,14 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
 	if (ret)
 		error("Failed to destroy node");
 
+	ret = io_close(&io);
+	if (ret)
+		error("Failed to close output");
+
+	ret = io_destroy(&io);
+	if (ret)
+		error("Failed to destroy output");
+
 	ret = log_stop(&l);
 	if (ret)
 		error("Failed to stop log");
@@ -92,7 +102,8 @@ int main(int argc, char *argv[])
 {
 	int ret;
 	struct plugin *p;
-	struct signal *s;
+
+	char *format = "villas"; /** @todo hardcoded for now */
 
 	ret = log_init(&l, l.level, LOG_ALL);
 	if (ret)
@@ -114,6 +125,18 @@ int main(int argc, char *argv[])
 	if (ret)
 		error("Failed to initialize node");
 
+	p = plugin_lookup(PLUGIN_TYPE_FORMAT, format);
+	if (!p)
+		error("Invalid output format '%s'", format);
+
+	ret = io_init(&io, &p->io, IO_FLAG_FLUSH | (IO_FORMAT_ALL & ~IO_FORMAT_OFFSET));
+	if (ret)
+		error("Failed to initialize output");
+
+	ret = io_open(&io, NULL, NULL);
+	if (ret)
+		error("Failed to open output");
+
 	ret = node_parse_cli(&n, argc, argv);
 	if (ret)
 		error("Failed to parse command line options");
@@ -122,29 +145,20 @@ int main(int argc, char *argv[])
 	if (ret)
 		error("Failed to verify node configuration");
 
-	info("Starting signal generation: %s", node_name(&n));
-
 	/* Allocate memory for message buffer */
-	s = n._vd;
+	struct signal *s = n._vd;
 
 	t = alloc(SAMPLE_LEN(s->values));
 
 	t->capacity = s->values;
 
-	/* Print header */
-	printf("# VILLASnode signal params: type=%s, values=%u, rate=%f, limit=%d, amplitude=%f, freq=%f\n",
-		argv[optind], s->values, s->rate, s->limit, s->amplitude, s->frequency);
-	printf("# %-20s\t\t%s\n", "sec.nsec(seq)", "data[]");
-
 	ret = node_start(&n);
 	if (ret)
 		serror("Failed to start node");
 
 	for (;;) {
 		node_read(&n, &t, 1);
-
-		sample_io_villas_fprint(stdout, t, SAMPLE_IO_ALL & ~SAMPLE_IO_OFFSET);
-		fflush(stdout);
+		io_print(&io, &t, 1);
 	}
 
 	return 0;
diff --git a/src/test-cmp.c b/src/test-cmp.c
index eb122bea6..427f29be9 100644
--- a/src/test-cmp.c
+++ b/src/test-cmp.c
@@ -28,7 +28,8 @@
 #include <jansson.h>
 
 #include <villas/sample.h>
-#include <villas/sample_io.h>
+#include <villas/io.h>
+#include <villas/formats/villas.h>
 #include <villas/utils.h>
 #include <villas/timing.h>
 #include <villas/pool.h>
@@ -103,11 +104,21 @@ check:		if (optarg == endptr)
 	f1.path = argv[optind];
 	f2.path = argv[optind + 1];
 
-	log_init(&log, V, LOG_ALL);
-	log_start(&log);
+	ret = log_init(&log, V, LOG_ALL);
+	if (ret)
+		error("Failed to initialize log");
 
-	pool_init(&pool, 1024, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_heap);
-	sample_alloc(&pool, samples, 2);
+	ret = log_start(&log);
+	if (ret)
+		error("Failed to start log");
+
+	ret = pool_init(&pool, 1024, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_heap);
+	if (ret)
+		error("Failed to initialize pool");
+
+	ret = sample_alloc(&pool, samples, 2);
+	if (ret != 2)
+		error("Failed to allocate samples");
 
 	f1.sample = samples[0];
 	f2.sample = samples[1];
@@ -121,11 +132,11 @@ check:		if (optarg == endptr)
 		serror("Failed to open file: %s", f2.path);
 
 	while (!feof(f1.handle) && !feof(f2.handle)) {
-		ret = sample_io_villas_fscan(f1.handle, f1.sample, &f1.flags);
+		ret = io_format_villas_fscan(f1.handle, &f1.sample, 1, &f1.flags);
 		if (ret < 0 && !feof(f1.handle))
 			goto out;
 
-		ret = sample_io_villas_fscan(f2.handle, f2.sample, &f2.flags);
+		ret = io_format_villas_fscan(f2.handle, &f2.sample, 1, &f2.flags);
 		if (ret < 0 && !feof(f2.handle))
 			goto out;
 
@@ -139,7 +150,7 @@ check:		if (optarg == endptr)
 		}
 
 		/* Compare sequence no */
-		if ((f1.flags & SAMPLE_IO_SEQUENCE) && (f2.flags & SAMPLE_IO_SEQUENCE)) {
+		if ((f1.flags & IO_FORMAT_SEQUENCE) && (f2.flags & IO_FORMAT_SEQUENCE)) {
 			if (f1.sample->sequence != f2.sample->sequence) {
 				printf("sequence no: %d != %d\n", f1.sample->sequence, f2.sample->sequence);
 				ret = 2;
diff --git a/tests/unit/advio.c b/tests/unit/advio.c
index 96bbbd608..aa8790a3e 100644
--- a/tests/unit/advio.c
+++ b/tests/unit/advio.c
@@ -28,7 +28,7 @@
 #include <villas/utils.h>
 #include <villas/advio.h>
 #include <villas/sample.h>
-#include <villas/sample_io.h>
+#include <villas/formats/villas.h>
 
 /** This URI points to a Sciebo share which contains some test files.
  * The Sciebo share is read/write accessible via WebDAV. */
@@ -40,12 +40,12 @@ Test(advio, local)
 	int ret;
 	char *buf = NULL;
 	size_t buflen = 0;
-	
+
 	/* We open this file and check the first line */
 	af = afopen(__FILE__, "r");
 	cr_assert(af, "Failed to open local file");
 
-	ret = getline(&buf, &buflen, af->file);	
+	ret = getline(&buf, &buflen, af->file);
 	cr_assert_gt(ret, 1);
 	cr_assert_str_eq(buf, "/** Unit tests for advio\n");
 }
@@ -74,26 +74,26 @@ Test(advio, download_large)
 {
 	AFILE *af;
 	int ret, len = 16;
-	
+
 	struct sample *smp = alloc(SAMPLE_LEN(len));
 	smp->capacity = len;
 
 	af = afopen(BASE_URI "/download-large" , "r");
 	cr_assert(af, "Failed to download file");
 
-	ret = sample_io_villas_fscan(af->file, smp, NULL);
-	cr_assert_eq(ret, 0);
-	
+	ret = io_format_villas_fscan(af->file, &smp, 1, NULL);
+	cr_assert_eq(ret, 1);
+
 	cr_assert_eq(smp->sequence, 0);
 	cr_assert_eq(smp->length, 4);
 	cr_assert_eq(smp->ts.origin.tv_sec, 1497710378);
 	cr_assert_eq(smp->ts.origin.tv_nsec, 863332240);
-	
+
 	float data[] = { 0.022245, 0.000000, -1.000000, 1.000000 };
-	
+
 	for (int i = 0; i < smp->length; i++)
 		cr_assert_float_eq(smp->data[i].f, data[i], 1e-5);
-	
+
 	ret = afclose(af);
 	cr_assert_eq(ret, 0, "Failed to close file");
 }
@@ -106,44 +106,44 @@ Test(advio, resume)
 	char line1[32];
 	char *line2 = NULL;
 	size_t linelen = 0;
-	
+
 	mkdtemp(dir);
 	ret = asprintf(&fn, "%s/file", dir);
 	cr_assert_gt(ret, 0);
-	
+
 	af1 = afopen(fn, "w+");
 	cr_assert_not_null(af1);
-	
+
 	/* We flush once the empty file in order to upload an empty file. */
 	aupload(af1, 0);
-	
+
 	af2 = afopen(fn, "r");
 	cr_assert_not_null(af2);
-	
+
 	for (int i = 0; i < 100; i++) {
 		snprintf(line1, sizeof(line1), "This is line %d\n", i);
-		
+
 		afputs(line1, af1);
 		aupload(af1, 1);
-		
+
 		adownload(af2, 1);
 		agetline(&line2, &linelen, af2);
-		
+
 		cr_assert_str_eq(line1, line2);
 	}
-	
+
 	ret = afclose(af1);
 	cr_assert_eq(ret, 0);
-	
+
 	ret = afclose(af2);
 	cr_assert_eq(ret, 0);
-	
+
 	ret = unlink(fn);
 	cr_assert_eq(ret, 0);
-	
+
 	ret = rmdir(dir);
 	cr_assert_eq(ret, 0);
-	
+
 	free(line2);
 }
 
@@ -236,4 +236,4 @@ Test(advio, append)
 	cr_assert(ret == 0, "Failed to close file");
 
 	cr_assert_arr_eq(buffer, expect, sizeof(expect));
-}
\ No newline at end of file
+}
diff --git a/tests/unit/io.c b/tests/unit/io.c
new file mode 100644
index 000000000..431f7132d
--- /dev/null
+++ b/tests/unit/io.c
@@ -0,0 +1,148 @@
+/** Unit tests for IO formats.
+ *
+ * @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
+ * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *********************************************************************************/
+
+#include <stdio.h>
+
+#include <criterion/criterion.h>
+#include <criterion/parameterized.h>
+#include <criterion/logging.h>
+
+#include "utils.h"
+#include "timing.h"
+#include "sample.h"
+#include "plugin.h"
+#include "pool.h"
+#include "io.h"
+
+#define NUM_SAMPLES 10
+#define NUM_VALUES 10
+
+void cr_assert_eq_sample(struct sample *s, struct sample *t)
+{
+	cr_assert_eq(s->length, t->length);
+	cr_assert_eq(s->sequence, t->sequence);
+	cr_assert_eq(s->format, t->format);
+
+	cr_assert_eq(s->ts.origin.tv_sec, t->ts.origin.tv_sec);
+	cr_assert_eq(s->ts.origin.tv_nsec, t->ts.origin.tv_nsec);
+	cr_assert_eq(s->ts.received.tv_sec, t->ts.received.tv_sec);
+	cr_assert_eq(s->ts.received.tv_nsec, t->ts.received.tv_nsec);
+
+	for (int i = 0; i < MIN(s->length, t->length); i++)
+		cr_assert_float_eq(s->data[i].f, t->data[i].f, 1e-6);
+}
+
+ParameterizedTestParameters(io, highlevel)
+{
+	static char formats[][32] = {
+		"villas",
+		"json",
+		"csv"
+	};
+
+	return cr_make_param_array(char[32], formats, ARRAY_LEN(formats));
+}
+
+ParameterizedTest(char *fmt, io, highlevel)
+{
+	int ret;
+	char filename[64];
+
+	struct io io;
+	struct pool p = { .state = STATE_DESTROYED };
+	struct sample *smps[NUM_SAMPLES];
+	struct sample *smpt[NUM_SAMPLES];
+
+	ret = pool_init(&p, 2 * NUM_SAMPLES, SAMPLE_LEN(NUM_VALUES), &memtype_hugepage);
+	cr_assert_eq(ret, 0);
+
+	/* Prepare a sample with arbitrary data */
+	ret = sample_alloc(&p, smps, NUM_SAMPLES);
+	cr_assert_eq(ret, NUM_SAMPLES);
+
+	ret = sample_alloc(&p, smpt, NUM_SAMPLES);
+	cr_assert_eq(ret, NUM_SAMPLES);
+
+	for (int i = 0; i < NUM_SAMPLES; i++) {
+		smpt[i]->capacity =
+		smps[i]->capacity = NUM_VALUES;
+		smps[i]->length = NUM_VALUES;
+		smps[i]->sequence = 235 + i;
+		smps[i]->format = 0; /* all float */
+		smps[i]->ts.origin = time_now();
+		smps[i]->ts.received = (struct timespec) {
+			.tv_sec  = smps[i]->ts.origin.tv_sec - 1,
+			.tv_nsec = smps[i]->ts.origin.tv_nsec
+		};
+
+		for (int j = 0; j < smps[i]->length; j++)
+			smps[i]->data[j].f = j * 1.2 + i * 100;
+	}
+
+	/* Open a file for IO */
+	strncpy(filename, "/tmp/villas-unit-test.XXXXXX", sizeof(filename));
+	mktemp(filename);
+
+	printf("Writing to file: %s\n", filename);
+
+	struct plugin *pl;
+
+	pl = plugin_lookup(PLUGIN_TYPE_FORMAT, fmt);
+	cr_assert_not_null(pl, "Format '%s' does not exist", fmt);
+
+	ret = io_init(&io, &pl->io, IO_FORMAT_ALL);
+	cr_assert_eq(ret, 0);
+
+	ret = io_open(&io, filename, "w+");
+	cr_assert_eq(ret, 0);
+
+	ret = io_print(&io, smps, NUM_SAMPLES);
+	cr_assert_eq(ret, NUM_SAMPLES);
+
+	io_rewind(&io);
+	io_flush(&io);
+
+	char cmd[128];
+	snprintf(cmd, sizeof(cmd), "cat %s", filename);
+	system(cmd);
+
+	ret = io_scan(&io, smpt, NUM_SAMPLES);
+	cr_assert_eq(ret, NUM_SAMPLES, "Read only %d of %d samples back", ret, NUM_SAMPLES);
+
+	for (int i = 0; i < 0; i++)
+		cr_assert_eq_sample(smps[i], smpt[i]);
+
+	ret = io_close(&io);
+	cr_assert_eq(ret, 0);
+
+	ret = io_destroy(&io);
+	cr_assert_eq(ret, 0);
+
+	ret = unlink(filename);
+	cr_assert_eq(ret, 0);
+
+	sample_free(smps, NUM_SAMPLES);
+	sample_free(smpt, NUM_SAMPLES);
+
+	ret = pool_destroy(&p);
+	cr_assert_eq(ret, 0);
+}
diff --git a/tests/unit/sample_io.c b/tests/unit/sample_io.c
deleted file mode 100644
index fee5a7b62..000000000
--- a/tests/unit/sample_io.c
+++ /dev/null
@@ -1,95 +0,0 @@
-/** Unit tests for the sample_io module.
- *
- * @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
- * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
- * @license GNU General Public License (version 3)
- *
- * VILLASnode
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *********************************************************************************/
-
-#include <stdio.h>
-
-#include <criterion/criterion.h>
-#include <criterion/parameterized.h>
-
-#include "utils.h"
-#include "timing.h"
-#include "sample.h"
-#include "sample_io.h"
-
-ParameterizedTestParameters(sample_io, read_write)
-{
-	static enum sample_io_format formats[] = {
-		SAMPLE_IO_FORMAT_VILLAS,
-		SAMPLE_IO_FORMAT_JSON,
-	};
-
-	return cr_make_param_array(enum sample_io_format, formats, ARRAY_LEN(formats));
-}
-
-ParameterizedTest(enum sample_io_format *fmt, sample_io, read_write)
-{
-	FILE *f;
-	int ret;
-	struct sample *s, *t;
-
-	/* Prepare a sample with arbitrary data */
-	s = malloc(SAMPLE_LEN(16));
-	cr_assert_not_null(s);
-
-	t = malloc(SAMPLE_LEN(16));
-	cr_assert_not_null(s);
-
-	t->capacity =
-	s->capacity = 16;
-	s->length = 12;
-	s->sequence = 235;
-	s->format = 0;
-	s->ts.origin = time_now();
-	s->ts.received = (struct timespec) { s->ts.origin.tv_sec - 1, s->ts.origin.tv_nsec };
-
-	for (int i = 0; i < s->length; i++)
-		s->data[i].f = i * 1.2;
-
-	/* Open a file for IO */
-	f = tmpfile();
-	cr_assert_not_null(f);
-
-	ret = sample_io_fprint(f, s, *fmt, SAMPLE_IO_ALL);
-	cr_assert_eq(ret, 0);
-
-	rewind(f);
-
-	ret = sample_io_fscan(f, t, *fmt, NULL);
-	cr_assert_eq(ret, 0);
-
-	cr_assert_eq(s->length, t->length);
-	cr_assert_eq(s->sequence, t->sequence);
-	cr_assert_eq(s->format, t->format);
-
-	cr_assert_eq(s->ts.origin.tv_sec, t->ts.origin.tv_sec);
-	cr_assert_eq(s->ts.origin.tv_nsec, t->ts.origin.tv_nsec);
-	cr_assert_eq(s->ts.received.tv_sec, t->ts.received.tv_sec);
-	cr_assert_eq(s->ts.received.tv_nsec, t->ts.received.tv_nsec);
-
-	for (int i = 0; i < MIN(s->length, t->length); i++)
-		cr_assert_float_eq(s->data[i].f, t->data[i].f, 1e-6);
-
-	fclose(f);
-
-	free(s);
-	free(t);
-}
\ No newline at end of file