mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
added and tested fixed rate feature
git-svn-id: https://zerberus.eonerc.rwth-aachen.de:8443/svn/s2ss/trunk@90 8ec27952-4edc-4aab-86aa-e87bb2611832
This commit is contained in:
parent
14c3914083
commit
6f3652d7c1
4 changed files with 79 additions and 14 deletions
|
@ -40,5 +40,6 @@ paths = (
|
|||
reverse = true, # Setup a path in the reverse direction too
|
||||
in = "acs", # Name of the node we listen to (see above)
|
||||
out = "sintef", # Name of the node we send to
|
||||
rate = 100 # Send message over this path with a fixed (equalized) rate
|
||||
}
|
||||
);
|
||||
|
|
|
@ -33,6 +33,12 @@ struct path
|
|||
*/
|
||||
int (*hook)(struct msg *m);
|
||||
|
||||
/** Send messages with a fixed rate over this path */
|
||||
double rate;
|
||||
|
||||
/** A pointer to the last received message */
|
||||
struct msg *last;
|
||||
|
||||
/** Counter for received messages */
|
||||
unsigned int received;
|
||||
/** Counter for messages which arrived reordered */
|
||||
|
@ -42,8 +48,8 @@ struct path
|
|||
/** Last known message number */
|
||||
unsigned int sequence;
|
||||
|
||||
/** The thread for this path */
|
||||
pthread_t tid;
|
||||
/** The thread ids for this path */
|
||||
pthread_t tid, tid2;
|
||||
/** A pointer to the libconfig object which instantiated this path */
|
||||
config_setting_t *cfg;
|
||||
|
||||
|
|
|
@ -107,10 +107,6 @@ int config_parse_path(config_setting_t *cfg,
|
|||
int enabled = 1;
|
||||
int reverse = 0;
|
||||
|
||||
/* Optional settings */
|
||||
config_setting_lookup_bool(cfg, "enabled", &enabled);
|
||||
config_setting_lookup_bool(cfg, "reverse", &reverse);
|
||||
|
||||
struct path *path = (struct path *) malloc(sizeof(struct path));
|
||||
if (!path)
|
||||
error("Failed to allocate memory for path");
|
||||
|
@ -132,6 +128,11 @@ int config_parse_path(config_setting_t *cfg,
|
|||
if (!path->out)
|
||||
cerror(cfg, "Invalid output node '%s'", out_str);
|
||||
|
||||
/* Optional settings */
|
||||
config_setting_lookup_bool(cfg, "enabled", &enabled);
|
||||
config_setting_lookup_bool(cfg, "reverse", &reverse);
|
||||
config_setting_lookup_float(cfg, "rate", &path->rate);
|
||||
|
||||
path->cfg = cfg;
|
||||
|
||||
debug(3, "Loaded path from '%s' to '%s'", path->in->name, path->out->name);
|
||||
|
|
73
src/path.c
73
src/path.c
|
@ -7,21 +7,67 @@
|
|||
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <signal.h>
|
||||
#include <time.h>
|
||||
|
||||
#include <sys/syscall.h>
|
||||
|
||||
#include "cfg.h"
|
||||
#include "utils.h"
|
||||
#include "path.h"
|
||||
|
||||
/**
|
||||
* @brief This is the main thread function per path
|
||||
*/
|
||||
#define sigev_notify_thread_id _sigev_un._tid
|
||||
|
||||
/** Send messages */
|
||||
static void * path_send(void *arg)
|
||||
{
|
||||
struct path *p = (struct path *) arg;
|
||||
timer_t tmr;
|
||||
sigset_t set;
|
||||
|
||||
struct sigevent sev = {
|
||||
.sigev_notify = SIGEV_THREAD_ID,
|
||||
.sigev_signo = SIGALRM,
|
||||
.sigev_notify_thread_id = syscall(SYS_gettid)
|
||||
};
|
||||
|
||||
struct itimerspec its = {
|
||||
.it_interval = timespec_rate(p->rate),
|
||||
.it_value = { 1, 0 }
|
||||
};
|
||||
|
||||
sigemptyset(&set);
|
||||
sigaddset(&set, SIGALRM);
|
||||
if(pthread_sigmask(SIG_BLOCK, &set, NULL))
|
||||
perror("Set signal mask");
|
||||
|
||||
if (timer_create(CLOCK_REALTIME, &sev, &tmr))
|
||||
perror("Failed to create timer");
|
||||
|
||||
if (timer_settime(tmr, 0, &its, NULL))
|
||||
perror("Failed to start timer");
|
||||
|
||||
while (1) {
|
||||
int sig;
|
||||
sigwait(&set, &sig);
|
||||
|
||||
msg_send(p->last, p->out);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/** Receive messages */
|
||||
static void * path_run(void *arg)
|
||||
{
|
||||
struct path *p = (struct path *) arg;
|
||||
struct msg m;
|
||||
|
||||
/* main thread loop */
|
||||
p->last = &m;
|
||||
|
||||
/* Main thread loop */
|
||||
while (1) {
|
||||
msg_recv(&m, p->in); /* Receive message */
|
||||
|
||||
|
@ -47,7 +93,11 @@ static void * path_run(void *arg)
|
|||
continue;
|
||||
}
|
||||
|
||||
msg_send(&m, p->out); /* Send message */
|
||||
/* At fixed rate mode, messages are send by another thread */
|
||||
if (p->rate)
|
||||
continue;
|
||||
|
||||
msg_send(p->last, p->out);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -55,15 +105,22 @@ static void * path_run(void *arg)
|
|||
|
||||
int path_start(struct path *p)
|
||||
{
|
||||
/* At fixed rate mode, we start another thread for sending */
|
||||
if (p->rate)
|
||||
pthread_create(&p->tid2, NULL, &path_send, (void *) p);
|
||||
|
||||
return pthread_create(&p->tid, NULL, &path_run, (void *) p);
|
||||
}
|
||||
|
||||
int path_stop(struct path *p)
|
||||
{
|
||||
void *ret;
|
||||
|
||||
pthread_cancel(p->tid);
|
||||
pthread_join(p->tid, &ret);
|
||||
pthread_join(p->tid, NULL);
|
||||
|
||||
if (p->rate) {
|
||||
pthread_cancel(p->tid2);
|
||||
pthread_join(p->tid2, NULL);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue