1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

zeromq: some code cleanup

This commit is contained in:
Steffen Vogel 2017-05-23 11:13:41 +02:00
parent 890d5e2497
commit df028c3b28
2 changed files with 43 additions and 29 deletions

View file

@ -54,6 +54,7 @@ struct zeromq {
struct {
void *socket; /**< ZeroMQ socket. */
void *mon_socket;
char *endpoint;
} subscriber;

View file

@ -43,25 +43,25 @@ static int get_monitor_event(void *monitor, int *value, char **address)
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; /* Interruped, presumably. */
assert (zmq_msg_more (&msg));
assert(zmq_msg_more (&msg));
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint8_t *data = (uint8_t *) zmq_msg_data(&msg);
uint16_t event = *(uint16_t *) (data);
if (value)
*value = *(uint32_t *) (data + 2);
/* Second frame in message contains event address */
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, monitor, 0) == -1)
return -1; /* Interruped, presumably. */
assert (!zmq_msg_more (&msg));
assert(!zmq_msg_more(&msg));
if (address) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
size_t size = zmq_msg_size (&msg);
*address = (char *) malloc (size + 1);
memcpy (*address, data, size);
uint8_t *data = (uint8_t *) zmq_msg_data(&msg);
size_t size = zmq_msg_size(&msg);
*address = (char *) malloc(size + 1);
memcpy(*address, data, size);
*address [size] = 0;
}
@ -233,17 +233,22 @@ int zeromq_start(struct node *n)
break;
}
if (!z->subscriber.socket || !z->publisher.socket) {
ret = -1;
goto fail;
}
ret = zmq_setsockopt(z->publisher.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6));
if (ret)
return ret;
goto fail;
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6));
if (ret)
return ret;
/* Subscribe to pubsub messages. */
if (z->filter && z->pattern == ZEROMQ_PATTERN_PUBSUB) {
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, strlen(z->filter));
if (z->pattern == ZEROMQ_PATTERN_PUBSUB) {
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0);
if (ret < 0)
return ret;
}
@ -252,50 +257,55 @@ int zeromq_start(struct node *n)
/* Publisher has server role */
ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41);
if (ret)
return ret;
goto fail;
ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_PUBLICKEY, z->curve.server.public_key, 41);
if (ret)
return ret;
goto fail;
int curve_server = 1;
ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SERVER, &curve_server, sizeof(curve_server));
if (ret)
return ret;
goto fail;
}
if (z->curve.enabled) {
/* Create temporary client keys first */
ret = zmq_curve_keypair(z->curve.client.public_key, z->curve.client.secret_key);
if (ret)
return ret;
goto fail;
/* Subscriber has client role */
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SECRETKEY, z->curve.client.secret_key, 41);
if (ret)
return ret;
goto fail;
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_PUBLICKEY, z->curve.client.public_key, 41);
if (ret)
return ret;
goto fail;
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SERVERKEY, z->curve.server.public_key, 41);
if (ret)
return ret;
goto fail;
}
#ifdef ZMQ_BUILD_DRAFT_API
/* Monitor handshake events on the server */
ret = zmq_socket_monitor(z->subscriber.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED);
assert(ret == 0);
if (ret < 0)
goto fail;
/* Create socket for collecting monitor events */
void *server_mon = zmq_socket(context, ZMQ_PAIR);
assert(server_mon);
z->subscriber.mon_socket = zmq_socket(context, ZMQ_PAIR);
if (!z->subscriber.mon_socket) {
ret = -1;
goto fail;
}
/* Connect it to the inproc endpoints so they'll get events */
ret = zmq_connect(server_mon, "inproc://monitor-server");
assert(ret == 0);
ret = zmq_connect(z->subscriber.mon_socket, "inproc://monitor-server");
if (ret < 0)
goto fail;
#endif
/* Bind subscriber socket */
@ -312,18 +322,21 @@ int zeromq_start(struct node *n)
char *ep = list_at(&z->publisher.endpoints, i);
ret = zmq_connect(z->publisher.socket, ep);
if (ret) {
info("Failed to connect to ZeroMQ endpoint: endpoint=%s, error=%s", ep, zmq_strerror(errno));
return ret;
}
if (ret)
goto fail;
}
#ifdef ZMQ_BUILD_DRAFT_API
ret = get_monitor_event(server_mon, NULL, NULL);
ret = get_monitor_event(z->subscriber.mon_socket, NULL, NULL);
return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED;
#else
return 0;
#endif
fail:
info("Failed to start ZeroMQ node: %s, error=%s", node_name(n), zmq_strerror(errno));
return ret;
}
int zeromq_stop(struct node *n)