1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

- introduce low-level interface for new IO subsystem: we now have a fully extensible system for new IO formats and file formats

- reworked file node-type to remove in / out directions
This commit is contained in:
Steffen Vogel 2017-08-14 14:42:07 +02:00
parent 27f60325a5
commit 3eea0c67bb
47 changed files with 1622 additions and 977 deletions

View file

@ -31,11 +31,15 @@ size_t json_dumpb(const json_t *json, char *buffer, size_t size, size_t flags);
#define le16toh(x) OSSwapLittleToHostInt16(x)
#define le32toh(x) OSSwapLittleToHostInt32(x)
#define le64toh(x) OSSwapLittleToHostInt64(x)
#define be16toh(x) OSSwapBigToHostInt16(x)
#define be32toh(x) OSSwapBigToHostInt32(x)
#define be64toh(x) OSSwapBigToHostInt64(x)
#define htole16(x) OSSwapHostToLittleInt16(x)
#define htole32(x) OSSwapHostToLittleInt32(x)
#define htole64(x) OSSwapHostToLittleInt64(x)
#define htobe16(x) OSSwapHostToBigInt16(x)
#define htobe32(x) OSSwapHostToBigInt32(x)
#define htobe64(x) OSSwapHostToBigInt64(x)
#endif /* __MACH__ */

View file

@ -28,91 +28,10 @@
/* Forward declarations */
struct sample;
struct io;
struct io_format;
/** These flags define the format which is used by io_fscan() and io_fprint(). */
enum io_flags {
IO_FORMAT_NANOSECONDS = (1 << 0), /**< Include nanoseconds in output. */
IO_FORMAT_OFFSET = (1 << 1), /**< Include offset / delta between received and send timestamps. */
IO_FORMAT_SEQUENCE = (1 << 2), /**< Include sequence number in output. */
IO_FORMAT_VALUES = (1 << 3), /**< Include values in output. */
IO_FORMAT_ALL = 16-1, /**< Enable all output options. */
IO_FLAG_FLUSH = (1 << 8), /**< Flush the output stream after each chunk of samples. */
};
struct io_format {
int (*init)(struct io *io);
int (*destroy)(struct io *io);
/** @{
* High-level interface
*/
/** Open an IO stream.
*
* @see fopen()
*/
int (*open)(struct io *io, const char *uri, const char *mode);
/** Close an IO stream.
*
* @see fclose()
*/
int (*close)(struct io *io);
/** Check if end-of-file was reached.
*
* @see feof()
*/
int (*eof)(struct io *io);
/** Rewind an IO stream.
*
* @see rewind()
*/
void (*rewind)(struct io *io);
/** Flush buffered data to disk.
*
* @see fflush()
*/
int (*flush)(struct io *io);
int (*print)(struct io *io, struct sample *smps[], size_t cnt);
int (*scan)( struct io *io, struct sample *smps[], size_t cnt);
/** @} */
/** @{
* Low-level interface
*/
/** Parse samples from the buffer \p buf with a length of \p len bytes.
*
* @return The number of bytes consumed of \p buf.
*/
size_t (*sscan)( char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags);
/** Print \p cnt samples from \p smps into buffer \p buf of length \p len.
*
* @return The number of bytes written to \p buf.
*/
size_t (*sprint)(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags);
/** Parse up to \p cnt samples from stream \p f into array \p smps.
*
* @return The number of samples parsed.
*/
int (*fscan)( FILE *f, struct sample *smps[], size_t cnt, int *flags);
/** Print \p cnt samples from \p smps to stream \p f.
*
* @return The number of samples written to \p f.
*/
int (*fprint)(FILE *f, struct sample *smps[], size_t cnt, int flags);
/** @} */
size_t size; /**< Number of bytes to allocate for io::_vd */
IO_FLUSH = (1 << 8) /**< Flush the output stream after each chunk of samples. */
};
struct io {
@ -126,7 +45,7 @@ struct io {
} mode;
/** A format type can use this file handle or overwrite the
* io_format::{open,close,eof,rewind} functions and the private
* format::{open,close,eof,rewind} functions and the private
* data in io::_vd.
*/
union {
@ -140,6 +59,11 @@ struct io {
} advio;
};
struct {
char *input;
char *output;
} buffer;
void *_vd;
struct io_format *_vt;
};
@ -148,7 +72,7 @@ int io_init(struct io *io, struct io_format *fmt, int flags);
int io_destroy(struct io *io);
int io_open(struct io *io, const char *uri, const char *mode);
int io_open(struct io *io, const char *uri);
int io_close(struct io *io);
@ -163,7 +87,7 @@ void io_rewind(struct io *io);
int io_flush(struct io *io);
int io_stream_open(struct io *io, const char *uri, const char *mode);
int io_stream_open(struct io *io, const char *uri);
int io_stream_close(struct io *io);

View file

@ -30,6 +30,6 @@ struct sample;
#define CSV_SEPARATOR '\t'
int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags);
int csv_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags);
int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags);
int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags);

View file

@ -26,10 +26,10 @@
#include "sample.h"
int io_format_json_pack(json_t **j, struct sample *s, int flags);
int json_pack_sample(json_t **j, struct sample *s, int flags);
int io_format_json_unpack(json_t *j, struct sample *s, int *flags);
int json_unpack_sample(json_t *j, struct sample *s, int *flags);
int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags);
int json_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags);
int io_format_json_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags);
int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags);

View file

@ -23,12 +23,12 @@
#pragma once
#include <stdlib.h>
/* Forward declarations. */
struct msg;
struct sample;
/** The maximum length of a packet which contains stuct msg. */
#define MSG_MAX_PACKET_LEN 1500
struct io;
/** Swaps the byte-order of the message.
*
@ -60,9 +60,8 @@ int msg_to_sample(struct msg *msg, struct sample *smp);
/** Copy fields form \p smp into \p msg. */
int msg_from_sample(struct msg *msg, struct sample *smp);
/** Copy / read struct msg's from buffer \p buf to / fram samples \p smps. */
ssize_t msg_buffer_from_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len);
int msg_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags);
/** Read struct sample's from buffer \p buf into samples \p smps. */
int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len);
int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags);

60
include/villas/io/raw.h Normal file
View file

@ -0,0 +1,60 @@
/** RAW IO format
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <stdlib.h>
/* Forward declarations */
struct sample;
enum raw_flags {
RAW_FAKE = (1 << 16), /**< Treat the first three values as: sequenceno, seconds, nanoseconds */
RAW_BE_INT = (1 << 17), /**< Byte-order for integer data: big-endian if set. */
RAW_BE_FLT = (1 << 18), /**< Byte-order for floating point data: big-endian if set. */
RAW_BE_HDR = (1 << 19), /**< Byte-order for fake header fields: big-endian if set. */
/** Byte-order for all fields: big-endian if set. */
RAW_BE = RAW_BE_INT | RAW_BE_FLT | RAW_BE_HDR,
/** Mix floating and integer types.
*
* io_raw_sscan() parses all values as single / double precission fp.
* io_raw_sprint() uses sample::format to determine the type.
*/
RAW_AUTO = (1 << 22),
RAW_FLT = (1 << 23), /**< Data-type: floating point otherwise integer. */
//RAW_1 = (0 << 24), /**< Pack each value as a single bit. */
RAW_8 = (3 << 24), /**< Pack each value as a byte. */
RAW_16 = (4 << 24), /**< Pack each value as a word. */
RAW_32 = (5 << 24), /**< Pack each value as a double word. */
RAW_64 = (6 << 24) /**< Pack each value as a quad word. */
};
/** Copy / read struct msg's from buffer \p buf to / fram samples \p smps. */
int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags);
/** Read struct sample's from buffer \p buf into samples \p smps. */
int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags);

View file

@ -27,10 +27,10 @@
#include "io.h"
int io_format_villas_print(struct io *io, struct sample *smps[], size_t cnt);
int villas_print(struct io *io, struct sample *smps[], unsigned cnt);
int io_format_villas_scan(struct io *io, struct sample *smps[], size_t cnt);
int villas_scan(struct io *io, struct sample *smps[], unsigned cnt);
int io_format_villas_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags);
int villas_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags);
int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags);
int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags);

146
include/villas/io_format.h Normal file
View file

@ -0,0 +1,146 @@
/** Read / write sample data in different formats.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#pragma once
#include <stdio.h>
/* Forward declarations */
struct sample;
struct io;
enum io_format_flags {
IO_FORMAT_NANOSECONDS = (1 << 0), /**< Include nanoseconds in output. */
IO_FORMAT_OFFSET = (1 << 1), /**< Include offset / delta between received and send timestamps. */
IO_FORMAT_SEQUENCE = (1 << 2), /**< Include sequence number in output. */
IO_FORMAT_VALUES = (1 << 3), /**< Include values in output. */
IO_FORMAT_ALL = 15, /**< Enable all output options. */
IO_FORMAT_BINARY = (1 << 8)
};
struct io_format {
int (*init)(struct io *io);
int (*destroy)(struct io *io);
/** @{
* High-level interface
*/
/** Open an IO stream.
*
* @see fopen()
*/
int (*open)(struct io *io, const char *uri);
/** Close an IO stream.
*
* @see fclose()
*/
int (*close)(struct io *io);
/** Check if end-of-file was reached.
*
* @see feof()
*/
int (*eof)(struct io *io);
/** Rewind an IO stream.
*
* @see rewind()
*/
void (*rewind)(struct io *io);
/** Flush buffered data to disk.
*
* @see fflush()
*/
int (*flush)(struct io *io);
int (*print)(struct io *io, struct sample *smps[], unsigned cnt);
int (*scan)( struct io *io, struct sample *smps[], unsigned cnt);
/** @} */
/** @{
* Low-level interface
*/
/** @see io_format_sscan */
int (*sscan)(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags);
/** @see io_format_sprint */
int (*sprint)(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags);
/** @see io_format_fscan */
int (*fscan)(FILE *f, struct sample *smps[], unsigned cnt, int *flags);
/** @see io_format_fprint */
int (*fprint)(FILE *f, struct sample *smps[], unsigned cnt, int flags);
/** @} */
size_t size; /**< Number of bytes to allocate for io::_vd */
int flags; /**< A set of flags which is automatically used. */
};
struct io_format * io_format_lookup(const char *name);
/** Parse samples from the buffer \p buf with a length of \p len bytes.
*
* @param buf[in] The buffer of data which should be parsed / de-serialized.
* @param len[in] The length of the buffer \p buf.
* @param rbytes[out] The number of bytes which have been read from \p buf.
* @param smps[out] The array of pointers to samples.
* @param cnt[in] The number of pointers in the array \p smps.
*
* @retval >=0 The number of samples which have been parsed from \p buf and written into \p smps.
* @retval <0 Something went wrong.
*/
int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags);
/** Print \p cnt samples from \p smps into buffer \p buf of length \p len.
*
* @param buf[out] The buffer which should be filled with serialized data.
* @param len[in] The length of the buffer \p buf.
* @param rbytes[out] The number of bytes which have been written to \p buf. Ignored if NULL.
* @param smps[in] The array of pointers to samples.
* @param cnt[in] The number of pointers in the array \p smps.
*
* @retval >=0 The number of samples from \p smps which have been written into \p buf.
* @retval <0 Something went wrong.
*/
int io_format_sprint(struct io_format *fmt, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags);
/** Parse up to \p cnt samples from stream \p f into array \p smps.
*
* @retval >=0 The number of samples which have been parsed from \p f and written into \p smps.
* @retval <0 Something went wrong.
*/
int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int *flags);
/** Print \p cnt samples from \p smps to stream \p f.
*
* @retval >=0 The number of samples from \p smps which have been written to \p f.
* @retval <0 Something went wrong.
*/
int io_format_fprint(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags);

View file

@ -1,5 +1,6 @@
/** Linux specific real-time optimizations
*
* @see: https://wiki.linuxfoundation.org/realtime/documentation/howto/applications/application_base
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
@ -31,6 +32,8 @@ int rt_set_affinity(int affinity);
int rt_set_priority(int priority);
int rt_lock_memory();
/** Checks for realtime (PREEMPT_RT) patched kernel.
*
* See https://rt.wiki.kernel.org

View file

@ -31,48 +31,41 @@
#pragma once
#include "advio.h"
#include "io.h"
#include "node.h"
#include "periodic_task.h"
#define FILE_MAX_PATHLEN 512
enum {
FILE_READ,
FILE_WRITE
};
struct file {
struct file_direction {
AFILE *handle; /**< libc: stdio file handle. */
struct io io; /**< Format and file IO */
struct io_format *format;
const char *mode; /**< libc: fopen() mode. */
const char *format; /**< Format string for file name. */
char *uri; /**< Real file name. */
} read, write;
char *uri_tmpl; /**< Format string for file name. */
char *uri; /**< Real file name. */
char *mode; /**< File access mode. */
int flush; /**< Flush / upload file contents after each write. */
struct periodic_task timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */
double rate; /**< The read rate. */
enum read_epoch_mode {
enum epoch_mode {
FILE_EPOCH_DIRECT,
FILE_EPOCH_WAIT,
FILE_EPOCH_RELATIVE,
FILE_EPOCH_ABSOLUTE,
FILE_EPOCH_ORIGINAL
} read_epoch_mode; /**< Specifies how file::offset is calculated. */
struct timespec read_first; /**< The first timestamp in the file file::{read,write}::uri */
struct timespec read_epoch; /**< The epoch timestamp from the configuration. */
struct timespec read_offset; /**< An offset between the timestamp in the input file and the current time */
} epoch_mode; /**< Specifies how file::offset is calculated. */
enum {
FILE_EOF_EXIT, /**< Terminate when EOF is reached. */
FILE_EOF_REWIND, /**< Rewind the file when EOF is reached. */
FILE_EOF_WAIT /**< Blocking wait when EOF is reached. */
} read_eof; /**< Should we rewind the file when we reach EOF? */
int read_timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */
double read_rate; /**< The read rate. */
} eof;
struct timespec first; /**< The first timestamp in the file file::{read,write}::uri */
struct timespec epoch; /**< The epoch timestamp from the configuration. */
struct timespec offset; /**< An offset between the timestamp in the input file and the current time */
};
/** @see node_type::print */

View file

