From e4a47e0e95302343561437f7aa036c75ed2ddc5e Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 11 Jan 2022 09:10:52 -0500 Subject: [PATCH] pipe: some refactoring and cleanups --- src/villas-pipe.cpp | 160 ++++++++++++++++++++++++++------------------ 1 file changed, 96 insertions(+), 64 deletions(-) diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index 4c10fad11..d5c1ee85b 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -50,6 +50,8 @@ namespace villas { namespace node { namespace tools { +class Pipe; + class PipeDirection { protected: @@ -63,13 +65,15 @@ protected: bool stop; bool enabled; int limit; + int count; public: PipeDirection(Node *n, Format *fmt, bool en, int lim, const std::string &name) : node(n), formatter(fmt), stop(false), enabled(en), - limit(lim) + limit(lim), + count(0) { auto loggerName = fmt::format("pipe:{}", name); logger = logging.get(loggerName); @@ -105,9 +109,8 @@ public: stop = true; if (enabled) { - /* We send a signal to the thread in order to interrupt blocking system calls */ - pthread_kill(thread.native_handle(), SIGUSR1); - + // We send a SIGUSR2 to the threads to unblock their blocking read() syscalls + pthread_kill(thread.native_handle(), SIGUSR2); thread.join(); } } @@ -115,6 +118,8 @@ public: class PipeSendDirection : public PipeDirection { + friend Pipe; + public: PipeSendDirection(Node *n, Format *i, bool en = true, int lim = -1) : PipeDirection(n, i, en, lim, "send") @@ -123,12 +128,14 @@ public: virtual void run() { + logger->debug("Send thread started"); + unsigned last_sequenceno = 0; - int scanned, sent, allocated, cnt = 0; + int scanned, sent, allocated; struct Sample *smps[node->out.vectorize]; - while (!stop && !feof(stdin)) { + while (!stop) { allocated = sample_alloc_many(&pool, smps, node->out.vectorize); if (allocated < 0) throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize); @@ -137,11 +144,12 @@ public: scanned = formatter->scan(stdin, smps, allocated); if (scanned < 0) { - if (stop) + if (feof(stdin)) + goto leave; + else if (stop) goto leave2; - logger->warn("Failed to read samples from stdin"); - continue; + logger->warn("Failed to read from stdin"); } else if (scanned == 0) continue; @@ -158,32 +166,29 @@ public: sample_decref_many(smps, scanned); - cnt += sent; - if (limit > 0 && cnt >= limit) + count += sent; + if (limit > 0 && count >= limit) goto leave; } -leave2: - logger->info("Send thread stopped"); - return; + goto leave2; -leave: if (feof(stdin)) { - if (limit < 0) { - logger->info("Reached end-of-file. Terminating..."); - raise(SIGINT); - } - else - logger->info("Reached end-of-file. Wait for receive side..."); - } - else { - logger->info("Reached send limit. Terminating..."); - raise(SIGINT); - } +leave: + if (feof(stdin)) + logger->info("Reached end-of-file."); + else + logger->info("Reached send limit."); + + raise(SIGUSR1); +leave2: + logger->debug("Send thread stopped"); } }; class PipeReceiveDirection : public PipeDirection { + friend Pipe; + public: PipeReceiveDirection(Node *n, Format *i, bool en = true, int lim = -1) : PipeDirection(n, i, en, lim, "recv") @@ -192,7 +197,9 @@ public: virtual void run() { - int recv, cnt = 0, allocated = 0; + logger->debug("Receive thread started"); + + int recv, allocated = 0; struct Sample *smps[node->in.vectorize]; while (!stop) { @@ -212,21 +219,22 @@ public: else { formatter->print(stdout, smps, recv); - cnt += recv; - if (limit > 0 && cnt >= limit) + count += recv; + if (limit > 0 && count >= limit) goto leave; } sample_decref_many(smps, allocated); } - return; - + goto leave2; leave: - logger->info("Reached receive limit. Terminating..."); + logger->info("Reached receive limit."); + raise(SIGUSR1); leave2: - logger->info("Receive thread stopped"); - raise(SIGINT); + logger->debug("Receive thread stopped"); + + sample_decref_many(smps, allocated); } }; @@ -241,15 +249,15 @@ public: reverse(false), format("villas.human"), dtypes("64f"), - config_cli(json_object()), - enable_write(true), - enable_read(true), - limit_send(-1), - limit_recv(-1) + config_cli(json_object()) { - int ret; + send.enabled = true; + send.limit = -1; - ret = memory::init(DEFAULT_NR_HUGEPAGES); + recv.enabled = true; + recv.limit = -1; + + int ret = memory::init(DEFAULT_NR_HUGEPAGES); if (ret) throw RuntimeError("Failed to initialize memory"); } @@ -274,27 +282,48 @@ protected: json_t *config_cli; - bool enable_write; - bool enable_read; - int limit_send; - int limit_recv; + struct { + int limit; + bool enabled; + std::unique_ptr dir; + + } recv; + + struct { + int limit; + bool enabled; + std::unique_ptr dir; + } send; void handler(int signal, siginfo_t *sinfo, void *ctx) { + logger->debug("Received {} signal.", strsignal(signal)); + switch (signal) { case SIGALRM: - logger->info("Reached timeout. Terminating..."); + logger->info("Reached timeout."); + stop = true; break; case SIGUSR1: - break; /* ignore silently */ + if (recv.dir->enabled) { + if (recv.dir->limit < 0 && feof(stdin)) + stop = true; + + if (recv.dir->limit > 0 && recv.dir->count >= recv.dir->limit) + stop = true; + } + + if (send.dir->enabled && send.dir->limit > 0) { + if (send.dir->count >= send.dir->limit) + stop = true; + } + break; default: - logger->info("Received {} signal. Terminating...", strsignal(signal)); + stop = true; break; } - - stop = true; } void usage() @@ -342,19 +371,19 @@ protected: break; case 's': - enable_read = false; // send only + recv.enabled = false; // send only break; case 'r': - enable_write = false; // receive only + send.enabled = false; // receive only break; case 'l': - limit_recv = strtoul(optarg, &endptr, 10); + recv.limit = strtoul(optarg, &endptr, 10); goto check; case 'L': - limit_send = strtoul(optarg, &endptr, 10); + send.limit = strtoul(optarg, &endptr, 10); goto check; case 'T': @@ -420,10 +449,10 @@ check: if (optarg == endptr) if (!node) throw RuntimeError("Node {} does not exist!", nodestr); - if (enable_read && !(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_READ)) + if (recv.enabled && !(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_READ)) throw RuntimeError("Node {} can not receive data. Consider using send-only mode by using '-s' option", nodestr); - if (enable_write && !(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_WRITE)) + if (send.enabled && !(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_WRITE)) throw RuntimeError("Node {} can not send data. Consider using receive-only mode by using '-r' option", nodestr); #if defined(WITH_NODE_WEBSOCKET) && defined(WITH_WEB) @@ -455,24 +484,27 @@ check: if (optarg == endptr) if (ret) throw RuntimeError("Failed to start node {}: reason={}", *node, ret); - PipeReceiveDirection recv_dir(node, formatter, enable_read, limit_recv); - PipeSendDirection send_dir(node, formatter, enable_write, limit_send); + recv.dir = std::make_unique(node, formatter, recv.enabled, recv.limit); + send.dir = std::make_unique(node, formatter, send.enabled, send.limit); - recv_dir.startThread(); - send_dir.startThread(); + recv.dir->startThread(); + send.dir->startThread(); + /* Arm timeout timer */ alarm(timeout); while (!stop) - sleep(1); - - recv_dir.stopThread(); - send_dir.stopThread(); + usleep(0.1e6); + /* We are stopping the node here in order to unblock the receiving threads + * Node::read() call and allow it to be joined(). */ ret = node->stop(); if (ret) throw RuntimeError("Failed to stop node {}: reason={}", *node, ret); + recv.dir->stopThread(); + send.dir->stopThread(); + sn.stopInterfaces(); ret = node->getFactory()->stop();