1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

merged asynchronous (fixed-rate) and synchronous sending routines in a new function

This commit is contained in:
Steffen Vogel 2015-05-06 13:22:22 +02:00
parent 06b7b1ec92
commit c9e805f6fa

View file

@ -23,8 +23,24 @@
/** Linked list of paths. */
struct list paths;
static void path_write(struct path *p)
{
FOREACH(&p->destinations, it) {
int sent = node_write(
it->node, /* Destination node */
p->pool, /* Pool of received messages */
p->poolsize, /* Size of the pool */
p->received-it->node->combine, /* Index of the first message which should be sent */
it->node->combine /* Number of messages which should be sent */
);
debug(1, "Sent %u messages to node '%s'", sent, it->node->name);
p->sent += sent;
}
}
/** Send messages asynchronously */
static void * path_send(void *arg)
static void * path_run_async(void *arg)
{
struct path *p = arg;
struct itimerspec its = {
@ -39,14 +55,9 @@ static void * path_send(void *arg)
if (timerfd_settime(p->tfd, 0, &its, NULL))
serror("Failed to start timer");
FOREACH(&p->destinations, it)
p->sent += node_write(p->in, p->pool, p->poolsize, p->received, p->in->combine);
debug(10, "Sent %u messages to %u destination nodes", p->in->combine, p->destinations.length);
}
/* Block until 1/p->rate seconds elapsed */
while (timerfd_wait(p->tfd))
path_write(p);
return NULL;
}
@ -116,12 +127,8 @@ skip: while (1) {
}
/* At fixed rate mode, messages are send by another thread */
if (!p->rate) {
FOREACH(&p->destinations, it)
node_write(p->in, p->pool, p->poolsize, p->received, p->in->combine);
p->sent++;
}
if (!p->rate)
path_write(p);
}
return NULL;
@ -136,7 +143,7 @@ int path_start(struct path *p)
/* At fixed rate mode, we start another thread for sending */
if (p->rate)
pthread_create(&p->sent_tid, NULL, &path_send, p);
pthread_create(&p->sent_tid, NULL, &path_run_async, p);
return pthread_create(&p->recv_tid, NULL, &path_run, p);
}