@ -34,6 +34,12 @@
#include "node.h"
#include "list.h"
/** The maximum length of a packet which contains stuct msg. */
#define NANOMSG_MAX_PACKET_LEN 1500
/* Forward declarations */
struct io_format;
struct nanomsg {
struct {
int socket;
@ -44,6 +50,8 @@ struct nanomsg {
int socket;
struct list endpoints;
} subscriber;
struct io_format *format;
};
/** @see node_type::print */

View file

@ -39,8 +39,22 @@
#include <linux/if_packet.h>
#endif
#ifdef WITH_LIBNL_ROUTE_30
#include "kernel/if.h"
#include "kernel/nl.h"
#include "kernel/tc.h"
#define WITH_NETEM
#endif /* WITH_LIBNL_ROUTE_30 */
#include "node.h"
/* Forward declarations */
struct io_format;
/** The maximum length of a packet which contains stuct msg. */
#define SOCKET_MAX_PACKET_LEN 1500
enum socket_layer {
SOCKET_LAYER_ETH,
SOCKET_LAYER_IP,
@ -80,6 +94,8 @@ struct socket {
union sockaddr_union local; /**< Local address of the socket */
union sockaddr_union remote; /**< Remote address of the socket */
struct io_format *format;
/* Multicast options */
struct multicast {
int enabled; /**< Is multicast enabled? */
@ -88,8 +104,10 @@ struct socket {
struct ip_mreq mreq; /**< A multicast group to join. */
} multicast;
#ifdef WITH_NETEM
struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */
struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */
#endif /* WITH_NETEM */
};

View file

@ -57,24 +57,31 @@ struct websocket {
/* Internal datastructures */
struct websocket_connection {
struct node *node;
struct lws *wsi;
struct queue queue; /**< For samples which are sent to the WebSocket */
struct {
char name[64];
char ip[64];
} peer;
enum state state; /**< The current status of this connection. */
enum {
WEBSOCKET_MODE_CLIENT,
WEBSOCKET_MODE_SERVER,
} mode;
enum state state;
struct lws *wsi;
struct node *node;
struct io_format *format; /**< The IO format used for this connection. */
struct queue_signalled queue; /**< For samples which are sent to the WebSocket */
char *buf; /**< A buffer which is used to construct the messages. */
union {
/**< Only used in case websocket_connection::mode == WEBSOCKET_MODE_CLIENT */
struct websocket_destination *destination;
/**< Only used in case websocket_connection::mode == WEBSOCKET_MODE_SERVER */
struct {
char name[64];
char ip[64];
} peer;
};
char *buf; /**< A buffer which is used to construct the messages. */
size_t buflen; /**< Length of websocket_connection::buf. */
char *_name;
};

View file

@ -30,19 +30,26 @@
#pragma once
#include <stdint.h>
#include <jansson.h>
#include "node.h"
#include "list.h"
#if ZMQ_VERSION_MAJOR > 4 || (ZMQ_VERSION_MAJOR == 4 && ZMQ_VERSION_MINOR >= 2)
#define ZMQ_BUILD_DISH 1
#endif
/* Forward declarations */
struct io_format;
struct node;
struct sample;
struct zeromq {
int ipv6;
char *filter;
struct io_format *format;
struct {
int enabled;
struct {

View file

@ -23,7 +23,7 @@
#pragma once
#include "io.h"
#include "io_format.h"
#include "hook.h"
#include "api.h"
#include "common.h"
@ -54,7 +54,7 @@ enum plugin_type {
PLUGIN_TYPE_HOOK,
PLUGIN_TYPE_NODE,
PLUGIN_TYPE_API,
PLUGIN_TYPE_FORMAT,
PLUGIN_TYPE_IO,
PLUGIN_TYPE_FPGA_IP,
PLUGIN_TYPE_MODEL_CBUILDER
};

View file

@ -42,20 +42,22 @@ struct pool;
#define SAMPLE_LEN(len) (sizeof(struct sample) + SAMPLE_DATA_LEN(len))
/** The length of a sample data portion of a sample datastructure with \p values values in bytes. */
#define SAMPLE_DATA_LEN(len) ((len) * sizeof(float))
#define SAMPLE_DATA_LEN(len) ((len) * sizeof(double))
/** The offset to the beginning of the data section. */
#define SAMPLE_DATA_OFFSET(smp) ((char *) (smp) + offsetof(struct sample, data))
enum sample_data_format {
SAMPLE_DATA_FORMAT_FLOAT= 0,
SAMPLE_DATA_FORMAT_INT = 1
SAMPLE_DATA_FORMAT_FLOAT = 0,
SAMPLE_DATA_FORMAT_INT = 1
};
struct sample {
int sequence; /**< The sequence number of this sample. */
int length; /**< The number of values in sample::values which are valid. */
int capacity; /**< The number of values in sample::values for which memory is reserved. */
int id;
atomic_int refcnt; /**< Reference counter. */
off_t pool_off; /**< This sample belongs to this memory pool (relative pointer). */
@ -68,12 +70,16 @@ struct sample {
struct timespec sent; /**< The point in time when this data was send for the last time. */
} ts;
uint64_t format; /**< A long bitfield indicating the number representation of the first 64 values in sample::data[] */
/** A long bitfield indicating the number representation of the first 64 values in sample::data[].
*
* @see sample_data_format
*/
uint64_t format;
/** The values. */
union {
float f; /**< Floating point values. */
uint32_t i; /**< Integer values. */
double f; /**< Floating point values. */
uint64_t i; /**< Integer values. */
} data[]; /**< Data is in host endianess! */
};

View file

@ -28,7 +28,7 @@ LIB_CFLAGS = $(CFLAGS) -fPIC
-include lib/hooks/Makefile.inc
-include lib/nodes/Makefile.inc
-include lib/formats/Makefile.inc
-include lib/io/Makefile.inc
-include $(patsubst %, lib/Makefile.%.inc, $(SONAMES))

View file

@ -33,7 +33,7 @@ LIB_SRCS += $(addprefix lib/kernel/, kernel.c rt.c) \
utils.c super_node.c hist.c timing.c pool.c list.c queue.c \
queue_signalled.c memory.c advio.c plugin.c node_type.c stats.c \
mapping.c io.c shmem.c config_helper.c crypt.c compat.c \
log_table.c log_helper.c \
log_table.c log_helper.c io_format.c periodic_task.c \
)
LIB_LDFLAGS = -shared

View file

@ -247,14 +247,19 @@ int afclose(AFILE *af)
int ret;
ret = afflush(af);
if (ret)
return ret;
curl_easy_cleanup(af->curl);
fclose(af->file);
ret = fclose(af->file);
if (ret)
return ret;
free(af->uri);
free(af);
return ret;
return 0;
}
int afseek(AFILE *af, long offset, int origin)

View file

@ -1,90 +0,0 @@
#include "hdf5_hl.h"
#include <stdlib.h>
/*-------------------------------------------------------------------------
* Packet Table Fixed-Length Example
*
* Example program that creates a packet table and performs
* writes and reads.
*
*-------------------------------------------------------------------------
*/
int main(void)
{
hid_t fid; /* File identifier */
hid_t ptable; /* Packet table identifier */
herr_t err; /* Function return status */
hsize_t count; /* Number of records in the table */
int x; /* Loop variable */
/* Buffers to hold data */
int writeBuffer[5];
int readBuffer[5];
/* Initialize buffers */
for(x=0; x<5; x++)
{
writeBuffer[x]=x;
readBuffer[x] = -1;
}
/* Create a file using default properties */
fid=H5Fcreate("packet_table_FLexample.h5",H5F_ACC_TRUNC,H5P_DEFAULT,H5P_DEFAULT);
/* Create a fixed-length packet table within the file */
/* This table's "packets" will be simple integers. */
ptable = H5PTcreate_fl(fid, "Packet Test Dataset", H5T_NATIVE_INT, 1, 1);
if(ptable == H5I_INVALID_HID)
goto out;
/* Write one packet to the packet table */
err = H5PTappend(ptable, 1, &(writeBuffer[0]) );
if(err < 0)
goto out;
/* Write several packets to the packet table */
err = H5PTappend(ptable, 4, &(writeBuffer[1]) );
if(err < 0)
goto out;
/* Get the number of packets in the packet table. This should be five. */
err = H5PTget_num_packets(ptable, &count);
if(err < 0)
goto out;
printf("Number of packets in packet table after five appends: %llu\n", count);
/* Initialize packet table's "current record" */
err = H5PTcreate_index(ptable);
if(err < 0)
goto out;
/* Iterate through packets, read each one back */
for(x=0; x<5; x++)
{
err = H5PTget_next(ptable, 1, &(readBuffer[x]) );
if(err < 0)
goto out;
printf("Packet %d's value is %d\n", x, readBuffer[x]);
}
/* Close the packet table */
err = H5PTclose(ptable);
if(err < 0)
goto out;
/* Close the file */
H5Fclose(fid);
return 0;
out: /* An error has occurred. Clean up and exit. */
H5PTclose(ptable);
H5Fclose(fid);
return -1;
}

View file

@ -24,7 +24,7 @@
#include "timing.h"
#include "config.h"
#include "formats/msg.h"
#include "io/msg.h"
#include "hook.h"
#include "path.h"
#include "utils.h"

118
lib/io.c
View file

@ -24,15 +24,17 @@
#include <stdio.h>
#include "io.h"
#include "io_format.h"
#include "utils.h"
#include "sample.h"
#include "plugin.h"
int io_init(struct io *io, struct io_format *fmt, int flags)
{
io->_vt = fmt;
io->_vd = alloc(fmt->size);
io->flags = flags;
io->flags = flags | io->_vt->flags;
return io->_vt->init ? io->_vt->init(io) : 0;
}
@ -50,41 +52,43 @@ int io_destroy(struct io *io)
return 0;
}
int io_stream_open(struct io *io, const char *uri, const char *mode)
int io_stream_open(struct io *io, const char *uri)
{
int ret;
if (uri) {
if (aislocal(uri)) {
io->mode = IO_MODE_STDIO;
io->stdio.input =
io->stdio.output = fopen(uri, mode);
io->stdio.output = fopen(uri, "a+");
if (io->stdio.output == NULL)
return -1;
ret = setvbuf(io->stdio.output, NULL, _IOLBF, BUFSIZ);
if (ret)
io->stdio.input = fopen(uri, "r");
if (io->stdio.input == NULL)
return -1;
}
else {
io->mode = IO_MODE_ADVIO;
io->advio.input =
io->advio.output = afopen(uri, mode);
io->advio.output = afopen(uri, "a+");
if (io->advio.output == NULL)
return -1;
io->advio.input = afopen(uri, "r");
if (io->advio.input == NULL)
return -1;
}
}
else {
io->mode = IO_MODE_STDIO;
io->flags |= IO_FLAG_FLUSH;
io->flags |= IO_FLUSH;
io->stdio.input = stdin;
io->stdio.output = stdout;
}
if (io->mode == IO_MODE_STDIO) {
ret = setvbuf(io->stdio.input, NULL, _IOLBF, BUFSIZ);
if (ret)
return -1;
@ -99,11 +103,34 @@ int io_stream_open(struct io *io, const char *uri, const char *mode)
int io_stream_close(struct io *io)
{
int ret;
switch (io->mode) {
case IO_MODE_ADVIO:
return afclose(io->advio.input);
ret = afclose(io->advio.input);
if (ret)
return ret;
ret = afclose(io->advio.output);
if (ret)
return ret;
return 0;
case IO_MODE_STDIO:
return io->stdio.input != stdin ? fclose(io->stdio.input) : 0;
if (io->stdio.input == stdin)
return 0;
ret = fclose(io->stdio.input);
if (ret)
return ret;
ret = fclose(io->stdio.output);
if (ret)
return ret;
return 0;
case IO_MODE_CUSTOM:
return 0;
}
@ -150,11 +177,11 @@ void io_stream_rewind(struct io *io)
}
}
int io_open(struct io *io, const char *uri, const char *mode)
int io_open(struct io *io, const char *uri)
{
return io->_vt->open
? io->_vt->open(io, uri, mode)
: io_stream_open(io, uri, mode);
? io->_vt->open(io, uri)
: io_stream_open(io, uri);
}
int io_close(struct io *io)
@ -195,11 +222,26 @@ int io_print(struct io *io, struct sample *smps[], size_t cnt)
FILE *f = io->mode == IO_MODE_ADVIO
? io->advio.output->file
: io->stdio.output;
//flockfile(f);
ret = io->_vt->fprint(f, smps, cnt, io->flags);
if (io->_vt->fprint)
ret = io->_vt->fprint(f, smps, cnt, io->flags);
else if (io->_vt->sprint) {
char buf[4096];
size_t wbytes;
ret = io->_vt->sprint(buf, sizeof(buf), &wbytes, smps, cnt, io->flags);
fwrite(buf, wbytes, 1, f);
}
else
ret = -1;
//funlockfile(f);
}
if (io->flags & IO_FLAG_FLUSH)
if (io->flags & IO_FLUSH)
io_flush(io);
return ret;
@ -207,13 +249,45 @@ int io_print(struct io *io, struct sample *smps[], size_t cnt)
int io_scan(struct io *io, struct sample *smps[], size_t cnt)
{
int ret;
if (io->_vt->scan)
return io->_vt->scan(io, smps, cnt);
ret = io->_vt->scan(io, smps, cnt);
else {
FILE *f = io->mode == IO_MODE_ADVIO
? io->advio.input->file
: io->stdio.input;
//flockfile(f);
int flags = io->flags;
return io->_vt->fscan(f, smps, cnt, NULL);
if (io->_vt->fscan)
return io->_vt->fscan(f, smps, cnt, &flags);
else if (io->_vt->sscan) {
size_t bytes, rbytes;
char buf[4096];
bytes = fread(buf, 1, sizeof(buf), f);
ret = io->_vt->sscan(buf, bytes, &rbytes, smps, cnt, &flags);
}
else
ret = -1;
//funlockfile(f);
}
return ret;
}
struct io_format * io_format_lookup(const char *name)
{
struct plugin *p;
p = plugin_lookup(PLUGIN_TYPE_IO, name);
if (!p)
return NULL;
return &p->io;
}

View file

@ -20,7 +20,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
###################################################################################
LIB_SRCS += $(addprefix lib/formats/,json.c villas.c csv.c)
LIB_SRCS += $(addprefix lib/io/,json.c villas.c csv.c raw.c msg.c)
WITH_HDF5 ?= 0

View file

@ -21,13 +21,14 @@
*********************************************************************************/
#include <ctype.h>
#include <inttypes.h>
#include "formats/csv.h"
#include "io/csv.h"
#include "plugin.h"
#include "sample.h"
#include "timing.h"
int io_format_csv_fprint_single(FILE *f, struct sample *s, int flags)
int csv_fprint_single(FILE *f, struct sample *s, int flags)
{
fprintf(f, "%ld %09ld %d", s->ts.origin.tv_sec, s->ts.origin.tv_nsec, s->sequence);
@ -37,7 +38,7 @@ int io_format_csv_fprint_single(FILE *f, struct sample *s, int flags)
fprintf(f, "%c%.6f", CSV_SEPARATOR, s->data[i].f);
break;
case SAMPLE_DATA_FORMAT_INT:
fprintf(f, "%c%d", CSV_SEPARATOR, s->data[i].i);
fprintf(f, "%c%" PRId64, CSV_SEPARATOR, s->data[i].i);
break;
}
}
@ -47,7 +48,7 @@ int io_format_csv_fprint_single(FILE *f, struct sample *s, int flags)
return 0;
}
size_t io_format_csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flags)
size_t csv_sscan_single(const char *buf, size_t len, struct sample *s, int *flags)
{
int ret, off;
@ -59,10 +60,10 @@ size_t io_format_csv_sscan_single(const char *buf, size_t len, struct sample *s,
for (i = 0; i < s->capacity; i++) {
switch (s->format & (1 << i)) {
case SAMPLE_DATA_FORMAT_FLOAT:
ret = sscanf(buf + off, "%f %n", &s->data[i].f, &off);
ret = sscanf(buf + off, "%lf %n", &s->data[i].f, &off);
break;
case SAMPLE_DATA_FORMAT_INT:
ret = sscanf(buf + off, "%d %n", &s->data[i].i, &off);
ret = sscanf(buf + off, "%" PRId64 " %n", &s->data[i].i, &off);
break;
}
@ -76,7 +77,7 @@ size_t io_format_csv_sscan_single(const char *buf, size_t len, struct sample *s,
return 0;
}
int io_format_csv_fscan_single(FILE *f, struct sample *s, int *flags)
int csv_fscan_single(FILE *f, struct sample *s, int *flags)
{
char *ptr, line[4096];
@ -88,14 +89,14 @@ skip: if (fgets(line, sizeof(line), f) == NULL)
if (*ptr == '\0' || *ptr == '#')
goto skip;
return io_format_csv_sscan_single(line, strlen(line), s, flags);
return csv_sscan_single(line, strlen(line), s, flags);
}
int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
int csv_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = io_format_csv_fprint_single(f, smps[i], flags);
ret = csv_fprint_single(f, smps[i], flags);
if (ret < 0)
break;
}
@ -103,11 +104,11 @@ int io_format_csv_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
return i;
}
int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags)
int csv_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = io_format_csv_fscan_single(f, smps[i], flags);
ret = csv_fscan_single(f, smps[i], flags);
if (ret < 0) {
warn("Failed to read CSV line: %d", ret);
break;
@ -120,10 +121,10 @@ int io_format_csv_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags)
static struct plugin p = {
.name = "csv",
.description = "Tabulator-separated values",
.type = PLUGIN_TYPE_FORMAT,
.type = PLUGIN_TYPE_IO,
.io = {
.fprint = io_format_csv_fprint,
.fscan = io_format_csv_fscan,
.fprint = csv_fprint,
.fscan = csv_fscan,
.size = 0
}
};

170
lib/io/h5pt.c Normal file
View file

@ -0,0 +1,170 @@
/** HDF5 Packet Table IO format based-on the H5PT high-level API.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <stdlib.h>
#include <hdf5_hl.h>
#include "plugin.h"
#include "io.h"
enum h5pt_flags {
FORMAT_H5PT_COMPRESS = (1 << 16)
};
struct h5pt {
hsize_t index;
hsize_t num_packets;
hid_t fid; /**< File identifier. */
hid_t ptable; /**< Packet table identifier. */
}
hid_t h5pt_create_datatype(int values)
{
hid_t dt = H5Tcreate(H5T_COMPOUND, sizeof());
}
int h5pt_open(struct io *io, const char *uri, const char *mode)
{
struct h5pt *h5 = io->_vd;
herr_t err;
/* Create a file using default properties */
h5->fid = H5Fcreate(uri, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
/* Create a fixed-length packet table within the file */
/* This table's "packets" will be simple integers. */
h5->ptable = H5PTcreate_fl(fid, "Packet Test Dataset", H5T_NATIVE_INT, 1, 1);
if (h5->ptable == H5I_INVALID_HID)
return -1;
err = H5PTis_valid(h5->ptable);
if (err < 0)
return err;
err = H5PTget_num_packets(h5->ptable, h5->num_packets);
if (err < 0)
return err;
io_format_h5pt_rewind(io);
return 0;
}
int h5pt_close(struct io *io)
{
struct h5pt *h5 = io->_vd;
herr_t err;
/* Close dataset */
err = H5PTclose(h5->ptable);
if (err < 0)
goto out;
/* Close file */
out: err = H5Fclose(h5->fid);
if (err < 0)
return err;
return 0;
}
int h5pt_rewind(struct io *io)
{
struct h5pt *h5 = io->_vd;
herr_t err;
err = H5PTcreate_index(h5->ptable);
if (err < 0)
return err;
h5->index = 0;
return 0;
}
int h5pt_eof(struct io *io)
{
struct h5pt *h5 = io->_vd;
return h5->index >= h5->num_packets;
}
int h5pt_flush(struct io *io)
{
struct h5pt *h5 = io->_vd;
herr_t err;
err = H5Fflush(h5->fid, H5F_SCOPE_GLOBAL);
if (err < 0)
return err;
return 0;
}
int h5pt_print(struct io *io, struct sample *smps[], size_t cnt)
{
struct h5pt *h5 = io->_vd;
herr_t err;
/* Write one packet to the packet table */
err = H5PTappend(h5->ptable, smps, cnt);
if (err < 0)
goto out;
return 0;
}
int h5pt_scan(struct io *io, struct sample *smps[], size_t cnt)
{
struct h5pt *h5 = io->_vd;
herr_t err;
H5PTget_next(h5->ptable, cnt, smps)
return 0;
}
static struct plugin p = {
.name = "hdf5",
.description = "HDF5 Packet Table",
.type = PLUGIN_TYPE_FORMAT,
.format = {
.open = h5pt_open,
.close = h5pt_close,
.rewind = h5pt_rewind,
.eof = h5pt_eof,
.flush = h5pt_flush,
.print = h5pt_print,
.scan = h5pt_scan,
.flags = IO_FORMAT_BINARY,
.size = sizeof(struct h5pt)
}
};

View file

@ -21,9 +21,9 @@
*********************************************************************************/
#include "plugin.h"
#include "formats/json.h"
#include "io/json.h"
int io_format_json_pack(json_t **j, struct sample *s, int flags)
int json_pack_sample(json_t **j, struct sample *s, int flags)
{
json_error_t err;
json_t *json_data = json_array();
@ -50,7 +50,7 @@ int io_format_json_pack(json_t **j, struct sample *s, int flags)
return 0;
}
int io_format_json_unpack(json_t *j, struct sample *s, int *flags)
int json_unpack_sample(json_t *j, struct sample *s, int *flags)
{
int ret, i;
json_t *json_data, *json_value;
@ -90,14 +90,14 @@ int io_format_json_unpack(json_t *j, struct sample *s, int *flags)
return 0;
}
size_t io_format_json_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags)
int json_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags)
{
int i, ret;
json_t *json;
size_t wr, off = 0;
for (i = 0; i < cnt; i++) {
ret = io_format_json_pack(&json, smps[i], flags);
ret = json_pack_sample(&json, smps[i], flags);
if (ret)
return ret;
@ -114,7 +114,7 @@ size_t io_format_json_sprint(char *buf, size_t len, struct sample *smps[], size_
return i;
}
size_t io_format_json_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags)
int json_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
{
int i, ret;
json_t *json;
@ -128,7 +128,7 @@ size_t io_format_json_sscan(char *buf, size_t len, struct sample *smps[], size_t
off += err.position;
ret = io_format_json_unpack(json, smps[i], flags);
ret = json_unpack_sample(json, smps[i], flags);
json_decref(json);
@ -139,13 +139,13 @@ size_t io_format_json_sscan(char *buf, size_t len, struct sample *smps[], size_t
return i;
}
int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
int json_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i;
json_t *json;
for (i = 0; i < cnt; i++) {
ret = io_format_json_pack(&json, smps[i], flags);
ret = json_pack_sample(&json, smps[i], flags);
if (ret)
return ret;
@ -158,7 +158,7 @@ int io_format_json_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
return i;
}
int io_format_json_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags)
int json_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags)
{
int i, ret;
json_t *json;
@ -169,7 +169,7 @@ skip: json = json_loadf(f, JSON_DISABLE_EOF_CHECK, &err);
if (!json)
break;
ret = io_format_json_unpack(json, smps[i], flags);
ret = json_unpack_sample(json, smps[i], flags);
if (ret)
goto skip;
@ -182,12 +182,12 @@ skip: json = json_loadf(f, JSON_DISABLE_EOF_CHECK, &err);
static struct plugin p = {
.name = "json",
.description = "Javascript Object Notation",
.type = PLUGIN_TYPE_FORMAT,
.type = PLUGIN_TYPE_IO,
.io = {
.fscan = io_format_json_fscan,
.fprint = io_format_json_fprint,
.sscan = io_format_json_sscan,
.sprint = io_format_json_sprint,
.fscan = json_fscan,
.fprint = json_fprint,
.sscan = json_sscan,
.sprint = json_sprint,
.size = 0
},
};

View file

@ -20,11 +20,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <arpa/inet.h>
#include <string.h>
#include "formats/msg.h"
#include "formats/msg_format.h"
#include "io/msg.h"
#include "io/msg_format.h"
#include "sample.h"
#include "utils.h"
#include "plugin.h"
@ -85,11 +84,18 @@ int msg_to_sample(struct msg *msg, struct sample *smp)
smp->length = MIN(msg->length, smp->capacity);
smp->sequence = msg->sequence;
smp->id = msg->id;
smp->ts.origin = MSG_TS(msg);
smp->ts.received.tv_sec = -1;
smp->ts.received.tv_nsec = -1;
smp->format = 0;
memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(smp->length));
for (int i = 0; i < smp->length; i++) {
switch (sample_get_data_format(smp, i)) {
case SAMPLE_DATA_FORMAT_FLOAT: smp->data[i].f = msg->data[i].f; break;
case SAMPLE_DATA_FORMAT_INT: smp->data[i].i = msg->data[i].i; break;
}
}
return 0;
}
@ -100,54 +106,71 @@ int msg_from_sample(struct msg *msg, struct sample *smp)
msg->ts.sec = smp->ts.origin.tv_sec;
msg->ts.nsec = smp->ts.origin.tv_nsec;
msg->id = smp->id;
memcpy(msg->data, smp->data, MSG_DATA_LEN(smp->length));
for (int i = 0; i < smp->length; i++) {
switch (sample_get_data_format(smp, i)) {
case SAMPLE_DATA_FORMAT_FLOAT: msg->data[i].f = smp->data[i].f; break;
case SAMPLE_DATA_FORMAT_INT: msg->data[i].i = smp->data[i].i; break;
}
}
msg_hton(msg);
return 0;
}
ssize_t msg_buffer_from_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len)
int msg_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i = 0;
char *ptr = buf;
struct msg *msg = (struct msg *) ptr;
struct sample *smp = smps[i];
for (i = 0; i < cnt; i++) {
if (ptr + MSG_LEN(smps[i]->length) > buf + len)
break;
while (ptr < buf + len && i < cnt) {
ret = msg_from_sample(msg, smp);
ret = msg_from_sample((struct msg *) ptr, smps[i]);
if (ret)
return ret;
ptr += MSG_LEN(smp->length);
msg = (struct msg *) ptr;
smp = smps[++i];
ptr += MSG_LEN(smps[i]->length);
}
if (wbytes)
*wbytes = ptr - buf;
return ptr - buf;
return i;
}
int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len)
int msg_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
{
int ret, i = 0;
char *ptr = buf;
struct msg *msg = (struct msg *) ptr;
struct sample *smp = smps[i];
while (ptr < buf + len && i < cnt) {
ret = msg_to_sample(msg, smp);
for (i = 0; i < cnt; i++) {
struct msg *msg = (struct msg *) ptr;
/* Check if length field is still in buffer bounaries */
if ((char *) &msg->length + sizeof(msg->length) > buf + len) {
warn("Invalid msg received: reason=1");
break;
}
/* Check if remainder of message is in buffer boundaries */
if (ptr + MSG_LEN(ntohs(msg->length)) > buf + len) {
warn("Invalid msg received: reason=2, msglen=%zu, len=%zu, ptr=%p, buf+%p, i=%u", MSG_LEN(ntohs(msg->length)), len, ptr, buf, i);
break;
}
ret = msg_to_sample((struct msg *) ptr, smps[i]);
if (ret)
return ret;
ptr += MSG_LEN(smp->length);
msg = (struct msg *) ptr;
smp = smps[++i];
ptr += MSG_LEN(smps[i]->length);
}
if (rbytes)
*rbytes = ptr - buf;
return i;
}
@ -155,9 +178,12 @@ int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t
static struct plugin p = {
.name = "msg",
.description = "VILLAS binary network format",
.type = PLUGIN_TYPE_FORMAT,
.type = PLUGIN_TYPE_IO,
.io = {
.size = 0
.sprint = msg_sprint,
.sscan = msg_sscan,
.size = 0,
.flags = IO_FORMAT_BINARY
},
};

241
lib/io/raw.c Normal file
View file

@ -0,0 +1,241 @@
/** RAW IO format
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include "sample.h"
#include "plugin.h"
#include "utils.h"
#include "compat.h"
#include "io/raw.h"
/** Convert float to host byte order */
#define SWAP_FLT_TOH(o, n) ({ \
union { float f; uint32_t i; } x = { .f = n }; \
x.i = (o) ? be32toh(x.i) : le32toh(x.i); x.f; \
})
/** Convert double to host byte order */
#define SWAP_DBL_TOH(o, n) ({ \
union { float f; uint64_t i; } x = { .f = n }; \
x.i = (o) ? be64toh(x.i) : le64toh(x.i); x.f; \
})
/** Convert float to big/little endian byte order */
#define SWAP_FLT_TOE(o, n) ({ \
union { float f; uint32_t i; } x = { .f = n }; \
x.i = (o) ? htobe32(x.i) : htole32(x.i); x.f; \
})
/** Convert double to big/little endian byte order */
#define SWAP_DBL_TOE(o, n) ({ \
union { double f; uint64_t i; } x = { .f = n }; \
x.i = (o) ? htobe64(x.i) : htole64(x.i); x.f; \
})
/** Convert integer of varying width to host byte order */
#define SWAP_INT_TOH(o, b, n) (o ? be ## b ## toh(n) : le ## b ## toh(n))
/** Convert integer of varying width to big/little endian byte order */
#define SWAP_INT_TOE(o, b, n) (o ? htobe ## b (n) : htole ## b (n))
int raw_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags)
{
int i, o = 0;
size_t nlen;
int8_t *i8 = (void *) buf;
int16_t *i16 = (void *) buf;
int32_t *i32 = (void *) buf;
int64_t *i64 = (void *) buf;
float *f32 = (void *) buf;
double *f64 = (void *) buf;
int bits = 1 << (flags >> 24);
for (i = 0; i < cnt; i++) {
nlen = (smps[i]->length + o + (flags & RAW_FAKE) ? 3 : 0) * (bits / 8);
if (nlen >= len)
break;
/* First three values are sequence, seconds and nano-seconds timestamps */
if (flags & RAW_FAKE) {
switch (bits) {
case 32:
i32[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 32, smps[i]->sequence);
i32[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 32, smps[i]->ts.origin.tv_sec);
i32[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 32, smps[i]->ts.origin.tv_nsec);
break;
case 64:
i64[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 64, smps[i]->sequence);
i64[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 64, smps[i]->ts.origin.tv_sec);
i64[o++] = SWAP_INT_TOE(flags & RAW_BE_HDR, 64, smps[i]->ts.origin.tv_nsec);
break;
}
}
for (int j = 0; j < smps[i]->length; j++) {
enum { INT, FLT } fmt;
if (flags & RAW_AUTO)
fmt = smps[i]->format & (1 << i) ? INT : FLT;
else if (flags & RAW_FLT)
fmt = FLT;
else
fmt = INT;
switch (fmt) {
case FLT:
switch (bits) {
case 32: f32[o++] = SWAP_FLT_TOE(flags & RAW_BE_FLT, smps[i]->data[j].f); break;
case 64: f64[o++] = SWAP_DBL_TOE(flags & RAW_BE_FLT, smps[i]->data[j].f); break;
}
break;
case INT:
switch (bits) {
case 8: i8 [o++] = smps[i]->data[j].i; break;
case 16: i16[o++] = SWAP_INT_TOE(flags & RAW_BE_INT, 16, smps[i]->data[j].i); break;
case 32: i32[o++] = SWAP_INT_TOE(flags & RAW_BE_INT, 32, smps[i]->data[j].i); break;
case 64: i64[o++] = SWAP_INT_TOE(flags & RAW_BE_INT, 64, smps[i]->data[j].i); break;
}
break;
}
}
}
if (wbytes)
*wbytes = o * (bits / 8);
return i;
}
int raw_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
{
/* The raw format can not encode multiple samples in one buffer
* as there is no support for framing. */
struct sample *smp = smps[0];
int8_t *i8 = (void *) buf;
int16_t *i16 = (void *) buf;
int32_t *i32 = (void *) buf;
int64_t *i64 = (void *) buf;
float *f32 = (void *) buf;
double *f64 = (void *) buf;
int off, bits = 1 << (*flags >> 24);
smp->length = len / (bits / 8);
if (*flags & RAW_FAKE) {
off = 3;
if (smp->length < off) {
// warn("Node %s received a packet with no fake header. Skipping...", node_name(n));
return 0;
}
smp->length -= off;
switch (bits) {
case 32:
smp->sequence = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[0]);
smp->ts.origin.tv_sec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[1]);
smp->ts.origin.tv_nsec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 32, i32[2]);
break;
case 64:
smp->sequence = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[0]);
smp->ts.origin.tv_sec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[1]);
smp->ts.origin.tv_nsec = SWAP_INT_TOH(*flags & RAW_BE_HDR, 64, i64[2]);
break;
}
}
else {
off = 0;
smp->sequence = 0;
smp->ts.origin.tv_sec = 0;
smp->ts.origin.tv_nsec = 0;
}
if (smp->length > smp->capacity) {
warn("Received more values than supported: length=%u, capacity=%u", smp->length, smp->capacity);
smp->length = smp->capacity;
}
for (int i = 0; i < smp->length; i++) {
int fmt = *flags & RAW_FLT ? SAMPLE_DATA_FORMAT_FLOAT
: SAMPLE_DATA_FORMAT_INT;
sample_set_data_format(smp, i, fmt);
switch (fmt) {
case SAMPLE_DATA_FORMAT_FLOAT:
switch (bits) {
case 32: smp->data[i].f = SWAP_FLT_TOH(*flags & RAW_BE_FLT, f32[i+off]); break;
case 64: smp->data[i].f = SWAP_DBL_TOH(*flags & RAW_BE_FLT, f64[i+off]); break;
}
break;
case SAMPLE_DATA_FORMAT_INT:
switch (bits) {
case 8: smp->data[i].i = i8[i]; break;
case 16: smp->data[i].i = (int16_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 16, i16[i+off]); break;
case 32: smp->data[i].i = (int32_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 32, i32[i+off]); break;
case 64: smp->data[i].i = (int64_t) SWAP_INT_TOH(*flags & RAW_BE_INT, 64, i64[i+off]); break;
}
break;
}
}
smp->ts.received.tv_sec = 0;
smp->ts.received.tv_nsec = 0;
return 1;
}
#define REGISTER_FORMAT_RAW(i, n, f) \
static struct plugin i = { \
.name = n, \
.description = "RAW binary data", \
.type = PLUGIN_TYPE_IO, \
.io = { \
.flags = f | IO_FORMAT_BINARY, \
.sprint = raw_sprint, \
.sscan = raw_sscan \
} \
}; \
REGISTER_PLUGIN(& i);
/* Feel free to add additional format identifiers here to suit your needs */
REGISTER_FORMAT_RAW(p, "raw", 0)
REGISTER_FORMAT_RAW(p_f32, "raw-flt32", RAW_32 | RAW_FLT)
REGISTER_FORMAT_RAW(p_f64, "raw-flt64", RAW_64 | RAW_FLT)
REGISTER_FORMAT_RAW(p_i8, "raw-int8", RAW_8)
REGISTER_FORMAT_RAW(p_i16be, "raw-int16-be", RAW_16 | RAW_BE)
REGISTER_FORMAT_RAW(p_i16le, "raw-int16-le", RAW_16)
REGISTER_FORMAT_RAW(p_i32be, "raw-int32-be", RAW_32 | RAW_BE)
REGISTER_FORMAT_RAW(p_i32le, "raw-int32-le", RAW_32)
REGISTER_FORMAT_RAW(p_i64be, "raw-int64-be", RAW_64 | RAW_BE)
REGISTER_FORMAT_RAW(p_i64le, "raw-int64-le", RAW_64)
REGISTER_FORMAT_RAW(p_gtnet, "gtnet", RAW_32 | RAW_FLT | RAW_BE)
REGISTER_FORMAT_RAW(p_gtnef, "gtnet-fake", RAW_32 | RAW_FLT | RAW_BE | RAW_FAKE)

