mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
zeromq: adapt to new signal code and separate node-type configuration into in/out sections
This commit is contained in:
parent
7738244136
commit
b9982d41ef
2 changed files with 86 additions and 65 deletions
|
@ -51,7 +51,6 @@ struct sample;
|
|||
struct zeromq {
|
||||
int ipv6;
|
||||
|
||||
char *filter;
|
||||
|
||||
struct format_type *format;
|
||||
struct io io;
|
||||
|
@ -75,12 +74,14 @@ struct zeromq {
|
|||
void *socket; /**< ZeroMQ socket. */
|
||||
void *mon_socket;
|
||||
char *endpoint;
|
||||
} subscriber;
|
||||
char *filter;
|
||||
} in;
|
||||
|
||||
struct {
|
||||
void *socket; /**< ZeroMQ socket. */
|
||||
struct list endpoints;
|
||||
} publisher;
|
||||
char *filter;
|
||||
} out;
|
||||
};
|
||||
|
||||
/** @see node_type::print */
|
||||
|
|
|
@ -79,14 +79,14 @@ int zeromq_reverse(struct node *n)
|
|||
{
|
||||
struct zeromq *z = (struct zeromq *) n->_vd;
|
||||
|
||||
if (list_length(&z->publisher.endpoints) != 1)
|
||||
if (list_length(&z->out.endpoints) != 1)
|
||||
return -1;
|
||||
|
||||
char *subscriber = z->subscriber.endpoint;
|
||||
char *publisher = list_first(&z->publisher.endpoints);
|
||||
char *subscriber = z->in.endpoint;
|
||||
char *publisher = list_first(&z->out.endpoints);
|
||||
|
||||
z->subscriber.endpoint = publisher;
|
||||
list_set(&z->publisher.endpoints, 0, subscriber);
|
||||
z->in.endpoint = publisher;
|
||||
list_set(&z->out.endpoints, 0, subscriber);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -98,7 +98,8 @@ int zeromq_parse(struct node *n, json_t *cfg)
|
|||
int ret;
|
||||
const char *ep = NULL;
|
||||
const char *type = NULL;
|
||||
const char *filter = NULL;
|
||||
const char *in_filter = NULL;
|
||||
const char *out_filter = NULL;
|
||||
const char *format = "villas.human";
|
||||
|
||||
size_t i;
|
||||
|
@ -107,16 +108,19 @@ int zeromq_parse(struct node *n, json_t *cfg)
|
|||
json_t *json_val;
|
||||
json_error_t err;
|
||||
|
||||
list_init(&z->publisher.endpoints);
|
||||
list_init(&z->out.endpoints);
|
||||
|
||||
z->curve.enabled = false;
|
||||
z->ipv6 = 0;
|
||||
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: o, s?: o, s?: s, s?: s, s?: b, s?: s }",
|
||||
"subscribe", &ep,
|
||||
"publish", &json_pub,
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s?: { s?: s, s?: s }, s?: { s?: o, s?: s }, s?: o, s?: s, s?: b, s?: s }",
|
||||
"in",
|
||||
"subscribe", &ep,
|
||||
"filter", &in_filter,
|
||||
"out",
|
||||
"publish", &json_pub,
|
||||
"filter", &out_filter,
|
||||
"curve", &json_curve,
|
||||
"filter", &filter,
|
||||
"pattern", &type,
|
||||
"ipv6", &z->ipv6,
|
||||
"format", &format
|
||||
|
@ -124,8 +128,9 @@ int zeromq_parse(struct node *n, json_t *cfg)
|
|||
if (ret)
|
||||
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
|
||||
|
||||
z->subscriber.endpoint = ep ? strdup(ep) : NULL;
|
||||
z->filter = filter ? strdup(filter) : NULL;
|
||||
z->in.endpoint = ep ? strdup(ep) : NULL;
|
||||
z->in.filter = in_filter ? strdup(in_filter) : NULL;
|
||||
z->out.filter = out_filter ? strdup(out_filter) : NULL;
|
||||
|
||||
z->format = format_type_lookup(format);
|
||||
if (!z->format)
|
||||
|
@ -139,14 +144,14 @@ int zeromq_parse(struct node *n, json_t *cfg)
|
|||
if (!ep)
|
||||
error("All 'publish' settings must be strings");
|
||||
|
||||
list_push(&z->publisher.endpoints, strdup(ep));
|
||||
list_push(&z->out.endpoints, strdup(ep));
|
||||
}
|
||||
break;
|
||||
|
||||
case JSON_STRING:
|
||||
ep = json_string_value(json_pub);
|
||||
|
||||
list_push(&z->publisher.endpoints, strdup(ep));
|
||||
list_push(&z->out.endpoints, strdup(ep));
|
||||
|
||||
break;
|
||||
|
||||
|
@ -210,24 +215,27 @@ char * zeromq_print(struct node *n)
|
|||
#endif
|
||||
}
|
||||
|
||||
strcatf(&buf, "format=%s, pattern=%s, ipv6=%s, crypto=%s, subscribe=%s, publish=[ ",
|
||||
strcatf(&buf, "format=%s, pattern=%s, ipv6=%s, crypto=%s, in.subscribe=%s, out.publish=[ ",
|
||||
format_type_name(z->format),
|
||||
pattern,
|
||||
z->ipv6 ? "yes" : "no",
|
||||
z->curve.enabled ? "yes" : "no",
|
||||
z->subscriber.endpoint
|
||||
z->in.endpoint ? z->in.endpoint : ""
|
||||
);
|
||||
|
||||
for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) {
|
||||
char *ep = (char *) list_at(&z->publisher.endpoints, i);
|
||||
for (size_t i = 0; i < list_length(&z->out.endpoints); i++) {
|
||||
char *ep = (char *) list_at(&z->out.endpoints, i);
|
||||
|
||||
strcatf(&buf, "%s ", ep);
|
||||
}
|
||||
|
||||
strcatf(&buf, "]");
|
||||
|
||||
if (z->filter)
|
||||
strcatf(&buf, ", filter=%s", z->filter);
|
||||
if (z->in.filter)
|
||||
strcatf(&buf, ", in.filter=%s", z->in.filter);
|
||||
|
||||
if (z->out.filter)
|
||||
strcatf(&buf, ", out.filter=%s", z->out.filter);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
@ -249,25 +257,29 @@ int zeromq_start(struct node *n)
|
|||
int ret;
|
||||
struct zeromq *z = (struct zeromq *) n->_vd;
|
||||
|
||||
ret = io_init(&z->io, z->format, n, SAMPLE_HAS_ALL);
|
||||
ret = io_init(&z->io, z->format, &n->signals, SAMPLE_HAS_ALL);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = io_check(&z->io);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
switch (z->pattern) {
|
||||
#ifdef ZMQ_BUILD_DISH
|
||||
case ZEROMQ_PATTERN_RADIODISH:
|
||||
z->subscriber.socket = zmq_socket(context, ZMQ_DISH);
|
||||
z->publisher.socket = zmq_socket(context, ZMQ_RADIO);
|
||||
z->in.socket = zmq_socket(context, ZMQ_DISH);
|
||||
z->out.socket = zmq_socket(context, ZMQ_RADIO);
|
||||
break;
|
||||
#endif
|
||||
|
||||
case ZEROMQ_PATTERN_PUBSUB:
|
||||
z->subscriber.socket = zmq_socket(context, ZMQ_SUB);
|
||||
z->publisher.socket = zmq_socket(context, ZMQ_PUB);
|
||||
z->in.socket = zmq_socket(context, ZMQ_SUB);
|
||||
z->out.socket = zmq_socket(context, ZMQ_PUB);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!z->subscriber.socket || !z->publisher.socket) {
|
||||
if (!z->in.socket || !z->out.socket) {
|
||||
ret = -1;
|
||||
goto fail;
|
||||
}
|
||||
|
@ -276,12 +288,12 @@ int zeromq_start(struct node *n)
|
|||
switch (z->pattern) {
|
||||
#ifdef ZMQ_BUILD_DISH
|
||||
case ZEROMQ_PATTERN_RADIODISH:
|
||||
ret = zmq_join(z->subscriber.socket, z->filter);
|
||||
ret = zmq_join(z->in.socket, z->in.filter);
|
||||
break;
|
||||
#endif
|
||||
|
||||
case ZEROMQ_PATTERN_PUBSUB:
|
||||
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0);
|
||||
ret = zmq_setsockopt(z->in.socket, ZMQ_SUBSCRIBE, z->in.filter, z->in.filter ? strlen(z->in.filter) : 0);
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -291,90 +303,88 @@ int zeromq_start(struct node *n)
|
|||
if (ret < 0)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_setsockopt(z->publisher.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6));
|
||||
ret = zmq_setsockopt(z->out.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6));
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6));
|
||||
ret = zmq_setsockopt(z->in.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6));
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
if (z->curve.enabled) {
|
||||
/* Publisher has server role */
|
||||
ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41);
|
||||
ret = zmq_setsockopt(z->out.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41);
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_PUBLICKEY, z->curve.server.public_key, 41);
|
||||
ret = zmq_setsockopt(z->out.socket, ZMQ_CURVE_PUBLICKEY, z->curve.server.public_key, 41);
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
int curve_server = 1;
|
||||
ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SERVER, &curve_server, sizeof(curve_server));
|
||||
ret = zmq_setsockopt(z->out.socket, ZMQ_CURVE_SERVER, &curve_server, sizeof(curve_server));
|
||||
if (ret)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (z->curve.enabled) {
|
||||
/* Create temporary client keys first */
|
||||
ret = zmq_curve_keypair(z->curve.client.public_key, z->curve.client.secret_key);
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
/* Subscriber has client role */
|
||||
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SECRETKEY, z->curve.client.secret_key, 41);
|
||||
ret = zmq_setsockopt(z->in.socket, ZMQ_CURVE_SECRETKEY, z->curve.client.secret_key, 41);
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_PUBLICKEY, z->curve.client.public_key, 41);
|
||||
ret = zmq_setsockopt(z->in.socket, ZMQ_CURVE_PUBLICKEY, z->curve.client.public_key, 41);
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SERVERKEY, z->curve.server.public_key, 41);
|
||||
ret = zmq_setsockopt(z->in.socket, ZMQ_CURVE_SERVERKEY, z->curve.server.public_key, 41);
|
||||
if (ret)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
/* Monitor handshake events on the server */
|
||||
ret = zmq_socket_monitor(z->subscriber.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED);
|
||||
ret = zmq_socket_monitor(z->in.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED);
|
||||
if (ret < 0)
|
||||
goto fail;
|
||||
|
||||
/* Create socket for collecting monitor events */
|
||||
z->subscriber.mon_socket = zmq_socket(context, ZMQ_PAIR);
|
||||
if (!z->subscriber.mon_socket) {
|
||||
z->in.mon_socket = zmq_socket(context, ZMQ_PAIR);
|
||||
if (!z->in.mon_socket) {
|
||||
ret = -1;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/* Connect it to the inproc endpoints so they'll get events */
|
||||
ret = zmq_connect(z->subscriber.mon_socket, "inproc://monitor-server");
|
||||
ret = zmq_connect(z->in.mon_socket, "inproc://monitor-server");
|
||||
if (ret < 0)
|
||||
goto fail;
|
||||
#endif
|
||||
|
||||
/* Spawn server for publisher */
|
||||
for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) {
|
||||
char *ep = (char *) list_at(&z->publisher.endpoints, i);
|
||||
for (size_t i = 0; i < list_length(&z->out.endpoints); i++) {
|
||||
char *ep = (char *) list_at(&z->out.endpoints, i);
|
||||
|
||||
ret = zmq_bind(z->publisher.socket, ep);
|
||||
ret = zmq_bind(z->out.socket, ep);
|
||||
if (ret < 0)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/* Connect subscribers to server socket */
|
||||
if (z->subscriber.endpoint) {
|
||||
ret = zmq_connect(z->subscriber.socket, z->subscriber.endpoint);
|
||||
if (z->in.endpoint) {
|
||||
ret = zmq_connect(z->in.socket, z->in.endpoint);
|
||||
if (ret < 0) {
|
||||
info("Failed to bind ZeroMQ socket: endpoint=%s, error=%s", z->subscriber.endpoint, zmq_strerror(errno));
|
||||
info("Failed to bind ZeroMQ socket: endpoint=%s, error=%s", z->in.endpoint, zmq_strerror(errno));
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
if (z->curve.enabled) {
|
||||
ret = get_monitor_event(z->subscriber.mon_socket, NULL, NULL);
|
||||
ret = get_monitor_event(z->in.mon_socket, NULL, NULL);
|
||||
return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED;
|
||||
}
|
||||
else
|
||||
|
@ -394,17 +404,21 @@ int zeromq_stop(struct node *n)
|
|||
int ret;
|
||||
struct zeromq *z = (struct zeromq *) n->_vd;
|
||||
|
||||
ret = zmq_close(z->subscriber.socket);
|
||||
ret = zmq_close(z->in.socket);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
ret = zmq_close(z->subscriber.mon_socket);
|
||||
ret = zmq_close(z->in.mon_socket);
|
||||
if (ret)
|
||||
return ret;
|
||||
#endif
|
||||
|
||||
return zmq_close(z->publisher.socket);
|
||||
ret = io_destroy(&z->io);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return zmq_close(z->out.socket);
|
||||
}
|
||||
|
||||
int zeromq_destroy(struct node *n)
|
||||
|
@ -412,7 +426,13 @@ int zeromq_destroy(struct node *n)
|
|||
int ret;
|
||||
struct zeromq *z = (struct zeromq *) n->_vd;
|
||||
|
||||
ret = io_destroy(&z->io);
|
||||
if (z->in.filter)
|
||||
free(z->in.filter);
|
||||
|
||||
if (z->out.filter)
|
||||
free(z->out.filter);
|
||||
|
||||
ret = list_destroy(&z->out.endpoints, NULL, true);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
|
@ -430,11 +450,11 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
|
|||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
if (z->filter) {
|
||||
if (z->in.filter) {
|
||||
switch (z->pattern) {
|
||||
case ZEROMQ_PATTERN_PUBSUB:
|
||||
/* Discard envelope */
|
||||
zmq_recv(z->subscriber.socket, NULL, 0, 0);
|
||||
zmq_recv(z->in.socket, NULL, 0, 0);
|
||||
break;
|
||||
|
||||
default: { }
|
||||
|
@ -442,7 +462,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
|
|||
}
|
||||
|
||||
/* Receive payload */
|
||||
ret = zmq_msg_recv(&m, z->subscriber.socket, 0);
|
||||
ret = zmq_msg_recv(&m, z->in.socket, 0);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
|
@ -471,25 +491,25 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *
|
|||
|
||||
ret = zmq_msg_init_size(&m, wbytes);
|
||||
|
||||
if (z->filter) {
|
||||
if (z->out.filter) {
|
||||
switch (z->pattern) {
|
||||
#ifdef ZMQ_BUILD_DISH
|
||||
case ZEROMQ_PATTERN_RADIODISH:
|
||||
ret = zmq_msg_set_group(&m, z->filter);
|
||||
ret = zmq_msg_set_group(&m, z->out.filter);
|
||||
if (ret < 0)
|
||||
goto fail;
|
||||
break;
|
||||
#endif
|
||||
|
||||
case ZEROMQ_PATTERN_PUBSUB: /* Send envelope */
|
||||
zmq_send(z->publisher.socket, z->filter, strlen(z->filter), ZMQ_SNDMORE);
|
||||
zmq_send(z->out.socket, z->out.filter, strlen(z->out.filter), ZMQ_SNDMORE);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
memcpy(zmq_msg_data(&m), data, wbytes);
|
||||
|
||||
ret = zmq_msg_send(&m, z->publisher.socket, 0);
|
||||
ret = zmq_msg_send(&m, z->out.socket, 0);
|
||||
if (ret < 0)
|
||||
goto fail;
|
||||
|
||||
|
@ -513,7 +533,7 @@ int zeromq_fd(struct node *n)
|
|||
int fd;
|
||||
size_t len = sizeof(fd);
|
||||
|
||||
ret = zmq_getsockopt(z->subscriber.socket, ZMQ_FD, &fd, &len);
|
||||
ret = zmq_getsockopt(z->in.socket, ZMQ_FD, &fd, &len);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue