1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

adapting file and socket node-types to new data structures

This commit is contained in:
Steffen Vogel 2016-06-08 22:39:43 +02:00
parent 104455e0d5
commit 36e27af702
4 changed files with 173 additions and 156 deletions

View file

@ -51,7 +51,6 @@ struct file {
struct timespec read_epoch; /**< The epoch timestamp from the configuration. */
struct timespec read_offset; /**< An offset between the timestamp in the input file and the current time */
int read_sequence; /**< Sequence number of last message which has been written to file::path_out */
int read_timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */
double read_rate; /**< The read rate. */
};
@ -69,9 +68,9 @@ int file_open(struct node *n);
int file_close(struct node *n);
/** @see node_vtable::read */
int file_read(struct node *n, struct pool *pool, int cnt);
int file_read(struct node *n, struct sample *smps[], unsigned cnt);
/** @see node_vtable::write */
int file_write(struct node *n, struct pool *pool, int cnt);
int file_write(struct node *n, struct sample *smps[], unsigned cnt);
#endif /** _FILE_H_ @} */

View file

@ -72,10 +72,10 @@ int socket_open(struct node *n);
int socket_close(struct node *n);
/** @see node_vtable::write */
int socket_write(struct node *n, struct pool *pool, int cnt);
int socket_write(struct node *n, struct sample *smps[], unsigned cnt);
/** @see node_vtable::read */
int socket_read(struct node *n, struct pool *pool, int cnt);
int socket_read(struct node *n, struct sample *smps[], unsigned cnt);
/** @see node_vtable::parse */
int socket_parse(struct node *n, config_setting_t *cfg);

View file