View file

@ -21,6 +21,7 @@
*********************************************************************************/
#include <ctype.h>
#include <inttypes.h>
#include <string.h>
#include "io.h"
@ -28,9 +29,9 @@
#include "utils.h"
#include "timing.h"
#include "sample.h"
#include "formats/villas.h"
#include "io/villas.h"
size_t io_format_villas_sprint_single(char *buf, size_t len, struct sample *s, int flags)
size_t villas_sprint_single(char *buf, size_t len, struct sample *s, int flags)
{
size_t off = snprintf(buf, len, "%llu", (unsigned long long) s->ts.origin.tv_sec);
@ -47,10 +48,10 @@ size_t io_format_villas_sprint_single(char *buf, size_t len, struct sample *s, i
for (int i = 0; i < s->length; i++) {
switch ((s->format >> i) & 0x1) {
case SAMPLE_DATA_FORMAT_FLOAT:
off += snprintf(buf + off, len - off, "\t%.6f", s->data[i].f);
off += snprintf(buf + off, len - off, "\t%.6lf", s->data[i].f);
break;
case SAMPLE_DATA_FORMAT_INT:
off += snprintf(buf + off, len - off, "\t%d", s->data[i].i);
off += snprintf(buf + off, len - off, "\t%" PRIi64, s->data[i].i);
break;
}
}
@ -61,7 +62,7 @@ size_t io_format_villas_sprint_single(char *buf, size_t len, struct sample *s, i
return off;
}
size_t io_format_villas_sscan_single(const char *buf, size_t len, struct sample *s, int *flags)
size_t villas_sscan_single(const char *buf, size_t len, struct sample *s, int *flags)
{
char *end;
const char *ptr = buf;
@ -151,27 +152,35 @@ size_t io_format_villas_sscan_single(const char *buf, size_t len, struct sample
return end - buf;
}
size_t io_format_villas_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags)
int villas_sprint(char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags)
{
int i;
size_t off = 0;
for (int i = 0; i < cnt && off < len; i++)
off += io_format_villas_sprint_single(buf + off, len - off, smps[i], flags);
for (i = 0; i < cnt && off < len; i++)
off += villas_sprint_single(buf + off, len - off, smps[i], flags);
if (wbytes)
*wbytes = off;
return off;
return i;
}
size_t io_format_villas_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags)
int villas_sscan(char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
{
int i;
size_t off = 0;
for (int i = 0; i < cnt && off < len; i++)
off += io_format_villas_sscan_single(buf + off, len - off, smps[i], flags);
for (i = 0; i < cnt && off < len; i++)
off += villas_sscan_single(buf + off, len - off, smps[i], flags);
if (rbytes)
*rbytes = off;
return off;
return i;
}
int io_format_villas_fscan_single(FILE *f, struct sample *s, int *flags)
int villas_fscan_single(FILE *f, struct sample *s, int *flags)
{
char *ptr, line[4096];
@ -183,31 +192,29 @@ skip: if (fgets(line, sizeof(line), f) == NULL)
if (*ptr == '\0' || *ptr == '#')
goto skip;
return io_format_villas_sscan_single(line, strlen(line), s, flags);
return villas_sscan_single(line, strlen(line), s, flags);
}
int io_format_villas_fprint(FILE *f, struct sample *smps[], size_t cnt, int flags)
int villas_fprint_single(FILE *f, struct sample *s, int flags)
{
int ret;
char line[4096];
int ret, i;
for (i = 0; i < cnt; i++) {
ret = io_format_villas_sprint_single(line, sizeof(line), smps[i], flags);
if (ret < 0)
break;
ret = villas_sprint_single(line, sizeof(line), s, flags);
if (ret < 0)
return ret;
fputs(line, f);
fputs(line, f);
}
return i;
return 0;
}
int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flags)
int villas_fprint(FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = io_format_villas_fscan_single(f, smps[i], flags);
ret = villas_fprint_single(f, smps[i], flags);
if (ret < 0)
return ret;
}
@ -215,11 +222,24 @@ int io_format_villas_fscan(FILE *f, struct sample *smps[], size_t cnt, int *flag
return i;
}
int io_format_villas_open(struct io *io, const char *uri, const char *mode)
int villas_fscan(FILE *f, struct sample *smps[], unsigned cnt, int *flags)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = villas_fscan_single(f, smps[i], flags);
if (ret < 0)
return ret;
}
return i;
}
int villas_open(struct io *io, const char *uri)
{
int ret;
ret = io_stream_open(io, uri, mode);
ret = io_stream_open(io, uri);
if (ret)
return ret;
@ -229,7 +249,7 @@ int io_format_villas_open(struct io *io, const char *uri, const char *mode)
fprintf(f, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]");
if (io->flags & IO_FLAG_FLUSH)
if (io->flags & IO_FLUSH)
io_flush(io);
return 0;
@ -238,14 +258,14 @@ int io_format_villas_open(struct io *io, const char *uri, const char *mode)
static struct plugin p = {
.name = "villas",
.description = "VILLAS human readable format",
.type = PLUGIN_TYPE_FORMAT,
.type = PLUGIN_TYPE_IO,
.io = {
.open = io_format_villas_open,
.fprint = io_format_villas_fprint,
.fscan = io_format_villas_fscan,
.sprint = io_format_villas_sprint,
.sscan = io_format_villas_sscan,
.size = 0
.open = villas_open,
.fprint = villas_fprint,
.fscan = villas_fscan,
.sprint = villas_sprint,
.sscan = villas_sscan,
.size = 0
}
};

View file

@ -26,6 +26,7 @@
#include <endian.h>
#endif
#include "sample.h"
#include "plugin.h"
#include "formats/webmsg.h"
#include "formats/webmsg_format.h"
@ -74,11 +75,106 @@ int webmsg_verify(struct webmsg *m)
return 0;
}
int webmsg_to_sample(struct webmsg *msg, struct sample *smp)
{
int ret;
webmsg_ntoh(msg);
ret = webmsg_verify(msg);
if (ret)
return -1;
smp->length = MIN(msg->length, smp->capacity);
smp->sequence = msg->sequence;
smp->id = msg->id;
smp->ts.origin = WEBMSG_TS(msg);
smp->ts.received.tv_sec = -1;
smp->ts.received.tv_nsec = -1;
memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(smp->length));
return 0;
}
int webmsg_from_sample(struct webmsg *msg, struct sample *smp)
{
*msg = WEBMSG_INIT(smp->length, smp->sequence);
msg->id = smp->id;
msg->ts.sec = smp->ts.origin.tv_sec;
msg->ts.nsec = smp->ts.origin.tv_nsec;
memcpy(msg->data, smp->data, WEBMSG_DATA_LEN(smp->length));
msg_hton(msg);
return 0;
}
size_t io_format_webmsg_length(struct sample *smps[], size_t cnt)
{
size_t sz = 0;
for (int i = 0; i < cnt; i++)
sz += WEBMSG_LEN(smps[i]->length);
return sz;
}
size_t io_format_webmsg_sprint(char *buf, size_t len, struct sample *smps[], size_t cnt, int flags)
{
int ret, i = 0;
char *ptr = buf;
struct webmsg *msg = (struct webmsg *) ptr;
struct sample *smp = smps[i];
while (ptr < buf + len && i < cnt) {
ret = webmsg_from_sample(msg, smp);
if (ret)
return ret;
ptr += WEBMSG_LEN(smp->length);
msg = (struct webmsg *) ptr;
smp = smps[++i];
}
return ptr - buf;
}
size_t io_format_webmsg_sscan(char *buf, size_t len, struct sample *smps[], size_t cnt, int *flags)
{
int ret, i = 0;
char *ptr = buf;
struct webmsg *msg = (struct webmsg *) ptr;
struct sample *smp = smps[i];
while (ptr < buf + len && i < cnt) {
ret = webmsg_to_sample(msg, smp);
if (ret)
return ret;
ptr += WEBMSG_LEN(smp->length);
msg = (struct webmsg *) ptr;
smp = smps[++i];
}
return i;
}
static struct plugin p = {
.name = "webmsg",
.description = "VILLAS binary format for websockets",
.type = PLUGIN_TYPE_FORMAT,
.io = {
.format = {
.length = io_format_webmsg_length,
.sprint = io_format_webmsg_sprint,
.sscan = io_format_webmsg_sscan,
.flags = IO_FORMAT_BINARY,
.size = 0
},
};

46
lib/io_format.c Normal file
View file

@ -0,0 +1,46 @@
/** Read / write sample data in different formats.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <stdlib.h>
#include <stdio.h>
#include "io_format.h"
int io_format_sscan(struct io_format *fmt, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt, int *flags)
{
return fmt->sscan ? fmt->sscan(buf, len, rbytes, smps, cnt, flags) : -1;
}
int io_format_sprint(struct io_format *fmt, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt, int flags)
{
return fmt->sprint ? fmt->sprint(buf, len, wbytes, smps, cnt, flags) : -1;
}
int io_format_fscan(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int *flags)
{
return fmt->sprint ? fmt->fscan(f, smps, cnt, flags) : -1;
}
int io_format_fprint(struct io_format *fmt, FILE *f, struct sample *smps[], unsigned cnt, int flags)
{
return fmt->fprint ? fmt->fprint(f, smps, cnt, flags) : -1;
}

View file

@ -24,9 +24,9 @@ LIB_SRCS += $(addprefix lib/nodes/, cbuilder.c loopback.c)
ifeq ($(PLATFORM),Linux)
WITH_FPGA ?= 1
WITH_TEST_RTT ?= 1
endif
WITH_TEST_RTT ?= 0
WITH_FILE ?= 1
WITH_SIGNAL ?= 1
WITH_NGSI ?= 1
@ -91,7 +91,6 @@ endif
# Enable Socket node type when libnl3 is available
ifeq ($(WITH_SOCKET),1)
LIB_SRCS += lib/nodes/socket.c
LIB_SRCS += lib/formats/msg.c
LIB_CFLAGS += -DWITH_SOCKET
# libnl3 is optional but required for network emulation and IRQ pinning
@ -136,7 +135,6 @@ endif
ifeq ($(WITH_WEBSOCKET),1)
ifeq ($(shell $(PKGCONFIG) libwebsockets jansson; echo $$?),0)
LIB_SRCS += lib/nodes/websocket.c
LIB_SRCS += lib/formats/webmsg.c
LIB_PKGS += libwebsockets jansson
LIB_CFLAGS += -DWITH_WEBSOCKET
endif

View file

@ -94,8 +94,15 @@ int cbuilder_read(struct node *n, struct sample *smps[], unsigned cnt)
pthread_mutex_lock(&cb->mtx);
while (cb->read >= cb->step)
pthread_cond_wait(&cb->cv, &cb->mtx);
float data[smp->capacity];
smp->length = cb->model->read(&smp->data[0].f, 16);
smp->length = cb->model->read(data, smp->capacity);
/* Cast float -> double */
for (int i = 0; i < smp->length; i++)
smp->data[i].f = data[i];
smp->sequence = cb->step;
cb->read = cb->step;
@ -111,8 +118,10 @@ int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt)
struct sample *smp = smps[0];
pthread_mutex_lock(&cb->mtx);
float flt = smp->data[0].f;
cb->model->write(&smp->data[0].f, smp->length);
cb->model->write(&flt, smp->length);
cb->model->code();
cb->step++;

