1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

pipe: simplify code

This commit is contained in:
Steffen Vogel 2019-02-06 15:12:02 +01:00
parent a9f5b782f6
commit ea5d59b5d3

View file

@ -141,13 +141,14 @@ static void * send_loop(void *ctx)
unsigned last_sequenceno = 0, release;
int scanned, sent, allocated, cnt = 0;
struct sample *smps[dirs->send.node->out.vectorize];
struct node *node = dirs->send.node;
struct sample *smps[node->out.vectorize];
while (!io_eof(dirs->send.io)) {
allocated = sample_alloc_many(&dirs->send.pool, smps, dirs->send.node->out.vectorize);
allocated = sample_alloc_many(&dirs->send.pool, smps, node->out.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to get {} samples out of send pool.", dirs->send.node->out.vectorize);
else if (allocated < dirs->send.node->out.vectorize)
throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize);
else if (allocated < node->out.vectorize)
logger->warn("Send pool underrun");
scanned = io_scan(dirs->send.io, smps, allocated);
@ -168,7 +169,7 @@ static void * send_loop(void *ctx)
release = allocated;
sent = node_write(dirs->send.node, smps, scanned, &release);
sent = node_write(node, smps, scanned, &release);
sample_decref_many(smps, release);
@ -202,20 +203,21 @@ static void * recv_loop(void *ctx)
int recv, cnt = 0, allocated = 0;
unsigned release;
struct sample *smps[dirs->recv.node->in.vectorize];
struct node *node = dirs->recv.node;
struct sample *smps[node->in.vectorize];
for (;;) {
allocated = sample_alloc_many(&dirs->recv.pool, smps, dirs->recv.node->in.vectorize);
allocated = sample_alloc_many(&dirs->recv.pool, smps, node->in.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to allocate {} samples from receive pool.", dirs->recv.node->in.vectorize);
else if (allocated < dirs->recv.node->in.vectorize)
logger->warn("Receive pool underrun: allocated only {} of {} samples", allocated, dirs->recv.node->in.vectorize);
throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize);
else if (allocated < node->in.vectorize)
logger->warn("Receive pool underrun: allocated only {} of {} samples", allocated, node->in.vectorize);
release = allocated;
recv = node_read(dirs->recv.node, smps, allocated, &release);
recv = node_read(node, smps, allocated, &release);
if (recv < 0)
logger->warn("Failed to receive samples from node {}: reason={}", node_name(dirs->recv.node), recv);
logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv);
else {
io_print(dirs->recv.io, smps, recv);