mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
pipe: fix handling of samples and their references
This commit is contained in:
parent
76ec183035
commit
0897f8f87a
1 changed files with 35 additions and 26 deletions
61
src/pipe.c
61
src/pipe.c
|
@ -64,16 +64,21 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
|
|||
if (recvv.enabled) {
|
||||
pthread_cancel(recvv.thread);
|
||||
pthread_join(recvv.thread, NULL);
|
||||
pool_destroy(&recvv.pool);
|
||||
}
|
||||
|
||||
if (sendd.enabled) {
|
||||
pthread_cancel(sendd.thread);
|
||||
pthread_join(sendd.thread, NULL);
|
||||
pool_destroy(&sendd.pool);
|
||||
}
|
||||
|
||||
super_node_stop(&sn);
|
||||
|
||||
if (recvv.enabled)
|
||||
pool_destroy(&recvv.pool);
|
||||
|
||||
if (sendd.enabled)
|
||||
pool_destroy(&sendd.pool);
|
||||
|
||||
super_node_destroy(&sn);
|
||||
|
||||
info(CLR_GRN("Goodbye!"));
|
||||
|
@ -101,7 +106,7 @@ static void usage()
|
|||
|
||||
static void * send_loop(void *ctx)
|
||||
{
|
||||
int ret, len, sent, cnt = 0;
|
||||
int ret, scanned, sent, ready, cnt = 0;
|
||||
struct sample *smps[node->vectorize];
|
||||
|
||||
/* Initialize memory */
|
||||
|
@ -109,21 +114,29 @@ static void * send_loop(void *ctx)
|
|||
if (ret < 0)
|
||||
error("Failed to allocate memory for receive pool.");
|
||||
|
||||
ret = sample_alloc(&sendd.pool, smps, node->vectorize);
|
||||
if (ret < 0)
|
||||
error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret);
|
||||
|
||||
while (!io_eof(&io)) {
|
||||
len = io_scan(&io, smps, node->vectorize);
|
||||
if (len <= 0)
|
||||
ready = sample_alloc(&sendd.pool, smps, node->vectorize);
|
||||
if (ret < 0)
|
||||
error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret);
|
||||
else if (ready < node->vectorize)
|
||||
warn("Send pool underrun");
|
||||
|
||||
scanned = io_scan(&io, smps, ready);
|
||||
if (scanned < 0) {
|
||||
continue;
|
||||
warn("Failed to read samples from stdin");
|
||||
}
|
||||
else if (scanned == 0)
|
||||
continue;
|
||||
|
||||
sent = node_write(node, smps, len);
|
||||
sent = node_write(node, smps, scanned);
|
||||
if (sent < 0) {
|
||||
warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent);
|
||||
continue;
|
||||
}
|
||||
|
||||
sample_put_many(smps, ready);
|
||||
|
||||
cnt += sent;
|
||||
if (sendd.limit > 0 && cnt >= sendd.limit)
|
||||
goto leave;
|
||||
|
@ -149,7 +162,7 @@ leave: if (io_eof(&io)) {
|
|||
|
||||
static void * recv_loop(void *ctx)
|
||||
{
|
||||
int recv, ret, cnt = 0;
|
||||
int recv, ret, cnt = 0, ready = 0;
|
||||
struct sample *smps[node->vectorize];
|
||||
|
||||
/* Initialize memory */
|
||||
|
@ -157,29 +170,25 @@ static void * recv_loop(void *ctx)
|
|||
if (ret < 0)
|
||||
error("Failed to allocate memory for receive pool.");
|
||||
|
||||
ret = sample_alloc(&recvv.pool, smps, node->vectorize);
|
||||
if (ret < 0)
|
||||
error("Failed to allocate %u samples from receive pool.", node->vectorize);
|
||||
|
||||
for (;;) {
|
||||
recv = node_read(node, smps, node->vectorize);
|
||||
ready = sample_alloc(&recvv.pool, smps, node->vectorize);
|
||||
if (ready < 0)
|
||||
error("Failed to allocate %u samples from receive pool.", node->vectorize);
|
||||
else if (ready < node->vectorize)
|
||||
warn("Receive pool underrun");
|
||||
|
||||
recv = node_read(node, smps, ready);
|
||||
if (recv < 0) {
|
||||
warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv);
|
||||
continue;
|
||||
}
|
||||
|
||||
struct timespec now = time_now();
|
||||
|
||||
/* Fix timestamps */
|
||||
for (int i = 0; i < recv; i++) {
|
||||
struct sample *s = smps[i];
|
||||
|
||||
if (s->ts.received.tv_sec == -1 || s->ts.received.tv_sec == 0)
|
||||
s->ts.received = now;
|
||||
}
|
||||
else if (recv == 0)
|
||||
continue;
|
||||
|
||||
io_print(&io, smps, recv);
|
||||
|
||||
sample_put_many(smps, ready);
|
||||
|
||||
cnt += recv;
|
||||
if (recvv.limit > 0 && cnt >= recvv.limit)
|
||||
goto leave;
|
||||
|
|
Loading…
Add table
Reference in a new issue