diff --git a/src/pipe.c b/src/pipe.c index bd4f15b94..33bca57f5 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -64,16 +64,21 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) if (recvv.enabled) { pthread_cancel(recvv.thread); pthread_join(recvv.thread, NULL); - pool_destroy(&recvv.pool); } if (sendd.enabled) { pthread_cancel(sendd.thread); pthread_join(sendd.thread, NULL); - pool_destroy(&sendd.pool); } super_node_stop(&sn); + + if (recvv.enabled) + pool_destroy(&recvv.pool); + + if (sendd.enabled) + pool_destroy(&sendd.pool); + super_node_destroy(&sn); info(CLR_GRN("Goodbye!")); @@ -101,7 +106,7 @@ static void usage() static void * send_loop(void *ctx) { - int ret, len, sent, cnt = 0; + int ret, scanned, sent, ready, cnt = 0; struct sample *smps[node->vectorize]; /* Initialize memory */ @@ -109,21 +114,29 @@ static void * send_loop(void *ctx) if (ret < 0) error("Failed to allocate memory for receive pool."); - ret = sample_alloc(&sendd.pool, smps, node->vectorize); - if (ret < 0) - error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret); - while (!io_eof(&io)) { - len = io_scan(&io, smps, node->vectorize); - if (len <= 0) + ready = sample_alloc(&sendd.pool, smps, node->vectorize); + if (ret < 0) + error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret); + else if (ready < node->vectorize) + warn("Send pool underrun"); + + scanned = io_scan(&io, smps, ready); + if (scanned < 0) { + continue; + warn("Failed to read samples from stdin"); + } + else if (scanned == 0) continue; - sent = node_write(node, smps, len); + sent = node_write(node, smps, scanned); if (sent < 0) { warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent); continue; } + sample_put_many(smps, ready); + cnt += sent; if (sendd.limit > 0 && cnt >= sendd.limit) goto leave; @@ -149,7 +162,7 @@ leave: if (io_eof(&io)) { static void * recv_loop(void *ctx) { - int recv, ret, cnt = 0; + int recv, ret, cnt = 0, ready = 0; struct sample *smps[node->vectorize]; /* Initialize memory */ @@ -157,29 +170,25 @@ static void * recv_loop(void *ctx) if (ret < 0) error("Failed to allocate memory for receive pool."); - ret = sample_alloc(&recvv.pool, smps, node->vectorize); - if (ret < 0) - error("Failed to allocate %u samples from receive pool.", node->vectorize); - for (;;) { - recv = node_read(node, smps, node->vectorize); + ready = sample_alloc(&recvv.pool, smps, node->vectorize); + if (ready < 0) + error("Failed to allocate %u samples from receive pool.", node->vectorize); + else if (ready < node->vectorize) + warn("Receive pool underrun"); + + recv = node_read(node, smps, ready); if (recv < 0) { warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv); continue; } - - struct timespec now = time_now(); - - /* Fix timestamps */ - for (int i = 0; i < recv; i++) { - struct sample *s = smps[i]; - - if (s->ts.received.tv_sec == -1 || s->ts.received.tv_sec == 0) - s->ts.received = now; - } + else if (recv == 0) + continue; io_print(&io, smps, recv); + sample_put_many(smps, ready); + cnt += recv; if (recvv.limit > 0 && cnt >= recvv.limit) goto leave;