View file

@ -29,19 +29,7 @@
#include "timing.h"
#include "queue.h"
#include "plugin.h"
#include "formats/villas.h"
int file_reverse(struct node *n)
{
struct file *f = n->_vd;
struct file_direction tmp;
tmp = f->read;
f->read = f->write;
f->write = tmp;
return 0;
}
#include "io.h"
static char * file_format_name(const char *format, struct timespec *ts)
{
@ -56,48 +44,13 @@ static char * file_format_name(const char *format, struct timespec *ts)
return buf;
}
static AFILE * file_reopen(struct file_direction *dir)
{
if (dir->handle)
afclose(dir->handle);
return afopen(dir->uri, dir->mode);
}
static int file_parse_direction(json_t *cfg, struct file *f, int d)
{
struct file_direction *dir = (d == FILE_READ) ? &f->read : &f->write;
int ret;
json_error_t err;
const char *format = NULL;
const char *mode = NULL;
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s }",
"uri", &format,
"mode", &mode
);
if (ret)
jerror(&err, "Failed to ");
if (format)
dir->format = strdup(format);
if (mode)
dir->mode = strdup(mode);
else
dir->mode = strdup(d == FILE_READ ? "r" : "w+");
return 0;
}
static struct timespec file_calc_read_offset(const struct timespec *first, const struct timespec *epoch, enum read_epoch_mode mode)
static struct timespec file_calc_offset(const struct timespec *first, const struct timespec *epoch, enum epoch_mode mode)
{
/* Get current time */
struct timespec now = time_now();
struct timespec offset;
/* Set read_offset depending on epoch_mode */
/* Set offset depending on epoch_mode */
switch (mode) {
case FILE_EPOCH_DIRECT: /* read first value at now + epoch */
offset = time_diff(first, &now);
@ -110,7 +63,7 @@ static struct timespec file_calc_read_offset(const struct timespec *first, const
case FILE_EPOCH_RELATIVE: /* read first value at first + epoch */
return *epoch;
case FILE_EPOCH_ABSOLUTE: /* read first value at f->read_epoch */
case FILE_EPOCH_ABSOLUTE: /* read first value at f->epoch */
return time_diff(first, epoch);
default:
@ -122,81 +75,64 @@ int file_parse(struct node *n, json_t *cfg)
{
struct file *f = n->_vd;
json_t *cfg_in = NULL;
json_t *cfg_out = NULL;
int ret;
json_error_t err;
const char *uri_tmpl = NULL;
const char *format = "villas";
const char *eof = NULL;
const char *epoch_mode = NULL;
double epoch_flt = 0;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o }",
"in", &cfg_in,
"out", &cfg_out
/* Default values */
f->rate = 0;
f->eof = FILE_EOF_EXIT;
f->epoch_mode = FILE_EPOCH_DIRECT;
f->flush = 0;
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: b, s?: s, s?: f, s?: s, s?: f, s?: s }",
"uri", &uri_tmpl,
"flush", &f->flush,
"eof", &eof,
"rate", &f->rate,
"epoch_mode", &epoch_mode,
"epoch", &epoch_flt,
"format", &format
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
if (cfg_out) {
if (file_parse_direction(cfg_out, f, FILE_WRITE))
error("Failed to parse output file for node %s", node_name(n));
f->epoch = time_from_double(epoch_flt);
f->uri_tmpl = uri_tmpl ? strdup(uri_tmpl) : NULL;
f->format = io_format_lookup(format);
if (!f->format)
error("Invalid format '%s' for node %s", format, node_name(n));
f->flush = 0;
ret = json_unpack_ex(cfg_out, &err, 0, "{ s?: b }", "flush", &f->flush);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
if (eof) {
if (!strcmp(eof, "exit"))
f->eof = FILE_EOF_EXIT;
else if (!strcmp(eof, "rewind"))
f->eof = FILE_EOF_REWIND;
else if (!strcmp(eof, "wait"))
f->eof = FILE_EOF_WAIT;
else
error("Invalid mode '%s' for 'eof' setting of node %s", eof, node_name(n));
}
if (cfg_in) {
const char *eof = NULL;
const char *epoch_mode = NULL;
double epoch_flt = 0;
/* Default values */
f->read_rate = 0;
f->read_eof = FILE_EOF_EXIT;
f->read_epoch_mode = FILE_EPOCH_DIRECT;
if (file_parse_direction(cfg_in, f, FILE_READ))
error("Failed to parse input file for node %s", node_name(n));
ret = json_unpack_ex(cfg_in, &err, 0, "{ s?: s, s?: f, s?: s, s?: f }",
"eof", &eof,
"rate", &f->read_rate,
"epoch_mode", &epoch_mode,
"epoch", &epoch_flt
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
f->read_epoch = time_from_double(epoch_flt);
/* More read specific settings */
if (eof) {
if (!strcmp(eof, "exit"))
f->read_eof = FILE_EOF_EXIT;
else if (!strcmp(eof, "rewind"))
f->read_eof = FILE_EOF_REWIND;
else if (!strcmp(eof, "wait"))
f->read_eof = FILE_EOF_WAIT;
else
error("Invalid mode '%s' for 'eof' setting of node %s", eof, node_name(n));
}
if (epoch_mode) {
if (!strcmp(epoch_mode, "direct"))
f->read_epoch_mode = FILE_EPOCH_DIRECT;
else if (!strcmp(epoch_mode, "wait"))
f->read_epoch_mode = FILE_EPOCH_WAIT;
else if (!strcmp(epoch_mode, "relative"))
f->read_epoch_mode = FILE_EPOCH_RELATIVE;
else if (!strcmp(epoch_mode, "absolute"))
f->read_epoch_mode = FILE_EPOCH_ABSOLUTE;
else if (!strcmp(epoch_mode, "original"))
f->read_epoch_mode = FILE_EPOCH_ORIGINAL;
else
error("Invalid value '%s' for setting 'epoch_mode' of node %s", epoch_mode, node_name(n));
}
if (epoch_mode) {
if (!strcmp(epoch_mode, "direct"))
f->epoch_mode = FILE_EPOCH_DIRECT;
else if (!strcmp(epoch_mode, "wait"))
f->epoch_mode = FILE_EPOCH_WAIT;
else if (!strcmp(epoch_mode, "relative"))
f->epoch_mode = FILE_EPOCH_RELATIVE;
else if (!strcmp(epoch_mode, "absolute"))
f->epoch_mode = FILE_EPOCH_ABSOLUTE;
else if (!strcmp(epoch_mode, "original"))
f->epoch_mode = FILE_EPOCH_ORIGINAL;
else
error("Invalid value '%s' for setting 'epoch_mode' of node %s", epoch_mode, node_name(n));
}
n->_vd = f;
@ -209,53 +145,46 @@ char * file_print(struct node *n)
struct file *f = n->_vd;
char *buf = NULL;
if (f->read.format) {
const char *epoch_str = NULL;
switch (f->read_epoch_mode) {
case FILE_EPOCH_DIRECT: epoch_str = "direct"; break;
case FILE_EPOCH_WAIT: epoch_str = "wait"; break;
case FILE_EPOCH_RELATIVE: epoch_str = "relative"; break;
case FILE_EPOCH_ABSOLUTE: epoch_str = "absolute"; break;
case FILE_EPOCH_ORIGINAL: epoch_str = "original"; break;
}
const char *epoch_str = NULL;
const char *eof_str = NULL;
const char *eof_str = NULL;
switch (f->read_eof) {
case FILE_EOF_EXIT: eof_str = "exit"; break;
case FILE_EOF_WAIT: eof_str = "wait"; break;
case FILE_EOF_REWIND: eof_str = "rewind"; break;
}
strcatf(&buf, "in=%s, mode=%s, eof=%s, epoch_mode=%s, epoch=%.2f",
f->read.uri ? f->read.uri : f->read.format,
f->read.mode,
eof_str,
epoch_str,
time_to_double(&f->read_epoch)
);
if (f->read_rate)
strcatf(&buf, ", rate=%.1f", f->read_rate);
switch (f->epoch_mode) {
case FILE_EPOCH_DIRECT: epoch_str = "direct"; break;
case FILE_EPOCH_WAIT: epoch_str = "wait"; break;
case FILE_EPOCH_RELATIVE: epoch_str = "relative"; break;
case FILE_EPOCH_ABSOLUTE: epoch_str = "absolute"; break;
case FILE_EPOCH_ORIGINAL: epoch_str = "original"; break;
}
if (f->write.format) {
strcatf(&buf, ", out=%s, mode=%s",
f->write.uri ? f->write.uri : f->write.format,
f->write.mode
);
switch (f->eof) {
case FILE_EOF_EXIT: eof_str = "exit"; break;
case FILE_EOF_WAIT: eof_str = "wait"; break;
case FILE_EOF_REWIND: eof_str = "rewind"; break;
}
if (f->read_first.tv_sec || f->read_first.tv_nsec)
strcatf(&buf, ", first=%.2f", time_to_double(&f->read_first));
strcatf(&buf, "uri=%s, format=%s, flush=%s, eof=%s, epoch_mode=%s, epoch=%.2f",
f->uri ? f->uri : f->uri_tmpl,
plugin_name(f->format),
f->flush ? "yes" : "no",
eof_str,
epoch_str,
time_to_double(&f->epoch)
);
if (f->read_offset.tv_sec || f->read_offset.tv_nsec)
strcatf(&buf, ", offset=%.2f", time_to_double(&f->read_offset));
if (f->rate)
strcatf(&buf, ", rate=%.1f", f->rate);
if ((f->read_first.tv_sec || f->read_first.tv_nsec) &&
(f->read_offset.tv_sec || f->read_offset.tv_nsec)) {
if (f->first.tv_sec || f->first.tv_nsec)
strcatf(&buf, ", first=%.2f", time_to_double(&f->first));
if (f->offset.tv_sec || f->offset.tv_nsec)
strcatf(&buf, ", offset=%.2f", time_to_double(&f->offset));
if ((f->first.tv_sec || f->first.tv_nsec) &&
(f->offset.tv_sec || f->offset.tv_nsec)) {
struct timespec eta, now = time_now();
eta = time_add(&f->read_first, &f->read_offset);
eta = time_add(&f->first, &f->offset);
eta = time_diff(&now, &eta);
if (eta.tv_sec || eta.tv_nsec)
@ -270,51 +199,46 @@ int file_start(struct node *n)
struct file *f = n->_vd;
struct timespec now = time_now();
int ret;
int ret, flags;
if (f->read.format) {
/* Prepare file name */
f->read.uri = file_format_name(f->read.format, &now);
/* Prepare file name */
f->uri = file_format_name(f->uri_tmpl, &now);
/* Open file */
f->read.handle = file_reopen(&f->read);
if (!f->read.handle)
serror("Failed to open file for reading: '%s'", f->read.uri);
/* Open file */
flags = IO_FORMAT_ALL;
if (f->flush)
flags |= IO_FLUSH;
/* Create timer */
f->read_timer = f->read_rate
? timerfd_create_rate(f->read_rate)
: timerfd_create(CLOCK_REALTIME, 0);
if (f->read_timer < 0)
serror("Failed to create timer");
ret = io_init(&f->io, f->format, flags);
if (ret)
return ret;
arewind(f->read.handle);
ret = io_open(&f->io, f->uri);
if (ret)
return ret;
/* Get timestamp of first line */
if (f->read_epoch_mode != FILE_EPOCH_ORIGINAL) {
struct sample s;
struct sample *smps[] = { &s };
s.capacity = 0;
/* Create timer */
ret = periodic_task_init(&f->timer, f->rate);
if (ret)
serror("Failed to create timer");
ret = io_format_villas_fscan(f->read.handle->file, smps, 1, NULL);
if (ret < 0)
error("Failed to read first timestamp of node %s", node_name(n));
/* Get timestamp of first line */
if (f->epoch_mode != FILE_EPOCH_ORIGINAL) {
io_rewind(&f->io);
f->read_first = s.ts.origin;
f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode);
arewind(f->read.handle);
}
struct sample s;
struct sample *smps[] = { &s };
s.capacity = 0;
ret = io_scan(&f->io, smps, 1);
if (ret != 1)
error("Failed to read first timestamp of node %s", node_name(n));
f->first = s.ts.origin;
f->offset = file_calc_offset(&f->first, &f->epoch, f->epoch_mode);
}
if (f->write.format) {
/* Prepare file name */
f->write.uri = file_format_name(f->write.format, &now);
/* Open file */
f->write.handle = file_reopen(&f->write);
if (!f->write.handle)
serror("Failed to open file for writing: '%s'", f->write.uri);
}
io_rewind(&f->io);
return 0;
}
@ -322,16 +246,19 @@ int file_start(struct node *n)
int file_stop(struct node *n)
{
struct file *f = n->_vd;
int ret;
if (f->read_timer)
close(f->read_timer);
if (f->read.handle)
afclose(f->read.handle);
if (f->write.handle)
afclose(f->write.handle);
periodic_task_destroy(&f->timer);
free(f->read.uri);
free(f->write.uri);
ret = io_close(&f->io);
if (ret)
return ret;
ret = io_destroy(&f->io);
if (ret)
return ret;
free(f->uri);
return 0;
}
@ -339,76 +266,78 @@ int file_stop(struct node *n)
int file_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct file *f = n->_vd;
int values, flags;
uint64_t ex;
int ret;
uint64_t steps;
assert(f->read.handle);
assert(cnt == 1);
retry: values = io_format_villas_fscan(f->read.handle->file, smps, 1, &flags); /* Get message and timestamp */
if (values <= 0) {
if (afeof(f->read.handle)) {
switch (f->read_eof) {
retry: ret = io_scan(&f->io, smps, cnt);
if (ret <= 0) {
if (io_eof(&f->io)) {
switch (f->eof) {
case FILE_EOF_REWIND:
info("Rewind input file of node %s", node_name(n));
f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode);
arewind(f->read.handle);
f->offset = file_calc_offset(&f->first, &f->epoch, f->epoch_mode);
io_rewind(&f->io);
goto retry;
case FILE_EOF_WAIT:
usleep(10000); /* We wait 10ms before fetching again. */
adownload(f->read.handle, 1);
/* We wait 10ms before fetching again. */
usleep(100000);
/* Try to download more data if this is a remote file. */
if (f->io.mode == IO_MODE_ADVIO)
adownload(f->io.advio.input, 1);
goto retry;
case FILE_EOF_EXIT:
info("Reached end-of-file of node %s", node_name(n));
killme(SIGTERM);
pause();
}
}
else
warn("Failed to read messages from node %s: reason=%d", node_name(n), values);
warn("Failed to read messages from node %s: reason=%d", node_name(n), ret);
return 0;
}
if (f->read_epoch_mode != FILE_EPOCH_ORIGINAL) {
if (!f->read_rate || aftell(f->read.handle) == 0) {
smps[0]->ts.origin = time_add(&smps[0]->ts.origin, &f->read_offset);
/* We dont wait in FILE_EPOCH_ORIGINAL mode */
if (f->epoch_mode == FILE_EPOCH_ORIGINAL)
return cnt;
ex = timerfd_wait_until(f->read_timer, &smps[0]->ts.origin);
}
else { /* Wait with fixed rate delay */
ex = timerfd_wait(f->read_timer);
if (f->rate) {
steps = periodic_task_wait_until_next_period(&f->timer);
smps[0]->ts.origin = time_now();
}
smps[0]->ts.origin = time_now();
}
else {
smps[0]->ts.origin = time_add(&smps[0]->ts.origin, &f->offset);
/* Check for overruns */
if (ex == 0)
serror("Failed to wait for timer");
else if (ex != 1)
warn("Overrun: %" PRIu64, ex - 1);
steps = periodic_task_wait_until(&f->timer, &smps[0]->ts.origin);
}
return 1;
/* Check for overruns */
if (steps == 0)
serror("Failed to wait for timer");
else if (steps != 1)
warn("Missed steps: %" PRIu64, steps - 1);
return cnt;
}
int file_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct file *f = n->_vd;
assert(f->write.handle);
assert(cnt == 1);
io_format_villas_fprint(f->write.handle->file, smps, cnt, IO_FORMAT_ALL & ~IO_FORMAT_OFFSET);
io_print(&f->io, smps, cnt);
if (f->flush)
afflush(f->write.handle);
return 1;
return cnt;
}
static struct plugin p = {
@ -418,7 +347,6 @@ static struct plugin p = {
.node = {
.vectorize = 1,
.size = sizeof(struct file),
.reverse = file_reverse,
.parse = file_parse,
.print = file_print,
.start = file_start,

View file

@ -27,7 +27,7 @@
#include "plugin.h"
#include "nodes/nanomsg.h"
#include "utils.h"
#include "formats/msg.h"
#include "io_format.h"
int nanomsg_reverse(struct node *n)
{
@ -82,6 +82,8 @@ int nanomsg_parse(struct node *n, json_t *cfg)
int ret;
struct nanomsg *m = n->_vd;
const char *format = "villas";
json_error_t err;
json_t *cfg_pub = NULL;
@ -90,9 +92,10 @@ int nanomsg_parse(struct node *n, json_t *cfg)
list_init(&m->publisher.endpoints);
list_init(&m->subscriber.endpoints);
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: s }",
"publish", &cfg_pub,
"subscribe", &cfg_sub
"subscribe", &cfg_sub,
"format", &format
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
@ -108,6 +111,10 @@ int nanomsg_parse(struct node *n, json_t *cfg)
if (ret < 0)
error("Invalid type for 'subscribe' setting of node %s", node_name(n));
}
m->format = io_format_lookup(format);
if (!m->format)
error("Invalid format '%s' for node %s", format, node_name(n));
return 0;
}
@ -118,7 +125,7 @@ char * nanomsg_print(struct node *n)
char *buf = NULL;
strcatf(&buf, "subscribe=[ ");
strcatf(&buf, "format=%s, subscribe=[ ", plugin_name(m->format));
for (size_t i = 0; i < list_length(&m->subscriber.endpoints); i++) {
char *ep = list_at(&m->subscriber.endpoints, i);
@ -207,17 +214,16 @@ int nanomsg_deinit()
int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt)
{
int ret;
struct nanomsg *m = n->_vd;
char data[MSG_MAX_PACKET_LEN];
int bytes;
char data[NANOMSG_MAX_PACKET_LEN];
/* Receive payload */
ret = nn_recv(m->subscriber.socket, data, sizeof(data), 0);
if (ret < 0)
return ret;
bytes = nn_recv(m->subscriber.socket, data, sizeof(data), 0);
if (bytes < 0)
return -1;
return msg_buffer_to_samples(smps, cnt, data, ret);
return io_format_sscan(m->format, data, bytes, NULL, smps, cnt, NULL);
}
int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
@ -225,15 +231,15 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
int ret;
struct nanomsg *m = n->_vd;
ssize_t sent;
size_t wbytes;
char data[MSG_MAX_PACKET_LEN];
char data[NANOMSG_MAX_PACKET_LEN];
sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data));
if (sent < 0)
ret = io_format_sprint(m->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL);
if (ret <= 0)
return -1;
ret = nn_send(m->publisher.socket, data, sent, 0);
ret = nn_send(m->publisher.socket, data, wbytes, 0);
if (ret < 0)
return ret;

View file

@ -34,16 +34,13 @@
#include "config.h"
#include "utils.h"
#ifdef WITH_LIBNL_ROUTE_30
#ifdef WITH_NETEM
#include "kernel/if.h"
#include "kernel/nl.h"
#include "kernel/tc.h"
#endif /* WITH_NETEM */
#define WITH_NETEM
#endif /* WITH_LIBNL_ROUTE_30 */
#include "formats/msg.h"
#include "formats/msg_format.h"
#include "io_format.h"
#include "sample.h"
#include "queue.h"
#include "plugin.h"
@ -127,7 +124,7 @@ int socket_deinit()
char * socket_print(struct node *n)
{
struct socket *s = n->_vd;
char *layer = NULL, *header = NULL, *endian = NULL, *buf;
char *layer = NULL, *buf;
switch (s->layer) {
case SOCKET_LAYER_UDP: layer = "udp"; break;
@ -135,25 +132,10 @@ char * socket_print(struct node *n)
case SOCKET_LAYER_ETH: layer = "eth"; break;
}
switch (s->header) {
case SOCKET_HEADER_NONE: header = "none"; break;
case SOCKET_HEADER_FAKE: header = "fake"; break;
case SOCKET_HEADER_DEFAULT: header = "default"; break;
}
if (s->header == SOCKET_HEADER_DEFAULT)
endian = "auto";
else {
switch (s->endian) {
case SOCKET_ENDIAN_LITTLE: endian = "little"; break;
case SOCKET_ENDIAN_BIG: endian = "big"; break;
}
}
char *local = socket_print_addr((struct sockaddr *) &s->local);
char *remote = socket_print_addr((struct sockaddr *) &s->remote);
buf = strf("layer=%s, header=%s, endian=%s, local=%s, remote=%s", layer, header, endian, local, remote);
buf = strf("layer=%s, format=%s, local=%s, remote=%s", layer, plugin_name(s->format), local, remote);
if (s->multicast.enabled) {
char group[INET_ADDRSTRLEN];
@ -320,110 +302,15 @@ int socket_destroy(struct node *n)
return 0;
}
static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt)
{
int length;
struct socket *s = n->_vd;
char buf[MSG_MAX_PACKET_LEN];
uint32_t *values = (uint32_t *) buf;
ssize_t bytes;
struct sample *smp = smps[0];
if (cnt < 1)
return 0;
union sockaddr_union src;
socklen_t srclen = sizeof(src);
/* Receive next sample */
bytes = recvfrom(s->sd, buf, sizeof(buf), 0, &src.sa, &srclen);
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */
else if (bytes < 0)
serror("Failed recv from node %s", node_name(n));
else if (bytes % 4 != 0) {
warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes);
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1;
}
length = bytes / 4;
/* Strip IP header from packet */
if (s->layer == SOCKET_LAYER_IP) {
struct ip *iphdr = (struct ip *) buf;
length -= iphdr->ip_hl;
values += iphdr->ip_hl;
}
/* SOCK_RAW IP sockets to not provide the IP protocol number via recvmsg()
* So we simply set it ourself. */
if (s->layer == SOCKET_LAYER_IP) {
switch (src.sa.sa_family) {
case AF_INET: src.sin.sin_port = s->remote.sin.sin_port; break;
case AF_INET6: src.sin6.sin6_port = s->remote.sin6.sin6_port; break;
}
}
if (s->verify_source && socket_compare_addr(&src.sa, &s->remote.sa) != 0) {
char *buf = socket_print_addr((struct sockaddr *) &src);
warn("Received packet from unauthorized source: %s", buf);
free(buf);
return 0;
}
/* Convert packet contents to host endianess */
for (int i = 0; i < length; i++)
values[i] = s->endian == SOCKET_ENDIAN_BIG
? be32toh(values[i])
: le32toh(values[i]);
if (s->header == SOCKET_HEADER_FAKE) {
if (length < 3) {
warn("Node %s received a packet with no fake header. Skipping...", node_name(n));
return 0;
}
smp->sequence = values[0];
smp->ts.origin.tv_sec = values[1];
smp->ts.origin.tv_nsec = values[2];
values += 3;
length -= 3;
}
else {
smp->sequence = n->sequence++; /* Fake sequence no generated by VILLASnode */
smp->ts.origin.tv_sec = 0;
smp->ts.origin.tv_nsec = 0;
}
if (length > smp->capacity) {
warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity);
length = smp->capacity;
}
memcpy(smp->data, values, SAMPLE_DATA_LEN(length));
smp->ts.received.tv_sec = 0;
smp->ts.received.tv_nsec = 0;
smp->length = length;
return 1; /* GTNET-SKT sends every sample in a single packet */
}
static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cnt)
int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
{
int ret;
struct socket *s = n->_vd;
char buf[MSG_MAX_PACKET_LEN];
char buf[SOCKET_MAX_PACKET_LEN];
char *bufptr = buf;
ssize_t bytes;
size_t rbytes;
union sockaddr_union src;
socklen_t srclen = sizeof(src);
@ -465,100 +352,38 @@ static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cn
return 0;
}
ret = msg_buffer_to_samples(smps, cnt, bufptr, bytes);
ret = io_format_sscan(s->format, bufptr, bytes, &rbytes, smps, cnt, NULL);
if (ret < 0)
warn("Received invalid packet from node: %s reason=%d", node_name(n), ret);
if (bytes != rbytes)
warn("Received invalid packet from node: %s bytes=%zu, rbytes=%zu", node_name(n), bytes, rbytes);
return ret;
}
static int socket_write_none(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
int sent = 0;
ssize_t bytes;
if (cnt < 1)
return 0;
for (int i = 0; i < cnt; i++) {
int off = s->header == SOCKET_HEADER_FAKE ? 3 : 0;
int len = smps[i]->length + off;
uint32_t data[len];
/* First three values are sequence, seconds and nano-seconds timestamps */
if (s->header == SOCKET_HEADER_FAKE) {
data[0] = smps[i]->sequence;
data[1] = smps[i]->ts.origin.tv_sec;
data[2] = smps[i]->ts.origin.tv_nsec;
}
for (int j = 0; j < smps[i]->length; j++)
data[off + j] = s->endian == SOCKET_ENDIAN_BIG
? htobe32(smps[i]->data[j].i)
: htole32(smps[i]->data[j].i);
bytes = sendto(s->sd, data, len * sizeof(data[0]), 0,
(struct sockaddr *) &s->remote, sizeof(s->remote));
if (bytes < 0)
serror("Failed send to node %s", node_name(n));
sent++;
}
return sent;
}
static int socket_write_villas(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
char data[MSG_MAX_PACKET_LEN];
ssize_t bytes = 0, sent;
sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data));
if (sent < 0)
return -1;
/* Send message */
bytes = sendto(s->sd, data, sent, 0, (struct sockaddr *) &s->remote, sizeof(s->remote));
if (bytes < 0)
serror("Failed send to node %s", node_name(n));
return cnt;
}
int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
switch (s->header) {
case SOCKET_HEADER_NONE:
case SOCKET_HEADER_FAKE:
return socket_read_none(n, smps, cnt);
case SOCKET_HEADER_DEFAULT:
return socket_read_villas(n, smps, cnt);
}
return -1;
}
int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
switch (s->header) {
case SOCKET_HEADER_NONE:
case SOCKET_HEADER_FAKE:
return socket_write_none(n, smps, cnt);
char data[SOCKET_MAX_PACKET_LEN];
int ret;
ssize_t bytes;
size_t wbytes;
case SOCKET_HEADER_DEFAULT:
return socket_write_villas(n, smps, cnt);
}
ret = io_format_sprint(s->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL);
if (ret < 0)
return -1;
return -1;
/* Send message */
bytes = sendto(s->sd, data, wbytes, 0, (struct sockaddr *) &s->remote, sizeof(s->remote));
if (bytes < 0)
serror("Failed send to node %s", node_name(n));
if (bytes != wbytes)
warn("Partial send to node %s", node_name(n));
return cnt;
}
int socket_parse(struct node *n, json_t *cfg)
@ -569,6 +394,7 @@ int socket_parse(struct node *n, json_t *cfg)
const char *endian = NULL;
const char *layer = NULL;
const char *header = NULL;
const char *format = "villas";
int ret;
@ -581,18 +407,24 @@ int socket_parse(struct node *n, json_t *cfg)
s->endian = SOCKET_ENDIAN_BIG;
s->verify_source = 0;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s: s, s: s, s?: b, s?: o }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s: s, s: s, s?: b, s?: o, s?: s }",
"layer", &layer,
"header", &header,
"endian", &endian,
"remote", &remote,
"local", &local,
"verify_source", &s->verify_source,
"multicast", &cfg_multicast
"multicast", &cfg_multicast,
"format", &format
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
/* Format */
s->format = io_format_lookup(format);
if (!s->format)
error("Invalid format '%s' for node %s", format, node_name(n));
/* IP layer */
if (layer) {
if (!strcmp(layer, "ip"))

View file

@ -30,9 +30,7 @@
#include "timing.h"
#include "utils.h"
#include "plugin.h"
#include "formats/webmsg.h"
#include "formats/webmsg_format.h"
#include "io_format.h"
#include "nodes/websocket.h"
/* Private static storage */
@ -66,7 +64,11 @@ static int websocket_connection_init(struct websocket_connection *c, struct lws
c->state = STATE_INITIALIZED;
c->wsi = wsi;
c->buf = NULL;
/** @todo: We must find a better way to determine the buffer size */
c->buflen = 1 << 12;
c->buf = alloc(c->buflen);
if (c->node) {
struct websocket *w = c->node->_vd;
@ -76,7 +78,7 @@ static int websocket_connection_init(struct websocket_connection *c, struct lws
else
list_push(&connections, c);
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
ret = queue_signalled_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
if (ret)
return ret;
@ -102,13 +104,13 @@ static int websocket_connection_destroy(struct websocket_connection *c)
if (c->_name)
free(c->_name);
ret = queue_destroy(&c->queue);
ret = queue_signalled_destroy(&c->queue);
if (ret)
return ret;
if (c->buf)
free(c->buf);
c->state = STATE_DESTROYED;
c->wsi = NULL;
@ -136,7 +138,7 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam
for (int i = 0; i < cnt; i++) {
sample_get(smps[i]); /* increase reference count */
ret = queue_push(&c->queue, (void **) smps[i]);
ret = queue_signalled_push(&c->queue, (void **) smps[i]);
if (ret != 1)
warn("Queue overrun in websocket connection: %s", websocket_connection_name(c));
}
@ -171,35 +173,52 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
int ret;
struct websocket_connection *c = user;
struct webmsg *msg;
struct sample *smp;
switch (reason) {
case LWS_CALLBACK_CLIENT_ESTABLISHED:
ret = websocket_connection_init(c, wsi);
if (ret)
return -1;
c->format = io_format_lookup("villas");
return 0;
break;
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
warn("Failed to establish connection: %s", websocket_connection_name(c));
break;
case LWS_CALLBACK_ESTABLISHED:
c->state = STATE_DESTROYED;
c->mode = WEBSOCKET_MODE_SERVER;
/* We use the URI to associate this connection to a node
* and choose a protocol.
*
* Example: ws://example.com/node_1.json
* Will select the node with the name 'node_1'
* and format 'json'.
*
* If the node name is omitted, this connection
* will receive sample data from all websocket nodes
* (catch all).
*/
/* Get path of incoming request */
char *node, *format = NULL;
char uri[64];
lws_hdr_copy(wsi, uri, sizeof(uri), WSI_TOKEN_GET_URI); /* The path component of the*/
if (strlen(uri) <= 0) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid URL");
return -1;
}
if ((uri[0] == '/' && uri[1] == 0) || uri[0] == 0){
/* Catch all connection */
node = strtok(uri, "/.");
if (strlen(node) == 0)
c->node = NULL;
}
else {
char *node = uri + 1;
format = strtok(NULL, "");
/* Search for node whose name matches the URI. */
c->node = list_lookup(&p.node.instances, node);
@ -208,12 +227,21 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
return -1;
}
}
if (!format)
format = "villas";
c->format = io_format_lookup(format);
if (!c->format) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format");
return -1;
}
ret = websocket_connection_init(c, wsi);
if (ret)
return -1;
return 0;
break;
case LWS_CALLBACK_CLOSED:
websocket_connection_destroy(c);
@ -224,72 +252,82 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
return 0;
case LWS_CALLBACK_CLIENT_WRITEABLE:
case LWS_CALLBACK_SERVER_WRITEABLE:
if (c->state == STATE_STOPPED) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_NORMAL, "Goodbye");
return -1;
}
case LWS_CALLBACK_SERVER_WRITEABLE: {
if (c->node && c->node->state != STATE_STARTED) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
}
size_t msglen, buflen = LWS_PRE;
size_t wbytes;
int cnt = 256; //c->node ? c->node->vectorize : 1;
int pulled;
struct sample **smps = alloca(cnt * sizeof(struct sample *));
while (queue_pull(&c->queue, (void **) &smp)) {
msglen = WEBMSG_LEN(smp->length);
pulled = queue_signalled_pull_many(&c->queue, (void **) smps, cnt);
c->buf = realloc(c->buf, buflen + msglen);
if (!c->buf)
serror("realloc failed:");
io_format_sprint(c->format, c->buf + LWS_PRE, c->buflen - LWS_PRE, &wbytes, smps, pulled, IO_FORMAT_ALL);
msg = (struct webmsg *) (c->buf + buflen);
buflen += msglen;
sample_put_many(smps, pulled);
msg->version = WEBMSG_VERSION;
msg->type = WEBMSG_TYPE_DATA;
msg->length = smp->length;
msg->sequence = smp->sequence;
msg->id = smp->source->id;
msg->ts.sec = smp->ts.origin.tv_sec;
msg->ts.nsec = smp->ts.origin.tv_nsec;
memcpy(&msg->data, &smp->data, WEBMSG_DATA_LEN(smp->length));
webmsg_hton(msg);
sample_put(smp);
}
ret = lws_write(wsi, (unsigned char *) c->buf + LWS_PRE, buflen - LWS_PRE, LWS_WRITE_BINARY);
ret = lws_write(wsi, (unsigned char *) c->buf + LWS_PRE, wbytes, c->format->flags & IO_FORMAT_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
if (ret < 0) {
warn("Failed lws_write() for connection %s", websocket_connection_name(c));
return -1;
}
if (c->state == STATE_STOPPED) {
info("Closing connection %s", websocket_connection_name(c));
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_NORMAL, "Goodbye");
return -1;
}
if (queue_signalled_available(&c->queue) > 0)
lws_callback_on_writable(wsi);
return 0;
}
case LWS_CALLBACK_CLIENT_RECEIVE:
case LWS_CALLBACK_RECEIVE:
if (!lws_frame_is_binary(wsi)) {
case LWS_CALLBACK_RECEIVE: {
if (c->format->flags & IO_FORMAT_BINARY && !lws_frame_is_binary(wsi)) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Binary data expected");
return -1;
}
if (len < WEBMSG_LEN(0)) {
if (len <= 0) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid packet");
return -1;
}
if (!c->node) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Catch-all connection can not receive.");
return -1;
}
struct timespec ts_recv = time_now();
int recvd;
int cnt = 256; //c->node->vectorize;
struct websocket *w = c->node->_vd;
struct sample **smps = alloca(cnt * sizeof(struct sample *));
msg = (struct webmsg *) in;
while ((char *) msg < (char *) in + len) {
struct node *dest;
ret = sample_alloc(&w->pool, smps, cnt);
if (ret != 1) {
warn("Pool underrun for connection: %s", websocket_connection_name(c));
break;
}
/* Convert message to host byte-order */
webmsg_ntoh(msg);
recvd = io_format_sscan(c->format, in, len, NULL, smps, cnt, NULL);
if (recvd < 0) {
warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c));
break;
}
struct node *dest;
for (int i = 0; i < recvd; i++) {
/* Set receive timestamp */
smps[i]->ts.received = ts_recv;
/* Find destination node of this message */
if (c->node)
@ -300,53 +338,31 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
for (int i = 0; i < list_length(&p.node.instances); i++) {
struct node *n = list_at(&p.node.instances, i);
if (n->id == msg->id) {
if (n->id == smps[i]->id) {
dest = n;
break;
}
}
if (!dest) {
if (!dest)
warn("Ignoring message due to invalid node id");
goto next;
}
}
}
struct websocket *w = dest->_vd;
ret = sample_alloc(&w->pool, &smp, 1);
if (ret != 1) {
warn("Pool underrun for connection: %s", websocket_connection_name(c));
break;
}
smp->ts.origin = WEBMSG_TS(msg);
smp->ts.received = ts_recv;
smp->sequence = msg->sequence;
smp->length = msg->length;
if (smp->length > smp->capacity) {
smp->length = smp->capacity;
warn("Dropping values for connection: %s", websocket_connection_name(c));
}
memcpy(&smp->data, &msg->data, SAMPLE_DATA_LEN(smp->length));
ret = queue_signalled_push(&w->queue, (void **) smp);
if (ret != 1) {
warn("Queue overrun for connection %s", websocket_connection_name(c));
break;
}
/* Next message */
next: msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length));
ret = queue_signalled_push_many(&w->queue, (void **) smps, recvd);
if (ret != 1) {
warn("Queue overrun for connection %s", websocket_connection_name(c));
break;
}
return 0;
}
default:
return 0;
break;
}
return 0;
}
int websocket_init(struct super_node *sn)
@ -372,8 +388,12 @@ int websocket_deinit()
}
/* Wait for all connections to be closed */
while (list_length(&connections) > 0)
usleep(0.2*1e6);
while (list_length(&connections) > 0) {
info("LWS: Waiting for connection shutdown");
sched_yield();
usleep(0.1 * 1e6);
}
list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true);
@ -385,9 +405,7 @@ int websocket_start(struct node *n)
int ret;
struct websocket *w = n->_vd;
size_t blocklen = LWS_PRE + WEBMSG_LEN(DEFAULT_WEBSOCKET_SAMPLELEN);
ret = pool_init(&w->pool, DEFAULT_WEBSOCKET_QUEUELEN, blocklen, &memtype_hugepage);
ret = pool_init(&w->pool, DEFAULT_WEBSOCKET_QUEUELEN, SAMPLE_LEN(DEFAULT_WEBSOCKET_SAMPLELEN), &memtype_hugepage);
if (ret)
return ret;
@ -403,6 +421,7 @@ int websocket_start(struct node *n)
c->state = STATE_DESTROYED;
c->mode = WEBSOCKET_MODE_CLIENT;
c->node = n;
c->destination = d;
d->info.context = web->context;
d->info.vhost = web->vhost;
@ -428,8 +447,11 @@ int websocket_stop(struct node *n)
}
/* Wait for all connections to be closed */
while (list_length(&w->connections) > 0)
sleep(1);
while (list_length(&w->connections) > 0) {
info("LWS: Waiting for connection shutdown");
sched_yield();
usleep(0.1 * 1e6);
}
ret = queue_signalled_destroy(&w->queue);
if (ret)

