1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

restructure path_destination

This commit is contained in:
Steffen Vogel 2021-06-19 16:38:21 -04:00
parent 88045390cb
commit af30d9a5f9
3 changed files with 48 additions and 45 deletions

View file

@ -53,8 +53,12 @@ int path_destination_prepare(struct vpath_destination *pd, int queuelen);
void path_destination_check(struct vpath_destination *pd);
void path_destination_enqueue(struct vpath *p, const struct sample * const smps[], unsigned cnt);
void path_destination_enqueue(struct vpath *pd, struct sample * const smps[], unsigned cnt);
void path_destination_write(struct vpath_destination *pd, struct vpath *p);
void path_destination_write(struct vpath_destination *pd);
void path_destination_enqueue_all(struct vpath *p, struct sample * const smps[], unsigned cnt);
void path_destination_write_all(struct vpath *p);
/** @} */

View file

@ -73,11 +73,7 @@ static void * path_run_single(void *arg)
if (ret <= 0)
continue;
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i);
path_destination_write(pd, p);
}
path_destination_write_all(p);
}
return nullptr;
@ -111,7 +107,7 @@ static void * path_run_poll(void *arg)
p->last_sample->sequence = p->last_sequence++;
path_destination_enqueue(p, &p->last_sample, 1);
path_destination_enqueue_all(p, &p->last_sample, 1);
}
/* A source is ready to receive samples */
else
@ -119,11 +115,7 @@ static void * path_run_poll(void *arg)
}
}
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i);
path_destination_write(pd, p);
}
path_destination_write_all(p);
}
return nullptr;

View file

@ -64,61 +64,50 @@ int path_destination_destroy(struct vpath_destination *pd)
return 0;
}
void path_destination_enqueue(struct vpath *p, const struct sample * const smps[], unsigned cnt)
void path_destination_enqueue(struct vpath_destination *pd, struct sample * const smps[], unsigned cnt)
{
unsigned enqueued, cloned;
unsigned enqueued;
struct sample *clones[cnt];
/* Increase reference counter of these samples as they are now also owned by the queue. */
sample_incref_many(smps, cnt);
cloned = sample_clone_many(clones, smps, cnt);
if (cloned < cnt)
p->logger->warn("Pool underrun in path {}", path_name(p));
enqueued = queue_push_many(&pd->queue, (void **) smps, cnt);
if (enqueued != cnt)
pd->logger->warn("Queue overrun for path {}", path_name(pd->path));
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i);
pd->logger->debug("Enqueued {} samples to destination {} of path {}", enqueued, node_name(pd->node), path_name(pd->path));
enqueued = queue_push_many(&pd->queue, (void **) clones, cloned);
if (enqueued != cnt)
p->logger->warn("Queue overrun for path {}", path_name(p));
/* Increase reference counter of these samples as they are now also owned by the queue. */
sample_incref_many(clones, cloned);
p->logger->debug("Enqueued {} samples to destination {} of path {}", enqueued, node_name(pd->node), path_name(p));
}
sample_decref_many(clones, cloned);
}
void path_destination_write(struct vpath_destination *pd, struct vpath *p)
void path_destination_write(struct vpath_destination *pd)
{
int cnt = pd->node->out.vectorize;
int sent;
int allocated;
int pulled;
struct sample *smps[cnt];
/* As long as there are still samples in the queue */
while (1) {
allocated = queue_pull_many(&pd->queue, (void **) smps, cnt);
if (allocated == 0)
pulled = queue_pull_many(&pd->queue, (void **) smps, cnt);
if (pulled == 0)
break;
else if (allocated < cnt)
p->logger->debug("Queue underrun for path {}: allocated={} expected={}", path_name(p), allocated, cnt);
else if (pulled < cnt)
pd->logger->debug("Queue underrun for path {}: pulled={} expected={}", path_name(pd->path), pulled, cnt);
p->logger->debug("Dequeued {} samples from queue of node {} which is part of path {}", allocated, node_name(pd->node), path_name(p));
pd->logger->debug("Pulled {} samples from queue of node {} which is part of path {}", pulled, node_name(pd->node), path_name(pd->path));
sent = node_write(pd->node, smps, allocated);
sent = node_write(pd->node, smps, pulled, true);
if (sent < 0) {
p->logger->error("Failed to sent {} samples to node {}: reason={}", cnt, node_name(pd->node), sent);
pd->logger->error("Failed to sent {} samples to node {}: reason={}", cnt, node_name(pd->node), sent);
return;
}
else if (sent < allocated)
p->logger->debug("Partial write to node {}: written={}, expected={}", node_name(pd->node), sent, allocated);
else if (sent < pulled)
pd->logger->debug("Partial write to node {}: written={}, expected={}", node_name(pd->node), sent, pulled);
int released = sample_decref_many(smps, allocated);
int released = sample_decref_many(smps, pulled);
p->logger->debug("Released {} samples back to memory pool", released);
pd->logger->debug("Released {} samples back to memory pool", released);
}
}
@ -130,3 +119,21 @@ void path_destination_check(struct vpath_destination *pd)
if (!node_type(pd->node)->write)
throw RuntimeError("Destiation node {} is not supported as a sink for path ", node_name(pd->node));
}
void path_destination_enqueue_all(struct vpath *p, struct sample * const smps[], unsigned cnt)
{
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i);
path_destination_enqueue(pd, smps, cnt);
}
}
void path_destination_write_all(struct vpath *p)
{
for (size_t i = 0; i < vlist_length(&p->destinations); i++) {
struct vpath_destination *pd = (struct vpath_destination *) vlist_at(&p->destinations, i);
path_destination_write(pd);
}
}