diff --git a/include/villas/path.h b/include/villas/path.h index 801acf51d..fb01e5d23 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -42,7 +42,7 @@ struct path struct node *in; /**< Pointer to the incoming node */ - struct mpmc_queue queue; /**< A ring buffer for all received messages (unmodified) */ + struct queue queue; /**< A ring buffer for all received messages (unmodified) */ struct pool pool; /**< Memory pool for messages / samples. */ struct list destinations; /**< List of all outgoing nodes */ diff --git a/include/villas/pool.h b/include/villas/pool.h index c6ffcc466..4c0fc70f4 100644 --- a/include/villas/pool.h +++ b/include/villas/pool.h @@ -28,7 +28,7 @@ struct pool { size_t blocksz; /**< Length of a block in bytes */ size_t alignment; /**< Alignment of a block in bytes */ - struct mpmc_queue queue; /**< The queue which is used to keep track of free blocks */ + struct queue queue; /**< The queue which is used to keep track of free blocks */ }; #define INLINE static inline __attribute__((unused)) @@ -42,26 +42,26 @@ int pool_destroy(struct pool *p); /** Pop cnt values from the stack an place them in the array blocks */ INLINE ssize_t pool_get_many(struct pool *p, void *blocks[], size_t cnt) { - return mpmc_queue_pull_many(&p->queue, blocks, cnt); + return queue_pull_many(&p->queue, blocks, cnt); } /** Push cnt values which are giving by the array values to the stack. */ INLINE ssize_t pool_put_many(struct pool *p, void *blocks[], size_t cnt) { - return mpmc_queue_push_many(&p->queue, blocks, cnt); + return queue_push_many(&p->queue, blocks, cnt); } /** Get a free memory block from pool. */ INLINE void * pool_get(struct pool *p) { void *ptr; - return mpmc_queue_pull(&p->queue, &ptr) == 1 ? ptr : NULL; + return queue_pull(&p->queue, &ptr) == 1 ? ptr : NULL; } /** Release a memory block back to the pool. */ INLINE int pool_put(struct pool *p, void *buf) { - return mpmc_queue_push(&p->queue, buf); + return queue_push(&p->queue, buf); } #endif /* _POOL_H_ */ \ No newline at end of file diff --git a/include/villas/queue.h b/include/villas/queue.h index bf2a6cac0..40baf1f17 100644 --- a/include/villas/queue.h +++ b/include/villas/queue.h @@ -42,12 +42,12 @@ #define CACHELINE_SIZE 64 typedef char cacheline_pad_t[CACHELINE_SIZE]; -struct mpmc_queue { +struct queue { cacheline_pad_t _pad0; /**< Shared area: all threads read */ struct memtype const * mem; size_t buffer_mask; - struct mpmc_queue_cell { + struct queue_cell { atomic_size_t sequence; void *data; } *buffer; @@ -64,24 +64,24 @@ struct mpmc_queue { }; /** Initialize MPMC queue */ -int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem); +int queue_init(struct queue *q, size_t size, const struct memtype *mem); /** Desroy MPMC queue and release memory */ -int mpmc_queue_destroy(struct mpmc_queue *q); +int queue_destroy(struct queue *q); /** Return estimation of current queue usage. * * Note: This is only an estimation and not accurate as long other * threads are performing operations. */ -size_t mpmc_queue_available(struct mpmc_queue *q); +size_t queue_available(struct queue *q); -int mpmc_queue_push(struct mpmc_queue *q, void *ptr); +int queue_push(struct queue *q, void *ptr); -int mpmc_queue_pull(struct mpmc_queue *q, void **ptr); +int queue_pull(struct queue *q, void **ptr); -int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt); +int queue_push_many(struct queue *q, void *ptr[], size_t cnt); -int mpmc_queue_pull_many(struct mpmc_queue *q, void *ptr[], size_t cnt); +int queue_pull_many(struct queue *q, void *ptr[], size_t cnt); #endif /* _MPMC_QUEUE_H_ */ \ No newline at end of file diff --git a/lib/path.c b/lib/path.c index b783c816f..69667cbcc 100644 --- a/lib/path.c +++ b/lib/path.c @@ -26,7 +26,7 @@ static void path_write(struct path *p, bool resend) int sent, tosend, available, released; struct sample *smps[n->vectorize]; - available = mpmc_queue_pull_many(&p->queue, (void **) smps, cnt); + available = queue_pull_many(&p->queue, (void **) smps, cnt); if (available < cnt) warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); @@ -108,7 +108,7 @@ static void * path_run(void *arg) p->skipped += recv - enqueue; } - enqueued = mpmc_queue_push_many(&p->queue, (void **) smps, enqueue); + enqueued = queue_push_many(&p->queue, (void **) smps, enqueue); if (enqueue != enqueued) warn("Failed to enqueue %u samples for path %s", enqueue - enqueued, path_name(p)); @@ -219,7 +219,7 @@ int path_prepare(struct path *p) if (ret) error("Failed to allocate memory pool for path"); - ret = mpmc_queue_init(&p->queue, p->queuelen, &memtype_hugepage); + ret = queue_init(&p->queue, p->queuelen, &memtype_hugepage); if (ret) error("Failed to initialize queue for path"); @@ -233,7 +233,7 @@ void path_destroy(struct path *p) list_destroy(&p->destinations, NULL, false); list_destroy(&p->hooks, NULL, true); - mpmc_queue_destroy(&p->queue); + queue_destroy(&p->queue); pool_destroy(&p->pool); free(p->_name); diff --git a/lib/pool.c b/lib/pool.c index 90317bd9e..156200c5a 100644 --- a/lib/pool.c +++ b/lib/pool.c @@ -26,17 +26,17 @@ int pool_init(struct pool *p, size_t blocksz, size_t cnt, const struct memtype * else debug(DBG_POOL | 4, "Allocated %#zx bytes for memory pool", p->len); - mpmc_queue_init(&p->queue, cnt, m); + queue_init(&p->queue, cnt, m); for (int i = 0; i < cnt; i++) - mpmc_queue_push(&p->queue, (char *) p->buffer + i * p->blocksz); + queue_push(&p->queue, (char *) p->buffer + i * p->blocksz); return 0; } int pool_destroy(struct pool *p) { - mpmc_queue_destroy(&p->queue); + queue_destroy(&p->queue); return memory_free(p->mem, p->buffer, p->len); } \ No newline at end of file diff --git a/lib/queue.c b/lib/queue.c index 136a7d263..1d1b797bd 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -34,7 +34,7 @@ #include "queue.h" /** Initialize MPMC queue */ -int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem) +int queue_init(struct queue *q, size_t size, const struct memtype *mem) { /* Queue size must be 2 exponent */ if ((size < 2) || ((size & (size - 1)) != 0)) @@ -55,7 +55,7 @@ int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem return 0; } -int mpmc_queue_destroy(struct mpmc_queue *q) +int queue_destroy(struct queue *q) { return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(sizeof(q->buffer[0]))); } @@ -65,15 +65,15 @@ int mpmc_queue_destroy(struct mpmc_queue *q) * Note: This is only an estimation and not accurate as long other * threads are performing operations. */ -size_t mpmc_queue_available(struct mpmc_queue *q) +size_t queue_available(struct queue *q) { return atomic_load_explicit(&q->tail, memory_order_relaxed) - atomic_load_explicit(&q->head, memory_order_relaxed); } -int mpmc_queue_push(struct mpmc_queue *q, void *ptr) +int queue_push(struct queue *q, void *ptr) { - struct mpmc_queue_cell *cell; + struct queue_cell *cell; size_t pos, seq; intptr_t diff; @@ -99,9 +99,9 @@ int mpmc_queue_push(struct mpmc_queue *q, void *ptr) return 1; } -int mpmc_queue_pull(struct mpmc_queue *q, void **ptr) +int queue_pull(struct queue *q, void **ptr) { - struct mpmc_queue_cell *cell; + struct queue_cell *cell; size_t pos, seq; intptr_t diff; @@ -128,13 +128,13 @@ int mpmc_queue_pull(struct mpmc_queue *q, void **ptr) return 1; } -int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt) +int queue_push_many(struct queue *q, void *ptr[], size_t cnt) { int ret; size_t i; for (i = 0; i < cnt; i++) { - ret = mpmc_queue_push(q, ptr[i]); + ret = queue_push(q, ptr[i]); if (!ret) break; } @@ -142,13 +142,13 @@ int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt) return i; } -int mpmc_queue_pull_many(struct mpmc_queue *q, void *ptr[], size_t cnt) +int queue_pull_many(struct queue *q, void *ptr[], size_t cnt) { int ret; size_t i; for (i = 0; i < cnt; i++) { - ret = mpmc_queue_pull(q, &ptr[i]); + ret = queue_pull(q, &ptr[i]); if (!ret) break; }