diff --git a/etc/example.conf b/etc/example.conf index 9476b832c..20ccc268f 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -40,5 +40,6 @@ paths = ( reverse = true, # Setup a path in the reverse direction too in = "acs", # Name of the node we listen to (see above) out = "sintef", # Name of the node we send to + rate = 100 # Send message over this path with a fixed (equalized) rate } ); diff --git a/include/path.h b/include/path.h index d96e89850..f4024cb01 100644 --- a/include/path.h +++ b/include/path.h @@ -33,6 +33,12 @@ struct path */ int (*hook)(struct msg *m); + /** Send messages with a fixed rate over this path */ + double rate; + + /** A pointer to the last received message */ + struct msg *last; + /** Counter for received messages */ unsigned int received; /** Counter for messages which arrived reordered */ @@ -42,8 +48,8 @@ struct path /** Last known message number */ unsigned int sequence; - /** The thread for this path */ - pthread_t tid; + /** The thread ids for this path */ + pthread_t tid, tid2; /** A pointer to the libconfig object which instantiated this path */ config_setting_t *cfg; diff --git a/src/cfg.c b/src/cfg.c index caa44d5de..facc098ae 100644 --- a/src/cfg.c +++ b/src/cfg.c @@ -107,10 +107,6 @@ int config_parse_path(config_setting_t *cfg, int enabled = 1; int reverse = 0; - /* Optional settings */ - config_setting_lookup_bool(cfg, "enabled", &enabled); - config_setting_lookup_bool(cfg, "reverse", &reverse); - struct path *path = (struct path *) malloc(sizeof(struct path)); if (!path) error("Failed to allocate memory for path"); @@ -132,6 +128,11 @@ int config_parse_path(config_setting_t *cfg, if (!path->out) cerror(cfg, "Invalid output node '%s'", out_str); + /* Optional settings */ + config_setting_lookup_bool(cfg, "enabled", &enabled); + config_setting_lookup_bool(cfg, "reverse", &reverse); + config_setting_lookup_float(cfg, "rate", &path->rate); + path->cfg = cfg; debug(3, "Loaded path from '%s' to '%s'", path->in->name, path->out->name); diff --git a/src/path.c b/src/path.c index 017fab722..ebeaa143c 100644 --- a/src/path.c +++ b/src/path.c @@ -7,21 +7,67 @@ #include #include +#include #include +#include +#include + +#include #include "cfg.h" #include "utils.h" #include "path.h" -/** - * @brief This is the main thread function per path - */ +#define sigev_notify_thread_id _sigev_un._tid + +/** Send messages */ +static void * path_send(void *arg) +{ + struct path *p = (struct path *) arg; + timer_t tmr; + sigset_t set; + + struct sigevent sev = { + .sigev_notify = SIGEV_THREAD_ID, + .sigev_signo = SIGALRM, + .sigev_notify_thread_id = syscall(SYS_gettid) + }; + + struct itimerspec its = { + .it_interval = timespec_rate(p->rate), + .it_value = { 1, 0 } + }; + + sigemptyset(&set); + sigaddset(&set, SIGALRM); + if(pthread_sigmask(SIG_BLOCK, &set, NULL)) + perror("Set signal mask"); + + if (timer_create(CLOCK_REALTIME, &sev, &tmr)) + perror("Failed to create timer"); + + if (timer_settime(tmr, 0, &its, NULL)) + perror("Failed to start timer"); + + while (1) { + int sig; + sigwait(&set, &sig); + + msg_send(p->last, p->out); + } + + return NULL; +} + +/** Receive messages */ static void * path_run(void *arg) { struct path *p = (struct path *) arg; struct msg m; - /* main thread loop */ + p->last = &m; + + /* Main thread loop */ while (1) { msg_recv(&m, p->in); /* Receive message */ @@ -47,7 +93,11 @@ static void * path_run(void *arg) continue; } - msg_send(&m, p->out); /* Send message */ + /* At fixed rate mode, messages are send by another thread */ + if (p->rate) + continue; + + msg_send(p->last, p->out); } return NULL; @@ -55,15 +105,22 @@ static void * path_run(void *arg) int path_start(struct path *p) { + /* At fixed rate mode, we start another thread for sending */ + if (p->rate) + pthread_create(&p->tid2, NULL, &path_send, (void *) p); + return pthread_create(&p->tid, NULL, &path_run, (void *) p); } int path_stop(struct path *p) { - void *ret; - pthread_cancel(p->tid); - pthread_join(p->tid, &ret); + pthread_join(p->tid, NULL); + + if (p->rate) { + pthread_cancel(p->tid2); + pthread_join(p->tid2, NULL); + } return 0; }