diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 53912a90b..ba6fb3a94 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -29,6 +29,8 @@ #pragma once +#include + #include "node.h" #include "list.h" @@ -37,6 +39,14 @@ struct zeromq { char *filter; + struct { + int enabled; + struct { + char public_key[41]; + char secret_key[41]; + } server, client; + } curve; + enum { ZEROMQ_PATTERN_PUBSUB, ZEROMQ_PATTERN_RADIODISH diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index bc750ad5b..4083d8d57 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -91,7 +91,7 @@ int zeromq_parse(struct node *n, config_setting_t *cfg) const char *ep, *type, *filter; - config_setting_t *cfg_pub; + config_setting_t *cfg_pub, *cfg_curve; list_init(&z->publisher.endpoints); @@ -124,6 +124,38 @@ int zeromq_parse(struct node *n, config_setting_t *cfg) } } + cfg_curve = config_setting_lookup(cfg, "curve"); + if (cfg_curve) { + if (!config_setting_is_group(cfg_curve)) + cerror(cfg_curve, "The curve setting must be a group"); + + const char *public_key, *secret_key; + + if (!config_setting_lookup_string(cfg_curve, "public_key", &public_key)) + cerror(cfg_curve, "Setting 'curve.public_key' is missing"); + + if (!config_setting_lookup_string(cfg_curve, "secret_key", &secret_key)) + cerror(cfg_curve, "Setting 'curve.secret_key' is missing"); + + if (!config_setting_lookup_bool(cfg_curve, "enabled", &z->curve.enabled)) + z->curve.enabled = true; + + if (strlen(secret_key) != 40) + cerror(cfg_curve, "Setting 'curve.secret_key' must be a Z85 encoded CurveZMQ key"); + + if (strlen(public_key) != 40) + cerror(cfg_curve, "Setting 'curve.public_key' must be a Z85 encoded CurveZMQ key"); + + strncpy(z->curve.server.public_key, public_key, 41); + strncpy(z->curve.server.secret_key, secret_key, 41); + } + else + z->curve.enabled = false; + + /** @todo We should fix this. Its mostly done. */ + if (z->curve.enabled) + cerror(cfg_curve, "CurveZMQ support is currently broken"); + if (config_setting_lookup_string(cfg, "filter", &filter)) z->filter = strdup(filter); else @@ -156,7 +188,7 @@ char * zeromq_print(struct node *n) case ZEROMQ_PATTERN_RADIODISH: pattern = "radiodish"; break; } - strcatf(&buf, "pattern=%s, subscribe=%s, publish=[ ", pattern, z->subscriber.endpoint); + strcatf(&buf, "pattern=%s, ipv6=%s, crypto=%s, subscribe=%s, publish=[ ", pattern, z->ipv6 ? "yes" : "no", z->curve.enabled ? "yes" : "no", z->subscriber.endpoint); for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { char *ep = list_at(&z->publisher.endpoints, i); @@ -205,9 +237,6 @@ int zeromq_start(struct node *n) if (ret) return ret; - /* Bind subscriber socket */ - if (z->subscriber.endpoint) { - ret = zmq_bind(z->subscriber.socket, z->subscriber.endpoint); ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) return ret; @@ -219,6 +248,38 @@ int zeromq_start(struct node *n) return ret; } + if (z->curve.enabled) { + /* Publisher has server role */ + ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41); + if (ret) + return ret; + + ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_PUBLICKEY, z->curve.server.public_key, 41); + if (ret) + return ret; + + int curve_server = 1; + ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SERVER, &curve_server, sizeof(curve_server)); + if (ret) + return ret; + } + + if (z->curve.enabled) { + /* Create temporary client keys first */ + ret = zmq_curve_keypair(z->curve.client.public_key, z->curve.client.secret_key); + if (ret) + return ret; + + /* Subscriber has client role */ + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SECRETKEY, z->curve.client.secret_key, 41); + if (ret) + return ret; + + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_PUBLICKEY, z->curve.client.public_key, 41); + if (ret) + return ret; + + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SERVERKEY, z->curve.server.public_key, 41); if (ret) return ret; } @@ -236,14 +297,25 @@ int zeromq_start(struct node *n) ret = zmq_connect(server_mon, "inproc://monitor-server"); assert(ret == 0); #endif + + /* Bind subscriber socket */ + if (z->subscriber.endpoint) { + ret = zmq_bind(z->subscriber.socket, z->subscriber.endpoint); + if (ret) { + info("Failed to bind ZeroMQ socket: endpoint=%s, error=%s", z->subscriber.endpoint, zmq_strerror(errno)); + return ret; + } + } /* Connect publisher socket */ for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { char *ep = list_at(&z->publisher.endpoints, i); ret = zmq_connect(z->publisher.socket, ep); - if (ret) + if (ret) { + info("Failed to connect to ZeroMQ endpoint: endpoint=%s, error=%s", ep, zmq_strerror(errno)); return ret; + } } #ifdef ZMQ_BUILD_DRAFT_API