1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

netem: move intialization of Netem support into node instead of socket class

This commit is contained in:
Steffen Vogel 2019-01-21 15:50:18 +01:00
parent 90335c8c9e
commit 8190227776
10 changed files with 272 additions and 225 deletions

View file

@ -57,7 +57,7 @@ struct interface {
char irqs[IF_IRQ_MAX]; /**< List of IRQs of the NIC. */
int affinity; /**< IRQ / Core Affinity of this interface. */
struct vlist sockets; /**< Linked list of associated sockets. */
struct vlist nodes; /**< Linked list of nodes which use this interface. */
};
/** Add a new interface to the global list and lookup name, irqs...
@ -99,6 +99,11 @@ int if_start(struct interface *i);
*/
int if_stop(struct interface *i);
/** Find existing or create new interface instance on which packets for a certain destination
* will leave the system.
*/
struct interface * if_get_egress(struct sockaddr *sa, struct vlist *interfaces);
/** Lookup routing tables to get the interface on which packets for a certain destination
* will leave the system.
*
@ -107,7 +112,7 @@ int if_stop(struct interface *i);
* @retval 0 Success. Everything went well.
* @retval <0 Error. Something went wrong.
*/
int if_get_egress(struct sockaddr *sa, struct rtnl_link **link);
struct rtnl_link * if_get_egress_link(struct sockaddr *sa);
/** Get all IRQs for this interface.
*

View file

@ -42,6 +42,12 @@
extern "C" {
#endif
/* Forward declarations */
#ifdef __linux__
struct rtnl_qdisc;
struct rtnl_cls;
#endif /* __linux__ */
struct node_direction {
int enabled;
int builtin; /**< This node should use built-in hooks by default. */
@ -75,6 +81,13 @@ struct node
enum state state;
#ifdef __linux__
int mark; /**< Socket mark for netem, routing and filtering */
struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */
struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */
#endif /* __linux__ */
struct node_type *_vt; /**< Virtual functions (C++ OOP style) */
void *_vd; /**< Virtual data (used by struct node::_vt functions) */
@ -179,6 +192,8 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
int node_poll_fds(struct node *n, int fds[]);
int node_netem_fds(struct node *n, int fds[]);
struct node_type * node_type(struct node *n);
struct memory_type * node_memory_type(struct node *n, struct memory_type *parent);

View file

@ -215,6 +215,13 @@ struct node_type {
*/
int (*poll_fds)(struct node *n, int fds[]);
/** Get list of socket file descriptors for configuring network emulation.
*
* This callback is optional.
* @return The number of file descriptors which have been put into \p sds.
*/
int (*netem_fds)(struct node *n, int sds[]);
/** Return a memory allocator which should be used for sample pools passed to this node. */
struct memory_type * (*memory_type)(struct node *n, struct memory_type *parent);
};

View file

@ -51,15 +51,15 @@ struct format_type;
struct rtp {
struct rtp_sock *rs; /**< RTP socket */
struct sa local_rtp; /**< Local address of the RTP socket */
struct sa local_rtcp; /**< Local address of the RTCP socket */
struct sa remote_rtp; /**< Remote address of the RTP socket */
struct sa remote_rtcp; /**< Remote address of the RTCP socket */
struct {
struct sa saddr_rtp; /**< Local/Remote address of the RTP socket */
struct sa saddr_rtcp; /**< Local/Remote address of the RTCP socket */
} in, out;
struct format_type *format;
struct io io;
double rate; /**< Sample rate of source */
double rate; /**< Sample rate of source */
struct {
int enabled;

View file

@ -48,7 +48,6 @@
#include <linux/if_packet.h>
#endif /* LIBNL3_ROUTE_FOUND */
#ifdef __cplusplus
extern "C" {
#endif
@ -79,14 +78,10 @@ union sockaddr_union {
struct socket {
int sd; /**< The socket descriptor */
int mark; /**< Socket mark for netem, routing and filtering */
int verify_source; /**< Verify the source address of incoming packets against socket::remote. */
enum socket_layer layer; /**< The OSI / IP layer which should be used for this socket */
union sockaddr_union local; /**< Local address of the socket */
union sockaddr_union remote; /**< Remote address of the socket */
struct format_type *format;
struct io io;
@ -98,14 +93,10 @@ struct socket {
struct ip_mreq mreq; /**< A multicast group to join. */
} multicast;
#ifdef WITH_NETEM
struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */
struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */
#endif /* WITH_NETEM */
struct {
char *buf; /**< Buffer for receiving messages */
size_t buflen;
union sockaddr_union saddr; /**< Remote address of the socket */
} in, out;
};

View file

@ -50,7 +50,7 @@ int if_init(struct interface *i, struct rtnl_link *link)
else
warning("Did not found any interrupts for interface '%s'", if_name(i));
vlist_init(&i->sockets);
vlist_init(&i->nodes);
return 0;
}
@ -58,7 +58,7 @@ int if_init(struct interface *i, struct rtnl_link *link)
int if_destroy(struct interface *i)
{
/* List members are freed by the nodes they belong to. */
vlist_destroy(&i->sockets, NULL, false);
vlist_destroy(&i->nodes, NULL, false);
rtnl_qdisc_put(i->tc_qdisc);
@ -69,51 +69,49 @@ int if_destroy(struct interface *i)
int if_start(struct interface *i)
{
info("Starting interface '%s' which is used by %zu sockets", rtnl_link_get_name(i->nl_link), vlist_length(&i->sockets));
info("Starting interface '%s' which is used by %zu nodes", if_name(i), vlist_length(&i->nodes));
{
/* Set affinity for network interfaces (skip _loopback_ dev) */
//if_set_affinity(i, i->affinity);
/* Set affinity for network interfaces (skip _loopback_ dev) */
//if_set_affinity(i, i->affinity);
/* Assign fwmark's to socket nodes which have netem options */
int ret, mark = 0;
for (size_t j = 0; j < vlist_length(&i->sockets); j++) {
struct socket *s = (struct socket *) vlist_at(&i->sockets, j);
/* Assign fwmark's to nodes which have netem options */
int ret, mark = 0;
for (size_t j = 0; j < vlist_length(&i->nodes); j++) {
struct node *n = (struct node *) vlist_at(&i->nodes, j);
if (s->tc_qdisc)
s->mark = 1 + mark++;
}
if (n->tc_qdisc)
n->mark = 1 + mark++;
}
/* Abort if no node is using netem */
if (mark == 0)
return 0;
/* Abort if no node is using netem */
if (mark == 0)
return 0;
if (getuid() != 0)
error("Network emulation requires super-user privileges!");
if (getuid() != 0)
error("Network emulation requires super-user privileges!");
/* Replace root qdisc */
ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, mark);
if (ret)
error("Failed to setup priority queuing discipline: %s", nl_geterror(ret));
/* Replace root qdisc */
ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, mark);
if (ret)
error("Failed to setup priority queuing discipline: %s", nl_geterror(ret));
/* Create netem qdisks and appropriate filter per netem node */
for (size_t j = 0; j < vlist_length(&i->sockets); j++) {
struct socket *s = (struct socket *) vlist_at(&i->sockets, j);
/* Create netem qdisks and appropriate filter per netem node */
for (size_t j = 0; j < vlist_length(&i->nodes); j++) {
struct node *n = (struct node *) vlist_at(&i->nodes, j);
if (s->tc_qdisc) {
ret = tc_mark(i, &s->tc_classifier, TC_HANDLE(1, s->mark), s->mark);
if (ret)
error("Failed to setup FW mark classifier: %s", nl_geterror(ret));
if (n->tc_qdisc) {
ret = tc_mark(i, &n->tc_classifier, TC_HANDLE(1, n->mark), n->mark);
if (ret)
error("Failed to setup FW mark classifier: %s", nl_geterror(ret));
char *buf = tc_netem_print(s->tc_qdisc);
debug(LOG_IF | 5, "Starting network emulation on interface '%s' for FW mark %u: %s",
rtnl_link_get_name(i->nl_link), s->mark, buf);
free(buf);
char *buf = tc_netem_print(n->tc_qdisc);
debug(LOG_IF | 5, "Starting network emulation on interface '%s' for FW mark %u: %s",
if_name(i), n->mark, buf);
free(buf);
ret = tc_netem(i, &s->tc_qdisc, TC_HANDLE(0x1000+s->mark, 0), TC_HANDLE(1, s->mark));
if (ret)
error("Failed to setup netem qdisc: %s", nl_geterror(ret));
}
ret = tc_netem(i, &n->tc_qdisc, TC_HANDLE(0x1000+n->mark, 0), TC_HANDLE(1, n->mark));
if (ret)
error("Failed to setup netem qdisc: %s", nl_geterror(ret));
}
}
@ -132,12 +130,46 @@ int if_stop(struct interface *i)
return 0;
}
int if_get_egress(struct sockaddr *sa, struct rtnl_link **link)
const char * if_name(struct interface *i)
{
return rtnl_link_get_name(i->nl_link);
}
struct interface * if_get_egress(struct sockaddr *sa, struct vlist *interfaces)
{
int ret;
struct rtnl_link *link;
/* Determine outgoing interface */
link = if_get_egress_link(sa);
if (!link) {
char *buf = socket_print_addr(sa);
error("Failed to get interface for socket address '%s'", buf);
free(buf);
return NULL;
}
/* Search of existing interface with correct ifindex */
struct interface *i;
for (size_t k = 0; k < vlist_length(interfaces); k++) {
i = (struct interface *) vlist_at(interfaces, k);
if (rtnl_link_get_ifindex(i->nl_link) == rtnl_link_get_ifindex(link))
return i;
}
/* If not found, create a new interface */
i = alloc(sizeof(struct interface));
ret = if_init(i, link);
if (ret)
NULL;
return i;
}
struct rtnl_link * if_get_egress_link(struct sockaddr *sa)
{
int ifindex = -1;
@ -166,11 +198,8 @@ const char * if_name(struct interface *i)
}
struct nl_cache *cache = nl_cache_mngt_require("route/link");
*link = rtnl_link_get(cache, ifindex);
if (!*link)
return -1;
return 0;
return rtnl_link_get(cache, ifindex);
}
int if_get_irqs(struct interface *i)

View file

@ -175,6 +175,11 @@ int node_init(struct node *n, struct node_type *vt)
n->_name = NULL;
n->_name_long = NULL;
#ifdef WITH_NETEM
s->tc_qdisc = NULL;
s->tc_classifier = NULL;
#endif /* WITH_NETEM */
n->signals.state = STATE_DESTROYED;
vlist_init(&n->signals);
@ -222,15 +227,18 @@ int node_parse(struct node *n, json_t *json, const char *name)
json_error_t err;
json_t *json_signals = NULL;
json_t *json_netem = NULL;
const char *type;
n->name = strdup(name);
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o } }",
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o }, s?: { s?: o } }",
"type", &type,
"in",
"signals", &json_signals
"signals", &json_signals,
"out",
"netem", &json_netem
);
if (ret)
jerror(&err, "Failed to parse node %s", node_name(n));
@ -238,6 +246,21 @@ int node_parse(struct node *n, json_t *json, const char *name)
nt = node_type_lookup(type);
assert(nt == node_type(n));
if (json_netem) {
#ifdef WITH_NETEM
int enabled = 1;
ret = json_unpack_ex(json_netem, &err, 0, "{ s?: b }", "enabled", &enabled);
if (ret)
jerror(&err, "Failed to parse setting 'netem' of node %s", node_name(n));
if (enabled)
tc_netem_parse(&s->tc_qdisc, json_netem);
else
s->tc_qdisc = NULL;
#endif /* WITH_NETEM */
}
if (nt->flags & NODE_TYPE_PROVIDES_SIGNALS) {
if (json_signals)
error("Node %s does not support signal definitions", node_name(n));
@ -354,6 +377,24 @@ int node_start(struct node *n)
if (ret)
return ret;
#ifdef __linux__
/* Set fwmark for outgoing packets if netem is enabled for this node */
if (n->mark) {
int fds[16];
int num_sds = node_netem_fds(n, fds);
for (int i = 0; i < num_sds; i++) {
int fd = fds[i];
ret = setsockopt(fd, SOL_SOCKET, SO_MARK, &n->mark, sizeof(n->mark));
if (ret)
serror("Failed to set FW mark for outgoing packets");
else
debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", fd, n->mark);
}
}
#endif /* __linux__ */
n->state = STATE_STARTED;
n->sequence = 0;
@ -482,6 +523,11 @@ int node_destroy(struct node *n)
if (n->name)
free(n->name);
#ifdef WITH_NETEM
rtnl_qdisc_put(n->tc_qdisc);
rtnl_cls_put(n->tc_classifier);
#endif /* WITH_NETEM */
n->state = STATE_DESTROYED;
return 0;
@ -613,8 +659,10 @@ int node_poll_fds(struct node *n, int fds[])
{
return node_type(n)->poll_fds ? node_type(n)->poll_fds(n, fds) : -1;
}
int node_netem_fds(struct node *n, int fds[])
{
return node_type(n)->fd ? node_type(n)->fd(n) : -1;
return node_type(n)->netem_fds ? node_type(n)->netem_fds(n, fds) : -1;
}
struct node_type * node_type(struct node *n)

View file

@ -32,6 +32,7 @@
#include <re/re_mem.h>
#include <re/re_rtp.h>
#include <re/re_sys.h>
#include <re/re_udp.h>
#undef ALIGN_MASK
#include <villas/plugin.h>
@ -42,9 +43,13 @@
#include <villas/utils.h>
#include <villas/hook.h>
#include <villas/format_type.h>
#include <villas/super_node.h>
static pthread_t re_pthread;
/* Forward declartions */
static struct plugin p;
static int rtp_set_rate(struct node *n, double rate)
{
struct rtp *r = (struct rtp *) n->_vd;
@ -92,13 +97,13 @@ int rtp_reverse(struct node *n)
struct rtp *r = (struct rtp *) n->_vd;
struct sa tmp;
tmp = r->local_rtp;
r->local_rtp = r->remote_rtp;
r->remote_rtp = tmp;
tmp = r->in.saddr_rtp;
r->in.saddr_rtp = r->out.saddr_rtp;
r->out.saddr_rtp = tmp;
tmp = r->local_rtcp;
r->local_rtcp = r->remote_rtcp;
r->remote_rtcp = tmp;
tmp = r->in.saddr_rtcp;
r->in.saddr_rtcp = r->out.saddr_rtcp;
r->out.saddr_rtcp = tmp;
return 0;
}
@ -177,30 +182,30 @@ int rtp_parse(struct node *n, json_t *cfg)
error("Invalid format '%s' for node %s", format, node_name(n));
/* Remote address */
ret = sa_decode(&r->remote_rtp, remote, strlen(remote));
ret = sa_decode(&r->out.saddr_rtp, remote, strlen(remote));
if (ret) {
error("Failed to resolve remote address '%s' of node %s: %s",
remote, node_name(n), strerror(ret));
}
/* Assign even port number to RTP socket, next odd number to RTCP socket */
port = sa_port(&r->remote_rtp) & ~1;
sa_set_sa(&r->remote_rtcp, &r->remote_rtp.u.sa);
sa_set_port(&r->remote_rtp, port);
sa_set_port(&r->remote_rtcp, port+1);
port = sa_port(&r->out.saddr_rtp) & ~1;
sa_set_sa(&r->out.saddr_rtcp, &r->out.saddr_rtp.u.sa);
sa_set_port(&r->out.saddr_rtp, port);
sa_set_port(&r->out.saddr_rtcp, port+1);
/* Local address */
ret = sa_decode(&r->local_rtp, local, strlen(local));
ret = sa_decode(&r->in.saddr_rtp, local, strlen(local));
if (ret) {
error("Failed to resolve local address '%s' of node %s: %s",
local, node_name(n), strerror(ret));
}
/* Assign even port number to RTP socket, next odd number to RTCP socket */
port = sa_port(&r->local_rtp) & ~1;
sa_set_sa(&r->local_rtcp, &r->local_rtp.u.sa);
sa_set_port(&r->local_rtp, port);
sa_set_port(&r->local_rtcp, port+1);
port = sa_port(&r->in.saddr_rtp) & ~1;
sa_set_sa(&r->in.saddr_rtcp, &r->in.saddr_rtp.u.sa);
sa_set_port(&r->in.saddr_rtp, port);
sa_set_port(&r->in.saddr_rtcp, port+1);
/** @todo parse * in addresses */
@ -212,8 +217,8 @@ char * rtp_print(struct node *n)
struct rtp *r = (struct rtp *) n->_vd;
char *buf;
char *local = socket_print_addr((struct sockaddr *) &r->local_rtp.u);
char *remote = socket_print_addr((struct sockaddr *) &r->remote_rtp.u);
char *local = socket_print_addr((struct sockaddr *) &r->in.saddr_rtp.u);
char *remote = socket_print_addr((struct sockaddr *) &r->out.saddr_rtp.u);
buf = strf("format=%s, in.address=%s, out.address=%s, rtcp.enabled=%s",
format_type_name(r->format),
@ -319,11 +324,11 @@ int rtp_start(struct node *n)
vlist_push(&n->out.hooks, r->rtcp.throttle_hook);
/* Initialize RTP socket */
uint16_t port = sa_port(&r->local_rtp) & ~1;
ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local_rtp, port, port+1, r->rtcp.enabled, rtp_handler, rtcp_handler, n);
uint16_t port = sa_port(&r->in.saddr_rtp) & ~1;
ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->in.saddr_rtp, port, port+1, r->rtcp.enabled, rtp_handler, rtcp_handler, n);
/* Start RTCP session */
rtcp_start(r->rs, node_name(n), &r->remote_rtcp);
rtcp_start(r->rs, node_name(n), &r->out.saddr_rtcp);
return ret;
}
@ -357,7 +362,7 @@ static void stop_handler(int sig, siginfo_t *si, void *ctx)
re_cancel();
}
int rtp_type_start()
int rtp_type_start(struct super_node *sn)
{
int ret;
@ -383,6 +388,22 @@ int rtp_type_start()
if (ret)
return ret;
#ifdef WITH_NETEM
struct vlist *interfaces = super_node_get_interfaces(sn);
/* Gather list of used network interfaces */
for (size_t i = 0; i < vlist_length(&p.node.instances); i++) {
struct node *n = (struct node *) vlist_at(&p.node.instances, i);
struct rtp *r = (struct rtp *) n->_vd;
struct interface *i = if_get_egress(&r->out.saddr_rtp.u.sa, interfaces);
if (!i)
error("Failed to find egress interface for node: %s", node_name(n));
vlist_push(&i->nodes, n);
}
#endif /* WITH_NETEM */
return ret;
}
@ -493,7 +514,7 @@ retry: cnt = io_sprint(&r->io, buf, buflen, &wbytes, smps, cnt);
mbuf_set_pos(mb, 12);
/* Send dataset */
ret = rtp_send(r->rs, &r->remote_rtp, false, false, 21, (uint32_t) time(NULL), mb);
ret = rtp_send(r->rs, &r->out.saddr_rtp, false, false, 21, (uint32_t) time(NULL), mb);
if (ret) {
warning("Error from rtp_send, reason: %d", ret);
cnt = ret;
@ -505,9 +526,38 @@ out1: free(buf);
return cnt;
}
int rtp_poll_fds(struct node *n, int fds[])
{
struct rtp *r = (struct rtp *) n->_vd;
fds[0] = queue_signalled_fd(&r->recv_queue);
return 1;
}
int rtp_netem_fds(struct node *n, int fds[])
{
struct rtp *r = (struct rtp *) n->_vd;
int m = 0;
struct udp_sock *rtp = (struct udp_sock *) rtp_sock(r->rs);
struct udp_sock *rtcp = (struct udp_sock *) rtcp_sock(r->rs);
fds[m++] = udp_sock_fd(rtp, AF_INET);
if (r->rtcp.enabled)
fds[m++] = udp_sock_fd(rtcp, AF_INET);
return m;
}
static struct plugin p = {
.name = "rtp",
#ifdef WITH_NETEM
.description = "real-time transport protocol (libre, libnl3 netem support)",
#else
.description = "real-time transport protocol (libre)",
#endif
.type = PLUGIN_TYPE_NODE,
.node = {
.vectorize = 0,
@ -521,6 +571,8 @@ static struct plugin p = {
.stop = rtp_stop,
.read = rtp_read,
.write = rtp_write,
.poll_fds = rtp_poll_fds,
.netem_fds = rtp_netem_fds
}
};

View file

@ -36,92 +36,33 @@
#include <villas/queue.h>
#include <villas/plugin.h>
#include <villas/compat.h>
#include <villas/super_node.h>
#ifdef WITH_SOCKET_LAYER_ETH
#include <netinet/ether.h>
#endif /* WITH_SOCKET_LAYER_ETH */
#ifdef WITH_NETEM
#include <villas/kernel/if.h>
#include <villas/kernel/nl.h>
#include <villas/kernel/tc_netem.h>
#endif /* WITH_NETEM */
/* Forward declartions */
static struct plugin p;
/* Private static storage */
struct vlist interfaces = { .state = STATE_DESTROYED };
int socket_type_start(struct super_node *sn)
{
#ifdef WITH_NETEM
int ret;
nl_init(); /* Fill link cache */
vlist_init(&interfaces);
struct vlist *interfaces = super_node_get_interfaces(sn);
/* Gather list of used network interfaces */
for (size_t i = 0; i < vlist_length(&p.node.instances); i++) {
struct node *n = (struct node *) vlist_at(&p.node.instances, i);
struct socket *s = (struct socket *) n->_vd;
struct rtnl_link *link;
if (s->layer != SOCKET_LAYER_ETH &&
s->layer != SOCKET_LAYER_IP &&
s->layer != SOCKET_LAYER_UDP)
continue;
/* Determine outgoing interface */
ret = if_get_egress((struct sockaddr *) &s->remote, &link);
if (ret) {
char *buf = socket_print_addr((struct sockaddr *) &s->remote);
error("Failed to get interface for socket address '%s'", buf);
free(buf);
}
/* Search of existing interface with correct ifindex */
struct interface *i;
for (size_t k = 0; k < vlist_length(&interfaces); k++) {
i = (struct interface *) vlist_at(&interfaces, k);
if (rtnl_link_get_ifindex(i->nl_link) == rtnl_link_get_ifindex(link))
goto found;
}
/* If not found, create a new interface */
i = alloc(sizeof(struct interface));
ret = if_init(i, link);
if (ret)
if (s->layer == SOCKET_LAYER_UNIX)
continue;
vlist_push(&interfaces, i);
/* Determine outgoing interface */
struct interface *i = if_get_egress((struct sockaddr *) &s->out.saddr, interfaces);
found: vlist_push(&i->sockets, s);
vlist_push(&i->nodes, n);
}
for (size_t j = 0; j < vlist_length(&interfaces); j++) {
struct interface *i = (struct interface *) vlist_at(&interfaces, j);
if_start(i);
}
#endif /* WITH_NETEM */
return 0;
}
int socket_type_stop()
{
#ifdef WITH_NETEM
for (size_t j = 0; j < vlist_length(&interfaces); j++) {
struct interface *i = (struct interface *) vlist_at(&interfaces, j);
if_stop(i);
}
vlist_destroy(&interfaces, (dtor_cb_t) if_destroy, false);
#endif /* WITH_NETEM */
return 0;
@ -150,8 +91,8 @@ char * socket_print(struct node *n)
break;
}
char *local = socket_print_addr((struct sockaddr *) &s->local);
char *remote = socket_print_addr((struct sockaddr *) &s->remote);
char *local = socket_print_addr((struct sockaddr *) &s->in.saddr);
char *remote = socket_print_addr((struct sockaddr *) &s->out.saddr);
buf = strf("layer=%s, format=%s, in.address=%s, out.address=%s", layer, format_type_name(s->format), local, remote);
@ -181,26 +122,26 @@ int socket_check(struct node *n)
/* Some checks on the addresses */
if (s->layer != SOCKET_LAYER_UNIX) {
if (s->local.sa.sa_family != s->remote.sa.sa_family)
if (s->in.saddr.sa.sa_family != s->out.saddr.sa.sa_family)
error("Address families of local and remote must match!");
}
if (s->layer == SOCKET_LAYER_IP) {
if (ntohs(s->local.sin.sin_port) != ntohs(s->remote.sin.sin_port))
if (ntohs(s->in.saddr.sin.sin_port) != ntohs(s->out.saddr.sin.sin_port))
error("IP protocol numbers of local and remote must match!");
}
#ifdef WITH_SOCKET_LAYER_ETH
else if (s->layer == SOCKET_LAYER_ETH) {
if (ntohs(s->local.sll.sll_protocol) != ntohs(s->remote.sll.sll_protocol))
if (ntohs(s->in.saddr.sll.sll_protocol) != ntohs(s->out.saddr.sll.sll_protocol))
error("Ethertypes of local and remote must match!");
if (ntohs(s->local.sll.sll_protocol) <= 0x5DC)
if (ntohs(s->in.saddr.sll.sll_protocol) <= 0x5DC)
error("Ethertype must be large than %d or it is interpreted as an IEEE802.3 length field!", 0x5DC);
}
#endif /* WITH_SOCKET_LAYER_ETH */
if (s->multicast.enabled) {
if (s->local.sa.sa_family != AF_INET)
if (s->in.saddr.sa.sa_family != AF_INET)
error("Multicast is only supported by IPv4 for node %s", node_name(n));
uint32_t addr = ntohl(s->multicast.mreq.imr_multiaddr.s_addr);
@ -228,21 +169,21 @@ int socket_start(struct node *n)
/* Create socket */
switch (s->layer) {
case SOCKET_LAYER_UDP:
s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, IPPROTO_UDP);
s->sd = socket(s->in.saddr.sa.sa_family, SOCK_DGRAM, IPPROTO_UDP);
break;
case SOCKET_LAYER_IP:
s->sd = socket(s->local.sa.sa_family, SOCK_RAW, ntohs(s->local.sin.sin_port));
s->sd = socket(s->in.saddr.sa.sa_family, SOCK_RAW, ntohs(s->in.saddr.sin.sin_port));
break;
#ifdef WITH_SOCKET_LAYER_ETH
case SOCKET_LAYER_ETH:
s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, s->local.sll.sll_protocol);
s->sd = socket(s->in.saddr.sa.sa_family, SOCK_DGRAM, s->in.saddr.sll.sll_protocol);
break;
#endif /* WITH_SOCKET_LAYER_ETH */
case SOCKET_LAYER_UNIX:
s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, 0);
s->sd = socket(s->in.saddr.sa.sa_family, SOCK_DGRAM, 0);
break;
default:
@ -254,14 +195,14 @@ int socket_start(struct node *n)
/* Delete Unix domain socket if already existing */
if (s->layer == SOCKET_LAYER_UNIX) {
ret = unlink(s->local.sun.sun_path);
ret = unlink(s->in.saddr.sun.sun_path);
if (ret && errno != ENOENT)
return ret;
}
/* Bind socket for receiving */
socklen_t addrlen = 0;
switch(s->local.ss.ss_family) {
switch(s->in.saddr.ss.ss_family) {
case AF_INET:
addrlen = sizeof(struct sockaddr_in);
break;
@ -271,7 +212,7 @@ int socket_start(struct node *n)
break;
case AF_UNIX:
addrlen = SUN_LEN(&s->local.sun);
addrlen = SUN_LEN(&s->in.saddr.sun);
break;
#ifdef WITH_SOCKET_LAYER_ETH
@ -280,24 +221,13 @@ int socket_start(struct node *n)
break;
#endif /* WITH_SOCKET_LAYER_ETH */
default:
addrlen = sizeof(s->local);
addrlen = sizeof(s->in.saddr);
}
ret = bind(s->sd, (struct sockaddr *) &s->local, addrlen);
ret = bind(s->sd, (struct sockaddr *) &s->in.saddr, addrlen);
if (ret < 0)
serror("Failed to bind socket");
#ifdef __linux__
/* Set fwmark for outgoing packets if netem is enabled for this node */
if (s->mark) {
ret = setsockopt(s->sd, SOL_SOCKET, SO_MARK, &s->mark, sizeof(s->mark));
if (ret)
serror("Failed to set FW mark for outgoing packets");
else
debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", s->sd, s->mark);
}
#endif /* __linux__ */
if (s->multicast.enabled) {
ret = setsockopt(s->sd, IPPROTO_IP, IP_MULTICAST_LOOP, &s->multicast.loop, sizeof(s->multicast.loop));
if (ret)
@ -356,9 +286,9 @@ int socket_reverse(struct node *n)
struct socket *s = (struct socket *) n->_vd;
union sockaddr_union tmp;
tmp = s->local;
s->local = s->remote;
s->remote = tmp;
tmp = s->in.saddr;
s->in.saddr = s->out.saddr;
s->out.saddr = tmp;
return 0;
}
@ -390,18 +320,6 @@ int socket_stop(struct node *n)
return 0;
}
int socket_destroy(struct node *n)
{
#ifdef WITH_NETEM
struct socket *s = (struct socket *) n->_vd;
rtnl_qdisc_put(s->tc_qdisc);
rtnl_cls_put(s->tc_classifier);
#endif /* WITH_NETEM */
return 0;
}
int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int ret;
@ -436,16 +354,16 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
if (s->layer == SOCKET_LAYER_IP) {
switch (src.sa.sa_family) {
case AF_INET:
src.sin.sin_port = s->remote.sin.sin_port;
src.sin.sin_port = s->out.saddr.sin.sin_port;
break;
case AF_INET6:
src.sin6.sin6_port = s->remote.sin6.sin6_port;
src.sin6.sin6_port = s->out.saddr.sin6.sin6_port;
break;
}
}
if (s->verify_source && socket_compare_addr(&src.sa, &s->remote.sa) != 0) {
if (s->verify_source && socket_compare_addr(&src.sa, &s->out.saddr.sa) != 0) {
char *buf = socket_print_addr((struct sockaddr *) &src);
warning("Received packet from unauthorized source: %s", buf);
free(buf);
@ -487,7 +405,7 @@ retry: ret = io_sprint(&s->io, s->out.buf, s->out.buflen, &wbytes, smps, cnt);
/* Send message */
socklen_t addrlen = 0;
switch(s->local.ss.ss_family) {
switch(s->in.saddr.ss.ss_family) {
case AF_INET:
addrlen = sizeof(struct sockaddr_in);
break;
@ -497,7 +415,7 @@ retry: ret = io_sprint(&s->io, s->out.buf, s->out.buflen, &wbytes, smps, cnt);
break;
case AF_UNIX:
addrlen = SUN_LEN(&s->local.sun);
addrlen = SUN_LEN(&s->in.saddr.sun);
break;
#ifdef WITH_SOCKET_LAYER_ETH
@ -506,10 +424,10 @@ retry: ret = io_sprint(&s->io, s->out.buf, s->out.buflen, &wbytes, smps, cnt);
break;
#endif /* WITH_SOCKET_LAYER_ETH */
default:
addrlen = sizeof(s->local);
addrlen = sizeof(s->in.saddr);
}
retry2: bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *) &s->remote, addrlen);
retry2: bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *) &s->out.saddr, addrlen);
if (bytes < 0) {
if ((errno == EPERM) ||
(errno == ENOENT && s->layer == SOCKET_LAYER_UNIX))
@ -539,22 +457,17 @@ int socket_parse(struct node *n, json_t *cfg)
int ret;
json_t *json_multicast = NULL;
json_t *json_netem = NULL;
json_error_t err;
/* Default values */
s->layer = SOCKET_LAYER_UDP;
s->verify_source = 0;
#ifdef WITH_NETEM
s->tc_qdisc = NULL;
#endif /* WITH_NETEM */
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s: { s: s, s?: o }, s: { s: s, s?: b, s?: o } }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s: { s: s }, s: { s: s, s?: b, s?: o } }",
"layer", &layer,
"format", &format,
"out",
"address", &remote,
"netem", &json_netem,
"in",
"address", &local,
"verify_source", &s->verify_source,
@ -584,13 +497,13 @@ int socket_parse(struct node *n, json_t *cfg)
error("Invalid layer '%s' for node %s", layer, node_name(n));
}
ret = socket_parse_address(remote, (struct sockaddr *) &s->remote, s->layer, 0);
ret = socket_parse_address(remote, (struct sockaddr *) &s->out.saddr, s->layer, 0);
if (ret) {
error("Failed to resolve remote address '%s' of node %s: %s",
remote, node_name(n), gai_strerror(ret));
}
ret = socket_parse_address(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE);
ret = socket_parse_address(local, (struct sockaddr *) &s->in.saddr, s->layer, AI_PASSIVE);
if (ret) {
error("Failed to resolve local address '%s' of node %s: %s",
local, node_name(n), gai_strerror(ret));
@ -630,21 +543,6 @@ int socket_parse(struct node *n, json_t *cfg)
}
}
if (json_netem) {
#ifdef WITH_NETEM
int enabled = 1;
ret = json_unpack_ex(json_netem, &err, 0, "{ s?: b }", "enabled", &enabled);
if (ret)
jerror(&err, "Failed to parse setting 'netem' of node %s", node_name(n));
if (enabled)
tc_netem_parse(&s->tc_qdisc, json_netem);
else
s->tc_qdisc = NULL;
#endif /* WITH_NETEM */
}
return 0;
}
@ -865,8 +763,6 @@ static struct plugin p = {
.vectorize = 0,
.size = sizeof(struct socket),
.type.start = socket_type_start,
.type.stop = socket_type_stop,
.destroy = socket_destroy,
.reverse = socket_reverse,
.parse = socket_parse,
.print = socket_print,
@ -876,9 +772,9 @@ static struct plugin p = {
.read = socket_read,
.write = socket_write,
.poll_fds = socket_fds,
.netem_fds = socket_fds
}
};
REGISTER_PLUGIN(&p)
LIST_INIT_STATIC(&p.node.instances)

View file

@ -62,6 +62,10 @@ SuperNode::SuperNode() :
vlist_init(&interfaces);
vlist_init(&plugins);
#ifdef WITH_NETEM
nl_init(); /* Fill link cache */
#endif /* WITH_NETEM */
char hname[128];
gethostname(hname, 128);