mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
added helper function for blocking timerfd wait (returns number of runs)
This commit is contained in:
parent
90db947fa4
commit
eea6788253
6 changed files with 28 additions and 24 deletions
|
@ -10,6 +10,7 @@
|
|||
|
||||
#include <stdlib.h>
|
||||
#include <stdarg.h>
|
||||
#include <stdint.h>
|
||||
#include <errno.h>
|
||||
#include <sched.h>
|
||||
#include <string.h>
|
||||
|
@ -99,6 +100,13 @@ cpu_set_t to_cpu_set(int set);
|
|||
/** Allocate and initialize memory. */
|
||||
void * alloc(size_t bytes);
|
||||
|
||||
/** Wait until timer elapsed
|
||||
*
|
||||
* @retval 0 An error occured. Maybe the timer was stopped.
|
||||
* @retval >0 The nummer of runs this timer already fired.
|
||||
*/
|
||||
uint64_t timerfd_wait(int fd);
|
||||
|
||||
/** Get delta between two timespec structs */
|
||||
double timespec_delta(struct timespec *start, struct timespec *end);
|
||||
|
||||
|
|
|
@ -98,15 +98,15 @@ int file_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt
|
|||
{
|
||||
int i = 0;
|
||||
struct file *f = n->file;
|
||||
uint64_t runs;
|
||||
|
||||
if (f->in) {
|
||||
read(f->tfd, &runs, sizeof(runs)); /* blocking for 1/f->rate seconds */
|
||||
|
||||
for (i=0; i<cnt; i++) {
|
||||
struct msg *m = &pool[(first+i) % poolsize];
|
||||
/* Blocking for 1/f->rate seconds */
|
||||
if (timerfd_wait(f->tfd)) {
|
||||
for (i=0; i<cnt; i++) {
|
||||
struct msg *m = &pool[(first+i) % poolsize];
|
||||
|
||||
msg_fscan(f->in, m);
|
||||
msg_fscan(f->in, m);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
|
@ -27,10 +27,6 @@ struct list paths;
|
|||
static void * path_send(void *arg)
|
||||
{
|
||||
struct path *p = arg;
|
||||
|
||||
int ret;
|
||||
uint64_t runs;
|
||||
|
||||
struct itimerspec its = {
|
||||
.it_interval = timespec_rate(p->rate),
|
||||
.it_value = { 1, 0 }
|
||||
|
@ -40,19 +36,17 @@ static void * path_send(void *arg)
|
|||
if (p->tfd < 0)
|
||||
serror("Failed to create timer");
|
||||
|
||||
ret = timerfd_settime(p->tfd, 0, &its, NULL);
|
||||
if (ret)
|
||||
if (timerfd_settime(p->tfd, 0, &its, NULL))
|
||||
serror("Failed to start timer");
|
||||
|
||||
while (1) {
|
||||
/* Block until 1/p->rate seconds elapsed */
|
||||
read(p->tfd, &runs, sizeof(runs));
|
||||
|
||||
FOREACH(&p->destinations, it)
|
||||
p->sent += node_write(p->in, p->pool, p->poolsize, p->received, p->in->combine);
|
||||
|
||||
debug(10, "Sent %u messages to %u destination nodes", p->in->combine, p->destinations.length);
|
||||
}
|
||||
/* Block until 1/p->rate seconds elapsed */
|
||||
while (timerfd_wait(p->tfd))
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -33,7 +33,6 @@ int main(int argc, char *argv[])
|
|||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
uint64_t runs;
|
||||
int rate = atoi(argv[2]);
|
||||
struct msg m = MSG_INIT(atoi(argv[1]));
|
||||
|
||||
|
@ -53,16 +52,12 @@ int main(int argc, char *argv[])
|
|||
/* Print header */
|
||||
fprintf(stderr, "# %-6s%-12s\n", "seq", "data");
|
||||
|
||||
while (1) {
|
||||
/* Block until 1/p->rate seconds elapsed */
|
||||
read(tfd, &runs, sizeof(runs));
|
||||
|
||||
/* Block until 1/p->rate seconds elapsed */
|
||||
while ((m.sequence = timerfd_wait(tfd))) {
|
||||
msg_random(&m);
|
||||
msg_fprint(stdout, &m);
|
||||
|
||||
fflush(stdout);
|
||||
|
||||
m.sequence++;
|
||||
}
|
||||
|
||||
close(tfd);
|
||||
|
|
|
@ -214,7 +214,7 @@ int socket_read(struct node *n, struct msg *pool, int poolsize, int first, int c
|
|||
if (bytes != 0)
|
||||
error("Packet length does not match message header length!");
|
||||
|
||||
return 0;
|
||||
return cnt;
|
||||
}
|
||||
|
||||
int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
|
||||
|
@ -254,7 +254,7 @@ int socket_write(struct node *n, struct msg *pool, int poolsize, int first, int
|
|||
if (ret < 0)
|
||||
serror("Failed send");
|
||||
|
||||
return 0;
|
||||
return cnt;
|
||||
}
|
||||
|
||||
int socket_parse(config_setting_t *cfg, struct node *n)
|
||||
|
|
|
@ -79,6 +79,13 @@ void * alloc(size_t bytes)
|
|||
return p;
|
||||
}
|
||||
|
||||
uint64_t timerfd_wait(int fd)
|
||||
{
|
||||
uint64_t runs;
|
||||
|
||||
return read(fd, &runs, sizeof(runs)) < 0 ? 0 : runs;
|
||||
}
|
||||
|
||||
double timespec_delta(struct timespec *start, struct timespec *end)
|
||||
{
|
||||
double sec = end->tv_sec - start->tv_sec;
|
||||
|
|
Loading…
Add table
Reference in a new issue