/** Path destination * * @author Steffen Vogel * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC * @license Apache 2.0 *********************************************************************************/ #include #include #include #include #include #include #include using namespace villas; using namespace villas::node; PathDestination::PathDestination(Path *p, Node *n) : node(n), path(p) { queue.state = State::DESTROYED; } PathDestination::~PathDestination() { int ret __attribute__((unused)); ret = queue_destroy(&queue); } int PathDestination::prepare(int queuelen) { int ret; ret = queue_init(&queue, queuelen); if (ret) return ret; return 0; } void PathDestination::enqueueAll(Path *p, const struct Sample * const smps[], unsigned cnt) { unsigned enqueued, cloned; struct Sample *clones[cnt]; cloned = sample_clone_many(clones, smps, cnt); if (cloned < cnt) p->logger->warn("Pool underrun in path {}", *p); for (auto pd : p->destinations) { enqueued = queue_push_many(&pd->queue, (void **) clones, cloned); if (enqueued != cnt) p->logger->warn("Queue overrun for path {}", *p); /* Increase reference counter of these samples as they are now also owned by the queue. */ sample_incref_many(clones, cloned); p->logger->debug("Enqueued {} samples to destination {} of path {}", enqueued, *pd->node, *p); } sample_decref_many(clones, cloned); } void PathDestination::write() { int cnt = node->out.vectorize; int sent; int allocated; struct Sample *smps[cnt]; /* As long as there are still samples in the queue */ while (true) { allocated = queue_pull_many(&queue, (void **) smps, cnt); if (allocated == 0) break; else if (allocated < cnt) path->logger->debug("Queue underrun for path {}: allocated={} expected={}", *path, allocated, cnt); path->logger->debug("Dequeued {} samples from queue of node {} which is part of path {}", allocated, *node, *path); sent = node->write(smps, allocated); if (sent < 0) { path->logger->error("Failed to sent {} samples to node {}: reason={}", cnt, *node, sent); return; } else if (sent < allocated) path->logger->debug("Partial write to node {}: written={}, expected={}", *node, sent, allocated); int released = sample_decref_many(smps, allocated); path->logger->debug("Released {} samples back to memory pool", released); } } void PathDestination::check() { if (!node->isEnabled()) throw RuntimeError("Destination {} is not enabled", *node); if (!(node->getFactory()->getFlags() & (int) NodeFactory::Flags::SUPPORTS_WRITE)) throw RuntimeError("Destination node {} is not supported as a sink for path ", *node); }