diff --git a/include/villas/path.h b/include/villas/path.h index e3684f304..e3e26d863 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -42,6 +42,8 @@ #ifdef __cplusplus #include + #include + extern "C" { #endif @@ -91,6 +93,8 @@ struct path { json_t *cfg; /**< A JSON object containing the configuration of the path. */ #ifdef __cplusplus + villas::Logger logger; + std::bitset<128> mask; /**< A mask of path_sources which are enabled for poll(). */ std::bitset<128> received; /**< A mask of path_sources for which we already received samples. */ #endif diff --git a/lib/path.cpp b/lib/path.cpp index db8a7bd49..e00f10545 100644 --- a/lib/path.cpp +++ b/lib/path.cpp @@ -43,6 +43,8 @@ #include #include +using namespace villas; + static void * path_run_single(void *arg) { int ret; @@ -77,7 +79,7 @@ static void * path_run_poll(void *arg) if (ret < 0) serror("Failed to poll"); - debug(10, "Path %s returned from poll(2)", path_name(p)); + p->logger->debug("Path {} returned from poll(2)", path_name(p)); for (int i = 0; i < p->reader.nfds; i++) { struct path_source *ps = (struct path_source *) vlist_at(&p->sources, i); @@ -113,6 +115,11 @@ int path_init(struct path *p) assert(p->state == STATE_DESTROYED); + p->logger = logging.get("path"); + + new (&p->received) std::bitset<128>; + new (&p->mask) std::bitset<128>; + ret = vlist_init(&p->destinations); if (ret) return ret; @@ -167,8 +174,10 @@ static int path_prepare_poll(struct path *p) p->reader.pfds = (struct pollfd *) realloc(p->reader.pfds, p->reader.nfds * sizeof(struct pollfd)); for (int i = 0; i < m; i++) { - if (fds[i] < 0) - error("Failed to get file descriptor for node %s", node_name(ps->node)); + if (fds[i] < 0) { + p->logger->error("Failed to get file descriptor for node {}", node_name(ps->node)); + return -1; + } /* This slot is only used if it is not masked */ p->reader.pfds[n].events = POLLIN; @@ -187,8 +196,10 @@ static int path_prepare_poll(struct path *p) p->reader.pfds[p->reader.nfds-1].events = POLLIN; p->reader.pfds[p->reader.nfds-1].fd = task_fd(&p->timeout); - if (p->reader.pfds[p->reader.nfds-1].fd < 0) - error("Failed to get file descriptor for timer of path %s", path_name(p)); + if (p->reader.pfds[p->reader.nfds-1].fd < 0) { + p->logger->warn("Failed to get file descriptor for timer of path {}", path_name(p)); + return -1; + } } return 0; @@ -218,9 +229,6 @@ int path_prepare(struct path *p) return ret; } - new (&p->received) std::bitset<128>; - new (&p->mask) std::bitset<128>; - /* Initialize sources */ for (size_t i = 0; i < vlist_length(&p->sources); i++) { struct path_source *ps = (struct path_source *) vlist_at(&p->sources, i); @@ -248,7 +256,7 @@ int path_prepare(struct path *p) if (me->type == MAPPING_TYPE_DATA) { sig = (struct signal *) vlist_at_safe(sigs, me->data.offset + j); if (!sig) { - warning("Failed to create signal description for path %s", path_name(p)); + p->logger->warn("Failed to create signal description for path {}", path_name(p)); continue; } @@ -335,8 +343,10 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) /* Input node(s) */ ret = mapping_list_parse(&sources, json_in, nodes); - if (ret) - error("Failed to parse input mapping of path %s", path_name(p)); + if (ret) { + p->logger->error("Failed to parse input mapping of path {}", path_name(p)); + return -1; + } /* Optional settings */ if (mode) { @@ -344,8 +354,10 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) p->mode = PATH_MODE_ANY; else if (!strcmp(mode, "all")) p->mode = PATH_MODE_ALL; - else - error("Invalid path mode '%s'", mode); + else { + p->logger->error("Invalid path mode '{}'", mode); + return -1; + } } /* Output node(s) */ @@ -383,8 +395,10 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) vlist_push(&p->sources, ps); } - if (!node_is_enabled(ps->node)) - error("Source %s of path %s is not enabled", node_name(ps->node), path_name(p)); + if (!node_is_enabled(ps->node)) { + p->logger->error("Source {} of path {} is not enabled", node_name(ps->node), path_name(p)); + return -1; + } vlist_push(&ps->mappings, me); } @@ -396,8 +410,10 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) pd->node = n; - if (!node_is_enabled(pd->node)) - error("Destination %s of path %s is not enabled", node_name(pd->node), path_name(p)); + if (!node_is_enabled(pd->node)) { + p->logger->error("Destination {} of path {} is not enabled", node_name(pd->node), path_name(p)); + return -1; + } vlist_push(&p->destinations, pd); } @@ -406,8 +422,10 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) json_t *json_entry; size_t i; - if (!json_is_array(json_mask)) - error("The 'mask' setting must be a list of node names"); + if (!json_is_array(json_mask)) { + p->logger->error("The 'mask' setting must be a list of node names"); + return -1; + } json_array_foreach(json_mask, i, json_entry) { const char *name; @@ -415,12 +433,16 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) struct path_source *ps = NULL; name = json_string_value(json_entry); - if (!name) - error("The 'mask' setting must be a list of node names"); + if (!name) { + p->logger->error("The 'mask' setting must be a list of node names"); + return -1; + } node = (struct node *) vlist_lookup(nodes, name); - if (!node) - error("The 'mask' entry '%s' is not a valid node name", name); + if (!node) { + p->logger->error("The 'mask' entry '{}' is not a valid node name", name); + return -1; + } /* Search correspondending path_source to node */ for (size_t i = 0; i < vlist_length(&p->sources); i++) { @@ -432,8 +454,10 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) } } - if (!ps) - error("Node %s is not a source of the path %s", node_name(node), path_name(p)); + if (!ps) { + p->logger->error("Node {} is not a source of the path {}", node_name(node), path_name(p)); + return -1; + } ps->masked = true; } @@ -483,8 +507,10 @@ int path_check(struct path *p) { assert(p->state != STATE_DESTROYED); - if (p->rate < 0) - error("Setting 'rate' of path %s must be a positive number.", path_name(p)); + if (p->rate < 0) { + p->logger->error("Setting 'rate' of path {} must be a positive number.", path_name(p)); + return -1; + } if (p->poll) { if (p->rate <= 0) { @@ -492,38 +518,48 @@ int path_check(struct path *p) for (size_t i = 0; i < vlist_length(&p->sources); i++) { struct path_source *ps = (struct path_source *) vlist_at(&p->sources, i); - if (!node_type(ps->node)->poll_fds) - error("Node %s can not be used in polling mode with path %s", node_name(ps->node), path_name(p)); + if (!node_type(ps->node)->poll_fds) { + p->logger->error("Node {} can not be used in polling mode with path {}", node_name(ps->node), path_name(p)); + return -1; + } } } } else { /* Check that we do not need to multiplex between multiple sources when polling is disabled */ - if (vlist_length(&p->sources) > 1) - error("Setting 'poll' must be active if the path has more than one source"); + if (vlist_length(&p->sources) > 1) { + p->logger->error("Setting 'poll' must be active if the path has more than one source"); + return -1; + } /* Check that we do not use the fixed rate feature when polling is disabled */ - if (p->rate > 0) - error("Setting 'poll' must be activated when used together with setting 'rate'"); + if (p->rate > 0) { + p->logger->error("Setting 'poll' must be activated when used together with setting 'rate'"); + return -1; + } } for (size_t i = 0; i < vlist_length(&p->sources); i++) { struct path_source *ps = (struct path_source *) vlist_at(&p->sources, i); - if (!node_type(ps->node)->read) - error("Node %s is not supported as a source for path %s", node_name(ps->node), path_name(p)); + if (!node_type(ps->node)->read) { + p->logger->error("Node {} is not supported as a source for path {}", node_name(ps->node), path_name(p)); + return -1; + } } for (size_t i = 0; i < vlist_length(&p->destinations); i++) { struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i); - if (!node_type(pd->node)->write) - error("Destiation node %s is not supported as a sink for path %s", node_name(pd->node), path_name(p)); + if (!node_type(pd->node)->write) { + p->logger->error("Destiation node {} is not supported as a sink for path ", node_name(pd->node), path_name(p)); + return -1; + } } if (!IS_POW2(p->queuelen)) { p->queuelen = LOG2_CEIL(p->queuelen); - warning("Queue length should always be a power of 2. Adjusting to %d", p->queuelen); + p->logger->warn("Queue length should always be a power of 2. Adjusting to {}", p->queuelen); } p->state = STATE_CHECKED; @@ -552,7 +588,9 @@ int path_start(struct path *p) break; } - info("Starting path %s: #signals=%zu, #hooks=%zu, #sources=%zu, #destinations=%zu, mode=%s, poll=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, original_sequence_no=%s", + p->logger->info("Starting path {}: #signals={}, #hooks={}, #sources={}, " + "#destinations={}, mode={}, poll={}, mask={:b}, rate={}, " + "enabled={}, reversed={}, queuelen={}, original_sequence_no={}", path_name(p), vlist_length(&p->signals), vlist_length(&p->hooks), @@ -560,7 +598,7 @@ int path_start(struct path *p) vlist_length(&p->destinations), mode, p->poll ? "yes" : "no", - p->mask.to_string().c_str(), + p->mask.to_ullong(), p->rate, path_is_enabled(p) ? "yes" : "no", path_is_reversed(p) ? "yes" : "no", @@ -616,7 +654,7 @@ int path_stop(struct path *p) if (p->state != STATE_STARTED && p->state != STATE_STOPPING) return 0; - info("Stopping path: %s", path_name(p)); + p->logger->info("Stopping path: {}", path_name(p)); if (p->state != STATE_STOPPING) p->state = STATE_STOPPING; @@ -679,7 +717,16 @@ int path_destroy(struct path *p) if (p->rate > 0) task_destroy(&p->timeout); - pool_destroy(&p->pool); + ret = pool_destroy(&p->pool); + if (ret) + return ret; + + using bs = std::bitset<128>; + using lg = spdlog::logger; + + p->received.~bs(); + p->mask.~bs(); + p->logger->~lg(); p->state = STATE_DESTROYED; diff --git a/lib/path_destination.cpp b/lib/path_destination.cpp index fa0d2c032..11f3cc71a 100644 --- a/lib/path_destination.cpp +++ b/lib/path_destination.cpp @@ -57,19 +57,19 @@ void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cn cloned = sample_clone_many(clones, smps, cnt); if (cloned < cnt) - warning("Pool underrun in path %s", path_name(p)); + p->logger->warn("Pool underrun in path {}", path_name(p)); for (size_t i = 0; i < vlist_length(&p->destinations); i++) { struct path_destination *pd = (struct path_destination *) vlist_at(&p->destinations, i); enqueued = queue_push_many(&pd->queue, (void **) clones, cloned); if (enqueued != cnt) - warning("Queue overrun for path %s", path_name(p)); + p->logger->warn("Queue overrun for path {}", path_name(p)); /* Increase reference counter of these samples as they are now also owned by the queue. */ sample_incref_many(clones, cloned); - debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p)); + p->logger->debug("Enqueued {} samples to destination {} of path {}", enqueued, node_name(pd->node), path_name(p)); } sample_decref_many(clones, cloned); @@ -91,20 +91,22 @@ void path_destination_write(struct path_destination *pd, struct path *p) if (allocated == 0) break; else if (allocated < cnt) - debug(LOG_PATH | 5, "Queue underrun for path %s: allocated=%u expected=%u", path_name(p), allocated, cnt); + p->logger->debug("Queue underrun for path {}: allocated={} expected={}", path_name(p), allocated, cnt); - debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", allocated, node_name(pd->node), path_name(p)); + p->logger->debug("Dequeued {} samples from queue of node {} which is part of path {}", allocated, node_name(pd->node), path_name(p)); release = allocated; sent = node_write(pd->node, smps, allocated, &release); - if (sent < 0) - error("Failed to sent %u samples to node %s: reason=%d", cnt, node_name(pd->node), sent); + if (sent < 0) { + p->logger->error("Failed to sent {} samples to node {}: reason={}", cnt, node_name(pd->node), sent); + return; + } else if (sent < allocated) - warning("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated); + p->logger->warn("Partial write to node {}: written={}, expected={}", node_name(pd->node), sent, allocated); released = sample_decref_many(smps, release); - debug(LOG_PATH | 15, "Released %d samples back to memory pool", released); + p->logger->debug("Released {} samples back to memory pool", released); } } diff --git a/lib/path_source.cpp b/lib/path_source.cpp index e4ee715f6..e31fe4721 100644 --- a/lib/path_source.cpp +++ b/lib/path_source.cpp @@ -73,7 +73,7 @@ int path_source_read(struct path_source *ps, struct path *p, int i) /* Fill smps[] free sample blocks from the pool */ allocated = sample_alloc_many(&ps->pool, read_smps, cnt); if (allocated != cnt) - warning("Pool underrun for path source %s", node_name(ps->node)); + p->logger->warn("Pool underrun for path source {}", node_name(ps->node)); /* Read ready samples and store them to blocks pointed by smps[] */ release = allocated; @@ -90,11 +90,13 @@ int path_source_read(struct path_source *ps, struct path *p, int i) enqueued = -1; goto out2; } - else - error("Failed to read samples from node %s", node_name(ps->node)); + else { + p->logger->error("Failed to read samples from node {}", node_name(ps->node)); + goto out2; + } } else if (recv < allocated) - warning("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, allocated); + p->logger->warn("Partial read for path {}: read={}, expected={}", path_name(p), recv, allocated); p->received.set(i); @@ -129,14 +131,14 @@ int path_source_read(struct path_source *ps, struct path *p, int i) sample_copy(p->last_sample, muxed_smps[tomux-1]); - debug(15, "Path %s received = %s", path_name(p), p->received.to_string().c_str()); + p->logger->debug("Path {} received = {}", path_name(p), p->received.to_ullong()); #ifdef WITH_HOOKS toenqueue = hook_list_process(&p->hooks, muxed_smps, tomux); if (toenqueue != tomux) { int skipped = tomux - toenqueue; - debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p)); + p->logger->debug("Hooks skipped {} out of {} samples for path {}", skipped, tomux, path_name(p)); } #else toenqueue = tomux;