diff --git a/lib/node.c b/lib/node.c index 3ef00a568..a4753db79 100644 --- a/lib/node.c +++ b/lib/node.c @@ -421,7 +421,7 @@ int node_stop(struct node *n) { int ret; - if (n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT) + if (n->state != STATE_STOPPING && n->state != STATE_STARTED && n->state != STATE_CONNECTED && n->state != STATE_PENDING_CONNECT) return 0; info("Stopping node %s", node_name(n)); @@ -548,12 +548,13 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel { int readd, nread = 0; - if (n->state == STATE_PAUSED) - return 0; - - assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED || n->state == STATE_PENDING_CONNECT); assert(node_type(n)->read); + if (n->state == STATE_PAUSED || n->state == STATE_PENDING_CONNECT) + return 0; + else if (n->state != STATE_STARTED && n->state != STATE_CONNECTED) + return -1; + /* Send in parts if vector not supported */ if (node_type(n)->vectorize > 0 && node_type(n)->vectorize < cnt) { while (cnt - nread > 0) { @@ -593,9 +594,13 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re { int tosend, sent, nsent = 0; - assert(n->state == STATE_STARTED || n->state == STATE_CONNECTED); assert(node_type(n)->write); + if (n->state == STATE_PAUSED || n->state == STATE_PENDING_CONNECT) + return 0; + else if (n->state != STATE_STARTED && n->state != STATE_CONNECTED) + return -1; + #ifdef WITH_HOOKS /* Run write hooks */ cnt = hook_process_list(&n->out.hooks, smps, cnt); diff --git a/lib/path.c b/lib/path.c index aaa90ee72..95af22e36 100644 --- a/lib/path.c +++ b/lib/path.c @@ -73,9 +73,9 @@ static int path_source_destroy(struct path_source *ps) return 0; } -static void path_source_read(struct path_source *ps, struct path *p, int i) +static int path_source_read(struct path_source *ps, struct path *p, int i) { - int recv, tomux, allocated, cnt; + int recv, tomux, allocated, cnt, toenqueue, enqueued = 0; unsigned release; cnt = ps->node->in.vectorize; @@ -93,10 +93,20 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) release = allocated; recv = node_read(ps->node, read_smps, allocated, &release); - if (recv == 0) + if (recv == 0) { + enqueued = 0; goto out2; - else if (recv < 0) - error("Failed to read samples from node %s", node_name(ps->node)); + } + else if (recv < 0) { + if (ps->node->state == STATE_STOPPING) { + p->state = STATE_STOPPING; + + enqueued = -1; + goto out2; + } + else + error("Failed to read samples from node %s", node_name(ps->node)); + } else if (recv < allocated) warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated); @@ -134,30 +144,33 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received)); #ifdef WITH_HOOKS - int toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux); + toenqueue = hook_process_list(&p->hooks, muxed_smps, tomux); if (toenqueue != tomux) { int skipped = tomux - toenqueue; debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p)); } #else - int toenqueue = tomux; + toenqueue = tomux; #endif if (bitset_test(&p->mask, i)) { /* Check if we received an update from all nodes/ */ if ((p->mode == PATH_MODE_ANY) || - (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) - { + (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) { path_destination_enqueue(p, muxed_smps, toenqueue); /* Reset bitset of updated nodes */ bitset_clear_all(&p->received); + + enqueued = toenqueue; } } sample_decref_many(muxed_smps, tomux); out2: sample_decref_many(read_smps, release); + + return enqueued; } static int path_destination_init(struct path_destination *pd, int queuelen) @@ -244,21 +257,24 @@ static void path_destination_write(struct path_destination *pd, struct path *p) static void * path_run_single(void *arg) { + int ret; struct path *p = arg; struct path_source *ps = (struct path_source *) vlist_at(&p->sources, 0); debug(1, "Started path %s in single mode", path_name(p)); - for (;;) { - path_source_read(ps, p, 0); + while (p->state == STATE_STARTED) { + pthread_testcancel(); + + ret = path_source_read(ps, p, 0); + if (ret <= 0) + continue; for (size_t i = 0; i < vlist_length(&p->destinations); i++) { struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i); path_destination_write(pd, p); } - - pthread_testcancel(); } return NULL; @@ -272,7 +288,7 @@ static void * path_run_poll(void *arg) debug(1, "Started path %s in polling mode", path_name(p)); - for (;;) { + while (p->state == STATE_STARTED) { ret = poll(p->reader.pfds, p->reader.nfds, -1); if (ret < 0) serror("Failed to poll"); @@ -292,9 +308,8 @@ static void * path_run_poll(void *arg) path_destination_enqueue(p, &p->last_sample, 1); } /* A source is ready to receive samples */ - else { + else path_source_read(ps, p, i); - } } } @@ -521,7 +536,6 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) "rate", &p->rate, "mask", &json_mask, "original_sequence_no", &p->original_sequence_no - ); if (ret) jerror(&err, "Failed to parse path configuration"); @@ -806,14 +820,13 @@ int path_stop(struct path *p) { int ret; - if (p->state != STATE_STARTED) + if (p->state != STATE_STARTED && p->state != STATE_STOPPING) return 0; info("Stopping path: %s", path_name(p)); - ret = pthread_cancel(p->tid); - if (ret) - return ret; + if (p->state != STATE_STOPPING) + p->state = STATE_STOPPING; ret = pthread_join(p->tid, NULL); if (ret) @@ -930,7 +943,7 @@ int path_reverse(struct path *p, struct path *r) new_me->node = new_ps->node; new_me->type = MAPPING_TYPE_DATA; new_me->data.offset = 0; - new_me->length = 0; + new_me->length = vlist_length(&new_me->node->in.signals); vlist_init(&new_ps->mappings); vlist_push(&new_ps->mappings, new_me); @@ -952,5 +965,6 @@ int path_reverse(struct path *p, struct path *r) vlist_push(&r->hooks, g); } #endif /* WITH_HOOKS */ + return 0; } diff --git a/lib/super_node.cpp b/lib/super_node.cpp index 5394dc71e..66eeb10e8 100644 --- a/lib/super_node.cpp +++ b/lib/super_node.cpp @@ -513,21 +513,25 @@ SuperNode::~SuperNode() int SuperNode::periodic() { -#ifdef WITH_HOOKS int ret; + int started = 0; + for (size_t i = 0; i < vlist_length(&paths); i++) { auto *p = (struct path *) vlist_at(&paths, i); - if (p->state != STATE_STARTED) - continue; + if (p->state == STATE_STARTED) { + started++; - for (size_t j = 0; j < vlist_length(&p->hooks); j++) { - hook *h = (struct hook *) vlist_at(&p->hooks, j); +#ifdef WITH_HOOKS + for (size_t j = 0; j < vlist_length(&p->hooks); j++) { + hook *h = (struct hook *) vlist_at(&p->hooks, j); - ret = hook_periodic(h); - if (ret) - return ret; + ret = hook_periodic(h); + if (ret) + return ret; + } +#endif /* WITH_HOOKS */ } } @@ -537,6 +541,7 @@ int SuperNode::periodic() if (n->state != STATE_STARTED) continue; +#ifdef WITH_HOOKS for (size_t j = 0; j < vlist_length(&n->in.hooks); j++) { auto *h = (struct hook *) vlist_at(&n->in.hooks, j); @@ -552,8 +557,15 @@ int SuperNode::periodic() if (ret) return ret; } +#endif /* WITH_HOOKS */ } -#endif + + if (state == STATE_STARTED && started == 0) { + info("No more active paths. Stopping super-node"); + + return -1; + } + return 0; } diff --git a/src/villas-node.cpp b/src/villas-node.cpp index 921362ec7..017003284 100644 --- a/src/villas-node.cpp +++ b/src/villas-node.cpp @@ -55,11 +55,22 @@ using namespace villas; using namespace villas::node; using namespace villas::plugin; -static std::atomic stop(false); +SuperNode sn; static void quit(int signal, siginfo_t *sinfo, void *ctx) { - stop = true; + Logger logger = logging.get("node"); + + switch (signal) { + case SIGALRM: + logger->info("Reached timeout. Terminating..."); + break; + + default: + logger->info("Received {} signal. Terminating...", strsignal(signal)); + } + + sn.setState(STATE_STOPPING); } static void usage() @@ -108,7 +119,6 @@ int main(int argc, char *argv[]) { int ret; - SuperNode sn; Logger logger = logging.get("node"); try { @@ -175,6 +185,7 @@ int main(int argc, char *argv[]) throw RuntimeError("Failed to verify configuration"); sn.start(); + sn.run(); sn.stop(); logger->info(CLR_GRN("Goodbye!")); diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index a96976308..51df16d56 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -107,8 +107,15 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) { Logger logger = logging.get("pipe"); - if (signal == SIGALRM) - logger->info("Reached timeout. Terminating..."); + switch (signal) { + case SIGALRM: + logger->info("Reached timeout. Terminating..."); + break; + + default: + logger->info("Received {} signal. Terminating...", strsignal(signal)); + break; + } stop = true; } @@ -145,7 +152,7 @@ static void * send_loop(void *ctx) struct node *node = dirs->send.node; struct sample *smps[node->out.vectorize]; - while (!io_eof(dirs->send.io)) { + while (node->state == STATE_STARTED && !io_eof(dirs->send.io)) { allocated = sample_alloc_many(&dirs->send.pool, smps, node->out.vectorize); if (allocated < 0) throw RuntimeError("Failed to get {} samples out of send pool.", node->out.vectorize); @@ -184,14 +191,14 @@ static void * send_loop(void *ctx) leave: if (io_eof(dirs->send.io)) { if (dirs->recv.limit < 0) { logger->info("Reached end-of-file. Terminating..."); - killme(SIGTERM); + stop = true; } else logger->info("Reached end-of-file. Wait for receive side..."); } else { logger->info("Reached send limit. Terminating..."); - killme(SIGTERM); + stop = true; } return nullptr; @@ -207,7 +214,7 @@ static void * recv_loop(void *ctx) struct node *node = dirs->recv.node; struct sample *smps[node->in.vectorize]; - for (;;) { + while (node->state == STATE_STARTED) { allocated = sample_alloc_many(&dirs->recv.pool, smps, node->in.vectorize); if (allocated < 0) throw RuntimeError("Failed to allocate {} samples from receive pool.", node->in.vectorize); @@ -217,8 +224,12 @@ static void * recv_loop(void *ctx) release = allocated; recv = node_read(node, smps, allocated, &release); - if (recv < 0) - logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv); + if (recv < 0) { + if (node->state == STATE_STOPPING) + goto leave2; + else + logger->warn("Failed to receive samples from node {}: reason={}", node_name(node), recv); + } else { io_print(dirs->recv.io, smps, recv); @@ -232,7 +243,7 @@ static void * recv_loop(void *ctx) } leave: logger->info("Reached receive limit. Terminating..."); - killme(SIGTERM); +leave2: stop = true; return nullptr; } @@ -411,7 +422,7 @@ check: if (optarg == endptr) alarm(timeout); while (!stop) - pause(); + sleep(1); if (dirs.recv.enabled) { pthread_cancel(dirs.recv.thread); diff --git a/src/villas-signal.cpp b/src/villas-signal.cpp index 9270e1beb..594b2cdb0 100644 --- a/src/villas-signal.cpp +++ b/src/villas-signal.cpp @@ -160,6 +160,17 @@ void usage() static void quit(int signal, siginfo_t *sinfo, void *ctx) { + Logger logger = logging.get("signal"); + + switch (signal) { + case SIGALRM: + logger->info("Reached timeout. Terminating..."); + break; + + default: + logger->info("Received {} signal. Terminating...", strsignal(signal)); + } + stop = true; } @@ -246,16 +257,20 @@ int main(int argc, char *argv[]) if (ret) throw RuntimeError("Failed to open output"); - while (!stop) { + while (!stop && n.state == STATE_STARTED) { t = sample_alloc(&q); unsigned release = 1; // release = allocated - ret = node_read(&n, &t, 1, &release); - if (ret > 0) - io_print(&io, &t, 1); +retry: ret = node_read(&n, &t, 1, &release); + if (ret == 0) + goto retry; + else if (ret < 0) + goto out; - sample_decref(t); + io_print(&io, &t, 1); + +out: sample_decref(t); } ret = node_stop(&n);