mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
merging path_write() into path_run()
This commit is contained in:
parent
69a16b2ac7
commit
de1058cbd6
1 changed files with 28 additions and 33 deletions
61
lib/path.c
61
lib/path.c
|
@ -19,38 +19,6 @@
|
|||
#include "pool.h"
|
||||
#include "queue.h"
|
||||
|
||||
static void path_write(struct path *p, bool resend)
|
||||
{
|
||||
list_foreach(struct node *n, &p->destinations) {
|
||||
int cnt = n->vectorize;
|
||||
int sent, tosend, available, released;
|
||||
struct sample *smps[n->vectorize];
|
||||
|
||||
available = queue_pull_many(&p->queue, (void **) smps, cnt);
|
||||
if (available < cnt)
|
||||
warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt);
|
||||
|
||||
if (available == 0)
|
||||
continue;
|
||||
|
||||
tosend = hook_run(p, smps, available, HOOK_WRITE);
|
||||
if (tosend == 0)
|
||||
continue;
|
||||
|
||||
sent = node_write(n, smps, tosend);
|
||||
if (sent < 0)
|
||||
error("Failed to sent %u samples to node %s", cnt, node_name(n));
|
||||
else if (sent < tosend)
|
||||
warn("Partial write to node %s", node_name(n));
|
||||
|
||||
debug(DBG_PATH | 15, "Sent %u messages to node %s", sent, node_name(n));
|
||||
|
||||
released = pool_put_many(&p->pool, (void **) smps, sent);
|
||||
if (sent != released)
|
||||
warn("Failed to release %u samples to pool for path %s", sent - released, path_name(p));
|
||||
}
|
||||
}
|
||||
|
||||
/** Main thread function per path: receive -> sent messages */
|
||||
static void * path_run(void *arg)
|
||||
{
|
||||
|
@ -91,7 +59,34 @@ static void * path_run(void *arg)
|
|||
|
||||
debug(DBG_PATH | 3, "Enqueuing %u samples to queue of path %s", enqueue, path_name(p));
|
||||
|
||||
path_write(p);
|
||||
list_foreach(struct node *n, &p->destinations) {
|
||||
int cnt = n->vectorize;
|
||||
int sent, tosend, available, released;
|
||||
struct sample *smps[n->vectorize];
|
||||
|
||||
available = queue_pull_many(&p->queue, (void **) smps, cnt);
|
||||
if (available < cnt)
|
||||
warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt);
|
||||
|
||||
if (available == 0)
|
||||
continue;
|
||||
|
||||
tosend = hook_run(p, smps, available, HOOK_WRITE);
|
||||
if (tosend == 0)
|
||||
continue;
|
||||
|
||||
sent = node_write(n, smps, tosend);
|
||||
if (sent < 0)
|
||||
error("Failed to sent %u samples to node %s", cnt, node_name(n));
|
||||
else if (sent < tosend)
|
||||
warn("Partial write to node %s", node_name(n));
|
||||
|
||||
debug(DBG_PATH | 15, "Sent %u messages to node %s", sent, node_name(n));
|
||||
|
||||
released = pool_put_many(&p->pool, (void **) smps, sent);
|
||||
if (sent != released)
|
||||
warn("Failed to release %u samples to pool for path %s", sent - released, path_name(p));
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
|
Loading…
Add table
Reference in a new issue