1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-30 00:00:11 +01:00

pipe: refactor datastructure for storing direction details

This commit is contained in:
Steffen Vogel 2018-10-28 13:24:23 +01:00
parent 64f0bcb160
commit 7ce144d7d0

View file

@ -60,6 +60,11 @@ public:
limit(-1)
{ }
Direction(const Direction &c)
{
io = c.io;
}
struct pool pool;
struct node *node;
struct io *io;
@ -70,6 +75,11 @@ public:
int limit;
};
struct Directions {
Direction send;
Direction recv;
};
static std::atomic<bool> stop(false);
static void quit(int signal, siginfo_t *sinfo, void *ctx)
@ -105,30 +115,30 @@ static void usage()
static void * send_loop(void *ctx)
{
Direction *d = static_cast<Direction *>(ctx);
Directions *dirs = static_cast<Directions*>(ctx);
Logger logger = logging.get("pipe");
unsigned last_sequenceno = 0, release;
int ret, scanned, sent, allocated, cnt = 0;
struct sample *smps[d->node->out.vectorize];
struct sample *smps[dirs->send.node->out.vectorize];
/* Initialize memory */
unsigned pool_size = node_type(d->node)->pool_size ? node_type(d->node)->pool_size : LOG2_CEIL(d->node->out.vectorize);
unsigned pool_size = node_type(dirs->send.node)->pool_size ? node_type(dirs->send.node)->pool_size : LOG2_CEIL(dirs->send.node->out.vectorize);
ret = pool_init(&d->pool, pool_size, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), node_memory_type(d->node, &memory_hugepage));
ret = pool_init(&dirs->send.pool, pool_size, SAMPLE_LENGTH(DEFAULT_SAMPLE_LENGTH), node_memory_type(dirs->send.node, &memory_hugepage));
if (ret < 0)
throw new RuntimeError("Failed to allocate memory for receive pool.");
while (!io_eof(d->io)) {
allocated = sample_alloc_many(&d->pool, smps, d->node->out.vectorize);
while (!io_eof(dirs->send.io)) {
allocated = sample_alloc_many(&dirs->send.pool, smps, dirs->send.node->out.vectorize);
if (ret < 0)
throw new RuntimeError("Failed to get {} samples out of send pool ({}).", d->node->out.vectorize, ret);
else if (allocated < d->node->out.vectorize)
throw new RuntimeError("Failed to get {} samples out of send pool ({}).", dirs->send.node->out.vectorize, ret);
else if (allocated < dirs->send.node->out.vectorize)
logger->warn("Send pool underrun");
scanned = io_scan(d->io, smps, allocated);
scanned = io_scan(dirs->send.io, smps, allocated);
if (scanned < 0) {
logger->warn("Failed to read samples from stdin");
continue;
@ -146,23 +156,23 @@ static void * send_loop(void *ctx)
release = allocated;
sent = node_write(d->node, smps, scanned, &release);
sent = node_write(dirs->send.node, smps, scanned, &release);
if (sent < 0)
logger->warn("Failed to sent samples to node {}: reason={}", node_name(d->node), sent);
logger->warn("Failed to sent samples to node {}: reason={}", node_name(dirs->send.node), sent);
else if (sent < scanned)
logger->warn("Failed to sent {} out of {} samples to node {}", scanned-sent, scanned, node_name(d->node));
logger->warn("Failed to sent {} out of {} samples to node {}", scanned-sent, scanned, node_name(dirs->send.node));
sample_decref_many(smps, release);
cnt += sent;
if (d->limit > 0 && cnt >= d->limit)
if (dirs->send.limit > 0 && cnt >= dirs->send.limit)
goto leave;
pthread_testcancel();
}
leave: if (io_eof(d->io)) {
if (d->limit < 0) {
leave: if (io_eof(dirs->send.io)) {
if (dirs->recv.limit < 0) {
logger->info("Reached end-of-file. Terminating...");
killme(SIGTERM);
}
@ -179,38 +189,38 @@ leave: if (io_eof(d->io)) {
static void * recv_loop(void *ctx)
{
Direction *d = static_cast<Direction *>(ctx);
Directions *dirs = static_cast<Directions*>(ctx);
Logger logger = logging.get("pipe");
int recv, ret, cnt = 0, allocated = 0;
unsigned release;
struct sample *smps[d->node->in.vectorize];
struct sample *smps[dirs->recv.node->in.vectorize];
/* Initialize memory */
unsigned pool_size = node_type(d->node)->pool_size ? node_type(d->node)->pool_size : LOG2_CEIL(d->node->in.vectorize);
unsigned pool_size = node_type(dirs->recv.node)->pool_size ? node_type(dirs->recv.node)->pool_size : LOG2_CEIL(dirs->recv.node->in.vectorize);
ret = pool_init(&d->pool, pool_size, SAMPLE_LENGTH(list_length(&d->node->signals)), node_memory_type(d->node, &memory_hugepage));
ret = pool_init(&dirs->recv.pool, pool_size, SAMPLE_LENGTH(list_length(&dirs->recv.node->signals)), node_memory_type(dirs->recv.node, &memory_hugepage));
if (ret < 0)
throw new RuntimeError("Failed to allocate memory for receive pool.");
for (;;) {
allocated = sample_alloc_many(&d->pool, smps, d->node->in.vectorize);
allocated = sample_alloc_many(&dirs->recv.pool, smps, dirs->recv.node->in.vectorize);
if (allocated < 0)
throw new RuntimeError("Failed to allocate {} samples from receive pool.", d->node->in.vectorize);
else if (allocated < d->node->in.vectorize)
logger->warn("Receive pool underrun: allocated only {} of {} samples", allocated, d->node->in.vectorize);
throw new RuntimeError("Failed to allocate {} samples from receive pool.", dirs->recv.node->in.vectorize);
else if (allocated < dirs->recv.node->in.vectorize)
logger->warn("Receive pool underrun: allocated only {} of {} samples", allocated, dirs->recv.node->in.vectorize);
release = allocated;
recv = node_read(d->node, smps, allocated, &release);
recv = node_read(dirs->recv.node, smps, allocated, &release);
if (recv < 0)
logger->warn("Failed to receive samples from node {}: reason={}", node_name(d->node), recv);
logger->warn("Failed to receive samples from node {}: reason={}", node_name(dirs->recv.node), recv);
else {
io_print(d->io, smps, recv);
io_print(dirs->recv.io, smps, recv);
cnt += recv;
if (d->limit > 0 && cnt >= d->limit)
if (dirs->recv.limit > 0 && cnt >= dirs->recv.limit)
goto leave;
}
@ -233,8 +243,10 @@ int main(int argc, char *argv[])
struct node *node;
static struct io io = { .state = STATE_DESTROYED };
Direction sendd(&io);
Direction recvv(&io);
Directions dirs = {
.send = Direction(&io),
.recv = Direction(&io)
};
SuperNode sn; /**< The global configuration */
Logger logger = logging.get("pipe");
@ -258,19 +270,19 @@ int main(int argc, char *argv[])
break;
case 's':
recvv.enabled = false; // send only
dirs.recv.enabled = false; // send only
break;
case 'r':
sendd.enabled = false; // receive only
dirs.send.enabled = false; // receive only
break;
case 'l':
recvv.limit = strtoul(optarg, &endptr, 10);
dirs.recv.limit = strtoul(optarg, &endptr, 10);
goto check;
case 'L':
sendd.limit = strtoul(optarg, &endptr, 10);
dirs.send.limit = strtoul(optarg, &endptr, 10);
goto check;
case 't':
@ -304,6 +316,8 @@ check: if (optarg == endptr)
exit(EXIT_FAILURE);
}
info("Logging level: %d", logging.getLevel());
char *uri = argv[optind];
char *nodestr = argv[optind+1];
struct format_type *fmt;
@ -376,14 +390,14 @@ check: if (optarg == endptr)
throw new RuntimeError("Failed to start node {}: reason={}", node_name(node), ret);
/* Start threads */
if (recvv.enabled) {
recvv.node = node;
pthread_create(&recvv.thread, nullptr, recv_loop, &recvv);
if (dirs.recv.enabled) {
dirs.recv.node = node;
pthread_create(&dirs.recv.thread, nullptr, recv_loop, &dirs);
}
if (sendd.enabled) {
sendd.node = node;
pthread_create(&sendd.thread, nullptr, send_loop, &sendd);
if (dirs.send.enabled) {
dirs.send.node = node;
pthread_create(&dirs.send.thread, nullptr, send_loop, &dirs);
}
alarm(timeout);
@ -391,14 +405,14 @@ check: if (optarg == endptr)
while (!stop)
pause();
if (recvv.enabled) {
pthread_cancel(recvv.thread);
pthread_join(recvv.thread, nullptr);
if (dirs.recv.enabled) {
pthread_cancel(dirs.recv.thread);
pthread_join(dirs.recv.thread, nullptr);
}
if (sendd.enabled) {
pthread_cancel(sendd.thread);
pthread_join(sendd.thread, nullptr);
if (dirs.send.enabled) {
pthread_cancel(dirs.send.thread);
pthread_join(dirs.send.thread, nullptr);
}
ret = node_stop(node);
@ -409,14 +423,14 @@ check: if (optarg == endptr)
if (ret)
throw new RuntimeError("Failed to stop node type {}: reason={}", node_type_name(node->_vt), ret);
if (recvv.enabled) {
ret = pool_destroy(&recvv.pool);
if (dirs.recv.enabled) {
ret = pool_destroy(&dirs.recv.pool);
if (ret)
throw new RuntimeError("Failed to destroy pool");
}
if (sendd.enabled) {
ret = pool_destroy(&sendd.pool);
if (dirs.send.enabled) {
ret = pool_destroy(&dirs.send.pool);
if (ret)
throw new RuntimeError("Failed to destroy pool");
}