diff --git a/etc/examples/global.conf b/etc/examples/global.conf index dff079cec..0787eb2d2 100644 --- a/etc/examples/global.conf +++ b/etc/examples/global.conf @@ -45,7 +45,7 @@ logging = { # One of: "warn", "info", "error", "off", "info" - file = "/var/log/villas-node.log" # File for logs + file = "/tmp/villas-node.log" # File for logs syslog = true # Log to syslogd } diff --git a/include/villas/nodes/zeromq.hpp b/include/villas/nodes/zeromq.hpp index 5e88b4445..869089b06 100644 --- a/include/villas/nodes/zeromq.hpp +++ b/include/villas/nodes/zeromq.hpp @@ -47,11 +47,10 @@ struct sample; struct zeromq { int ipv6; - struct format_type *format; struct io io; - struct { + struct Curve { int enabled; struct { char public_key[41]; @@ -66,18 +65,13 @@ struct zeromq { #endif } pattern; - struct { + struct Dir { void *socket; /**< ZeroMQ socket. */ void *mon_socket; - char *endpoint; - char *filter; - } in; - - struct { - void *socket; /**< ZeroMQ socket. */ struct vlist endpoints; char *filter; - } out; + int bind, pending; + } in, out; }; /** @see node_type::print */ diff --git a/lib/api/server.cpp b/lib/api/server.cpp index 5d4e3461f..22a087e94 100644 --- a/lib/api/server.cpp +++ b/lib/api/server.cpp @@ -133,7 +133,7 @@ void Server::stop() ret = close(sd); if (ret) - throw SystemError("Failed to close API socket");; + throw SystemError("Failed to close API socket"); state = State::STOPPED; } diff --git a/lib/formats/protobuf.cpp b/lib/formats/protobuf.cpp index 2b764cd09..50aa52586 100644 --- a/lib/formats/protobuf.cpp +++ b/lib/formats/protobuf.cpp @@ -155,6 +155,7 @@ int protobuf_sscan(struct io *io, const char *buf, size_t len, size_t *rbytes, s struct sample *smp = smps[i]; Villas__Node__Sample *pb_smp = pb_msg->samples[i]; + smp->flags = 0; smp->signals = io->signals; if (pb_smp->type != VILLAS__NODE__SAMPLE__TYPE__DATA) { diff --git a/lib/hooks/stats.cpp b/lib/hooks/stats.cpp index 94696e402..598a8e413 100644 --- a/lib/hooks/stats.cpp +++ b/lib/hooks/stats.cpp @@ -45,20 +45,14 @@ protected: StatsHook *parent; public: - StatsWriteHook(struct path *p, struct node *n, int fl, int prio, bool en = true) : - Hook(p, n, fl, prio, en) + StatsWriteHook(StatsHook *pa, struct path *p, struct node *n, int fl, int prio, bool en = true) : + Hook(p, n, fl, prio, en), + parent(pa) { state = State::CHECKED; } - virtual Hook::Reason process(sample *smp) - { - timespec now = time_now(); - - node->stats->update(Stats::Metric::AGE, time_delta(&smp->ts.received, &now)); - - return Reason::OK; - } + virtual Hook::Reason process(sample *smp); }; class StatsReadHook : public Hook { @@ -66,9 +60,12 @@ class StatsReadHook : public Hook { protected: sample *last; + StatsHook *parent; + public: - StatsReadHook(struct path *p, struct node *n, int fl, int prio, bool en = true) : - Hook(p, n, fl, prio, en) + StatsReadHook(StatsHook *pa, struct path *p, struct node *n, int fl, int prio, bool en = true) : + Hook(p, n, fl, prio, en), + parent(pa) { state = State::CHECKED; } @@ -92,38 +89,14 @@ public: state = State::STOPPED; } - virtual Hook::Reason process(sample *smp) - { - if (last) { - if (smp->flags & last->flags & (int) SampleFlags::HAS_TS_RECEIVED) - node->stats->update(Stats::Metric::GAP_RECEIVED, time_delta(&last->ts.received, &smp->ts.received)); - - if (smp->flags & last->flags & (int) SampleFlags::HAS_TS_ORIGIN) - node->stats->update(Stats::Metric::GAP_SAMPLE, time_delta(&last->ts.origin, &smp->ts.origin)); - - if ((smp->flags & (int) SampleFlags::HAS_TS_ORIGIN) && (smp->flags & (int) SampleFlags::HAS_TS_RECEIVED)) - node->stats->update(Stats::Metric::OWD, time_delta(&smp->ts.origin, &smp->ts.received)); - - if (smp->flags & last->flags & (int) SampleFlags::HAS_SEQUENCE) { - int dist = smp->sequence - (int32_t) last->sequence; - if (dist != 1) - node->stats->update(Stats::Metric::SMPS_REORDERED, dist); - } - } - - sample_incref(smp); - - if (last) - sample_decref(last); - - last = smp; - - return Reason::OK; - } + virtual Hook::Reason process(sample *smp); }; class StatsHook : public Hook { + friend StatsReadHook; + friend StatsWriteHook; + protected: StatsReadHook *readHook; StatsWriteHook *writeHook; @@ -150,11 +123,13 @@ public: uri() { /* Add child hooks */ - readHook = new StatsReadHook(p, n, fl, prio, en); - writeHook = new StatsWriteHook(p, n, fl, prio, en); + readHook = new StatsReadHook(this, p, n, fl, prio, en); + writeHook = new StatsWriteHook(this, p, n, fl, prio, en); - vlist_push(&node->in.hooks, (void *) readHook); - vlist_push(&node->out.hooks, (void *) writeHook); + if (node) { + vlist_push(&node->in.hooks, (void *) readHook); + vlist_push(&node->out.hooks, (void *) writeHook); + } } virtual void start() @@ -189,6 +164,15 @@ public: stats->reset(); } + virtual Hook::Reason process(sample *smp) + { + // Only call readHook if it hasnt been added to the node's hook list + if (!node) + return readHook->process(smp); + + return Hook::Reason::OK; + } + virtual void periodic() { assert(state == State::STARTED); @@ -239,12 +223,51 @@ public: /* Register statistic object to path. * * This allows the path code to update statistics. */ - node->stats = stats; + if (node) + node->stats = stats; state = State::PREPARED; } }; +Hook::Reason StatsWriteHook::process(sample *smp) +{ + timespec now = time_now(); + + parent->stats->update(Stats::Metric::AGE, time_delta(&smp->ts.received, &now)); + + return Reason::OK; +} + +Hook::Reason StatsReadHook::process(sample *smp) +{ + if (last) { + if (smp->flags & last->flags & (int) SampleFlags::HAS_TS_RECEIVED) + parent->stats->update(Stats::Metric::GAP_RECEIVED, time_delta(&last->ts.received, &smp->ts.received)); + + if (smp->flags & last->flags & (int) SampleFlags::HAS_TS_ORIGIN) + parent->stats->update(Stats::Metric::GAP_SAMPLE, time_delta(&last->ts.origin, &smp->ts.origin)); + + if ((smp->flags & (int) SampleFlags::HAS_TS_ORIGIN) && (smp->flags & (int) SampleFlags::HAS_TS_RECEIVED)) + parent->stats->update(Stats::Metric::OWD, time_delta(&smp->ts.origin, &smp->ts.received)); + + if (smp->flags & last->flags & (int) SampleFlags::HAS_SEQUENCE) { + int dist = smp->sequence - (int32_t) last->sequence; + if (dist != 1) + parent->stats->update(Stats::Metric::SMPS_REORDERED, dist); + } + } + + sample_incref(smp); + + if (last) + sample_decref(last); + + last = smp; + + return Reason::OK; +} + /* Register hook */ static HookPlugin p( "stats", diff --git a/lib/nodes/zeromq.cpp b/lib/nodes/zeromq.cpp index 4f3fdd55e..edca6d554 100644 --- a/lib/nodes/zeromq.cpp +++ b/lib/nodes/zeromq.cpp @@ -33,12 +33,13 @@ #include #include #include +#include +using namespace villas; using namespace villas::utils; static void *context; -#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_MAJOR_VERSION >= 4 && ZMQ_MINOR_VERSION >= 2 && ZMQ_MINOR_VERSION >= 3 /** Read one event off the monitor socket; return value and address * by reference, if not null, and event number by value. * @@ -75,53 +76,99 @@ static int get_monitor_event(void *monitor, int *value, char **address) return event; } -#endif int zeromq_reverse(struct node *n) { struct zeromq *z = (struct zeromq *) n->_vd; - if (vlist_length(&z->out.endpoints) != 1) + if (vlist_length(&z->out.endpoints) != 1 || + vlist_length(&z->in.endpoints) != 1) return -1; - char *subscriber = z->in.endpoint; - char *publisher = (char *) vlist_first(&z->out.endpoints); + char *subscriber = (char *) vlist_first(&z->in.endpoints); + char *publisher = (char *) vlist_first(&z->out.endpoints); - z->in.endpoint = publisher; + vlist_set(&z->in.endpoints, 0, publisher); vlist_set(&z->out.endpoints, 0, subscriber); return 0; } +int zeromq_init(struct node *n) +{ + struct zeromq *z = (struct zeromq *) n->_vd; + + z->out.bind = 1; + z->in.bind = 0; + + z->curve.enabled = false; + z->ipv6 = 0; + + z->in.endpoints.state = State::DESTROYED; + z->out.endpoints.state = State::DESTROYED; + + z->in.pending = 0; + z->out.pending = 0; + + vlist_init(&z->in.endpoints); + vlist_init(&z->out.endpoints); + + return 0; +} + +int zeromq_parse_endpoints(json_t *json_ep, struct vlist *epl) +{ + json_t *json_val; + size_t i; + const char *ep; + + switch (json_typeof(json_ep)) { + case JSON_ARRAY: + json_array_foreach(json_ep, i, json_val) { + ep = json_string_value(json_val); + if (!ep) + error("All 'publish' settings must be strings"); + + vlist_push(epl, strdup(ep)); + } + break; + + case JSON_STRING: + ep = json_string_value(json_ep); + vlist_push(epl, strdup(ep)); + break; + + default: + return -1; + } + + return 0; +} + int zeromq_parse(struct node *n, json_t *cfg) { struct zeromq *z = (struct zeromq *) n->_vd; int ret; - const char *ep = nullptr; const char *type = nullptr; const char *in_filter = nullptr; const char *out_filter = nullptr; const char *format = "villas.binary"; - size_t i; - json_t *json_pub = nullptr; + json_t *json_in_ep = nullptr; + json_t *json_out_ep = nullptr; json_t *json_curve = nullptr; - json_t *json_val; json_error_t err; - vlist_init(&z->out.endpoints); - - z->curve.enabled = false; - z->ipv6 = 0; - - ret = json_unpack_ex(cfg, &err, 0, "{ s?: { s?: s, s?: s }, s?: { s?: o, s?: s }, s?: o, s?: s, s?: b, s?: s }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: { s?: o, s?: s, s?: b }, s?: { s?: o, s?: s, s?: b }, s?: o, s?: s, s?: b, s?: s }", "in", - "subscribe", &ep, + "subscribe", &json_in_ep, "filter", &in_filter, + "bind", &z->in.bind, "out", - "publish", &json_pub, + "publish", &json_out_ep, "filter", &out_filter, + "bind", &z->out.bind, "curve", &json_curve, "pattern", &type, "ipv6", &z->ipv6, @@ -130,7 +177,6 @@ int zeromq_parse(struct node *n, json_t *cfg) if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); - z->in.endpoint = ep ? strdup(ep) : nullptr; z->in.filter = in_filter ? strdup(in_filter) : nullptr; z->out.filter = out_filter ? strdup(out_filter) : nullptr; @@ -138,28 +184,16 @@ int zeromq_parse(struct node *n, json_t *cfg) if (!z->format) error("Invalid format '%s' for node %s", format, node_name(n)); - if (json_pub) { - switch (json_typeof(json_pub)) { - case JSON_ARRAY: - json_array_foreach(json_pub, i, json_val) { - ep = json_string_value(json_val); - if (!ep) - error("All 'publish' settings must be strings"); + if (json_out_ep) { + ret = zeromq_parse_endpoints(json_out_ep, &z->out.endpoints); + if (ret) + throw ConfigError(json_out_ep, "node-config-node-zeromq-publish", "Failed to parse list of publish endpoints"); + } - vlist_push(&z->out.endpoints, strdup(ep)); - } - break; - - case JSON_STRING: - ep = json_string_value(json_pub); - - vlist_push(&z->out.endpoints, strdup(ep)); - - break; - - default: - error("Invalid type for ZeroMQ publisher setting"); - } + if (json_in_ep) { + ret = zeromq_parse_endpoints(json_in_ep, &z->in.endpoints); + if (ret) + throw ConfigError(json_out_ep, "node-config-node-zeromq-subscribe", "Failed to parse list of subscribe endpoints"); } if (json_curve) { @@ -222,14 +256,23 @@ char * zeromq_print(struct node *n) #endif } - strcatf(&buf, "format=%s, pattern=%s, ipv6=%s, crypto=%s, in.subscribe=%s, out.publish=[ ", + strcatf(&buf, "format=%s, pattern=%s, ipv6=%s, crypto=%s, in.bind=%s, out.bind=%s, in.subscribe=[ ", format_type_name(z->format), pattern, z->ipv6 ? "yes" : "no", z->curve.enabled ? "yes" : "no", - z->in.endpoint ? z->in.endpoint : "" + z->in.bind ? "yes" : "no", + z->out.bind ? "yes" : "no" ); + for (size_t i = 0; i < vlist_length(&z->in.endpoints); i++) { + char *ep = (char *) vlist_at(&z->in.endpoints, i); + + strcatf(&buf, "%s ", ep); + } + + strcatf(&buf, "], out.publish=[ "); + for (size_t i = 0; i < vlist_length(&z->out.endpoints); i++) { char *ep = (char *) vlist_at(&z->out.endpoints, i); @@ -247,6 +290,17 @@ char * zeromq_print(struct node *n) return buf; } +int zeromq_check(struct node *n) +{ + struct zeromq *z = (struct zeromq *) n->_vd; + + if (vlist_length(&z->in.endpoints) == 0 && + vlist_length(&z->out.endpoints) == 0) + return -1; + + return 0; +} + int zeromq_type_start(villas::node::SuperNode *sn) { context = zmq_ctx_new(); @@ -264,6 +318,8 @@ int zeromq_start(struct node *n) int ret; struct zeromq *z = (struct zeromq *) n->_vd; + struct zeromq::Dir* dirs[] = { &z->out, &z->in }; + ret = io_init(&z->io, z->format, &n->in.signals, (int) SampleFlags::HAS_ALL & ~(int) SampleFlags::HAS_OFFSET); if (ret) return ret; @@ -275,13 +331,13 @@ int zeromq_start(struct node *n) switch (z->pattern) { #ifdef ZMQ_BUILD_DISH case zeromq::Pattern::RADIODISH: - z->in.socket = zmq_socket(context, ZMQ_DISH); + z->in.socket = zmq_socket(context, ZMQ_DISH); z->out.socket = zmq_socket(context, ZMQ_RADIO); break; #endif case zeromq::Pattern::PUBSUB: - z->in.socket = zmq_socket(context, ZMQ_SUB); + z->in.socket = zmq_socket(context, ZMQ_SUB); z->out.socket = zmq_socket(context, ZMQ_PUB); break; } @@ -310,14 +366,6 @@ int zeromq_start(struct node *n) if (ret < 0) goto fail; - ret = zmq_setsockopt(z->out.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); - if (ret) - goto fail; - - ret = zmq_setsockopt(z->in.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); - if (ret) - goto fail; - if (z->curve.enabled) { /* Publisher has server role */ ret = zmq_setsockopt(z->out.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41); @@ -352,40 +400,62 @@ int zeromq_start(struct node *n) goto fail; } -#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_MAJOR_VERSION >= 4 && ZMQ_MINOR_VERSION >= 2 && ZMQ_MINOR_VERSION >= 3 - /* Monitor handshake events on the server */ - ret = zmq_socket_monitor(z->in.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEEDED | ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL | ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL | ZMQ_EVENT_HANDSHAKE_FAILED_AUTH); - if (ret < 0) - goto fail; + for (auto d : dirs) { + const char *mon_ep = d == &z->in ? "inproc://monitor-in" : "inproc://monitor-out"; - /* Create socket for collecting monitor events */ - z->in.mon_socket = zmq_socket(context, ZMQ_PAIR); - if (!z->in.mon_socket) { - ret = -1; - goto fail; - } + ret = zmq_setsockopt(d->socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); + if (ret) + goto fail; - /* Connect it to the inproc endpoints so they'll get events */ - ret = zmq_connect(z->in.mon_socket, "inproc://monitor-server"); - if (ret < 0) - goto fail; -#endif + int linger = 1000; + ret = zmq_setsockopt(d->socket, ZMQ_LINGER, &linger, sizeof(linger)); + if (ret) + goto fail; - /* Spawn server for publisher */ - for (size_t i = 0; i < vlist_length(&z->out.endpoints); i++) { - char *ep = (char *) vlist_at(&z->out.endpoints, i); - - ret = zmq_bind(z->out.socket, ep); + /* Monitor events on the server */ + ret = zmq_socket_monitor(d->socket, mon_ep, ZMQ_EVENT_ALL); if (ret < 0) goto fail; + + /* Create socket for collecting monitor events */ + d->mon_socket = zmq_socket(context, ZMQ_PAIR); + if (!d->mon_socket) { + ret = -1; + goto fail; + } + + /* Connect it to the inproc endpoints so they'll get events */ + ret = zmq_connect(d->mon_socket, mon_ep); + if (ret < 0) + goto fail; + + /* Connect / bind sockets to endpoints */ + for (size_t i = 0; i < vlist_length(&d->endpoints); i++) { + char *ep = (char *) vlist_at(&d->endpoints, i); + + if (d->bind) { + ret = zmq_bind(d->socket, ep); + if (ret < 0) + goto fail; + } + else { + ret = zmq_connect(d->socket, ep); + if (ret < 0) + goto fail; + } + + d->pending++; + } } - /* Connect subscribers to server socket */ - if (z->in.endpoint) { - ret = zmq_connect(z->in.socket, z->in.endpoint); - if (ret < 0) { - info("Failed to bind ZeroMQ socket: endpoint=%s, error=%s", z->in.endpoint, zmq_strerror(errno)); - return ret; + /* Wait for all connections to be connected */ + for (auto d : dirs) { + while (d->pending > 0) { + int evt = d->bind ? ZMQ_EVENT_LISTENING : ZMQ_EVENT_CONNECTED; + + ret = get_monitor_event(d->mon_socket, nullptr, nullptr); + if (ret == evt) + d->pending--; } } @@ -411,21 +481,23 @@ int zeromq_stop(struct node *n) int ret; struct zeromq *z = (struct zeromq *) n->_vd; - ret = zmq_close(z->in.socket); - if (ret) - return ret; + struct zeromq::Dir* dirs[] = { &z->out, &z->in }; -#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_MAJOR_VERSION >= 4 && ZMQ_MINOR_VERSION >= 2 && ZMQ_MINOR_VERSION >= 3 - ret = zmq_close(z->in.mon_socket); - if (ret) - return ret; -#endif + for (auto d : dirs) { + ret = zmq_close(d->socket); + if (ret) + return ret; + + ret = zmq_close(d->mon_socket); + if (ret) + return ret; + } ret = io_destroy(&z->io); if (ret) return ret; - return zmq_close(z->out.socket); + return 0; } int zeromq_destroy(struct node *n) @@ -581,7 +653,9 @@ static void register_plugin() { p.node.size = sizeof(struct zeromq); p.node.type.start = zeromq_type_start; p.node.type.stop = zeromq_type_stop; + p.node.init = zeromq_init; p.node.destroy = zeromq_destroy; + p.node.check = zeromq_check; p.node.parse = zeromq_parse; p.node.print = zeromq_print; p.node.start = zeromq_start; diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index 5b84363d0..6848121d8 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -461,7 +461,7 @@ check: if (optarg == endptr) throw RuntimeError("Failed to start node {}: reason={}", node_name(node), ret); PipeReceiveDirection recv_dir(node, &io, enable_recv, limit_recv); - PipeSendDirection send_dir(node, &io, enable_recv, limit_recv); + PipeSendDirection send_dir(node, &io, enable_recv, limit_send); recv_dir.startThread(); send_dir.startThread(); diff --git a/tools/docker-dev.sh b/tools/docker-dev.sh index 8b0b2d9dd..1268364e2 100755 --- a/tools/docker-dev.sh +++ b/tools/docker-dev.sh @@ -28,7 +28,7 @@ SCRIPTPATH=$(dirname $SCRIPT) SRCDIR=${SRCDIR:-$(realpath ${SCRIPTPATH}/..)} BUILDDIR=${BUILDDIR:-${SRCDIR}/build} -TAG=${TAG:-develop} +TAG=${TAG:-latest} IMAGE="villas/node-dev:${TAG}" docker run \