diff --git a/lib/path.c b/lib/path.c index 57b5ca1a1..7a259871e 100644 --- a/lib/path.c +++ b/lib/path.c @@ -24,7 +24,7 @@ static void path_read(struct path *p) int recv; int enqueue; int enqueued; - int ready = 0; /**< Number of blocks in smps[] which are allocated and ready to be used by node_read(). */ + int ready; /**< Number of blocks in smps[] which are allocated and ready to be used by node_read(). */ struct path_source *ps = p->source; @@ -33,7 +33,7 @@ static void path_read(struct path *p) struct sample *smps[cnt]; /* Fill smps[] free sample blocks from the pool */ - ready += sample_alloc(&ps->pool, smps, cnt - ready); + ready = sample_alloc(&ps->pool, smps, cnt); if (ready != cnt) warn("Pool underrun for path %s", path_name(p)); @@ -41,8 +41,11 @@ static void path_read(struct path *p) recv = node_read(ps->node, smps, ready); if (recv < 0) error("Failed to receive message from node %s", node_name(ps->node)); - else if (recv < ready) + else if (recv < ready) { warn("Partial read for path %s: read=%u expected=%u", path_name(p), recv, ready); + /* Free samples that weren't written to */ + sample_free(smps+recv, ready-recv); + } debug(DBG_PATH | 15, "Received %u messages from node %s", recv, node_name(ps->node)); @@ -54,16 +57,23 @@ static void path_read(struct path *p) stats_update(p->stats->delta, STATS_SKIPPED, recv - enqueue); } + /* Keep track of the lowest index that wasn't enqueued; + * all following samples must be freed here */ + int refd = 0; list_foreach(struct path_destination *pd, &p->destinations) { enqueued = queue_push_many(&pd->queue, (void **) smps, enqueue); if (enqueue != enqueued) warn("Queue overrun for path %s", path_name(p)); - for (int i = 0; i < enqueued; i++) + for (int i = 0; i < enqueued; i++) { sample_get(smps[i]); /* increase reference count */ + refd = i; + } debug(DBG_PATH | 15, "Enqueued %u samples from %s to queue of %s", enqueued, node_name(ps->node), node_name(pd->node)); } + if (refd != recv-1) + sample_free(smps+refd+1, recv-refd-1); } static void path_write(struct path *p) @@ -376,4 +386,4 @@ int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes, struct list_destroy(&destinations, NULL, false); return 0; -} \ No newline at end of file +}