diff --git a/include/villas/path_destination.h b/include/villas/path_destination.h index 2a82baa64..42e6b64cd 100644 --- a/include/villas/path_destination.h +++ b/include/villas/path_destination.h @@ -53,8 +53,12 @@ int path_destination_prepare(struct vpath_destination *pd, int queuelen); void path_destination_check(struct vpath_destination *pd); -void path_destination_enqueue(struct vpath *p, const struct sample * const smps[], unsigned cnt); +void path_destination_enqueue(struct vpath *pd, struct sample * const smps[], unsigned cnt); -void path_destination_write(struct vpath_destination *pd, struct vpath *p); +void path_destination_write(struct vpath_destination *pd); + +void path_destination_enqueue_all(struct vpath *p, struct sample * const smps[], unsigned cnt); + +void path_destination_write_all(struct vpath *p); /** @} */ diff --git a/lib/path.cpp b/lib/path.cpp index 4d1427ea2..77564ce6b 100644 --- a/lib/path.cpp +++ b/lib/path.cpp @@ -73,11 +73,7 @@ static void * path_run_single(void *arg) if (ret <= 0) continue; - for (size_t i = 0; i < vlist_length(&p->destinations); i++) { - struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i); - - path_destination_write(pd, p); - } + path_destination_write_all(p); } return nullptr; @@ -111,7 +107,7 @@ static void * path_run_poll(void *arg) p->last_sample->sequence = p->last_sequence++; - path_destination_enqueue(p, &p->last_sample, 1); + path_destination_enqueue_all(p, &p->last_sample, 1); } /* A source is ready to receive samples */ else @@ -119,11 +115,7 @@ static void * path_run_poll(void *arg) } } - for (size_t i = 0; i < vlist_length(&p->destinations); i++) { - struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i); - - path_destination_write(pd, p); - } + path_destination_write_all(p); } return nullptr; diff --git a/lib/path_destination.cpp b/lib/path_destination.cpp index 8f81b9b91..234aa1828 100644 --- a/lib/path_destination.cpp +++ b/lib/path_destination.cpp @@ -64,61 +64,50 @@ int path_destination_destroy(struct vpath_destination *pd) return 0; } -void path_destination_enqueue(struct vpath *p, const struct sample * const smps[], unsigned cnt) +void path_destination_enqueue(struct vpath_destination *pd, struct sample * const smps[], unsigned cnt) { - unsigned enqueued, cloned; + unsigned enqueued; - struct sample *clones[cnt]; + /* Increase reference counter of these samples as they are now also owned by the queue. */ + sample_incref_many(smps, cnt); - cloned = sample_clone_many(clones, smps, cnt); - if (cloned < cnt) - p->logger->warn("Pool underrun in path {}", path_name(p)); + enqueued = queue_push_many(&pd->queue, (void **) smps, cnt); + if (enqueued != cnt) + pd->logger->warn("Queue overrun for path {}", path_name(pd->path)); - for (size_t i = 0; i < vlist_length(&p->destinations); i++) { - struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i); + pd->logger->debug("Enqueued {} samples to destination {} of path {}", enqueued, node_name(pd->node), path_name(pd->path)); - enqueued = queue_push_many(&pd->queue, (void **) clones, cloned); - if (enqueued != cnt) - p->logger->warn("Queue overrun for path {}", path_name(p)); - - /* Increase reference counter of these samples as they are now also owned by the queue. */ - sample_incref_many(clones, cloned); - - p->logger->debug("Enqueued {} samples to destination {} of path {}", enqueued, node_name(pd->node), path_name(p)); - } - - sample_decref_many(clones, cloned); } -void path_destination_write(struct vpath_destination *pd, struct vpath *p) +void path_destination_write(struct vpath_destination *pd) { int cnt = pd->node->out.vectorize; int sent; - int allocated; + int pulled; struct sample *smps[cnt]; /* As long as there are still samples in the queue */ while (1) { - allocated = queue_pull_many(&pd->queue, (void **) smps, cnt); - if (allocated == 0) + pulled = queue_pull_many(&pd->queue, (void **) smps, cnt); + if (pulled == 0) break; - else if (allocated < cnt) - p->logger->debug("Queue underrun for path {}: allocated={} expected={}", path_name(p), allocated, cnt); + else if (pulled < cnt) + pd->logger->debug("Queue underrun for path {}: pulled={} expected={}", path_name(pd->path), pulled, cnt); - p->logger->debug("Dequeued {} samples from queue of node {} which is part of path {}", allocated, node_name(pd->node), path_name(p)); + pd->logger->debug("Pulled {} samples from queue of node {} which is part of path {}", pulled, node_name(pd->node), path_name(pd->path)); - sent = node_write(pd->node, smps, allocated); + sent = node_write(pd->node, smps, pulled, true); if (sent < 0) { - p->logger->error("Failed to sent {} samples to node {}: reason={}", cnt, node_name(pd->node), sent); + pd->logger->error("Failed to sent {} samples to node {}: reason={}", cnt, node_name(pd->node), sent); return; } - else if (sent < allocated) - p->logger->debug("Partial write to node {}: written={}, expected={}", node_name(pd->node), sent, allocated); + else if (sent < pulled) + pd->logger->debug("Partial write to node {}: written={}, expected={}", node_name(pd->node), sent, pulled); - int released = sample_decref_many(smps, allocated); + int released = sample_decref_many(smps, pulled); - p->logger->debug("Released {} samples back to memory pool", released); + pd->logger->debug("Released {} samples back to memory pool", released); } } @@ -130,3 +119,21 @@ void path_destination_check(struct vpath_destination *pd) if (!node_type(pd->node)->write) throw RuntimeError("Destiation node {} is not supported as a sink for path ", node_name(pd->node)); } + +void path_destination_enqueue_all(struct vpath *p, struct sample * const smps[], unsigned cnt) +{ + for (size_t i = 0; i < vlist_length(&p->destinations); i++) { + struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i); + + path_destination_enqueue(pd, smps, cnt); + } +} + +void path_destination_write_all(struct vpath *p) +{ + for (size_t i = 0; i < vlist_length(&p->destinations); i++) { + struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i); + + path_destination_write(pd); + } +}