diff --git a/server/include/utils.h b/server/include/utils.h index 239904ccf..e91bfeab9 100644 --- a/server/include/utils.h +++ b/server/include/utils.h @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -99,6 +100,13 @@ cpu_set_t to_cpu_set(int set); /** Allocate and initialize memory. */ void * alloc(size_t bytes); +/** Wait until timer elapsed + * + * @retval 0 An error occured. Maybe the timer was stopped. + * @retval >0 The nummer of runs this timer already fired. + */ +uint64_t timerfd_wait(int fd); + /** Get delta between two timespec structs */ double timespec_delta(struct timespec *start, struct timespec *end); diff --git a/server/src/file.c b/server/src/file.c index 6e7a68657..701737ac3 100644 --- a/server/src/file.c +++ b/server/src/file.c @@ -98,15 +98,15 @@ int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt { int i = 0; struct file *f = n->file; - uint64_t runs; if (f->in) { - read(f->tfd, &runs, sizeof(runs)); /* blocking for 1/f->rate seconds */ - - for (i=0; irate seconds */ + if (timerfd_wait(f->tfd)) { + for (i=0; iin, m); + msg_fscan(f->in, m); + } } } else diff --git a/server/src/path.c b/server/src/path.c index 192f45633..0ae8796a6 100644 --- a/server/src/path.c +++ b/server/src/path.c @@ -27,10 +27,6 @@ struct list paths; static void * path_send(void *arg) { struct path *p = arg; - - int ret; - uint64_t runs; - struct itimerspec its = { .it_interval = timespec_rate(p->rate), .it_value = { 1, 0 } @@ -40,19 +36,17 @@ static void * path_send(void *arg) if (p->tfd < 0) serror("Failed to create timer"); - ret = timerfd_settime(p->tfd, 0, &its, NULL); - if (ret) + if (timerfd_settime(p->tfd, 0, &its, NULL)) serror("Failed to start timer"); - while (1) { - /* Block until 1/p->rate seconds elapsed */ - read(p->tfd, &runs, sizeof(runs)); FOREACH(&p->destinations, it) p->sent += node_write(p->in, p->pool, p->poolsize, p->received, p->in->combine); debug(10, "Sent %u messages to %u destination nodes", p->in->combine, p->destinations.length); } + /* Block until 1/p->rate seconds elapsed */ + while (timerfd_wait(p->tfd)) return NULL; } diff --git a/server/src/random.c b/server/src/random.c index 5feeda828..89c182ba7 100644 --- a/server/src/random.c +++ b/server/src/random.c @@ -33,7 +33,6 @@ int main(int argc, char *argv[]) exit(EXIT_FAILURE); } - uint64_t runs; int rate = atoi(argv[2]); struct msg m = MSG_INIT(atoi(argv[1])); @@ -53,16 +52,12 @@ int main(int argc, char *argv[]) /* Print header */ fprintf(stderr, "# %-6s%-12s\n", "seq", "data"); - while (1) { - /* Block until 1/p->rate seconds elapsed */ - read(tfd, &runs, sizeof(runs)); - + /* Block until 1/p->rate seconds elapsed */ + while ((m.sequence = timerfd_wait(tfd))) { msg_random(&m); msg_fprint(stdout, &m); fflush(stdout); - - m.sequence++; } close(tfd); diff --git a/server/src/socket.c b/server/src/socket.c index fec5fcd8f..1c3aac511 100644 --- a/server/src/socket.c +++ b/server/src/socket.c @@ -214,7 +214,7 @@ int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int c if (bytes != 0) error("Packet length does not match message header length!"); - return 0; + return cnt; } int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt) @@ -254,7 +254,7 @@ int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int if (ret < 0) serror("Failed send"); - return 0; + return cnt; } int socket_parse(config_setting_t *cfg, struct node *n) diff --git a/server/src/utils.c b/server/src/utils.c index e06856b3f..3d49990b9 100644 --- a/server/src/utils.c +++ b/server/src/utils.c @@ -79,6 +79,13 @@ void * alloc(size_t bytes) return p; } +uint64_t timerfd_wait(int fd) +{ + uint64_t runs; + + return read(fd, &runs, sizeof(runs)) < 0 ? 0 : runs; +} + double timespec_delta(struct timespec *start, struct timespec *end) { double sec = end->tv_sec - start->tv_sec;