1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

socket: adapt to new signal code and separate node-type configuration into in/out sections

This commit is contained in:
Steffen Vogel 2018-08-20 18:26:14 +02:00
parent b9982d41ef
commit df94f7fb91
2 changed files with 43 additions and 29 deletions

View file

@ -154,7 +154,7 @@ char * socket_print_addr(struct sockaddr *saddr);
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int socket_parse_addr(const char *str, struct sockaddr *sa, enum socket_layer layer, int flags);
int socket_parse_address(const char *str, struct sockaddr *sa, enum socket_layer layer, int flags);
int socket_compare_addr(struct sockaddr *x, struct sockaddr *y);

View file

@ -163,11 +163,11 @@ char * socket_print(struct node *n)
inet_ntop(AF_INET, &s->multicast.mreq.imr_multiaddr, group, sizeof(group));
inet_ntop(AF_INET, &s->multicast.mreq.imr_interface, interface, sizeof(interface));
strcatf(&buf, ", multicast.enabled=%s", s->multicast.enabled ? "yes" : "no");
strcatf(&buf, ", multicast.loop=%s", s->multicast.loop ? "yes" : "no");
strcatf(&buf, ", multicast.group=%s", group);
strcatf(&buf, ", multicast.interface=%s", s->multicast.mreq.imr_interface.s_addr == INADDR_ANY ? "any" : interface);
strcatf(&buf, ", multicast.ttl=%u", s->multicast.ttl);
strcatf(&buf, ", in.multicast.enabled=%s", s->multicast.enabled ? "yes" : "no");
strcatf(&buf, ", in.multicast.loop=%s", s->multicast.loop ? "yes" : "no");
strcatf(&buf, ", in.multicast.group=%s", group);
strcatf(&buf, ", in.multicast.interface=%s", s->multicast.mreq.imr_interface.s_addr == INADDR_ANY ? "any" : interface);
strcatf(&buf, ", in.multicast.ttl=%u", s->multicast.ttl);
}
free(local);
@ -176,12 +176,10 @@ char * socket_print(struct node *n)
return buf;
}
int socket_start(struct node *n)
int socket_check(struct node *n)
{
struct socket *s = (struct socket *) n->_vd;
int ret;
// TODO: Move to socket_check() ?
/* Some checks on the addresses */
if (s->layer != SOCKET_LAYER_UNIX) {
if (s->local.sa.sa_family != s->remote.sa.sa_family)
@ -211,8 +209,20 @@ int socket_start(struct node *n)
error("Multicast group address of node %s must be within 224.0.0.0/4", node_name(n));
}
return 0;
}
int socket_start(struct node *n)
{
struct socket *s = (struct socket *) n->_vd;
int ret;
/* Initialize IO */
ret = io_init(&s->io, s->format, n, SAMPLE_HAS_ALL);
ret = io_init(&s->io, s->format, &n->signals, SAMPLE_HAS_ALL);
if (ret)
return ret;
ret = io_check(&s->io);
if (ret)
return ret;
@ -360,18 +370,17 @@ int socket_stop(struct node *n)
return ret;
}
ret = io_destroy(&s->io);
if (ret)
return ret;
return 0;
}
int socket_destroy(struct node *n)
{
int ret;
struct socket *s = (struct socket *) n->_vd;
ret = io_destroy(&s->io);
if (ret)
return ret;
#ifdef WITH_NETEM
rtnl_qdisc_put(s->tc_qdisc);
rtnl_cls_put(s->tc_classifier);
@ -502,13 +511,15 @@ retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt);
addrlen = sizeof(s->local);
}
bytes = sendto(s->sd, buf, wbytes, MSG_DONTWAIT, (struct sockaddr *) &s->remote, addrlen);
retry2: bytes = sendto(s->sd, buf, wbytes, 0, (struct sockaddr *) &s->remote, addrlen);
if (bytes < 0) {
if ((errno == EPERM) ||
(errno == ENOENT && s->layer == SOCKET_LAYER_UNIX))
warn("Failed send to node %s: %s", node_name(n), strerror(errno));
else if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
else if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
warn("socket: send would block");
goto retry2;
}
else
serror("Failed send to node %s", node_name(n));
}
@ -542,14 +553,16 @@ int socket_parse(struct node *n, json_t *cfg)
s->tc_qdisc = NULL;
#endif /* WITH_NETEM */
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: s, s: s, s?: b, s?: o, s?: o, s?: s }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s: { s: s, s?: o }, s: { s: s, s?: b, s?: o } }",
"layer", &layer,
"remote", &remote,
"local", &local,
"verify_source", &s->verify_source,
"multicast", &json_multicast,
"netem", &json_netem,
"format", &format
"format", &format,
"out",
"address", &remote,
"netem", &json_netem,
"in",
"address", &local,
"verify_source", &s->verify_source,
"multicast", &json_multicast
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
@ -575,13 +588,13 @@ int socket_parse(struct node *n, json_t *cfg)
error("Invalid layer '%s' for node %s", layer, node_name(n));
}
ret = socket_parse_addr(remote, (struct sockaddr *) &s->remote, s->layer, 0);
ret = socket_parse_address(remote, (struct sockaddr *) &s->remote, s->layer, 0);
if (ret) {
error("Failed to resolve remote address '%s' of node %s: %s",
remote, node_name(n), gai_strerror(ret));
}
ret = socket_parse_addr(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE);
ret = socket_parse_address(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE);
if (ret) {
error("Failed to resolve local address '%s' of node %s: %s",
local, node_name(n), gai_strerror(ret));
@ -693,7 +706,7 @@ char * socket_print_addr(struct sockaddr *saddr)
return buf;
}
int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_layer layer, int flags)
int socket_parse_address(const char *addr, struct sockaddr *saddr, enum socket_layer layer, int flags)
{
/** @todo: Add support for IPv6 */
union sockaddr_union *sa = (union sockaddr_union *) saddr;
@ -704,10 +717,10 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye
if (layer == SOCKET_LAYER_UNIX) { /* Format: "/path/to/socket" */
sa->sun.sun_family = AF_UNIX;
if (strlen(addr) > sizeof(sa->sun.sun_path)-1)
if (strlen(addr) > sizeof(sa->sun.sun_path) - 1)
error("Length of unix socket path is too long!");
memcpy(sa->sun.sun_path, addr, strlen(sa->sun.sun_path)+1);
memcpy(sa->sun.sun_path, addr, strlen(addr) + 1);
ret = 0;
}
@ -859,6 +872,7 @@ static struct plugin p = {
.reverse = socket_reverse,
.parse = socket_parse,
.print = socket_print,
.check = socket_check,
.start = socket_start,
.stop = socket_stop,
.read = socket_read,