mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
remove fixed-rate sending (will be replaced by new register node-type)
This commit is contained in:
parent
7a7b36e678
commit
64ee734dc2
3 changed files with 7 additions and 56 deletions
|
@ -42,7 +42,7 @@ struct path
|
|||
|
||||
struct node *in; /**< Pointer to the incoming node */
|
||||
|
||||
struct queue queue; /**< A ring buffer for all received messages (unmodified) */
|
||||
struct queue queue; /**< A ring buffer for all received messages (unmodified) */
|
||||
struct pool pool; /**< Memory pool for messages / samples. */
|
||||
|
||||
struct list destinations; /**< List of all outgoing nodes */
|
||||
|
@ -51,11 +51,8 @@ struct path
|
|||
int samplelen; /**< Maximum number of values per sample for this path. */
|
||||
int queuelen; /**< Size of sample queue for this path. */
|
||||
int enabled; /**< Is this path enabled */
|
||||
int tfd; /**< Timer file descriptor for fixed rate sending */
|
||||
double rate; /**< Send messages with a fixed rate over this path */
|
||||
|
||||
pthread_t recv_tid; /**< The thread id for this path */
|
||||
pthread_t sent_tid; /**< A second thread id for fixed rate sending thread */
|
||||
pthread_t tid; /**< The thread id for this path */
|
||||
|
||||
config_setting_t *cfg; /**< A pointer to the libconfig object which instantiated this path */
|
||||
|
||||
|
@ -74,7 +71,6 @@ struct path
|
|||
uintmax_t invalid; /**< Counter for invalid messages */
|
||||
uintmax_t skipped; /**< Counter for skipped messages due to hooks */
|
||||
uintmax_t dropped; /**< Counter for dropped messages due to reordering */
|
||||
uintmax_t overrun; /**< Counter of overruns for fixed-rate sending */
|
||||
|
||||
/** @} */
|
||||
};
|
||||
|
|
|
@ -201,8 +201,6 @@ int cfg_parse_path(config_setting_t *cfg,
|
|||
reverse = 0;
|
||||
if (!config_setting_lookup_bool(cfg, "enabled", &p->enabled))
|
||||
p->enabled = 1;
|
||||
if (!config_setting_lookup_float(cfg, "rate", &p->rate))
|
||||
p->rate = 0; /* disabled */
|
||||
|
||||
p->cfg = cfg;
|
||||
|
||||
|
|
53
lib/path.c
53
lib/path.c
|
@ -51,32 +51,7 @@ static void path_write(struct path *p, bool resend)
|
|||
}
|
||||
}
|
||||
|
||||
/** Send messages asynchronously */
|
||||
static void * path_run_async(void *arg)
|
||||
{
|
||||
struct path *p = arg;
|
||||
|
||||
/* Block until 1/p->rate seconds elapsed */
|
||||
for (;;) {
|
||||
/* Check for overruns */
|
||||
uint64_t expir = timerfd_wait(p->tfd);
|
||||
if (expir == 0)
|
||||
perror("Failed to wait for timer");
|
||||
else if (expir > 1) {
|
||||
p->overrun += expir;
|
||||
warn("Overrun detected for path: overruns=%" PRIu64, expir);
|
||||
}
|
||||
|
||||
if (hook_run(p, NULL, 0, HOOK_ASYNC))
|
||||
continue;
|
||||
|
||||
path_write(p, true);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/** Receive messages */
|
||||
/** Main thread function per path: receive -> sent messages */
|
||||
static void * path_run(void *arg)
|
||||
{
|
||||
struct path *p = arg;
|
||||
|
@ -116,9 +91,7 @@ static void * path_run(void *arg)
|
|||
|
||||
debug(DBG_PATH | 3, "Enqueuing %u samples to queue of path %s", enqueue, path_name(p));
|
||||
|
||||
/* At fixed rate mode, messages are send by another (asynchronous) thread */
|
||||
if (p->rate == 0)
|
||||
path_write(p, false);
|
||||
path_write(p);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -128,25 +101,16 @@ int path_start(struct path *p)
|
|||
{
|
||||
int ret;
|
||||
|
||||
info("Starting path: %s (#hooks=%zu, rate=%.1f)",
|
||||
path_name(p), list_length(&p->hooks), p->rate);
|
||||
info("Starting path: %s (#hooks=%zu)",
|
||||
path_name(p), list_length(&p->hooks));
|
||||
|
||||
ret = hook_run(p, NULL, 0, HOOK_PATH_START);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
/* At fixed rate mode, we start another thread for sending */
|
||||
if (p->rate) {
|
||||
p->tfd = timerfd_create_rate(p->rate);
|
||||
if (p->tfd < 0)
|
||||
serror("Failed to create timer");
|
||||
|
||||
pthread_create(&p->sent_tid, NULL, &path_run_async, p);
|
||||
}
|
||||
|
||||
p->state = PATH_RUNNING;
|
||||
|
||||
return pthread_create(&p->recv_tid, NULL, &path_run, p);
|
||||
return pthread_create(&p->tid, NULL, &path_run, p);
|
||||
}
|
||||
|
||||
int path_stop(struct path *p)
|
||||
|
@ -156,13 +120,6 @@ int path_stop(struct path *p)
|
|||
pthread_cancel(p->recv_tid);
|
||||
pthread_join(p->recv_tid, NULL);
|
||||
|
||||
if (p->rate) {
|
||||
pthread_cancel(p->sent_tid);
|
||||
pthread_join(p->sent_tid, NULL);
|
||||
|
||||
close(p->tfd);
|
||||
}
|
||||
|
||||
p->state = PATH_STOPPED;
|
||||
|
||||
if (hook_run(p, NULL, 0, HOOK_PATH_STOP))
|
||||
|
|
Loading…
Add table
Reference in a new issue