1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

use state to shutdown

This commit is contained in:
Steffen Vogel 2019-02-11 16:39:30 +01:00
parent 401b955e24
commit 15e945feff
6 changed files with 123 additions and 55 deletions

View file

@ -421,7 +421,7 @@ int node_stop(struct node *n)
{
int ret;
if (n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT)
if (n->state != STATE_STOPPING && n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT)
return 0;
info("Stopping node %s", node_name(n));
@ -548,12 +548,13 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
{
int readd, nread = 0;
if (n->state == STATE_PAUSED)
return 0;
assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT);
assert(node_type(n)->read);
if (n->state == STATE_PAUSED || n->state == STATE_PENDING_CONNECT)
return 0;
else if (n->state != STATE_STARTED && n->state != STATE_CONNECTED)
return -1;
/* Send in parts if vector not supported */
if (node_type(n)->vectorize > 0 && node_type(n)->vectorize < cnt) {
while (cnt - nread > 0) {
@ -593,9 +594,13 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
{
int tosend, sent, nsent = 0;
assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED);
assert(node_type(n)->write);
if (n->state == STATE_PAUSED || n->state == STATE_PENDING_CONNECT)
return 0;
else if (n->state != STATE_STARTED && n->state != STATE_CONNECTED)
return -1;
#ifdef WITH_HOOKS
/* Run write hooks */
cnt = hook_process_list(&n->out.hooks, smps, cnt);

View file

@ -73,9 +73,9 @@ static int path_source_destroy(struct path_source *ps)
return 0;
}
static void path_source_read(struct path_source *ps, struct path *p, int i)
static int path_source_read(struct path_source *ps, struct path *p, int i)
{
int recv, tomux, allocated, cnt;
int recv, tomux, allocated, cnt, toenqueue, enqueued = 0;
unsigned release;
cnt = ps->node->in.vectorize;
@ -93,10 +93,20 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
release = allocated;
recv = node_read(ps->node, read_smps, allocated, &release);
if (recv == 0)
if (recv == 0) {
enqueued = 0;
goto out2;
else if (recv < 0)
error("Failed to read samples from node %s", node_name(ps->node));
}
else if (recv < 0) {
if (ps->node->state == STATE_STOPPING) {
p->state = STATE_STOPPING;
enqueued = -1;
goto out2;
}
else
error("Failed to read samples from node %s", node_name(ps->node));
}
else if (recv < allocated)
warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated);
@ -134,30 +144,33 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received));
#ifdef WITH_HOOKS
int toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux);
toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux);
if (toenqueue != tomux) {
int skipped = tomux - toenqueue;
debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p));
}
#else
int toenqueue = tomux;
toenqueue = tomux;
#endif
if (bitset_test(&p->mask, i)) {
/* Check if we received an update from all nodes/ */
if ((p->mode == PATH_MODE_ANY) ||
(p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received)))
{
(p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) {
path_destination_enqueue(p, muxed_smps, toenqueue);
/* Reset bitset of updated nodes */
bitset_clear_all(&p->received);
enqueued = toenqueue;
}
}
sample_decref_many(muxed_smps, tomux);
out2: sample_decref_many(read_smps, release);
return enqueued;
}
static int path_destination_init(struct path_destination *pd, int queuelen)
@ -244,21 +257,24 @@ static void path_destination_write(struct path_destination *pd, struct path *p)
static void * path_run_single(void *arg)
{
int ret;
struct path *p = arg;
struct path_source *ps = (struct path_source *) vlist_at(&p->sources, 0);
debug(1, "Started path %s in single mode", path_name(p));
for (;;) {
path_source_read(ps, p, 0);
while (p->state == STATE_STARTED) {
pthread_testcancel();
ret = path_source_read(ps, p, 0);
if (ret <= 0)
continue;
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i);
path_destination_write(pd, p);
}
pthread_testcancel();
}
return NULL;
@ -272,7 +288,7 @@ static void * path_run_poll(void *arg)
debug(1, "Started path %s in polling mode", path_name(p));
for (;;) {
while (p->state == STATE_STARTED) {
ret = poll(p->reader.pfds, p->reader.nfds, -1);
if (ret < 0)
serror("Failed to poll");
@ -292,9 +308,8 @@ static void * path_run_poll(void *arg)
path_destination_enqueue(p, &p->last_sample, 1);
}
/* A source is ready to receive samples */
else {
else
path_source_read(ps, p, i);
}
}
}
@ -521,7 +536,6 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
"rate", &p->rate,
"mask", &json_mask,
"original_sequence_no", &p->original_sequence_no
);
if (ret)
jerror(&err, "Failed to parse path configuration");
@ -806,14 +820,13 @@ int path_stop(struct path *p)
{
int ret;
if (p->state != STATE_STARTED)
if (p->state != STATE_STARTED && p->state != STATE_STOPPING)
return 0;
info("Stopping path: %s", path_name(p));
ret = pthread_cancel(p->tid);
if (ret)
return ret;
if (p->state != STATE_STOPPING)
p->state = STATE_STOPPING;
ret = pthread_join(p->tid, NULL);
if (ret)
@ -930,7 +943,7 @@ int path_reverse(struct path *p, struct path *r)
new_me->node = new_ps->node;
new_me->type = MAPPING_TYPE_DATA;
new_me->data.offset = 0;
new_me->length = 0;
new_me->length = vlist_length(&new_me->node->in.signals);
vlist_init(&new_ps->mappings);
vlist_push(&new_ps->mappings, new_me);
@ -952,5 +965,6 @@ int path_reverse(struct path *p, struct path *r)
vlist_push(&r->hooks, g);
}
#endif /* WITH_HOOKS */
return 0;
}

