diff --git a/include/villas/path.h b/include/villas/path.h index fb01e5d23..654131e2e 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -42,7 +42,7 @@ struct path struct node *in; /**< Pointer to the incoming node */ - struct queue queue; /**< A ring buffer for all received messages (unmodified) */ + struct queue queue; /**< A ring buffer for all received messages (unmodified) */ struct pool pool; /**< Memory pool for messages / samples. */ struct list destinations; /**< List of all outgoing nodes */ @@ -51,11 +51,8 @@ struct path int samplelen; /**< Maximum number of values per sample for this path. */ int queuelen; /**< Size of sample queue for this path. */ int enabled; /**< Is this path enabled */ - int tfd; /**< Timer file descriptor for fixed rate sending */ - double rate; /**< Send messages with a fixed rate over this path */ - pthread_t recv_tid; /**< The thread id for this path */ - pthread_t sent_tid; /**< A second thread id for fixed rate sending thread */ + pthread_t tid; /**< The thread id for this path */ config_setting_t *cfg; /**< A pointer to the libconfig object which instantiated this path */ @@ -74,7 +71,6 @@ struct path uintmax_t invalid; /**< Counter for invalid messages */ uintmax_t skipped; /**< Counter for skipped messages due to hooks */ uintmax_t dropped; /**< Counter for dropped messages due to reordering */ - uintmax_t overrun; /**< Counter of overruns for fixed-rate sending */ /** @} */ }; diff --git a/lib/cfg.c b/lib/cfg.c index 33fe11ed6..84fbfd35f 100644 --- a/lib/cfg.c +++ b/lib/cfg.c @@ -201,8 +201,6 @@ int cfg_parse_path(config_setting_t *cfg, reverse = 0; if (!config_setting_lookup_bool(cfg, "enabled", &p->enabled)) p->enabled = 1; - if (!config_setting_lookup_float(cfg, "rate", &p->rate)) - p->rate = 0; /* disabled */ p->cfg = cfg; diff --git a/lib/path.c b/lib/path.c index 69667cbcc..ec8e100ba 100644 --- a/lib/path.c +++ b/lib/path.c @@ -51,32 +51,7 @@ static void path_write(struct path *p, bool resend) } } -/** Send messages asynchronously */ -static void * path_run_async(void *arg) -{ - struct path *p = arg; - - /* Block until 1/p->rate seconds elapsed */ - for (;;) { - /* Check for overruns */ - uint64_t expir = timerfd_wait(p->tfd); - if (expir == 0) - perror("Failed to wait for timer"); - else if (expir > 1) { - p->overrun += expir; - warn("Overrun detected for path: overruns=%" PRIu64, expir); - } - - if (hook_run(p, NULL, 0, HOOK_ASYNC)) - continue; - - path_write(p, true); - } - - return NULL; -} - -/** Receive messages */ +/** Main thread function per path: receive -> sent messages */ static void * path_run(void *arg) { struct path *p = arg; @@ -116,9 +91,7 @@ static void * path_run(void *arg) debug(DBG_PATH | 3, "Enqueuing %u samples to queue of path %s", enqueue, path_name(p)); - /* At fixed rate mode, messages are send by another (asynchronous) thread */ - if (p->rate == 0) - path_write(p, false); + path_write(p); } return NULL; @@ -128,25 +101,16 @@ int path_start(struct path *p) { int ret; - info("Starting path: %s (#hooks=%zu, rate=%.1f)", - path_name(p), list_length(&p->hooks), p->rate); + info("Starting path: %s (#hooks=%zu)", + path_name(p), list_length(&p->hooks)); ret = hook_run(p, NULL, 0, HOOK_PATH_START); if (ret) return -1; - /* At fixed rate mode, we start another thread for sending */ - if (p->rate) { - p->tfd = timerfd_create_rate(p->rate); - if (p->tfd < 0) - serror("Failed to create timer"); - - pthread_create(&p->sent_tid, NULL, &path_run_async, p); - } - p->state = PATH_RUNNING; - return pthread_create(&p->recv_tid, NULL, &path_run, p); + return pthread_create(&p->tid, NULL, &path_run, p); } int path_stop(struct path *p) @@ -156,13 +120,6 @@ int path_stop(struct path *p) pthread_cancel(p->recv_tid); pthread_join(p->recv_tid, NULL); - if (p->rate) { - pthread_cancel(p->sent_tid); - pthread_join(p->sent_tid, NULL); - - close(p->tfd); - } - p->state = PATH_STOPPED; if (hook_run(p, NULL, 0, HOOK_PATH_STOP))