1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

pipe: some refactoring and cleanups

This commit is contained in:
Steffen Vogel 2022-01-11 09:10:52 -05:00
parent 4829184a7d
commit e4a47e0e95

View file

@ -50,6 +50,8 @@ namespace villas {
namespace node {
namespace tools {
class Pipe;
class PipeDirection {
protected:
@ -63,13 +65,15 @@ protected:
bool stop;
bool enabled;
int limit;
int count;
public:
PipeDirection(Node *n, Format *fmt, bool en, int lim, const std::string &name) :
node(n),
formatter(fmt),
stop(false),
enabled(en),
limit(lim)
limit(lim),
count(0)
{
auto loggerName = fmt::format("pipe:{}", name);
logger = logging.get(loggerName);
@ -105,9 +109,8 @@ public:
stop = true;
if (enabled) {
/* We send a signal to the thread in order to interrupt blocking system calls */
pthread_kill(thread.native_handle(), SIGUSR1);
// We send a SIGUSR2 to the threads to unblock their blocking read() syscalls
pthread_kill(thread.native_handle(), SIGUSR2);
thread.join();
}
}
@ -115,6 +118,8 @@ public:
class PipeSendDirection : public PipeDirection {
friend Pipe;
public:
PipeSendDirection(Node *n, Format *i, bool en = true, int lim = -1) :
PipeDirection(n, i, en, lim, "send")
@ -123,12 +128,14 @@ public:
virtual
void run()
{
logger->debug("Send thread started");
unsigned last_sequenceno = 0;
int scanned, sent, allocated, cnt = 0;
int scanned, sent, allocated;
struct Sample *smps[node->out.vectorize];
while (!stop && !feof(stdin)) {
while (!stop) {
allocated = sample_alloc_many(&pool, smps, node->out.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize);
@ -137,11 +144,12 @@ public:
scanned = formatter->scan(stdin, smps, allocated);
if (scanned < 0) {
if (stop)
if (feof(stdin))
goto leave;
else if (stop)
goto leave2;
logger->warn("Failed to read samples from stdin");
continue;
logger->warn("Failed to read from stdin");
}
else if (scanned == 0)
continue;
@ -158,32 +166,29 @@ public:
sample_decref_many(smps, scanned);
cnt += sent;
if (limit > 0 && cnt >= limit)
count += sent;
if (limit > 0 && count >= limit)
goto leave;
}
leave2:
logger->info("Send thread stopped");
return;
goto leave2;
leave: if (feof(stdin)) {
if (limit < 0) {
logger->info("Reached end-of-file. Terminating...");
raise(SIGINT);
}
else
logger->info("Reached end-of-file. Wait for receive side...");
}
else {
logger->info("Reached send limit. Terminating...");
raise(SIGINT);
}
leave:
if (feof(stdin))
logger->info("Reached end-of-file.");
else
logger->info("Reached send limit.");
raise(SIGUSR1);
leave2:
logger->debug("Send thread stopped");
}
};
class PipeReceiveDirection : public PipeDirection {
friend Pipe;
public:
PipeReceiveDirection(Node *n, Format *i, bool en = true, int lim = -1) :
PipeDirection(n, i, en, lim, "recv")
@ -192,7 +197,9 @@ public:
virtual
void run()
{
int recv, cnt = 0, allocated = 0;
logger->debug("Receive thread started");
int recv, allocated = 0;
struct Sample *smps[node->in.vectorize];
while (!stop) {
@ -212,21 +219,22 @@ public:
else {
formatter->print(stdout, smps, recv);
cnt += recv;
if (limit > 0 && cnt >= limit)
count += recv;
if (limit > 0 && count >= limit)
goto leave;
}
sample_decref_many(smps, allocated);
}
return;
goto leave2;
leave:
logger->info("Reached receive limit. Terminating...");
logger->info("Reached receive limit.");
raise(SIGUSR1);
leave2:
logger->info("Receive thread stopped");
raise(SIGINT);
logger->debug("Receive thread stopped");
sample_decref_many(smps, allocated);
}
};
@ -241,15 +249,15 @@ public:
reverse(false),
format("villas.human"),
dtypes("64f"),
config_cli(json_object()),
enable_write(true),
enable_read(true),
limit_send(-1),
limit_recv(-1)
config_cli(json_object())
{
int ret;
send.enabled = true;
send.limit = -1;
ret = memory::init(DEFAULT_NR_HUGEPAGES);
recv.enabled = true;
recv.limit = -1;
int ret = memory::init(DEFAULT_NR_HUGEPAGES);
if (ret)
throw RuntimeError("Failed to initialize memory");
}
@ -274,27 +282,48 @@ protected:
json_t *config_cli;
bool enable_write;
bool enable_read;
int limit_send;
int limit_recv;
struct {
int limit;
bool enabled;
std::unique_ptr<PipeReceiveDirection> dir;
} recv;
struct {
int limit;
bool enabled;
std::unique_ptr<PipeSendDirection> dir;
} send;
void handler(int signal, siginfo_t *sinfo, void *ctx)
{
logger->debug("Received {} signal.", strsignal(signal));
switch (signal) {
case SIGALRM:
logger->info("Reached timeout. Terminating...");
logger->info("Reached timeout.");
stop = true;
break;
case SIGUSR1:
break; /* ignore silently */
if (recv.dir->enabled) {
if (recv.dir->limit < 0 && feof(stdin))
stop = true;
if (recv.dir->limit > 0 && recv.dir->count >= recv.dir->limit)
stop = true;
}
if (send.dir->enabled && send.dir->limit > 0) {
if (send.dir->count >= send.dir->limit)
stop = true;
}
break;
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
stop = true;
break;
}
stop = true;
}
void usage()
@ -342,19 +371,19 @@ protected:
break;
case 's':
enable_read = false; // send only
recv.enabled = false; // send only
break;
case 'r':
enable_write = false; // receive only
send.enabled = false; // receive only
break;
case 'l':
limit_recv = strtoul(optarg, &endptr, 10);
recv.limit = strtoul(optarg, &endptr, 10);
goto check;
case 'L':
limit_send = strtoul(optarg, &endptr, 10);
send.limit = strtoul(optarg, &endptr, 10);
goto check;
case 'T':
@ -420,10 +449,10 @@ check: if (optarg == endptr)
if (!node)
throw RuntimeError("Node {} does not exist!", nodestr);
if (enable_read && !(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_READ))
if (recv.enabled && !(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_READ))
throw RuntimeError("Node {} can not receive data. Consider using send-only mode by using '-s' option", nodestr);
if (enable_write && !(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_WRITE))
if (send.enabled && !(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_WRITE))
throw RuntimeError("Node {} can not send data. Consider using receive-only mode by using '-r' option", nodestr);
#if defined(WITH_NODE_WEBSOCKET) && defined(WITH_WEB)
@ -455,24 +484,27 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to start node {}: reason={}", *node, ret);
PipeReceiveDirection recv_dir(node, formatter, enable_read, limit_recv);
PipeSendDirection send_dir(node, formatter, enable_write, limit_send);
recv.dir = std::make_unique<PipeReceiveDirection>(node, formatter, recv.enabled, recv.limit);
send.dir = std::make_unique<PipeSendDirection>(node, formatter, send.enabled, send.limit);
recv_dir.startThread();
send_dir.startThread();
recv.dir->startThread();
send.dir->startThread();
/* Arm timeout timer */
alarm(timeout);
while (!stop)
sleep(1);
recv_dir.stopThread();
send_dir.stopThread();
usleep(0.1e6);
/* We are stopping the node here in order to unblock the receiving threads
* Node::read() call and allow it to be joined(). */
ret = node->stop();
if (ret)
throw RuntimeError("Failed to stop node {}: reason={}", *node, ret);
recv.dir->stopThread();
send.dir->stopThread();
sn.stopInterfaces();
ret = node->getFactory()->stop();