diff --git a/lib/path.c b/lib/path.c index c4a512416..9edb4aadb 100644 --- a/lib/path.c +++ b/lib/path.c @@ -39,6 +39,9 @@ #include #include +/* Forward declaration */ +static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt); + static int path_source_init(struct path_source *ps) { int ret; @@ -65,6 +68,71 @@ static int path_source_destroy(struct path_source *ps) return 0; } +static void path_source_read(struct path_source *ps, struct path *p, int i) +{ + int recv, tomux, ready, cnt; + + cnt = ps->node->vectorize; + + struct sample *read_smps[cnt]; + struct sample *muxed_smps[cnt]; + struct sample **tomux_smps; + + /* Fill smps[] free sample blocks from the pool */ + ready = sample_alloc_many(&ps->pool, read_smps, cnt); + if (ready != cnt) + warn("Pool underrun for path source %s", node_name(ps->node)); + + /* Read ready samples and store them to blocks pointed by smps[] */ + recv = node_read(ps->node, read_smps, ready); + if (recv == 0) + goto out2; + else if (recv < 0) + error("Failed to read samples from node %s", node_name(ps->node)); + else if (recv < ready) + warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready); + + bitset_set(&p->received, i); + + if (p->mode == PATH_MODE_ANY) { /* Mux all samples */ + tomux_smps = read_smps; + tomux = recv; + } + else { /* Mux only last sample and discard others */ + tomux_smps = read_smps + recv - 1; + tomux = 1; + } + + for (int i = 0; i < tomux; i++) { + muxed_smps[i] = i == 0 + ? sample_clone(p->last_sample) + : sample_clone(muxed_smps[i-1]); + + muxed_smps[i]->sequence = p->last_sequence + 1; + + mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL); + } + + sample_copy(p->last_sample, muxed_smps[tomux-1]); + + debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received)); + + if (bitset_test(&p->mask, i)) { + /* Check if we received an update from all nodes/ */ + if ((p->mode == PATH_MODE_ANY) || + (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) + { + path_destination_enqueue(p, muxed_smps, tomux); + + /* Reset bitset of updated nodes */ + bitset_clear_all(&p->received); + } + } + + sample_put_many(muxed_smps, tomux); +out2: sample_put_many(read_smps, ready); +} + static int path_destination_init(struct path_destination *pd, int queuelen) { int ret; @@ -113,11 +181,64 @@ static void path_destination_enqueue(struct path *p, struct sample *smps[], unsi sample_put_many(clones, cloned); } -/** Main thread function per path: read samples -> write samples */ -static void * path_run(void *arg) +static void path_destination_write(struct path_destination *pd, struct path *p) +{ + int cnt = pd->node->vectorize; + int sent; + int available; + int released; + + struct sample *smps[cnt]; + + /* As long as there are still samples in the queue */ + while (1) { + available = queue_pull_many(&pd->queue, (void **) smps, cnt); + if (available == 0) + break; + else if (available < cnt) + debug(LOG_PATH | 5, "Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); + + debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); + + sent = node_write(pd->node, smps, available); + if (sent < 0) + error("Failed to sent %u samples to node %s", cnt, node_name(pd->node)); + else if (sent < available) + warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, available); + + released = sample_put_many(smps, sent); + + debug(LOG_PATH | 15, "Released %d samples back to memory pool", released); + } +} + +static void * path_run_single(void *arg) { - int ret, recv, tomux, ready, cnt; struct path *p = arg; + struct path_source *ps = (struct path_source *) list_at(&p->sources, 0); + + debug(1, "Started path %s in single mode", path_name(p)); + + for (;;) { + path_source_read(ps, p, 0); + + for (size_t i = 0; i < list_length(&p->destinations); i++) { + struct path_destination *pd = (struct path_destination *) list_at(&p->destinations, i); + + path_destination_write(pd, p); + } + } + + return NULL; +} + +/** Main thread function per path: read samples -> write samples */ +static void * path_run_poll(void *arg) +{ + int ret; + struct path *p = arg; + + debug(1, "Started path %s in polling mode", path_name(p)); for (;;) { ret = poll(p->reader.pfds, p->reader.nfds, -1); @@ -138,65 +259,7 @@ static void * path_run(void *arg) } /* A source is ready to receive samples */ else { - cnt = ps->node->vectorize; - - struct sample *read_smps[cnt]; - struct sample *muxed_smps[cnt]; - struct sample **tomux_smps; - - /* Fill smps[] free sample blocks from the pool */ - ready = sample_alloc_many(&ps->pool, read_smps, cnt); - if (ready != cnt) - warn("Pool underrun for path source %s", node_name(ps->node)); - - /* Read ready samples and store them to blocks pointed by smps[] */ - recv = node_read(ps->node, read_smps, ready); - if (recv == 0) - goto out2; - else if (recv < 0) - error("Failed to read samples from node %s", node_name(ps->node)); - else if (recv < ready) - warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready); - - bitset_set(&p->received, i); - - if (p->mode == PATH_MODE_ANY) { /* Mux all samples */ - tomux_smps = read_smps; - tomux = recv; - } - else { /* Mux only last sample and discard others */ - tomux_smps = read_smps + recv - 1; - tomux = 1; - } - - for (int i = 0; i < tomux; i++) { - muxed_smps[i] = i == 0 - ? sample_clone(p->last_sample) - : sample_clone(muxed_smps[i-1]); - - muxed_smps[i]->sequence = p->last_sequence++; - - mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL); - } - - sample_copy(p->last_sample, muxed_smps[tomux-1]); - - debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received)); - - if (bitset_test(&p->mask, i)) { - /* Check if we received an update from all nodes/ */ - if ((p->mode == PATH_MODE_ANY) || - (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) - { - path_destination_enqueue(p, muxed_smps, tomux); - - /* Reset bitset of updated nodes */ - bitset_clear_all(&p->received); - } - } - - sample_put_many(muxed_smps, tomux); -out2: sample_put_many(read_smps, ready); + path_source_read(ps, p, i); } } } @@ -204,33 +267,7 @@ out2: sample_put_many(read_smps, ready); for (size_t i = 0; i < list_length(&p->destinations); i++) { struct path_destination *pd = (struct path_destination *) list_at(&p->destinations, i); - int cnt = pd->node->vectorize; - int sent; - int available; - int released; - - struct sample *smps[cnt]; - - /* As long as there are still samples in the queue */ - while (1) { - available = queue_pull_many(&pd->queue, (void **) smps, cnt); - if (available == 0) - break; - else if (available < cnt) - debug(LOG_PATH | 5, "Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); - - debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); - - sent = node_write(pd->node, smps, available); - if (sent < 0) - error("Failed to sent %u samples to node %s", cnt, node_name(pd->node)); - else if (sent < available) - warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, available); - - released = sample_put_many(smps, sent); - - debug(LOG_PATH | 15, "Released %d samples back to memory pool", released); - } + path_destination_write(pd, p); } } @@ -633,8 +670,18 @@ int path_start(struct path *p) } } - /* Start one thread per path for sending to destinations */ - ret = pthread_create(&p->tid, NULL, &path_run, p); + /* Start one thread per path for sending to destinations + * + * Special case: If the path only has a single source and this source + * does not offer a file descriptor for polling, we will use a special + * thread function. + */ + struct path_source *ps0 = (struct path_source *) list_at(&p->sources, 0); + if (list_length(&p->sources) == 1 && node_fd(ps->node) == -1) + ret = pthread_create(&p->tid, NULL, &path_run_single, p); + else + ret = pthread_create(&p->tid, NULL, &path_run_poll, p); + if (ret) return ret;