View file

@ -27,10 +27,11 @@
#endif
#include "nodes/zeromq.h"
#include "node.h"
#include "utils.h"
#include "queue.h"
#include "plugin.h"
#include "formats/msg.h"
#include "io_format.h"
static void *context;
@ -97,6 +98,7 @@ int zeromq_parse(struct node *n, json_t *cfg)
const char *ep = NULL;
const char *type = NULL;
const char *filter = NULL;
const char *format = "villas";
size_t index;
json_t *cfg_pub = NULL;
@ -109,19 +111,24 @@ int zeromq_parse(struct node *n, json_t *cfg)
z->curve.enabled = false;
z->ipv6 = 0;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: o, s?: o, s?: s, s?: s, s?: b }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: o, s?: o, s?: s, s?: s, s?: b, s?: s }",
"subscribe", &ep,
"publish", &cfg_pub,
"curve", &cfg_curve,
"filter", &filter,
"pattern", &type,
"ipv6", &z->ipv6
"ipv6", &z->ipv6,
"format", &format
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
z->subscriber.endpoint = ep ? strdup(ep) : NULL;
z->filter = filter ? strdup(filter) : NULL;
z->format = io_format_lookup(format);
if (!z->format)
error("Invalid format '%s' for node %s", format, node_name(n));
if (cfg_pub) {
switch (json_typeof(cfg_pub)) {
@ -202,7 +209,13 @@ char * zeromq_print(struct node *n)
#endif
}
strcatf(&buf, "pattern=%s, ipv6=%s, crypto=%s, subscribe=%s, publish=[ ", pattern, z->ipv6 ? "yes" : "no", z->curve.enabled ? "yes" : "no", z->subscriber.endpoint);
strcatf(&buf, "format=%s, pattern=%s, ipv6=%s, crypto=%s, subscribe=%s, publish=[ ",
plugin_name(z->format),
pattern,
z->ipv6 ? "yes" : "no",
z->curve.enabled ? "yes" : "no",
z->subscriber.endpoint
);
for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) {
char *ep = list_at(&z->publisher.endpoints, i);
@ -416,7 +429,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt)
if (ret < 0)
return ret;
recv = msg_buffer_to_samples(smps, cnt, zmq_msg_data(&m), zmq_msg_size(&m));
recv = io_format_sscan(z->format, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt, NULL);
ret = zmq_msg_close(&m);
if (ret)
@ -430,16 +443,16 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
int ret;
struct zeromq *z = n->_vd;
ssize_t sent;
size_t wbytes;
zmq_msg_t m;
char data[1500];
sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data));
if (sent < 0)
ret = io_format_sprint(z->format, data, sizeof(data), &wbytes, smps, cnt, IO_FORMAT_ALL);
if (ret <= 0)
return -1;
ret = zmq_msg_init_size(&m, sent);
ret = zmq_msg_init_size(&m, wbytes);
if (z->filter) {
switch (z->pattern) {
@ -457,7 +470,7 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
}
}
memcpy(zmq_msg_data(&m), data, sent);
memcpy(zmq_msg_data(&m), data, wbytes);
ret = zmq_msg_send(&m, z->publisher.socket, 0);
if (ret < 0)