View file

@ -513,21 +513,25 @@ SuperNode::~SuperNode()
int SuperNode::periodic()
{
#ifdef WITH_HOOKS
int ret;
int started = 0;
for (size_t i = 0; i < vlist_length(&paths); i++) {
auto *p = (struct path *) vlist_at(&paths, i);
if (p->state != STATE_STARTED)
continue;
if (p->state == STATE_STARTED) {
started++;
for (size_t j = 0; j < vlist_length(&p->hooks); j++) {
hook *h = (struct hook *) vlist_at(&p->hooks, j);
#ifdef WITH_HOOKS
for (size_t j = 0; j < vlist_length(&p->hooks); j++) {
hook *h = (struct hook *) vlist_at(&p->hooks, j);
ret = hook_periodic(h);
if (ret)
return ret;
ret = hook_periodic(h);
if (ret)
return ret;
}
#endif /* WITH_HOOKS */
}
}
@ -537,6 +541,7 @@ int SuperNode::periodic()
if (n->state != STATE_STARTED)
continue;
#ifdef WITH_HOOKS
for (size_t j = 0; j < vlist_length(&n->in.hooks); j++) {
auto *h = (struct hook *) vlist_at(&n->in.hooks, j);
@ -552,8 +557,15 @@ int SuperNode::periodic()
if (ret)
return ret;
}
#endif /* WITH_HOOKS */
}
#endif
if (state == STATE_STARTED && started == 0) {
info("No more active paths. Stopping super-node");
return -1;
}
return 0;
}

View file

@ -55,11 +55,22 @@ using namespace villas;
using namespace villas::node;
using namespace villas::plugin;
static std::atomic<bool> stop(false);
SuperNode sn;
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
stop = true;
Logger logger = logging.get("node");
switch (signal) {
case SIGALRM:
logger->info("Reached timeout. Terminating...");
break;
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
}
sn.setState(STATE_STOPPING);
}
static void usage()
@ -108,7 +119,6 @@ int main(int argc, char *argv[])
{
int ret;
SuperNode sn;
Logger logger = logging.get("node");
try {
@ -175,6 +185,7 @@ int main(int argc, char *argv[])
throw RuntimeError("Failed to verify configuration");
sn.start();
sn.run();
sn.stop();
logger->info(CLR_GRN("Goodbye!"));

View file

@ -107,8 +107,15 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
Logger logger = logging.get("pipe");
if (signal == SIGALRM)
logger->info("Reached timeout. Terminating...");
switch (signal) {
case SIGALRM:
logger->info("Reached timeout. Terminating...");
break;
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
break;
}
stop = true;
}
@ -145,7 +152,7 @@ static void * send_loop(void *ctx)
struct node *node = dirs->send.node;
struct sample *smps[node->out.vectorize];
while (!io_eof(dirs->send.io)) {
while (node->state == STATE_STARTED && !io_eof(dirs->send.io)) {
allocated = sample_alloc_many(&dirs->send.pool, smps, node->out.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize);
@ -184,14 +191,14 @@ static void * send_loop(void *ctx)
leave: if (io_eof(dirs->send.io)) {
if (dirs->recv.limit < 0) {
logger->info("Reached end-of-file. Terminating...");
killme(SIGTERM);
stop = true;
}
else
logger->info("Reached end-of-file. Wait for receive side...");
}
else {
logger->info("Reached send limit. Terminating...");
killme(SIGTERM);
stop = true;
}
return nullptr;
@ -207,7 +214,7 @@ static void * recv_loop(void *ctx)
struct node *node = dirs->recv.node;
struct sample *smps[node->in.vectorize];
for (;;) {
while (node->state == STATE_STARTED) {
allocated = sample_alloc_many(&dirs->recv.pool, smps, node->in.vectorize);
if (allocated < 0)
throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize);
@ -217,8 +224,12 @@ static void * recv_loop(void *ctx)
release = allocated;
recv = node_read(node, smps, allocated, &release);
if (recv < 0)
logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv);
if (recv < 0) {
if (node->state == STATE_STOPPING)
goto leave2;
else
logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv);
}
else {
io_print(dirs->recv.io, smps, recv);
@ -232,7 +243,7 @@ static void * recv_loop(void *ctx)
}
leave: logger->info("Reached receive limit. Terminating...");
killme(SIGTERM);
leave2: stop = true;
return nullptr;
}
@ -411,7 +422,7 @@ check: if (optarg == endptr)
alarm(timeout);
while (!stop)
pause();
sleep(1);
if (dirs.recv.enabled) {
pthread_cancel(dirs.recv.thread);

View file

@ -160,6 +160,17 @@ void usage()
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
Logger logger = logging.get("signal");
switch (signal) {
case SIGALRM:
logger->info("Reached timeout. Terminating...");
break;
default:
logger->info("Received {} signal. Terminating...", strsignal(signal));
}
stop = true;
}
@ -246,16 +257,20 @@ int main(int argc, char *argv[])
if (ret)
throw RuntimeError("Failed to open output");
while (!stop) {
while (!stop && n.state == STATE_STARTED) {
t = sample_alloc(&q);
unsigned release = 1; // release = allocated
ret = node_read(&n, &t, 1, &release);
if (ret > 0)
io_print(&io, &t, 1);
retry: ret = node_read(&n, &t, 1, &release);
if (ret == 0)
goto retry;
else if (ret < 0)
goto out;
sample_decref(t);
io_print(&io, &t, 1);
out: sample_decref(t);
}
ret = node_stop(&n);