diff --git a/include/file.h b/include/file.h index 88f69e8d6..2d65a2f84 100644 --- a/include/file.h +++ b/include/file.h @@ -51,7 +51,6 @@ struct file { struct timespec read_epoch; /**< The epoch timestamp from the configuration. */ struct timespec read_offset; /**< An offset between the timestamp in the input file and the current time */ - int read_sequence; /**< Sequence number of last message which has been written to file::path_out */ int read_timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ double read_rate; /**< The read rate. */ }; @@ -69,9 +68,9 @@ int file_open(struct node *n); int file_close(struct node *n); /** @see node_vtable::read */ -int file_read(struct node *n, struct pool *pool, int cnt); +int file_read(struct node *n, struct sample *smps[], unsigned cnt); /** @see node_vtable::write */ -int file_write(struct node *n, struct pool *pool, int cnt); +int file_write(struct node *n, struct sample *smps[], unsigned cnt); #endif /** _FILE_H_ @} */ diff --git a/include/socket.h b/include/socket.h index 192d9eb83..166f81e5c 100644 --- a/include/socket.h +++ b/include/socket.h @@ -72,10 +72,10 @@ int socket_open(struct node *n); int socket_close(struct node *n); /** @see node_vtable::write */ -int socket_write(struct node *n, struct pool *pool, int cnt); +int socket_write(struct node *n, struct sample *smps[], unsigned cnt); /** @see node_vtable::read */ -int socket_read(struct node *n, struct pool *pool, int cnt); +int socket_read(struct node *n, struct sample *smps[], unsigned cnt); /** @see node_vtable::parse */ int socket_parse(struct node *n, config_setting_t *cfg); diff --git a/lib/file.c b/lib/file.c index cd6e603a5..cf3c2e55a 100644 --- a/lib/file.c +++ b/lib/file.c @@ -13,13 +13,16 @@ #include "file.h" #include "utils.h" #include "timing.h" -#include "pool.h" +#include "queue.h" int file_reverse(struct node *n) { struct file *f = n->_vd; - - SWAP(f->read, f->write); + struct file_direction tmp; + + tmp = f->read; + f->read = f->write; + f->write = tmp; return 0; } @@ -79,8 +82,10 @@ int file_parse(struct node *n, config_setting_t *cfg) cerror(cfg_out, "Failed to parse output file for node %s", node_name(n)); /* More write specific settings */ - if (!config_setting_lookup_int(cfg_out, "split", &f->write.split)) - f->write.split = 0; /* Save all samples in a single file */ + if (config_setting_lookup_int(cfg_out, "split", &f->write.split)) + f->write.split <<= 20; /* in MiB */ + else + f->write.split = -1; /* Save all samples in a single file */ } cfg_in = config_setting_get_member(cfg, "in"); @@ -90,7 +95,7 @@ int file_parse(struct node *n, config_setting_t *cfg) /* More read specific settings */ if (!config_setting_lookup_bool(cfg_in, "splitted", &f->read.split)) - f->read.split = 0; /* Save all samples in a single file */ + f->read.split = 0; /* Input files are suffixed with split indizes (.000, .001) */ if (!config_setting_lookup_float(cfg_in, "rate", &f->read_rate)) f->read_rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */ @@ -202,12 +207,12 @@ int file_open(struct node *n) struct timespec now = time_now(); /* Get timestamp of first line */ - struct msg m; - int ret = msg_fscan(f->read.handle, &m, NULL, NULL); rewind(f->read.handle); + struct sample s; + int ret = sample_fscan(f->read.handle, &s, NULL); rewind(f->read.handle); if (ret < 0) error("Failed to read first timestamp of node %s", node_name(n)); - f->read_first = MSG_TS(&m); + f->read_first = s.ts.origin; /* Set read_offset depending on epoch_mode */ switch (f->read_epoch_mode) { @@ -262,99 +267,80 @@ int file_close(struct node *n) return 0; } -int file_read(struct node *n, struct pool *pool, int cnt) +int file_read(struct node *n, struct sample *smps[], unsigned cnt) { struct file *f = n->_vd; - int values, flags, i = 0; + struct sample *s = smps[0]; + int values, flags; - if (f->read.handle) { - for (i = 0; i < cnt; i++) { - struct msg *cur = pool_getrel(pool, i); - - /* Get message and timestamp */ -retry: values = msg_fscan(f->read.handle, cur, &flags, NULL); - if (values < 0) { - if (feof(f->read.handle)) { - if (f->read.split) { - f->read.chunk++; - f->read.handle = file_reopen(&f->read); - if (!f->read.handle) - return 0; - - info("Open new input chunk of node %s: %d", node_name(n), f->read.chunk); - } - else { - info("Rewind input file of node %s", node_name(n)); - rewind(f->read.handle); - goto retry; - } - } - else - warn("Failed to read messages from node %s: reason=%d", node_name(n), values); + assert(f->read.handle); + assert(cnt == 1); - return 0; +retry: values = sample_fscan(f->read.handle, s, &flags); /* Get message and timestamp */ + if (values < 0) { + if (feof(f->read.handle)) { + if (f->read.split) { + f->read.chunk++; + f->read.handle = file_reopen(&f->read); + if (!f->read.handle) + return 0; + + info("Open new input chunk of node %s: %d", node_name(n), f->read.chunk); } - - /* Fix missing sequence no */ - cur->sequence = f->read_sequence = (flags & MSG_PRINT_SEQUENCE) ? cur->sequence : f->read_sequence + 1; - - if (!f->read_rate || ftell(f->read.handle) == 0) { - struct timespec until = time_add(&MSG_TS(cur), &f->read_offset); - if (timerfd_wait_until(f->read_timer, &until) == 0) - serror("Failed to wait for timer"); - - /* Update timestamp */ - cur->ts.sec = until.tv_sec; - cur->ts.nsec = until.tv_nsec; - } - else { /* Wait with fixed rate delay */ - if (timerfd_wait(f->read_timer) == 0) - serror("Failed to wait for timer"); - - /* Update timestamp */ - struct timespec now = time_now(); - cur->ts.sec = now.tv_sec; - cur->ts.nsec = now.tv_nsec; + else { + info("Rewind input file of node %s", node_name(n)); + rewind(f->read.handle); + goto retry; } } - } - else - error("Can not read from node %s", node_name(n)); + else + warn("Failed to read messages from node %s: reason=%d", node_name(n), values); - return i; + return 0; + } + + if (!f->read_rate || ftell(f->read.handle) == 0) { + s->ts.origin = time_add(&s->ts.origin, &f->read_offset); + if (timerfd_wait_until(f->read_timer, &s->ts.origin) == 0) + serror("Failed to wait for timer"); + } + else { /* Wait with fixed rate delay */ + if (timerfd_wait(f->read_timer) == 0) + serror("Failed to wait for timer"); + + /* Update timestamp */ + s->ts.origin = time_now(); + } + + return 1; } -int file_write(struct node *n, struct pool *pool, int cnt) +int file_write(struct node *n, struct sample *smps[], unsigned cnt) { - int i = 0; struct file *f = n->_vd; + struct sample *s = smps[0]; - if (f->write.handle) { - for (i = 0; i < cnt; i++) { - struct msg *m = pool_getrel(pool, i); + assert(f->write.handle); + assert(cnt == 1); - /* Split file if requested */ - if ((f->write.split > 0) && ftell(f->write.handle) > f->write.split * (1 << 20)) { - f->write.chunk++; - f->write.handle = file_reopen(&f->write); + /* Split file if requested */ + if (f->write.split > 0 && ftell(f->write.handle) > f->write.split) { + f->write.chunk++; + f->write.handle = file_reopen(&f->write); - info("Splitted output node %s: chunk=%u", node_name(n), f->write.chunk); - } - - msg_fprint(f->write.handle, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0); - } - fflush(f->write.handle); + info("Splitted output node %s: chunk=%u", node_name(n), f->write.chunk); } - else - error("Can not write to node %s", node_name(n)); + + sample_fprint(f->write.handle, s, SAMPLE_ALL & ~SAMPLE_OFFSET); + fflush(f->write.handle); - return i; + return 1; } static struct node_type vt = { .name = "file", .description = "support for file log / replay node type", - .vectorize = 0, /* unlimited */ + .vectorize = 1, .size = sizeof(struct file), .reverse = file_reverse, .parse = file_parse, diff --git a/lib/socket.c b/lib/socket.c index a3e6e646f..28c5bdc0d 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -1,8 +1,4 @@ /** Various socket related functions - * - * Parse and print addresses, connect, close, etc... - * - * S2SS uses these functions to setup the network emulation feature. * * @author Steffen Vogel * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC @@ -12,7 +8,6 @@ #include #include -#include #include #include @@ -24,13 +19,16 @@ #include #include +#include "socket.h" +#include "config.h" +#include "utils.h" + #include "if.h" #include "nl.h" #include "tc.h" -#include "config.h" -#include "utils.h" -#include "socket.h" -#include "pool.h" +#include "msg.h" +#include "sample.h" +#include "queue.h" /* Forward declartions */ static struct node_type vt; @@ -139,11 +137,14 @@ int socket_open(struct node *n) if (ret < 0) serror("Failed to bind socket"); - /* Set fwmark for outgoing packets */ - if (setsockopt(s->sd, SOL_SOCKET, SO_MARK, &s->mark, sizeof(s->mark))) - serror("Failed to set FW mark for outgoing packets"); - else - debug(4, "Set FW mark for socket (sd=%u) to %u", s->sd, s->mark); + /* Set fwmark for outgoing packets if netem is enabled for this node */ + if (s->mark) { + ret = setsockopt(s->sd, SOL_SOCKET, SO_MARK, &s->mark, sizeof(s->mark)); + if (ret) + serror("Failed to set FW mark for outgoing packets"); + else + debug(DBG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", s->sd, s->mark); + } /* Set socket priority, QoS or TOS IP options */ int prio; @@ -154,7 +155,7 @@ int socket_open(struct node *n) if (setsockopt(s->sd, IPPROTO_IP, IP_TOS, &prio, sizeof(prio))) serror("Failed to set type of service (QoS)"); else - debug(4, "Set QoS/TOS IP option for node %s to %#x", node_name(n), prio); + debug(DBG_SOCKET | 4, "Set QoS/TOS IP option for node %s to %#x", node_name(n), prio); break; default: @@ -162,7 +163,7 @@ int socket_open(struct node *n) if (setsockopt(s->sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio))) serror("Failed to set socket priority"); else - debug(4, "Set socket priority for node %s to %d", node_name(n), prio); + debug(DBG_SOCKET | 4, "Set socket priority for node %s to %d", node_name(n), prio); break; } @@ -172,8 +173,11 @@ int socket_open(struct node *n) int socket_reverse(struct node *n) { struct socket *s = n->_vd; + union sockaddr_union tmp; - SWAP(s->remote, s->local); + tmp = s->local; + s->local = s->remote; + s->remote = tmp; return 0; } @@ -198,33 +202,54 @@ int socket_destroy(struct node *n) return 0; } -int socket_read(struct node *n, struct pool *pool, int cnt) +int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; + + int samples, ret, received; + ssize_t bytes; - int bytes; - struct iovec iov[cnt]; + struct msg msgs[cnt]; + struct msg hdr; + + struct iovec iov[2*cnt]; struct msghdr mhdr = { - .msg_iov = iov, - .msg_iovlen = ARRAY_LEN(iov) + .msg_iov = iov }; + + /* Peak into message header of the first sample and to get total packet size. */ + bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); + if (bytes < sizeof(struct msg) || bytes % 4 != 0) { + warn("Packet size is invalid"); + return -1; + } - /* Wait until next packet received */ - poll(&(struct pollfd) { .fd = s->sd, .events = POLLIN }, 1, -1); + ret = msg_verify(&hdr); + if (ret) { + warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); + return -1; + } + + /* Convert message to host endianess */ + if (hdr.endian != MSG_ENDIAN_HOST) + msg_swap(&hdr); + + samples = bytes / MSG_LEN(hdr.values); + + if (samples > cnt) { + warn("Received more samples than supported. Dropping %u samples", samples - cnt); + samples = cnt; + } - /* Get size of received packet in bytes */ - ioctl(s->sd, FIONREAD, &bytes); - - /* Check if packet length is correct */ - if (bytes % (cnt * 4) != 0) - error("Packet length not dividable by 4: received=%u, cnt=%u", bytes, cnt); - if (bytes / cnt > pool->stride) - error("Packet length is too large: received=%u, cnt=%u, max=%zu", bytes, cnt, pool->stride); - - for (int i = 0; i < cnt; i++) { - /* All messages of a packet must have equal length! */ - iov[i].iov_base = pool_getrel(pool, i); - iov[i].iov_len = bytes / cnt; + /* We expect that all received samples have the same amount of values! */ + for (int i = 0; i < samples; i++) { + iov[2*i+0].iov_base = &msgs[i]; + iov[2*i+0].iov_len = MSG_LEN(0); + + iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values); + + mhdr.msg_iovlen += 2; } /* Receive message from socket */ @@ -232,53 +257,59 @@ int socket_read(struct node *n, struct pool *pool, int cnt) if (bytes == 0) error("Remote node %s closed the connection", node_name(n)); else if (bytes < 0) - serror("Failed recv"); + serror("Failed recv from node %s", node_name(n)); - debug(17, "Received packet of %u bytes: %u samples a %u values per sample", bytes, cnt, (bytes / cnt) / 4 - 4); - - for (int i = 0; i < cnt; i++) { - struct msg *m = pool_getrel(pool, i); + for (received = 0; received < samples; received++) { + struct msg *m = &msgs[received]; + struct sample *s = smps[received]; + + ret = msg_verify(m); + if (ret) + break; + + if (m->values != hdr.values) + break; /* Convert message to host endianess */ if (m->endian != MSG_ENDIAN_HOST) msg_swap(m); - /* Check integrity of packet */ - if (bytes / cnt != MSG_LEN(m->values)) - error("Invalid message len: %u for node %s", MSG_LEN(m->values), node_name(n)); - - bytes -= MSG_LEN(m->values); + s->length = m->values; + s->sequence = m->sequence; + s->ts.origin = MSG_TS(m); } + + debug(DBG_SOCKET | 17, "Received message of %zd bytes: %u samples", bytes, received); - /* Check packet integrity */ - if (bytes != 0) - error("Packet length does not match message header length! %u bytes left over.", bytes); - - return cnt; + return received; } -int socket_write(struct node *n, struct pool *pool, int cnt) +int socket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; - int bytes, sent = 0; - - /** @todo we should check the MTU */ - - struct iovec iov[cnt]; - for (int i = 0; i < cnt; i++) { - struct msg *m = pool_getrel(pool, i); - - iov[sent].iov_base = m; - iov[sent].iov_len = MSG_LEN(m->values); - - sent++; - } + ssize_t bytes; + struct msg msgs[cnt]; + struct iovec iov[2*cnt]; struct msghdr mhdr = { .msg_iov = iov, - .msg_iovlen = sent + .msg_iovlen = ARRAY_LEN(iov) }; + /* Construct iovecs */ + for (int i = 0; i < cnt; i++) { + msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); + + msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; + msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; + + iov[i*2+0].iov_base = &msgs[i]; + iov[i*2+0].iov_len = MSG_LEN(0); + + iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); + } + /* Specify destination address for connection-less procotols */ switch (s->layer) { case LAYER_UDP: @@ -289,13 +320,14 @@ int socket_write(struct node *n, struct pool *pool, int cnt) break; } + /* Send message */ bytes = sendmsg(s->sd, &mhdr, 0); if (bytes < 0) - serror("Failed send"); + serror("Failed send to node %s", node_name(n)); - debug(17, "Sent packet of %u bytes: %u samples a %u values per sample", bytes, cnt, (bytes / cnt) / 4 - 4); + debug(DBG_SOCKET | 17, "Sent packet of %zd bytes with %u samples", bytes, cnt); - return sent; + return cnt; } int socket_parse(struct node *n, config_setting_t *cfg) @@ -482,7 +514,7 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye static struct node_type vt = { .name = "socket", - .description = "Network socket (libnl3)", + .description = "BSD network sockets", .vectorize = 0, /* unlimited */ .size = sizeof(struct socket), .destroy = socket_destroy,