diff --git a/etc/comedi.conf b/etc/comedi.conf index a224f75aa..04905614d 100644 --- a/etc/comedi.conf +++ b/etc/comedi.conf @@ -60,8 +60,13 @@ nodes = { type = "socket", layer = "udp" format = "protobuf", - local = "*:12000" - remote = "134.130.169.32:12000" + + in = { + address = "*:12000" + }, + out = { + remote = "134.130.169.32:12000" + } }, sine1 = { diff --git a/etc/eric-lab.conf b/etc/eric-lab.conf index e5d25105f..2c43ae58d 100644 --- a/etc/eric-lab.conf +++ b/etc/eric-lab.conf @@ -51,14 +51,24 @@ nodes = { socket1 = { type = "socket", layper = "udp", - local = "*:12000", - remote = "127.0.0.1:12001" + + in = { + address = "*:12000" + }, + out = { + address = "127.0.0.1:12001" + } }, socket2 = { type = "socket", - layper = "udp", - local = "*:12001", - remote = "127.0.0.1:12000" + layer = "udp", + + in = { + address = "*:12001" + }, + out = { + address = "127.0.0.1:1200" + } } }; diff --git a/etc/example.conf b/etc/example.conf index 9ea618331..c0a5a1970 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -64,29 +64,32 @@ nodes = { format = "gtnet.fake", # For a list of available node-types run: 'villas-node -h' - verify_source = true, # Check if source address of incoming packets matches the remote address. + in = { + address = "127.0.0.1:12001" # This node only received messages on this IP:Port pair + + verify_source = true, # Check if source address of incoming packets matches the remote address. + multicast = { # IGMP multicast is only support for layer = (ip|udp) + enabled = true, - local = "127.0.0.1:12001", # This node only received messages on this IP:Port pair - remote = "127.0.0.1:12000", # This node sents outgoing messages to this IP:Port pair - - netem = { # Network emulation settings - enabled = true, - # Those settings can be specified for each node invidually! - delay = 100000, # Additional latency in microseconds - jitter = 30000, # Jitter in uS - distribution = "normal", # Distribution of delay: uniform, normal, pareto, paretonormal - loss = 10 # Packet loss in percent - duplicate = 10, # Duplication in percent - corrupt = 10 # Corruption in percent + group = "224.1.2.3", # The multicast group. Must be within 224.0.0.0/4 + interface = "1.2.3.4", # The IP address of the interface which should receive multicast packets. + ttl = 128, # The time to live for outgoing multicast packets. + loop = false, # Whether or not to loopback outgoing multicast packets to the local host. + } }, - - multicast = { # IGMP multicast is only support for layer = (ip|udp) - enabled = true, - - group = "224.1.2.3", # The multicast group. Must be within 224.0.0.0/4 - interface = "1.2.3.4", # The IP address of the interface which should receive multicast packets. - ttl = 128, # The time to live for outgoing multicast packets. - loop = false, # Whether or not to loopback outgoing multicast packets to the local host. + out = { + address = "127.0.0.1:12000", # This node sents outgoing messages to this IP:Port pair + + netem = { # Network emulation settings + enabled = true, + # Those settings can be specified for each node invidually! + delay = 100000, # Additional latency in microseconds + jitter = 30000, # Jitter in uS + distribution = "normal", # Distribution of delay: uniform, normal, pareto, paretonormal + loss = 10 # Packet loss in percent + duplicate = 10, # Duplication in percent + corrupt = 10 # Corruption in percent + } } }, ethernet_node = { @@ -95,15 +98,23 @@ nodes = { ### The following settings are specific to the socket node-type!! ### layer = "eth", - local = "12:34:56:78:90:AB%eth0:12002", - remote = "12:34:56:78:90:AB%eth0:12002" + in = { + address = "12:34:56:78:90:AB%eth0:12002" + }, + out = { + address = "12:34:56:78:90:AB%eth0:12002" + } }, unix_domain_node = { type = "socket", layer = "unix", # Datagram UNIX domain sockets require two endpoints - local = "/var/run/villas-node/node.sock", - remote = "/var/run/villas-node/client.sock" + in = { + address = "/var/run/villas-node/node.sock" + }, + out = { + address = "/var/run/villas-node/client.sock" + } }, opal_node = { # The server can be started as an Asynchronous process type = "opal", # from within an OPAL-RT model. @@ -120,14 +131,22 @@ nodes = { ### The following settings are specific to the file node-type!! ### uri = "logs/input.log", # These options specify the path prefix where the the files are stored - - epoch_mode = "direct" # One of: direct (default), wait, relative, absolute - epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0). + + in = { + epoch_mode = "direct" # One of: direct (default), wait, relative, absolute + epoch = 10 # The interpretation of this value depends on epoch_mode (default is 0). # Consult the documentation of a full explanation - rate = 2.0 # A constant rate at which the lines of the input files should be read + rate = 2.0 # A constant rate at which the lines of the input files should be read # A missing or zero value will use the timestamp in the first column # of the file to determine the pause between consecutive lines. + + buffer_size = 1000000 + }, + out = { + flush = true + buffer_size = 1000000 + } }, ngsi_node = { type = "ngsi", @@ -158,36 +177,47 @@ nodes = { nanomsg_node = { type = "nanomsg", - publish = [ - "tcp://*:12000", # TCP socket - "ipc:///tmp/test.ipc", # Interprocess communication - "inproc://test" # Inprocess communication - ], - subscribe = [ - "tcp://127.0.0.1:12000", - "ipc:///tmp/test.ipc", - "inproc://test" - ] + out = { + endpoints = [ + "tcp://*:12000", # TCP socket + "ipc:///tmp/test.ipc", # Interprocess communication + "inproc://test" # Inprocess communication + ], + } + in = { + endpoints = [ + "tcp://127.0.0.1:12000", + "ipc:///tmp/test.ipc", + "inproc://test" + ] + } }, zeromq_node = { type = "zeromq", pattern = "pubsub", # The ZeroMQ pattern. One of pubsub, radiodish ipv6 = false, # Enable IPv6 support - filter = "ab184", # A filter which is prefix matched + curve = { # Z85 encoded Curve25519 keys enabled = true, public_key = "Veg+Q.V-c&1k>yVh663gQ^7fL($y47gybE-nZP1L", secret_key = "HPY.+mFuB[jGs@(zZr6$IZ1H1dZ7Ji*j>oi@O?Pc" } - subscribe = "tcp://*:1234" # The subscribe endpoint. + in = { + subscribe = "tcp://*:1234" # The subscribe endpoint. # See http://api.zeromq.org/2-1:zmq-bind for details. + filter = "ab184", # A filter which is prefix matched for each received msg + } - publish = [ # The publish endpoints. - "tcp://localhost:1235", # See http://api.zeromq.org/2-1:zmq-connect for details. - "tcp://localhost:12444" - ], + out = { + publish = [ # The publish endpoints. + "tcp://localhost:1235", # See http://api.zeromq.org/2-1:zmq-connect for details. + "tcp://localhost:12444" + ], + + filter = "ab184", # A prefix which is prepended to each send message. + } }, signal_node = { type = "signal", @@ -209,9 +239,13 @@ nodes = { shmem_node = { type = "shmem", - in_name = "sn1_in", # Name of shared memory segment for receiving side - out_name = "sn1_in", # Name of shared memory segment for sending side - + in = { + name = "sn1_in" + }, # Name of shared memory segment for receiving side + out = { + name = "sn1_in" # Name of shared memory segment for sending side + }, + queuelen = 1024, # Length of the queues polling = true, # We can busy-wait or use pthread condition variables for synchronizations @@ -296,24 +330,24 @@ nodes = { interface = "lo", dst_address = "01:0c:cd:01:00:01", - publish = { - fields = [ - "float32", - "float64", - "int8", - "int32" + out = { + signals = [ + { iec_type = "float32" }, + { iec_type = "float64" }, + { iec_type = "int8" }, + { iec_type = "int32" } ], svid = "test1234", smpmod = "samples_per_second", confrev = 55 }, - subscribe = { - fields = [ - "float32", - "float64", - "int8", - "int32" + in = { + signals = [ + { iec_type = "float32" }, + { iec_type = "float64" }, + { iec_type = "int8" }, + { iec_type = "int32" } ] } }, @@ -323,7 +357,7 @@ nodes = { format = "protobuf", username = "guest", - password = "guest", + password = "guest", host = "localhost", port = 1883, @@ -331,9 +365,12 @@ nodes = { retain = false, qos = 0, - publish = "test-topic", - subscribe = "test-topic", - + out = { + publish = "test-topic" + }, + in = { + subscribe = "test-topic" + }, ssl = { enabled = false, insecure = true, @@ -384,7 +421,7 @@ paths = ( out = "file_node", # This path includes all available example hooks. builtin = false, # By default, all paths will have a few builtin hooks attached to them. - # When collecting statistics or measurements these are undesired. + # When collecting statistics or measurements these are undesired. # A complete list of supported hooks @@ -405,13 +442,6 @@ paths = ( ratio = 2 # Only forward every 2nd message }, - { - type = "convert" - - mask = 0x1 # only convert the first value - mode = "fixed" # Convert all values to fixed precission. Use 'float' to convert to floating point. - scale = 1.0 - }, { type = "skip_first" diff --git a/etc/influxdb.conf b/etc/influxdb.conf index ff1a798a5..b44287062 100644 --- a/etc/influxdb.conf +++ b/etc/influxdb.conf @@ -7,7 +7,7 @@ logging = { nodes = { influxdb_node = { type = "influxdb", - samplelen = 3, + signals = 3, server = "relaxed_colden:8089", key = "villas", diff --git a/etc/loopback.json b/etc/loopback.json index 1afe3b9fc..4730adf03 100644 --- a/etc/loopback.json +++ b/etc/loopback.json @@ -1,24 +1,7 @@ { - "stats" : 1, - "nodes" : { "node1" : { - "type" : "socket", - "local" : "127.0.0.1:12000", - "remote" : "127.0.0.1:12001", - - "hooks" : [ - { - "type" : "stats", - "verbose" : true - } - ] + "type" : "loopback" } - }, - "paths" : [ - { - "in" : "node1", - "out" : "node1" - } - ] + } } diff --git a/etc/shmem.conf b/etc/shmem.conf index b5aa3e4b4..f50751c7e 100644 --- a/etc/shmem.conf +++ b/etc/shmem.conf @@ -23,7 +23,7 @@ nodes = { type = "shmem", out_name = "/villas1-out", in_name = "/villas1-in", - samplelen = 4, + signals = 4, queuelen = 32, polling = false, vectorize = 1 diff --git a/include/villas/format_type.h b/include/villas/format_type.h index 9b78be40d..83ce9be55 100644 --- a/include/villas/format_type.h +++ b/include/villas/format_type.h @@ -33,10 +33,6 @@ extern "C" { struct sample; struct io; -enum format_type_flags { - FORMAT_TYPE_BINARY = (1 << 8) -}; - struct format_type { int (*init)(struct io *io); int (*destroy)(struct io *io); @@ -82,7 +78,7 @@ struct format_type { int (*scan)( struct io *io, struct sample *smps[], unsigned cnt); /** Print a header. */ - void (*header)(struct io *io); + void (*header)(struct io *io, const struct sample *smp); /** Print a footer. */ void (*footer)(struct io *io); @@ -94,7 +90,7 @@ struct format_type { */ /** @see format_type_sscan */ - int (*sscan)(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); + int (*sscan)(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); /** @see format_type_sprint */ int (*sprint)(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt); diff --git a/include/villas/formats/csv.h b/include/villas/formats/csv.h index f9f944131..16d2ae532 100644 --- a/include/villas/formats/csv.h +++ b/include/villas/formats/csv.h @@ -23,18 +23,20 @@ #pragma once -#include +#include #ifdef __cplusplus extern "C" { #endif + /* Forward declarations. */ +struct io; struct sample; -void csv_header(struct io *io); +void csv_header(struct io *io, const struct sample *smp); -int csv_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); int csv_sprint(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); +int csv_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); #ifdef __cplusplus } diff --git a/include/villas/formats/json-reserve.h b/include/villas/formats/json-reserve.h index f6e6c227b..20ff4a712 100644 --- a/include/villas/formats/json-reserve.h +++ b/include/villas/formats/json-reserve.h @@ -33,7 +33,7 @@ struct sample; struct io; int json_reserve_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt); -int json_reserve_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); +int json_reserve_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); int json_reserve_print(struct io *io, struct sample *smps[], unsigned cnt); int json_reserve_scan(struct io *io, struct sample *smps[], unsigned cnt); diff --git a/include/villas/formats/json.h b/include/villas/formats/json.h index ea73e0ff6..6706bd7fa 100644 --- a/include/villas/formats/json.h +++ b/include/villas/formats/json.h @@ -32,7 +32,7 @@ extern "C" { struct sample; int json_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt); -int json_sscan(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt); +int json_sscan(struct io *io, const char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt); int json_print(struct io *io, struct sample *smps[], unsigned cnt); int json_scan(struct io *io, struct sample *smps[], unsigned cnt); diff --git a/include/villas/formats/msg.h b/include/villas/formats/msg.h index c69e21bbc..7b3b91b9e 100644 --- a/include/villas/formats/msg.h +++ b/include/villas/formats/msg.h @@ -29,6 +29,7 @@ extern "C" { /* Forward declaration */ struct msg; struct sample; +struct list; /** Convert msg from network to host byteorder */ void msg_ntoh(struct msg *m); @@ -53,10 +54,10 @@ void msg_hdr_ntoh(struct msg *m); int msg_verify(struct msg *m); /** Copy fields from \p msg into \p smp. */ -int msg_to_sample(struct msg *msg, struct sample *smp); +int msg_to_sample(struct msg *msg, struct sample *smp, struct list *signals); /** Copy fields form \p smp into \p msg. */ -int msg_from_sample(struct msg *msg, struct sample *smp); +int msg_from_sample(struct msg *msg, struct sample *smp, struct list *signals); #ifdef __cplusplus } diff --git a/include/villas/formats/protobuf.h b/include/villas/formats/protobuf.h index d6dda00dc..79468c0b8 100644 --- a/include/villas/formats/protobuf.h +++ b/include/villas/formats/protobuf.h @@ -36,7 +36,7 @@ struct sample; int protobuf_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt); /** Read struct sample's from buffer \p buf into samples \p smps. */ -int protobuf_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); +int protobuf_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); #ifdef __cplusplus } diff --git a/include/villas/formats/raw.h b/include/villas/formats/raw.h index 3ee518b2f..024a21d31 100644 --- a/include/villas/formats/raw.h +++ b/include/villas/formats/raw.h @@ -29,39 +29,33 @@ extern "C" { #endif +/* float128 is currently not yet supported as htole128() functions a missing */ +#if 0 && defined(__GNUC__) && defined(__linux__) + #define HAS_128BIT +#endif + /* Forward declarations */ struct sample; enum raw_flags { - RAW_FAKE = (1 << 16), /**< Treat the first three values as: sequenceno, seconds, nanoseconds */ + /** Treat the first three values as: sequenceno, seconds, nanoseconds */ + RAW_FAKE_HEADER = (1 << 16) | SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_SEQUENCE, + RAW_BIG_ENDIAN = (1 << 7), /**< Encode data in big-endian byte order */ - RAW_BE_INT = (1 << 17), /**< Byte-order for integer data: big-endian if set. */ - RAW_BE_FLT = (1 << 18), /**< Byte-order for floating point data: big-endian if set. */ - RAW_BE_HDR = (1 << 19), /**< Byte-order for fake header fields: big-endian if set. */ - - /** Byte-order for all fields: big-endian if set. */ - RAW_BE = RAW_BE_INT | RAW_BE_FLT | RAW_BE_HDR, - - /** Mix floating and integer types. - * - * io_raw_sscan() parses all values as single / double precission fp. - * io_raw_sprint() uses sample::format to determine the type. - */ - RAW_AUTO = (1 << 22), - RAW_FLT = (1 << 23), /**< Data-type: floating point otherwise integer. */ - - //RAW_1 = (0 << 24), /**< Pack each value as a single bit. */ - RAW_8 = (3 << 24), /**< Pack each value as a byte. */ - RAW_16 = (4 << 24), /**< Pack each value as a word. */ - RAW_32 = (5 << 24), /**< Pack each value as a double word. */ - RAW_64 = (6 << 24) /**< Pack each value as a quad word. */ + RAW_BITS_8 = (3 << 24), /**< Pack each value as a byte. */ + RAW_BITS_16 = (4 << 24), /**< Pack each value as a word. */ + RAW_BITS_32 = (5 << 24), /**< Pack each value as a double word. */ + RAW_BITS_64 = (6 << 24), /**< Pack each value as a quad word. */ +#ifdef HAS_128BIT + RAW_128 = (7 << 24) /**< Pack each value as a double quad word. */ +#endif }; /** Copy / read struct msg's from buffer \p buf to / fram samples \p smps. */ int raw_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt); /** Read struct sample's from buffer \p buf into samples \p smps. */ -int raw_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); +int raw_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); #ifdef __cplusplus } diff --git a/include/villas/formats/villas_binary.h b/include/villas/formats/villas_binary.h index 8fd4428c3..c01f61f95 100644 --- a/include/villas/formats/villas_binary.h +++ b/include/villas/formats/villas_binary.h @@ -42,7 +42,7 @@ enum villas_binary_flags { int villas_binary_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt); /** Read struct sample's from buffer \p buf into samples \p smps. */ -int villas_binary_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); +int villas_binary_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); #ifdef __cplusplus } diff --git a/include/villas/formats/villas_human.h b/include/villas/formats/villas_human.h index c372b14ef..2865b6f71 100644 --- a/include/villas/formats/villas_human.h +++ b/include/villas/formats/villas_human.h @@ -33,11 +33,11 @@ extern "C" { struct io; struct sample; -void villas_human_header(struct io *io); +void villas_human_header(struct io *io, const struct sample *smp); int villas_human_print(struct io *io, struct sample *smps[], unsigned cnt); int villas_human_scan(struct io *io, struct sample *smps[], unsigned cnt); #ifdef __cplusplus } -#endif \ No newline at end of file +#endif diff --git a/include/villas/io.h b/include/villas/io.h index 9aca8fa92..98271cb39 100644 --- a/include/villas/io.h +++ b/include/villas/io.h @@ -36,9 +36,13 @@ struct sample; struct format_type; enum io_flags { + /* Bits 0-7 are reserved for for flags defined by enum sample_flags */ IO_FLUSH = (1 << 8), /**< Flush the output stream after each chunk of samples. */ IO_NONBLOCK = (1 << 9), /**< Dont block io_read() while waiting for new samples. */ - IO_NEWLINES = (1 << 10) /**< The samples of this format are newline delimited. */ + IO_NEWLINES = (1 << 10), /**< The samples of this format are newline delimited. */ + IO_DESTROY_SIGNALS = (1 << 11), /**< Signal descriptors are managed by this IO instance. Destroy them in io_destoy() */ + IO_HAS_BINARY_PAYLOAD = (1 << 12), /**< This IO instance en/decodes binary payloads. */ + IO_AUTO_DETECT_FORMAT = (1 << 13) /**< This IO instance supports format auto-detection during decoding. */ }; struct io { @@ -59,11 +63,11 @@ struct io { char *buffer; size_t buflen; - - struct list *signals; - struct node *node; } in, out; + struct list *signals; /**< Signal meta data for parsed samples by io_scan() */ + bool header_printed; + enum { IO_MODE_STDIO, IO_MODE_ADVIO, @@ -71,18 +75,22 @@ struct io { } mode; void *_vd; - struct format_type *_vt; + const struct format_type *_vt; }; -int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags); +int io_init(struct io *io, const struct format_type *fmt, struct list *signals, int flags); + +int io_init_auto(struct io *io, const struct format_type *fmt, int len, int flags); int io_destroy(struct io *io); +int io_check(struct io *io); + int io_open(struct io *io, const char *uri); int io_close(struct io *io); -void io_header(struct io *io); +void io_header(struct io *io, const struct sample *smp); void io_footer(struct io *io); @@ -98,6 +106,8 @@ int io_flush(struct io *io); int io_fd(struct io *io); +const struct format_type * io_type(struct io *io); + int io_stream_open(struct io *io, const char *uri); int io_stream_close(struct io *io); @@ -124,7 +134,7 @@ FILE * io_stream_output(struct io *io); * @retval >=0 The number of samples which have been parsed from \p buf and written into \p smps. * @retval <0 Something went wrong. */ -int io_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); +int io_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt); /** Print \p cnt samples from \p smps into buffer \p buf of length \p len. * diff --git a/include/villas/node.h b/include/villas/node.h index 9d741d92c..73ffe1929 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -48,7 +48,6 @@ struct node_direction { int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */ struct list hooks; /**< List of write hooks (struct hook). */ - struct list signals; /**< List of signal meta data such as signal names */ json_t *cfg; /**< A JSON object containing the configuration of the node. */ }; @@ -65,14 +64,15 @@ struct node char *_name_long; /**< Singleton: A string used to print to screen. */ int affinity; /**< CPU Affinity of this node */ - int samplelen; /**< The maximum number of values this node can receive. */ - unsigned sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */ + uint64_t sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */ struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */ struct node_direction in, out; + struct list signals; /**< Signal meta data for data which is __received__ by node_read(). */ + enum state state; struct node_type *_vt; /**< Virtual functions (C++ OOP style) */ diff --git a/include/villas/node_type.h b/include/villas/node_type.h index c1449e63d..bbb40835f 100644 --- a/include/villas/node_type.h +++ b/include/villas/node_type.h @@ -41,9 +41,14 @@ struct node; struct super_node; struct sample; +enum node_type_flags { + NODE_TYPE_PROVIDES_SIGNALS = (1 << 0) +}; + /** C++ like vtable construct for node_types */ struct node_type { int vectorize; /**< Maximal vector length supported by this node type. Zero is unlimited. */ + int flags; enum state state; /**< State of this node-type. */ diff --git a/include/villas/path.h b/include/villas/path.h index 74d26e88a..edb3f6750 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -99,7 +99,6 @@ struct path { int reverse; /**< This path as a matching reverse path. */ int builtin; /**< This path should use built-in hooks by default. */ int queuelen; /**< The queue length for each path_destination::queue */ - int samplelen; /**< Will be calculated based on path::sources.mappings */ char *_name; /**< Singleton: A string which is used to print this path to screen. */ diff --git a/include/villas/sample.h b/include/villas/sample.h index 0770a7d03..1aaa5c233 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -49,12 +49,11 @@ struct pool; /** Parts of a sample that can be serialized / de-serialized by the IO formats */ enum sample_flags { - SAMPLE_HAS_ORIGIN = (1 << 0), /**< Include origin timestamp in output. */ - SAMPLE_HAS_RECEIVED = (1 << 1), /**< Include receive timestamp in output. */ + SAMPLE_HAS_TS_ORIGIN = (1 << 0), /**< Include origin timestamp in output. */ + SAMPLE_HAS_TS_RECEIVED = (1 << 1), /**< Include receive timestamp in output. */ SAMPLE_HAS_OFFSET = (1 << 2), /**< Include offset (received - origin timestamp) in output. */ - SAMPLE_HAS_ID = (1 << 3), /**< This sample has a valid sample::id field. */ - SAMPLE_HAS_SEQUENCE = (1 << 4), /**< Include sequence number in output. */ - SAMPLE_HAS_VALUES = (1 << 5), /**< Include values in output. */ + SAMPLE_HAS_SEQUENCE = (1 << 3), /**< Include sequence number in output. */ + SAMPLE_HAS_DATA = (1 << 4), /**< Include values in output. */ SAMPLE_HAS_ALL = (1 << 5) - 1, /**< Enable all output options. */ SAMPLE_IS_FIRST = (1 << 16), /**< This sample is the first of a new simulation case */ @@ -129,7 +128,7 @@ int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt); int sample_incref_many(struct sample *smps[], int cnt); int sample_decref_many(struct sample *smps[], int cnt); -enum signal_format sample_format(const struct sample *s, unsigned idx); +enum signal_type sample_format(const struct sample *s, unsigned idx); #ifdef __cplusplus } diff --git a/include/villas/signal.h b/include/villas/signal.h index 312a6a12b..46b36ae96 100644 --- a/include/villas/signal.h +++ b/include/villas/signal.h @@ -53,13 +53,13 @@ union signal_data { float _Complex z; /**< Complex values. */ }; -enum signal_format { - SIGNAL_FORMAT_INVALID = 0, /**< Signal format is invalid. */ - SIGNAL_FORMAT_AUTO = 1, /**< Signal format is unknown. Try autodetection. */ - SIGNAL_FORMAT_FLOAT = 2, /**< See signal_data::f */ - SIGNAL_FORMAT_INT = 3, /**< See signal_data::i */ - SIGNAL_FORMAT_BOOL = 4, /**< See signal_data::b */ - SIGNAL_FORMAT_COMPLEX = 5 /**< See signal_data::z */ +enum signal_type { + SIGNAL_TYPE_INVALID = 0, /**< Signal type is invalid. */ + SIGNAL_TYPE_AUTO = 1, /**< Signal type is unknown. Attempt autodetection. */ + SIGNAL_TYPE_FLOAT = 2, /**< See signal_data::f */ + SIGNAL_TYPE_INTEGER = 3, /**< See signal_data::i */ + SIGNAL_TYPE_BOOLEAN = 4, /**< See signal_data::b */ + SIGNAL_TYPE_COMPLEX = 5 /**< See signal_data::z */ }; /** Signal descriptor. @@ -76,7 +76,7 @@ struct signal { atomic_int refcnt; /**< Reference counter. */ - enum signal_format format; + enum signal_type type; }; /** Initialize a signal with default values. */ @@ -86,7 +86,7 @@ int signal_init(struct signal *s); int signal_destroy(struct signal *s); /** Allocate memory for a new signal, and initialize it with provided values. */ -struct signal * signal_create(const char *name, const char *unit, enum signal_format fmt); +struct signal * signal_create(const char *name, const char *unit, enum signal_type fmt); /** Destroy and release memory of signal. */ int signal_free(struct signal *s); @@ -97,6 +97,9 @@ int signal_incref(struct signal *s); /** Decrease reference counter. */ int signal_decref(struct signal *s); +/** Copy a signal. */ +struct signal * signal_copy(struct signal *s); + /** Parse signal description. */ int signal_parse(struct signal *s, json_t *cfg); @@ -105,18 +108,18 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u int signal_list_parse(struct list *list, json_t *cfg); -int signal_list_generate(struct list *list, unsigned len, enum signal_format fmt); +int signal_list_generate(struct list *list, unsigned len, enum signal_type fmt); void signal_list_dump(const struct list *list); -enum signal_format signal_format_from_str(const char *str); +enum signal_type signal_type_from_str(const char *str); -const char * signal_format_to_str(enum signal_format fmt); +const char * signal_type_to_str(enum signal_type fmt); -enum signal_format signal_format_detect(const char *val); +enum signal_type signal_type_detect(const char *val); /** Convert signal data from one description/format to another. */ -void signal_data_convert(union signal_data *data, const struct signal *from, const struct signal *to); +void signal_data_cast(union signal_data *data, const struct signal *from, const struct signal *to); /** Print value of a signal to a character buffer. */ int signal_data_snprint(const union signal_data *data, const struct signal *sig, char *buf, size_t len); diff --git a/lib/hook.c b/lib/hook.c index 133f5bbe3..d2fd1f2ad 100644 --- a/lib/hook.c +++ b/lib/hook.c @@ -40,14 +40,6 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node h->enabled = 1; h->priority = vt->priority; - - /* Node hooks can only used with nodes, - Path hooks only with paths.. */ - if ((!(vt->flags & HOOK_NODE_READ) && n) || - (!(vt->flags & HOOK_NODE_WRITE) && n) || - (!(vt->flags & HOOK_PATH) && p)) - return -1; - h->path = p; h->node = n; @@ -244,12 +236,7 @@ int hook_init_builtin_list(struct list *l, bool builtin, int mask, struct path * { int ret; - ret = list_init(l); - if (ret) - return ret; - - if (!builtin) - return 0; + assert(l->state == STATE_INITIALIZED); for (size_t i = 0; i < list_length(&plugins); i++) { struct plugin *q = (struct plugin *) list_at(&plugins, i); @@ -260,18 +247,21 @@ int hook_init_builtin_list(struct list *l, bool builtin, int mask, struct path * if (q->type != PLUGIN_TYPE_HOOK) continue; - if (vt->flags & mask) - continue; + if (builtin && + vt->flags & HOOK_BUILTIN && + vt->flags & mask) + { - h = (struct hook *) alloc(sizeof(struct hook)); - if (!h) - return -1; + h = (struct hook *) alloc(sizeof(struct hook)); + if (!h) + return -1; - ret = hook_init(h, vt, p, n); - if (ret) - return ret; + ret = hook_init(h, vt, p, n); + if (ret) + return ret; - list_push(l, h); + list_push(l, h); + } } return 0; diff --git a/lib/hooks/CMakeLists.txt b/lib/hooks/CMakeLists.txt index 86cf1ddb3..9055d287c 100644 --- a/lib/hooks/CMakeLists.txt +++ b/lib/hooks/CMakeLists.txt @@ -33,6 +33,7 @@ set(HOOK_SRC limit_rate.c scale.c fix.c + cast.c ) if(WITH_IO) diff --git a/lib/hooks/cast.c b/lib/hooks/cast.c new file mode 100644 index 000000000..026c3faa3 --- /dev/null +++ b/lib/hooks/cast.c @@ -0,0 +1,201 @@ + +/** Cast hook. + * + * @author Steffen Vogel + * @copyright 2018, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** @addtogroup hooks Hook functions + * @{ + */ + +#include + +#include +#include +#include +#include +#include + +struct cast { + struct list operations; + + struct list signals; +}; + +static int cast_init(struct hook *h) +{ + int ret; + struct cast *c = (struct cast *) h->_vd; + struct list *orig_signals; + + if (h->node) + orig_signals = &h->node->signals; + else if (h->path) + orig_signals = &h->path->signals; + else + return -1; + + ret = list_init(&c->signals); + if (ret) + return ret; + + /* Copy original signal list */ + for (int i = 0; i < list_length(orig_signals); i++) { + struct signal *orig_sig = list_at(orig_signals, i); + struct signal *new_sig = signal_copy(orig_sig); + + list_push(&c->signals, new_sig); + } + + return 0; +} + +static int cast_destroy(struct hook *h) +{ + int ret; + struct cast *c = (struct cast *) h->_vd; + + ret = list_destroy(&c->signals, (dtor_cb_t) signal_decref, false); + if (ret) + return ret; + + return 0; +} + +static int cast_parse(struct hook *h, json_t *cfg) +{ + int ret; + struct cast *c = (struct cast *) h->_vd; + struct signal *sig; + + size_t i; + json_t *json_signals; + json_t *json_signal; + + ret = json_unpack(cfg, "{ s: o }", + "signals", &json_signals + ); + if (ret) + return ret; + + if (json_is_array(json_signals)) + return -1; + + json_array_foreach(json_signals, i, json_signal) { + int index = -1; + const char *name = NULL; + + const char *new_name = NULL; + const char *new_unit = NULL; + const char *new_format = NULL; + + ret = json_unpack(json_signal, "{ s?: s, s?: i, s?: s, s?: s, s?: s }", + "name", &name, + "index", &index, + "new_format", &new_format, + "new_name", &new_name, + "new_unit", &new_unit + ); + if (ret) + return ret; + + /* Find matching original signal descriptor */ + if (index >= 0 && name != NULL) + return -1; + + if (index < 0 && name == NULL) + return -1; + + sig = name + ? list_lookup(&c->signals, name) + : list_at_safe(&c->signals, index); + if (!sig) + return -1; + + /* Cast to new format */ + if (new_format) { + enum signal_type fmt; + + fmt = signal_type_from_str(new_format); + if (fmt == SIGNAL_TYPE_INVALID) + return -1; + + sig->type = fmt; + } + + /* Set new name */ + if (new_name) { + if (sig->name) + free(sig->name); + + sig->name = strdup(new_name); + } + + /* Set new unit */ + if (new_unit) { + if (sig->unit) + free(sig->unit); + + sig->unit = strdup(new_unit); + } + } + + return 0; +} + +static int cast_process(struct hook *h, struct sample *smps[], unsigned *cnt) +{ + struct cast *c = (struct cast *) h->_vd; + + for (int i = 0; i < *cnt; i++) { + struct sample *smp = smps[i]; + + for (int j = 0; j < smp->length; j++) { + struct signal *orig_sig = list_at(smp->signals, j); + struct signal *new_sig = list_at(&c->signals, j); + + signal_data_cast(&smp->data[j], orig_sig, new_sig); + } + + /* Replace signal descriptors of sample */ + smp->signals = &c->signals; + } + + return 0; +} + +static struct plugin p = { + .name = "cast", + .description = "Cast signals", + .type = PLUGIN_TYPE_HOOK, + .hook = { + .flags = HOOK_NODE_READ | HOOK_PATH, + .priority = 99, + .init = cast_init, + .destroy = cast_destroy, + .parse = cast_parse, + .process = cast_process, + .size = sizeof(struct cast) + } +}; + +REGISTER_PLUGIN(&p) + +/** @} */ diff --git a/lib/io.c b/lib/io.c index 74a1f9661..4ab978b6c 100644 --- a/lib/io.c +++ b/lib/io.c @@ -79,7 +79,7 @@ skip: bytes = getdelim(&io->in.buffer, &io->in.buflen, io->delimiter, f); return i; } -int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags) +int io_init(struct io *io, const struct format_type *fmt, struct list *signals, int flags) { int ret; @@ -88,9 +88,9 @@ int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags) io->_vt = fmt; io->_vd = alloc(fmt->size); - io->flags = flags | io->_vt->flags; - io->delimiter = io->_vt->delimiter ? io->_vt->delimiter : '\n'; - io->separator = io->_vt->separator ? io->_vt->separator : '\t'; + io->flags = flags | (io_type(io)->flags & ~SAMPLE_HAS_ALL); + io->delimiter = io_type(io)->delimiter ? io_type(io)->delimiter : '\n'; + io->separator = io_type(io)->separator ? io_type(io)->separator : '\t'; io->in.buflen = io->out.buflen = 4096; @@ -98,19 +98,9 @@ int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags) io->in.buffer = alloc(io->in.buflen); io->out.buffer = alloc(io->out.buflen); - io->input.node = n; - io->output.node = n; + io->signals = signals; - if (n) { - io->input.signals = &n->in.signals; - io->output.signals = &n->out.signals; - } - else { - io->input.signals = NULL; - io->output.signals = NULL; - } - - ret = io->_vt->init ? io->_vt->init(io) : 0; + ret = io_type(io)->init ? io_type(io)->init(io) : 0; if (ret) return ret; @@ -119,13 +109,35 @@ int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags) return 0; } +int io_init_auto(struct io *io, const struct format_type *fmt, int len, int flags) +{ + int ret; + struct list *signals; + + signals = alloc(sizeof(struct list)); + + signals->state = STATE_DESTROYED; + + ret = list_init(signals); + if (ret) + return ret; + + ret = signal_list_generate(signals, len, SIGNAL_TYPE_AUTO); + if (ret) + return ret; + + flags |= IO_DESTROY_SIGNALS; + + return io_init(io, fmt, signals, flags); +} + int io_destroy(struct io *io) { int ret; - assert(io->state == STATE_CLOSED || io->state == STATE_INITIALIZED); + assert(io->state == STATE_CLOSED || io->state == STATE_INITIALIZED || io->state == STATE_CHECKED); - ret = io->_vt->destroy ? io->_vt->destroy(io) : 0; + ret = io_type(io)->destroy ? io_type(io)->destroy(io) : 0; if (ret) return ret; @@ -133,11 +145,37 @@ int io_destroy(struct io *io) free(io->in.buffer); free(io->out.buffer); + if (io->flags & IO_DESTROY_SIGNALS) { + ret = list_destroy(io->signals, (dtor_cb_t) signal_decref, false); + if (ret) + return ret; + } + io->state = STATE_DESTROYED; return 0; } +int io_check(struct io *io) +{ + assert(io->state != STATE_DESTROYED); + + for (size_t i = 0; i < list_length(io->signals); i++) { + struct signal *sig = (struct signal *) list_at(io->signals, i); + + if (sig->type == SIGNAL_TYPE_AUTO) { + if (io_type(io)->flags & IO_AUTO_DETECT_FORMAT) + continue; + + return -1; + } + } + + io->state = STATE_CHECKED; + + return 0; +} + int io_stream_open(struct io *io, const char *uri) { int ret; @@ -306,18 +344,17 @@ int io_open(struct io *io, const char *uri) { int ret; - assert(io->state == STATE_INITIALIZED); + assert(io->state == STATE_CHECKED); - ret = io->_vt->open - ? io->_vt->open(io, uri) + ret = io_type(io)->open + ? io_type(io)->open(io, uri) : io_stream_open(io, uri); if (ret) return ret; + io->header_printed = false; io->state = STATE_OPENED; - io_header(io); - return 0; } @@ -329,8 +366,8 @@ int io_close(struct io *io) io_footer(io); - ret = io->_vt->close - ? io->_vt->close(io) + ret = io_type(io)->close + ? io_type(io)->close(io) : io_stream_close(io); if (ret) return ret; @@ -344,8 +381,8 @@ int io_flush(struct io *io) { assert(io->state == STATE_OPENED); - return io->_vt->flush - ? io->_vt->flush(io) + return io_type(io)->flush + ? io_type(io)->flush(io) : io_stream_flush(io); } @@ -353,8 +390,8 @@ int io_eof(struct io *io) { assert(io->state == STATE_OPENED); - return io->_vt->eof - ? io->_vt->eof(io) + return io_type(io)->eof + ? io_type(io)->eof(io) : io_stream_eof(io); } @@ -362,8 +399,8 @@ void io_rewind(struct io *io) { assert(io->state == STATE_OPENED); - if (io->_vt->rewind) - io->_vt->rewind(io); + if (io_type(io)->rewind) + io_type(io)->rewind(io); else io_stream_rewind(io); } @@ -372,25 +409,32 @@ int io_fd(struct io *io) { assert(io->state == STATE_OPENED); - return io->_vt->fd - ? io->_vt->fd(io) + return io_type(io)->fd + ? io_type(io)->fd(io) : io_stream_fd(io); } -void io_header(struct io *io) +const struct format_type * io_type(struct io *io) +{ + return io->_vt; +} + +void io_header(struct io *io, const struct sample *smp) { assert(io->state == STATE_OPENED); - if (io->_vt->header) - io->_vt->header(io); + if (io_type(io)->header) + io_type(io)->header(io, smp); + + io->header_printed = true; } void io_footer(struct io *io) { assert(io->state == STATE_OPENED); - if (io->_vt->footer) - io->_vt->footer(io); + if (io_type(io)->footer) + io_type(io)->footer(io); } int io_print(struct io *io, struct sample *smps[], unsigned cnt) @@ -399,11 +443,14 @@ int io_print(struct io *io, struct sample *smps[], unsigned cnt) assert(io->state == STATE_OPENED); + if (!io->header_printed && cnt > 0) + io_header(io, smps[0]); + if (io->flags & IO_NEWLINES) ret = io_print_lines(io, smps, cnt); - else if (io->_vt->print) - ret = io->_vt->print(io, smps, cnt); - else if (io->_vt->sprint) { + else if (io_type(io)->print) + ret = io_type(io)->print(io, smps, cnt); + else if (io_type(io)->sprint) { FILE *f = io_stream_output(io); size_t wbytes; @@ -428,9 +475,9 @@ int io_scan(struct io *io, struct sample *smps[], unsigned cnt) if (io->flags & IO_NEWLINES) ret = io_scan_lines(io, smps, cnt); - else if (io->_vt->scan) - ret = io->_vt->scan(io, smps, cnt); - else if (io->_vt->sscan) { + else if (io_type(io)->scan) + ret = io_type(io)->scan(io, smps, cnt); + else if (io_type(io)->sscan) { FILE *f = io_stream_input(io); size_t bytes, rbytes; @@ -462,16 +509,16 @@ FILE * io_stream_input(struct io *io) { : io->in.stream.std; } -int io_sscan(struct io *io, char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) +int io_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, struct sample *smps[], unsigned cnt) { - struct format_type *fmt = io->_vt; + assert(io->state == STATE_CHECKED || io->state == STATE_OPENED); - return fmt->sscan ? fmt->sscan(io, buf, len, rbytes, smps, cnt) : -1; + return io_type(io)->sscan ? io_type(io)->sscan(io, buf, len, rbytes, smps, cnt) : -1; } int io_sprint(struct io *io, char *buf, size_t len, size_t *wbytes, struct sample *smps[], unsigned cnt) { - struct format_type *fmt = io->_vt; + assert(io->state == STATE_CHECKED || io->state == STATE_OPENED); - return fmt->sprint ? fmt->sprint(io, buf, len, wbytes, smps, cnt) : -1; + return io_type(io)->sprint ? io_type(io)->sprint(io, buf, len, wbytes, smps, cnt) : -1; } diff --git a/lib/mapping.c b/lib/mapping.c index f4b219342..69a1c273c 100644 --- a/lib/mapping.c +++ b/lib/mapping.c @@ -41,12 +41,16 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct list *no if (nodes) { node = strtok(cpy, "."); - if (!node) + if (!node) { + warn("Missing node name"); goto invalid_format; + } me->node = list_lookup(nodes, node); - if (!me->node) + if (!me->node) { + warn("Unknown node %s", node); goto invalid_format; + } type = strtok(NULL, ".["); if (!type) @@ -65,16 +69,22 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct list *no me->length = 1; field = strtok(NULL, "."); - if (!field) + if (!field) { + warn("Missing stats type"); goto invalid_format; + } subfield = strtok(NULL, "."); - if (!subfield) + if (!subfield) { + warn("Missing stats sub-type"); goto invalid_format; + } id = stats_lookup_id(field); - if (id < 0) + if (id < 0) { + warn("Invalid stats type"); goto invalid_format; + } me->stats.id = id; @@ -92,38 +102,48 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct list *no me->stats.type = MAPPING_STATS_TYPE_VAR; else if (!strcmp(subfield, "stddev")) me->stats.type = MAPPING_STATS_TYPE_STDDEV; - else + else { + warn("Invalid stats sub-type"); goto invalid_format; + } } else if (!strcmp(type, "hdr")) { me->type = MAPPING_TYPE_HEADER; me->length = 1; field = strtok(NULL, "."); - if (!field) + if (!field) { + warn("Missing header type"); goto invalid_format; + } if (!strcmp(field, "sequence")) me->header.type = MAPPING_HEADER_TYPE_SEQUENCE; else if (!strcmp(field, "length")) me->header.type = MAPPING_HEADER_TYPE_LENGTH; - else + else { + warn("Invalid header type"); goto invalid_format; + } } else if (!strcmp(type, "ts")) { me->type = MAPPING_TYPE_TIMESTAMP; me->length = 2; field = strtok(NULL, "."); - if (!field) + if (!field) { + warn("Missing timestamp type"); goto invalid_format; + } if (!strcmp(field, "origin")) me->timestamp.type = MAPPING_TIMESTAMP_TYPE_ORIGIN; else if (!strcmp(field, "received")) me->timestamp.type = MAPPING_TIMESTAMP_TYPE_RECEIVED; - else + else { + warn("Invalid timestamp type"); goto invalid_format; + } } else if (!strcmp(type, "data")) { char *first_str, *last_str; @@ -134,31 +154,36 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct list *no first_str = strtok(NULL, "-]"); if (first_str) { if (me->node) - first = list_lookup_index(&me->node->in.signals, first_str); + first = list_lookup_index(&me->node->signals, first_str); if (first < 0) { char *endptr; first = strtoul(first_str, &endptr, 10); - if (endptr != first_str + strlen(first_str)) + if (endptr != first_str + strlen(first_str)) { + warn("Failed to parse data range"); goto invalid_format; + } } } else { + /* Map all signals */ me->data.offset = 0; - me->length = 0; + me->length = me->node ? list_length(&me->node->signals) : 0; goto end; } last_str = strtok(NULL, "]"); if (last_str) { if (me->node) - last = list_lookup_index(&me->node->in.signals, last_str); + last = list_lookup_index(&me->node->signals, last_str); if (last < 0) { char *endptr; last = strtoul(last_str, &endptr, 10); - if (endptr != last_str + strlen(last_str)) + if (endptr != last_str + strlen(last_str)) { + warn("Failed to parse data range"); goto invalid_format; + } } } else @@ -189,11 +214,11 @@ invalid_format: return -1; } -int mapping_parse(struct mapping_entry *me, json_t *j, struct list *nodes) +int mapping_parse(struct mapping_entry *me, json_t *cfg, struct list *nodes) { const char *str; - str = json_string_value(j); + str = json_string_value(cfg); if (!str) return -1; @@ -250,17 +275,13 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons if (len + off > remapped->capacity) return -1; - if (len + off > remapped->length) - remapped->length = len + off; - switch (me->type) { case MAPPING_TYPE_STATS: { const struct hist *h = &s->histograms[me->stats.id]; switch (me->stats.type) { case MAPPING_STATS_TYPE_TOTAL: - sample_set_data_format(remapped, off, SAMPLE_DATA_FORMAT_INT); - remapped->data[off++].f = h->total; + remapped->data[off++].i = h->total; break; case MAPPING_STATS_TYPE_LAST: remapped->data[off++].f = h->last; @@ -299,18 +320,13 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons return -1; } - sample_set_data_format(remapped, off, SAMPLE_DATA_FORMAT_INT); - sample_set_data_format(remapped, off+1, SAMPLE_DATA_FORMAT_INT); - remapped->data[off++].i = ts->tv_sec; remapped->data[off++].i = ts->tv_nsec; break; } - sample_set_data_format(remapped, off, SAMPLE_DATA_FORMAT_INT); case MAPPING_TYPE_HEADER: - switch (me->header.type) { case MAPPING_HEADER_TYPE_LENGTH: remapped->data[off++].i = original->length; @@ -318,9 +334,6 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons case MAPPING_HEADER_TYPE_SEQUENCE: remapped->data[off++].i = original->sequence; break; - case MAPPING_HDR_FORMAT: - remapped->data[off++].i = original->format; - break; default: return -1; } @@ -329,14 +342,10 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons case MAPPING_TYPE_DATA: for (int j = me->data.offset; j < len + me->data.offset; j++) { - if (j >= original->length) { - sample_set_data_format(remapped, off, SAMPLE_DATA_FORMAT_FLOAT); + if (j >= original->length) remapped->data[off++].f = 0; - } - else { - sample_set_data_format(remapped, off, sample_get_data_format(original, j)); + else remapped->data[off++] = original->data[j]; - } } break; @@ -349,10 +358,6 @@ int mapping_remap(const struct list *m, struct sample *remapped, const struct sa { int ret; - /* We copy all the header fields */ - remapped->sequence = original->sequence; - remapped->ts = original->ts; - for (size_t i = 0; i < list_length(m); i++) { struct mapping_entry *me = (struct mapping_entry *) list_at(m, i); @@ -371,7 +376,7 @@ int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str) assert(me->length == 0 || index < me->length); if (me->node) - strcatf(str, "%s.", node_name(me->node)); + strcatf(str, "%s.", node_name_short(me->node)); switch (me->type) { case MAPPING_TYPE_STATS: diff --git a/lib/memory.c b/lib/memory.c index 6eb11eb52..72edeca8e 100644 --- a/lib/memory.c +++ b/lib/memory.c @@ -34,23 +34,27 @@ #include #include -struct hash_table allocations = { .state = STATE_DESTROYED }; +static struct hash_table allocations = { .state = STATE_DESTROYED }; + +__attribute__((constructor)) +static void init_allocations() +{ + hash_table_init(&allocations, 100); +} + +__attribute__((destructor)) +static void destroy_allocations() +{ + /** @todo: Release remaining allocations? */ + hash_table_destroy(&allocations, NULL, false); +} int memory_init(int hugepages) { - int ret; - - info("Initialize memory sub-system"); - - - if (allocations.state == STATE_DESTROYED) { - ret = hash_table_init(&allocations, 100); - if (ret) - return ret; - } + info("Initialize memory sub-system: #hugepages=%d", hugepages); #ifdef __linux__ - int pagecnt, pagesz; + int pagecnt, pagesz, ret; struct rlimit l; pagecnt = kernel_get_nr_hugepages(); @@ -109,17 +113,19 @@ void * memory_alloc(struct memory_type *m, size_t len) } void * memory_alloc_aligned(struct memory_type *m, size_t len, size_t alignment) -{ +{ int ret; struct memory_allocation *ma = m->alloc(m, len, alignment); - if(ma == NULL){ + if (ma == NULL) { warn("memory_alloc_aligned: allocating memory for memory_allocation failed for memory type %s. Reason: %s", m->name, strerror(errno) ); + return NULL; } ret = hash_table_insert(&allocations, ma->address, ma); - if(ret){ + if (ret) { warn("memory_alloc_aligned: Inserting into hash table failed!"); + return NULL; } debug(LOG_MEM | 5, "Allocated %#zx bytes of %#zx-byte-aligned %s memory: %p", ma->length, ma->alignment, ma->type->name, ma->address); diff --git a/lib/node.c b/lib/node.c index eaaa20193..a867e2dd0 100644 --- a/lib/node.c +++ b/lib/node.c @@ -46,6 +46,9 @@ static int node_direction_init2(struct node_direction *nd, struct node *n) ret = hook_init_builtin_list(&nd->hooks, nd->builtin, m, NULL, n); if (ret) return ret; + + /* We sort the hooks according to their priority before starting the path */ + list_sort(&nd->hooks, hook_cmp_priority); #endif /* WITH_HOOKS */ return 0; @@ -53,10 +56,16 @@ static int node_direction_init2(struct node_direction *nd, struct node *n) static int node_direction_init(struct node_direction *nd, struct node *n) { + int ret; + nd->enabled = 0; nd->vectorize = 1; nd->builtin = 1; + ret = list_init(&nd->hooks); + if (ret) + return ret; + return 0; } @@ -64,10 +73,6 @@ static int node_direction_destroy(struct node_direction *nd, struct node *n) { int ret; - ret = list_destroy(&nd->signals, (dtor_cb_t) signal_decref, true); - if (ret) - return ret; - #ifdef WITH_HOOKS ret = list_destroy(&nd->hooks, (dtor_cb_t) hook_destroy, true); if (ret) @@ -83,14 +88,12 @@ static int node_direction_parse(struct node_direction *nd, struct node *n, json_ json_error_t err; json_t *json_hooks = NULL; - json_t *json_signals = NULL; nd->cfg = cfg; nd->enabled = 1; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b, s?: b }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: i, s?: b, s?: b }", "hooks", &json_hooks, - "signals", &json_signals, "vectorize", &nd->vectorize, "builtin", &nd->builtin, "enabled", &nd->enabled @@ -110,12 +113,6 @@ static int node_direction_parse(struct node_direction *nd, struct node *n, json_ } #endif /* WITH_HOOKS */ - if (json_signals) { - ret = signal_parse_list(&nd->signals, json_signals); - if (ret) - error("Failed to parse signal definition of node '%s'", node_name(n)); - } - return 0; } @@ -136,9 +133,6 @@ static int node_direction_start(struct node_direction *nd, struct node *n) #ifdef WITH_HOOKS int ret; - /* We sort the hooks according to their priority before starting the path */ - list_sort(&nd->hooks, hook_cmp_priority); - for (size_t i = 0; i < list_length(&nd->hooks); i++) { struct hook *h = (struct hook *) list_at(&nd->hooks, i); @@ -181,9 +175,9 @@ int node_init(struct node *n, struct node_type *vt) n->_name = NULL; n->_name_long = NULL; - /* Default values */ - n->samplelen = DEFAULT_SAMPLE_LENGTH; + list_init(&n->signals); + /* Default values */ ret = node_direction_init(&n->in, n); if (ret) return ret; @@ -199,27 +193,61 @@ int node_init(struct node *n, struct node_type *vt) return 0; } +int node_init2(struct node *n) +{ + int ret; + + assert(n->state == STATE_CHECKED); + + ret = node_direction_init2(&n->in, n); + if (ret) + return ret; + + ret = node_direction_init2(&n->out, n); + if (ret) + return ret; + + return 0; +} + int node_parse(struct node *n, json_t *json, const char *name) { struct node_type *nt; - int ret; + int ret, samplelen = DEFAULT_SAMPLE_LENGTH; json_error_t err; + json_t *json_signals = NULL; const char *type; n->name = strdup(name); - ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: i }", + ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o, s?: i } }", "type", &type, - "samplelen", &n->samplelen + "in", + "signals", &json_signals, + "samplelen", &samplelen ); if (ret) - jerror(&err, "Failed to parse node '%s'", node_name(n)); + jerror(&err, "Failed to parse node %s", node_name(n)); nt = node_type_lookup(type); assert(nt == node_type(n)); + if (nt->flags & NODE_TYPE_PROVIDES_SIGNALS) { + if (json_signals) + error("Node %s does not support signal definitions", node_name(n)); + } + else { + if (json_signals) { + ret = signal_list_parse(&n->signals, json_signals); + if (ret) + error("Failed to parse signal definition of node %s", node_name(n)); + } + else + signal_list_generate(&n->signals, samplelen, SIGNAL_TYPE_AUTO); + } + struct { const char *str; struct node_direction *dir; @@ -228,10 +256,10 @@ int node_parse(struct node *n, json_t *json, const char *name) { "out", &n->out } }; - const char *fields[] = { "builtin", "vectorize", "signals", "hooks" }; + const char *fields[] = { "builtin", "vectorize", "hooks" }; for (int j = 0; j < ARRAY_LEN(dirs); j++) { - json_t *json_dir = json_object_get(json, dirs [j].str); + json_t *json_dir = json_object_get(json, dirs[j].str); // Skip if direction is unused if (!json_dir) @@ -291,22 +319,20 @@ int node_start(struct node *n) assert(node_type(n)->state == STATE_STARTED); info("Starting node %s", node_name_long(n)); - { - ret = node_direction_start(&n->in, n); - if (ret) - return ret; - ret = node_direction_start(&n->out, n); - if (ret) - return ret; + ret = node_direction_start(&n->in, n); + if (ret) + return ret; - ret = node_type(n)->start ? node_type(n)->start(n) : 0; - if (ret) - return ret; - } + ret = node_direction_start(&n->out, n); + if (ret) + return ret; + + ret = node_type(n)->start ? node_type(n)->start(n) : 0; + if (ret) + return ret; n->state = STATE_STARTED; - n->sequence = 0; return ret; @@ -343,6 +369,10 @@ int node_destroy(struct node *n) int ret; assert(n->state != STATE_DESTROYED && n->state != STATE_STARTED); + ret = list_destroy(&n->signals, (dtor_cb_t) signal_decref, false); + if (ret) + return ret; + ret = node_direction_destroy(&n->in, n); if (ret) return ret; @@ -351,11 +381,10 @@ int node_destroy(struct node *n) if (ret) return ret; - if (node_type(n)->destroy){ + if (node_type(n)->destroy) { ret = (int) node_type(n)->destroy(n); - if(ret){ + if (ret) return ret; - } } list_remove(&node_type(n)->instances, n); @@ -469,11 +498,13 @@ char * node_name_long(struct node *n) if (node_type(n)->print) { struct node_type *vt = node_type(n); char *name_long = vt->print(n); - strcatf(&n->_name_long, "%s: #in.hooks=%zu, in.vectorize=%d, #out.hooks=%zu, out.vectorize=%d, samplelen=%d, %s", + strcatf(&n->_name_long, "%s: #in.signals=%zu, #in.hooks=%zu, in.vectorize=%d, #out.hooks=%zu, out.vectorize=%d, %s", node_name(n), + list_length(&n->signals), list_length(&n->in.hooks), n->in.vectorize, list_length(&n->out.hooks), n->out.vectorize, - n->samplelen, name_long); + name_long + ); free(name_long); } diff --git a/lib/path.c b/lib/path.c index d82790082..f4db9f1f9 100644 --- a/lib/path.c +++ b/lib/path.c @@ -38,6 +38,7 @@ #include #include #include +#include /* Forward declaration */ static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt); @@ -50,7 +51,7 @@ static int path_source_init(struct path_source *ps) if (ps->node->_vt->pool_size) pool_size = ps->node->_vt->pool_size; - ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(ps->node->in.samplelen), node_memory_type(ps->node, &memory_hugepage)); + ret = pool_init(&ps->pool, pool_size, SAMPLE_LENGTH(list_length(&ps->node->signals)), node_memory_type(ps->node, &memory_hugepage)); if (ret) return ret; @@ -115,7 +116,8 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) ? sample_clone(p->last_sample) : sample_clone(muxed_smps[i-1]); - muxed_smps[i]->sequence = p->last_sequence + 1; + muxed_smps[i]->sequence = p->last_sequence++; + muxed_smps[i]->ts = tomux_smps[i]->ts; mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL); } @@ -301,11 +303,25 @@ static void * path_run_poll(void *arg) int path_init(struct path *p) { + int ret; + assert(p->state == STATE_DESTROYED); - list_init(&p->destinations); - list_init(&p->sources); - list_init(&p->signals); + ret = list_init(&p->destinations); + if (ret) + return ret; + + ret = list_init(&p->sources); + if (ret) + return ret; + + ret = list_init(&p->signals); + if (ret) + return ret; + + ret = list_init(&p->hooks); + if (ret) + return ret; p->_name = NULL; @@ -379,7 +395,7 @@ int path_init2(struct path *p) /* Initialize destinations */ struct memory_type *pool_mt = &memory_hugepage; - int pool_size = (unsigned) MAX(1, list_length(&p->destinations) * p->queuelen); + int pool_size = MAX(1, list_length(&p->destinations)) * p->queuelen; for (size_t i = 0; i < list_length(&p->destinations); i++) { struct path_destination *pd = (struct path_destination *) list_at(&p->destinations, i); @@ -395,6 +411,9 @@ int path_init2(struct path *p) return ret; } + bitset_init(&p->received, list_length(&p->sources)); + bitset_init(&p->mask, list_length(&p->sources)); + /* Initialize sources */ for (size_t i = 0; i < list_length(&p->sources); i++) { struct path_source *ps = (struct path_source *) list_at(&p->sources, i); @@ -402,16 +421,6 @@ int path_init2(struct path *p) ret = path_source_init(ps); if (ret) return ret; - } - - bitset_init(&p->received, list_length(&p->sources)); - bitset_init(&p->mask, list_length(&p->sources)); - - /* Calc sample length of path and initialize bitset */ - p->samplelen = 0; - - for (size_t i = 0; i < list_length(&p->sources); i++) { - struct path_source *ps = (struct path_source *) list_at(&p->sources, i); if (ps->masked) bitset_set(&p->mask, i); @@ -419,25 +428,42 @@ int path_init2(struct path *p) for (size_t i = 0; i < list_length(&ps->mappings); i++) { struct mapping_entry *me = (struct mapping_entry *) list_at(&ps->mappings, i); - int len = me->length; int off = me->offset; + int len = me->length; - if (off + len > p->samplelen) - p->samplelen = off + len; + for (int j = 0; j < len; j++) { + struct signal *sig; + + /* For data mappings we simple refer to the existing + * signal descriptors of the source node. */ + if (me->type == MAPPING_TYPE_DATA) { + sig = (struct signal *) list_at_safe(&me->node->signals, me->data.offset + j); + if (!sig) { + warn("Failed to create signal description for path %s", path_name(p)); + continue; + } + + signal_incref(sig); + } + /* For other mappings we create new signal descriptors */ + else { + sig = alloc(sizeof(struct signal)); + + ret = signal_init_from_mapping(sig, me, j); + if (ret) + return -1; + } + + list_extend(&p->signals, off + j + 1, NULL); + list_set(&p->signals, off + j, sig); + } } } - if (!p->samplelen) - p->samplelen = DEFAULT_SAMPLE_LENGTH; - - ret = pool_init(&p->pool, pool_size, SAMPLE_LENGTH(p->samplelen), pool_mt); + ret = pool_init(&p->pool, pool_size, SAMPLE_LENGTH(list_length(&p->signals)), pool_mt); if (ret) return ret; - p->last_sample = sample_alloc(&p->pool); - if (!p->last_sample) - return -1; - /* Prepare poll() */ if (p->poll) { ret = path_init_poll(p); @@ -669,15 +695,16 @@ int path_start(struct path *p) mask = bitset_dump(&p->mask); - info("Starting path %s: mode=%s, poll=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, samplelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu", + info("Starting path %s: #signals=%zu, mode=%s, poll=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu", path_name(p), + list_length(&p->signals), mode, p->poll ? "yes" : "no", mask, p->rate, p->enabled ? "yes" : "no", p->reverse ? "yes" : "no", - p->queuelen, p->samplelen, + p->queuelen, list_length(&p->hooks), list_length(&p->sources), list_length(&p->destinations) @@ -699,25 +726,20 @@ int path_start(struct path *p) bitset_clear_all(&p->received); - /* We initialize the intial sample with zeros */ - for (size_t i = 0; i < list_length(&p->sources); i++) { - struct path_source *ps = (struct path_source *) list_at(&p->sources, i); + /* We initialize the intial sample */ + p->last_sample = sample_alloc(&p->pool); + if (!p->last_sample) + return -1; - for (size_t j = 0; j < list_length(&ps->mappings); j++) { - struct mapping_entry *me = (struct mapping_entry *) list_at(&ps->mappings, j); + p->last_sample->length = list_length(&p->signals); + p->last_sample->signals = &p->signals; + p->last_sample->sequence = 0; + p->last_sample->flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_DATA; - int len = me->length; - int off = me->offset; + for (size_t i = 0; i < p->last_sample->length; i++) { + struct signal *sig = (struct signal *) list_at(p->last_sample->signals, i); - if (len + off > p->last_sample->length) - p->last_sample->length = len + off; - - for (int k = off; k < off + len; k++) { - p->last_sample->data[k].f = 0; - - sample_set_data_format(p->last_sample, k, SAMPLE_DATA_FORMAT_FLOAT); - } - } + p->last_sample->data[i] = sig->init; } /* Start one thread per path for sending to destinations @@ -762,6 +784,8 @@ int path_stop(struct path *p) } #endif /* WITH_HOOKS */ + sample_decref(p->last_sample); + p->state = STATE_STOPPED; return 0; @@ -777,7 +801,7 @@ int path_destroy(struct path *p) #endif list_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true); list_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true); - list_destroy(&p->signals, (dtor_cb_t) signal_decref, true); + list_destroy(&p->signals, (dtor_cb_t) signal_decref, false); if (p->reader.pfds) free(p->reader.pfds); diff --git a/lib/sample.c b/lib/sample.c index 9bae7afd8..6f86da5eb 100644 --- a/lib/sample.c +++ b/lib/sample.c @@ -28,6 +28,8 @@ #include #include #include +#include +#include int sample_init(struct sample *s) { @@ -150,6 +152,7 @@ int sample_copy(struct sample *dst, struct sample *src) dst->sequence = src->sequence; dst->flags = src->flags; dst->ts = src->ts; + dst->signals = src->signals; memcpy(&dst->data, &src->data, SAMPLE_DATA_LENGTH(dst->length)); @@ -217,7 +220,7 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags) } /* Compare timestamp */ - if (flags & SAMPLE_HAS_ORIGIN) { + if (flags & SAMPLE_HAS_TS_ORIGIN) { if (time_delta(&a->ts.origin, &b->ts.origin) > epsilon) { printf("ts.origin: %f != %f\n", time_to_double(&a->ts.origin), time_to_double(&b->ts.origin)); return 3; @@ -225,27 +228,47 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags) } /* Compare data */ - if (flags & SAMPLE_HAS_VALUES) { + if (flags & SAMPLE_HAS_DATA) { if (a->length != b->length) { printf("length: %d != %d\n", a->length, b->length); return 4; } for (int i = 0; i < a->length; i++) { - switch (sample_get_data_format(a, i)) { - case SAMPLE_DATA_FORMAT_FLOAT: + /* Compare format */ + if (sample_format(a, i) != sample_format(b, i)) + return 6; + + switch (sample_format(a, i)) { + case SIGNAL_TYPE_FLOAT: if (fabs(a->data[i].f - b->data[i].f) > epsilon) { printf("data[%d].f: %f != %f\n", i, a->data[i].f, b->data[i].f); return 5; } break; - case SAMPLE_DATA_FORMAT_INT: + case SIGNAL_TYPE_INTEGER: if (a->data[i].i != b->data[i].i) { printf("data[%d].i: %" PRId64 " != %" PRId64 "\n", i, a->data[i].i, b->data[i].i); return 5; } break; + + case SIGNAL_TYPE_BOOLEAN: + if (a->data[i].b != b->data[i].b) { + printf("data[%d].b: %s != %s\n", i, a->data[i].b ? "true" : "false", b->data[i].b ? "true" : "false"); + return 5; + } + break; + + case SIGNAL_TYPE_COMPLEX: + if (cabs(a->data[i].z - b->data[i].z) > epsilon) { + printf("data[%d].z: %f+%fi != %f+%fi\n", i, creal(a->data[i].z), cimag(a->data[i].z), creal(b->data[i].z), cimag(b->data[i].z)); + return 5; + } + break; + + default: { } } } } @@ -253,11 +276,11 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags) return 0; } -enum signal_format sample_format(const struct sample *s, unsigned idx) +enum signal_type sample_format(const struct sample *s, unsigned idx) { struct signal *sig; sig = (struct signal *) list_at_safe(s->signals, idx); - return sig ? sig->format : SIGNAL_FORMAT_UNKNOWN; + return sig ? sig->type : SIGNAL_TYPE_AUTO; } diff --git a/lib/signal.c b/lib/signal.c index 356cc48b4..ccfb554fd 100644 --- a/lib/signal.c +++ b/lib/signal.c @@ -34,7 +34,7 @@ int signal_init(struct signal *s) s->name = NULL; s->unit = NULL; - s->format = SIGNAL_FORMAT_AUTO; + s->type = SIGNAL_TYPE_AUTO; s->refcnt = ATOMIC_VAR_INIT(1); @@ -57,7 +57,7 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u case MAPPING_TYPE_STATS: switch (me->stats.type) { case MAPPING_STATS_TYPE_TOTAL: - s->format = SIGNAL_FORMAT_INT; + s->type = SIGNAL_TYPE_INTEGER; break; case MAPPING_STATS_TYPE_LAST: @@ -66,7 +66,7 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u case MAPPING_STATS_TYPE_MEAN: case MAPPING_STATS_TYPE_VAR: case MAPPING_STATS_TYPE_STDDEV: - s->format = SIGNAL_FORMAT_FLOAT; + s->type = SIGNAL_TYPE_FLOAT; break; } break; @@ -75,13 +75,13 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u switch (me->header.type) { case MAPPING_HEADER_TYPE_LENGTH: case MAPPING_HEADER_TYPE_SEQUENCE: - s->format = SIGNAL_FORMAT_INT; + s->type = SIGNAL_TYPE_INTEGER; break; } break; case MAPPING_TYPE_TIMESTAMP: - s->format = SIGNAL_FORMAT_INT; + s->type = SIGNAL_TYPE_INTEGER; break; case MAPPING_TYPE_DATA: @@ -103,7 +103,7 @@ int signal_destroy(struct signal *s) return 0; } -struct signal * signal_create(const char *name, const char *unit, enum signal_format fmt) +struct signal * signal_create(const char *name, const char *unit, enum signal_type fmt) { int ret; struct signal *sig; @@ -122,7 +122,7 @@ struct signal * signal_create(const char *name, const char *unit, enum signal_fo if (unit) sig->unit = strdup(unit); - sig->format = fmt; + sig->type = fmt; return sig; } @@ -156,6 +156,29 @@ int signal_decref(struct signal *s) return prev - 1; } +struct signal * signal_copy(struct signal *s) +{ + struct signal *ns; + + ns = alloc(sizeof(struct signal)); + if (!ns) + return NULL; + + signal_init(ns); + + ns->type = s->type; + ns->init = s->init; + ns->enabled = s->enabled; + + if (s->name) + ns->name = strdup(s->name); + + if (s->unit) + ns->name = strdup(s->unit); + + return ns; +} + int signal_parse(struct signal *s, json_t *cfg) { int ret; @@ -163,12 +186,12 @@ int signal_parse(struct signal *s, json_t *cfg) json_t *json_init = NULL; const char *name = NULL; const char *unit = NULL; - const char *format = NULL; + const char *type = NULL; ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: o, s?: b }", "name", &name, "unit", &unit, - "format", &format, + "type", &type, "init", &json_init, "enabled", &s->enabled ); @@ -181,9 +204,9 @@ int signal_parse(struct signal *s, json_t *cfg) if (unit) s->unit = strdup(unit); - if (format) { - s->format = signal_format_from_str(format); - if (s->format == SIGNAL_FORMAT_INVALID) + if (type) { + s->type = signal_type_from_str(type); + if (s->type == SIGNAL_TYPE_INVALID) return -1; } @@ -229,7 +252,7 @@ int signal_list_parse(struct list *list, json_t *cfg) return 0; } -int signal_list_generate(struct list *list, unsigned len, enum signal_format fmt) +int signal_list_generate(struct list *list, unsigned len, enum signal_type fmt) { for (int i = 0; i < len; i++) { char name[32]; @@ -253,126 +276,126 @@ void signal_list_dump(const struct list *list) struct signal *sig = list_at(list, i); if (sig->unit) - info(" %d: %s [%s] = %s", i, sig->name, sig->unit, signal_format_to_str(sig->format)); + info(" %d: %s [%s] = %s", i, sig->name, sig->unit, signal_type_to_str(sig->type)); else - info(" %d: %s = %s", i, sig->name, signal_format_to_str(sig->format)); + info(" %d: %s = %s", i, sig->name, signal_type_to_str(sig->type)); } } -/* Signal format */ +/* Signal type */ -enum signal_format signal_format_from_str(const char *str) +enum signal_type signal_type_from_str(const char *str) { if (!strcmp(str, "boolean")) - return SIGNAL_FORMAT_BOOL; + return SIGNAL_TYPE_BOOLEAN; else if (!strcmp(str, "complex")) - return SIGNAL_FORMAT_COMPLEX; + return SIGNAL_TYPE_COMPLEX; else if (!strcmp(str, "float")) - return SIGNAL_FORMAT_FLOAT; + return SIGNAL_TYPE_FLOAT; else if (!strcmp(str, "integer")) - return SIGNAL_FORMAT_INT; + return SIGNAL_TYPE_INTEGER; else if (!strcmp(str, "auto")) - return SIGNAL_FORMAT_AUTO; + return SIGNAL_TYPE_AUTO; else - return SIGNAL_FORMAT_INVALID; + return SIGNAL_TYPE_INVALID; } -const char * signal_format_to_str(enum signal_format fmt) +const char * signal_type_to_str(enum signal_type fmt) { switch (fmt) { - case SIGNAL_FORMAT_BOOL: + case SIGNAL_TYPE_BOOLEAN: return "boolean"; - case SIGNAL_FORMAT_COMPLEX: + case SIGNAL_TYPE_COMPLEX: return "complex"; - case SIGNAL_FORMAT_FLOAT: + case SIGNAL_TYPE_FLOAT: return "float"; - case SIGNAL_FORMAT_INT: + case SIGNAL_TYPE_INTEGER: return "integer"; - case SIGNAL_FORMAT_AUTO: + case SIGNAL_TYPE_AUTO: return "auto"; - case SIGNAL_FORMAT_INVALID: + case SIGNAL_TYPE_INVALID: return "invalid"; } return NULL; } -enum signal_format signal_format_detect(const char *val) +enum signal_type signal_type_detect(const char *val) { char *brk; int len; - debug(LOG_IO | 5, "Attempt to detect format of: %s", val); + debug(LOG_IO | 5, "Attempt to detect type of: %s", val); brk = strchr(val, 'i'); if (brk) - return SIGNAL_FORMAT_COMPLEX; + return SIGNAL_TYPE_COMPLEX; brk = strchr(val, '.'); if (brk) - return SIGNAL_FORMAT_FLOAT; + return SIGNAL_TYPE_FLOAT; len = strlen(val); if (len == 1 && (val[0] == '1' || val[0] == '0')) - return SIGNAL_FORMAT_BOOL; + return SIGNAL_TYPE_BOOLEAN; - return SIGNAL_FORMAT_INT; + return SIGNAL_TYPE_INTEGER; } /* Signal data */ void signal_data_set(union signal_data *data, const struct signal *sig, double val) { - switch (sig->format) { - case SIGNAL_FORMAT_BOOL: + switch (sig->type) { + case SIGNAL_TYPE_BOOLEAN: data->b = val; break; - case SIGNAL_FORMAT_FLOAT: + case SIGNAL_TYPE_FLOAT: data->f = val; break; - case SIGNAL_FORMAT_INT: + case SIGNAL_TYPE_INTEGER: data->i = val; break; - case SIGNAL_FORMAT_COMPLEX: + case SIGNAL_TYPE_COMPLEX: data->z = val; break; - case SIGNAL_FORMAT_INVALID: - case SIGNAL_FORMAT_AUTO: + case SIGNAL_TYPE_INVALID: + case SIGNAL_TYPE_AUTO: memset(data, 0, sizeof(union signal_data)); break; } } -void signal_data_convert(union signal_data *data, const struct signal *from, const struct signal *to) +void signal_data_cast(union signal_data *data, const struct signal *from, const struct signal *to) { - if (from == to) /* Nothing to do */ + if (from->type == to->type) /* Nothing to do */ return; - switch (to->format) { - case SIGNAL_FORMAT_BOOL: - switch(from->format) { - case SIGNAL_FORMAT_BOOL: + switch (to->type) { + case SIGNAL_TYPE_BOOLEAN: + switch(from->type) { + case SIGNAL_TYPE_BOOLEAN: data->b = data->b; break; - case SIGNAL_FORMAT_INT: + case SIGNAL_TYPE_INTEGER: data->b = data->i; break; - case SIGNAL_FORMAT_FLOAT: + case SIGNAL_TYPE_FLOAT: data->b = data->f; break; - case SIGNAL_FORMAT_COMPLEX: + case SIGNAL_TYPE_COMPLEX: data->b = creal(data->z); break; @@ -380,21 +403,21 @@ void signal_data_convert(union signal_data *data, const struct signal *from, con } break; - case SIGNAL_FORMAT_INT: - switch(from->format) { - case SIGNAL_FORMAT_BOOL: + case SIGNAL_TYPE_INTEGER: + switch(from->type) { + case SIGNAL_TYPE_BOOLEAN: data->i = data->b; break; - case SIGNAL_FORMAT_INT: + case SIGNAL_TYPE_INTEGER: data->i = data->i; break; - case SIGNAL_FORMAT_FLOAT: + case SIGNAL_TYPE_FLOAT: data->i = data->f; break; - case SIGNAL_FORMAT_COMPLEX: + case SIGNAL_TYPE_COMPLEX: data->i = creal(data->z); break; @@ -402,21 +425,21 @@ void signal_data_convert(union signal_data *data, const struct signal *from, con } break; - case SIGNAL_FORMAT_FLOAT: - switch(from->format) { - case SIGNAL_FORMAT_BOOL: + case SIGNAL_TYPE_FLOAT: + switch(from->type) { + case SIGNAL_TYPE_BOOLEAN: data->f = data->b; break; - case SIGNAL_FORMAT_INT: + case SIGNAL_TYPE_INTEGER: data->f = data->i; break; - case SIGNAL_FORMAT_FLOAT: + case SIGNAL_TYPE_FLOAT: data->f = data->f; break; - case SIGNAL_FORMAT_COMPLEX: + case SIGNAL_TYPE_COMPLEX: data->f = creal(data->z); break; @@ -424,21 +447,21 @@ void signal_data_convert(union signal_data *data, const struct signal *from, con } break; - case SIGNAL_FORMAT_COMPLEX: - switch(from->format) { - case SIGNAL_FORMAT_BOOL: + case SIGNAL_TYPE_COMPLEX: + switch(from->type) { + case SIGNAL_TYPE_BOOLEAN: data->z = CMPLXF(data->b, 0); break; - case SIGNAL_FORMAT_INT: + case SIGNAL_TYPE_INTEGER: data->z = CMPLXF(data->i, 0); break; - case SIGNAL_FORMAT_FLOAT: + case SIGNAL_TYPE_FLOAT: data->z = CMPLXF(data->f, 0); break; - case SIGNAL_FORMAT_COMPLEX: + case SIGNAL_TYPE_COMPLEX: data->z = data->z; break; @@ -452,20 +475,20 @@ void signal_data_convert(union signal_data *data, const struct signal *from, con int signal_data_parse_str(union signal_data *data, const struct signal *sig, const char *ptr, char **end) { - switch (sig->format) { - case SIGNAL_FORMAT_FLOAT: + switch (sig->type) { + case SIGNAL_TYPE_FLOAT: data->f = strtod(ptr, end); break; - case SIGNAL_FORMAT_INT: + case SIGNAL_TYPE_INTEGER: data->i = strtol(ptr, end, 10); break; - case SIGNAL_FORMAT_BOOL: + case SIGNAL_TYPE_BOOLEAN: data->b = strtol(ptr, end, 10); break; - case SIGNAL_FORMAT_COMPLEX: { + case SIGNAL_TYPE_COMPLEX: { float real, imag; real = strtod(ptr, end); @@ -487,8 +510,8 @@ int signal_data_parse_str(union signal_data *data, const struct signal *sig, con break; } - case SIGNAL_FORMAT_AUTO: - case SIGNAL_FORMAT_INVALID: + case SIGNAL_TYPE_AUTO: + case SIGNAL_TYPE_INVALID: return -1; } @@ -499,20 +522,20 @@ int signal_data_parse_json(union signal_data *data, const struct signal *sig, js { int ret; - switch (sig->format) { - case SIGNAL_FORMAT_FLOAT: + switch (sig->type) { + case SIGNAL_TYPE_FLOAT: data->f = json_real_value(cfg); break; - case SIGNAL_FORMAT_INT: + case SIGNAL_TYPE_INTEGER: data->i = json_integer_value(cfg); break; - case SIGNAL_FORMAT_BOOL: + case SIGNAL_TYPE_BOOLEAN: data->b = json_boolean_value(cfg); break; - case SIGNAL_FORMAT_COMPLEX: { + case SIGNAL_TYPE_COMPLEX: { double real, imag; ret = json_unpack(cfg, "{ s: F, s: F }", @@ -526,8 +549,8 @@ int signal_data_parse_json(union signal_data *data, const struct signal *sig, js break; } - case SIGNAL_FORMAT_INVALID: - case SIGNAL_FORMAT_AUTO: + case SIGNAL_TYPE_INVALID: + case SIGNAL_TYPE_AUTO: return -1; } @@ -536,17 +559,17 @@ int signal_data_parse_json(union signal_data *data, const struct signal *sig, js int signal_data_snprint(const union signal_data *data, const struct signal *sig, char *buf, size_t len) { - switch (sig->format) { - case SIGNAL_FORMAT_FLOAT: + switch (sig->type) { + case SIGNAL_TYPE_FLOAT: return snprintf(buf, len, "%.6f", data->f); - case SIGNAL_FORMAT_INT: + case SIGNAL_TYPE_INTEGER: return snprintf(buf, len, "%" PRIi64, data->i); - case SIGNAL_FORMAT_BOOL: + case SIGNAL_TYPE_BOOLEAN: return snprintf(buf, len, "%u", data->b); - case SIGNAL_FORMAT_COMPLEX: + case SIGNAL_TYPE_COMPLEX: return snprintf(buf, len, "%.6f%+.6fi", creal(data->z), cimag(data->z)); default: diff --git a/lib/super_node.c b/lib/super_node.c index d498eac77..d6c81041d 100644 --- a/lib/super_node.c +++ b/lib/super_node.c @@ -384,12 +384,12 @@ int super_node_start(struct super_node *sn) for (size_t i = 0; i < list_length(&sn->nodes); i++) { struct node *n = (struct node *) list_at(&sn->nodes, i); + ret = node_init2(n); + if (ret) + error("Failed to prepare node: %s", node_name(n)); + int refs = list_count(&sn->paths, (cmp_cb_t) path_uses_node, n); if (refs > 0) { - ret = node_init2(n); - if (ret) - error("Failed to start node: %s", node_name(n)); - ret = node_start(n); if (ret) error("Failed to start node: %s", node_name(n)); @@ -405,7 +405,7 @@ int super_node_start(struct super_node *sn) if (p->enabled) { ret = path_init2(p); if (ret) - error("Failed to start path: %s", path_name(p)); + error("Failed to prepare path: %s", path_name(p)); ret = path_start(p); if (ret) diff --git a/src/villas-convert.cpp b/src/villas-convert.cpp index 2be242297..3070d9b01 100644 --- a/src/villas-convert.cpp +++ b/src/villas-convert.cpp @@ -108,10 +108,14 @@ check: if (optarg == endptr) if (!fmt) error("Invalid format: %s", dirs[i].name); - ret = io_init(dirs[i].io, fmt, NULL, SAMPLE_HAS_ALL); + ret = io_init_auto(dirs[i].io, fmt, DEFAULT_SAMPLE_LENGTH, SAMPLE_HAS_ALL); if (ret) error("Failed to initialize IO: %s", dirs[i].name); + ret = io_check(dirs[i].io); + if (ret) + error("Failed to validate IO configuration"); + ret = io_open(dirs[i].io, NULL); if (ret) error("Failed to open IO"); diff --git a/src/villas-hook.cpp b/src/villas-hook.cpp index 780ad8888..a119968c8 100644 --- a/src/villas-hook.cpp +++ b/src/villas-hook.cpp @@ -200,10 +200,14 @@ check: if (optarg == endptr) if (!ft) error("Unknown IO format '%s'", format); - ret = io_init(&io, ft, NULL, SAMPLE_HAS_ALL); + ret = io_init_auto(&io, ft, DEFAULT_SAMPLE_LENGTH, SAMPLE_HAS_ALL); if (ret) error("Failed to initialize IO"); + ret = io_check(&io); + if (ret) + error("Failed to validate IO configuration"); + ret = io_open(&io, NULL); if (ret) error("Failed to open IO"); diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index 5ab35f73c..7aafd2d03 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -205,7 +206,7 @@ static void * recv_loop(void *ctx) /* Initialize memory */ unsigned pool_size = node_type(node)->pool_size ? node_type(node)->pool_size : LOG2_CEIL(node->in.vectorize); - ret = pool_init(&recvv.pool, pool_size, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), node_memory_type(node, &memory_hugepage)); + ret = pool_init(&recvv.pool, pool_size, SAMPLE_LENGTH(list_length(&node->signals)), node_memory_type(node, &memory_hugepage)); if (ret < 0) error("Failed to allocate memory for receive pool."); @@ -222,16 +223,15 @@ static void * recv_loop(void *ctx) recv = node_read(node, smps, allocated, &release); if (recv < 0) warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv); + else { + io_print(&io, smps, recv); - - io_print(&io, smps, recv); + cnt += recv; + if (recvv.limit > 0 && cnt >= recvv.limit) + goto leave; + } sample_decref_many(smps, release); - - cnt += recv; - if (recvv.limit > 0 && cnt >= recvv.limit) - goto leave; - pthread_testcancel(); } @@ -343,10 +343,14 @@ check: if (optarg == endptr) if (!fmt) error("Invalid format: %s", format); - ret = io_init(&io, fmt, NULL, SAMPLE_HAS_ALL); + ret = io_init_auto(&io, fmt, DEFAULT_SAMPLE_LENGTH, SAMPLE_HAS_ALL); if (ret) error("Failed to initialize IO"); + ret = io_check(&io); + if (ret) + error("Failed to validate IO configuration"); + ret = io_open(&io, NULL); if (ret) error("Failed to open IO"); @@ -358,8 +362,13 @@ check: if (optarg == endptr) #ifdef Libwebsockets_FOUND /* Only start web subsystem if villas-pipe is used with a websocket node */ if (node->_vt->start == websocket_start) { - web_start(&sn.web); - api_start(&sn.api); + ret = web_start(&sn.web); + if (ret) + error("Failed to start web subsystem"); + + ret = api_start(&sn.api); + if (ret) + error("Failed to start API subsystem"); } #endif /* Libwebsockets_FOUND */ @@ -368,7 +377,7 @@ check: if (optarg == endptr) ret = node_type_start(node->_vt, &sn); if (ret) - error("Failed to intialize node type: %s", node_type_name(node->_vt)); + error("Failed to intialize node type %s: reason=%d", node_type_name(node->_vt), ret); ret = node_check(node); if (ret) diff --git a/src/villas-signal.cpp b/src/villas-signal.cpp index 71ec9bc60..fa65272fc 100644 --- a/src/villas-signal.cpp +++ b/src/villas-signal.cpp @@ -186,6 +186,7 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) int main(int argc, char *argv[]) { int ret; + json_t *cfg; struct node_type *nt; struct format_type *ft; @@ -207,6 +208,12 @@ int main(int argc, char *argv[]) if (ret) error("Failed to intialize signals"); + ft = format_type_lookup(format); + if (!ft) + error("Invalid output format '%s'", format); + + memory_init(0); // Otherwise, ht->size in hash_table_hash() will be zero + nt = node_type_lookup("signal"); if (!nt) error("Signal generation is not supported."); @@ -215,19 +222,6 @@ int main(int argc, char *argv[]) if (ret) error("Failed to initialize node"); - ft = format_type_lookup(format); - if (!ft) - error("Invalid output format '%s'", format); - - memory_init(0); // Otherwise, ht->size in hash_table_hash() will be zero - - ret = io_init(&io, ft, NULL, IO_FLUSH | (SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET)); - if (ret) - error("Failed to initialize output"); - - ret = io_open(&io, NULL); - if (ret) - error("Failed to open output"); cfg = parse_cli(argc, argv); if (!cfg) { usage(); @@ -249,7 +243,7 @@ int main(int argc, char *argv[]) if (ret) error("Failed to verify node configuration"); - ret = pool_init(&q, 16, SAMPLE_LENGTH(n.in.samplelen), &memory_heap); + ret = pool_init(&q, 16, SAMPLE_LENGTH(list_length(&n.signals)), &memory_heap); if (ret) error("Failed to initialize pool"); @@ -259,7 +253,19 @@ int main(int argc, char *argv[]) ret = node_start(&n); if (ret) - serror("Failed to start node"); + error("Failed to start node %s: reason=%d", node_name(&n), ret); + + ret = io_init(&io, ft, &n.signals, IO_FLUSH | (SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET)); + if (ret) + error("Failed to initialize output"); + + ret = io_check(&io); + if (ret) + error("Failed to validate IO configuration"); + + ret = io_open(&io, NULL); + if (ret) + error("Failed to open output"); for (;;) { t = sample_alloc(&q); diff --git a/src/villas-test-cmp.cpp b/src/villas-test-cmp.cpp index 5915ffb76..43d476684 100644 --- a/src/villas-test-cmp.cpp +++ b/src/villas-test-cmp.cpp @@ -69,12 +69,12 @@ void usage() int main(int argc, char *argv[]) { - int ret; + int ret, rc = 0; /* Default values */ double epsilon = 1e-9; const char *format = "villas.human"; - int flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_VALUES | SAMPLE_HAS_ORIGIN; + int flags = SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_DATA | SAMPLE_HAS_TS_ORIGIN; struct pool pool = { .state = STATE_DESTROYED }; @@ -86,10 +86,10 @@ int main(int argc, char *argv[]) epsilon = strtod(optarg, &endptr); goto check; case 'v': - flags &= ~SAMPLE_HAS_VALUES; + flags &= ~SAMPLE_HAS_DATA; break; case 't': - flags &= ~SAMPLE_HAS_ORIGIN; + flags &= ~SAMPLE_HAS_TS_ORIGIN; break; case 's': flags &= ~SAMPLE_HAS_SEQUENCE; @@ -138,10 +138,14 @@ check: if (optarg == endptr) if (!s[i].fmt) error("Invalid IO format: %s", s[i].format); - ret = io_init(&s[i].io, s[i].fmt, NULL, 0); + ret = io_init_auto(&s[i].io, s[i].fmt, DEFAULT_SAMPLE_LENGTH, 0); if (ret) error("Failed to initialize IO"); + ret = io_check(&s[i].io); + if (ret) + error("Failed to validate IO configuration"); + ret = io_open(&s[i].io, s[i].path); if (ret) error("Failed to open file: %s", s[i].path); @@ -168,7 +172,7 @@ retry: eofs = 0; ret = 0; else { std::cout << "length unequal" << std::endl; - ret = 1; + rc = 1; } goto out; @@ -180,15 +184,16 @@ retry: eofs = 0; if (ret <= 0) failed++; } - if (failed) goto retry; /* We compare all files against the first one */ for (int i = 1; i < n; i++) { ret = sample_cmp(s[0].sample, s[i].sample, epsilon, flags); - if (ret) + if (ret) { + rc = ret; goto out; + } } line++; @@ -210,5 +215,5 @@ out: for (int i = 0; i < n; i++) { if (ret) error("Failed to destroy pool"); - return ret; + return rc; }