1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

smaller fixes

This commit is contained in:
Steffen Vogel 2019-06-05 19:00:09 +02:00
parent 6433caac28
commit 923ef88fb5
2 changed files with 23 additions and 5 deletions

View file

@ -64,12 +64,14 @@ protected:
std::thread thread;
bool stop;
bool enabled;
int limit;
public:
PipeDirection(struct node *n, struct io *i, bool en = true, int lim = -1) :
node(n),
io(i),
stop(false),
enabled(en),
limit(lim)
{
@ -97,12 +99,18 @@ public:
void startThread()
{
stop = false;
if (enabled)
thread = std::thread(&villas::node::tools::PipeDirection::run, this);
}
void stopThread()
{
stop = true;
/* We send a signal to the thread in order to interrupt blocking system calls */
pthread_kill(thread.native_handle(), SIGUSR1);
thread.join();
}
};
@ -123,7 +131,7 @@ public:
struct sample *smps[node->out.vectorize];
while (node->state == STATE_STARTED && !io_eof(io)) {
while (!stop && !io_eof(io)) {
allocated = sample_alloc_many(&pool, smps, node->out.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize);
@ -132,6 +140,9 @@ public:
scanned = io_scan(io, smps, allocated);
if (scanned < 0) {
if (stop)
goto leave2;
logger->warn("Failed to read samples from stdin");
continue;
}
@ -157,6 +168,8 @@ public:
goto leave;
}
leave2: return;
leave: if (io_eof(io)) {
if (limit < 0) {
logger->info("Reached end-of-file. Terminating...");
@ -187,7 +200,7 @@ public:
unsigned release;
struct sample *smps[node->in.vectorize];
while (node->state == STATE_STARTED) {
while (!stop) {
allocated = sample_alloc_many(&pool, smps, node->in.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize);
@ -198,7 +211,7 @@ public:
recv = node_read(node, smps, allocated, &release);
if (recv < 0) {
if (node->state == STATE_STOPPING)
if (node->state == STATE_STOPPING || stop)
goto leave2;
else
logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv);
@ -214,6 +227,8 @@ public:
sample_decref_many(smps, release);
}
return;
leave: logger->info("Reached receive limit. Terminating...");
leave2: raise(SIGINT);
}
@ -273,10 +288,13 @@ protected:
void handler(int signal, siginfo_t *sinfo, void *ctx)
{
switch (signal) {
case SIGALRM:
case SIGALRM:
logger->info("Reached timeout. Terminating...");
break;
case SIGUSR1:
break; /* ignore silently */
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
break;

View file

@ -88,7 +88,7 @@ protected:
int count;
hist_cnt_t hist_warmup;
Hist::cnt_t hist_warmup;
int hist_buckets;
void handler(int signal, siginfo_t *sinfo, void *ctx)