1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

task: simplify interface

This commit is contained in:
Steffen Vogel 2017-09-16 15:33:01 +02:00
parent 5e620942ce
commit fba30731aa
8 changed files with 96 additions and 81 deletions

View file

@ -36,20 +36,20 @@
#if defined(__MACH__)
#define PERIODIC_TASK_IMPL NANOSLEEP
#else
#elif defined(__linux__)
#define PERIODIC_TASK_IMPL TIMERFD
#else
#error "Platform not supported"
#endif
struct task {
struct timespec period;
#if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP || PERIODIC_TASK_IMPL == NANOSLEEP
struct timespec next_period;
#elif PERIODIC_TASK_IMPL == TIMERFD
int fd;
#else
#error "Invalid period task implementation"
int clock; /**< CLOCK_{MONOTONIC,REALTIME} */
struct timespec period; /**< The period of periodic invations of this task */
struct timespec next; /**< The timer value for the next invocation */
#if PERIODIC_TASK_IMPL == TIMERFD
int fd; /**< The timerfd_create(2) file descriptior. */
#endif
int clock;
};
/** Create a new task with the given rate. */
@ -62,14 +62,14 @@ int task_destroy(struct task *t);
* @retval 0 An error occured. Maybe the task was stopped.
* @retval >0 The nummer of runs this task already fired.
*/
uint64_t task_wait_until_next_period(struct task *t);
uint64_t task_wait(struct task *t);
int task_set_next(struct task *t, struct timespec *next);
int task_set_timeout(struct task *t, double to);
int task_set_rate(struct task *t, double rate);
/** Wait until a fixed time in the future is reached
/** Returns a poll'able file descriptor which becomes readable when the timer expires.
*
* @param until A pointer to a time in the future.
* Note: currently not supported on all platforms.
*/
int task_wait_until(struct task *t, const struct timespec *until);
int task_fd(struct task *t);

View file

@ -315,14 +315,15 @@ retry: ret = io_scan(&f->io, smps, cnt);
return cnt;
if (f->rate) {
steps = task_wait_until_next_period(&f->task);
steps = task_wait(&f->task);
smps[0]->ts.origin = time_now();
}
else {
smps[0]->ts.origin = time_add(&smps[0]->ts.origin, &f->offset);
steps = task_wait_until(&f->task, &smps[0]->ts.origin);
task_set_next(&f->task, &smps[0]->ts.origin);
steps = task_wait(&f->task);
}
/* Check for overruns */

View file

@ -539,7 +539,7 @@ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt)
struct ngsi *i = n->_vd;
int ret;
if (task_wait_until_next_period(&i->task) == 0)
if (task_wait(&i->task) == 0)
perror("Failed to wait for task");
json_t *rentity;

View file

@ -220,7 +220,7 @@ int signal_read(struct node *n, struct sample *smps[], unsigned cnt)
/* Throttle output if desired */
if (s->rt) {
/* Block until 1/p->rate seconds elapsed */
steps = task_wait_until_next_period(&s->task);
steps = task_wait(&s->task);
if (steps > 1)
warn("Missed steps: %u", steps);

View file

@ -128,7 +128,7 @@ int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt)
if (!sn->node->stats)
return 0;
task_wait_until_next_period(&sn->task);
task_wait(&sn->task);
smps[0]->length = MIN(STATS_COUNT * 6, smps[0]->capacity);
smps[0]->flags = SAMPLE_HAS_VALUES;

View file

