diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index 8fe37f2c3..c27c67b8c 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -64,12 +64,14 @@ protected: std::thread thread; + bool stop; bool enabled; int limit; public: PipeDirection(struct node *n, struct io *i, bool en = true, int lim = -1) : node(n), io(i), + stop(false), enabled(en), limit(lim) { @@ -97,12 +99,18 @@ public: void startThread() { + stop = false; if (enabled) thread = std::thread(&villas::node::tools::PipeDirection::run, this); } void stopThread() { + stop = true; + + /* We send a signal to the thread in order to interrupt blocking system calls */ + pthread_kill(thread.native_handle(), SIGUSR1); + thread.join(); } }; @@ -123,7 +131,7 @@ public: struct sample *smps[node->out.vectorize]; - while (node->state == STATE_STARTED && !io_eof(io)) { + while (!stop && !io_eof(io)) { allocated = sample_alloc_many(&pool, smps, node->out.vectorize); if (allocated < 0) throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize); @@ -132,6 +140,9 @@ public: scanned = io_scan(io, smps, allocated); if (scanned < 0) { + if (stop) + goto leave2; + logger->warn("Failed to read samples from stdin"); continue; } @@ -157,6 +168,8 @@ public: goto leave; } +leave2: return; + leave: if (io_eof(io)) { if (limit < 0) { logger->info("Reached end-of-file. Terminating..."); @@ -187,7 +200,7 @@ public: unsigned release; struct sample *smps[node->in.vectorize]; - while (node->state == STATE_STARTED) { + while (!stop) { allocated = sample_alloc_many(&pool, smps, node->in.vectorize); if (allocated < 0) throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize); @@ -198,7 +211,7 @@ public: recv = node_read(node, smps, allocated, &release); if (recv < 0) { - if (node->state == STATE_STOPPING) + if (node->state == STATE_STOPPING || stop) goto leave2; else logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv); @@ -214,6 +227,8 @@ public: sample_decref_many(smps, release); } + return; + leave: logger->info("Reached receive limit. Terminating..."); leave2: raise(SIGINT); } @@ -273,10 +288,13 @@ protected: void handler(int signal, siginfo_t *sinfo, void *ctx) { switch (signal) { - case SIGALRM: + case SIGALRM: logger->info("Reached timeout. Terminating..."); break; + case SIGUSR1: + break; /* ignore silently */ + default: logger->info("Received {} signal. Terminating...", strsignal(signal)); break; diff --git a/src/villas-test-rtt.cpp b/src/villas-test-rtt.cpp index 05f339973..3d9d6021d 100644 --- a/src/villas-test-rtt.cpp +++ b/src/villas-test-rtt.cpp @@ -88,7 +88,7 @@ protected: int count; - hist_cnt_t hist_warmup; + Hist::cnt_t hist_warmup; int hist_buckets; void handler(int signal, siginfo_t *sinfo, void *ctx)