diff --git a/lib/path.c b/lib/path.c index 61b4ed67a..ffff6ef90 100644 --- a/lib/path.c +++ b/lib/path.c @@ -61,11 +61,8 @@ static void path_read(struct path *p) recv = node_read(ps->node, smps, ready); if (recv < 0) error("Failed to receive message from node %s", node_name(ps->node)); - else if (recv < ready) { + else if (recv < ready) warn("Partial read for path %s: read=%u expected=%u", path_name(p), recv, ready); - /* Free samples that weren't written to */ - sample_free(smps+recv, ready-recv); - } /* Run preprocessing hooks for vector of samples */ enqueue = hook_read_list(&p->hooks, smps, recv); @@ -75,10 +72,9 @@ static void path_read(struct path *p) if (p->stats) stats_update(p->stats->delta, STATS_SKIPPED, recv - enqueue); } - + /* Keep track of the lowest index that wasn't enqueued; * all following samples must be freed here */ - int refd = 0; for (size_t i = 0; i < list_length(&p->destinations); i++) { struct path_destination *pd = list_at(&p->destinations, i); @@ -86,14 +82,13 @@ static void path_read(struct path *p) if (enqueue != enqueued) warn("Queue overrun for path %s", path_name(p)); - if (refd < enqueued) - refd = enqueued; + /* Increase reference counter of these samples as they are now also owned by the queue. */ + sample_get_many(smps, enqueued); debug(LOG_PATH | 15, "Enqueued %u samples from %s to queue of %s", enqueued, node_name(ps->node), node_name(pd->node)); } - /* Release those samples which have not been pushed into a queue */ - sample_free(smps + refd, ready - refd); + sample_put_many(smps, ready); } static void path_write(struct path *p)