diff --git a/include/villas/kernel/if.h b/include/villas/kernel/if.h index 31829a215..ab46e37d5 100644 --- a/include/villas/kernel/if.h +++ b/include/villas/kernel/if.h @@ -57,7 +57,7 @@ struct interface { char irqs[IF_IRQ_MAX]; /**< List of IRQs of the NIC. */ int affinity; /**< IRQ / Core Affinity of this interface. */ - struct vlist sockets; /**< Linked list of associated sockets. */ + struct vlist nodes; /**< Linked list of nodes which use this interface. */ }; /** Add a new interface to the global list and lookup name, irqs... @@ -99,6 +99,11 @@ int if_start(struct interface *i); */ int if_stop(struct interface *i); +/** Find existing or create new interface instance on which packets for a certain destination + * will leave the system. + */ +struct interface * if_get_egress(struct sockaddr *sa, struct vlist *interfaces); + /** Lookup routing tables to get the interface on which packets for a certain destination * will leave the system. * @@ -107,7 +112,7 @@ int if_stop(struct interface *i); * @retval 0 Success. Everything went well. * @retval <0 Error. Something went wrong. */ -int if_get_egress(struct sockaddr *sa, struct rtnl_link **link); +struct rtnl_link * if_get_egress_link(struct sockaddr *sa); /** Get all IRQs for this interface. * diff --git a/include/villas/node.h b/include/villas/node.h index 10eaa15cf..57492c34b 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -42,6 +42,12 @@ extern "C" { #endif +/* Forward declarations */ +#ifdef __linux__ + struct rtnl_qdisc; + struct rtnl_cls; +#endif /* __linux__ */ + struct node_direction { int enabled; int builtin; /**< This node should use built-in hooks by default. */ @@ -75,6 +81,13 @@ struct node enum state state; +#ifdef __linux__ + int mark; /**< Socket mark for netem, routing and filtering */ + + struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */ + struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */ +#endif /* __linux__ */ + struct node_type *_vt; /**< Virtual functions (C++ OOP style) */ void *_vd; /**< Virtual data (used by struct node::_vt functions) */ @@ -179,6 +192,8 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re int node_poll_fds(struct node *n, int fds[]); +int node_netem_fds(struct node *n, int fds[]); + struct node_type * node_type(struct node *n); struct memory_type * node_memory_type(struct node *n, struct memory_type *parent); diff --git a/include/villas/node_type.h b/include/villas/node_type.h index 3fa79dea2..64a847bf2 100644 --- a/include/villas/node_type.h +++ b/include/villas/node_type.h @@ -215,6 +215,13 @@ struct node_type { */ int (*poll_fds)(struct node *n, int fds[]); + /** Get list of socket file descriptors for configuring network emulation. + * + * This callback is optional. + * @return The number of file descriptors which have been put into \p sds. + */ + int (*netem_fds)(struct node *n, int sds[]); + /** Return a memory allocator which should be used for sample pools passed to this node. */ struct memory_type * (*memory_type)(struct node *n, struct memory_type *parent); }; diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index be2cade7a..d39371429 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -51,15 +51,15 @@ struct format_type; struct rtp { struct rtp_sock *rs; /**< RTP socket */ - struct sa local_rtp; /**< Local address of the RTP socket */ - struct sa local_rtcp; /**< Local address of the RTCP socket */ - struct sa remote_rtp; /**< Remote address of the RTP socket */ - struct sa remote_rtcp; /**< Remote address of the RTCP socket */ + struct { + struct sa saddr_rtp; /**< Local/Remote address of the RTP socket */ + struct sa saddr_rtcp; /**< Local/Remote address of the RTCP socket */ + } in, out; struct format_type *format; struct io io; - double rate; /**< Sample rate of source */ + double rate; /**< Sample rate of source */ struct { int enabled; diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index e25201042..0df901731 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -48,7 +48,6 @@ #include #endif /* LIBNL3_ROUTE_FOUND */ - #ifdef __cplusplus extern "C" { #endif @@ -79,14 +78,10 @@ union sockaddr_union { struct socket { int sd; /**< The socket descriptor */ - int mark; /**< Socket mark for netem, routing and filtering */ int verify_source; /**< Verify the source address of incoming packets against socket::remote. */ enum socket_layer layer; /**< The OSI / IP layer which should be used for this socket */ - union sockaddr_union local; /**< Local address of the socket */ - union sockaddr_union remote; /**< Remote address of the socket */ - struct format_type *format; struct io io; @@ -98,14 +93,10 @@ struct socket { struct ip_mreq mreq; /**< A multicast group to join. */ } multicast; -#ifdef WITH_NETEM - struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */ - struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */ -#endif /* WITH_NETEM */ - struct { char *buf; /**< Buffer for receiving messages */ size_t buflen; + union sockaddr_union saddr; /**< Remote address of the socket */ } in, out; }; diff --git a/lib/kernel/if.c b/lib/kernel/if.c index 75720c8fc..a3a2d6fd9 100644 --- a/lib/kernel/if.c +++ b/lib/kernel/if.c @@ -50,7 +50,7 @@ int if_init(struct interface *i, struct rtnl_link *link) else warning("Did not found any interrupts for interface '%s'", if_name(i)); - vlist_init(&i->sockets); + vlist_init(&i->nodes); return 0; } @@ -58,7 +58,7 @@ int if_init(struct interface *i, struct rtnl_link *link) int if_destroy(struct interface *i) { /* List members are freed by the nodes they belong to. */ - vlist_destroy(&i->sockets, NULL, false); + vlist_destroy(&i->nodes, NULL, false); rtnl_qdisc_put(i->tc_qdisc); @@ -69,51 +69,49 @@ int if_destroy(struct interface *i) int if_start(struct interface *i) { - info("Starting interface '%s' which is used by %zu sockets", rtnl_link_get_name(i->nl_link), vlist_length(&i->sockets)); + info("Starting interface '%s' which is used by %zu nodes", if_name(i), vlist_length(&i->nodes)); - { - /* Set affinity for network interfaces (skip _loopback_ dev) */ - //if_set_affinity(i, i->affinity); + /* Set affinity for network interfaces (skip _loopback_ dev) */ + //if_set_affinity(i, i->affinity); - /* Assign fwmark's to socket nodes which have netem options */ - int ret, mark = 0; - for (size_t j = 0; j < vlist_length(&i->sockets); j++) { - struct socket *s = (struct socket *) vlist_at(&i->sockets, j); + /* Assign fwmark's to nodes which have netem options */ + int ret, mark = 0; + for (size_t j = 0; j < vlist_length(&i->nodes); j++) { + struct node *n = (struct node *) vlist_at(&i->nodes, j); - if (s->tc_qdisc) - s->mark = 1 + mark++; - } + if (n->tc_qdisc) + n->mark = 1 + mark++; + } - /* Abort if no node is using netem */ - if (mark == 0) - return 0; + /* Abort if no node is using netem */ + if (mark == 0) + return 0; - if (getuid() != 0) - error("Network emulation requires super-user privileges!"); + if (getuid() != 0) + error("Network emulation requires super-user privileges!"); - /* Replace root qdisc */ - ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, mark); - if (ret) - error("Failed to setup priority queuing discipline: %s", nl_geterror(ret)); + /* Replace root qdisc */ + ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, mark); + if (ret) + error("Failed to setup priority queuing discipline: %s", nl_geterror(ret)); - /* Create netem qdisks and appropriate filter per netem node */ - for (size_t j = 0; j < vlist_length(&i->sockets); j++) { - struct socket *s = (struct socket *) vlist_at(&i->sockets, j); + /* Create netem qdisks and appropriate filter per netem node */ + for (size_t j = 0; j < vlist_length(&i->nodes); j++) { + struct node *n = (struct node *) vlist_at(&i->nodes, j); - if (s->tc_qdisc) { - ret = tc_mark(i, &s->tc_classifier, TC_HANDLE(1, s->mark), s->mark); - if (ret) - error("Failed to setup FW mark classifier: %s", nl_geterror(ret)); + if (n->tc_qdisc) { + ret = tc_mark(i, &n->tc_classifier, TC_HANDLE(1, n->mark), n->mark); + if (ret) + error("Failed to setup FW mark classifier: %s", nl_geterror(ret)); - char *buf = tc_netem_print(s->tc_qdisc); - debug(LOG_IF | 5, "Starting network emulation on interface '%s' for FW mark %u: %s", - rtnl_link_get_name(i->nl_link), s->mark, buf); - free(buf); + char *buf = tc_netem_print(n->tc_qdisc); + debug(LOG_IF | 5, "Starting network emulation on interface '%s' for FW mark %u: %s", + if_name(i), n->mark, buf); + free(buf); - ret = tc_netem(i, &s->tc_qdisc, TC_HANDLE(0x1000+s->mark, 0), TC_HANDLE(1, s->mark)); - if (ret) - error("Failed to setup netem qdisc: %s", nl_geterror(ret)); - } + ret = tc_netem(i, &n->tc_qdisc, TC_HANDLE(0x1000+n->mark, 0), TC_HANDLE(1, n->mark)); + if (ret) + error("Failed to setup netem qdisc: %s", nl_geterror(ret)); } } @@ -132,12 +130,46 @@ int if_stop(struct interface *i) return 0; } -int if_get_egress(struct sockaddr *sa, struct rtnl_link **link) const char * if_name(struct interface *i) { return rtnl_link_get_name(i->nl_link); } +struct interface * if_get_egress(struct sockaddr *sa, struct vlist *interfaces) +{ + int ret; + struct rtnl_link *link; + + /* Determine outgoing interface */ + link = if_get_egress_link(sa); + if (!link) { + char *buf = socket_print_addr(sa); + error("Failed to get interface for socket address '%s'", buf); + free(buf); + + return NULL; + } + + /* Search of existing interface with correct ifindex */ + struct interface *i; + for (size_t k = 0; k < vlist_length(interfaces); k++) { + i = (struct interface *) vlist_at(interfaces, k); + + if (rtnl_link_get_ifindex(i->nl_link) == rtnl_link_get_ifindex(link)) + return i; + } + + /* If not found, create a new interface */ + i = alloc(sizeof(struct interface)); + + ret = if_init(i, link); + if (ret) + NULL; + + return i; +} + +struct rtnl_link * if_get_egress_link(struct sockaddr *sa) { int ifindex = -1; @@ -166,11 +198,8 @@ const char * if_name(struct interface *i) } struct nl_cache *cache = nl_cache_mngt_require("route/link"); - *link = rtnl_link_get(cache, ifindex); - if (!*link) - return -1; - return 0; + return rtnl_link_get(cache, ifindex); } int if_get_irqs(struct interface *i) diff --git a/lib/node.c b/lib/node.c index 6dc775f00..e47649c1b 100644 --- a/lib/node.c +++ b/lib/node.c @@ -175,6 +175,11 @@ int node_init(struct node *n, struct node_type *vt) n->_name = NULL; n->_name_long = NULL; +#ifdef WITH_NETEM + s->tc_qdisc = NULL; + s->tc_classifier = NULL; +#endif /* WITH_NETEM */ + n->signals.state = STATE_DESTROYED; vlist_init(&n->signals); @@ -222,15 +227,18 @@ int node_parse(struct node *n, json_t *json, const char *name) json_error_t err; json_t *json_signals = NULL; + json_t *json_netem = NULL; const char *type; n->name = strdup(name); - ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o } }", + ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o }, s?: { s?: o } }", "type", &type, "in", - "signals", &json_signals + "signals", &json_signals, + "out", + "netem", &json_netem ); if (ret) jerror(&err, "Failed to parse node %s", node_name(n)); @@ -238,6 +246,21 @@ int node_parse(struct node *n, json_t *json, const char *name) nt = node_type_lookup(type); assert(nt == node_type(n)); + if (json_netem) { +#ifdef WITH_NETEM + int enabled = 1; + + ret = json_unpack_ex(json_netem, &err, 0, "{ s?: b }", "enabled", &enabled); + if (ret) + jerror(&err, "Failed to parse setting 'netem' of node %s", node_name(n)); + + if (enabled) + tc_netem_parse(&s->tc_qdisc, json_netem); + else + s->tc_qdisc = NULL; +#endif /* WITH_NETEM */ + } + if (nt->flags & NODE_TYPE_PROVIDES_SIGNALS) { if (json_signals) error("Node %s does not support signal definitions", node_name(n)); @@ -354,6 +377,24 @@ int node_start(struct node *n) if (ret) return ret; +#ifdef __linux__ + /* Set fwmark for outgoing packets if netem is enabled for this node */ + if (n->mark) { + int fds[16]; + int num_sds = node_netem_fds(n, fds); + + for (int i = 0; i < num_sds; i++) { + int fd = fds[i]; + + ret = setsockopt(fd, SOL_SOCKET, SO_MARK, &n->mark, sizeof(n->mark)); + if (ret) + serror("Failed to set FW mark for outgoing packets"); + else + debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", fd, n->mark); + } + } +#endif /* __linux__ */ + n->state = STATE_STARTED; n->sequence = 0; @@ -482,6 +523,11 @@ int node_destroy(struct node *n) if (n->name) free(n->name); +#ifdef WITH_NETEM + rtnl_qdisc_put(n->tc_qdisc); + rtnl_cls_put(n->tc_classifier); +#endif /* WITH_NETEM */ + n->state = STATE_DESTROYED; return 0; @@ -613,8 +659,10 @@ int node_poll_fds(struct node *n, int fds[]) { return node_type(n)->poll_fds ? node_type(n)->poll_fds(n, fds) : -1; } + +int node_netem_fds(struct node *n, int fds[]) { - return node_type(n)->fd ? node_type(n)->fd(n) : -1; + return node_type(n)->netem_fds ? node_type(n)->netem_fds(n, fds) : -1; } struct node_type * node_type(struct node *n) diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 28610edd8..e69b2ad12 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -32,6 +32,7 @@ #include #include #include +#include #undef ALIGN_MASK #include @@ -42,9 +43,13 @@ #include #include #include +#include static pthread_t re_pthread; +/* Forward declartions */ +static struct plugin p; + static int rtp_set_rate(struct node *n, double rate) { struct rtp *r = (struct rtp *) n->_vd; @@ -92,13 +97,13 @@ int rtp_reverse(struct node *n) struct rtp *r = (struct rtp *) n->_vd; struct sa tmp; - tmp = r->local_rtp; - r->local_rtp = r->remote_rtp; - r->remote_rtp = tmp; + tmp = r->in.saddr_rtp; + r->in.saddr_rtp = r->out.saddr_rtp; + r->out.saddr_rtp = tmp; - tmp = r->local_rtcp; - r->local_rtcp = r->remote_rtcp; - r->remote_rtcp = tmp; + tmp = r->in.saddr_rtcp; + r->in.saddr_rtcp = r->out.saddr_rtcp; + r->out.saddr_rtcp = tmp; return 0; } @@ -177,30 +182,30 @@ int rtp_parse(struct node *n, json_t *cfg) error("Invalid format '%s' for node %s", format, node_name(n)); /* Remote address */ - ret = sa_decode(&r->remote_rtp, remote, strlen(remote)); + ret = sa_decode(&r->out.saddr_rtp, remote, strlen(remote)); if (ret) { error("Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), strerror(ret)); } /* Assign even port number to RTP socket, next odd number to RTCP socket */ - port = sa_port(&r->remote_rtp) & ~1; - sa_set_sa(&r->remote_rtcp, &r->remote_rtp.u.sa); - sa_set_port(&r->remote_rtp, port); - sa_set_port(&r->remote_rtcp, port+1); + port = sa_port(&r->out.saddr_rtp) & ~1; + sa_set_sa(&r->out.saddr_rtcp, &r->out.saddr_rtp.u.sa); + sa_set_port(&r->out.saddr_rtp, port); + sa_set_port(&r->out.saddr_rtcp, port+1); /* Local address */ - ret = sa_decode(&r->local_rtp, local, strlen(local)); + ret = sa_decode(&r->in.saddr_rtp, local, strlen(local)); if (ret) { error("Failed to resolve local address '%s' of node %s: %s", local, node_name(n), strerror(ret)); } /* Assign even port number to RTP socket, next odd number to RTCP socket */ - port = sa_port(&r->local_rtp) & ~1; - sa_set_sa(&r->local_rtcp, &r->local_rtp.u.sa); - sa_set_port(&r->local_rtp, port); - sa_set_port(&r->local_rtcp, port+1); + port = sa_port(&r->in.saddr_rtp) & ~1; + sa_set_sa(&r->in.saddr_rtcp, &r->in.saddr_rtp.u.sa); + sa_set_port(&r->in.saddr_rtp, port); + sa_set_port(&r->in.saddr_rtcp, port+1); /** @todo parse * in addresses */ @@ -212,8 +217,8 @@ char * rtp_print(struct node *n) struct rtp *r = (struct rtp *) n->_vd; char *buf; - char *local = socket_print_addr((struct sockaddr *) &r->local_rtp.u); - char *remote = socket_print_addr((struct sockaddr *) &r->remote_rtp.u); + char *local = socket_print_addr((struct sockaddr *) &r->in.saddr_rtp.u); + char *remote = socket_print_addr((struct sockaddr *) &r->out.saddr_rtp.u); buf = strf("format=%s, in.address=%s, out.address=%s, rtcp.enabled=%s", format_type_name(r->format), @@ -319,11 +324,11 @@ int rtp_start(struct node *n) vlist_push(&n->out.hooks, r->rtcp.throttle_hook); /* Initialize RTP socket */ - uint16_t port = sa_port(&r->local_rtp) & ~1; - ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local_rtp, port, port+1, r->rtcp.enabled, rtp_handler, rtcp_handler, n); + uint16_t port = sa_port(&r->in.saddr_rtp) & ~1; + ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->in.saddr_rtp, port, port+1, r->rtcp.enabled, rtp_handler, rtcp_handler, n); /* Start RTCP session */ - rtcp_start(r->rs, node_name(n), &r->remote_rtcp); + rtcp_start(r->rs, node_name(n), &r->out.saddr_rtcp); return ret; } @@ -357,7 +362,7 @@ static void stop_handler(int sig, siginfo_t *si, void *ctx) re_cancel(); } -int rtp_type_start() +int rtp_type_start(struct super_node *sn) { int ret; @@ -383,6 +388,22 @@ int rtp_type_start() if (ret) return ret; +#ifdef WITH_NETEM + struct vlist *interfaces = super_node_get_interfaces(sn); + + /* Gather list of used network interfaces */ + for (size_t i = 0; i < vlist_length(&p.node.instances); i++) { + struct node *n = (struct node *) vlist_at(&p.node.instances, i); + struct rtp *r = (struct rtp *) n->_vd; + struct interface *i = if_get_egress(&r->out.saddr_rtp.u.sa, interfaces); + + if (!i) + error("Failed to find egress interface for node: %s", node_name(n)); + + vlist_push(&i->nodes, n); + } +#endif /* WITH_NETEM */ + return ret; } @@ -493,7 +514,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt); mbuf_set_pos(mb, 12); /* Send dataset */ - ret = rtp_send(r->rs, &r->remote_rtp, false, false, 21, (uint32_t) time(NULL), mb); + ret = rtp_send(r->rs, &r->out.saddr_rtp, false, false, 21, (uint32_t) time(NULL), mb); if (ret) { warning("Error from rtp_send, reason: %d", ret); cnt = ret; @@ -505,9 +526,38 @@ out1: free(buf); return cnt; } +int rtp_poll_fds(struct node *n, int fds[]) +{ + struct rtp *r = (struct rtp *) n->_vd; + + fds[0] = queue_signalled_fd(&r->recv_queue); + + return 1; +} + +int rtp_netem_fds(struct node *n, int fds[]) +{ + struct rtp *r = (struct rtp *) n->_vd; + + int m = 0; + struct udp_sock *rtp = (struct udp_sock *) rtp_sock(r->rs); + struct udp_sock *rtcp = (struct udp_sock *) rtcp_sock(r->rs); + + fds[m++] = udp_sock_fd(rtp, AF_INET); + + if (r->rtcp.enabled) + fds[m++] = udp_sock_fd(rtcp, AF_INET); + + return m; +} + static struct plugin p = { .name = "rtp", +#ifdef WITH_NETEM + .description = "real-time transport protocol (libre, libnl3 netem support)", +#else .description = "real-time transport protocol (libre)", +#endif .type = PLUGIN_TYPE_NODE, .node = { .vectorize = 0, @@ -521,6 +571,8 @@ static struct plugin p = { .stop = rtp_stop, .read = rtp_read, .write = rtp_write, + .poll_fds = rtp_poll_fds, + .netem_fds = rtp_netem_fds } }; diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 8c2350705..21e60bfab 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -36,92 +36,33 @@ #include #include #include +#include #ifdef WITH_SOCKET_LAYER_ETH #include #endif /* WITH_SOCKET_LAYER_ETH */ -#ifdef WITH_NETEM - #include - #include - #include -#endif /* WITH_NETEM */ - /* Forward declartions */ static struct plugin p; -/* Private static storage */ -struct vlist interfaces = { .state = STATE_DESTROYED }; - int socket_type_start(struct super_node *sn) { #ifdef WITH_NETEM - int ret; - - nl_init(); /* Fill link cache */ - vlist_init(&interfaces); + struct vlist *interfaces = super_node_get_interfaces(sn); /* Gather list of used network interfaces */ for (size_t i = 0; i < vlist_length(&p.node.instances); i++) { struct node *n = (struct node *) vlist_at(&p.node.instances, i); struct socket *s = (struct socket *) n->_vd; - struct rtnl_link *link; - if (s->layer != SOCKET_LAYER_ETH && - s->layer != SOCKET_LAYER_IP && - s->layer != SOCKET_LAYER_UDP) - continue; - - /* Determine outgoing interface */ - ret = if_get_egress((struct sockaddr *) &s->remote, &link); - if (ret) { - char *buf = socket_print_addr((struct sockaddr *) &s->remote); - error("Failed to get interface for socket address '%s'", buf); - free(buf); - } - - /* Search of existing interface with correct ifindex */ - struct interface *i; - - for (size_t k = 0; k < vlist_length(&interfaces); k++) { - i = (struct interface *) vlist_at(&interfaces, k); - - if (rtnl_link_get_ifindex(i->nl_link) == rtnl_link_get_ifindex(link)) - goto found; - } - - /* If not found, create a new interface */ - i = alloc(sizeof(struct interface)); - - ret = if_init(i, link); - if (ret) + if (s->layer == SOCKET_LAYER_UNIX) continue; - vlist_push(&interfaces, i); + /* Determine outgoing interface */ + struct interface *i = if_get_egress((struct sockaddr *) &s->out.saddr, interfaces); -found: vlist_push(&i->sockets, s); + vlist_push(&i->nodes, n); } - - for (size_t j = 0; j < vlist_length(&interfaces); j++) { - struct interface *i = (struct interface *) vlist_at(&interfaces, j); - - if_start(i); - } -#endif /* WITH_NETEM */ - - return 0; -} - -int socket_type_stop() -{ -#ifdef WITH_NETEM - for (size_t j = 0; j < vlist_length(&interfaces); j++) { - struct interface *i = (struct interface *) vlist_at(&interfaces, j); - - if_stop(i); - } - - vlist_destroy(&interfaces, (dtor_cb_t) if_destroy, false); #endif /* WITH_NETEM */ return 0; @@ -150,8 +91,8 @@ char * socket_print(struct node *n) break; } - char *local = socket_print_addr((struct sockaddr *) &s->local); - char *remote = socket_print_addr((struct sockaddr *) &s->remote); + char *local = socket_print_addr((struct sockaddr *) &s->in.saddr); + char *remote = socket_print_addr((struct sockaddr *) &s->out.saddr); buf = strf("layer=%s, format=%s, in.address=%s, out.address=%s", layer, format_type_name(s->format), local, remote); @@ -181,26 +122,26 @@ int socket_check(struct node *n) /* Some checks on the addresses */ if (s->layer != SOCKET_LAYER_UNIX) { - if (s->local.sa.sa_family != s->remote.sa.sa_family) + if (s->in.saddr.sa.sa_family != s->out.saddr.sa.sa_family) error("Address families of local and remote must match!"); } if (s->layer == SOCKET_LAYER_IP) { - if (ntohs(s->local.sin.sin_port) != ntohs(s->remote.sin.sin_port)) + if (ntohs(s->in.saddr.sin.sin_port) != ntohs(s->out.saddr.sin.sin_port)) error("IP protocol numbers of local and remote must match!"); } #ifdef WITH_SOCKET_LAYER_ETH else if (s->layer == SOCKET_LAYER_ETH) { - if (ntohs(s->local.sll.sll_protocol) != ntohs(s->remote.sll.sll_protocol)) + if (ntohs(s->in.saddr.sll.sll_protocol) != ntohs(s->out.saddr.sll.sll_protocol)) error("Ethertypes of local and remote must match!"); - if (ntohs(s->local.sll.sll_protocol) <= 0x5DC) + if (ntohs(s->in.saddr.sll.sll_protocol) <= 0x5DC) error("Ethertype must be large than %d or it is interpreted as an IEEE802.3 length field!", 0x5DC); } #endif /* WITH_SOCKET_LAYER_ETH */ if (s->multicast.enabled) { - if (s->local.sa.sa_family != AF_INET) + if (s->in.saddr.sa.sa_family != AF_INET) error("Multicast is only supported by IPv4 for node %s", node_name(n)); uint32_t addr = ntohl(s->multicast.mreq.imr_multiaddr.s_addr); @@ -228,21 +169,21 @@ int socket_start(struct node *n) /* Create socket */ switch (s->layer) { case SOCKET_LAYER_UDP: - s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, IPPROTO_UDP); + s->sd = socket(s->in.saddr.sa.sa_family, SOCK_DGRAM, IPPROTO_UDP); break; case SOCKET_LAYER_IP: - s->sd = socket(s->local.sa.sa_family, SOCK_RAW, ntohs(s->local.sin.sin_port)); + s->sd = socket(s->in.saddr.sa.sa_family, SOCK_RAW, ntohs(s->in.saddr.sin.sin_port)); break; #ifdef WITH_SOCKET_LAYER_ETH case SOCKET_LAYER_ETH: - s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, s->local.sll.sll_protocol); + s->sd = socket(s->in.saddr.sa.sa_family, SOCK_DGRAM, s->in.saddr.sll.sll_protocol); break; #endif /* WITH_SOCKET_LAYER_ETH */ case SOCKET_LAYER_UNIX: - s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, 0); + s->sd = socket(s->in.saddr.sa.sa_family, SOCK_DGRAM, 0); break; default: @@ -254,14 +195,14 @@ int socket_start(struct node *n) /* Delete Unix domain socket if already existing */ if (s->layer == SOCKET_LAYER_UNIX) { - ret = unlink(s->local.sun.sun_path); + ret = unlink(s->in.saddr.sun.sun_path); if (ret && errno != ENOENT) return ret; } /* Bind socket for receiving */ socklen_t addrlen = 0; - switch(s->local.ss.ss_family) { + switch(s->in.saddr.ss.ss_family) { case AF_INET: addrlen = sizeof(struct sockaddr_in); break; @@ -271,7 +212,7 @@ int socket_start(struct node *n) break; case AF_UNIX: - addrlen = SUN_LEN(&s->local.sun); + addrlen = SUN_LEN(&s->in.saddr.sun); break; #ifdef WITH_SOCKET_LAYER_ETH @@ -280,24 +221,13 @@ int socket_start(struct node *n) break; #endif /* WITH_SOCKET_LAYER_ETH */ default: - addrlen = sizeof(s->local); + addrlen = sizeof(s->in.saddr); } - ret = bind(s->sd, (struct sockaddr *) &s->local, addrlen); + ret = bind(s->sd, (struct sockaddr *) &s->in.saddr, addrlen); if (ret < 0) serror("Failed to bind socket"); -#ifdef __linux__ - /* Set fwmark for outgoing packets if netem is enabled for this node */ - if (s->mark) { - ret = setsockopt(s->sd, SOL_SOCKET, SO_MARK, &s->mark, sizeof(s->mark)); - if (ret) - serror("Failed to set FW mark for outgoing packets"); - else - debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", s->sd, s->mark); - } -#endif /* __linux__ */ - if (s->multicast.enabled) { ret = setsockopt(s->sd, IPPROTO_IP, IP_MULTICAST_LOOP, &s->multicast.loop, sizeof(s->multicast.loop)); if (ret) @@ -356,9 +286,9 @@ int socket_reverse(struct node *n) struct socket *s = (struct socket *) n->_vd; union sockaddr_union tmp; - tmp = s->local; - s->local = s->remote; - s->remote = tmp; + tmp = s->in.saddr; + s->in.saddr = s->out.saddr; + s->out.saddr = tmp; return 0; } @@ -390,18 +320,6 @@ int socket_stop(struct node *n) return 0; } -int socket_destroy(struct node *n) -{ -#ifdef WITH_NETEM - struct socket *s = (struct socket *) n->_vd; - - rtnl_qdisc_put(s->tc_qdisc); - rtnl_cls_put(s->tc_classifier); -#endif /* WITH_NETEM */ - - return 0; -} - int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int ret; @@ -436,16 +354,16 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r if (s->layer == SOCKET_LAYER_IP) { switch (src.sa.sa_family) { case AF_INET: - src.sin.sin_port = s->remote.sin.sin_port; + src.sin.sin_port = s->out.saddr.sin.sin_port; break; case AF_INET6: - src.sin6.sin6_port = s->remote.sin6.sin6_port; + src.sin6.sin6_port = s->out.saddr.sin6.sin6_port; break; } } - if (s->verify_source && socket_compare_addr(&src.sa, &s->remote.sa) != 0) { + if (s->verify_source && socket_compare_addr(&src.sa, &s->out.saddr.sa) != 0) { char *buf = socket_print_addr((struct sockaddr *) &src); warning("Received packet from unauthorized source: %s", buf); free(buf); @@ -487,7 +405,7 @@ retry: ret = io_sprint(&s->io, s->out.buf, s->out.buflen, &wbytes, smps, cnt); /* Send message */ socklen_t addrlen = 0; - switch(s->local.ss.ss_family) { + switch(s->in.saddr.ss.ss_family) { case AF_INET: addrlen = sizeof(struct sockaddr_in); break; @@ -497,7 +415,7 @@ retry: ret = io_sprint(&s->io, s->out.buf, s->out.buflen, &wbytes, smps, cnt); break; case AF_UNIX: - addrlen = SUN_LEN(&s->local.sun); + addrlen = SUN_LEN(&s->in.saddr.sun); break; #ifdef WITH_SOCKET_LAYER_ETH @@ -506,10 +424,10 @@ retry: ret = io_sprint(&s->io, s->out.buf, s->out.buflen, &wbytes, smps, cnt); break; #endif /* WITH_SOCKET_LAYER_ETH */ default: - addrlen = sizeof(s->local); + addrlen = sizeof(s->in.saddr); } -retry2: bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *) &s->remote, addrlen); +retry2: bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *) &s->out.saddr, addrlen); if (bytes < 0) { if ((errno == EPERM) || (errno == ENOENT && s->layer == SOCKET_LAYER_UNIX)) @@ -539,22 +457,17 @@ int socket_parse(struct node *n, json_t *cfg) int ret; json_t *json_multicast = NULL; - json_t *json_netem = NULL; json_error_t err; /* Default values */ s->layer = SOCKET_LAYER_UDP; s->verify_source = 0; -#ifdef WITH_NETEM - s->tc_qdisc = NULL; -#endif /* WITH_NETEM */ - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s: { s: s, s?: o }, s: { s: s, s?: b, s?: o } }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s: { s: s }, s: { s: s, s?: b, s?: o } }", "layer", &layer, "format", &format, "out", "address", &remote, - "netem", &json_netem, "in", "address", &local, "verify_source", &s->verify_source, @@ -584,13 +497,13 @@ int socket_parse(struct node *n, json_t *cfg) error("Invalid layer '%s' for node %s", layer, node_name(n)); } - ret = socket_parse_address(remote, (struct sockaddr *) &s->remote, s->layer, 0); + ret = socket_parse_address(remote, (struct sockaddr *) &s->out.saddr, s->layer, 0); if (ret) { error("Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), gai_strerror(ret)); } - ret = socket_parse_address(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE); + ret = socket_parse_address(local, (struct sockaddr *) &s->in.saddr, s->layer, AI_PASSIVE); if (ret) { error("Failed to resolve local address '%s' of node %s: %s", local, node_name(n), gai_strerror(ret)); @@ -630,21 +543,6 @@ int socket_parse(struct node *n, json_t *cfg) } } - if (json_netem) { -#ifdef WITH_NETEM - int enabled = 1; - - ret = json_unpack_ex(json_netem, &err, 0, "{ s?: b }", "enabled", &enabled); - if (ret) - jerror(&err, "Failed to parse setting 'netem' of node %s", node_name(n)); - - if (enabled) - tc_netem_parse(&s->tc_qdisc, json_netem); - else - s->tc_qdisc = NULL; -#endif /* WITH_NETEM */ - } - return 0; } @@ -865,8 +763,6 @@ static struct plugin p = { .vectorize = 0, .size = sizeof(struct socket), .type.start = socket_type_start, - .type.stop = socket_type_stop, - .destroy = socket_destroy, .reverse = socket_reverse, .parse = socket_parse, .print = socket_print, @@ -876,9 +772,9 @@ static struct plugin p = { .read = socket_read, .write = socket_write, .poll_fds = socket_fds, + .netem_fds = socket_fds } }; REGISTER_PLUGIN(&p) LIST_INIT_STATIC(&p.node.instances) - diff --git a/lib/super_node.cpp b/lib/super_node.cpp index 325993f59..9e5abb208 100644 --- a/lib/super_node.cpp +++ b/lib/super_node.cpp @@ -62,6 +62,10 @@ SuperNode::SuperNode() : vlist_init(&interfaces); vlist_init(&plugins); +#ifdef WITH_NETEM + nl_init(); /* Fill link cache */ +#endif /* WITH_NETEM */ + char hname[128]; gethostname(hname, 128);