diff --git a/server/src/path.c b/server/src/path.c index 0ae8796a6..ad00f3252 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -23,8 +23,24 @@ /** Linked list of paths. */ struct list paths; +static void path_write(struct path *p) +{ + FOREACH(&p->destinations, it) { + int sent = node_write( + it->node, /* Destination node */ + p->pool, /* Pool of received messages */ + p->poolsize, /* Size of the pool */ + p->received-it->node->combine, /* Index of the first message which should be sent */ + it->node->combine /* Number of messages which should be sent */ + ); + + debug(1, "Sent %u messages to node '%s'", sent, it->node->name); + p->sent += sent; + } +} + /** Send messages asynchronously */ -static void * path_send(void *arg) +static void * path_run_async(void *arg) { struct path *p = arg; struct itimerspec its = { @@ -39,14 +55,9 @@ static void * path_send(void *arg) if (timerfd_settime(p->tfd, 0, &its, NULL)) serror("Failed to start timer"); - - FOREACH(&p->destinations, it) - p->sent += node_write(p->in, p->pool, p->poolsize, p->received, p->in->combine); - - debug(10, "Sent %u messages to %u destination nodes", p->in->combine, p->destinations.length); - } /* Block until 1/p->rate seconds elapsed */ while (timerfd_wait(p->tfd)) + path_write(p); return NULL; } @@ -116,12 +127,8 @@ skip: while (1) { } /* At fixed rate mode, messages are send by another thread */ - if (!p->rate) { - FOREACH(&p->destinations, it) - node_write(p->in, p->pool, p->poolsize, p->received, p->in->combine); - - p->sent++; - } + if (!p->rate) + path_write(p); } return NULL; @@ -136,7 +143,7 @@ int path_start(struct path *p) /* At fixed rate mode, we start another thread for sending */ if (p->rate) - pthread_create(&p->sent_tid, NULL, &path_send, p); + pthread_create(&p->sent_tid, NULL, &path_run_async, p); return pthread_create(&p->recv_tid, NULL, &path_run, p); }