1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

new signal declaration code

This commit is contained in:
Steffen Vogel 2018-08-20 18:31:27 +02:00
parent 86003e0b78
commit 5707ee9d58
37 changed files with 941 additions and 525 deletions

View file

@ -60,8 +60,13 @@ nodes = {
type = "socket",
layer = "udp"
format = "protobuf",
local = "*:12000"
remote = "134.130.169.32:12000"
in = {
address = "*:12000"
},
out = {
remote = "134.130.169.32:12000"
}
},
sine1 = {

View file

@ -51,14 +51,24 @@ nodes = {
socket1 = {
type = "socket",
layper = "udp",
local = "*:12000",
remote = "127.0.0.1:12001"
in = {
address = "*:12000"
},
out = {
address = "127.0.0.1:12001"
}
},
socket2 = {
type = "socket",
layper = "udp",
local = "*:12001",
remote = "127.0.0.1:12000"
layer = "udp",
in = {
address = "*:12001"
},
out = {
address = "127.0.0.1:1200"
}
}
};

View file

@ -64,29 +64,32 @@ nodes = {
format = "gtnet.fake", # For a list of available node-types run: 'villas-node -h'
verify_source = true, # Check if source address of incoming packets matches the remote address.
in = {
address = "127.0.0.1:12001" # This node only received messages on this IP:Port pair
verify_source = true, # Check if source address of incoming packets matches the remote address.
multicast = { # IGMP multicast is only support for layer = (ip|udp)
enabled = true,
local = "127.0.0.1:12001", # This node only received messages on this IP:Port pair
remote = "127.0.0.1:12000", # This node sents outgoing messages to this IP:Port pair
netem = { # Network emulation settings
enabled = true,
# Those settings can be specified for each node invidually!
delay = 100000, # Additional latency in microseconds
jitter = 30000, # Jitter in uS
distribution = "normal", # Distribution of delay: uniform, normal, pareto, paretonormal
loss = 10 # Packet loss in percent
duplicate = 10, # Duplication in percent
corrupt = 10 # Corruption in percent
group = "224.1.2.3", # The multicast group. Must be within 224.0.0.0/4
interface = "1.2.3.4", # The IP address of the interface which should receive multicast packets.
ttl = 128, # The time to live for outgoing multicast packets.
loop = false, # Whether or not to loopback outgoing multicast packets to the local host.
}
},
multicast = { # IGMP multicast is only support for layer = (ip|udp)
enabled = true,
group = "224.1.2.3", # The multicast group. Must be within 224.0.0.0/4
interface = "1.2.3.4", # The IP address of the interface which should receive multicast packets.
ttl = 128, # The time to live for outgoing multicast packets.
loop = false, # Whether or not to loopback outgoing multicast packets to the local host.
out = {
address = "127.0.0.1:12000", # This node sents outgoing messages to this IP:Port pair
netem = { # Network emulation settings
enabled = true,
# Those settings can be specified for each node invidually!
delay = 100000, # Additional latency in microseconds
jitter = 30000, # Jitter in uS
distribution = "normal", # Distribution of delay: uniform, normal, pareto, paretonormal
loss = 10 # Packet loss in percent
duplicate = 10, # Duplication in percent
corrupt = 10 # Corruption in percent
}
}
},
ethernet_node = {
@ -95,15 +98,23 @@ nodes = {
### The following settings are specific to the socket node-type!! ###
layer = "eth",
local = "12:34:56:78:90:AB%eth0:12002",
remote = "12:34:56:78:90:AB%eth0:12002"
in = {
address = "12:34:56:78:90:AB%eth0:12002"
},
out = {
address = "12:34:56:78:90:AB%eth0:12002"
}
},
unix_domain_node = {
type = "socket",
layer = "unix", # Datagram UNIX domain sockets require two endpoints
local = "/var/run/villas-node/node.sock",
remote = "/var/run/villas-node/client.sock"
in = {
address = "/var/run/villas-node/node.sock"
},
out = {
address = "/var/run/villas-node/client.sock"
}
},
opal_node = { # The server can be started as an Asynchronous process
type = "opal", # from within an OPAL-RT model.
@ -120,14 +131,22 @@ nodes = {
### The following settings are specific to the file node-type!! ###
uri = "logs/input.log", # These options specify the path prefix where the the files are stored
epoch_mode = "direct" # One of: direct (default), wait, relative, absolute
epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0).
in = {
epoch_mode = "direct" # One of: direct (default), wait, relative, absolute
epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0).
# Consult the documentation of a full explanation
rate = 2.0 # A constant rate at which the lines of the input files should be read
rate = 2.0 # A constant rate at which the lines of the input files should be read
# A missing or zero value will use the timestamp in the first column
# of the file to determine the pause between consecutive lines.
buffer_size = 1000000
},
out = {
flush = true
buffer_size = 1000000
}
},
ngsi_node = {
type = "ngsi",
@ -158,36 +177,47 @@ nodes = {
nanomsg_node = {
type = "nanomsg",
publish = [
"tcp://*:12000", # TCP socket
"ipc:///tmp/test.ipc", # Interprocess communication
"inproc://test" # Inprocess communication
],
subscribe = [
"tcp://127.0.0.1:12000",
"ipc:///tmp/test.ipc",
"inproc://test"
]
out = {
endpoints = [
"tcp://*:12000", # TCP socket
"ipc:///tmp/test.ipc", # Interprocess communication
"inproc://test" # Inprocess communication
],
}
in = {
endpoints = [
"tcp://127.0.0.1:12000",
"ipc:///tmp/test.ipc",
"inproc://test"
]
}
},
zeromq_node = {
type = "zeromq",
pattern = "pubsub", # The ZeroMQ pattern. One of pubsub, radiodish
ipv6 = false, # Enable IPv6 support
filter = "ab184", # A filter which is prefix matched
curve = { # Z85 encoded Curve25519 keys
enabled = true,
public_key = "Veg+Q.V-c&1k>yVh663gQ^7fL($y47gybE-nZP1L",
secret_key = "HPY.+mFuB[jGs@(zZr6$IZ1H1dZ7Ji*j>oi@O?Pc"
}
subscribe = "tcp://*:1234" # The subscribe endpoint.
in = {
subscribe = "tcp://*:1234" # The subscribe endpoint.
# See http://api.zeromq.org/2-1:zmq-bind for details.
filter = "ab184", # A filter which is prefix matched for each received msg
}
publish = [ # The publish endpoints.
"tcp://localhost:1235", # See http://api.zeromq.org/2-1:zmq-connect for details.
"tcp://localhost:12444"
],
out = {
publish = [ # The publish endpoints.
"tcp://localhost:1235", # See http://api.zeromq.org/2-1:zmq-connect for details.
"tcp://localhost:12444"
],
filter = "ab184", # A prefix which is prepended to each send message.
}
},
signal_node = {
type = "signal",
@ -209,9 +239,13 @@ nodes = {
shmem_node = {
type = "shmem",
in_name = "sn1_in", # Name of shared memory segment for receiving side
out_name = "sn1_in", # Name of shared memory segment for sending side
in = {
name = "sn1_in"
}, # Name of shared memory segment for receiving side
out = {
name = "sn1_in" # Name of shared memory segment for sending side
},
queuelen = 1024, # Length of the queues
polling = true, # We can busy-wait or use pthread condition variables for synchronizations
@ -296,24 +330,24 @@ nodes = {
interface = "lo",
dst_address = "01:0c:cd:01:00:01",
publish = {
fields = [
"float32",
"float64",
"int8",
"int32"
out = {
signals = [
{ iec_type = "float32" },
{ iec_type = "float64" },
{ iec_type = "int8" },
{ iec_type = "int32" }
],
svid = "test1234",
smpmod = "samples_per_second",
confrev = 55
},
subscribe = {
fields = [
"float32",
"float64",
"int8",
"int32"
in = {
signals = [
{ iec_type = "float32" },
{ iec_type = "float64" },
{ iec_type = "int8" },
{ iec_type = "int32" }
]
}
},
@ -323,7 +357,7 @@ nodes = {
format = "protobuf",
username = "guest",
password = "guest",
password = "guest",
host = "localhost",
port = 1883,
@ -331,9 +365,12 @@ nodes = {
retain = false,
qos = 0,
publish = "test-topic",
subscribe = "test-topic",
out = {
publish = "test-topic"
},
in = {
subscribe = "test-topic"
},
ssl = {
enabled = false,
insecure = true,
@ -384,7 +421,7 @@ paths = (
out = "file_node", # This path includes all available example hooks.
builtin = false, # By default, all paths will have a few builtin hooks attached to them.
# When collecting statistics or measurements these are undesired.
# When collecting statistics or measurements these are undesired.
# A complete list of supported hooks
@ -405,13 +442,6 @@ paths = (
ratio = 2 # Only forward every 2nd message
},
{
type = "convert"
mask = 0x1 # only convert the first value
mode = "fixed" # Convert all values to fixed precission. Use 'float' to convert to floating point.
scale = 1.0
},
{
type = "skip_first"

View file

@ -7,7 +7,7 @@ logging = {
nodes = {
influxdb_node = {
type = "influxdb",
samplelen = 3,
signals = 3,
server = "relaxed_colden:8089",
key = "villas",

View file

@ -1,24 +1,7 @@
{
"stats" : 1,
"nodes" : {
"node1" : {
"type" : "socket",
"local" : "127.0.0.1:12000",
"remote" : "127.0.0.1:12001",
"hooks" : [
{
"type" : "stats",
"verbose" : true
}
]
"type" : "loopback"
}
},
"paths" : [
{
"in" : "node1",
"out" : "node1"
}
]
}
}

View file

@ -23,7 +23,7 @@ nodes = {
type = "shmem",
out_name = "/villas1-out",
in_name = "/villas1-in",
samplelen = 4,
signals = 4,
queuelen = 32,
polling = false,
vectorize = 1

View file

@ -33,10 +33,6 @@ extern "C" {
struct sample;
struct io;
enum format_type_flags {
FORMAT_TYPE_BINARY = (1 << 8)
};
struct format_type {
int (*init)(struct io *io);
int (*destroy)(struct io *io);
@ -82,7 +78,7 @@ struct format_type {
int (*scan)( struct io *io, struct sample *smps[], unsigned cnt);
/** Print a header. */
void (*header)(struct io *io);
void (*header)(struct io *io, const struct sample *smp);
/** Print a footer. */
void (*footer)(struct io *io);
@ -94,7 +90,7 @@ struct format_type {
*/
/** @see format_type_sscan */
int (*sscan)(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
int (*sscan)(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
/** @see format_type_sprint */
int (*sprint)(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt);

View file

@ -23,18 +23,20 @@
#pragma once
#include <stdio.h>
#include <stdlib.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Forward declarations. */
struct io;
struct sample;
void csv_header(struct io *io);
void csv_header(struct io *io, const struct sample *smp);
int csv_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
int csv_sprint(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
int csv_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
#ifdef __cplusplus
}

View file

@ -33,7 +33,7 @@ struct sample;
struct io;
int json_reserve_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt);
int json_reserve_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
int json_reserve_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
int json_reserve_print(struct io *io, struct sample *smps[], unsigned cnt);
int json_reserve_scan(struct io *io, struct sample *smps[], unsigned cnt);

View file

@ -32,7 +32,7 @@ extern "C" {
struct sample;
int json_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt);
int json_sscan(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt);
int json_sscan(struct io *io, const char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt);
int json_print(struct io *io, struct sample *smps[], unsigned cnt);
int json_scan(struct io *io, struct sample *smps[], unsigned cnt);

View file

@ -29,6 +29,7 @@ extern "C" {
/* Forward declaration */
struct msg;
struct sample;
struct list;
/** Convert msg from network to host byteorder */
void msg_ntoh(struct msg *m);
@ -53,10 +54,10 @@ void msg_hdr_ntoh(struct msg *m);
int msg_verify(struct msg *m);
/** Copy fields from \p msg into \p smp. */
int msg_to_sample(struct msg *msg, struct sample *smp);
int msg_to_sample(struct msg *msg, struct sample *smp, struct list *signals);
/** Copy fields form \p smp into \p msg. */
int msg_from_sample(struct msg *msg, struct sample *smp);
int msg_from_sample(struct msg *msg, struct sample *smp, struct list *signals);
#ifdef __cplusplus
}

View file

@ -36,7 +36,7 @@ struct sample;
int protobuf_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt);
/** Read struct sample's from buffer \p buf into samples \p smps. */
int protobuf_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
int protobuf_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
#ifdef __cplusplus
}

View file

@ -29,39 +29,33 @@
extern "C" {
#endif
/* float128 is currently not yet supported as htole128() functions a missing */
#if 0 && defined(__GNUC__) && defined(__linux__)
#define HAS_128BIT
#endif
/* Forward declarations */
struct sample;
enum raw_flags {
RAW_FAKE = (1 << 16), /**< Treat the first three values as: sequenceno, seconds, nanoseconds */
/** Treat the first three values as: sequenceno, seconds, nanoseconds */
RAW_FAKE_HEADER = (1 << 16) | SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE,
RAW_BIG_ENDIAN = (1 << 7), /**< Encode data in big-endian byte order */
RAW_BE_INT = (1 << 17), /**< Byte-order for integer data: big-endian if set. */
RAW_BE_FLT = (1 << 18), /**< Byte-order for floating point data: big-endian if set. */
RAW_BE_HDR = (1 << 19), /**< Byte-order for fake header fields: big-endian if set. */
/** Byte-order for all fields: big-endian if set. */
RAW_BE = RAW_BE_INT | RAW_BE_FLT | RAW_BE_HDR,
/** Mix floating and integer types.
*
* io_raw_sscan() parses all values as single / double precission fp.
* io_raw_sprint() uses sample::format to determine the type.
*/
RAW_AUTO = (1 << 22),
RAW_FLT = (1 << 23), /**< Data-type: floating point otherwise integer. */
//RAW_1 = (0 << 24), /**< Pack each value as a single bit. */
RAW_8 = (3 << 24), /**< Pack each value as a byte. */
RAW_16 = (4 << 24), /**< Pack each value as a word. */
RAW_32 = (5 << 24), /**< Pack each value as a double word. */
RAW_64 = (6 << 24) /**< Pack each value as a quad word. */
RAW_BITS_8 = (3 << 24), /**< Pack each value as a byte. */
RAW_BITS_16 = (4 << 24), /**< Pack each value as a word. */
RAW_BITS_32 = (5 << 24), /**< Pack each value as a double word. */
RAW_BITS_64 = (6 << 24), /**< Pack each value as a quad word. */
#ifdef HAS_128BIT
RAW_128 = (7 << 24) /**< Pack each value as a double quad word. */
#endif
};
/** Copy / read struct msg's from buffer \p buf to / fram samples \p smps. */
int raw_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt);
/** Read struct sample's from buffer \p buf into samples \p smps. */
int raw_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
int raw_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
#ifdef __cplusplus
}

View file

@ -42,7 +42,7 @@ enum villas_binary_flags {
int villas_binary_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt);
/** Read struct sample's from buffer \p buf into samples \p smps. */
int villas_binary_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
int villas_binary_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
#ifdef __cplusplus
}

View file

@ -33,11 +33,11 @@ extern "C" {
struct io;
struct sample;
void villas_human_header(struct io *io);
void villas_human_header(struct io *io, const struct sample *smp);
int villas_human_print(struct io *io, struct sample *smps[], unsigned cnt);
int villas_human_scan(struct io *io, struct sample *smps[], unsigned cnt);
#ifdef __cplusplus
}
#endif
#endif

View file

@ -36,9 +36,13 @@ struct sample;
struct format_type;
enum io_flags {
/* Bits 0-7 are reserved for for flags defined by enum sample_flags */
IO_FLUSH = (1 << 8), /**< Flush the output stream after each chunk of samples. */
IO_NONBLOCK = (1 << 9), /**< Dont block io_read() while waiting for new samples. */
IO_NEWLINES = (1 << 10) /**< The samples of this format are newline delimited. */
IO_NEWLINES = (1 << 10), /**< The samples of this format are newline delimited. */
IO_DESTROY_SIGNALS = (1 << 11), /**< Signal descriptors are managed by this IO instance. Destroy them in io_destoy() */
IO_HAS_BINARY_PAYLOAD = (1 << 12), /**< This IO instance en/decodes binary payloads. */
IO_AUTO_DETECT_FORMAT = (1 << 13) /**< This IO instance supports format auto-detection during decoding. */
};
struct io {
@ -59,11 +63,11 @@ struct io {
char *buffer;
size_t buflen;
struct list *signals;
struct node *node;
} in, out;
struct list *signals; /**< Signal meta data for parsed samples by io_scan() */
bool header_printed;
enum {
IO_MODE_STDIO,
IO_MODE_ADVIO,
@ -71,18 +75,22 @@ struct io {
} mode;
void *_vd;
struct format_type *_vt;
const struct format_type *_vt;
};
int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags);
int io_init(struct io *io, const struct format_type *fmt, struct list *signals, int flags);
int io_init_auto(struct io *io, const struct format_type *fmt, int len, int flags);
int io_destroy(struct io *io);
int io_check(struct io *io);
int io_open(struct io *io, const char *uri);
int io_close(struct io *io);
void io_header(struct io *io);
void io_header(struct io *io, const struct sample *smp);
void io_footer(struct io *io);
@ -98,6 +106,8 @@ int io_flush(struct io *io);
int io_fd(struct io *io);
const struct format_type * io_type(struct io *io);
int io_stream_open(struct io *io, const char *uri);
int io_stream_close(struct io *io);
@ -124,7 +134,7 @@ FILE * io_stream_output(struct io *io);
* @retval >=0 The number of samples which have been parsed from \p buf and written into \p smps.
* @retval <0 Something went wrong.
*/
int io_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
int io_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt);
/** Print \p cnt samples from \p smps into buffer \p buf of length \p len.
*

View file

@ -48,7 +48,6 @@ struct node_direction {
int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
struct list hooks; /**< List of write hooks (struct hook). */
struct list signals; /**< List of signal meta data such as signal names */
json_t *cfg; /**< A JSON object containing the configuration of the node. */
};
@ -65,14 +64,15 @@ struct node
char *_name_long; /**< Singleton: A string used to print to screen. */
int affinity; /**< CPU Affinity of this node */
int samplelen; /**< The maximum number of values this node can receive. */
unsigned sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */
uint64_t sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */
struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */
struct node_direction in, out;
struct list signals; /**< Signal meta data for data which is __received__ by node_read(). */
enum state state;
struct node_type *_vt; /**< Virtual functions (C++ OOP style) */

View file

@ -41,9 +41,14 @@ struct node;
struct super_node;
struct sample;
enum node_type_flags {
NODE_TYPE_PROVIDES_SIGNALS = (1 << 0)
};
/** C++ like vtable construct for node_types */
struct node_type {
int vectorize; /**< Maximal vector length supported by this node type. Zero is unlimited. */
int flags;
enum state state; /**< State of this node-type. */

View file

@ -99,7 +99,6 @@ struct path {
int reverse; /**< This path as a matching reverse path. */
int builtin; /**< This path should use built-in hooks by default. */
int queuelen; /**< The queue length for each path_destination::queue */
int samplelen; /**< Will be calculated based on path::sources.mappings */
char *_name; /**< Singleton: A string which is used to print this path to screen. */

View file

@ -49,12 +49,11 @@ struct pool;
/** Parts of a sample that can be serialized / de-serialized by the IO formats */
enum sample_flags {
SAMPLE_HAS_ORIGIN = (1 << 0), /**< Include origin timestamp in output. */
SAMPLE_HAS_RECEIVED = (1 << 1), /**< Include receive timestamp in output. */
SAMPLE_HAS_TS_ORIGIN = (1 << 0), /**< Include origin timestamp in output. */
SAMPLE_HAS_TS_RECEIVED = (1 << 1), /**< Include receive timestamp in output. */
SAMPLE_HAS_OFFSET = (1 << 2), /**< Include offset (received - origin timestamp) in output. */
SAMPLE_HAS_ID = (1 << 3), /**< This sample has a valid sample::id field. */
SAMPLE_HAS_SEQUENCE = (1 << 4), /**< Include sequence number in output. */
SAMPLE_HAS_VALUES = (1 << 5), /**< Include values in output. */
SAMPLE_HAS_SEQUENCE = (1 << 3), /**< Include sequence number in output. */
SAMPLE_HAS_DATA = (1 << 4), /**< Include values in output. */
SAMPLE_HAS_ALL = (1 << 5) - 1, /**< Enable all output options. */
SAMPLE_IS_FIRST = (1 << 16), /**< This sample is the first of a new simulation case */
@ -129,7 +128,7 @@ int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt);
int sample_incref_many(struct sample *smps[], int cnt);
int sample_decref_many(struct sample *smps[], int cnt);
enum signal_format sample_format(const struct sample *s, unsigned idx);
enum signal_type sample_format(const struct sample *s, unsigned idx);
#ifdef __cplusplus
}

View file

@ -53,13 +53,13 @@ union signal_data {
float _Complex z; /**< Complex values. */
};
enum signal_format {
SIGNAL_FORMAT_INVALID = 0, /**< Signal format is invalid. */
SIGNAL_FORMAT_AUTO = 1, /**< Signal format is unknown. Try autodetection. */
SIGNAL_FORMAT_FLOAT = 2, /**< See signal_data::f */
SIGNAL_FORMAT_INT = 3, /**< See signal_data::i */
SIGNAL_FORMAT_BOOL = 4, /**< See signal_data::b */
SIGNAL_FORMAT_COMPLEX = 5 /**< See signal_data::z */
enum signal_type {
SIGNAL_TYPE_INVALID = 0, /**< Signal type is invalid. */
SIGNAL_TYPE_AUTO = 1, /**< Signal type is unknown. Attempt autodetection. */
SIGNAL_TYPE_FLOAT = 2, /**< See signal_data::f */
SIGNAL_TYPE_INTEGER = 3, /**< See signal_data::i */
SIGNAL_TYPE_BOOLEAN = 4, /**< See signal_data::b */
SIGNAL_TYPE_COMPLEX = 5 /**< See signal_data::z */
};
/** Signal descriptor.
@ -76,7 +76,7 @@ struct signal {
atomic_int refcnt; /**< Reference counter. */
enum signal_format format;
enum signal_type type;
};
/** Initialize a signal with default values. */
@ -86,7 +86,7 @@ int signal_init(struct signal *s);
int signal_destroy(struct signal *s);
/** Allocate memory for a new signal, and initialize it with provided values. */
struct signal * signal_create(const char *name, const char *unit, enum signal_format fmt);
struct signal * signal_create(const char *name, const char *unit, enum signal_type fmt);
/** Destroy and release memory of signal. */
int signal_free(struct signal *s);
@ -97,6 +97,9 @@ int signal_incref(struct signal *s);
/** Decrease reference counter. */
int signal_decref(struct signal *s);
/** Copy a signal. */
struct signal * signal_copy(struct signal *s);
/** Parse signal description. */
int signal_parse(struct signal *s, json_t *cfg);
@ -105,18 +108,18 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u
int signal_list_parse(struct list *list, json_t *cfg);
int signal_list_generate(struct list *list, unsigned len, enum signal_format fmt);
int signal_list_generate(struct list *list, unsigned len, enum signal_type fmt);
void signal_list_dump(const struct list *list);
enum signal_format signal_format_from_str(const char *str);
enum signal_type signal_type_from_str(const char *str);
const char * signal_format_to_str(enum signal_format fmt);
const char * signal_type_to_str(enum signal_type fmt);
enum signal_format signal_format_detect(const char *val);
enum signal_type signal_type_detect(const char *val);
/** Convert signal data from one description/format to another. */
void signal_data_convert(union signal_data *data, const struct signal *from, const struct signal *to);
void signal_data_cast(union signal_data *data, const struct signal *from, const struct signal *to);
/** Print value of a signal to a character buffer. */
int signal_data_snprint(const union signal_data *data, const struct signal *sig, char *buf, size_t len);

View file

@ -40,14 +40,6 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node
h->enabled = 1;
h->priority = vt->priority;
/* Node hooks can only used with nodes,
Path hooks only with paths.. */
if ((!(vt->flags & HOOK_NODE_READ) && n) ||
(!(vt->flags & HOOK_NODE_WRITE) && n) ||
(!(vt->flags & HOOK_PATH) && p))
return -1;
h->path = p;
h->node = n;
@ -244,12 +236,7 @@ int hook_init_builtin_list(struct list *l, bool builtin, int mask, struct path *
{
int ret;
ret = list_init(l);
if (ret)
return ret;
if (!builtin)
return 0;
assert(l->state == STATE_INITIALIZED);
for (size_t i = 0; i < list_length(&plugins); i++) {
struct plugin *q = (struct plugin *) list_at(&plugins, i);
@ -260,18 +247,21 @@ int hook_init_builtin_list(struct list *l, bool builtin, int mask, struct path *
if (q->type != PLUGIN_TYPE_HOOK)
continue;
if (vt->flags & mask)
continue;
if (builtin &&
vt->flags & HOOK_BUILTIN &&
vt->flags & mask)
{
h = (struct hook *) alloc(sizeof(struct hook));
if (!h)
return -1;
h = (struct hook *) alloc(sizeof(struct hook));
if (!h)
return -1;
ret = hook_init(h, vt, p, n);
if (ret)
return ret;
ret = hook_init(h, vt, p, n);
if (ret)
return ret;
list_push(l, h);
list_push(l, h);
}
}
return 0;

View file

@ -33,6 +33,7 @@ set(HOOK_SRC
limit_rate.c
scale.c
fix.c
cast.c
)
if(WITH_IO)

201
lib/hooks/cast.c Normal file
View file

@ -0,0 +1,201 @@
/** Cast hook.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2018, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASnode
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
/** @addtogroup hooks Hook functions
* @{
*/
#include <string.h>
#include <villas/hook.h>
#include <villas/plugin.h>
#include <villas/node.h>
#include <villas/path.h>
#include <villas/sample.h>
struct cast {
struct list operations;
struct list signals;
};
static int cast_init(struct hook *h)
{
int ret;
struct cast *c = (struct cast *) h->_vd;
struct list *orig_signals;
if (h->node)
orig_signals = &h->node->signals;
else if (h->path)
orig_signals = &h->path->signals;
else
return -1;
ret = list_init(&c->signals);
if (ret)
return ret;
/* Copy original signal list */
for (int i = 0; i < list_length(orig_signals); i++) {
struct signal *orig_sig = list_at(orig_signals, i);
struct signal *new_sig = signal_copy(orig_sig);
list_push(&c->signals, new_sig);
}
return 0;
}
static int cast_destroy(struct hook *h)
{
int ret;
struct cast *c = (struct cast *) h->_vd;
ret = list_destroy(&c->signals, (dtor_cb_t) signal_decref, false);
if (ret)
return ret;
return 0;
}
static int cast_parse(struct hook *h, json_t *cfg)
{
int ret;
struct cast *c = (struct cast *) h->_vd;
struct signal *sig;
size_t i;
json_t *json_signals;
json_t *json_signal;
ret = json_unpack(cfg, "{ s: o }",
"signals", &json_signals
);
if (ret)
return ret;
if (json_is_array(json_signals))
return -1;
json_array_foreach(json_signals, i, json_signal) {
int index = -1;
const char *name = NULL;
const char *new_name = NULL;
const char *new_unit = NULL;
const char *new_format = NULL;
ret = json_unpack(json_signal, "{ s?: s, s?: i, s?: s, s?: s, s?: s }",
"name", &name,
"index", &index,
"new_format", &new_format,
"new_name", &new_name,
"new_unit", &new_unit
);
if (ret)
return ret;
/* Find matching original signal descriptor */
if (index >= 0 && name != NULL)
return -1;
if (index < 0 && name == NULL)
return -1;
sig = name
? list_lookup(&c->signals, name)
: list_at_safe(&c->signals, index);
if (!sig)
return -1;
/* Cast to new format */
if (new_format) {
enum signal_type fmt;
fmt = signal_type_from_str(new_format);
if (fmt == SIGNAL_TYPE_INVALID)
return -1;
sig->type = fmt;
}
/* Set new name */
if (new_name) {
if (sig->name)
free(sig->name);
sig->name = strdup(new_name);
}
/* Set new unit */
if (new_unit) {
if (sig->unit)
free(sig->unit);
sig->unit = strdup(new_unit);
}
}
return 0;
}
static int cast_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
struct cast *c = (struct cast *) h->_vd;
for (int i = 0; i < *cnt; i++) {
struct sample *smp = smps[i];
for (int j = 0; j < smp->length; j++) {
struct signal *orig_sig = list_at(smp->signals, j);
struct signal *new_sig = list_at(&c->signals, j);
signal_data_cast(&smp->data[j], orig_sig, new_sig);
}
/* Replace signal descriptors of sample */
smp->signals = &c->signals;
}
return 0;
}
static struct plugin p = {
.name = "cast",
.description = "Cast signals",
.type = PLUGIN_TYPE_HOOK,
.hook = {
.flags = HOOK_NODE_READ | HOOK_PATH,
.priority = 99,
.init = cast_init,
.destroy = cast_destroy,
.parse = cast_parse,
.process = cast_process,
.size = sizeof(struct cast)
}
};
REGISTER_PLUGIN(&p)
/** @} */

145
lib/io.c
View file

@ -79,7 +79,7 @@ skip: bytes = getdelim(&io->in.buffer, &io->in.buflen, io->delimiter, f);
return i;
}
int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags)
int io_init(struct io *io, const struct format_type *fmt, struct list *signals, int flags)
{
int ret;
@ -88,9 +88,9 @@ int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags)
io->_vt = fmt;
io->_vd = alloc(fmt->size);
io->flags = flags | io->_vt->flags;
io->delimiter = io->_vt->delimiter ? io->_vt->delimiter : '\n';
io->separator = io->_vt->separator ? io->_vt->separator : '\t';
io->flags = flags | (io_type(io)->flags & ~SAMPLE_HAS_ALL);
io->delimiter = io_type(io)->delimiter ? io_type(io)->delimiter : '\n';
io->separator = io_type(io)->separator ? io_type(io)->separator : '\t';
io->in.buflen =
io->out.buflen = 4096;
@ -98,19 +98,9 @@ int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags)
io->in.buffer = alloc(io->in.buflen);
io->out.buffer = alloc(io->out.buflen);
io->input.node = n;
io->output.node = n;
io->signals = signals;
if (n) {
io->input.signals = &n->in.signals;
io->output.signals = &n->out.signals;
}
else {
io->input.signals = NULL;
io->output.signals = NULL;
}
ret = io->_vt->init ? io->_vt->init(io) : 0;
ret = io_type(io)->init ? io_type(io)->init(io) : 0;
if (ret)
return ret;
@ -119,13 +109,35 @@ int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags)
return 0;
}
int io_init_auto(struct io *io, const struct format_type *fmt, int len, int flags)
{
int ret;
struct list *signals;
signals = alloc(sizeof(struct list));
signals->state = STATE_DESTROYED;
ret = list_init(signals);
if (ret)
return ret;
ret = signal_list_generate(signals, len, SIGNAL_TYPE_AUTO);
if (ret)
return ret;
flags |= IO_DESTROY_SIGNALS;
return io_init(io, fmt, signals, flags);
}
int io_destroy(struct io *io)
{
int ret;
assert(io->state == STATE_CLOSED || io->state == STATE_INITIALIZED);
assert(io->state == STATE_CLOSED || io->state == STATE_INITIALIZED || io->state == STATE_CHECKED);
ret = io->_vt->destroy ? io->_vt->destroy(io) : 0;
ret = io_type(io)->destroy ? io_type(io)->destroy(io) : 0;
if (ret)
return ret;
@ -133,11 +145,37 @@ int io_destroy(struct io *io)
free(io->in.buffer);
free(io->out.buffer);
if (io->flags & IO_DESTROY_SIGNALS) {
ret = list_destroy(io->signals, (dtor_cb_t) signal_decref, false);
if (ret)
return ret;
}
io->state = STATE_DESTROYED;
return 0;
}
int io_check(struct io *io)
{
assert(io->state != STATE_DESTROYED);
for (size_t i = 0; i < list_length(io->signals); i++) {
struct signal *sig = (struct signal *) list_at(io->signals, i);
if (sig->type == SIGNAL_TYPE_AUTO) {
if (io_type(io)->flags & IO_AUTO_DETECT_FORMAT)
continue;
return -1;
}
}
io->state = STATE_CHECKED;
return 0;
}
int io_stream_open(struct io *io, const char *uri)
{
int ret;
@ -306,18 +344,17 @@ int io_open(struct io *io, const char *uri)
{
int ret;
assert(io->state == STATE_INITIALIZED);
assert(io->state == STATE_CHECKED);
ret = io->_vt->open
? io->_vt->open(io, uri)
ret = io_type(io)->open
? io_type(io)->open(io, uri)
: io_stream_open(io, uri);
if (ret)
return ret;
io->header_printed = false;
io->state = STATE_OPENED;
io_header(io);
return 0;
}
@ -329,8 +366,8 @@ int io_close(struct io *io)
io_footer(io);
ret = io->_vt->close
? io->_vt->close(io)
ret = io_type(io)->close
? io_type(io)->close(io)
: io_stream_close(io);
if (ret)
return ret;
@ -344,8 +381,8 @@ int io_flush(struct io *io)
{
assert(io->state == STATE_OPENED);
return io->_vt->flush
? io->_vt->flush(io)
return io_type(io)->flush
? io_type(io)->flush(io)
: io_stream_flush(io);
}
@ -353,8 +390,8 @@ int io_eof(struct io *io)
{
assert(io->state == STATE_OPENED);
return io->_vt->eof
? io->_vt->eof(io)
return io_type(io)->eof
? io_type(io)->eof(io)
: io_stream_eof(io);
}
@ -362,8 +399,8 @@ void io_rewind(struct io *io)
{
assert(io->state == STATE_OPENED);
if (io->_vt->rewind)
io->_vt->rewind(io);
if (io_type(io)->rewind)
io_type(io)->rewind(io);
else
io_stream_rewind(io);
}
@ -372,25 +409,32 @@ int io_fd(struct io *io)
{
assert(io->state == STATE_OPENED);
return io->_vt->fd
? io->_vt->fd(io)
return io_type(io)->fd
? io_type(io)->fd(io)
: io_stream_fd(io);
}
void io_header(struct io *io)
const struct format_type * io_type(struct io *io)
{
return io->_vt;
}
void io_header(struct io *io, const struct sample *smp)
{
assert(io->state == STATE_OPENED);
if (io->_vt->header)
io->_vt->header(io);
if (io_type(io)->header)
io_type(io)->header(io, smp);
io->header_printed = true;
}
void io_footer(struct io *io)
{
assert(io->state == STATE_OPENED);
if (io->_vt->footer)
io->_vt->footer(io);
if (io_type(io)->footer)
io_type(io)->footer(io);
}
int io_print(struct io *io, struct sample *smps[], unsigned cnt)
@ -399,11 +443,14 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt)
assert(io->state == STATE_OPENED);
if (!io->header_printed && cnt > 0)
io_header(io, smps[0]);
if (io->flags & IO_NEWLINES)
ret = io_print_lines(io, smps, cnt);
else if (io->_vt->print)
ret = io->_vt->print(io, smps, cnt);
else if (io->_vt->sprint) {
else if (io_type(io)->print)
ret = io_type(io)->print(io, smps, cnt);
else if (io_type(io)->sprint) {
FILE *f = io_stream_output(io);
size_t wbytes;
@ -428,9 +475,9 @@ int io_scan(struct io *io, struct sample *smps[], unsigned cnt)
if (io->flags & IO_NEWLINES)
ret = io_scan_lines(io, smps, cnt);
else if (io->_vt->scan)
ret = io->_vt->scan(io, smps, cnt);
else if (io->_vt->sscan) {
else if (io_type(io)->scan)
ret = io_type(io)->scan(io, smps, cnt);
else if (io_type(io)->sscan) {
FILE *f = io_stream_input(io);
size_t bytes, rbytes;
@ -462,16 +509,16 @@ FILE * io_stream_input(struct io *io) {
: io->in.stream.std;
}
int io_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
int io_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt)
{
struct format_type *fmt = io->_vt;
assert(io->state == STATE_CHECKED || io->state == STATE_OPENED);
return fmt->sscan ? fmt->sscan(io, buf, len, rbytes, smps, cnt) : -1;
return io_type(io)->sscan ? io_type(io)->sscan(io, buf, len, rbytes, smps, cnt) : -1;
}
int io_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt)
{
struct format_type *fmt = io->_vt;
assert(io->state == STATE_CHECKED || io->state == STATE_OPENED);
return fmt->sprint ? fmt->sprint(io, buf, len, wbytes, smps, cnt) : -1;
return io_type(io)->sprint ? io_type(io)->sprint(io, buf, len, wbytes, smps, cnt) : -1;
}

View file

@ -41,12 +41,16 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct list *no
if (nodes) {
node = strtok(cpy, ".");
if (!node)
if (!node) {
warn("Missing node name");
goto invalid_format;
}
me->node = list_lookup(nodes, node);
if (!me->node)
if (!me->node) {
warn("Unknown node %s", node);
goto invalid_format;
}
type = strtok(NULL, ".[");
if (!type)
@ -65,16 +69,22 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct list *no
me->length = 1;
field = strtok(NULL, ".");
if (!field)
if (!field) {
warn("Missing stats type");
goto invalid_format;
}
subfield = strtok(NULL, ".");
if (!subfield)
if (!subfield) {
warn("Missing stats sub-type");
goto invalid_format;
}
id = stats_lookup_id(field);
if (id < 0)
if (id < 0) {
warn("Invalid stats type");
goto invalid_format;
}
me->stats.id = id;
@ -92,38 +102,48 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct list *no
me->stats.type = MAPPING_STATS_TYPE_VAR;
else if (!strcmp(subfield, "stddev"))
me->stats.type = MAPPING_STATS_TYPE_STDDEV;
else
else {
warn("Invalid stats sub-type");
goto invalid_format;
}
}
else if (!strcmp(type, "hdr")) {
me->type = MAPPING_TYPE_HEADER;
me->length = 1;
field = strtok(NULL, ".");
if (!field)
if (!field) {
warn("Missing header type");
goto invalid_format;
}
if (!strcmp(field, "sequence"))
me->header.type = MAPPING_HEADER_TYPE_SEQUENCE;
else if (!strcmp(field, "length"))
me->header.type = MAPPING_HEADER_TYPE_LENGTH;
else
else {
warn("Invalid header type");
goto invalid_format;
}
}
else if (!strcmp(type, "ts")) {
me->type = MAPPING_TYPE_TIMESTAMP;
me->length = 2;
field = strtok(NULL, ".");
if (!field)
if (!field) {
warn("Missing timestamp type");
goto invalid_format;
}
if (!strcmp(field, "origin"))
me->timestamp.type = MAPPING_TIMESTAMP_TYPE_ORIGIN;
else if (!strcmp(field, "received"))
me->timestamp.type = MAPPING_TIMESTAMP_TYPE_RECEIVED;
else
else {
warn("Invalid timestamp type");
goto invalid_format;
}
}
else if (!strcmp(type, "data")) {
char *first_str, *last_str;
@ -134,31 +154,36 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct list *no
first_str = strtok(NULL, "-]");
if (first_str) {
if (me->node)
first = list_lookup_index(&me->node->in.signals, first_str);
first = list_lookup_index(&me->node->signals, first_str);
if (first < 0) {
char *endptr;
first = strtoul(first_str, &endptr, 10);
if (endptr != first_str + strlen(first_str))
if (endptr != first_str + strlen(first_str)) {
warn("Failed to parse data range");
goto invalid_format;
}
}
}
else {
/* Map all signals */
me->data.offset = 0;
me->length = 0;
me->length = me->node ? list_length(&me->node->signals) : 0;
goto end;
}
last_str = strtok(NULL, "]");
if (last_str) {
if (me->node)
last = list_lookup_index(&me->node->in.signals, last_str);
last = list_lookup_index(&me->node->signals, last_str);
if (last < 0) {
char *endptr;
last = strtoul(last_str, &endptr, 10);
if (endptr != last_str + strlen(last_str))
if (endptr != last_str + strlen(last_str)) {
warn("Failed to parse data range");
goto invalid_format;
}
}
}
else
@ -189,11 +214,11 @@ invalid_format:
return -1;
}
int mapping_parse(struct mapping_entry *me, json_t *j, struct list *nodes)
int mapping_parse(struct mapping_entry *me, json_t *cfg, struct list *nodes)
{
const char *str;
str = json_string_value(j);
str = json_string_value(cfg);
if (!str)
return -1;
@ -250,17 +275,13 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
if (len + off > remapped->capacity)
return -1;
if (len + off > remapped->length)
remapped->length = len + off;
switch (me->type) {
case MAPPING_TYPE_STATS: {
const struct hist *h = &s->histograms[me->stats.id];
switch (me->stats.type) {
case MAPPING_STATS_TYPE_TOTAL:
sample_set_data_format(remapped, off, SAMPLE_DATA_FORMAT_INT);
remapped->data[off++].f = h->total;
remapped->data[off++].i = h->total;
break;
case MAPPING_STATS_TYPE_LAST:
remapped->data[off++].f = h->last;
@ -299,18 +320,13 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
return -1;
}
sample_set_data_format(remapped, off, SAMPLE_DATA_FORMAT_INT);
sample_set_data_format(remapped, off+1, SAMPLE_DATA_FORMAT_INT);
remapped->data[off++].i = ts->tv_sec;
remapped->data[off++].i = ts->tv_nsec;
break;
}
sample_set_data_format(remapped, off, SAMPLE_DATA_FORMAT_INT);
case MAPPING_TYPE_HEADER:
switch (me->header.type) {
case MAPPING_HEADER_TYPE_LENGTH:
remapped->data[off++].i = original->length;
@ -318,9 +334,6 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
case MAPPING_HEADER_TYPE_SEQUENCE:
remapped->data[off++].i = original->sequence;
break;
case MAPPING_HDR_FORMAT:
remapped->data[off++].i = original->format;
break;
default:
return -1;
}
@ -329,14 +342,10 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
case MAPPING_TYPE_DATA:
for (int j = me->data.offset; j < len + me->data.offset; j++) {
if (j >= original->length) {
sample_set_data_format(remapped, off, SAMPLE_DATA_FORMAT_FLOAT);
if (j >= original->length)
remapped->data[off++].f = 0;
}
else {
sample_set_data_format(remapped, off, sample_get_data_format(original, j));
else
remapped->data[off++] = original->data[j];
}
}
break;
@ -349,10 +358,6 @@ int mapping_remap(const struct list *m, struct sample *remapped, const struct sa
{
int ret;
/* We copy all the header fields */
remapped->sequence = original->sequence;
remapped->ts = original->ts;
for (size_t i = 0; i < list_length(m); i++) {
struct mapping_entry *me = (struct mapping_entry *) list_at(m, i);
@ -371,7 +376,7 @@ int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str)
assert(me->length == 0 || index < me->length);
if (me->node)
strcatf(str, "%s.", node_name(me->node));
strcatf(str, "%s.", node_name_short(me->node));
switch (me->type) {
case MAPPING_TYPE_STATS:

View file

@ -34,23 +34,27 @@
#include <villas/hash_table.h>
#include <villas/kernel/kernel.h>
struct hash_table allocations = { .state = STATE_DESTROYED };
static struct hash_table allocations = { .state = STATE_DESTROYED };
__attribute__((constructor))
static void init_allocations()
{
hash_table_init(&allocations, 100);
}
__attribute__((destructor))
static void destroy_allocations()
{
/** @todo: Release remaining allocations? */
hash_table_destroy(&allocations, NULL, false);
}
int memory_init(int hugepages)
{
int ret;
info("Initialize memory sub-system");
if (allocations.state == STATE_DESTROYED) {
ret = hash_table_init(&allocations, 100);
if (ret)
return ret;
}
info("Initialize memory sub-system: #hugepages=%d", hugepages);
#ifdef __linux__
int pagecnt, pagesz;
int pagecnt, pagesz, ret;
struct rlimit l;
pagecnt = kernel_get_nr_hugepages();
@ -109,17 +113,19 @@ void * memory_alloc(struct memory_type *m, size_t len)
}
void * memory_alloc_aligned(struct memory_type *m, size_t len, size_t alignment)
{
{
int ret;
struct memory_allocation *ma = m->alloc(m, len, alignment);
if(ma == NULL){
if (ma == NULL) {
warn("memory_alloc_aligned: allocating memory for memory_allocation failed for memory type %s. Reason: %s", m->name, strerror(errno) );
return NULL;
}
ret = hash_table_insert(&allocations, ma->address, ma);
if(ret){
if (ret) {
warn("memory_alloc_aligned: Inserting into hash table failed!");
return NULL;
}
debug(LOG_MEM | 5, "Allocated %#zx bytes of %#zx-byte-aligned %s memory: %p", ma->length, ma->alignment, ma->type->name, ma->address);

View file

@ -46,6 +46,9 @@ static int node_direction_init2(struct node_direction *nd, struct node *n)
ret = hook_init_builtin_list(&nd->hooks, nd->builtin, m, NULL, n);
if (ret)
return ret;
/* We sort the hooks according to their priority before starting the path */
list_sort(&nd->hooks, hook_cmp_priority);
#endif /* WITH_HOOKS */
return 0;
@ -53,10 +56,16 @@ static int node_direction_init2(struct node_direction *nd, struct node *n)
static int node_direction_init(struct node_direction *nd, struct node *n)
{
int ret;
nd->enabled = 0;
nd->vectorize = 1;
nd->builtin = 1;
ret = list_init(&nd->hooks);
if (ret)
return ret;
return 0;
}
@ -64,10 +73,6 @@ static int node_direction_destroy(struct node_direction *nd, struct node *n)
{
int ret;
ret = list_destroy(&nd->signals, (dtor_cb_t) signal_decref, true);
if (ret)
return ret;
#ifdef WITH_HOOKS
ret = list_destroy(&nd->hooks, (dtor_cb_t) hook_destroy, true);
if (ret)
@ -83,14 +88,12 @@ static int node_direction_parse(struct node_direction *nd, struct node *n, json_
json_error_t err;
json_t *json_hooks = NULL;
json_t *json_signals = NULL;
nd->cfg = cfg;
nd->enabled = 1;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b, s?: b }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: i, s?: b, s?: b }",
"hooks", &json_hooks,
"signals", &json_signals,
"vectorize", &nd->vectorize,
"builtin", &nd->builtin,
"enabled", &nd->enabled
@ -110,12 +113,6 @@ static int node_direction_parse(struct node_direction *nd, struct node *n, json_
}
#endif /* WITH_HOOKS */
if (json_signals) {
ret = signal_parse_list(&nd->signals, json_signals);
if (ret)
error("Failed to parse signal definition of node '%s'", node_name(n));
}
return 0;
}
@ -136,9 +133,6 @@ static int node_direction_start(struct node_direction *nd, struct node *n)
#ifdef WITH_HOOKS
int ret;
/* We sort the hooks according to their priority before starting the path */
list_sort(&nd->hooks, hook_cmp_priority);
for (size_t i = 0; i < list_length(&nd->hooks); i++) {
struct hook *h = (struct hook *) list_at(&nd->hooks, i);
@ -181,9 +175,9 @@ int node_init(struct node *n, struct node_type *vt)
n->_name = NULL;
n->_name_long = NULL;
/* Default values */
n->samplelen = DEFAULT_SAMPLE_LENGTH;
list_init(&n->signals);
/* Default values */
ret = node_direction_init(&n->in, n);
if (ret)
return ret;
@ -199,27 +193,61 @@ int node_init(struct node *n, struct node_type *vt)
return 0;
}
int node_init2(struct node *n)
{
int ret;
assert(n->state == STATE_CHECKED);
ret = node_direction_init2(&n->in, n);
if (ret)
return ret;
ret = node_direction_init2(&n->out, n);
if (ret)
return ret;
return 0;
}
int node_parse(struct node *n, json_t *json, const char *name)
{
struct node_type *nt;
int ret;
int ret, samplelen = DEFAULT_SAMPLE_LENGTH;
json_error_t err;
json_t *json_signals = NULL;
const char *type;
n->name = strdup(name);
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: i }",
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o, s?: i } }",
"type", &type,
"samplelen", &n->samplelen
"in",
"signals", &json_signals,
"samplelen", &samplelen
);
if (ret)
jerror(&err, "Failed to parse node '%s'", node_name(n));
jerror(&err, "Failed to parse node %s", node_name(n));
nt = node_type_lookup(type);
assert(nt == node_type(n));
if (nt->flags & NODE_TYPE_PROVIDES_SIGNALS) {
if (json_signals)
error("Node %s does not support signal definitions", node_name(n));
}
else {
if (json_signals) {
ret = signal_list_parse(&n->signals, json_signals);
if (ret)
error("Failed to parse signal definition of node %s", node_name(n));
}
else
signal_list_generate(&n->signals, samplelen, SIGNAL_TYPE_AUTO);
}
struct {
const char *str;
struct node_direction *dir;
@ -228,10 +256,10 @@ int node_parse(struct node *n, json_t *json, const char *name)
{ "out", &n->out }
};
const char *fields[] = { "builtin", "vectorize", "signals", "hooks" };
const char *fields[] = { "builtin", "vectorize", "hooks" };
for (int j = 0; j < ARRAY_LEN(dirs); j++) {
json_t *json_dir = json_object_get(json, dirs [j].str);
json_t *json_dir = json_object_get(json, dirs[j].str);
// Skip if direction is unused
if (!json_dir)
@ -291,22 +319,20 @@ int node_start(struct node *n)
assert(node_type(n)->state == STATE_STARTED);
info("Starting node %s", node_name_long(n));
{
ret = node_direction_start(&n->in, n);
if (ret)
return ret;
ret = node_direction_start(&n->out, n);
if (ret)
return ret;
ret = node_direction_start(&n->in, n);
if (ret)
return ret;
ret = node_type(n)->start ? node_type(n)->start(n) : 0;
if (ret)
return ret;
}
ret = node_direction_start(&n->out, n);
if (ret)
return ret;
ret = node_type(n)->start ? node_type(n)->start(n) : 0;
if (ret)
return ret;
n->state = STATE_STARTED;
n->sequence = 0;
return ret;
@ -343,6 +369,10 @@ int node_destroy(struct node *n)
int ret;
assert(n->state != STATE_DESTROYED && n->state != STATE_STARTED);
ret = list_destroy(&n->signals, (dtor_cb_t) signal_decref, false);
if (ret)
return ret;
ret = node_direction_destroy(&n->in, n);
if (ret)
return ret;
@ -351,11 +381,10 @@ int node_destroy(struct node *n)
if (ret)
return ret;
if (node_type(n)->destroy){
if (node_type(n)->destroy) {
ret = (int) node_type(n)->destroy(n);
if(ret){
if (ret)
return ret;
}
}
list_remove(&node_type(n)->instances, n);
@ -469,11 +498,13 @@ char * node_name_long(struct node *n)
if (node_type(n)->print) {
struct node_type *vt = node_type(n);
char *name_long = vt->print(n);
strcatf(&n->_name_long, "%s: #in.hooks=%zu, in.vectorize=%d, #out.hooks=%zu, out.vectorize=%d, samplelen=%d, %s",
strcatf(&n->_name_long, "%s: #in.signals=%zu, #in.hooks=%zu, in.vectorize=%d, #out.hooks=%zu, out.vectorize=%d, %s",
node_name(n),
list_length(&n->signals),
list_length(&n->in.hooks), n->in.vectorize,
list_length(&n->out.hooks), n->out.vectorize,
n->samplelen, name_long);
name_long
);
free(name_long);
}

View file

@ -38,6 +38,7 @@
#include <villas/memory.h>
#include <villas/stats.h>
#include <villas/node.h>
#include <villas/signal.h>
/* Forward declaration */
static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt);
@ -50,7 +51,7 @@ static int path_source_init(struct path_source *ps)
if (ps->node->_vt->pool_size)
pool_size = ps->node->_vt->pool_size;
ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(ps->node->in.samplelen), node_memory_type(ps->node, &memory_hugepage));
ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(list_length(&ps->node->signals)), node_memory_type(ps->node, &memory_hugepage));
if (ret)
return ret;
@ -115,7 +116,8 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
? sample_clone(p->last_sample)
: sample_clone(muxed_smps[i-1]);
muxed_smps[i]->sequence = p->last_sequence + 1;
muxed_smps[i]->sequence = p->last_sequence++;
muxed_smps[i]->ts = tomux_smps[i]->ts;
mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL);
}
@ -301,11 +303,25 @@ static void * path_run_poll(void *arg)
int path_init(struct path *p)
{
int ret;
assert(p->state == STATE_DESTROYED);
list_init(&p->destinations);
list_init(&p->sources);
list_init(&p->signals);
ret = list_init(&p->destinations);
if (ret)
return ret;
ret = list_init(&p->sources);
if (ret)
return ret;
ret = list_init(&p->signals);
if (ret)
return ret;
ret = list_init(&p->hooks);
if (ret)
return ret;
p->_name = NULL;
@ -379,7 +395,7 @@ int path_init2(struct path *p)
/* Initialize destinations */
struct memory_type *pool_mt = &memory_hugepage;
int pool_size = (unsigned) MAX(1, list_length(&p->destinations) * p->queuelen);
int pool_size = MAX(1, list_length(&p->destinations)) * p->queuelen;
for (size_t i = 0; i < list_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) list_at(&p->destinations, i);
@ -395,6 +411,9 @@ int path_init2(struct path *p)
return ret;
}
bitset_init(&p->received, list_length(&p->sources));
bitset_init(&p->mask, list_length(&p->sources));
/* Initialize sources */
for (size_t i = 0; i < list_length(&p->sources); i++) {
struct path_source *ps = (struct path_source *) list_at(&p->sources, i);
@ -402,16 +421,6 @@ int path_init2(struct path *p)
ret = path_source_init(ps);
if (ret)
return ret;
}
bitset_init(&p->received, list_length(&p->sources));
bitset_init(&p->mask, list_length(&p->sources));
/* Calc sample length of path and initialize bitset */
p->samplelen = 0;
for (size_t i = 0; i < list_length(&p->sources); i++) {
struct path_source *ps = (struct path_source *) list_at(&p->sources, i);
if (ps->masked)
bitset_set(&p->mask, i);
@ -419,25 +428,42 @@ int path_init2(struct path *p)
for (size_t i = 0; i < list_length(&ps->mappings); i++) {
struct mapping_entry *me = (struct mapping_entry *) list_at(&ps->mappings, i);
int len = me->length;
int off = me->offset;
int len = me->length;
if (off + len > p->samplelen)
p->samplelen = off + len;
for (int j = 0; j < len; j++) {
struct signal *sig;
/* For data mappings we simple refer to the existing
* signal descriptors of the source node. */
if (me->type == MAPPING_TYPE_DATA) {
sig = (struct signal *) list_at_safe(&me->node->signals, me->data.offset + j);
if (!sig) {
warn("Failed to create signal description for path %s", path_name(p));
continue;
}
signal_incref(sig);
}
/* For other mappings we create new signal descriptors */
else {
sig = alloc(sizeof(struct signal));
ret = signal_init_from_mapping(sig, me, j);
if (ret)
return -1;
}
list_extend(&p->signals, off + j + 1, NULL);
list_set(&p->signals, off + j, sig);
}
}
}
if (!p->samplelen)
p->samplelen = DEFAULT_SAMPLE_LENGTH;
ret = pool_init(&p->pool, pool_size, SAMPLE_LENGTH(p->samplelen), pool_mt);
ret = pool_init(&p->pool, pool_size, SAMPLE_LENGTH(list_length(&p->signals)), pool_mt);
if (ret)
return ret;
p->last_sample = sample_alloc(&p->pool);
if (!p->last_sample)
return -1;
/* Prepare poll() */
if (p->poll) {
ret = path_init_poll(p);
@ -669,15 +695,16 @@ int path_start(struct path *p)
mask = bitset_dump(&p->mask);
info("Starting path %s: mode=%s, poll=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, samplelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu",
info("Starting path %s: #signals=%zu, mode=%s, poll=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu",
path_name(p),
list_length(&p->signals),
mode,
p->poll ? "yes" : "no",
mask,
p->rate,
p->enabled ? "yes" : "no",
p->reverse ? "yes" : "no",
p->queuelen, p->samplelen,
p->queuelen,
list_length(&p->hooks),
list_length(&p->sources),
list_length(&p->destinations)
@ -699,25 +726,20 @@ int path_start(struct path *p)
bitset_clear_all(&p->received);
/* We initialize the intial sample with zeros */
for (size_t i = 0; i < list_length(&p->sources); i++) {
struct path_source *ps = (struct path_source *) list_at(&p->sources, i);
/* We initialize the intial sample */
p->last_sample = sample_alloc(&p->pool);
if (!p->last_sample)
return -1;
for (size_t j = 0; j < list_length(&ps->mappings); j++) {
struct mapping_entry *me = (struct mapping_entry *) list_at(&ps->mappings, j);
p->last_sample->length = list_length(&p->signals);
p->last_sample->signals = &p->signals;
p->last_sample->sequence = 0;
p->last_sample->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_DATA;
int len = me->length;
int off = me->offset;
for (size_t i = 0; i < p->last_sample->length; i++) {
struct signal *sig = (struct signal *) list_at(p->last_sample->signals, i);
if (len + off > p->last_sample->length)
p->last_sample->length = len + off;
for (int k = off; k < off + len; k++) {
p->last_sample->data[k].f = 0;
sample_set_data_format(p->last_sample, k, SAMPLE_DATA_FORMAT_FLOAT);
}
}
p->last_sample->data[i] = sig->init;
}
/* Start one thread per path for sending to destinations
@ -762,6 +784,8 @@ int path_stop(struct path *p)
}
#endif /* WITH_HOOKS */
sample_decref(p->last_sample);
p->state = STATE_STOPPED;
return 0;
@ -777,7 +801,7 @@ int path_destroy(struct path *p)
#endif
list_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true);
list_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true);
list_destroy(&p->signals, (dtor_cb_t) signal_decref, true);
list_destroy(&p->signals, (dtor_cb_t) signal_decref, false);
if (p->reader.pfds)
free(p->reader.pfds);

View file

@ -28,6 +28,8 @@
#include <villas/sample.h>
#include <villas/utils.h>
#include <villas/timing.h>
#include <villas/signal.h>
#include <villas/list.h>
int sample_init(struct sample *s)
{
@ -150,6 +152,7 @@ int sample_copy(struct sample *dst, struct sample *src)
dst->sequence = src->sequence;
dst->flags = src->flags;
dst->ts = src->ts;
dst->signals = src->signals;
memcpy(&dst->data, &src->data, SAMPLE_DATA_LENGTH(dst->length));
@ -217,7 +220,7 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags)
}
/* Compare timestamp */
if (flags & SAMPLE_HAS_ORIGIN) {
if (flags & SAMPLE_HAS_TS_ORIGIN) {
if (time_delta(&a->ts.origin, &b->ts.origin) > epsilon) {
printf("ts.origin: %f != %f\n", time_to_double(&a->ts.origin), time_to_double(&b->ts.origin));
return 3;
@ -225,27 +228,47 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags)
}
/* Compare data */
if (flags & SAMPLE_HAS_VALUES) {
if (flags & SAMPLE_HAS_DATA) {
if (a->length != b->length) {
printf("length: %d != %d\n", a->length, b->length);
return 4;
}
for (int i = 0; i < a->length; i++) {
switch (sample_get_data_format(a, i)) {
case SAMPLE_DATA_FORMAT_FLOAT:
/* Compare format */
if (sample_format(a, i) != sample_format(b, i))
return 6;
switch (sample_format(a, i)) {
case SIGNAL_TYPE_FLOAT:
if (fabs(a->data[i].f - b->data[i].f) > epsilon) {
printf("data[%d].f: %f != %f\n", i, a->data[i].f, b->data[i].f);
return 5;
}
break;
case SAMPLE_DATA_FORMAT_INT:
case SIGNAL_TYPE_INTEGER:
if (a->data[i].i != b->data[i].i) {
printf("data[%d].i: %" PRId64 " != %" PRId64 "\n", i, a->data[i].i, b->data[i].i);
return 5;
}
break;
case SIGNAL_TYPE_BOOLEAN:
if (a->data[i].b != b->data[i].b) {
printf("data[%d].b: %s != %s\n", i, a->data[i].b ? "true" : "false", b->data[i].b ? "true" : "false");
return 5;
}
break;
case SIGNAL_TYPE_COMPLEX:
if (cabs(a->data[i].z - b->data[i].z) > epsilon) {
printf("data[%d].z: %f+%fi != %f+%fi\n", i, creal(a->data[i].z), cimag(a->data[i].z), creal(b->data[i].z), cimag(b->data[i].z));
return 5;
}
break;
default: { }
}
}
}
@ -253,11 +276,11 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags)
return 0;
}
enum signal_format sample_format(const struct sample *s, unsigned idx)
enum signal_type sample_format(const struct sample *s, unsigned idx)
{
struct signal *sig;
sig = (struct signal *) list_at_safe(s->signals, idx);
return sig ? sig->format : SIGNAL_FORMAT_UNKNOWN;
return sig ? sig->type : SIGNAL_TYPE_AUTO;
}

View file

@ -34,7 +34,7 @@ int signal_init(struct signal *s)
s->name = NULL;
s->unit = NULL;
s->format = SIGNAL_FORMAT_AUTO;
s->type = SIGNAL_TYPE_AUTO;
s->refcnt = ATOMIC_VAR_INIT(1);
@ -57,7 +57,7 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u
case MAPPING_TYPE_STATS:
switch (me->stats.type) {
case MAPPING_STATS_TYPE_TOTAL:
s->format = SIGNAL_FORMAT_INT;
s->type = SIGNAL_TYPE_INTEGER;
break;
case MAPPING_STATS_TYPE_LAST:
@ -66,7 +66,7 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u
case MAPPING_STATS_TYPE_MEAN:
case MAPPING_STATS_TYPE_VAR:
case MAPPING_STATS_TYPE_STDDEV:
s->format = SIGNAL_FORMAT_FLOAT;
s->type = SIGNAL_TYPE_FLOAT;
break;
}
break;
@ -75,13 +75,13 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u
switch (me->header.type) {
case MAPPING_HEADER_TYPE_LENGTH:
case MAPPING_HEADER_TYPE_SEQUENCE:
s->format = SIGNAL_FORMAT_INT;
s->type = SIGNAL_TYPE_INTEGER;
break;
}
break;
case MAPPING_TYPE_TIMESTAMP:
s->format = SIGNAL_FORMAT_INT;
s->type = SIGNAL_TYPE_INTEGER;
break;
case MAPPING_TYPE_DATA:
@ -103,7 +103,7 @@ int signal_destroy(struct signal *s)
return 0;
}
struct signal * signal_create(const char *name, const char *unit, enum signal_format fmt)
struct signal * signal_create(const char *name, const char *unit, enum signal_type fmt)
{
int ret;
struct signal *sig;
@ -122,7 +122,7 @@ struct signal * signal_create(const char *name, const char *unit, enum signal_fo
if (unit)
sig->unit = strdup(unit);
sig->format = fmt;
sig->type = fmt;
return sig;
}
@ -156,6 +156,29 @@ int signal_decref(struct signal *s)
return prev - 1;
}
struct signal * signal_copy(struct signal *s)
{
struct signal *ns;
ns = alloc(sizeof(struct signal));
if (!ns)
return NULL;
signal_init(ns);
ns->type = s->type;
ns->init = s->init;
ns->enabled = s->enabled;
if (s->name)
ns->name = strdup(s->name);
if (s->unit)
ns->name = strdup(s->unit);
return ns;
}
int signal_parse(struct signal *s, json_t *cfg)
{
int ret;
@ -163,12 +186,12 @@ int signal_parse(struct signal *s, json_t *cfg)
json_t *json_init = NULL;
const char *name = NULL;
const char *unit = NULL;
const char *format = NULL;
const char *type = NULL;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: o, s?: b }",
"name", &name,
"unit", &unit,
"format", &format,
"type", &type,
"init", &json_init,
"enabled", &s->enabled
);
@ -181,9 +204,9 @@ int signal_parse(struct signal *s, json_t *cfg)
if (unit)
s->unit = strdup(unit);
if (format) {
s->format = signal_format_from_str(format);
if (s->format == SIGNAL_FORMAT_INVALID)
if (type) {
s->type = signal_type_from_str(type);
if (s->type == SIGNAL_TYPE_INVALID)
return -1;
}
@ -229,7 +252,7 @@ int signal_list_parse(struct list *list, json_t *cfg)
return 0;
}
int signal_list_generate(struct list *list, unsigned len, enum signal_format fmt)
int signal_list_generate(struct list *list, unsigned len, enum signal_type fmt)
{
for (int i = 0; i < len; i++) {
char name[32];
@ -253,126 +276,126 @@ void signal_list_dump(const struct list *list)
struct signal *sig = list_at(list, i);
if (sig->unit)
info(" %d: %s [%s] = %s", i, sig->name, sig->unit, signal_format_to_str(sig->format));
info(" %d: %s [%s] = %s", i, sig->name, sig->unit, signal_type_to_str(sig->type));
else
info(" %d: %s = %s", i, sig->name, signal_format_to_str(sig->format));
info(" %d: %s = %s", i, sig->name, signal_type_to_str(sig->type));
}
}
/* Signal format */
/* Signal type */
enum signal_format signal_format_from_str(const char *str)
enum signal_type signal_type_from_str(const char *str)
{
if (!strcmp(str, "boolean"))
return SIGNAL_FORMAT_BOOL;
return SIGNAL_TYPE_BOOLEAN;
else if (!strcmp(str, "complex"))
return SIGNAL_FORMAT_COMPLEX;
return SIGNAL_TYPE_COMPLEX;
else if (!strcmp(str, "float"))
return SIGNAL_FORMAT_FLOAT;
return SIGNAL_TYPE_FLOAT;
else if (!strcmp(str, "integer"))
return SIGNAL_FORMAT_INT;
return SIGNAL_TYPE_INTEGER;
else if (!strcmp(str, "auto"))
return SIGNAL_FORMAT_AUTO;
return SIGNAL_TYPE_AUTO;
else
return SIGNAL_FORMAT_INVALID;
return SIGNAL_TYPE_INVALID;
}
const char * signal_format_to_str(enum signal_format fmt)
const char * signal_type_to_str(enum signal_type fmt)
{
switch (fmt) {
case SIGNAL_FORMAT_BOOL:
case SIGNAL_TYPE_BOOLEAN:
return "boolean";
case SIGNAL_FORMAT_COMPLEX:
case SIGNAL_TYPE_COMPLEX:
return "complex";
case SIGNAL_FORMAT_FLOAT:
case SIGNAL_TYPE_FLOAT:
return "float";
case SIGNAL_FORMAT_INT:
case SIGNAL_TYPE_INTEGER:
return "integer";
case SIGNAL_FORMAT_AUTO:
case SIGNAL_TYPE_AUTO:
return "auto";
case SIGNAL_FORMAT_INVALID:
case SIGNAL_TYPE_INVALID:
return "invalid";
}
return NULL;
}
enum signal_format signal_format_detect(const char *val)
enum signal_type signal_type_detect(const char *val)
{
char *brk;
int len;
debug(LOG_IO | 5, "Attempt to detect format of: %s", val);
debug(LOG_IO | 5, "Attempt to detect type of: %s", val);
brk = strchr(val, 'i');
if (brk)
return SIGNAL_FORMAT_COMPLEX;
return SIGNAL_TYPE_COMPLEX;
brk = strchr(val, '.');
if (brk)
return SIGNAL_FORMAT_FLOAT;
return SIGNAL_TYPE_FLOAT;
len = strlen(val);
if (len == 1 && (val[0] == '1' || val[0] == '0'))
return SIGNAL_FORMAT_BOOL;
return SIGNAL_TYPE_BOOLEAN;
return SIGNAL_FORMAT_INT;
return SIGNAL_TYPE_INTEGER;
}
/* Signal data */
void signal_data_set(union signal_data *data, const struct signal *sig, double val)
{
switch (sig->format) {
case SIGNAL_FORMAT_BOOL:
switch (sig->type) {
case SIGNAL_TYPE_BOOLEAN:
data->b = val;
break;
case SIGNAL_FORMAT_FLOAT:
case SIGNAL_TYPE_FLOAT:
data->f = val;
break;
case SIGNAL_FORMAT_INT:
case SIGNAL_TYPE_INTEGER:
data->i = val;
break;
case SIGNAL_FORMAT_COMPLEX:
case SIGNAL_TYPE_COMPLEX:
data->z = val;
break;
case SIGNAL_FORMAT_INVALID:
case SIGNAL_FORMAT_AUTO:
case SIGNAL_TYPE_INVALID:
case SIGNAL_TYPE_AUTO:
memset(data, 0, sizeof(union signal_data));
break;
}
}
void signal_data_convert(union signal_data *data, const struct signal *from, const struct signal *to)
void signal_data_cast(union signal_data *data, const struct signal *from, const struct signal *to)
{
if (from == to) /* Nothing to do */
if (from->type == to->type) /* Nothing to do */
return;
switch (to->format) {
case SIGNAL_FORMAT_BOOL:
switch(from->format) {
case SIGNAL_FORMAT_BOOL:
switch (to->type) {
case SIGNAL_TYPE_BOOLEAN:
switch(from->type) {
case SIGNAL_TYPE_BOOLEAN:
data->b = data->b;
break;
case SIGNAL_FORMAT_INT:
case SIGNAL_TYPE_INTEGER:
data->b = data->i;
break;
case SIGNAL_FORMAT_FLOAT:
case SIGNAL_TYPE_FLOAT:
data->b = data->f;
break;
case SIGNAL_FORMAT_COMPLEX:
case SIGNAL_TYPE_COMPLEX:
data->b = creal(data->z);
break;
@ -380,21 +403,21 @@ void signal_data_convert(union signal_data *data, const struct signal *from, con
}
break;
case SIGNAL_FORMAT_INT:
switch(from->format) {
case SIGNAL_FORMAT_BOOL:
case SIGNAL_TYPE_INTEGER:
switch(from->type) {
case SIGNAL_TYPE_BOOLEAN:
data->i = data->b;
break;
case SIGNAL_FORMAT_INT:
case SIGNAL_TYPE_INTEGER:
data->i = data->i;
break;
case SIGNAL_FORMAT_FLOAT:
case SIGNAL_TYPE_FLOAT:
data->i = data->f;
break;
case SIGNAL_FORMAT_COMPLEX:
case SIGNAL_TYPE_COMPLEX:
data->i = creal(data->z);
break;
@ -402,21 +425,21 @@ void signal_data_convert(union signal_data *data, const struct signal *from, con
}
break;
case SIGNAL_FORMAT_FLOAT:
switch(from->format) {
case SIGNAL_FORMAT_BOOL:
case SIGNAL_TYPE_FLOAT:
switch(from->type) {
case SIGNAL_TYPE_BOOLEAN:
data->f = data->b;
break;
case SIGNAL_FORMAT_INT:
case SIGNAL_TYPE_INTEGER:
data->f = data->i;
break;
case SIGNAL_FORMAT_FLOAT:
case SIGNAL_TYPE_FLOAT:
data->f = data->f;
break;
case SIGNAL_FORMAT_COMPLEX:
case SIGNAL_TYPE_COMPLEX:
data->f = creal(data->z);
break;
@ -424,21 +447,21 @@ void signal_data_convert(union signal_data *data, const struct signal *from, con
}
break;
case SIGNAL_FORMAT_COMPLEX:
switch(from->format) {
case SIGNAL_FORMAT_BOOL:
case SIGNAL_TYPE_COMPLEX:
switch(from->type) {
case SIGNAL_TYPE_BOOLEAN:
data->z = CMPLXF(data->b, 0);
break;
case SIGNAL_FORMAT_INT:
case SIGNAL_TYPE_INTEGER:
data->z = CMPLXF(data->i, 0);
break;
case SIGNAL_FORMAT_FLOAT:
case SIGNAL_TYPE_FLOAT:
data->z = CMPLXF(data->f, 0);
break;
case SIGNAL_FORMAT_COMPLEX:
case SIGNAL_TYPE_COMPLEX:
data->z = data->z;
break;
@ -452,20 +475,20 @@ void signal_data_convert(union signal_data *data, const struct signal *from, con
int signal_data_parse_str(union signal_data *data, const struct signal *sig, const char *ptr, char **end)
{
switch (sig->format) {
case SIGNAL_FORMAT_FLOAT:
switch (sig->type) {
case SIGNAL_TYPE_FLOAT:
data->f = strtod(ptr, end);
break;
case SIGNAL_FORMAT_INT:
case SIGNAL_TYPE_INTEGER:
data->i = strtol(ptr, end, 10);
break;
case SIGNAL_FORMAT_BOOL:
case SIGNAL_TYPE_BOOLEAN:
data->b = strtol(ptr, end, 10);
break;
case SIGNAL_FORMAT_COMPLEX: {
case SIGNAL_TYPE_COMPLEX: {
float real, imag;
real = strtod(ptr, end);
@ -487,8 +510,8 @@ int signal_data_parse_str(union signal_data *data, const struct signal *sig, con
break;
}
case SIGNAL_FORMAT_AUTO:
case SIGNAL_FORMAT_INVALID:
case SIGNAL_TYPE_AUTO:
case SIGNAL_TYPE_INVALID:
return -1;
}
@ -499,20 +522,20 @@ int signal_data_parse_json(union signal_data *data, const struct signal *sig, js
{
int ret;
switch (sig->format) {
case SIGNAL_FORMAT_FLOAT:
switch (sig->type) {
case SIGNAL_TYPE_FLOAT:
data->f = json_real_value(cfg);
break;
case SIGNAL_FORMAT_INT:
case SIGNAL_TYPE_INTEGER:
data->i = json_integer_value(cfg);
break;
case SIGNAL_FORMAT_BOOL:
case SIGNAL_TYPE_BOOLEAN:
data->b = json_boolean_value(cfg);
break;
case SIGNAL_FORMAT_COMPLEX: {
case SIGNAL_TYPE_COMPLEX: {
double real, imag;
ret = json_unpack(cfg, "{ s: F, s: F }",
@ -526,8 +549,8 @@ int signal_data_parse_json(union signal_data *data, const struct signal *sig, js
break;
}
case SIGNAL_FORMAT_INVALID:
case SIGNAL_FORMAT_AUTO:
case SIGNAL_TYPE_INVALID:
case SIGNAL_TYPE_AUTO:
return -1;
}
@ -536,17 +559,17 @@ int signal_data_parse_json(union signal_data *data, const struct signal *sig, js
int signal_data_snprint(const union signal_data *data, const struct signal *sig, char *buf, size_t len)
{
switch (sig->format) {
case SIGNAL_FORMAT_FLOAT:
switch (sig->type) {
case SIGNAL_TYPE_FLOAT:
return snprintf(buf, len, "%.6f", data->f);
case SIGNAL_FORMAT_INT:
case SIGNAL_TYPE_INTEGER:
return snprintf(buf, len, "%" PRIi64, data->i);
case SIGNAL_FORMAT_BOOL:
case SIGNAL_TYPE_BOOLEAN:
return snprintf(buf, len, "%u", data->b);
case SIGNAL_FORMAT_COMPLEX:
case SIGNAL_TYPE_COMPLEX:
return snprintf(buf, len, "%.6f%+.6fi", creal(data->z), cimag(data->z));
default:

View file

@ -384,12 +384,12 @@ int super_node_start(struct super_node *sn)
for (size_t i = 0; i < list_length(&sn->nodes); i++) {
struct node *n = (struct node *) list_at(&sn->nodes, i);
ret = node_init2(n);
if (ret)
error("Failed to prepare node: %s", node_name(n));
int refs = list_count(&sn->paths, (cmp_cb_t) path_uses_node, n);
if (refs > 0) {
ret = node_init2(n);
if (ret)
error("Failed to start node: %s", node_name(n));
ret = node_start(n);
if (ret)
error("Failed to start node: %s", node_name(n));
@ -405,7 +405,7 @@ int super_node_start(struct super_node *sn)
if (p->enabled) {
ret = path_init2(p);
if (ret)
error("Failed to start path: %s", path_name(p));
error("Failed to prepare path: %s", path_name(p));
ret = path_start(p);
if (ret)

View file

@ -108,10 +108,14 @@ check: if (optarg == endptr)
if (!fmt)
error("Invalid format: %s", dirs[i].name);
ret = io_init(dirs[i].io, fmt, NULL, SAMPLE_HAS_ALL);
ret = io_init_auto(dirs[i].io, fmt, DEFAULT_SAMPLE_LENGTH, SAMPLE_HAS_ALL);
if (ret)
error("Failed to initialize IO: %s", dirs[i].name);
ret = io_check(dirs[i].io);
if (ret)
error("Failed to validate IO configuration");
ret = io_open(dirs[i].io, NULL);
if (ret)
error("Failed to open IO");

View file

@ -200,10 +200,14 @@ check: if (optarg == endptr)
if (!ft)
error("Unknown IO format '%s'", format);
ret = io_init(&io, ft, NULL, SAMPLE_HAS_ALL);
ret = io_init_auto(&io, ft, DEFAULT_SAMPLE_LENGTH, SAMPLE_HAS_ALL);
if (ret)
error("Failed to initialize IO");
ret = io_check(&io);
if (ret)
error("Failed to validate IO configuration");
ret = io_open(&io, NULL);
if (ret)
error("Failed to open IO");

View file

@ -27,6 +27,7 @@
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <iostream>
@ -205,7 +206,7 @@ static void * recv_loop(void *ctx)
/* Initialize memory */
unsigned pool_size = node_type(node)->pool_size ? node_type(node)->pool_size : LOG2_CEIL(node->in.vectorize);
ret = pool_init(&recvv.pool, pool_size, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), node_memory_type(node, &memory_hugepage));
ret = pool_init(&recvv.pool, pool_size, SAMPLE_LENGTH(list_length(&node->signals)), node_memory_type(node, &memory_hugepage));
if (ret < 0)
error("Failed to allocate memory for receive pool.");
@ -222,16 +223,15 @@ static void * recv_loop(void *ctx)
recv = node_read(node, smps, allocated, &release);
if (recv < 0)
warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv);
else {
io_print(&io, smps, recv);
io_print(&io, smps, recv);
cnt += recv;
if (recvv.limit > 0 && cnt >= recvv.limit)
goto leave;
}
sample_decref_many(smps, release);
cnt += recv;
if (recvv.limit > 0 && cnt >= recvv.limit)
goto leave;
pthread_testcancel();
}
@ -343,10 +343,14 @@ check: if (optarg == endptr)
if (!fmt)
error("Invalid format: %s", format);
ret = io_init(&io, fmt, NULL, SAMPLE_HAS_ALL);
ret = io_init_auto(&io, fmt, DEFAULT_SAMPLE_LENGTH, SAMPLE_HAS_ALL);
if (ret)
error("Failed to initialize IO");
ret = io_check(&io);
if (ret)
error("Failed to validate IO configuration");
ret = io_open(&io, NULL);
if (ret)
error("Failed to open IO");
@ -358,8 +362,13 @@ check: if (optarg == endptr)
#ifdef Libwebsockets_FOUND
/* Only start web subsystem if villas-pipe is used with a websocket node */
if (node->_vt->start == websocket_start) {
web_start(&sn.web);
api_start(&sn.api);
ret = web_start(&sn.web);
if (ret)
error("Failed to start web subsystem");
ret = api_start(&sn.api);
if (ret)
error("Failed to start API subsystem");
}
#endif /* Libwebsockets_FOUND */
@ -368,7 +377,7 @@ check: if (optarg == endptr)
ret = node_type_start(node->_vt, &sn);
if (ret)
error("Failed to intialize node type: %s", node_type_name(node->_vt));
error("Failed to intialize node type %s: reason=%d", node_type_name(node->_vt), ret);
ret = node_check(node);
if (ret)

View file

@ -186,6 +186,7 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
int main(int argc, char *argv[])
{
int ret;
json_t *cfg;
struct node_type *nt;
struct format_type *ft;
@ -207,6 +208,12 @@ int main(int argc, char *argv[])
if (ret)
error("Failed to intialize signals");
ft = format_type_lookup(format);
if (!ft)
error("Invalid output format '%s'", format);
memory_init(0); // Otherwise, ht->size in hash_table_hash() will be zero
nt = node_type_lookup("signal");
if (!nt)
error("Signal generation is not supported.");
@ -215,19 +222,6 @@ int main(int argc, char *argv[])
if (ret)
error("Failed to initialize node");
ft = format_type_lookup(format);
if (!ft)
error("Invalid output format '%s'", format);
memory_init(0); // Otherwise, ht->size in hash_table_hash() will be zero
ret = io_init(&io, ft, NULL, IO_FLUSH | (SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET));
if (ret)
error("Failed to initialize output");
ret = io_open(&io, NULL);
if (ret)
error("Failed to open output");
cfg = parse_cli(argc, argv);
if (!cfg) {
usage();
@ -249,7 +243,7 @@ int main(int argc, char *argv[])
if (ret)
error("Failed to verify node configuration");
ret = pool_init(&q, 16, SAMPLE_LENGTH(n.in.samplelen), &memory_heap);
ret = pool_init(&q, 16, SAMPLE_LENGTH(list_length(&n.signals)), &memory_heap);
if (ret)
error("Failed to initialize pool");
@ -259,7 +253,19 @@ int main(int argc, char *argv[])
ret = node_start(&n);
if (ret)
serror("Failed to start node");
error("Failed to start node %s: reason=%d", node_name(&n), ret);
ret = io_init(&io, ft, &n.signals, IO_FLUSH | (SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET));
if (ret)
error("Failed to initialize output");
ret = io_check(&io);
if (ret)
error("Failed to validate IO configuration");
ret = io_open(&io, NULL);
if (ret)
error("Failed to open output");
for (;;) {
t = sample_alloc(&q);

View file

@ -69,12 +69,12 @@ void usage()
int main(int argc, char *argv[])
{
int ret;
int ret, rc = 0;
/* Default values */
double epsilon = 1e-9;
const char *format = "villas.human";
int flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_VALUES | SAMPLE_HAS_ORIGIN;
int flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA | SAMPLE_HAS_TS_ORIGIN;
struct pool pool = { .state = STATE_DESTROYED };
@ -86,10 +86,10 @@ int main(int argc, char *argv[])
epsilon = strtod(optarg, &endptr);
goto check;
case 'v':
flags &= ~SAMPLE_HAS_VALUES;
flags &= ~SAMPLE_HAS_DATA;
break;
case 't':
flags &= ~SAMPLE_HAS_ORIGIN;
flags &= ~SAMPLE_HAS_TS_ORIGIN;
break;
case 's':
flags &= ~SAMPLE_HAS_SEQUENCE;
@ -138,10 +138,14 @@ check: if (optarg == endptr)
if (!s[i].fmt)
error("Invalid IO format: %s", s[i].format);
ret = io_init(&s[i].io, s[i].fmt, NULL, 0);
ret = io_init_auto(&s[i].io, s[i].fmt, DEFAULT_SAMPLE_LENGTH, 0);
if (ret)
error("Failed to initialize IO");
ret = io_check(&s[i].io);
if (ret)
error("Failed to validate IO configuration");
ret = io_open(&s[i].io, s[i].path);
if (ret)
error("Failed to open file: %s", s[i].path);
@ -168,7 +172,7 @@ retry: eofs = 0;
ret = 0;
else {
std::cout << "length unequal" << std::endl;
ret = 1;
rc = 1;
}
goto out;
@ -180,15 +184,16 @@ retry: eofs = 0;
if (ret <= 0)
failed++;
}
if (failed)
goto retry;
/* We compare all files against the first one */
for (int i = 1; i < n; i++) {
ret = sample_cmp(s[0].sample, s[i].sample, epsilon, flags);
if (ret)
if (ret) {
rc = ret;
goto out;
}
}
line++;
@ -210,5 +215,5 @@ out: for (int i = 0; i < n; i++) {
if (ret)
error("Failed to destroy pool");
return ret;
return rc;
}