1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

path: special case for single-source paths

This commit is contained in:
Steffen Vogel 2018-05-08 08:16:14 +02:00
parent fa66861d2d
commit 915a7568bf

View file

@ -39,6 +39,9 @@
#include <villas/stats.h>
#include <villas/node.h>
/* Forward declaration */
static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt);
static int path_source_init(struct path_source *ps)
{
int ret;
@ -65,6 +68,71 @@ static int path_source_destroy(struct path_source *ps)
return 0;
}
static void path_source_read(struct path_source *ps, struct path *p, int i)
{
int recv, tomux, ready, cnt;
cnt = ps->node->vectorize;
struct sample *read_smps[cnt];
struct sample *muxed_smps[cnt];
struct sample **tomux_smps;
/* Fill smps[] free sample blocks from the pool */
ready = sample_alloc_many(&ps->pool, read_smps, cnt);
if (ready != cnt)
warn("Pool underrun for path source %s", node_name(ps->node));
/* Read ready samples and store them to blocks pointed by smps[] */
recv = node_read(ps->node, read_smps, ready);
if (recv == 0)
goto out2;
else if (recv < 0)
error("Failed to read samples from node %s", node_name(ps->node));
else if (recv < ready)
warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready);
bitset_set(&p->received, i);
if (p->mode == PATH_MODE_ANY) { /* Mux all samples */
tomux_smps = read_smps;
tomux = recv;
}
else { /* Mux only last sample and discard others */
tomux_smps = read_smps + recv - 1;
tomux = 1;
}
for (int i = 0; i < tomux; i++) {
muxed_smps[i] = i == 0
? sample_clone(p->last_sample)
: sample_clone(muxed_smps[i-1]);
muxed_smps[i]->sequence = p->last_sequence + 1;
mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL);
}
sample_copy(p->last_sample, muxed_smps[tomux-1]);
debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received));
if (bitset_test(&p->mask, i)) {
/* Check if we received an update from all nodes/ */
if ((p->mode == PATH_MODE_ANY) ||
(p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received)))
{
path_destination_enqueue(p, muxed_smps, tomux);
/* Reset bitset of updated nodes */
bitset_clear_all(&p->received);
}
}
sample_put_many(muxed_smps, tomux);
out2: sample_put_many(read_smps, ready);
}
static int path_destination_init(struct path_destination *pd, int queuelen)
{
int ret;
@ -113,11 +181,64 @@ static void path_destination_enqueue(struct path *p, struct sample *smps[], unsi
sample_put_many(clones, cloned);
}
/** Main thread function per path: read samples -> write samples */
static void * path_run(void *arg)
static void path_destination_write(struct path_destination *pd, struct path *p)
{
int cnt = pd->node->vectorize;
int sent;
int available;
int released;
struct sample *smps[cnt];
/* As long as there are still samples in the queue */
while (1) {
available = queue_pull_many(&pd->queue, (void **) smps, cnt);
if (available == 0)
break;
else if (available < cnt)
debug(LOG_PATH | 5, "Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt);
debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p));
sent = node_write(pd->node, smps, available);
if (sent < 0)
error("Failed to sent %u samples to node %s", cnt, node_name(pd->node));
else if (sent < available)
warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, available);
released = sample_put_many(smps, sent);
debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
}
}
static void * path_run_single(void *arg)
{
int ret, recv, tomux, ready, cnt;
struct path *p = arg;
struct path_source *ps = (struct path_source *) list_at(&p->sources, 0);
debug(1, "Started path %s in single mode", path_name(p));
for (;;) {
path_source_read(ps, p, 0);
for (size_t i = 0; i < list_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) list_at(&p->destinations, i);
path_destination_write(pd, p);
}
}
return NULL;
}
/** Main thread function per path: read samples -> write samples */
static void * path_run_poll(void *arg)
{
int ret;
struct path *p = arg;
debug(1, "Started path %s in polling mode", path_name(p));
for (;;) {
ret = poll(p->reader.pfds, p->reader.nfds, -1);
@ -138,65 +259,7 @@ static void * path_run(void *arg)
}
/* A source is ready to receive samples */
else {
cnt = ps->node->vectorize;
struct sample *read_smps[cnt];
struct sample *muxed_smps[cnt];
struct sample **tomux_smps;
/* Fill smps[] free sample blocks from the pool */
ready = sample_alloc_many(&ps->pool, read_smps, cnt);
if (ready != cnt)
warn("Pool underrun for path source %s", node_name(ps->node));
/* Read ready samples and store them to blocks pointed by smps[] */
recv = node_read(ps->node, read_smps, ready);
if (recv == 0)
goto out2;
else if (recv < 0)
error("Failed to read samples from node %s", node_name(ps->node));
else if (recv < ready)
warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready);
bitset_set(&p->received, i);
if (p->mode == PATH_MODE_ANY) { /* Mux all samples */
tomux_smps = read_smps;
tomux = recv;
}
else { /* Mux only last sample and discard others */
tomux_smps = read_smps + recv - 1;
tomux = 1;
}
for (int i = 0; i < tomux; i++) {
muxed_smps[i] = i == 0
? sample_clone(p->last_sample)
: sample_clone(muxed_smps[i-1]);
muxed_smps[i]->sequence = p->last_sequence++;
mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL);
}
sample_copy(p->last_sample, muxed_smps[tomux-1]);
debug(15, "Path %s received = %s", path_name(p), bitset_dump(&p->received));
if (bitset_test(&p->mask, i)) {
/* Check if we received an update from all nodes/ */
if ((p->mode == PATH_MODE_ANY) ||
(p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received)))
{
path_destination_enqueue(p, muxed_smps, tomux);
/* Reset bitset of updated nodes */
bitset_clear_all(&p->received);
}
}
sample_put_many(muxed_smps, tomux);
out2: sample_put_many(read_smps, ready);
path_source_read(ps, p, i);
}
}
}
@ -204,33 +267,7 @@ out2: sample_put_many(read_smps, ready);
for (size_t i = 0; i < list_length(&p->destinations); i++) {
struct path_destination *pd = (struct path_destination *) list_at(&p->destinations, i);
int cnt = pd->node->vectorize;
int sent;
int available;
int released;
struct sample *smps[cnt];
/* As long as there are still samples in the queue */
while (1) {
available = queue_pull_many(&pd->queue, (void **) smps, cnt);
if (available == 0)
break;
else if (available < cnt)
debug(LOG_PATH | 5, "Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt);
debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p));
sent = node_write(pd->node, smps, available);
if (sent < 0)
error("Failed to sent %u samples to node %s", cnt, node_name(pd->node));
else if (sent < available)
warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, available);
released = sample_put_many(smps, sent);
debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
}
path_destination_write(pd, p);
}
}
@ -633,8 +670,18 @@ int path_start(struct path *p)
}
}
/* Start one thread per path for sending to destinations */
ret = pthread_create(&p->tid, NULL, &path_run, p);
/* Start one thread per path for sending to destinations
*
* Special case: If the path only has a single source and this source
* does not offer a file descriptor for polling, we will use a special
* thread function.
*/
struct path_source *ps0 = (struct path_source *) list_at(&p->sources, 0);
if (list_length(&p->sources) == 1 && node_fd(ps->node) == -1)
ret = pthread_create(&p->tid, NULL, &path_run_single, p);
else
ret = pthread_create(&p->tid, NULL, &path_run_poll, p);
if (ret)
return ret;