@ -52,16 +52,50 @@ int task_init(struct task *t, double rate, int clock)
return 0;
}
int task_set_rate(struct task *t, double rate)
int task_set_timeout(struct task *t, double to)
{
t->period = rate ? time_from_double(1.0 / rate) : (struct timespec) { 0, 0 };
#if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP || PERIODIC_TASK_IMPL == NANOSLEEP
struct timespec now;
clock_gettime(t->clock, &now);
t->next_period = time_add(&now, &t->period);
struct timespec timeout = time_from_double(to);
struct timespec next = time_add(&now, &timeout);
return task_set_next(t, &next);
}
int task_set_next(struct task *t, struct timespec *next)
{
t->next = *next;
#if PERIODIC_TASK_IMPL == TIMERFD
int ret;
struct itimerspec its = {
.it_interval = (struct timespec) { 0, 0 },
.it_value = t->next
};
ret = timerfd_settime(t->fd, TFD_TIMER_ABSTIME, &its, NULL);
if (ret)
return ret;
#endif
return 0;
}
int task_set_rate(struct task *t, double rate)
{
/* A rate of 0 will disarm the timer */
t->period = rate ? time_from_double(1.0 / rate) : (struct timespec) { 0, 0 };
#if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP || PERIODIC_TASK_IMPL == NANOSLEEP
struct timespec now, next;
clock_gettime(t->clock, &now);
next = time_add(&now, &t->period);
return time_set_next(t, &next);
#elif PERIODIC_TASK_IMPL == TIMERFD
int ret;
struct itimerspec its = {
@ -98,13 +132,31 @@ static int time_lt(const struct timespec *lhs, const struct timespec *rhs)
}
#endif
uint64_t task_wait_until_next_period(struct task *t)
uint64_t task_wait(struct task *t)
{
uint64_t runs;
int ret;
#if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP || PERIODIC_TASK_IMPL == NANOSLEEP
ret = task_wait_until(t, &t->next_period);
struct timespec now;
#if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP
do {
ret = clock_nanosleep(t->clock, TIMER_ABSTIME, &t->next, NULL);
} while (ret == EINTR);
#elif PERIODIC_TASK_IMPL == NANOSLEEP
struct timespec delta;
ret = clock_gettime(t->clock, &now);
if (ret)
return ret;
delta = time_diff(&now, &t->next);
ret = nanosleep(&delta, NULL);
#endif
if (ret < 0)
return 0;
struct timespec now;
@ -112,9 +164,8 @@ uint64_t task_wait_until_next_period(struct task *t)
if (ret)
return 0;
for (runs = 0; time_lt(&t->next_period, &now); runs++)
t->next_period = time_add(&t->next_period, &t->period);
for (runs = 0; time_lt(&t->next, &now); runs++)
t->next = time_add(&t->next, &t->period);
#elif PERIODIC_TASK_IMPL == TIMERFD
ret = read(t->fd, &runs, sizeof(runs));
if (ret < 0)
@ -126,46 +177,6 @@ uint64_t task_wait_until_next_period(struct task *t)
return runs;
}
int task_wait_until(struct task *t, const struct timespec *until)
{
int ret;
#if PERIODIC_TASK_IMPL == CLOCK_NANOSLEEP
retry: ret = clock_nanosleep(t->clock, TIMER_ABSTIME, until, NULL);
if (ret == EINTR)
goto retry;
#elif PERIODIC_TASK_IMPL == NANOSLEEP
struct timespec now, delta;
ret = clock_gettime(t->clock, &now);
if (ret)
return ret;
delta = time_diff(&now, until);
ret = nanosleep(&delta, NULL);
#elif PERIODIC_TASK_IMPL == TIMERFD
uint64_t runs;
struct itimerspec its = {
.it_value = *until,
.it_interval = { 0, 0 }
};
ret = timerfd_settime(t->fd, TFD_TIMER_ABSTIME, &its, NULL);
if (ret)
return 0;
ret = read(t->fd, &runs, sizeof(runs));
if (ret < 0)
return ret;
#else
#error "Invalid period task implementation"
#endif
return 0;
}
int task_fd(struct task *t)
{
#if PERIODIC_TASK_IMPL == TIMERFD

View file

@ -171,7 +171,7 @@ int main(int argc, char *argv[])
error("Failed to create stats timer");
for (;;) {
task_wait_until_next_period(&t);
task_wait(&t);
for (size_t i = 0; i < list_length(&sn.paths); i++) {
struct path *p = list_at(&sn.paths, i);

View file

@ -33,18 +33,18 @@ Test(task, rate, .timeout = 10)
double rate = 5, waited;
struct timespec start, end;
struct task task;
ret = task_init(&task, rate, CLOCK_MONOTONIC);
cr_assert_eq(ret, 0);
int i;
for (i = 0; i < 10; i++) {
clock_gettime(CLOCK_MONOTONIC, &start);
task_wait_until_next_period(&task);
task_wait(&task);
clock_gettime(CLOCK_MONOTONIC, &end);
waited = time_delta(&start, &end);
if (fabs(waited - 1.0 / rate) > 10e-3)
@ -63,7 +63,7 @@ Test(task, wait_until, .timeout = 5)
int ret;
struct task task;
struct timespec start, end, diff, future;
ret = task_init(&task, 1, CLOCK_REALTIME);
cr_assert_eq(ret, 0);
@ -72,12 +72,15 @@ Test(task, wait_until, .timeout = 5)
start = time_now();
diff = time_from_double(waitfor);
future = time_add(&start, &diff);
ret = task_wait_until(&task, &future);
ret = task_set_next(&task, &future);
cr_assert_eq(ret, 0);
ret = task_wait(&task);
end = time_now();
cr_assert_eq(ret, 0);
cr_assert_eq(ret, 1);
double waited = time_delta(&start, &end);
@ -85,4 +88,4 @@ Test(task, wait_until, .timeout = 5)
ret = task_destroy(&task);
cr_assert_eq(ret, 0);
}
}