diff --git a/lib/path.c b/lib/path.c index dde5b0e0e..f94feeea3 100644 --- a/lib/path.c +++ b/lib/path.c @@ -19,38 +19,6 @@ #include "pool.h" #include "queue.h" -static void path_write(struct path *p, bool resend) -{ - list_foreach(struct node *n, &p->destinations) { - int cnt = n->vectorize; - int sent, tosend, available, released; - struct sample *smps[n->vectorize]; - - available = queue_pull_many(&p->queue, (void **) smps, cnt); - if (available < cnt) - warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); - - if (available == 0) - continue; - - tosend = hook_run(p, smps, available, HOOK_WRITE); - if (tosend == 0) - continue; - - sent = node_write(n, smps, tosend); - if (sent < 0) - error("Failed to sent %u samples to node %s", cnt, node_name(n)); - else if (sent < tosend) - warn("Partial write to node %s", node_name(n)); - - debug(DBG_PATH | 15, "Sent %u messages to node %s", sent, node_name(n)); - - released = pool_put_many(&p->pool, (void **) smps, sent); - if (sent != released) - warn("Failed to release %u samples to pool for path %s", sent - released, path_name(p)); - } -} - /** Main thread function per path: receive -> sent messages */ static void * path_run(void *arg) { @@ -91,7 +59,34 @@ static void * path_run(void *arg) debug(DBG_PATH | 3, "Enqueuing %u samples to queue of path %s", enqueue, path_name(p)); - path_write(p); + list_foreach(struct node *n, &p->destinations) { + int cnt = n->vectorize; + int sent, tosend, available, released; + struct sample *smps[n->vectorize]; + + available = queue_pull_many(&p->queue, (void **) smps, cnt); + if (available < cnt) + warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); + + if (available == 0) + continue; + + tosend = hook_run(p, smps, available, HOOK_WRITE); + if (tosend == 0) + continue; + + sent = node_write(n, smps, tosend); + if (sent < 0) + error("Failed to sent %u samples to node %s", cnt, node_name(n)); + else if (sent < tosend) + warn("Partial write to node %s", node_name(n)); + + debug(DBG_PATH | 15, "Sent %u messages to node %s", sent, node_name(n)); + + released = pool_put_many(&p->pool, (void **) smps, sent); + if (sent != released) + warn("Failed to release %u samples to pool for path %s", sent - released, path_name(p)); + } } return NULL;