diff --git a/include/villas/task.h b/include/villas/task.h index 6f67916f8..141f2f7e2 100644 --- a/include/villas/task.h +++ b/include/villas/task.h @@ -36,20 +36,20 @@ #if defined(__MACH__) #define PERIODIC_TASK_IMPL NANOSLEEP -#else +#elif defined(__linux__) #define PERIODIC_TASK_IMPL TIMERFD +#else + #error "Platform not supported" #endif struct task { - struct timespec period; -#if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP || PERIODIC_TASK_IMPL == NANOSLEEP - struct timespec next_period; -#elif PERIODIC_TASK_IMPL == TIMERFD - int fd; -#else - #error "Invalid period task implementation" + int clock; /**< CLOCK_{MONOTONIC,REALTIME} */ + + struct timespec period; /**< The period of periodic invations of this task */ + struct timespec next; /**< The timer value for the next invocation */ +#if PERIODIC_TASK_IMPL == TIMERFD + int fd; /**< The timerfd_create(2) file descriptior. */ #endif - int clock; }; /** Create a new task with the given rate. */ @@ -62,14 +62,14 @@ int task_destroy(struct task *t); * @retval 0 An error occured. Maybe the task was stopped. * @retval >0 The nummer of runs this task already fired. */ -uint64_t task_wait_until_next_period(struct task *t); +uint64_t task_wait(struct task *t); +int task_set_next(struct task *t, struct timespec *next); +int task_set_timeout(struct task *t, double to); int task_set_rate(struct task *t, double rate); -/** Wait until a fixed time in the future is reached +/** Returns a poll'able file descriptor which becomes readable when the timer expires. * - * @param until A pointer to a time in the future. + * Note: currently not supported on all platforms. */ -int task_wait_until(struct task *t, const struct timespec *until); - int task_fd(struct task *t); diff --git a/lib/nodes/file.c b/lib/nodes/file.c index d3ec27e60..ad80afda1 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -315,14 +315,15 @@ retry: ret = io_scan(&f->io, smps, cnt); return cnt; if (f->rate) { - steps = task_wait_until_next_period(&f->task); + steps = task_wait(&f->task); smps[0]->ts.origin = time_now(); } else { smps[0]->ts.origin = time_add(&smps[0]->ts.origin, &f->offset); - steps = task_wait_until(&f->task, &smps[0]->ts.origin); + task_set_next(&f->task, &smps[0]->ts.origin); + steps = task_wait(&f->task); } /* Check for overruns */ diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.c index e77c5bdd6..bfce5c3c8 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.c @@ -539,7 +539,7 @@ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt) struct ngsi *i = n->_vd; int ret; - if (task_wait_until_next_period(&i->task) == 0) + if (task_wait(&i->task) == 0) perror("Failed to wait for task"); json_t *rentity; diff --git a/lib/nodes/signal.c b/lib/nodes/signal.c index 215078779..0900eba98 100644 --- a/lib/nodes/signal.c +++ b/lib/nodes/signal.c @@ -220,7 +220,7 @@ int signal_read(struct node *n, struct sample *smps[], unsigned cnt) /* Throttle output if desired */ if (s->rt) { /* Block until 1/p->rate seconds elapsed */ - steps = task_wait_until_next_period(&s->task); + steps = task_wait(&s->task); if (steps > 1) warn("Missed steps: %u", steps); diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c index d34f6345c..57671065b 100644 --- a/lib/nodes/stats.c +++ b/lib/nodes/stats.c @@ -128,7 +128,7 @@ int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt) if (!sn->node->stats) return 0; - task_wait_until_next_period(&sn->task); + task_wait(&sn->task); smps[0]->length = MIN(STATS_COUNT * 6, smps[0]->capacity); smps[0]->flags = SAMPLE_HAS_VALUES; diff --git a/lib/task.c b/lib/task.c index 04035d897..9b05e5da6 100644 --- a/lib/task.c +++ b/lib/task.c @@ -52,16 +52,50 @@ int task_init(struct task *t, double rate, int clock) return 0; } -int task_set_rate(struct task *t, double rate) +int task_set_timeout(struct task *t, double to) { - t->period = rate ? time_from_double(1.0 / rate) : (struct timespec) { 0, 0 }; - -#if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP || PERIODIC_TASK_IMPL == NANOSLEEP struct timespec now; clock_gettime(t->clock, &now); - t->next_period = time_add(&now, &t->period); + struct timespec timeout = time_from_double(to); + struct timespec next = time_add(&now, &timeout); + + return task_set_next(t, &next); +} + +int task_set_next(struct task *t, struct timespec *next) +{ + t->next = *next; + +#if PERIODIC_TASK_IMPL == TIMERFD + int ret; + struct itimerspec its = { + .it_interval = (struct timespec) { 0, 0 }, + .it_value = t->next + }; + + ret = timerfd_settime(t->fd, TFD_TIMER_ABSTIME, &its, NULL); + if (ret) + return ret; +#endif + + return 0; +} + +int task_set_rate(struct task *t, double rate) +{ + /* A rate of 0 will disarm the timer */ + t->period = rate ? time_from_double(1.0 / rate) : (struct timespec) { 0, 0 }; + +#if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP || PERIODIC_TASK_IMPL == NANOSLEEP + struct timespec now, next; + + clock_gettime(t->clock, &now); + + next = time_add(&now, &t->period); + + return time_set_next(t, &next); #elif PERIODIC_TASK_IMPL == TIMERFD int ret; struct itimerspec its = { @@ -98,13 +132,31 @@ static int time_lt(const struct timespec *lhs, const struct timespec *rhs) } #endif -uint64_t task_wait_until_next_period(struct task *t) +uint64_t task_wait(struct task *t) { uint64_t runs; int ret; #if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP || PERIODIC_TASK_IMPL == NANOSLEEP - ret = task_wait_until(t, &t->next_period); + struct timespec now; + + #if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP + do { + ret = clock_nanosleep(t->clock, TIMER_ABSTIME, &t->next, NULL); + } while (ret == EINTR); + #elif PERIODIC_TASK_IMPL == NANOSLEEP + struct timespec delta; + + ret = clock_gettime(t->clock, &now); + if (ret) + return ret; + + delta = time_diff(&now, &t->next); + + ret = nanosleep(&delta, NULL); + #endif + if (ret < 0) + return 0; struct timespec now; @@ -112,9 +164,8 @@ uint64_t task_wait_until_next_period(struct task *t) if (ret) return 0; - for (runs = 0; time_lt(&t->next_period, &now); runs++) - t->next_period = time_add(&t->next_period, &t->period); - + for (runs = 0; time_lt(&t->next, &now); runs++) + t->next = time_add(&t->next, &t->period); #elif PERIODIC_TASK_IMPL == TIMERFD ret = read(t->fd, &runs, sizeof(runs)); if (ret < 0) @@ -126,46 +177,6 @@ uint64_t task_wait_until_next_period(struct task *t) return runs; } -int task_wait_until(struct task *t, const struct timespec *until) -{ - int ret; - -#if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP -retry: ret = clock_nanosleep(t->clock, TIMER_ABSTIME, until, NULL); - if (ret == EINTR) - goto retry; -#elif PERIODIC_TASK_IMPL == NANOSLEEP - struct timespec now, delta; - - ret = clock_gettime(t->clock, &now); - if (ret) - return ret; - - delta = time_diff(&now, until); - - ret = nanosleep(&delta, NULL); -#elif PERIODIC_TASK_IMPL == TIMERFD - uint64_t runs; - - struct itimerspec its = { - .it_value = *until, - .it_interval = { 0, 0 } - }; - - ret = timerfd_settime(t->fd, TFD_TIMER_ABSTIME, &its, NULL); - if (ret) - return 0; - - ret = read(t->fd, &runs, sizeof(runs)); - if (ret < 0) - return ret; -#else - #error "Invalid period task implementation" -#endif - - return 0; -} - int task_fd(struct task *t) { #if PERIODIC_TASK_IMPL == TIMERFD diff --git a/src/node.c b/src/node.c index b399b5ced..4cce57a34 100644 --- a/src/node.c +++ b/src/node.c @@ -171,7 +171,7 @@ int main(int argc, char *argv[]) error("Failed to create stats timer"); for (;;) { - task_wait_until_next_period(&t); + task_wait(&t); for (size_t i = 0; i < list_length(&sn.paths); i++) { struct path *p = list_at(&sn.paths, i); diff --git a/tests/unit/task.c b/tests/unit/task.c index 5bc9943f1..56ee87d18 100644 --- a/tests/unit/task.c +++ b/tests/unit/task.c @@ -33,18 +33,18 @@ Test(task, rate, .timeout = 10) double rate = 5, waited; struct timespec start, end; struct task task; - + ret = task_init(&task, rate, CLOCK_MONOTONIC); cr_assert_eq(ret, 0); int i; for (i = 0; i < 10; i++) { clock_gettime(CLOCK_MONOTONIC, &start); - - task_wait_until_next_period(&task); - + + task_wait(&task); + clock_gettime(CLOCK_MONOTONIC, &end); - + waited = time_delta(&start, &end); if (fabs(waited - 1.0 / rate) > 10e-3) @@ -63,7 +63,7 @@ Test(task, wait_until, .timeout = 5) int ret; struct task task; struct timespec start, end, diff, future; - + ret = task_init(&task, 1, CLOCK_REALTIME); cr_assert_eq(ret, 0); @@ -72,12 +72,15 @@ Test(task, wait_until, .timeout = 5) start = time_now(); diff = time_from_double(waitfor); future = time_add(&start, &diff); - - ret = task_wait_until(&task, &future); + + ret = task_set_next(&task, &future); + cr_assert_eq(ret, 0); + + ret = task_wait(&task); end = time_now(); - - cr_assert_eq(ret, 0); + + cr_assert_eq(ret, 1); double waited = time_delta(&start, &end); @@ -85,4 +88,4 @@ Test(task, wait_until, .timeout = 5) ret = task_destroy(&task); cr_assert_eq(ret, 0); -} \ No newline at end of file +}