From df94f7fb91eba7cfddbfe106c48f90156883834a Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 20 Aug 2018 18:26:14 +0200 Subject: [PATCH] socket: adapt to new signal code and separate node-type configuration into in/out sections --- include/villas/nodes/socket.h | 2 +- lib/nodes/socket.c | 70 +++++++++++++++++++++-------------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index 184b1e9b8..c538ed71d 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -154,7 +154,7 @@ char * socket_print_addr(struct sockaddr *saddr); * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int socket_parse_addr(const char *str, struct sockaddr *sa, enum socket_layer layer, int flags); +int socket_parse_address(const char *str, struct sockaddr *sa, enum socket_layer layer, int flags); int socket_compare_addr(struct sockaddr *x, struct sockaddr *y); diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 5810a692c..6493df554 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -163,11 +163,11 @@ char * socket_print(struct node *n) inet_ntop(AF_INET, &s->multicast.mreq.imr_multiaddr, group, sizeof(group)); inet_ntop(AF_INET, &s->multicast.mreq.imr_interface, interface, sizeof(interface)); - strcatf(&buf, ", multicast.enabled=%s", s->multicast.enabled ? "yes" : "no"); - strcatf(&buf, ", multicast.loop=%s", s->multicast.loop ? "yes" : "no"); - strcatf(&buf, ", multicast.group=%s", group); - strcatf(&buf, ", multicast.interface=%s", s->multicast.mreq.imr_interface.s_addr == INADDR_ANY ? "any" : interface); - strcatf(&buf, ", multicast.ttl=%u", s->multicast.ttl); + strcatf(&buf, ", in.multicast.enabled=%s", s->multicast.enabled ? "yes" : "no"); + strcatf(&buf, ", in.multicast.loop=%s", s->multicast.loop ? "yes" : "no"); + strcatf(&buf, ", in.multicast.group=%s", group); + strcatf(&buf, ", in.multicast.interface=%s", s->multicast.mreq.imr_interface.s_addr == INADDR_ANY ? "any" : interface); + strcatf(&buf, ", in.multicast.ttl=%u", s->multicast.ttl); } free(local); @@ -176,12 +176,10 @@ char * socket_print(struct node *n) return buf; } -int socket_start(struct node *n) +int socket_check(struct node *n) { struct socket *s = (struct socket *) n->_vd; - int ret; - // TODO: Move to socket_check() ? /* Some checks on the addresses */ if (s->layer != SOCKET_LAYER_UNIX) { if (s->local.sa.sa_family != s->remote.sa.sa_family) @@ -211,8 +209,20 @@ int socket_start(struct node *n) error("Multicast group address of node %s must be within 224.0.0.0/4", node_name(n)); } + return 0; +} + +int socket_start(struct node *n) +{ + struct socket *s = (struct socket *) n->_vd; + int ret; + /* Initialize IO */ - ret = io_init(&s->io, s->format, n, SAMPLE_HAS_ALL); + ret = io_init(&s->io, s->format, &n->signals, SAMPLE_HAS_ALL); + if (ret) + return ret; + + ret = io_check(&s->io); if (ret) return ret; @@ -360,18 +370,17 @@ int socket_stop(struct node *n) return ret; } + ret = io_destroy(&s->io); + if (ret) + return ret; + return 0; } int socket_destroy(struct node *n) { - int ret; struct socket *s = (struct socket *) n->_vd; - ret = io_destroy(&s->io); - if (ret) - return ret; - #ifdef WITH_NETEM rtnl_qdisc_put(s->tc_qdisc); rtnl_cls_put(s->tc_classifier); @@ -502,13 +511,15 @@ retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt); addrlen = sizeof(s->local); } - bytes = sendto(s->sd, buf, wbytes, MSG_DONTWAIT, (struct sockaddr *) &s->remote, addrlen); +retry2: bytes = sendto(s->sd, buf, wbytes, 0, (struct sockaddr *) &s->remote, addrlen); if (bytes < 0) { if ((errno == EPERM) || (errno == ENOENT && s->layer == SOCKET_LAYER_UNIX)) warn("Failed send to node %s: %s", node_name(n), strerror(errno)); - else if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + else if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { warn("socket: send would block"); + goto retry2; + } else serror("Failed send to node %s", node_name(n)); } @@ -542,14 +553,16 @@ int socket_parse(struct node *n, json_t *cfg) s->tc_qdisc = NULL; #endif /* WITH_NETEM */ - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: s, s: s, s?: b, s?: o, s?: o, s?: s }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s: { s: s, s?: o }, s: { s: s, s?: b, s?: o } }", "layer", &layer, - "remote", &remote, - "local", &local, - "verify_source", &s->verify_source, - "multicast", &json_multicast, - "netem", &json_netem, - "format", &format + "format", &format, + "out", + "address", &remote, + "netem", &json_netem, + "in", + "address", &local, + "verify_source", &s->verify_source, + "multicast", &json_multicast ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); @@ -575,13 +588,13 @@ int socket_parse(struct node *n, json_t *cfg) error("Invalid layer '%s' for node %s", layer, node_name(n)); } - ret = socket_parse_addr(remote, (struct sockaddr *) &s->remote, s->layer, 0); + ret = socket_parse_address(remote, (struct sockaddr *) &s->remote, s->layer, 0); if (ret) { error("Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), gai_strerror(ret)); } - ret = socket_parse_addr(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE); + ret = socket_parse_address(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE); if (ret) { error("Failed to resolve local address '%s' of node %s: %s", local, node_name(n), gai_strerror(ret)); @@ -693,7 +706,7 @@ char * socket_print_addr(struct sockaddr *saddr) return buf; } -int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_layer layer, int flags) +int socket_parse_address(const char *addr, struct sockaddr *saddr, enum socket_layer layer, int flags) { /** @todo: Add support for IPv6 */ union sockaddr_union *sa = (union sockaddr_union *) saddr; @@ -704,10 +717,10 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye if (layer == SOCKET_LAYER_UNIX) { /* Format: "/path/to/socket" */ sa->sun.sun_family = AF_UNIX; - if (strlen(addr) > sizeof(sa->sun.sun_path)-1) + if (strlen(addr) > sizeof(sa->sun.sun_path) - 1) error("Length of unix socket path is too long!"); - memcpy(sa->sun.sun_path, addr, strlen(sa->sun.sun_path)+1); + memcpy(sa->sun.sun_path, addr, strlen(addr) + 1); ret = 0; } @@ -859,6 +872,7 @@ static struct plugin p = { .reverse = socket_reverse, .parse = socket_parse, .print = socket_print, + .check = socket_check, .start = socket_start, .stop = socket_stop, .read = socket_read,