@ -13,13 +13,16 @@
#include "file.h"
#include "utils.h"
#include "timing.h"
#include "pool.h"
#include "queue.h"
int file_reverse(struct node *n)
{
struct file *f = n->_vd;
SWAP(f->read, f->write);
struct file_direction tmp;
tmp = f->read;
f->read = f->write;
f->write = tmp;
return 0;
}
@ -79,8 +82,10 @@ int file_parse(struct node *n, config_setting_t *cfg)
cerror(cfg_out, "Failed to parse output file for node %s", node_name(n));
/* More write specific settings */
if (!config_setting_lookup_int(cfg_out, "split", &f->write.split))
f->write.split = 0; /* Save all samples in a single file */
if (config_setting_lookup_int(cfg_out, "split", &f->write.split))
f->write.split <<= 20; /* in MiB */
else
f->write.split = -1; /* Save all samples in a single file */
}
cfg_in = config_setting_get_member(cfg, "in");
@ -90,7 +95,7 @@ int file_parse(struct node *n, config_setting_t *cfg)
/* More read specific settings */
if (!config_setting_lookup_bool(cfg_in, "splitted", &f->read.split))
f->read.split = 0; /* Save all samples in a single file */
f->read.split = 0; /* Input files are suffixed with split indizes (.000, .001) */
if (!config_setting_lookup_float(cfg_in, "rate", &f->read_rate))
f->read_rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */
@ -202,12 +207,12 @@ int file_open(struct node *n)
struct timespec now = time_now();
/* Get timestamp of first line */
struct msg m;
int ret = msg_fscan(f->read.handle, &m, NULL, NULL); rewind(f->read.handle);
struct sample s;
int ret = sample_fscan(f->read.handle, &s, NULL); rewind(f->read.handle);
if (ret < 0)
error("Failed to read first timestamp of node %s", node_name(n));
f->read_first = MSG_TS(&m);
f->read_first = s.ts.origin;
/* Set read_offset depending on epoch_mode */
switch (f->read_epoch_mode) {
@ -262,99 +267,80 @@ int file_close(struct node *n)
return 0;
}
int file_read(struct node *n, struct pool *pool, int cnt)
int file_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct file *f = n->_vd;
int values, flags, i = 0;
struct sample *s = smps[0];
int values, flags;
if (f->read.handle) {
for (i = 0; i < cnt; i++) {
struct msg *cur = pool_getrel(pool, i);
/* Get message and timestamp */
retry: values = msg_fscan(f->read.handle, cur, &flags, NULL);
if (values < 0) {
if (feof(f->read.handle)) {
if (f->read.split) {
f->read.chunk++;
f->read.handle = file_reopen(&f->read);
if (!f->read.handle)
return 0;
info("Open new input chunk of node %s: %d", node_name(n), f->read.chunk);
}
else {
info("Rewind input file of node %s", node_name(n));
rewind(f->read.handle);
goto retry;
}
}
else
warn("Failed to read messages from node %s: reason=%d", node_name(n), values);
assert(f->read.handle);
assert(cnt == 1);
return 0;
retry: values = sample_fscan(f->read.handle, s, &flags); /* Get message and timestamp */
if (values < 0) {
if (feof(f->read.handle)) {
if (f->read.split) {
f->read.chunk++;
f->read.handle = file_reopen(&f->read);
if (!f->read.handle)
return 0;
info("Open new input chunk of node %s: %d", node_name(n), f->read.chunk);
}
/* Fix missing sequence no */
cur->sequence = f->read_sequence = (flags & MSG_PRINT_SEQUENCE) ? cur->sequence : f->read_sequence + 1;
if (!f->read_rate || ftell(f->read.handle) == 0) {
struct timespec until = time_add(&MSG_TS(cur), &f->read_offset);
if (timerfd_wait_until(f->read_timer, &until) == 0)
serror("Failed to wait for timer");
/* Update timestamp */
cur->ts.sec = until.tv_sec;
cur->ts.nsec = until.tv_nsec;
}
else { /* Wait with fixed rate delay */
if (timerfd_wait(f->read_timer) == 0)
serror("Failed to wait for timer");
/* Update timestamp */
struct timespec now = time_now();
cur->ts.sec = now.tv_sec;
cur->ts.nsec = now.tv_nsec;
else {
info("Rewind input file of node %s", node_name(n));
rewind(f->read.handle);
goto retry;
}
}
}
else
error("Can not read from node %s", node_name(n));
else
warn("Failed to read messages from node %s: reason=%d", node_name(n), values);
return i;
return 0;
}
if (!f->read_rate || ftell(f->read.handle) == 0) {
s->ts.origin = time_add(&s->ts.origin, &f->read_offset);
if (timerfd_wait_until(f->read_timer, &s->ts.origin) == 0)
serror("Failed to wait for timer");
}
else { /* Wait with fixed rate delay */
if (timerfd_wait(f->read_timer) == 0)
serror("Failed to wait for timer");
/* Update timestamp */
s->ts.origin = time_now();
}
return 1;
}
int file_write(struct node *n, struct pool *pool, int cnt)
int file_write(struct node *n, struct sample *smps[], unsigned cnt)
{
int i = 0;
struct file *f = n->_vd;
struct sample *s = smps[0];
if (f->write.handle) {
for (i = 0; i < cnt; i++) {
struct msg *m = pool_getrel(pool, i);
assert(f->write.handle);
assert(cnt == 1);
/* Split file if requested */
if ((f->write.split > 0) && ftell(f->write.handle) > f->write.split * (1 << 20)) {
f->write.chunk++;
f->write.handle = file_reopen(&f->write);
/* Split file if requested */
if (f->write.split > 0 && ftell(f->write.handle) > f->write.split) {
f->write.chunk++;
f->write.handle = file_reopen(&f->write);
info("Splitted output node %s: chunk=%u", node_name(n), f->write.chunk);
}
msg_fprint(f->write.handle, m, MSG_PRINT_ALL & ~MSG_PRINT_OFFSET, 0);
}
fflush(f->write.handle);
info("Splitted output node %s: chunk=%u", node_name(n), f->write.chunk);
}
else
error("Can not write to node %s", node_name(n));
sample_fprint(f->write.handle, s, SAMPLE_ALL & ~SAMPLE_OFFSET);
fflush(f->write.handle);
return i;
return 1;
}
static struct node_type vt = {
.name = "file",
.description = "support for file log / replay node type",
.vectorize = 0, /* unlimited */
.vectorize = 1,
.size = sizeof(struct file),
.reverse = file_reverse,
.parse = file_parse,

View file

@ -1,8 +1,4 @@
/** Various socket related functions
*
* Parse and print addresses, connect, close, etc...
*
* S2SS uses these functions to setup the network emulation feature.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
@ -12,7 +8,6 @@
#include <string.h>
#include <unistd.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
@ -24,13 +19,16 @@
#include <linux/if_packet.h>
#include <arpa/inet.h>
#include "socket.h"
#include "config.h"
#include "utils.h"
#include "if.h"
#include "nl.h"
#include "tc.h"
#include "config.h"
#include "utils.h"
#include "socket.h"
#include "pool.h"
#include "msg.h"
#include "sample.h"
#include "queue.h"
/* Forward declartions */
static struct node_type vt;
@ -139,11 +137,14 @@ int socket_open(struct node *n)
if (ret < 0)
serror("Failed to bind socket");
/* Set fwmark for outgoing packets */
if (setsockopt(s->sd, SOL_SOCKET, SO_MARK, &s->mark, sizeof(s->mark)))
serror("Failed to set FW mark for outgoing packets");
else
debug(4, "Set FW mark for socket (sd=%u) to %u", s->sd, s->mark);
/* Set fwmark for outgoing packets if netem is enabled for this node */
if (s->mark) {
ret = setsockopt(s->sd, SOL_SOCKET, SO_MARK, &s->mark, sizeof(s->mark));
if (ret)
serror("Failed to set FW mark for outgoing packets");
else
debug(DBG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", s->sd, s->mark);
}
/* Set socket priority, QoS or TOS IP options */
int prio;
@ -154,7 +155,7 @@ int socket_open(struct node *n)
if (setsockopt(s->sd, IPPROTO_IP, IP_TOS, &prio, sizeof(prio)))
serror("Failed to set type of service (QoS)");
else
debug(4, "Set QoS/TOS IP option for node %s to %#x", node_name(n), prio);
debug(DBG_SOCKET | 4, "Set QoS/TOS IP option for node %s to %#x", node_name(n), prio);
break;
default:
@ -162,7 +163,7 @@ int socket_open(struct node *n)
if (setsockopt(s->sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio)))
serror("Failed to set socket priority");
else
debug(4, "Set socket priority for node %s to %d", node_name(n), prio);
debug(DBG_SOCKET | 4, "Set socket priority for node %s to %d", node_name(n), prio);
break;
}
@ -172,8 +173,11 @@ int socket_open(struct node *n)
int socket_reverse(struct node *n)
{
struct socket *s = n->_vd;
union sockaddr_union tmp;
SWAP(s->remote, s->local);
tmp = s->local;
s->local = s->remote;
s->remote = tmp;
return 0;
}
@ -198,33 +202,54 @@ int socket_destroy(struct node *n)
return 0;
}
int socket_read(struct node *n, struct pool *pool, int cnt)
int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
int samples, ret, received;
ssize_t bytes;
int bytes;
struct iovec iov[cnt];
struct msg msgs[cnt];
struct msg hdr;
struct iovec iov[2*cnt];
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = ARRAY_LEN(iov)
.msg_iov = iov
};
/* Peak into message header of the first sample and to get total packet size. */
bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC);
if (bytes < sizeof(struct msg) || bytes % 4 != 0) {
warn("Packet size is invalid");
return -1;
}
/* Wait until next packet received */
poll(&(struct pollfd) { .fd = s->sd, .events = POLLIN }, 1, -1);
ret = msg_verify(&hdr);
if (ret) {
warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes);
return -1;
}
/* Convert message to host endianess */
if (hdr.endian != MSG_ENDIAN_HOST)
msg_swap(&hdr);
samples = bytes / MSG_LEN(hdr.values);
if (samples > cnt) {
warn("Received more samples than supported. Dropping %u samples", samples - cnt);
samples = cnt;
}
/* Get size of received packet in bytes */
ioctl(s->sd, FIONREAD, &bytes);
/* Check if packet length is correct */
if (bytes % (cnt * 4) != 0)
error("Packet length not dividable by 4: received=%u, cnt=%u", bytes, cnt);
if (bytes / cnt > pool->stride)
error("Packet length is too large: received=%u, cnt=%u, max=%zu", bytes, cnt, pool->stride);
for (int i = 0; i < cnt; i++) {
/* All messages of a packet must have equal length! */
iov[i].iov_base = pool_getrel(pool, i);
iov[i].iov_len = bytes / cnt;
/* We expect that all received samples have the same amount of values! */
for (int i = 0; i < samples; i++) {
iov[2*i+0].iov_base = &msgs[i];
iov[2*i+0].iov_len = MSG_LEN(0);
iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values);
mhdr.msg_iovlen += 2;
}
/* Receive message from socket */
@ -232,53 +257,59 @@ int socket_read(struct node *n, struct pool *pool, int cnt)
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n));
else if (bytes < 0)
serror("Failed recv");
serror("Failed recv from node %s", node_name(n));
debug(17, "Received packet of %u bytes: %u samples a %u values per sample", bytes, cnt, (bytes / cnt) / 4 - 4);
for (int i = 0; i < cnt; i++) {
struct msg *m = pool_getrel(pool, i);
for (received = 0; received < samples; received++) {
struct msg *m = &msgs[received];
struct sample *s = smps[received];
ret = msg_verify(m);
if (ret)
break;
if (m->values != hdr.values)
break;
/* Convert message to host endianess */
if (m->endian != MSG_ENDIAN_HOST)
msg_swap(m);
/* Check integrity of packet */
if (bytes / cnt != MSG_LEN(m->values))
error("Invalid message len: %u for node %s", MSG_LEN(m->values), node_name(n));
bytes -= MSG_LEN(m->values);
s->length = m->values;
s->sequence = m->sequence;
s->ts.origin = MSG_TS(m);
}
debug(DBG_SOCKET | 17, "Received message of %zd bytes: %u samples", bytes, received);
/* Check packet integrity */
if (bytes != 0)
error("Packet length does not match message header length! %u bytes left over.", bytes);
return cnt;
return received;
}
int socket_write(struct node *n, struct pool *pool, int cnt)
int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
int bytes, sent = 0;
/** @todo we should check the MTU */
struct iovec iov[cnt];
for (int i = 0; i < cnt; i++) {
struct msg *m = pool_getrel(pool, i);
iov[sent].iov_base = m;
iov[sent].iov_len = MSG_LEN(m->values);
sent++;
}
ssize_t bytes;
struct msg msgs[cnt];
struct iovec iov[2*cnt];
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = sent
.msg_iovlen = ARRAY_LEN(iov)
};
/* Construct iovecs */
for (int i = 0; i < cnt; i++) {
msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence);
msgs[i].ts.sec = smps[i]->ts.origin.tv_sec;
msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec;
iov[i*2+0].iov_base = &msgs[i];
iov[i*2+0].iov_len = MSG_LEN(0);
iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length);
}
/* Specify destination address for connection-less procotols */
switch (s->layer) {
case LAYER_UDP:
@ -289,13 +320,14 @@ int socket_write(struct node *n, struct pool *pool, int cnt)
break;
}
/* Send message */
bytes = sendmsg(s->sd, &mhdr, 0);
if (bytes < 0)
serror("Failed send");
serror("Failed send to node %s", node_name(n));
debug(17, "Sent packet of %u bytes: %u samples a %u values per sample", bytes, cnt, (bytes / cnt) / 4 - 4);
debug(DBG_SOCKET | 17, "Sent packet of %zd bytes with %u samples", bytes, cnt);
return sent;
return cnt;
}
int socket_parse(struct node *n, config_setting_t *cfg)
@ -482,7 +514,7 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye
static struct node_type vt = {
.name = "socket",
.description = "Network socket (libnl3)",
.description = "BSD network sockets",
.vectorize = 0, /* unlimited */
.size = sizeof(struct socket),
.destroy = socket_destroy,