View file

@ -93,7 +93,7 @@ static void usage()
printf("\n");
printf("Supported IO formats:\n");
plugin_dump(PLUGIN_TYPE_FORMAT);
plugin_dump(PLUGIN_TYPE_IO);
printf("\n");
printf("Example:");
@ -105,11 +105,9 @@ static void usage()
int main(int argc, char *argv[])
{
int ret;
int ret, recv;
char *format = "villas";
size_t recv;
/* Default values */
cnt = 1;
@ -178,7 +176,7 @@ check: if (optarg == endptr)
error("Failed to initilize memory pool");
/* Initialize IO */
p = plugin_lookup(PLUGIN_TYPE_FORMAT, format);
p = plugin_lookup(PLUGIN_TYPE_IO, format);
if (!p)
error("Unknown IO format '%s'", format);
@ -186,7 +184,7 @@ check: if (optarg == endptr)
if (ret)
error("Failed to initialize IO");
ret = io_open(&io, NULL, NULL);
ret = io_open(&io, NULL);
if (ret)
error("Failed to open IO");
@ -218,11 +216,13 @@ check: if (optarg == endptr)
error("Failed to allocate %d smps from pool", cnt);
recv = io_scan(&io, smps, cnt);
if (recv < 0)
killme(SIGTERM);
debug(15, "Read %zu smps from stdin", recv);
debug(15, "Read %u smps from stdin", recv);
hook_read(&h, smps, &recv);
hook_write(&h, smps, &recv);
hook_read(&h, smps, (unsigned *) &recv);
hook_write(&h, smps, (unsigned *) &recv);
io_print(&io, smps, recv);

View file

@ -89,7 +89,7 @@ static void usage()
printf("\n");
printf("Supported IO formats:\n");
plugin_dump(PLUGIN_TYPE_FORMAT);
plugin_dump(PLUGIN_TYPE_IO);
printf("\n");
print_copyright();

View file

@ -270,7 +270,7 @@ check: if (optarg == endptr)
if (ret)
error("Failed to initalize real-time");
p = plugin_lookup(PLUGIN_TYPE_FORMAT, format);
p = plugin_lookup(PLUGIN_TYPE_IO, format);
if (!p)
error("Invalid format: %s", format);
@ -278,7 +278,7 @@ check: if (optarg == endptr)
if (ret)
error("Failed to initialize IO");
ret = io_open(&io, NULL, NULL);
ret = io_open(&io, NULL);
if (ret)
error("Failed to open IO");

View file

@ -30,7 +30,7 @@
#include <villas/utils.h>
#include <villas/sample.h>
#include <villas/formats/villas.h>
#include <villas/io/villas.h>
#include <villas/timing.h>
#include <villas/node.h>
#include <villas/plugin.h>
@ -125,15 +125,15 @@ int main(int argc, char *argv[])
if (ret)
error("Failed to initialize node");
p = plugin_lookup(PLUGIN_TYPE_FORMAT, format);
p = plugin_lookup(PLUGIN_TYPE_IO, format);
if (!p)
error("Invalid output format '%s'", format);
ret = io_init(&io, &p->io, IO_FLAG_FLUSH | (IO_FORMAT_ALL & ~IO_FORMAT_OFFSET));
ret = io_init(&io, &p->io, IO_FLUSH | (IO_FORMAT_ALL & ~IO_FORMAT_OFFSET));
if (ret)
error("Failed to initialize output");
ret = io_open(&io, NULL, NULL);
ret = io_open(&io, NULL);
if (ret)
error("Failed to open output");

View file

@ -29,7 +29,8 @@
#include <villas/sample.h>
#include <villas/io.h>
#include <villas/formats/villas.h>
#include <villas/io_format.h>
#include <villas/io/villas.h>
#include <villas/utils.h>
#include <villas/timing.h>
#include <villas/pool.h>
@ -132,11 +133,11 @@ check: if (optarg == endptr)
serror("Failed to open file: %s", f2.path);
while (!feof(f1.handle) && !feof(f2.handle)) {
ret = io_format_villas_fscan(f1.handle, &f1.sample, 1, &f1.flags);
ret = villas_fscan(f1.handle, &f1.sample, 1, &f1.flags);
if (ret < 0 && !feof(f1.handle))
goto out;
ret = io_format_villas_fscan(f2.handle, &f2.sample, 1, &f2.flags);
ret = villas_fscan(f2.handle, &f2.sample, 1, &f2.flags);
if (ret < 0 && !feof(f2.handle))
goto out;

View file

@ -21,6 +21,7 @@
*********************************************************************************/
#include <stdio.h>
#include <float.h>
#include <criterion/criterion.h>
#include <criterion/parameterized.h>
@ -32,6 +33,7 @@
#include "plugin.h"
#include "pool.h"
#include "io.h"
#include "io/raw.h"
#define NUM_SAMPLES 10
#define NUM_VALUES 10
@ -40,23 +42,43 @@ void cr_assert_eq_sample(struct sample *s, struct sample *t)
{
cr_assert_eq(s->length, t->length);
cr_assert_eq(s->sequence, t->sequence);
cr_assert_eq(s->format, t->format);
// cr_assert_eq(s->format, t->format);
cr_assert_eq(s->ts.origin.tv_sec, t->ts.origin.tv_sec);
cr_assert_eq(s->ts.origin.tv_nsec, t->ts.origin.tv_nsec);
cr_assert_eq(s->ts.received.tv_sec, t->ts.received.tv_sec);
cr_assert_eq(s->ts.received.tv_nsec, t->ts.received.tv_nsec);
for (int i = 0; i < MIN(s->length, t->length); i++)
cr_assert_float_eq(s->data[i].f, t->data[i].f, 1e-6);
for (int i = 0; i < MIN(s->length, t->length); i++) {
switch (sample_get_data_format(t, i)) {
case SAMPLE_DATA_FORMAT_FLOAT:
cr_assert_float_eq(s->data[i].f, t->data[i].f, 1e-3, "Sample data mismatch at index %d: %f != %f", i, s->data[i].f, t->data[i].f);
break;
case SAMPLE_DATA_FORMAT_INT:
cr_assert_eq(s->data[i].i, t->data[i].i);
break;
}
}
}
ParameterizedTestParameters(io, highlevel)
{
static char formats[][32] = {
#ifdef WITH_HDF5
"hdf5",
#endif
"raw-int8",
"raw-int16-be",
"raw-int16-le",
"raw-int32-be",
"raw-int32-le",
"raw-int64-be",
"raw-int64-le",
"raw-flt32",
"raw-flt64",
"villas",
"json",
"csv"
"msg",
"gtnet",
"gtnet-fake",
};
return cr_make_param_array(char[32], formats, ARRAY_LEN(formats));
@ -64,16 +86,19 @@ ParameterizedTestParameters(io, highlevel)
ParameterizedTest(char *fmt, io, highlevel)
{
int ret;
char filename[64];
int ret, cnt;
struct io io;
struct io_format *f;
struct pool p = { .state = STATE_DESTROYED };
struct sample *smps[NUM_SAMPLES];
struct sample *smpt[NUM_SAMPLES];
ret = pool_init(&p, 2 * NUM_SAMPLES, SAMPLE_LEN(NUM_VALUES), &memtype_hugepage);
cr_assert_eq(ret, 0);
info("Testing: %s", fmt);
/* Prepare a sample with arbitrary data */
ret = sample_alloc(&p, smps, NUM_SAMPLES);
@ -83,52 +108,94 @@ ParameterizedTest(char *fmt, io, highlevel)
cr_assert_eq(ret, NUM_SAMPLES);
for (int i = 0; i < NUM_SAMPLES; i++) {
smpt[i]->capacity =
smps[i]->capacity = NUM_VALUES;
smpt[i]->capacity = NUM_VALUES;
smps[i]->length = NUM_VALUES;
smps[i]->sequence = 235 + i;
smps[i]->format = 0; /* all float */
smps[i]->ts.origin = time_now();
smps[i]->ts.received = (struct timespec) {
.tv_sec = smps[i]->ts.origin.tv_sec - 1,
.tv_nsec = smps[i]->ts.origin.tv_nsec
};
for (int j = 0; j < smps[i]->length; j++)
smps[i]->data[j].f = j * 1.2 + i * 100;
smps[i]->data[j].f = j * 0.1 + i * 100;
//smps[i]->data[j].i = -500 + j*100;
}
/* Open a file for IO */
strncpy(filename, "/tmp/villas-unit-test.XXXXXX", sizeof(filename));
mktemp(filename);
char *fn, dir[64];
strncpy(dir, "/tmp/villas.XXXXXX", sizeof(dir));
mkdtemp(dir);
ret = asprintf(&fn, "%s/file", dir);
cr_assert_gt(ret, 0);
printf("Writing to file: %s\n", filename);
f = io_format_lookup(fmt);
cr_assert_not_null(f, "Format '%s' does not exist", fmt);
struct plugin *pl;
pl = plugin_lookup(PLUGIN_TYPE_FORMAT, fmt);
cr_assert_not_null(pl, "Format '%s' does not exist", fmt);
ret = io_init(&io, &pl->io, IO_FORMAT_ALL);
ret = io_init(&io, f, IO_FORMAT_ALL);
cr_assert_eq(ret, 0);
ret = io_open(&io, filename, "w+");
ret = io_open(&io, fn, "w+");
cr_assert_eq(ret, 0);
ret = io_print(&io, smps, NUM_SAMPLES);
cr_assert_eq(ret, NUM_SAMPLES);
io_rewind(&io);
io_flush(&io);
ret = io_close(&io);
cr_assert_eq(ret, 0);
#if 1 /* Show the file contents */
char cmd[128];
snprintf(cmd, sizeof(cmd), "cat %s", filename);
if (!strcmp(fmt, "json") || !strcmp(fmt, "villas"))
snprintf(cmd, sizeof(cmd), "cat %s", fn);
else
snprintf(cmd, sizeof(cmd), "hexdump -C %s", fn);
system(cmd);
#endif
ret = io_open(&io, fn, "r");
cr_assert_eq(ret, 0);
ret = io_scan(&io, smpt, NUM_SAMPLES);
cr_assert_eq(ret, NUM_SAMPLES, "Read only %d of %d samples back", ret, NUM_SAMPLES);
cnt = io_scan(&io, smpt, NUM_SAMPLES);
for (int i = 0; i < 0; i++)
/* The RAW format has certain limitations:
* - limited accuracy if smaller datatypes are used
* - no support for vectors / framing
*
* We need to take these limitations into account before comparing.
*/
if (f->sscan == raw_sscan) {
cr_assert_eq(cnt, 1);
cr_assert_eq(smpt[0]->length, smpt[0]->capacity);
if (io.flags & RAW_FAKE) {
}
else {
smpt[0]->sequence = smps[0]->sequence;
smpt[0]->ts.origin = smps[0]->ts.origin;
}
int bits = 1 << (io.flags >> 24);
for (int j = 0; j < smpt[0]->length; j++) {
if (io.flags & RAW_FLT) {
switch (bits) {
case 32: smps[0]->data[j].f = (float) smps[0]->data[j].f; break;
case 64: smps[0]->data[j].f = (double) smps[0]->data[j].f; break;
}
}
else {
switch (bits) {
case 8: smps[0]->data[j].i = ( int8_t) smps[0]->data[j].i; break;
case 16: smps[0]->data[j].i = ( int16_t) smps[0]->data[j].i; break;
case 32: smps[0]->data[j].i = ( int32_t) smps[0]->data[j].i; break;
case 64: smps[0]->data[j].i = ( int64_t) smps[0]->data[j].i; break;
}
}
}
}
else
cr_assert_eq(cnt, NUM_SAMPLES, "Read only %d of %d samples back", cnt, NUM_SAMPLES);
for (int i = 0; i < cnt; i++)
cr_assert_eq_sample(smps[i], smpt[i]);
ret = io_close(&io);
@ -137,8 +204,13 @@ ParameterizedTest(char *fmt, io, highlevel)
ret = io_destroy(&io);
cr_assert_eq(ret, 0);
ret = unlink(filename);
ret = unlink(fn);
cr_assert_eq(ret, 0);
ret = rmdir(dir);
cr_assert_eq(ret, 0);
free(fn);
sample_free(smps, NUM_SAMPLES);
sample_free(smpt, NUM_SAMPLES);