From 3eea0c67bb42e14a009c996f04fc1984478d24a5 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 14 Aug 2017 14:42:07 +0200 Subject: [PATCH] - introduce low-level interface for new IO subsystem: we now have a fully extensible system for new IO formats and file formats - reworked file node-type to remove in / out directions --- include/villas/compat.h | 4 + include/villas/io.h | 96 +---- include/villas/{formats => io}/csv.h | 4 +- include/villas/{formats => io}/json.h | 8 +- include/villas/{formats => io}/msg.h | 11 +- include/villas/{formats => io}/msg_format.h | 0 include/villas/io/raw.h | 60 +++ include/villas/{formats => io}/villas.h | 8 +- include/villas/{formats => io}/webmsg.h | 0 .../villas/{formats => io}/webmsg_format.h | 0 include/villas/io_format.h | 146 +++++++ include/villas/kernel/rt.h | 3 + include/villas/nodes/file.h | 39 +- include/villas/nodes/nanomsg.h | 8 + include/villas/nodes/socket.h | 18 + include/villas/nodes/websocket.h | 31 +- include/villas/nodes/zeromq.h | 9 +- include/villas/plugin.h | 4 +- include/villas/sample.h | 18 +- lib/Makefile.inc | 2 +- lib/Makefile.villas.inc | 2 +- lib/advio.c | 11 +- lib/formats/hdf5.c | 90 ---- lib/hook.c | 2 +- lib/io.c | 118 +++++- lib/{formats => io}/Makefile.inc | 2 +- lib/{formats => io}/csv.c | 31 +- lib/io/h5pt.c | 170 ++++++++ lib/{formats => io}/json.c | 32 +- lib/{formats => io}/msg.c | 80 ++-- lib/io/raw.c | 241 +++++++++++ lib/{formats => io}/villas.c | 94 +++-- lib/{formats => io}/webmsg.c | 98 ++++- lib/io_format.c | 46 ++ lib/nodes/Makefile.inc | 4 +- lib/nodes/cbuilder.c | 13 +- lib/nodes/file.c | 392 +++++++----------- lib/nodes/nanomsg.c | 38 +- lib/nodes/socket.c | 242 ++--------- lib/nodes/websocket.c | 216 +++++----- lib/nodes/zeromq.c | 33 +- src/hook.c | 18 +- src/node.c | 2 +- src/pipe.c | 4 +- src/signal.c | 8 +- src/test-cmp.c | 7 +- tests/unit/io.c | 136 ++++-- 47 files changed, 1622 insertions(+), 977 deletions(-) rename include/villas/{formats => io}/csv.h (86%) rename include/villas/{formats => io}/json.h (77%) rename include/villas/{formats => io}/msg.h (87%) rename include/villas/{formats => io}/msg_format.h (100%) create mode 100644 include/villas/io/raw.h rename include/villas/{formats => io}/villas.h (76%) rename include/villas/{formats => io}/webmsg.h (100%) rename include/villas/{formats => io}/webmsg_format.h (100%) create mode 100644 include/villas/io_format.h delete mode 100644 lib/formats/hdf5.c rename lib/{formats => io}/Makefile.inc (94%) rename lib/{formats => io}/csv.c (73%) create mode 100644 lib/io/h5pt.c rename lib/{formats => io}/json.c (79%) rename lib/{formats => io}/msg.c (59%) create mode 100644 lib/io/raw.c rename lib/{formats => io}/villas.c (72%) rename lib/{formats => io}/webmsg.c (53%) create mode 100644 lib/io_format.c diff --git a/include/villas/compat.h b/include/villas/compat.h index 9d8377eb5..ad574fda2 100644 --- a/include/villas/compat.h +++ b/include/villas/compat.h @@ -31,11 +31,15 @@ size_t json_dumpb(const json_t *json, char *buffer, size_t size, size_t flags); #define le16toh(x) OSSwapLittleToHostInt16(x) #define le32toh(x) OSSwapLittleToHostInt32(x) + #define le64toh(x) OSSwapLittleToHostInt64(x) #define be16toh(x) OSSwapBigToHostInt16(x) #define be32toh(x) OSSwapBigToHostInt32(x) + #define be64toh(x) OSSwapBigToHostInt64(x) #define htole16(x) OSSwapHostToLittleInt16(x) #define htole32(x) OSSwapHostToLittleInt32(x) + #define htole64(x) OSSwapHostToLittleInt64(x) #define htobe16(x) OSSwapHostToBigInt16(x) #define htobe32(x) OSSwapHostToBigInt32(x) + #define htobe64(x) OSSwapHostToBigInt64(x) #endif /* __MACH__ */ diff --git a/include/villas/io.h b/include/villas/io.h index e8668c895..83e0a0c03 100644 --- a/include/villas/io.h +++ b/include/villas/io.h @@ -28,91 +28,10 @@ /* Forward declarations */ struct sample; -struct io; +struct io_format; -/** These flags define the format which is used by io_fscan() and io_fprint(). */ enum io_flags { - IO_FORMAT_NANOSECONDS = (1 << 0), /**< Include nanoseconds in output. */ - IO_FORMAT_OFFSET = (1 << 1), /**< Include offset / delta between received and send timestamps. */ - IO_FORMAT_SEQUENCE = (1 << 2), /**< Include sequence number in output. */ - IO_FORMAT_VALUES = (1 << 3), /**< Include values in output. */ - IO_FORMAT_ALL = 16-1, /**< Enable all output options. */ - IO_FLAG_FLUSH = (1 << 8), /**< Flush the output stream after each chunk of samples. */ -}; - -struct io_format { - int (*init)(struct io *io); - int (*destroy)(struct io *io); - - /** @{ - * High-level interface - */ - - /** Open an IO stream. - * - * @see fopen() - */ - int (*open)(struct io *io, const char *uri, const char *mode); - - /** Close an IO stream. - * - * @see fclose() - */ - int (*close)(struct io *io); - - /** Check if end-of-file was reached. - * - * @see feof() - */ - int (*eof)(struct io *io); - - /** Rewind an IO stream. - * - * @see rewind() - */ - void (*rewind)(struct io *io); - - /** Flush buffered data to disk. - * - * @see fflush() - */ - int (*flush)(struct io *io); - - int (*print)(struct io *io, struct sample *smps[], size_t cnt); - int (*scan)( struct io *io, struct sample *smps[], size_t cnt); - /** @} */ - - /** @{ - * Low-level interface - */ - - /** Parse samples from the buffer \p buf with a length of \p len bytes. - * - * @return The number of bytes consumed of \p buf. - */ - size_t (*sscan)( char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags); - - /** Print \p cnt samples from \p smps into buffer \p buf of length \p len. - * - * @return The number of bytes written to \p buf. - */ - size_t (*sprint)(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags); - - /** Parse up to \p cnt samples from stream \p f into array \p smps. - * - * @return The number of samples parsed. - */ - int (*fscan)( FILE *f, struct sample *smps[], size_t cnt, int *flags); - - /** Print \p cnt samples from \p smps to stream \p f. - * - * @return The number of samples written to \p f. - */ - int (*fprint)(FILE *f, struct sample *smps[], size_t cnt, int flags); - - /** @} */ - - size_t size; /**< Number of bytes to allocate for io::_vd */ + IO_FLUSH = (1 << 8) /**< Flush the output stream after each chunk of samples. */ }; struct io { @@ -126,7 +45,7 @@ struct io { } mode; /** A format type can use this file handle or overwrite the - * io_format::{open,close,eof,rewind} functions and the private + * format::{open,close,eof,rewind} functions and the private * data in io::_vd. */ union { @@ -140,6 +59,11 @@ struct io { } advio; }; + struct { + char *input; + char *output; + } buffer; + void *_vd; struct io_format *_vt; }; @@ -148,7 +72,7 @@ int io_init(struct io *io, struct io_format *fmt, int flags); int io_destroy(struct io *io); -int io_open(struct io *io, const char *uri, const char *mode); +int io_open(struct io *io, const char *uri); int io_close(struct io *io); @@ -163,7 +87,7 @@ void io_rewind(struct io *io); int io_flush(struct io *io); -int io_stream_open(struct io *io, const char *uri, const char *mode); +int io_stream_open(struct io *io, const char *uri); int io_stream_close(struct io *io); diff --git a/include/villas/formats/csv.h b/include/villas/io/csv.h similarity index 86% rename from include/villas/formats/csv.h rename to include/villas/io/csv.h index 77a0deed7..993305664 100644 --- a/include/villas/formats/csv.h +++ b/include/villas/io/csv.h @@ -30,6 +30,6 @@ struct sample; #define CSV_SEPARATOR '\t' -int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags); +int csv_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags); -int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags); +int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags); diff --git a/include/villas/formats/json.h b/include/villas/io/json.h similarity index 77% rename from include/villas/formats/json.h rename to include/villas/io/json.h index 6454cdef4..045a1201e 100644 --- a/include/villas/formats/json.h +++ b/include/villas/io/json.h @@ -26,10 +26,10 @@ #include "sample.h" -int io_format_json_pack(json_t **j, struct sample *s, int flags); +int json_pack_sample(json_t **j, struct sample *s, int flags); -int io_format_json_unpack(json_t *j, struct sample *s, int *flags); +int json_unpack_sample(json_t *j, struct sample *s, int *flags); -int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags); +int json_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags); -int io_format_json_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags); +int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags); diff --git a/include/villas/formats/msg.h b/include/villas/io/msg.h similarity index 87% rename from include/villas/formats/msg.h rename to include/villas/io/msg.h index 194ffe1b3..c4094d8db 100644 --- a/include/villas/formats/msg.h +++ b/include/villas/io/msg.h @@ -23,12 +23,12 @@ #pragma once +#include + /* Forward declarations. */ struct msg; struct sample; - -/** The maximum length of a packet which contains stuct msg. */ -#define MSG_MAX_PACKET_LEN 1500 +struct io; /** Swaps the byte-order of the message. * @@ -60,9 +60,8 @@ int msg_to_sample(struct msg *msg, struct sample *smp); /** Copy fields form \p smp into \p msg. */ int msg_from_sample(struct msg *msg, struct sample *smp); - /** Copy / read struct msg's from buffer \p buf to / fram samples \p smps. */ -ssize_t msg_buffer_from_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len); +int msg_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags); /** Read struct sample's from buffer \p buf into samples \p smps. */ -int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len); +int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags); diff --git a/include/villas/formats/msg_format.h b/include/villas/io/msg_format.h similarity index 100% rename from include/villas/formats/msg_format.h rename to include/villas/io/msg_format.h diff --git a/include/villas/io/raw.h b/include/villas/io/raw.h new file mode 100644 index 000000000..797503fed --- /dev/null +++ b/include/villas/io/raw.h @@ -0,0 +1,60 @@ +/** RAW IO format + * + * @file + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#pragma once + +#include + +/* Forward declarations */ +struct sample; + +enum raw_flags { + RAW_FAKE = (1 << 16), /**< Treat the first three values as: sequenceno, seconds, nanoseconds */ + + RAW_BE_INT = (1 << 17), /**< Byte-order for integer data: big-endian if set. */ + RAW_BE_FLT = (1 << 18), /**< Byte-order for floating point data: big-endian if set. */ + RAW_BE_HDR = (1 << 19), /**< Byte-order for fake header fields: big-endian if set. */ + + /** Byte-order for all fields: big-endian if set. */ + RAW_BE = RAW_BE_INT | RAW_BE_FLT | RAW_BE_HDR, + + /** Mix floating and integer types. + * + * io_raw_sscan() parses all values as single / double precission fp. + * io_raw_sprint() uses sample::format to determine the type. + */ + RAW_AUTO = (1 << 22), + RAW_FLT = (1 << 23), /**< Data-type: floating point otherwise integer. */ + + //RAW_1 = (0 << 24), /**< Pack each value as a single bit. */ + RAW_8 = (3 << 24), /**< Pack each value as a byte. */ + RAW_16 = (4 << 24), /**< Pack each value as a word. */ + RAW_32 = (5 << 24), /**< Pack each value as a double word. */ + RAW_64 = (6 << 24) /**< Pack each value as a quad word. */ +}; + +/** Copy / read struct msg's from buffer \p buf to / fram samples \p smps. */ +int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags); + +/** Read struct sample's from buffer \p buf into samples \p smps. */ +int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags); diff --git a/include/villas/formats/villas.h b/include/villas/io/villas.h similarity index 76% rename from include/villas/formats/villas.h rename to include/villas/io/villas.h index 798e289c5..94aadc40d 100644 --- a/include/villas/formats/villas.h +++ b/include/villas/io/villas.h @@ -27,10 +27,10 @@ #include "io.h" -int io_format_villas_print(struct io *io, struct sample *smps[], size_t cnt); +int villas_print(struct io *io, struct sample *smps[], unsigned cnt); -int io_format_villas_scan(struct io *io, struct sample *smps[], size_t cnt); +int villas_scan(struct io *io, struct sample *smps[], unsigned cnt); -int io_format_villas_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags); +int villas_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags); -int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags); +int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags); diff --git a/include/villas/formats/webmsg.h b/include/villas/io/webmsg.h similarity index 100% rename from include/villas/formats/webmsg.h rename to include/villas/io/webmsg.h diff --git a/include/villas/formats/webmsg_format.h b/include/villas/io/webmsg_format.h similarity index 100% rename from include/villas/formats/webmsg_format.h rename to include/villas/io/webmsg_format.h diff --git a/include/villas/io_format.h b/include/villas/io_format.h new file mode 100644 index 000000000..d82bd5c17 --- /dev/null +++ b/include/villas/io_format.h @@ -0,0 +1,146 @@ +/** Read / write sample data in different formats. + * + * @file + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#pragma once + +#include + +/* Forward declarations */ +struct sample; +struct io; + +enum io_format_flags { + IO_FORMAT_NANOSECONDS = (1 << 0), /**< Include nanoseconds in output. */ + IO_FORMAT_OFFSET = (1 << 1), /**< Include offset / delta between received and send timestamps. */ + IO_FORMAT_SEQUENCE = (1 << 2), /**< Include sequence number in output. */ + IO_FORMAT_VALUES = (1 << 3), /**< Include values in output. */ + IO_FORMAT_ALL = 15, /**< Enable all output options. */ + + IO_FORMAT_BINARY = (1 << 8) +}; + +struct io_format { + int (*init)(struct io *io); + int (*destroy)(struct io *io); + + /** @{ + * High-level interface + */ + + /** Open an IO stream. + * + * @see fopen() + */ + int (*open)(struct io *io, const char *uri); + + /** Close an IO stream. + * + * @see fclose() + */ + int (*close)(struct io *io); + + /** Check if end-of-file was reached. + * + * @see feof() + */ + int (*eof)(struct io *io); + + /** Rewind an IO stream. + * + * @see rewind() + */ + void (*rewind)(struct io *io); + + /** Flush buffered data to disk. + * + * @see fflush() + */ + int (*flush)(struct io *io); + + int (*print)(struct io *io, struct sample *smps[], unsigned cnt); + int (*scan)( struct io *io, struct sample *smps[], unsigned cnt); + /** @} */ + + /** @{ + * Low-level interface + */ + + /** @see io_format_sscan */ + int (*sscan)(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags); + + /** @see io_format_sprint */ + int (*sprint)(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags); + + /** @see io_format_fscan */ + int (*fscan)(FILE *f, struct sample *smps[], unsigned cnt, int *flags); + + /** @see io_format_fprint */ + int (*fprint)(FILE *f, struct sample *smps[], unsigned cnt, int flags); + + /** @} */ + + size_t size; /**< Number of bytes to allocate for io::_vd */ + int flags; /**< A set of flags which is automatically used. */ +}; + +struct io_format * io_format_lookup(const char *name); + +/** Parse samples from the buffer \p buf with a length of \p len bytes. + * + * @param buf[in] The buffer of data which should be parsed / de-serialized. + * @param len[in] The length of the buffer \p buf. + * @param rbytes[out] The number of bytes which have been read from \p buf. + * @param smps[out] The array of pointers to samples. + * @param cnt[in] The number of pointers in the array \p smps. + * + * @retval >=0 The number of samples which have been parsed from \p buf and written into \p smps. + * @retval <0 Something went wrong. + */ +int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags); + +/** Print \p cnt samples from \p smps into buffer \p buf of length \p len. + * + * @param buf[out] The buffer which should be filled with serialized data. + * @param len[in] The length of the buffer \p buf. + * @param rbytes[out] The number of bytes which have been written to \p buf. Ignored if NULL. + * @param smps[in] The array of pointers to samples. + * @param cnt[in] The number of pointers in the array \p smps. + * + * @retval >=0 The number of samples from \p smps which have been written into \p buf. + * @retval <0 Something went wrong. + */ +int io_format_sprint(struct io_format *fmt, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags); + +/** Parse up to \p cnt samples from stream \p f into array \p smps. + * + * @retval >=0 The number of samples which have been parsed from \p f and written into \p smps. + * @retval <0 Something went wrong. + */ +int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int *flags); + +/** Print \p cnt samples from \p smps to stream \p f. + * + * @retval >=0 The number of samples from \p smps which have been written to \p f. + * @retval <0 Something went wrong. + */ +int io_format_fprint(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags); diff --git a/include/villas/kernel/rt.h b/include/villas/kernel/rt.h index a6408ff1d..44c22a0b5 100644 --- a/include/villas/kernel/rt.h +++ b/include/villas/kernel/rt.h @@ -1,5 +1,6 @@ /** Linux specific real-time optimizations * + * @see: https://wiki.linuxfoundation.org/realtime/documentation/howto/applications/application_base * @file * @author Steffen Vogel * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC @@ -31,6 +32,8 @@ int rt_set_affinity(int affinity); int rt_set_priority(int priority); +int rt_lock_memory(); + /** Checks for realtime (PREEMPT_RT) patched kernel. * * See https://rt.wiki.kernel.org diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index ba489f439..8977eaf21 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -31,48 +31,41 @@ #pragma once -#include "advio.h" +#include "io.h" #include "node.h" #include "periodic_task.h" #define FILE_MAX_PATHLEN 512 -enum { - FILE_READ, - FILE_WRITE -}; - struct file { - struct file_direction { - AFILE *handle; /**< libc: stdio file handle. */ + struct io io; /**< Format and file IO */ + struct io_format *format; - const char *mode; /**< libc: fopen() mode. */ - const char *format; /**< Format string for file name. */ - - char *uri; /**< Real file name. */ - } read, write; + char *uri_tmpl; /**< Format string for file name. */ + char *uri; /**< Real file name. */ + char *mode; /**< File access mode. */ int flush; /**< Flush / upload file contents after each write. */ + struct periodic_task timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ + double rate; /**< The read rate. */ - enum read_epoch_mode { + enum epoch_mode { FILE_EPOCH_DIRECT, FILE_EPOCH_WAIT, FILE_EPOCH_RELATIVE, FILE_EPOCH_ABSOLUTE, FILE_EPOCH_ORIGINAL - } read_epoch_mode; /**< Specifies how file::offset is calculated. */ - - struct timespec read_first; /**< The first timestamp in the file file::{read,write}::uri */ - struct timespec read_epoch; /**< The epoch timestamp from the configuration. */ - struct timespec read_offset; /**< An offset between the timestamp in the input file and the current time */ - + } epoch_mode; /**< Specifies how file::offset is calculated. */ + enum { FILE_EOF_EXIT, /**< Terminate when EOF is reached. */ FILE_EOF_REWIND, /**< Rewind the file when EOF is reached. */ FILE_EOF_WAIT /**< Blocking wait when EOF is reached. */ - } read_eof; /**< Should we rewind the file when we reach EOF? */ - int read_timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ - double read_rate; /**< The read rate. */ + } eof; + + struct timespec first; /**< The first timestamp in the file file::{read,write}::uri */ + struct timespec epoch; /**< The epoch timestamp from the configuration. */ + struct timespec offset; /**< An offset between the timestamp in the input file and the current time */ }; /** @see node_type::print */ diff --git a/include/villas/nodes/nanomsg.h b/include/villas/nodes/nanomsg.h index 0d3818c65..d4ba6070c 100644 --- a/include/villas/nodes/nanomsg.h +++ b/include/villas/nodes/nanomsg.h @@ -34,6 +34,12 @@ #include "node.h" #include "list.h" +/** The maximum length of a packet which contains stuct msg. */ +#define NANOMSG_MAX_PACKET_LEN 1500 + +/* Forward declarations */ +struct io_format; + struct nanomsg { struct { int socket; @@ -44,6 +50,8 @@ struct nanomsg { int socket; struct list endpoints; } subscriber; + + struct io_format *format; }; /** @see node_type::print */ diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index 769741155..b142c12ac 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -39,8 +39,22 @@ #include #endif +#ifdef WITH_LIBNL_ROUTE_30 + #include "kernel/if.h" + #include "kernel/nl.h" + #include "kernel/tc.h" + + #define WITH_NETEM +#endif /* WITH_LIBNL_ROUTE_30 */ + #include "node.h" +/* Forward declarations */ +struct io_format; + +/** The maximum length of a packet which contains stuct msg. */ +#define SOCKET_MAX_PACKET_LEN 1500 + enum socket_layer { SOCKET_LAYER_ETH, SOCKET_LAYER_IP, @@ -80,6 +94,8 @@ struct socket { union sockaddr_union local; /**< Local address of the socket */ union sockaddr_union remote; /**< Remote address of the socket */ + struct io_format *format; + /* Multicast options */ struct multicast { int enabled; /**< Is multicast enabled? */ @@ -88,8 +104,10 @@ struct socket { struct ip_mreq mreq; /**< A multicast group to join. */ } multicast; +#ifdef WITH_NETEM struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */ struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */ +#endif /* WITH_NETEM */ }; diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index 5316756ec..c8d6d95f7 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -57,24 +57,31 @@ struct websocket { /* Internal datastructures */ struct websocket_connection { - struct node *node; - struct lws *wsi; - - struct queue queue; /**< For samples which are sent to the WebSocket */ - - struct { - char name[64]; - char ip[64]; - } peer; - + enum state state; /**< The current status of this connection. */ + enum { WEBSOCKET_MODE_CLIENT, WEBSOCKET_MODE_SERVER, } mode; - enum state state; + struct lws *wsi; + struct node *node; + struct io_format *format; /**< The IO format used for this connection. */ + struct queue_signalled queue; /**< For samples which are sent to the WebSocket */ - char *buf; /**< A buffer which is used to construct the messages. */ + union { + /**< Only used in case websocket_connection::mode == WEBSOCKET_MODE_CLIENT */ + struct websocket_destination *destination; + + /**< Only used in case websocket_connection::mode == WEBSOCKET_MODE_SERVER */ + struct { + char name[64]; + char ip[64]; + } peer; + }; + + char *buf; /**< A buffer which is used to construct the messages. */ + size_t buflen; /**< Length of websocket_connection::buf. */ char *_name; }; diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index ca9d264c0..3de66acd1 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -30,19 +30,26 @@ #pragma once #include +#include -#include "node.h" #include "list.h" #if ZMQ_VERSION_MAJOR > 4 || (ZMQ_VERSION_MAJOR == 4 && ZMQ_VERSION_MINOR >= 2) #define ZMQ_BUILD_DISH 1 #endif +/* Forward declarations */ +struct io_format; +struct node; +struct sample; + struct zeromq { int ipv6; char *filter; + struct io_format *format; + struct { int enabled; struct { diff --git a/include/villas/plugin.h b/include/villas/plugin.h index 1531259ba..9b71b7e46 100644 --- a/include/villas/plugin.h +++ b/include/villas/plugin.h @@ -23,7 +23,7 @@ #pragma once -#include "io.h" +#include "io_format.h" #include "hook.h" #include "api.h" #include "common.h" @@ -54,7 +54,7 @@ enum plugin_type { PLUGIN_TYPE_HOOK, PLUGIN_TYPE_NODE, PLUGIN_TYPE_API, - PLUGIN_TYPE_FORMAT, + PLUGIN_TYPE_IO, PLUGIN_TYPE_FPGA_IP, PLUGIN_TYPE_MODEL_CBUILDER }; diff --git a/include/villas/sample.h b/include/villas/sample.h index 707058739..62bb67624 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -42,20 +42,22 @@ struct pool; #define SAMPLE_LEN(len) (sizeof(struct sample) + SAMPLE_DATA_LEN(len)) /** The length of a sample data portion of a sample datastructure with \p values values in bytes. */ -#define SAMPLE_DATA_LEN(len) ((len) * sizeof(float)) +#define SAMPLE_DATA_LEN(len) ((len) * sizeof(double)) /** The offset to the beginning of the data section. */ #define SAMPLE_DATA_OFFSET(smp) ((char *) (smp) + offsetof(struct sample, data)) enum sample_data_format { - SAMPLE_DATA_FORMAT_FLOAT= 0, - SAMPLE_DATA_FORMAT_INT = 1 + SAMPLE_DATA_FORMAT_FLOAT = 0, + SAMPLE_DATA_FORMAT_INT = 1 }; struct sample { int sequence; /**< The sequence number of this sample. */ int length; /**< The number of values in sample::values which are valid. */ int capacity; /**< The number of values in sample::values for which memory is reserved. */ + + int id; atomic_int refcnt; /**< Reference counter. */ off_t pool_off; /**< This sample belongs to this memory pool (relative pointer). */ @@ -68,12 +70,16 @@ struct sample { struct timespec sent; /**< The point in time when this data was send for the last time. */ } ts; - uint64_t format; /**< A long bitfield indicating the number representation of the first 64 values in sample::data[] */ + /** A long bitfield indicating the number representation of the first 64 values in sample::data[]. + * + * @see sample_data_format + */ + uint64_t format; /** The values. */ union { - float f; /**< Floating point values. */ - uint32_t i; /**< Integer values. */ + double f; /**< Floating point values. */ + uint64_t i; /**< Integer values. */ } data[]; /**< Data is in host endianess! */ }; diff --git a/lib/Makefile.inc b/lib/Makefile.inc index 56ea6be64..7c762a738 100644 --- a/lib/Makefile.inc +++ b/lib/Makefile.inc @@ -28,7 +28,7 @@ LIB_CFLAGS = $(CFLAGS) -fPIC -include lib/hooks/Makefile.inc -include lib/nodes/Makefile.inc --include lib/formats/Makefile.inc +-include lib/io/Makefile.inc -include $(patsubst %, lib/Makefile.%.inc, $(SONAMES)) diff --git a/lib/Makefile.villas.inc b/lib/Makefile.villas.inc index 92aed87c4..2d9a51d75 100644 --- a/lib/Makefile.villas.inc +++ b/lib/Makefile.villas.inc @@ -33,7 +33,7 @@ LIB_SRCS += $(addprefix lib/kernel/, kernel.c rt.c) \ utils.c super_node.c hist.c timing.c pool.c list.c queue.c \ queue_signalled.c memory.c advio.c plugin.c node_type.c stats.c \ mapping.c io.c shmem.c config_helper.c crypt.c compat.c \ - log_table.c log_helper.c \ + log_table.c log_helper.c io_format.c periodic_task.c \ ) LIB_LDFLAGS = -shared diff --git a/lib/advio.c b/lib/advio.c index 02b955eca..8c7205643 100644 --- a/lib/advio.c +++ b/lib/advio.c @@ -247,14 +247,19 @@ int afclose(AFILE *af) int ret; ret = afflush(af); - + if (ret) + return ret; + curl_easy_cleanup(af->curl); - fclose(af->file); + + ret = fclose(af->file); + if (ret) + return ret; free(af->uri); free(af); - return ret; + return 0; } int afseek(AFILE *af, long offset, int origin) diff --git a/lib/formats/hdf5.c b/lib/formats/hdf5.c deleted file mode 100644 index 48c866a04..000000000 --- a/lib/formats/hdf5.c +++ /dev/null @@ -1,90 +0,0 @@ -#include "hdf5_hl.h" - -#include - -/*------------------------------------------------------------------------- - * Packet Table Fixed-Length Example - * - * Example program that creates a packet table and performs - * writes and reads. - * - *------------------------------------------------------------------------- - */ - -int main(void) -{ - hid_t fid; /* File identifier */ - hid_t ptable; /* Packet table identifier */ - - herr_t err; /* Function return status */ - hsize_t count; /* Number of records in the table */ - - int x; /* Loop variable */ - - /* Buffers to hold data */ - int writeBuffer[5]; - int readBuffer[5]; - - /* Initialize buffers */ - for(x=0; x<5; x++) - { - writeBuffer[x]=x; - readBuffer[x] = -1; - } - - /* Create a file using default properties */ - fid=H5Fcreate("packet_table_FLexample.h5",H5F_ACC_TRUNC,H5P_DEFAULT,H5P_DEFAULT); - - /* Create a fixed-length packet table within the file */ - /* This table's "packets" will be simple integers. */ - ptable = H5PTcreate_fl(fid, "Packet Test Dataset", H5T_NATIVE_INT, 1, 1); - if(ptable == H5I_INVALID_HID) - goto out; - - /* Write one packet to the packet table */ - err = H5PTappend(ptable, 1, &(writeBuffer[0]) ); - if(err < 0) - goto out; - - /* Write several packets to the packet table */ - err = H5PTappend(ptable, 4, &(writeBuffer[1]) ); - if(err < 0) - goto out; - - /* Get the number of packets in the packet table. This should be five. */ - err = H5PTget_num_packets(ptable, &count); - if(err < 0) - goto out; - - printf("Number of packets in packet table after five appends: %llu\n", count); - - /* Initialize packet table's "current record" */ - err = H5PTcreate_index(ptable); - if(err < 0) - goto out; - - /* Iterate through packets, read each one back */ - for(x=0; x<5; x++) - { - err = H5PTget_next(ptable, 1, &(readBuffer[x]) ); - if(err < 0) - goto out; - - printf("Packet %d's value is %d\n", x, readBuffer[x]); - } - - /* Close the packet table */ - err = H5PTclose(ptable); - if(err < 0) - goto out; - - /* Close the file */ - H5Fclose(fid); - - return 0; - - out: /* An error has occurred. Clean up and exit. */ - H5PTclose(ptable); - H5Fclose(fid); - return -1; -} diff --git a/lib/hook.c b/lib/hook.c index 31efa38a0..6a8d0e848 100644 --- a/lib/hook.c +++ b/lib/hook.c @@ -24,7 +24,7 @@ #include "timing.h" #include "config.h" -#include "formats/msg.h" +#include "io/msg.h" #include "hook.h" #include "path.h" #include "utils.h" diff --git a/lib/io.c b/lib/io.c index 2d42b4e0d..564c51fe7 100644 --- a/lib/io.c +++ b/lib/io.c @@ -24,15 +24,17 @@ #include #include "io.h" +#include "io_format.h" #include "utils.h" #include "sample.h" +#include "plugin.h" int io_init(struct io *io, struct io_format *fmt, int flags) { io->_vt = fmt; io->_vd = alloc(fmt->size); - io->flags = flags; + io->flags = flags | io->_vt->flags; return io->_vt->init ? io->_vt->init(io) : 0; } @@ -50,41 +52,43 @@ int io_destroy(struct io *io) return 0; } -int io_stream_open(struct io *io, const char *uri, const char *mode) +int io_stream_open(struct io *io, const char *uri) { int ret; if (uri) { if (aislocal(uri)) { io->mode = IO_MODE_STDIO; - - io->stdio.input = - io->stdio.output = fopen(uri, mode); - + + io->stdio.output = fopen(uri, "a+"); if (io->stdio.output == NULL) return -1; - ret = setvbuf(io->stdio.output, NULL, _IOLBF, BUFSIZ); - if (ret) + io->stdio.input = fopen(uri, "r"); + if (io->stdio.input == NULL) return -1; } else { io->mode = IO_MODE_ADVIO; - io->advio.input = - io->advio.output = afopen(uri, mode); - + io->advio.output = afopen(uri, "a+"); if (io->advio.output == NULL) return -1; + + io->advio.input = afopen(uri, "r"); + if (io->advio.input == NULL) + return -1; } } else { io->mode = IO_MODE_STDIO; - io->flags |= IO_FLAG_FLUSH; + io->flags |= IO_FLUSH; io->stdio.input = stdin; io->stdio.output = stdout; - + } + + if (io->mode == IO_MODE_STDIO) { ret = setvbuf(io->stdio.input, NULL, _IOLBF, BUFSIZ); if (ret) return -1; @@ -99,11 +103,34 @@ int io_stream_open(struct io *io, const char *uri, const char *mode) int io_stream_close(struct io *io) { + int ret; + switch (io->mode) { case IO_MODE_ADVIO: - return afclose(io->advio.input); + ret = afclose(io->advio.input); + if (ret) + return ret; + + ret = afclose(io->advio.output); + if (ret) + return ret; + + return 0; + case IO_MODE_STDIO: - return io->stdio.input != stdin ? fclose(io->stdio.input) : 0; + if (io->stdio.input == stdin) + return 0; + + ret = fclose(io->stdio.input); + if (ret) + return ret; + + ret = fclose(io->stdio.output); + if (ret) + return ret; + + return 0; + case IO_MODE_CUSTOM: return 0; } @@ -150,11 +177,11 @@ void io_stream_rewind(struct io *io) } } -int io_open(struct io *io, const char *uri, const char *mode) +int io_open(struct io *io, const char *uri) { return io->_vt->open - ? io->_vt->open(io, uri, mode) - : io_stream_open(io, uri, mode); + ? io->_vt->open(io, uri) + : io_stream_open(io, uri); } int io_close(struct io *io) @@ -195,11 +222,26 @@ int io_print(struct io *io, struct sample *smps[], size_t cnt) FILE *f = io->mode == IO_MODE_ADVIO ? io->advio.output->file : io->stdio.output; + + //flockfile(f); - ret = io->_vt->fprint(f, smps, cnt, io->flags); + if (io->_vt->fprint) + ret = io->_vt->fprint(f, smps, cnt, io->flags); + else if (io->_vt->sprint) { + char buf[4096]; + size_t wbytes; + + ret = io->_vt->sprint(buf, sizeof(buf), &wbytes, smps, cnt, io->flags); + + fwrite(buf, wbytes, 1, f); + } + else + ret = -1; + + //funlockfile(f); } - if (io->flags & IO_FLAG_FLUSH) + if (io->flags & IO_FLUSH) io_flush(io); return ret; @@ -207,13 +249,45 @@ int io_print(struct io *io, struct sample *smps[], size_t cnt) int io_scan(struct io *io, struct sample *smps[], size_t cnt) { + int ret; + if (io->_vt->scan) - return io->_vt->scan(io, smps, cnt); + ret = io->_vt->scan(io, smps, cnt); else { FILE *f = io->mode == IO_MODE_ADVIO ? io->advio.input->file : io->stdio.input; + + //flockfile(f); + + int flags = io->flags; - return io->_vt->fscan(f, smps, cnt, NULL); + if (io->_vt->fscan) + return io->_vt->fscan(f, smps, cnt, &flags); + else if (io->_vt->sscan) { + size_t bytes, rbytes; + char buf[4096]; + + bytes = fread(buf, 1, sizeof(buf), f); + + ret = io->_vt->sscan(buf, bytes, &rbytes, smps, cnt, &flags); + } + else + ret = -1; + + //funlockfile(f); } + + return ret; +} + +struct io_format * io_format_lookup(const char *name) +{ + struct plugin *p; + + p = plugin_lookup(PLUGIN_TYPE_IO, name); + if (!p) + return NULL; + + return &p->io; } diff --git a/lib/formats/Makefile.inc b/lib/io/Makefile.inc similarity index 94% rename from lib/formats/Makefile.inc rename to lib/io/Makefile.inc index 12022e536..d75555287 100644 --- a/lib/formats/Makefile.inc +++ b/lib/io/Makefile.inc @@ -20,7 +20,7 @@ # along with this program. If not, see . ################################################################################### -LIB_SRCS += $(addprefix lib/formats/,json.c villas.c csv.c) +LIB_SRCS += $(addprefix lib/io/,json.c villas.c csv.c raw.c msg.c) WITH_HDF5 ?= 0 diff --git a/lib/formats/csv.c b/lib/io/csv.c similarity index 73% rename from lib/formats/csv.c rename to lib/io/csv.c index eba524b60..26d556d5e 100644 --- a/lib/formats/csv.c +++ b/lib/io/csv.c @@ -21,13 +21,14 @@ *********************************************************************************/ #include +#include -#include "formats/csv.h" +#include "io/csv.h" #include "plugin.h" #include "sample.h" #include "timing.h" -int io_format_csv_fprint_single(FILE *f, struct sample *s, int flags) +int csv_fprint_single(FILE *f, struct sample *s, int flags) { fprintf(f, "%ld %09ld %d", s->ts.origin.tv_sec, s->ts.origin.tv_nsec, s->sequence); @@ -37,7 +38,7 @@ int io_format_csv_fprint_single(FILE *f, struct sample *s, int flags) fprintf(f, "%c%.6f", CSV_SEPARATOR, s->data[i].f); break; case SAMPLE_DATA_FORMAT_INT: - fprintf(f, "%c%d", CSV_SEPARATOR, s->data[i].i); + fprintf(f, "%c%" PRId64, CSV_SEPARATOR, s->data[i].i); break; } } @@ -47,7 +48,7 @@ int io_format_csv_fprint_single(FILE *f, struct sample *s, int flags) return 0; } -size_t io_format_csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flags) +size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flags) { int ret, off; @@ -59,10 +60,10 @@ size_t io_format_csv_sscan_single(const char *buf, size_t len, struct sample *s, for (i = 0; i < s->capacity; i++) { switch (s->format & (1 << i)) { case SAMPLE_DATA_FORMAT_FLOAT: - ret = sscanf(buf + off, "%f %n", &s->data[i].f, &off); + ret = sscanf(buf + off, "%lf %n", &s->data[i].f, &off); break; case SAMPLE_DATA_FORMAT_INT: - ret = sscanf(buf + off, "%d %n", &s->data[i].i, &off); + ret = sscanf(buf + off, "%" PRId64 " %n", &s->data[i].i, &off); break; } @@ -76,7 +77,7 @@ size_t io_format_csv_sscan_single(const char *buf, size_t len, struct sample *s, return 0; } -int io_format_csv_fscan_single(FILE *f, struct sample *s, int *flags) +int csv_fscan_single(FILE *f, struct sample *s, int *flags) { char *ptr, line[4096]; @@ -88,14 +89,14 @@ skip: if (fgets(line, sizeof(line), f) == NULL) if (*ptr == '\0' || *ptr == '#') goto skip; - return io_format_csv_sscan_single(line, strlen(line), s, flags); + return csv_sscan_single(line, strlen(line), s, flags); } -int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags) +int csv_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags) { int ret, i; for (i = 0; i < cnt; i++) { - ret = io_format_csv_fprint_single(f, smps[i], flags); + ret = csv_fprint_single(f, smps[i], flags); if (ret < 0) break; } @@ -103,11 +104,11 @@ int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags) return i; } -int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags) +int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags) { int ret, i; for (i = 0; i < cnt; i++) { - ret = io_format_csv_fscan_single(f, smps[i], flags); + ret = csv_fscan_single(f, smps[i], flags); if (ret < 0) { warn("Failed to read CSV line: %d", ret); break; @@ -120,10 +121,10 @@ int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags) static struct plugin p = { .name = "csv", .description = "Tabulator-separated values", - .type = PLUGIN_TYPE_FORMAT, + .type = PLUGIN_TYPE_IO, .io = { - .fprint = io_format_csv_fprint, - .fscan = io_format_csv_fscan, + .fprint = csv_fprint, + .fscan = csv_fscan, .size = 0 } }; diff --git a/lib/io/h5pt.c b/lib/io/h5pt.c new file mode 100644 index 000000000..8a5dcb981 --- /dev/null +++ b/lib/io/h5pt.c @@ -0,0 +1,170 @@ +/** HDF5 Packet Table IO format based-on the H5PT high-level API. + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include + +#include "plugin.h" +#include "io.h" + +enum h5pt_flags { + FORMAT_H5PT_COMPRESS = (1 << 16) +}; + +struct h5pt { + hsize_t index; + hsize_t num_packets; + + hid_t fid; /**< File identifier. */ + hid_t ptable; /**< Packet table identifier. */ +} + +hid_t h5pt_create_datatype(int values) +{ + hid_t dt = H5Tcreate(H5T_COMPOUND, sizeof()); +} + +int h5pt_open(struct io *io, const char *uri, const char *mode) +{ + struct h5pt *h5 = io->_vd; + + herr_t err; + + /* Create a file using default properties */ + h5->fid = H5Fcreate(uri, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT); + + /* Create a fixed-length packet table within the file */ + /* This table's "packets" will be simple integers. */ + h5->ptable = H5PTcreate_fl(fid, "Packet Test Dataset", H5T_NATIVE_INT, 1, 1); + if (h5->ptable == H5I_INVALID_HID) + return -1; + + err = H5PTis_valid(h5->ptable); + if (err < 0) + return err; + + err = H5PTget_num_packets(h5->ptable, h5->num_packets); + if (err < 0) + return err; + + io_format_h5pt_rewind(io); + + return 0; +} + +int h5pt_close(struct io *io) +{ + struct h5pt *h5 = io->_vd; + + herr_t err; + + /* Close dataset */ + err = H5PTclose(h5->ptable); + if (err < 0) + goto out; + + /* Close file */ +out: err = H5Fclose(h5->fid); + if (err < 0) + return err; + + return 0; +} + +int h5pt_rewind(struct io *io) +{ + struct h5pt *h5 = io->_vd; + + herr_t err; + + err = H5PTcreate_index(h5->ptable); + if (err < 0) + return err; + + h5->index = 0; + + return 0; +} + +int h5pt_eof(struct io *io) +{ + struct h5pt *h5 = io->_vd; + + return h5->index >= h5->num_packets; +} + +int h5pt_flush(struct io *io) +{ + struct h5pt *h5 = io->_vd; + + herr_t err; + + err = H5Fflush(h5->fid, H5F_SCOPE_GLOBAL); + if (err < 0) + return err; + + return 0; +} + +int h5pt_print(struct io *io, struct sample *smps[], size_t cnt) +{ + struct h5pt *h5 = io->_vd; + + herr_t err; + + /* Write one packet to the packet table */ + err = H5PTappend(h5->ptable, smps, cnt); + if (err < 0) + goto out; + + return 0; +} + +int h5pt_scan(struct io *io, struct sample *smps[], size_t cnt) +{ + struct h5pt *h5 = io->_vd; + + herr_t err; + + H5PTget_next(h5->ptable, cnt, smps) + + return 0; +} + +static struct plugin p = { + .name = "hdf5", + .description = "HDF5 Packet Table", + .type = PLUGIN_TYPE_FORMAT, + .format = { + .open = h5pt_open, + .close = h5pt_close, + .rewind = h5pt_rewind, + .eof = h5pt_eof, + .flush = h5pt_flush, + .print = h5pt_print, + .scan = h5pt_scan, + + .flags = IO_FORMAT_BINARY, + .size = sizeof(struct h5pt) + } +}; diff --git a/lib/formats/json.c b/lib/io/json.c similarity index 79% rename from lib/formats/json.c rename to lib/io/json.c index 00c79d19b..8e4bce22e 100644 --- a/lib/formats/json.c +++ b/lib/io/json.c @@ -21,9 +21,9 @@ *********************************************************************************/ #include "plugin.h" -#include "formats/json.h" +#include "io/json.h" -int io_format_json_pack(json_t **j, struct sample *s, int flags) +int json_pack_sample(json_t **j, struct sample *s, int flags) { json_error_t err; json_t *json_data = json_array(); @@ -50,7 +50,7 @@ int io_format_json_pack(json_t **j, struct sample *s, int flags) return 0; } -int io_format_json_unpack(json_t *j, struct sample *s, int *flags) +int json_unpack_sample(json_t *j, struct sample *s, int *flags) { int ret, i; json_t *json_data, *json_value; @@ -90,14 +90,14 @@ int io_format_json_unpack(json_t *j, struct sample *s, int *flags) return 0; } -size_t io_format_json_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags) +int json_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags) { int i, ret; json_t *json; size_t wr, off = 0; for (i = 0; i < cnt; i++) { - ret = io_format_json_pack(&json, smps[i], flags); + ret = json_pack_sample(&json, smps[i], flags); if (ret) return ret; @@ -114,7 +114,7 @@ size_t io_format_json_sprint(char *buf, size_t len, struct sample *smps[], size_ return i; } -size_t io_format_json_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags) +int json_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) { int i, ret; json_t *json; @@ -128,7 +128,7 @@ size_t io_format_json_sscan(char *buf, size_t len, struct sample *smps[], size_t off += err.position; - ret = io_format_json_unpack(json, smps[i], flags); + ret = json_unpack_sample(json, smps[i], flags); json_decref(json); @@ -139,13 +139,13 @@ size_t io_format_json_sscan(char *buf, size_t len, struct sample *smps[], size_t return i; } -int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags) +int json_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags) { int ret, i; json_t *json; for (i = 0; i < cnt; i++) { - ret = io_format_json_pack(&json, smps[i], flags); + ret = json_pack_sample(&json, smps[i], flags); if (ret) return ret; @@ -158,7 +158,7 @@ int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags) return i; } -int io_format_json_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags) +int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags) { int i, ret; json_t *json; @@ -169,7 +169,7 @@ skip: json = json_loadf(f, JSON_DISABLE_EOF_CHECK, &err); if (!json) break; - ret = io_format_json_unpack(json, smps[i], flags); + ret = json_unpack_sample(json, smps[i], flags); if (ret) goto skip; @@ -182,12 +182,12 @@ skip: json = json_loadf(f, JSON_DISABLE_EOF_CHECK, &err); static struct plugin p = { .name = "json", .description = "Javascript Object Notation", - .type = PLUGIN_TYPE_FORMAT, + .type = PLUGIN_TYPE_IO, .io = { - .fscan = io_format_json_fscan, - .fprint = io_format_json_fprint, - .sscan = io_format_json_sscan, - .sprint = io_format_json_sprint, + .fscan = json_fscan, + .fprint = json_fprint, + .sscan = json_sscan, + .sprint = json_sprint, .size = 0 }, }; diff --git a/lib/formats/msg.c b/lib/io/msg.c similarity index 59% rename from lib/formats/msg.c rename to lib/io/msg.c index d61868a36..61f6f010c 100644 --- a/lib/formats/msg.c +++ b/lib/io/msg.c @@ -20,11 +20,10 @@ * along with this program. If not, see . *********************************************************************************/ -#include #include -#include "formats/msg.h" -#include "formats/msg_format.h" +#include "io/msg.h" +#include "io/msg_format.h" #include "sample.h" #include "utils.h" #include "plugin.h" @@ -85,11 +84,18 @@ int msg_to_sample(struct msg *msg, struct sample *smp) smp->length = MIN(msg->length, smp->capacity); smp->sequence = msg->sequence; + smp->id = msg->id; smp->ts.origin = MSG_TS(msg); smp->ts.received.tv_sec = -1; smp->ts.received.tv_nsec = -1; + smp->format = 0; - memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(smp->length)); + for (int i = 0; i < smp->length; i++) { + switch (sample_get_data_format(smp, i)) { + case SAMPLE_DATA_FORMAT_FLOAT: smp->data[i].f = msg->data[i].f; break; + case SAMPLE_DATA_FORMAT_INT: smp->data[i].i = msg->data[i].i; break; + } + } return 0; } @@ -100,54 +106,71 @@ int msg_from_sample(struct msg *msg, struct sample *smp) msg->ts.sec = smp->ts.origin.tv_sec; msg->ts.nsec = smp->ts.origin.tv_nsec; + msg->id = smp->id; - memcpy(msg->data, smp->data, MSG_DATA_LEN(smp->length)); + for (int i = 0; i < smp->length; i++) { + switch (sample_get_data_format(smp, i)) { + case SAMPLE_DATA_FORMAT_FLOAT: msg->data[i].f = smp->data[i].f; break; + case SAMPLE_DATA_FORMAT_INT: msg->data[i].i = smp->data[i].i; break; + } + } msg_hton(msg); return 0; } -ssize_t msg_buffer_from_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len) +int msg_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags) { int ret, i = 0; char *ptr = buf; - struct msg *msg = (struct msg *) ptr; - struct sample *smp = smps[i]; + for (i = 0; i < cnt; i++) { + if (ptr + MSG_LEN(smps[i]->length) > buf + len) + break; - while (ptr < buf + len && i < cnt) { - ret = msg_from_sample(msg, smp); + ret = msg_from_sample((struct msg *) ptr, smps[i]); if (ret) return ret; - ptr += MSG_LEN(smp->length); - - msg = (struct msg *) ptr; - smp = smps[++i]; + ptr += MSG_LEN(smps[i]->length); } + + if (wbytes) + *wbytes = ptr - buf; - return ptr - buf; + return i; } -int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len) +int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) { int ret, i = 0; char *ptr = buf; - struct msg *msg = (struct msg *) ptr; - struct sample *smp = smps[i]; - - while (ptr < buf + len && i < cnt) { - ret = msg_to_sample(msg, smp); + for (i = 0; i < cnt; i++) { + struct msg *msg = (struct msg *) ptr; + + /* Check if length field is still in buffer bounaries */ + if ((char *) &msg->length + sizeof(msg->length) > buf + len) { + warn("Invalid msg received: reason=1"); + break; + } + + /* Check if remainder of message is in buffer boundaries */ + if (ptr + MSG_LEN(ntohs(msg->length)) > buf + len) { + warn("Invalid msg received: reason=2, msglen=%zu, len=%zu, ptr=%p, buf+%p, i=%u", MSG_LEN(ntohs(msg->length)), len, ptr, buf, i); + break; + } + + ret = msg_to_sample((struct msg *) ptr, smps[i]); if (ret) return ret; - ptr += MSG_LEN(smp->length); - - msg = (struct msg *) ptr; - smp = smps[++i]; + ptr += MSG_LEN(smps[i]->length); } + + if (rbytes) + *rbytes = ptr - buf; return i; } @@ -155,9 +178,12 @@ int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t static struct plugin p = { .name = "msg", .description = "VILLAS binary network format", - .type = PLUGIN_TYPE_FORMAT, + .type = PLUGIN_TYPE_IO, .io = { - .size = 0 + .sprint = msg_sprint, + .sscan = msg_sscan, + .size = 0, + .flags = IO_FORMAT_BINARY }, }; diff --git a/lib/io/raw.c b/lib/io/raw.c new file mode 100644 index 000000000..e3ef875da --- /dev/null +++ b/lib/io/raw.c @@ -0,0 +1,241 @@ +/** RAW IO format + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include "sample.h" +#include "plugin.h" +#include "utils.h" +#include "compat.h" +#include "io/raw.h" + +/** Convert float to host byte order */ +#define SWAP_FLT_TOH(o, n) ({ \ + union { float f; uint32_t i; } x = { .f = n }; \ + x.i = (o) ? be32toh(x.i) : le32toh(x.i); x.f; \ +}) + +/** Convert double to host byte order */ +#define SWAP_DBL_TOH(o, n) ({ \ + union { float f; uint64_t i; } x = { .f = n }; \ + x.i = (o) ? be64toh(x.i) : le64toh(x.i); x.f; \ +}) + +/** Convert float to big/little endian byte order */ +#define SWAP_FLT_TOE(o, n) ({ \ + union { float f; uint32_t i; } x = { .f = n }; \ + x.i = (o) ? htobe32(x.i) : htole32(x.i); x.f; \ +}) + +/** Convert double to big/little endian byte order */ +#define SWAP_DBL_TOE(o, n) ({ \ + union { double f; uint64_t i; } x = { .f = n }; \ + x.i = (o) ? htobe64(x.i) : htole64(x.i); x.f; \ +}) + +/** Convert integer of varying width to host byte order */ +#define SWAP_INT_TOH(o, b, n) (o ? be ## b ## toh(n) : le ## b ## toh(n)) + +/** Convert integer of varying width to big/little endian byte order */ +#define SWAP_INT_TOE(o, b, n) (o ? htobe ## b (n) : htole ## b (n)) + +int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags) +{ + + int i, o = 0; + size_t nlen; + + int8_t *i8 = (void *) buf; + int16_t *i16 = (void *) buf; + int32_t *i32 = (void *) buf; + int64_t *i64 = (void *) buf; + float *f32 = (void *) buf; + double *f64 = (void *) buf; + + int bits = 1 << (flags >> 24); + + for (i = 0; i < cnt; i++) { + nlen = (smps[i]->length + o + (flags & RAW_FAKE) ? 3 : 0) * (bits / 8); + if (nlen >= len) + break; + + /* First three values are sequence, seconds and nano-seconds timestamps */ + if (flags & RAW_FAKE) { + switch (bits) { + case 32: + i32[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 32, smps[i]->sequence); + i32[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 32, smps[i]->ts.origin.tv_sec); + i32[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 32, smps[i]->ts.origin.tv_nsec); + break; + case 64: + i64[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 64, smps[i]->sequence); + i64[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 64, smps[i]->ts.origin.tv_sec); + i64[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 64, smps[i]->ts.origin.tv_nsec); + break; + } + } + + for (int j = 0; j < smps[i]->length; j++) { + enum { INT, FLT } fmt; + + if (flags & RAW_AUTO) + fmt = smps[i]->format & (1 << i) ? INT : FLT; + else if (flags & RAW_FLT) + fmt = FLT; + else + fmt = INT; + + switch (fmt) { + case FLT: + switch (bits) { + case 32: f32[o++] = SWAP_FLT_TOE(flags & RAW_BE_FLT, smps[i]->data[j].f); break; + case 64: f64[o++] = SWAP_DBL_TOE(flags & RAW_BE_FLT, smps[i]->data[j].f); break; + } + break; + + case INT: + switch (bits) { + case 8: i8 [o++] = smps[i]->data[j].i; break; + case 16: i16[o++] = SWAP_INT_TOE(flags & RAW_BE_INT, 16, smps[i]->data[j].i); break; + case 32: i32[o++] = SWAP_INT_TOE(flags & RAW_BE_INT, 32, smps[i]->data[j].i); break; + case 64: i64[o++] = SWAP_INT_TOE(flags & RAW_BE_INT, 64, smps[i]->data[j].i); break; + } + break; + } + } + } + + if (wbytes) + *wbytes = o * (bits / 8); + + return i; +} + +int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) +{ + /* The raw format can not encode multiple samples in one buffer + * as there is no support for framing. */ + struct sample *smp = smps[0]; + + int8_t *i8 = (void *) buf; + int16_t *i16 = (void *) buf; + int32_t *i32 = (void *) buf; + int64_t *i64 = (void *) buf; + float *f32 = (void *) buf; + double *f64 = (void *) buf; + + int off, bits = 1 << (*flags >> 24); + + smp->length = len / (bits / 8); + + if (*flags & RAW_FAKE) { + off = 3; + + if (smp->length < off) { +// warn("Node %s received a packet with no fake header. Skipping...", node_name(n)); + return 0; + } + + smp->length -= off; + + switch (bits) { + case 32: + smp->sequence = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[0]); + smp->ts.origin.tv_sec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[1]); + smp->ts.origin.tv_nsec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[2]); + break; + + case 64: + smp->sequence = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[0]); + smp->ts.origin.tv_sec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[1]); + smp->ts.origin.tv_nsec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[2]); + break; + } + } + else { + off = 0; + + smp->sequence = 0; + smp->ts.origin.tv_sec = 0; + smp->ts.origin.tv_nsec = 0; + } + + if (smp->length > smp->capacity) { + warn("Received more values than supported: length=%u, capacity=%u", smp->length, smp->capacity); + smp->length = smp->capacity; + } + + for (int i = 0; i < smp->length; i++) { + int fmt = *flags & RAW_FLT ? SAMPLE_DATA_FORMAT_FLOAT + : SAMPLE_DATA_FORMAT_INT; + + sample_set_data_format(smp, i, fmt); + + switch (fmt) { + case SAMPLE_DATA_FORMAT_FLOAT: + switch (bits) { + case 32: smp->data[i].f = SWAP_FLT_TOH(*flags & RAW_BE_FLT, f32[i+off]); break; + case 64: smp->data[i].f = SWAP_DBL_TOH(*flags & RAW_BE_FLT, f64[i+off]); break; + } + break; + + case SAMPLE_DATA_FORMAT_INT: + switch (bits) { + case 8: smp->data[i].i = i8[i]; break; + case 16: smp->data[i].i = (int16_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 16, i16[i+off]); break; + case 32: smp->data[i].i = (int32_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 32, i32[i+off]); break; + case 64: smp->data[i].i = (int64_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 64, i64[i+off]); break; + } + break; + } + } + + smp->ts.received.tv_sec = 0; + smp->ts.received.tv_nsec = 0; + + return 1; +} + +#define REGISTER_FORMAT_RAW(i, n, f) \ +static struct plugin i = { \ + .name = n, \ + .description = "RAW binary data", \ + .type = PLUGIN_TYPE_IO, \ + .io = { \ + .flags = f | IO_FORMAT_BINARY, \ + .sprint = raw_sprint, \ + .sscan = raw_sscan \ + } \ +}; \ +REGISTER_PLUGIN(& i); + +/* Feel free to add additional format identifiers here to suit your needs */ +REGISTER_FORMAT_RAW(p, "raw", 0) +REGISTER_FORMAT_RAW(p_f32, "raw-flt32", RAW_32 | RAW_FLT) +REGISTER_FORMAT_RAW(p_f64, "raw-flt64", RAW_64 | RAW_FLT) +REGISTER_FORMAT_RAW(p_i8, "raw-int8", RAW_8) +REGISTER_FORMAT_RAW(p_i16be, "raw-int16-be", RAW_16 | RAW_BE) +REGISTER_FORMAT_RAW(p_i16le, "raw-int16-le", RAW_16) +REGISTER_FORMAT_RAW(p_i32be, "raw-int32-be", RAW_32 | RAW_BE) +REGISTER_FORMAT_RAW(p_i32le, "raw-int32-le", RAW_32) +REGISTER_FORMAT_RAW(p_i64be, "raw-int64-be", RAW_64 | RAW_BE) +REGISTER_FORMAT_RAW(p_i64le, "raw-int64-le", RAW_64) +REGISTER_FORMAT_RAW(p_gtnet, "gtnet", RAW_32 | RAW_FLT | RAW_BE) +REGISTER_FORMAT_RAW(p_gtnef, "gtnet-fake", RAW_32 | RAW_FLT | RAW_BE | RAW_FAKE) diff --git a/lib/formats/villas.c b/lib/io/villas.c similarity index 72% rename from lib/formats/villas.c rename to lib/io/villas.c index f49d28a72..938ae14a7 100644 --- a/lib/formats/villas.c +++ b/lib/io/villas.c @@ -21,6 +21,7 @@ *********************************************************************************/ #include +#include #include #include "io.h" @@ -28,9 +29,9 @@ #include "utils.h" #include "timing.h" #include "sample.h" -#include "formats/villas.h" +#include "io/villas.h" -size_t io_format_villas_sprint_single(char *buf, size_t len, struct sample *s, int flags) +size_t villas_sprint_single(char *buf, size_t len, struct sample *s, int flags) { size_t off = snprintf(buf, len, "%llu", (unsigned long long) s->ts.origin.tv_sec); @@ -47,10 +48,10 @@ size_t io_format_villas_sprint_single(char *buf, size_t len, struct sample *s, i for (int i = 0; i < s->length; i++) { switch ((s->format >> i) & 0x1) { case SAMPLE_DATA_FORMAT_FLOAT: - off += snprintf(buf + off, len - off, "\t%.6f", s->data[i].f); + off += snprintf(buf + off, len - off, "\t%.6lf", s->data[i].f); break; case SAMPLE_DATA_FORMAT_INT: - off += snprintf(buf + off, len - off, "\t%d", s->data[i].i); + off += snprintf(buf + off, len - off, "\t%" PRIi64, s->data[i].i); break; } } @@ -61,7 +62,7 @@ size_t io_format_villas_sprint_single(char *buf, size_t len, struct sample *s, i return off; } -size_t io_format_villas_sscan_single(const char *buf, size_t len, struct sample *s, int *flags) +size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *flags) { char *end; const char *ptr = buf; @@ -151,27 +152,35 @@ size_t io_format_villas_sscan_single(const char *buf, size_t len, struct sample return end - buf; } -size_t io_format_villas_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags) +int villas_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags) { + int i; size_t off = 0; - for (int i = 0; i < cnt && off < len; i++) - off += io_format_villas_sprint_single(buf + off, len - off, smps[i], flags); + for (i = 0; i < cnt && off < len; i++) + off += villas_sprint_single(buf + off, len - off, smps[i], flags); + + if (wbytes) + *wbytes = off; - return off; + return i; } -size_t io_format_villas_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags) +int villas_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) { + int i; size_t off = 0; - for (int i = 0; i < cnt && off < len; i++) - off += io_format_villas_sscan_single(buf + off, len - off, smps[i], flags); + for (i = 0; i < cnt && off < len; i++) + off += villas_sscan_single(buf + off, len - off, smps[i], flags); + + if (rbytes) + *rbytes = off; - return off; + return i; } -int io_format_villas_fscan_single(FILE *f, struct sample *s, int *flags) +int villas_fscan_single(FILE *f, struct sample *s, int *flags) { char *ptr, line[4096]; @@ -183,31 +192,29 @@ skip: if (fgets(line, sizeof(line), f) == NULL) if (*ptr == '\0' || *ptr == '#') goto skip; - return io_format_villas_sscan_single(line, strlen(line), s, flags); + return villas_sscan_single(line, strlen(line), s, flags); } -int io_format_villas_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags) +int villas_fprint_single(FILE *f, struct sample *s, int flags) { + int ret; char line[4096]; - int ret, i; - for (i = 0; i < cnt; i++) { - ret = io_format_villas_sprint_single(line, sizeof(line), smps[i], flags); - if (ret < 0) - break; + ret = villas_sprint_single(line, sizeof(line), s, flags); + if (ret < 0) + return ret; + + fputs(line, f); - fputs(line, f); - } - - return i; + return 0; } -int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags) +int villas_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags) { int ret, i; for (i = 0; i < cnt; i++) { - ret = io_format_villas_fscan_single(f, smps[i], flags); + ret = villas_fprint_single(f, smps[i], flags); if (ret < 0) return ret; } @@ -215,11 +222,24 @@ int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flag return i; } -int io_format_villas_open(struct io *io, const char *uri, const char *mode) +int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags) +{ + int ret, i; + + for (i = 0; i < cnt; i++) { + ret = villas_fscan_single(f, smps[i], flags); + if (ret < 0) + return ret; + } + + return i; +} + +int villas_open(struct io *io, const char *uri) { int ret; - ret = io_stream_open(io, uri, mode); + ret = io_stream_open(io, uri); if (ret) return ret; @@ -229,7 +249,7 @@ int io_format_villas_open(struct io *io, const char *uri, const char *mode) fprintf(f, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]"); - if (io->flags & IO_FLAG_FLUSH) + if (io->flags & IO_FLUSH) io_flush(io); return 0; @@ -238,14 +258,14 @@ int io_format_villas_open(struct io *io, const char *uri, const char *mode) static struct plugin p = { .name = "villas", .description = "VILLAS human readable format", - .type = PLUGIN_TYPE_FORMAT, + .type = PLUGIN_TYPE_IO, .io = { - .open = io_format_villas_open, - .fprint = io_format_villas_fprint, - .fscan = io_format_villas_fscan, - .sprint = io_format_villas_sprint, - .sscan = io_format_villas_sscan, - .size = 0 + .open = villas_open, + .fprint = villas_fprint, + .fscan = villas_fscan, + .sprint = villas_sprint, + .sscan = villas_sscan, + .size = 0 } }; diff --git a/lib/formats/webmsg.c b/lib/io/webmsg.c similarity index 53% rename from lib/formats/webmsg.c rename to lib/io/webmsg.c index 1868d7b91..34d74a08b 100644 --- a/lib/formats/webmsg.c +++ b/lib/io/webmsg.c @@ -26,6 +26,7 @@ #include #endif +#include "sample.h" #include "plugin.h" #include "formats/webmsg.h" #include "formats/webmsg_format.h" @@ -74,11 +75,106 @@ int webmsg_verify(struct webmsg *m) return 0; } +int webmsg_to_sample(struct webmsg *msg, struct sample *smp) +{ + int ret; + + webmsg_ntoh(msg); + + ret = webmsg_verify(msg); + if (ret) + return -1; + + smp->length = MIN(msg->length, smp->capacity); + smp->sequence = msg->sequence; + smp->id = msg->id; + smp->ts.origin = WEBMSG_TS(msg); + smp->ts.received.tv_sec = -1; + smp->ts.received.tv_nsec = -1; + + memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(smp->length)); + + return 0; +} + +int webmsg_from_sample(struct webmsg *msg, struct sample *smp) +{ + *msg = WEBMSG_INIT(smp->length, smp->sequence); + + msg->id = smp->id; + msg->ts.sec = smp->ts.origin.tv_sec; + msg->ts.nsec = smp->ts.origin.tv_nsec; + + memcpy(msg->data, smp->data, WEBMSG_DATA_LEN(smp->length)); + + msg_hton(msg); + + return 0; +} + +size_t io_format_webmsg_length(struct sample *smps[], size_t cnt) +{ + size_t sz = 0; + + for (int i = 0; i < cnt; i++) + sz += WEBMSG_LEN(smps[i]->length); + + return sz; +} + +size_t io_format_webmsg_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags) +{ + int ret, i = 0; + char *ptr = buf; + + struct webmsg *msg = (struct webmsg *) ptr; + struct sample *smp = smps[i]; + + while (ptr < buf + len && i < cnt) { + ret = webmsg_from_sample(msg, smp); + if (ret) + return ret; + + ptr += WEBMSG_LEN(smp->length); + + msg = (struct webmsg *) ptr; + smp = smps[++i]; + } + + return ptr - buf; +} + +size_t io_format_webmsg_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags) +{ + int ret, i = 0; + char *ptr = buf; + + struct webmsg *msg = (struct webmsg *) ptr; + struct sample *smp = smps[i]; + + while (ptr < buf + len && i < cnt) { + ret = webmsg_to_sample(msg, smp); + if (ret) + return ret; + + ptr += WEBMSG_LEN(smp->length); + + msg = (struct webmsg *) ptr; + smp = smps[++i]; + } + + return i; +} + static struct plugin p = { .name = "webmsg", .description = "VILLAS binary format for websockets", .type = PLUGIN_TYPE_FORMAT, - .io = { + .format = { + .length = io_format_webmsg_length, + .sprint = io_format_webmsg_sprint, + .sscan = io_format_webmsg_sscan, + .flags = IO_FORMAT_BINARY, .size = 0 }, }; diff --git a/lib/io_format.c b/lib/io_format.c new file mode 100644 index 000000000..5c9dc170c --- /dev/null +++ b/lib/io_format.c @@ -0,0 +1,46 @@ +/** Read / write sample data in different formats. + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include + +#include "io_format.h" + +int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags) +{ + return fmt->sscan ? fmt->sscan(buf, len, rbytes, smps, cnt, flags) : -1; +} + +int io_format_sprint(struct io_format *fmt, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags) +{ + return fmt->sprint ? fmt->sprint(buf, len, wbytes, smps, cnt, flags) : -1; +} + +int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int *flags) +{ + return fmt->sprint ? fmt->fscan(f, smps, cnt, flags) : -1; +} + +int io_format_fprint(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags) +{ + return fmt->fprint ? fmt->fprint(f, smps, cnt, flags) : -1; +} \ No newline at end of file diff --git a/lib/nodes/Makefile.inc b/lib/nodes/Makefile.inc index 12916cd1f..8f4a6984a 100644 --- a/lib/nodes/Makefile.inc +++ b/lib/nodes/Makefile.inc @@ -24,9 +24,9 @@ LIB_SRCS += $(addprefix lib/nodes/, cbuilder.c loopback.c) ifeq ($(PLATFORM),Linux) WITH_FPGA ?= 1 - WITH_TEST_RTT ?= 1 endif +WITH_TEST_RTT ?= 0 WITH_FILE ?= 1 WITH_SIGNAL ?= 1 WITH_NGSI ?= 1 @@ -91,7 +91,6 @@ endif # Enable Socket node type when libnl3 is available ifeq ($(WITH_SOCKET),1) LIB_SRCS += lib/nodes/socket.c - LIB_SRCS += lib/formats/msg.c LIB_CFLAGS += -DWITH_SOCKET # libnl3 is optional but required for network emulation and IRQ pinning @@ -136,7 +135,6 @@ endif ifeq ($(WITH_WEBSOCKET),1) ifeq ($(shell $(PKGCONFIG) libwebsockets jansson; echo $$?),0) LIB_SRCS += lib/nodes/websocket.c - LIB_SRCS += lib/formats/webmsg.c LIB_PKGS += libwebsockets jansson LIB_CFLAGS += -DWITH_WEBSOCKET endif diff --git a/lib/nodes/cbuilder.c b/lib/nodes/cbuilder.c index 712dfded5..759851d28 100644 --- a/lib/nodes/cbuilder.c +++ b/lib/nodes/cbuilder.c @@ -94,8 +94,15 @@ int cbuilder_read(struct node *n, struct sample *smps[], unsigned cnt) pthread_mutex_lock(&cb->mtx); while (cb->read >= cb->step) pthread_cond_wait(&cb->cv, &cb->mtx); + + float data[smp->capacity]; - smp->length = cb->model->read(&smp->data[0].f, 16); + smp->length = cb->model->read(data, smp->capacity); + + /* Cast float -> double */ + for (int i = 0; i < smp->length; i++) + smp->data[i].f = data[i]; + smp->sequence = cb->step; cb->read = cb->step; @@ -111,8 +118,10 @@ int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt) struct sample *smp = smps[0]; pthread_mutex_lock(&cb->mtx); + + float flt = smp->data[0].f; - cb->model->write(&smp->data[0].f, smp->length); + cb->model->write(&flt, smp->length); cb->model->code(); cb->step++; diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 1cc50e38a..d81b922a4 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -29,19 +29,7 @@ #include "timing.h" #include "queue.h" #include "plugin.h" -#include "formats/villas.h" - -int file_reverse(struct node *n) -{ - struct file *f = n->_vd; - struct file_direction tmp; - - tmp = f->read; - f->read = f->write; - f->write = tmp; - - return 0; -} +#include "io.h" static char * file_format_name(const char *format, struct timespec *ts) { @@ -56,48 +44,13 @@ static char * file_format_name(const char *format, struct timespec *ts) return buf; } -static AFILE * file_reopen(struct file_direction *dir) -{ - if (dir->handle) - afclose(dir->handle); - - return afopen(dir->uri, dir->mode); -} - -static int file_parse_direction(json_t *cfg, struct file *f, int d) -{ - struct file_direction *dir = (d == FILE_READ) ? &f->read : &f->write; - int ret; - json_error_t err; - - const char *format = NULL; - const char *mode = NULL; - - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s }", - "uri", &format, - "mode", &mode - ); - if (ret) - jerror(&err, "Failed to "); - - if (format) - dir->format = strdup(format); - - if (mode) - dir->mode = strdup(mode); - else - dir->mode = strdup(d == FILE_READ ? "r" : "w+"); - - return 0; -} - -static struct timespec file_calc_read_offset(const struct timespec *first, const struct timespec *epoch, enum read_epoch_mode mode) +static struct timespec file_calc_offset(const struct timespec *first, const struct timespec *epoch, enum epoch_mode mode) { /* Get current time */ struct timespec now = time_now(); struct timespec offset; - /* Set read_offset depending on epoch_mode */ + /* Set offset depending on epoch_mode */ switch (mode) { case FILE_EPOCH_DIRECT: /* read first value at now + epoch */ offset = time_diff(first, &now); @@ -110,7 +63,7 @@ static struct timespec file_calc_read_offset(const struct timespec *first, const case FILE_EPOCH_RELATIVE: /* read first value at first + epoch */ return *epoch; - case FILE_EPOCH_ABSOLUTE: /* read first value at f->read_epoch */ + case FILE_EPOCH_ABSOLUTE: /* read first value at f->epoch */ return time_diff(first, epoch); default: @@ -122,81 +75,64 @@ int file_parse(struct node *n, json_t *cfg) { struct file *f = n->_vd; - json_t *cfg_in = NULL; - json_t *cfg_out = NULL; - int ret; json_error_t err; + const char *uri_tmpl = NULL; + const char *format = "villas"; + const char *eof = NULL; + const char *epoch_mode = NULL; + double epoch_flt = 0; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o }", - "in", &cfg_in, - "out", &cfg_out + /* Default values */ + f->rate = 0; + f->eof = FILE_EOF_EXIT; + f->epoch_mode = FILE_EPOCH_DIRECT; + f->flush = 0; + + ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: b, s?: s, s?: f, s?: s, s?: f, s?: s }", + "uri", &uri_tmpl, + "flush", &f->flush, + "eof", &eof, + "rate", &f->rate, + "epoch_mode", &epoch_mode, + "epoch", &epoch_flt, + "format", &format ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); - if (cfg_out) { - if (file_parse_direction(cfg_out, f, FILE_WRITE)) - error("Failed to parse output file for node %s", node_name(n)); + f->epoch = time_from_double(epoch_flt); + f->uri_tmpl = uri_tmpl ? strdup(uri_tmpl) : NULL; + + f->format = io_format_lookup(format); + if (!f->format) + error("Invalid format '%s' for node %s", format, node_name(n)); - f->flush = 0; - - ret = json_unpack_ex(cfg_out, &err, 0, "{ s?: b }", "flush", &f->flush); - if (ret) - jerror(&err, "Failed to parse configuration of node %s", node_name(n)); + if (eof) { + if (!strcmp(eof, "exit")) + f->eof = FILE_EOF_EXIT; + else if (!strcmp(eof, "rewind")) + f->eof = FILE_EOF_REWIND; + else if (!strcmp(eof, "wait")) + f->eof = FILE_EOF_WAIT; + else + error("Invalid mode '%s' for 'eof' setting of node %s", eof, node_name(n)); } - if (cfg_in) { - const char *eof = NULL; - const char *epoch_mode = NULL; - double epoch_flt = 0; - - /* Default values */ - f->read_rate = 0; - f->read_eof = FILE_EOF_EXIT; - f->read_epoch_mode = FILE_EPOCH_DIRECT; - - if (file_parse_direction(cfg_in, f, FILE_READ)) - error("Failed to parse input file for node %s", node_name(n)); - - ret = json_unpack_ex(cfg_in, &err, 0, "{ s?: s, s?: f, s?: s, s?: f }", - "eof", &eof, - "rate", &f->read_rate, - "epoch_mode", &epoch_mode, - "epoch", &epoch_flt - ); - if (ret) - jerror(&err, "Failed to parse configuration of node %s", node_name(n)); - - f->read_epoch = time_from_double(epoch_flt); - - /* More read specific settings */ - if (eof) { - if (!strcmp(eof, "exit")) - f->read_eof = FILE_EOF_EXIT; - else if (!strcmp(eof, "rewind")) - f->read_eof = FILE_EOF_REWIND; - else if (!strcmp(eof, "wait")) - f->read_eof = FILE_EOF_WAIT; - else - error("Invalid mode '%s' for 'eof' setting of node %s", eof, node_name(n)); - } - - if (epoch_mode) { - if (!strcmp(epoch_mode, "direct")) - f->read_epoch_mode = FILE_EPOCH_DIRECT; - else if (!strcmp(epoch_mode, "wait")) - f->read_epoch_mode = FILE_EPOCH_WAIT; - else if (!strcmp(epoch_mode, "relative")) - f->read_epoch_mode = FILE_EPOCH_RELATIVE; - else if (!strcmp(epoch_mode, "absolute")) - f->read_epoch_mode = FILE_EPOCH_ABSOLUTE; - else if (!strcmp(epoch_mode, "original")) - f->read_epoch_mode = FILE_EPOCH_ORIGINAL; - else - error("Invalid value '%s' for setting 'epoch_mode' of node %s", epoch_mode, node_name(n)); - } + if (epoch_mode) { + if (!strcmp(epoch_mode, "direct")) + f->epoch_mode = FILE_EPOCH_DIRECT; + else if (!strcmp(epoch_mode, "wait")) + f->epoch_mode = FILE_EPOCH_WAIT; + else if (!strcmp(epoch_mode, "relative")) + f->epoch_mode = FILE_EPOCH_RELATIVE; + else if (!strcmp(epoch_mode, "absolute")) + f->epoch_mode = FILE_EPOCH_ABSOLUTE; + else if (!strcmp(epoch_mode, "original")) + f->epoch_mode = FILE_EPOCH_ORIGINAL; + else + error("Invalid value '%s' for setting 'epoch_mode' of node %s", epoch_mode, node_name(n)); } n->_vd = f; @@ -209,53 +145,46 @@ char * file_print(struct node *n) struct file *f = n->_vd; char *buf = NULL; - if (f->read.format) { - const char *epoch_str = NULL; - switch (f->read_epoch_mode) { - case FILE_EPOCH_DIRECT: epoch_str = "direct"; break; - case FILE_EPOCH_WAIT: epoch_str = "wait"; break; - case FILE_EPOCH_RELATIVE: epoch_str = "relative"; break; - case FILE_EPOCH_ABSOLUTE: epoch_str = "absolute"; break; - case FILE_EPOCH_ORIGINAL: epoch_str = "original"; break; - } + const char *epoch_str = NULL; + const char *eof_str = NULL; - const char *eof_str = NULL; - switch (f->read_eof) { - case FILE_EOF_EXIT: eof_str = "exit"; break; - case FILE_EOF_WAIT: eof_str = "wait"; break; - case FILE_EOF_REWIND: eof_str = "rewind"; break; - } - - strcatf(&buf, "in=%s, mode=%s, eof=%s, epoch_mode=%s, epoch=%.2f", - f->read.uri ? f->read.uri : f->read.format, - f->read.mode, - eof_str, - epoch_str, - time_to_double(&f->read_epoch) - ); - - if (f->read_rate) - strcatf(&buf, ", rate=%.1f", f->read_rate); + switch (f->epoch_mode) { + case FILE_EPOCH_DIRECT: epoch_str = "direct"; break; + case FILE_EPOCH_WAIT: epoch_str = "wait"; break; + case FILE_EPOCH_RELATIVE: epoch_str = "relative"; break; + case FILE_EPOCH_ABSOLUTE: epoch_str = "absolute"; break; + case FILE_EPOCH_ORIGINAL: epoch_str = "original"; break; } - if (f->write.format) { - strcatf(&buf, ", out=%s, mode=%s", - f->write.uri ? f->write.uri : f->write.format, - f->write.mode - ); + switch (f->eof) { + case FILE_EOF_EXIT: eof_str = "exit"; break; + case FILE_EOF_WAIT: eof_str = "wait"; break; + case FILE_EOF_REWIND: eof_str = "rewind"; break; } - if (f->read_first.tv_sec || f->read_first.tv_nsec) - strcatf(&buf, ", first=%.2f", time_to_double(&f->read_first)); + strcatf(&buf, "uri=%s, format=%s, flush=%s, eof=%s, epoch_mode=%s, epoch=%.2f", + f->uri ? f->uri : f->uri_tmpl, + plugin_name(f->format), + f->flush ? "yes" : "no", + eof_str, + epoch_str, + time_to_double(&f->epoch) + ); - if (f->read_offset.tv_sec || f->read_offset.tv_nsec) - strcatf(&buf, ", offset=%.2f", time_to_double(&f->read_offset)); + if (f->rate) + strcatf(&buf, ", rate=%.1f", f->rate); - if ((f->read_first.tv_sec || f->read_first.tv_nsec) && - (f->read_offset.tv_sec || f->read_offset.tv_nsec)) { + if (f->first.tv_sec || f->first.tv_nsec) + strcatf(&buf, ", first=%.2f", time_to_double(&f->first)); + + if (f->offset.tv_sec || f->offset.tv_nsec) + strcatf(&buf, ", offset=%.2f", time_to_double(&f->offset)); + + if ((f->first.tv_sec || f->first.tv_nsec) && + (f->offset.tv_sec || f->offset.tv_nsec)) { struct timespec eta, now = time_now(); - eta = time_add(&f->read_first, &f->read_offset); + eta = time_add(&f->first, &f->offset); eta = time_diff(&now, &eta); if (eta.tv_sec || eta.tv_nsec) @@ -270,51 +199,46 @@ int file_start(struct node *n) struct file *f = n->_vd; struct timespec now = time_now(); - int ret; + int ret, flags; - if (f->read.format) { - /* Prepare file name */ - f->read.uri = file_format_name(f->read.format, &now); + /* Prepare file name */ + f->uri = file_format_name(f->uri_tmpl, &now); - /* Open file */ - f->read.handle = file_reopen(&f->read); - if (!f->read.handle) - serror("Failed to open file for reading: '%s'", f->read.uri); + /* Open file */ + flags = IO_FORMAT_ALL; + if (f->flush) + flags |= IO_FLUSH; - /* Create timer */ - f->read_timer = f->read_rate - ? timerfd_create_rate(f->read_rate) - : timerfd_create(CLOCK_REALTIME, 0); - if (f->read_timer < 0) - serror("Failed to create timer"); + ret = io_init(&f->io, f->format, flags); + if (ret) + return ret; - arewind(f->read.handle); + ret = io_open(&f->io, f->uri); + if (ret) + return ret; - /* Get timestamp of first line */ - if (f->read_epoch_mode != FILE_EPOCH_ORIGINAL) { - struct sample s; - struct sample *smps[] = { &s }; - s.capacity = 0; + /* Create timer */ + ret = periodic_task_init(&f->timer, f->rate); + if (ret) + serror("Failed to create timer"); - ret = io_format_villas_fscan(f->read.handle->file, smps, 1, NULL); - if (ret < 0) - error("Failed to read first timestamp of node %s", node_name(n)); + /* Get timestamp of first line */ + if (f->epoch_mode != FILE_EPOCH_ORIGINAL) { + io_rewind(&f->io); - f->read_first = s.ts.origin; - f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode); - arewind(f->read.handle); - } + struct sample s; + struct sample *smps[] = { &s }; + s.capacity = 0; + + ret = io_scan(&f->io, smps, 1); + if (ret != 1) + error("Failed to read first timestamp of node %s", node_name(n)); + + f->first = s.ts.origin; + f->offset = file_calc_offset(&f->first, &f->epoch, f->epoch_mode); } - if (f->write.format) { - /* Prepare file name */ - f->write.uri = file_format_name(f->write.format, &now); - - /* Open file */ - f->write.handle = file_reopen(&f->write); - if (!f->write.handle) - serror("Failed to open file for writing: '%s'", f->write.uri); - } + io_rewind(&f->io); return 0; } @@ -322,16 +246,19 @@ int file_start(struct node *n) int file_stop(struct node *n) { struct file *f = n->_vd; + int ret; - if (f->read_timer) - close(f->read_timer); - if (f->read.handle) - afclose(f->read.handle); - if (f->write.handle) - afclose(f->write.handle); + periodic_task_destroy(&f->timer); - free(f->read.uri); - free(f->write.uri); + ret = io_close(&f->io); + if (ret) + return ret; + + ret = io_destroy(&f->io); + if (ret) + return ret; + + free(f->uri); return 0; } @@ -339,76 +266,78 @@ int file_stop(struct node *n) int file_read(struct node *n, struct sample *smps[], unsigned cnt) { struct file *f = n->_vd; - int values, flags; - uint64_t ex; + int ret; + uint64_t steps; - assert(f->read.handle); assert(cnt == 1); -retry: values = io_format_villas_fscan(f->read.handle->file, smps, 1, &flags); /* Get message and timestamp */ - if (values <= 0) { - if (afeof(f->read.handle)) { - switch (f->read_eof) { +retry: ret = io_scan(&f->io, smps, cnt); + if (ret <= 0) { + if (io_eof(&f->io)) { + switch (f->eof) { case FILE_EOF_REWIND: info("Rewind input file of node %s", node_name(n)); - f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode); - arewind(f->read.handle); + f->offset = file_calc_offset(&f->first, &f->epoch, f->epoch_mode); + io_rewind(&f->io); goto retry; case FILE_EOF_WAIT: - usleep(10000); /* We wait 10ms before fetching again. */ - adownload(f->read.handle, 1); + /* We wait 10ms before fetching again. */ + usleep(100000); + + /* Try to download more data if this is a remote file. */ + if (f->io.mode == IO_MODE_ADVIO) + adownload(f->io.advio.input, 1); + goto retry; case FILE_EOF_EXIT: info("Reached end-of-file of node %s", node_name(n)); + killme(SIGTERM); pause(); - } } else - warn("Failed to read messages from node %s: reason=%d", node_name(n), values); + warn("Failed to read messages from node %s: reason=%d", node_name(n), ret); return 0; } - if (f->read_epoch_mode != FILE_EPOCH_ORIGINAL) { - if (!f->read_rate || aftell(f->read.handle) == 0) { - smps[0]->ts.origin = time_add(&smps[0]->ts.origin, &f->read_offset); + /* We dont wait in FILE_EPOCH_ORIGINAL mode */ + if (f->epoch_mode == FILE_EPOCH_ORIGINAL) + return cnt; - ex = timerfd_wait_until(f->read_timer, &smps[0]->ts.origin); - } - else { /* Wait with fixed rate delay */ - ex = timerfd_wait(f->read_timer); + if (f->rate) { + steps = periodic_task_wait_until_next_period(&f->timer); - smps[0]->ts.origin = time_now(); - } + smps[0]->ts.origin = time_now(); + } + else { + smps[0]->ts.origin = time_add(&smps[0]->ts.origin, &f->offset); - /* Check for overruns */ - if (ex == 0) - serror("Failed to wait for timer"); - else if (ex != 1) - warn("Overrun: %" PRIu64, ex - 1); + steps = periodic_task_wait_until(&f->timer, &smps[0]->ts.origin); } - return 1; + /* Check for overruns */ + if (steps == 0) + serror("Failed to wait for timer"); + else if (steps != 1) + warn("Missed steps: %" PRIu64, steps - 1); + + return cnt; } int file_write(struct node *n, struct sample *smps[], unsigned cnt) { struct file *f = n->_vd; - assert(f->write.handle); assert(cnt == 1); - io_format_villas_fprint(f->write.handle->file, smps, cnt, IO_FORMAT_ALL & ~IO_FORMAT_OFFSET); + io_print(&f->io, smps, cnt); - if (f->flush) - afflush(f->write.handle); - - return 1; + return cnt; } static struct plugin p = { @@ -418,7 +347,6 @@ static struct plugin p = { .node = { .vectorize = 1, .size = sizeof(struct file), - .reverse = file_reverse, .parse = file_parse, .print = file_print, .start = file_start, diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index 14d60c0b0..b07937987 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -27,7 +27,7 @@ #include "plugin.h" #include "nodes/nanomsg.h" #include "utils.h" -#include "formats/msg.h" +#include "io_format.h" int nanomsg_reverse(struct node *n) { @@ -82,6 +82,8 @@ int nanomsg_parse(struct node *n, json_t *cfg) int ret; struct nanomsg *m = n->_vd; + const char *format = "villas"; + json_error_t err; json_t *cfg_pub = NULL; @@ -90,9 +92,10 @@ int nanomsg_parse(struct node *n, json_t *cfg) list_init(&m->publisher.endpoints); list_init(&m->subscriber.endpoints); - ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: s }", "publish", &cfg_pub, - "subscribe", &cfg_sub + "subscribe", &cfg_sub, + "format", &format ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); @@ -108,6 +111,10 @@ int nanomsg_parse(struct node *n, json_t *cfg) if (ret < 0) error("Invalid type for 'subscribe' setting of node %s", node_name(n)); } + + m->format = io_format_lookup(format); + if (!m->format) + error("Invalid format '%s' for node %s", format, node_name(n)); return 0; } @@ -118,7 +125,7 @@ char * nanomsg_print(struct node *n) char *buf = NULL; - strcatf(&buf, "subscribe=[ "); + strcatf(&buf, "format=%s, subscribe=[ ", plugin_name(m->format)); for (size_t i = 0; i < list_length(&m->subscriber.endpoints); i++) { char *ep = list_at(&m->subscriber.endpoints, i); @@ -207,17 +214,16 @@ int nanomsg_deinit() int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt) { - int ret; struct nanomsg *m = n->_vd; - - char data[MSG_MAX_PACKET_LEN]; + int bytes; + char data[NANOMSG_MAX_PACKET_LEN]; /* Receive payload */ - ret = nn_recv(m->subscriber.socket, data, sizeof(data), 0); - if (ret < 0) - return ret; + bytes = nn_recv(m->subscriber.socket, data, sizeof(data), 0); + if (bytes < 0) + return -1; - return msg_buffer_to_samples(smps, cnt, data, ret); + return io_format_sscan(m->format, data, bytes, NULL, smps, cnt, NULL); } int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) @@ -225,15 +231,15 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) int ret; struct nanomsg *m = n->_vd; - ssize_t sent; + size_t wbytes; - char data[MSG_MAX_PACKET_LEN]; + char data[NANOMSG_MAX_PACKET_LEN]; - sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data)); - if (sent < 0) + ret = io_format_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL); + if (ret <= 0) return -1; - ret = nn_send(m->publisher.socket, data, sent, 0); + ret = nn_send(m->publisher.socket, data, wbytes, 0); if (ret < 0) return ret; diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index cb7c5c62e..4935dd6b1 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -34,16 +34,13 @@ #include "config.h" #include "utils.h" -#ifdef WITH_LIBNL_ROUTE_30 +#ifdef WITH_NETEM #include "kernel/if.h" #include "kernel/nl.h" #include "kernel/tc.h" +#endif /* WITH_NETEM */ - #define WITH_NETEM -#endif /* WITH_LIBNL_ROUTE_30 */ - -#include "formats/msg.h" -#include "formats/msg_format.h" +#include "io_format.h" #include "sample.h" #include "queue.h" #include "plugin.h" @@ -127,7 +124,7 @@ int socket_deinit() char * socket_print(struct node *n) { struct socket *s = n->_vd; - char *layer = NULL, *header = NULL, *endian = NULL, *buf; + char *layer = NULL, *buf; switch (s->layer) { case SOCKET_LAYER_UDP: layer = "udp"; break; @@ -135,25 +132,10 @@ char * socket_print(struct node *n) case SOCKET_LAYER_ETH: layer = "eth"; break; } - switch (s->header) { - case SOCKET_HEADER_NONE: header = "none"; break; - case SOCKET_HEADER_FAKE: header = "fake"; break; - case SOCKET_HEADER_DEFAULT: header = "default"; break; - } - - if (s->header == SOCKET_HEADER_DEFAULT) - endian = "auto"; - else { - switch (s->endian) { - case SOCKET_ENDIAN_LITTLE: endian = "little"; break; - case SOCKET_ENDIAN_BIG: endian = "big"; break; - } - } - char *local = socket_print_addr((struct sockaddr *) &s->local); char *remote = socket_print_addr((struct sockaddr *) &s->remote); - buf = strf("layer=%s, header=%s, endian=%s, local=%s, remote=%s", layer, header, endian, local, remote); + buf = strf("layer=%s, format=%s, local=%s, remote=%s", layer, plugin_name(s->format), local, remote); if (s->multicast.enabled) { char group[INET_ADDRSTRLEN]; @@ -320,110 +302,15 @@ int socket_destroy(struct node *n) return 0; } -static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt) -{ - int length; - struct socket *s = n->_vd; - - char buf[MSG_MAX_PACKET_LEN]; - uint32_t *values = (uint32_t *) buf; - ssize_t bytes; - - struct sample *smp = smps[0]; - - if (cnt < 1) - return 0; - - union sockaddr_union src; - socklen_t srclen = sizeof(src); - - /* Receive next sample */ - bytes = recvfrom(s->sd, buf, sizeof(buf), 0, &src.sa, &srclen); - if (bytes == 0) - error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */ - else if (bytes < 0) - serror("Failed recv from node %s", node_name(n)); - else if (bytes % 4 != 0) { - warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes); - recv(s->sd, NULL, 0, 0); /* empty receive buffer */ - return -1; - } - - length = bytes / 4; - - /* Strip IP header from packet */ - if (s->layer == SOCKET_LAYER_IP) { - struct ip *iphdr = (struct ip *) buf; - - length -= iphdr->ip_hl; - values += iphdr->ip_hl; - } - - /* SOCK_RAW IP sockets to not provide the IP protocol number via recvmsg() - * So we simply set it ourself. */ - if (s->layer == SOCKET_LAYER_IP) { - switch (src.sa.sa_family) { - case AF_INET: src.sin.sin_port = s->remote.sin.sin_port; break; - case AF_INET6: src.sin6.sin6_port = s->remote.sin6.sin6_port; break; - } - } - - if (s->verify_source && socket_compare_addr(&src.sa, &s->remote.sa) != 0) { - char *buf = socket_print_addr((struct sockaddr *) &src); - warn("Received packet from unauthorized source: %s", buf); - free(buf); - - return 0; - } - - /* Convert packet contents to host endianess */ - for (int i = 0; i < length; i++) - values[i] = s->endian == SOCKET_ENDIAN_BIG - ? be32toh(values[i]) - : le32toh(values[i]); - - if (s->header == SOCKET_HEADER_FAKE) { - if (length < 3) { - warn("Node %s received a packet with no fake header. Skipping...", node_name(n)); - return 0; - } - - smp->sequence = values[0]; - smp->ts.origin.tv_sec = values[1]; - smp->ts.origin.tv_nsec = values[2]; - - values += 3; - length -= 3; - } - else { - smp->sequence = n->sequence++; /* Fake sequence no generated by VILLASnode */ - smp->ts.origin.tv_sec = 0; - smp->ts.origin.tv_nsec = 0; - } - - if (length > smp->capacity) { - warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); - length = smp->capacity; - } - - memcpy(smp->data, values, SAMPLE_DATA_LEN(length)); - - smp->ts.received.tv_sec = 0; - smp->ts.received.tv_nsec = 0; - - smp->length = length; - - return 1; /* GTNET-SKT sends every sample in a single packet */ -} - -static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cnt) +int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { int ret; struct socket *s = n->_vd; - char buf[MSG_MAX_PACKET_LEN]; + char buf[SOCKET_MAX_PACKET_LEN]; char *bufptr = buf; ssize_t bytes; + size_t rbytes; union sockaddr_union src; socklen_t srclen = sizeof(src); @@ -465,100 +352,38 @@ static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cn return 0; } - ret = msg_buffer_to_samples(smps, cnt, bufptr, bytes); + ret = io_format_sscan(s->format, bufptr, bytes, &rbytes, smps, cnt, NULL); if (ret < 0) warn("Received invalid packet from node: %s reason=%d", node_name(n), ret); + if (bytes != rbytes) + warn("Received invalid packet from node: %s bytes=%zu, rbytes=%zu", node_name(n), bytes, rbytes); + return ret; } -static int socket_write_none(struct node *n, struct sample *smps[], unsigned cnt) -{ - struct socket *s = n->_vd; - - int sent = 0; - ssize_t bytes; - - if (cnt < 1) - return 0; - - for (int i = 0; i < cnt; i++) { - int off = s->header == SOCKET_HEADER_FAKE ? 3 : 0; - int len = smps[i]->length + off; - uint32_t data[len]; - - /* First three values are sequence, seconds and nano-seconds timestamps */ - if (s->header == SOCKET_HEADER_FAKE) { - data[0] = smps[i]->sequence; - data[1] = smps[i]->ts.origin.tv_sec; - data[2] = smps[i]->ts.origin.tv_nsec; - } - - for (int j = 0; j < smps[i]->length; j++) - data[off + j] = s->endian == SOCKET_ENDIAN_BIG - ? htobe32(smps[i]->data[j].i) - : htole32(smps[i]->data[j].i); - - bytes = sendto(s->sd, data, len * sizeof(data[0]), 0, - (struct sockaddr *) &s->remote, sizeof(s->remote)); - if (bytes < 0) - serror("Failed send to node %s", node_name(n)); - - sent++; - } - - return sent; -} - -static int socket_write_villas(struct node *n, struct sample *smps[], unsigned cnt) -{ - struct socket *s = n->_vd; - - char data[MSG_MAX_PACKET_LEN]; - ssize_t bytes = 0, sent; - - sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data)); - if (sent < 0) - return -1; - - /* Send message */ - bytes = sendto(s->sd, data, sent, 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); - if (bytes < 0) - serror("Failed send to node %s", node_name(n)); - - return cnt; -} - -int socket_read(struct node *n, struct sample *smps[], unsigned cnt) -{ - struct socket *s = n->_vd; - - switch (s->header) { - case SOCKET_HEADER_NONE: - case SOCKET_HEADER_FAKE: - return socket_read_none(n, smps, cnt); - - case SOCKET_HEADER_DEFAULT: - return socket_read_villas(n, smps, cnt); - } - - return -1; -} - int socket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; - switch (s->header) { - case SOCKET_HEADER_NONE: - case SOCKET_HEADER_FAKE: - return socket_write_none(n, smps, cnt); + char data[SOCKET_MAX_PACKET_LEN]; + int ret; + ssize_t bytes; + size_t wbytes; - case SOCKET_HEADER_DEFAULT: - return socket_write_villas(n, smps, cnt); - } + ret = io_format_sprint(s->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL); + if (ret < 0) + return -1; - return -1; + /* Send message */ + bytes = sendto(s->sd, data, wbytes, 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); + if (bytes < 0) + serror("Failed send to node %s", node_name(n)); + + if (bytes != wbytes) + warn("Partial send to node %s", node_name(n)); + + return cnt; } int socket_parse(struct node *n, json_t *cfg) @@ -569,6 +394,7 @@ int socket_parse(struct node *n, json_t *cfg) const char *endian = NULL; const char *layer = NULL; const char *header = NULL; + const char *format = "villas"; int ret; @@ -581,18 +407,24 @@ int socket_parse(struct node *n, json_t *cfg) s->endian = SOCKET_ENDIAN_BIG; s->verify_source = 0; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s: s, s: s, s?: b, s?: o }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s: s, s: s, s?: b, s?: o, s?: s }", "layer", &layer, "header", &header, "endian", &endian, "remote", &remote, "local", &local, "verify_source", &s->verify_source, - "multicast", &cfg_multicast + "multicast", &cfg_multicast, + "format", &format ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); + /* Format */ + s->format = io_format_lookup(format); + if (!s->format) + error("Invalid format '%s' for node %s", format, node_name(n)); + /* IP layer */ if (layer) { if (!strcmp(layer, "ip")) diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index b4fd7b7c0..d627da4f6 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -30,9 +30,7 @@ #include "timing.h" #include "utils.h" #include "plugin.h" - -#include "formats/webmsg.h" -#include "formats/webmsg_format.h" +#include "io_format.h" #include "nodes/websocket.h" /* Private static storage */ @@ -66,7 +64,11 @@ static int websocket_connection_init(struct websocket_connection *c, struct lws c->state = STATE_INITIALIZED; c->wsi = wsi; - c->buf = NULL; + + /** @todo: We must find a better way to determine the buffer size */ + c->buflen = 1 << 12; + c->buf = alloc(c->buflen); + if (c->node) { struct websocket *w = c->node->_vd; @@ -76,7 +78,7 @@ static int websocket_connection_init(struct websocket_connection *c, struct lws else list_push(&connections, c); - ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); + ret = queue_signalled_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage); if (ret) return ret; @@ -102,13 +104,13 @@ static int websocket_connection_destroy(struct websocket_connection *c) if (c->_name) free(c->_name); - ret = queue_destroy(&c->queue); + ret = queue_signalled_destroy(&c->queue); if (ret) return ret; if (c->buf) free(c->buf); - + c->state = STATE_DESTROYED; c->wsi = NULL; @@ -136,7 +138,7 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam for (int i = 0; i < cnt; i++) { sample_get(smps[i]); /* increase reference count */ - ret = queue_push(&c->queue, (void **) smps[i]); + ret = queue_signalled_push(&c->queue, (void **) smps[i]); if (ret != 1) warn("Queue overrun in websocket connection: %s", websocket_connection_name(c)); } @@ -171,35 +173,52 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi int ret; struct websocket_connection *c = user; - struct webmsg *msg; - struct sample *smp; - switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED: ret = websocket_connection_init(c, wsi); if (ret) return -1; + + c->format = io_format_lookup("villas"); - return 0; + break; + + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + warn("Failed to establish connection: %s", websocket_connection_name(c)); + + break; case LWS_CALLBACK_ESTABLISHED: c->state = STATE_DESTROYED; c->mode = WEBSOCKET_MODE_SERVER; + /* We use the URI to associate this connection to a node + * and choose a protocol. + * + * Example: ws://example.com/node_1.json + * Will select the node with the name 'node_1' + * and format 'json'. + * + * If the node name is omitted, this connection + * will receive sample data from all websocket nodes + * (catch all). + */ + /* Get path of incoming request */ + char *node, *format = NULL; char uri[64]; + lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/ if (strlen(uri) <= 0) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL"); return -1; } - - if ((uri[0] == '/' && uri[1] == 0) || uri[0] == 0){ - /* Catch all connection */ + + node = strtok(uri, "/."); + if (strlen(node) == 0) c->node = NULL; - } else { - char *node = uri + 1; + format = strtok(NULL, ""); /* Search for node whose name matches the URI. */ c->node = list_lookup(&p.node.instances, node); @@ -208,12 +227,21 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi return -1; } } + + if (!format) + format = "villas"; + + c->format = io_format_lookup(format); + if (!c->format) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format"); + return -1; + } ret = websocket_connection_init(c, wsi); if (ret) return -1; - return 0; + break; case LWS_CALLBACK_CLOSED: websocket_connection_destroy(c); @@ -224,72 +252,82 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi return 0; case LWS_CALLBACK_CLIENT_WRITEABLE: - case LWS_CALLBACK_SERVER_WRITEABLE: - if (c->state == STATE_STOPPED) { - websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_NORMAL, "Goodbye"); - return -1; - } - + case LWS_CALLBACK_SERVER_WRITEABLE: { if (c->node && c->node->state != STATE_STARTED) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; } - size_t msglen, buflen = LWS_PRE; + size_t wbytes; + int cnt = 256; //c->node ? c->node->vectorize : 1; + int pulled; + + struct sample **smps = alloca(cnt * sizeof(struct sample *)); - while (queue_pull(&c->queue, (void **) &smp)) { - msglen = WEBMSG_LEN(smp->length); + pulled = queue_signalled_pull_many(&c->queue, (void **) smps, cnt); - c->buf = realloc(c->buf, buflen + msglen); - if (!c->buf) - serror("realloc failed:"); + io_format_sprint(c->format, c->buf + LWS_PRE, c->buflen - LWS_PRE, &wbytes, smps, pulled, IO_FORMAT_ALL); - msg = (struct webmsg *) (c->buf + buflen); - buflen += msglen; + sample_put_many(smps, pulled); - msg->version = WEBMSG_VERSION; - msg->type = WEBMSG_TYPE_DATA; - msg->length = smp->length; - msg->sequence = smp->sequence; - msg->id = smp->source->id; - msg->ts.sec = smp->ts.origin.tv_sec; - msg->ts.nsec = smp->ts.origin.tv_nsec; - - memcpy(&msg->data, &smp->data, WEBMSG_DATA_LEN(smp->length)); - - webmsg_hton(msg); - - sample_put(smp); - } - - ret = lws_write(wsi, (unsigned char *) c->buf + LWS_PRE, buflen - LWS_PRE, LWS_WRITE_BINARY); + ret = lws_write(wsi, (unsigned char *) c->buf + LWS_PRE, wbytes, c->format->flags & IO_FORMAT_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); if (ret < 0) { warn("Failed lws_write() for connection %s", websocket_connection_name(c)); return -1; } - + + if (c->state == STATE_STOPPED) { + info("Closing connection %s", websocket_connection_name(c)); + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_NORMAL, "Goodbye"); + return -1; + } + + if (queue_signalled_available(&c->queue) > 0) + lws_callback_on_writable(wsi); + return 0; + } case LWS_CALLBACK_CLIENT_RECEIVE: - case LWS_CALLBACK_RECEIVE: - if (!lws_frame_is_binary(wsi)) { + case LWS_CALLBACK_RECEIVE: { + if (c->format->flags & IO_FORMAT_BINARY && !lws_frame_is_binary(wsi)) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Binary data expected"); return -1; } - if (len < WEBMSG_LEN(0)) { + if (len <= 0) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid packet"); return -1; } + if (!c->node) { + websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Catch-all connection can not receive."); + return -1; + } + struct timespec ts_recv = time_now(); + int recvd; + int cnt = 256; //c->node->vectorize; + struct websocket *w = c->node->_vd; + struct sample **smps = alloca(cnt * sizeof(struct sample *)); - msg = (struct webmsg *) in; - while ((char *) msg < (char *) in + len) { - struct node *dest; + ret = sample_alloc(&w->pool, smps, cnt); + if (ret != 1) { + warn("Pool underrun for connection: %s", websocket_connection_name(c)); + break; + } - /* Convert message to host byte-order */ - webmsg_ntoh(msg); + recvd = io_format_sscan(c->format, in, len, NULL, smps, cnt, NULL); + if (recvd < 0) { + warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c)); + break; + } + + struct node *dest; + + for (int i = 0; i < recvd; i++) { + /* Set receive timestamp */ + smps[i]->ts.received = ts_recv; /* Find destination node of this message */ if (c->node) @@ -300,53 +338,31 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi for (int i = 0; i < list_length(&p.node.instances); i++) { struct node *n = list_at(&p.node.instances, i); - if (n->id == msg->id) { + if (n->id == smps[i]->id) { dest = n; break; } } - if (!dest) { + if (!dest) warn("Ignoring message due to invalid node id"); - goto next; - } } + } - struct websocket *w = dest->_vd; - - ret = sample_alloc(&w->pool, &smp, 1); - if (ret != 1) { - warn("Pool underrun for connection: %s", websocket_connection_name(c)); - break; - } - - smp->ts.origin = WEBMSG_TS(msg); - smp->ts.received = ts_recv; - - smp->sequence = msg->sequence; - smp->length = msg->length; - if (smp->length > smp->capacity) { - smp->length = smp->capacity; - warn("Dropping values for connection: %s", websocket_connection_name(c)); - } - - memcpy(&smp->data, &msg->data, SAMPLE_DATA_LEN(smp->length)); - - ret = queue_signalled_push(&w->queue, (void **) smp); - if (ret != 1) { - warn("Queue overrun for connection %s", websocket_connection_name(c)); - break; - } - - /* Next message */ -next: msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length)); + ret = queue_signalled_push_many(&w->queue, (void **) smps, recvd); + if (ret != 1) { + warn("Queue overrun for connection %s", websocket_connection_name(c)); + break; } return 0; + } default: - return 0; + break; } + + return 0; } int websocket_init(struct super_node *sn) @@ -372,8 +388,12 @@ int websocket_deinit() } /* Wait for all connections to be closed */ - while (list_length(&connections) > 0) - usleep(0.2*1e6); + while (list_length(&connections) > 0) { + info("LWS: Waiting for connection shutdown"); + sched_yield(); + usleep(0.1 * 1e6); + } + list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true); @@ -385,9 +405,7 @@ int websocket_start(struct node *n) int ret; struct websocket *w = n->_vd; - size_t blocklen = LWS_PRE + WEBMSG_LEN(DEFAULT_WEBSOCKET_SAMPLELEN); - - ret = pool_init(&w->pool, DEFAULT_WEBSOCKET_QUEUELEN, blocklen, &memtype_hugepage); + ret = pool_init(&w->pool, DEFAULT_WEBSOCKET_QUEUELEN, SAMPLE_LEN(DEFAULT_WEBSOCKET_SAMPLELEN), &memtype_hugepage); if (ret) return ret; @@ -403,6 +421,7 @@ int websocket_start(struct node *n) c->state = STATE_DESTROYED; c->mode = WEBSOCKET_MODE_CLIENT; c->node = n; + c->destination = d; d->info.context = web->context; d->info.vhost = web->vhost; @@ -428,8 +447,11 @@ int websocket_stop(struct node *n) } /* Wait for all connections to be closed */ - while (list_length(&w->connections) > 0) - sleep(1); + while (list_length(&w->connections) > 0) { + info("LWS: Waiting for connection shutdown"); + sched_yield(); + usleep(0.1 * 1e6); + } ret = queue_signalled_destroy(&w->queue); if (ret) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index d309a1d7a..c7d2388f1 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -27,10 +27,11 @@ #endif #include "nodes/zeromq.h" +#include "node.h" #include "utils.h" #include "queue.h" #include "plugin.h" -#include "formats/msg.h" +#include "io_format.h" static void *context; @@ -97,6 +98,7 @@ int zeromq_parse(struct node *n, json_t *cfg) const char *ep = NULL; const char *type = NULL; const char *filter = NULL; + const char *format = "villas"; size_t index; json_t *cfg_pub = NULL; @@ -109,19 +111,24 @@ int zeromq_parse(struct node *n, json_t *cfg) z->curve.enabled = false; z->ipv6 = 0; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: o, s?: o, s?: s, s?: s, s?: b }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: o, s?: o, s?: s, s?: s, s?: b, s?: s }", "subscribe", &ep, "publish", &cfg_pub, "curve", &cfg_curve, "filter", &filter, "pattern", &type, - "ipv6", &z->ipv6 + "ipv6", &z->ipv6, + "format", &format ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); z->subscriber.endpoint = ep ? strdup(ep) : NULL; z->filter = filter ? strdup(filter) : NULL; + + z->format = io_format_lookup(format); + if (!z->format) + error("Invalid format '%s' for node %s", format, node_name(n)); if (cfg_pub) { switch (json_typeof(cfg_pub)) { @@ -202,7 +209,13 @@ char * zeromq_print(struct node *n) #endif } - strcatf(&buf, "pattern=%s, ipv6=%s, crypto=%s, subscribe=%s, publish=[ ", pattern, z->ipv6 ? "yes" : "no", z->curve.enabled ? "yes" : "no", z->subscriber.endpoint); + strcatf(&buf, "format=%s, pattern=%s, ipv6=%s, crypto=%s, subscribe=%s, publish=[ ", + plugin_name(z->format), + pattern, + z->ipv6 ? "yes" : "no", + z->curve.enabled ? "yes" : "no", + z->subscriber.endpoint + ); for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { char *ep = list_at(&z->publisher.endpoints, i); @@ -416,7 +429,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; - recv = msg_buffer_to_samples(smps, cnt, zmq_msg_data(&m), zmq_msg_size(&m)); + recv = io_format_sscan(z->format, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt, NULL); ret = zmq_msg_close(&m); if (ret) @@ -430,16 +443,16 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) int ret; struct zeromq *z = n->_vd; - ssize_t sent; + size_t wbytes; zmq_msg_t m; char data[1500]; - sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data)); - if (sent < 0) + ret = io_format_sprint(z->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL); + if (ret <= 0) return -1; - ret = zmq_msg_init_size(&m, sent); + ret = zmq_msg_init_size(&m, wbytes); if (z->filter) { switch (z->pattern) { @@ -457,7 +470,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) } } - memcpy(zmq_msg_data(&m), data, sent); + memcpy(zmq_msg_data(&m), data, wbytes); ret = zmq_msg_send(&m, z->publisher.socket, 0); if (ret < 0) diff --git a/src/hook.c b/src/hook.c index ca7a29600..90eb9f78a 100644 --- a/src/hook.c +++ b/src/hook.c @@ -93,7 +93,7 @@ static void usage() printf("\n"); printf("Supported IO formats:\n"); - plugin_dump(PLUGIN_TYPE_FORMAT); + plugin_dump(PLUGIN_TYPE_IO); printf("\n"); printf("Example:"); @@ -105,11 +105,9 @@ static void usage() int main(int argc, char *argv[]) { - int ret; + int ret, recv; char *format = "villas"; - size_t recv; - /* Default values */ cnt = 1; @@ -178,7 +176,7 @@ check: if (optarg == endptr) error("Failed to initilize memory pool"); /* Initialize IO */ - p = plugin_lookup(PLUGIN_TYPE_FORMAT, format); + p = plugin_lookup(PLUGIN_TYPE_IO, format); if (!p) error("Unknown IO format '%s'", format); @@ -186,7 +184,7 @@ check: if (optarg == endptr) if (ret) error("Failed to initialize IO"); - ret = io_open(&io, NULL, NULL); + ret = io_open(&io, NULL); if (ret) error("Failed to open IO"); @@ -218,11 +216,13 @@ check: if (optarg == endptr) error("Failed to allocate %d smps from pool", cnt); recv = io_scan(&io, smps, cnt); + if (recv < 0) + killme(SIGTERM); - debug(15, "Read %zu smps from stdin", recv); + debug(15, "Read %u smps from stdin", recv); - hook_read(&h, smps, &recv); - hook_write(&h, smps, &recv); + hook_read(&h, smps, (unsigned *) &recv); + hook_write(&h, smps, (unsigned *) &recv); io_print(&io, smps, recv); diff --git a/src/node.c b/src/node.c index 9b73e5c35..6377935c7 100644 --- a/src/node.c +++ b/src/node.c @@ -89,7 +89,7 @@ static void usage() printf("\n"); printf("Supported IO formats:\n"); - plugin_dump(PLUGIN_TYPE_FORMAT); + plugin_dump(PLUGIN_TYPE_IO); printf("\n"); print_copyright(); diff --git a/src/pipe.c b/src/pipe.c index 2f1c014d6..1cb7fcb30 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -270,7 +270,7 @@ check: if (optarg == endptr) if (ret) error("Failed to initalize real-time"); - p = plugin_lookup(PLUGIN_TYPE_FORMAT, format); + p = plugin_lookup(PLUGIN_TYPE_IO, format); if (!p) error("Invalid format: %s", format); @@ -278,7 +278,7 @@ check: if (optarg == endptr) if (ret) error("Failed to initialize IO"); - ret = io_open(&io, NULL, NULL); + ret = io_open(&io, NULL); if (ret) error("Failed to open IO"); diff --git a/src/signal.c b/src/signal.c index b7e3fceb2..3fa3a8836 100644 --- a/src/signal.c +++ b/src/signal.c @@ -30,7 +30,7 @@ #include #include -#include +#include #include #include #include @@ -125,15 +125,15 @@ int main(int argc, char *argv[]) if (ret) error("Failed to initialize node"); - p = plugin_lookup(PLUGIN_TYPE_FORMAT, format); + p = plugin_lookup(PLUGIN_TYPE_IO, format); if (!p) error("Invalid output format '%s'", format); - ret = io_init(&io, &p->io, IO_FLAG_FLUSH | (IO_FORMAT_ALL & ~IO_FORMAT_OFFSET)); + ret = io_init(&io, &p->io, IO_FLUSH | (IO_FORMAT_ALL & ~IO_FORMAT_OFFSET)); if (ret) error("Failed to initialize output"); - ret = io_open(&io, NULL, NULL); + ret = io_open(&io, NULL); if (ret) error("Failed to open output"); diff --git a/src/test-cmp.c b/src/test-cmp.c index 427f29be9..1d9ce1d80 100644 --- a/src/test-cmp.c +++ b/src/test-cmp.c @@ -29,7 +29,8 @@ #include #include -#include +#include +#include #include #include #include @@ -132,11 +133,11 @@ check: if (optarg == endptr) serror("Failed to open file: %s", f2.path); while (!feof(f1.handle) && !feof(f2.handle)) { - ret = io_format_villas_fscan(f1.handle, &f1.sample, 1, &f1.flags); + ret = villas_fscan(f1.handle, &f1.sample, 1, &f1.flags); if (ret < 0 && !feof(f1.handle)) goto out; - ret = io_format_villas_fscan(f2.handle, &f2.sample, 1, &f2.flags); + ret = villas_fscan(f2.handle, &f2.sample, 1, &f2.flags); if (ret < 0 && !feof(f2.handle)) goto out; diff --git a/tests/unit/io.c b/tests/unit/io.c index 431f7132d..68cae73e1 100644 --- a/tests/unit/io.c +++ b/tests/unit/io.c @@ -21,6 +21,7 @@ *********************************************************************************/ #include +#include #include #include @@ -32,6 +33,7 @@ #include "plugin.h" #include "pool.h" #include "io.h" +#include "io/raw.h" #define NUM_SAMPLES 10 #define NUM_VALUES 10 @@ -40,23 +42,43 @@ void cr_assert_eq_sample(struct sample *s, struct sample *t) { cr_assert_eq(s->length, t->length); cr_assert_eq(s->sequence, t->sequence); - cr_assert_eq(s->format, t->format); +// cr_assert_eq(s->format, t->format); cr_assert_eq(s->ts.origin.tv_sec, t->ts.origin.tv_sec); cr_assert_eq(s->ts.origin.tv_nsec, t->ts.origin.tv_nsec); - cr_assert_eq(s->ts.received.tv_sec, t->ts.received.tv_sec); - cr_assert_eq(s->ts.received.tv_nsec, t->ts.received.tv_nsec); - for (int i = 0; i < MIN(s->length, t->length); i++) - cr_assert_float_eq(s->data[i].f, t->data[i].f, 1e-6); + for (int i = 0; i < MIN(s->length, t->length); i++) { + switch (sample_get_data_format(t, i)) { + case SAMPLE_DATA_FORMAT_FLOAT: + cr_assert_float_eq(s->data[i].f, t->data[i].f, 1e-3, "Sample data mismatch at index %d: %f != %f", i, s->data[i].f, t->data[i].f); + break; + case SAMPLE_DATA_FORMAT_INT: + cr_assert_eq(s->data[i].i, t->data[i].i); + break; + } + } } ParameterizedTestParameters(io, highlevel) { static char formats[][32] = { +#ifdef WITH_HDF5 + "hdf5", +#endif + "raw-int8", + "raw-int16-be", + "raw-int16-le", + "raw-int32-be", + "raw-int32-le", + "raw-int64-be", + "raw-int64-le", + "raw-flt32", + "raw-flt64", "villas", "json", - "csv" + "msg", + "gtnet", + "gtnet-fake", }; return cr_make_param_array(char[32], formats, ARRAY_LEN(formats)); @@ -64,16 +86,19 @@ ParameterizedTestParameters(io, highlevel) ParameterizedTest(char *fmt, io, highlevel) { - int ret; - char filename[64]; + int ret, cnt; struct io io; + struct io_format *f; + struct pool p = { .state = STATE_DESTROYED }; struct sample *smps[NUM_SAMPLES]; struct sample *smpt[NUM_SAMPLES]; ret = pool_init(&p, 2 * NUM_SAMPLES, SAMPLE_LEN(NUM_VALUES), &memtype_hugepage); cr_assert_eq(ret, 0); + + info("Testing: %s", fmt); /* Prepare a sample with arbitrary data */ ret = sample_alloc(&p, smps, NUM_SAMPLES); @@ -83,52 +108,94 @@ ParameterizedTest(char *fmt, io, highlevel) cr_assert_eq(ret, NUM_SAMPLES); for (int i = 0; i < NUM_SAMPLES; i++) { - smpt[i]->capacity = - smps[i]->capacity = NUM_VALUES; + smpt[i]->capacity = NUM_VALUES; + smps[i]->length = NUM_VALUES; smps[i]->sequence = 235 + i; smps[i]->format = 0; /* all float */ smps[i]->ts.origin = time_now(); - smps[i]->ts.received = (struct timespec) { - .tv_sec = smps[i]->ts.origin.tv_sec - 1, - .tv_nsec = smps[i]->ts.origin.tv_nsec - }; for (int j = 0; j < smps[i]->length; j++) - smps[i]->data[j].f = j * 1.2 + i * 100; + smps[i]->data[j].f = j * 0.1 + i * 100; + //smps[i]->data[j].i = -500 + j*100; } /* Open a file for IO */ - strncpy(filename, "/tmp/villas-unit-test.XXXXXX", sizeof(filename)); - mktemp(filename); + char *fn, dir[64]; + strncpy(dir, "/tmp/villas.XXXXXX", sizeof(dir)); + + mkdtemp(dir); + ret = asprintf(&fn, "%s/file", dir); + cr_assert_gt(ret, 0); - printf("Writing to file: %s\n", filename); + f = io_format_lookup(fmt); + cr_assert_not_null(f, "Format '%s' does not exist", fmt); - struct plugin *pl; - - pl = plugin_lookup(PLUGIN_TYPE_FORMAT, fmt); - cr_assert_not_null(pl, "Format '%s' does not exist", fmt); - - ret = io_init(&io, &pl->io, IO_FORMAT_ALL); + ret = io_init(&io, f, IO_FORMAT_ALL); cr_assert_eq(ret, 0); - ret = io_open(&io, filename, "w+"); + ret = io_open(&io, fn, "w+"); cr_assert_eq(ret, 0); ret = io_print(&io, smps, NUM_SAMPLES); cr_assert_eq(ret, NUM_SAMPLES); - io_rewind(&io); - io_flush(&io); + ret = io_close(&io); + cr_assert_eq(ret, 0); +#if 1 /* Show the file contents */ char cmd[128]; - snprintf(cmd, sizeof(cmd), "cat %s", filename); + if (!strcmp(fmt, "json") || !strcmp(fmt, "villas")) + snprintf(cmd, sizeof(cmd), "cat %s", fn); + else + snprintf(cmd, sizeof(cmd), "hexdump -C %s", fn); system(cmd); +#endif + + ret = io_open(&io, fn, "r"); + cr_assert_eq(ret, 0); - ret = io_scan(&io, smpt, NUM_SAMPLES); - cr_assert_eq(ret, NUM_SAMPLES, "Read only %d of %d samples back", ret, NUM_SAMPLES); + cnt = io_scan(&io, smpt, NUM_SAMPLES); - for (int i = 0; i < 0; i++) + /* The RAW format has certain limitations: + * - limited accuracy if smaller datatypes are used + * - no support for vectors / framing + * + * We need to take these limitations into account before comparing. + */ + if (f->sscan == raw_sscan) { + cr_assert_eq(cnt, 1); + cr_assert_eq(smpt[0]->length, smpt[0]->capacity); + + if (io.flags & RAW_FAKE) { + } + else { + smpt[0]->sequence = smps[0]->sequence; + smpt[0]->ts.origin = smps[0]->ts.origin; + } + + int bits = 1 << (io.flags >> 24); + for (int j = 0; j < smpt[0]->length; j++) { + if (io.flags & RAW_FLT) { + switch (bits) { + case 32: smps[0]->data[j].f = (float) smps[0]->data[j].f; break; + case 64: smps[0]->data[j].f = (double) smps[0]->data[j].f; break; + } + } + else { + switch (bits) { + case 8: smps[0]->data[j].i = ( int8_t) smps[0]->data[j].i; break; + case 16: smps[0]->data[j].i = ( int16_t) smps[0]->data[j].i; break; + case 32: smps[0]->data[j].i = ( int32_t) smps[0]->data[j].i; break; + case 64: smps[0]->data[j].i = ( int64_t) smps[0]->data[j].i; break; + } + } + } + } + else + cr_assert_eq(cnt, NUM_SAMPLES, "Read only %d of %d samples back", cnt, NUM_SAMPLES); + + for (int i = 0; i < cnt; i++) cr_assert_eq_sample(smps[i], smpt[i]); ret = io_close(&io); @@ -137,8 +204,13 @@ ParameterizedTest(char *fmt, io, highlevel) ret = io_destroy(&io); cr_assert_eq(ret, 0); - ret = unlink(filename); + ret = unlink(fn); cr_assert_eq(ret, 0); + + ret = rmdir(dir); + cr_assert_eq(ret, 0); + + free(fn); sample_free(smps, NUM_SAMPLES); sample_free(smpt, NUM_SAMPLES);