mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
mpmc_queue => queue
This commit is contained in:
parent
7e5f656567
commit
26e3885fbc
6 changed files with 33 additions and 33 deletions
|
@ -42,7 +42,7 @@ struct path
|
|||
|
||||
struct node *in; /**< Pointer to the incoming node */
|
||||
|
||||
struct mpmc_queue queue; /**< A ring buffer for all received messages (unmodified) */
|
||||
struct queue queue; /**< A ring buffer for all received messages (unmodified) */
|
||||
struct pool pool; /**< Memory pool for messages / samples. */
|
||||
|
||||
struct list destinations; /**< List of all outgoing nodes */
|
||||
|
|
|
@ -28,7 +28,7 @@ struct pool {
|
|||
size_t blocksz; /**< Length of a block in bytes */
|
||||
size_t alignment; /**< Alignment of a block in bytes */
|
||||
|
||||
struct mpmc_queue queue; /**< The queue which is used to keep track of free blocks */
|
||||
struct queue queue; /**< The queue which is used to keep track of free blocks */
|
||||
};
|
||||
|
||||
#define INLINE static inline __attribute__((unused))
|
||||
|
@ -42,26 +42,26 @@ int pool_destroy(struct pool *p);
|
|||
/** Pop cnt values from the stack an place them in the array blocks */
|
||||
INLINE ssize_t pool_get_many(struct pool *p, void *blocks[], size_t cnt)
|
||||
{
|
||||
return mpmc_queue_pull_many(&p->queue, blocks, cnt);
|
||||
return queue_pull_many(&p->queue, blocks, cnt);
|
||||
}
|
||||
|
||||
/** Push cnt values which are giving by the array values to the stack. */
|
||||
INLINE ssize_t pool_put_many(struct pool *p, void *blocks[], size_t cnt)
|
||||
{
|
||||
return mpmc_queue_push_many(&p->queue, blocks, cnt);
|
||||
return queue_push_many(&p->queue, blocks, cnt);
|
||||
}
|
||||
|
||||
/** Get a free memory block from pool. */
|
||||
INLINE void * pool_get(struct pool *p)
|
||||
{
|
||||
void *ptr;
|
||||
return mpmc_queue_pull(&p->queue, &ptr) == 1 ? ptr : NULL;
|
||||
return queue_pull(&p->queue, &ptr) == 1 ? ptr : NULL;
|
||||
}
|
||||
|
||||
/** Release a memory block back to the pool. */
|
||||
INLINE int pool_put(struct pool *p, void *buf)
|
||||
{
|
||||
return mpmc_queue_push(&p->queue, buf);
|
||||
return queue_push(&p->queue, buf);
|
||||
}
|
||||
|
||||
#endif /* _POOL_H_ */
|
|
@ -42,12 +42,12 @@
|
|||
#define CACHELINE_SIZE 64
|
||||
typedef char cacheline_pad_t[CACHELINE_SIZE];
|
||||
|
||||
struct mpmc_queue {
|
||||
struct queue {
|
||||
cacheline_pad_t _pad0; /**< Shared area: all threads read */
|
||||
|
||||
struct memtype const * mem;
|
||||
size_t buffer_mask;
|
||||
struct mpmc_queue_cell {
|
||||
struct queue_cell {
|
||||
atomic_size_t sequence;
|
||||
void *data;
|
||||
} *buffer;
|
||||
|
@ -64,24 +64,24 @@ struct mpmc_queue {
|
|||
};
|
||||
|
||||
/** Initialize MPMC queue */
|
||||
int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem);
|
||||
int queue_init(struct queue *q, size_t size, const struct memtype *mem);
|
||||
|
||||
/** Desroy MPMC queue and release memory */
|
||||
int mpmc_queue_destroy(struct mpmc_queue *q);
|
||||
int queue_destroy(struct queue *q);
|
||||
|
||||
/** Return estimation of current queue usage.
|
||||
*
|
||||
* Note: This is only an estimation and not accurate as long other
|
||||
* threads are performing operations.
|
||||
*/
|
||||
size_t mpmc_queue_available(struct mpmc_queue *q);
|
||||
size_t queue_available(struct queue *q);
|
||||
|
||||
int mpmc_queue_push(struct mpmc_queue *q, void *ptr);
|
||||
int queue_push(struct queue *q, void *ptr);
|
||||
|
||||
int mpmc_queue_pull(struct mpmc_queue *q, void **ptr);
|
||||
int queue_pull(struct queue *q, void **ptr);
|
||||
|
||||
int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt);
|
||||
int queue_push_many(struct queue *q, void *ptr[], size_t cnt);
|
||||
|
||||
int mpmc_queue_pull_many(struct mpmc_queue *q, void *ptr[], size_t cnt);
|
||||
int queue_pull_many(struct queue *q, void *ptr[], size_t cnt);
|
||||
|
||||
#endif /* _MPMC_QUEUE_H_ */
|
|
@ -26,7 +26,7 @@ static void path_write(struct path *p, bool resend)
|
|||
int sent, tosend, available, released;
|
||||
struct sample *smps[n->vectorize];
|
||||
|
||||
available = mpmc_queue_pull_many(&p->queue, (void **) smps, cnt);
|
||||
available = queue_pull_many(&p->queue, (void **) smps, cnt);
|
||||
if (available < cnt)
|
||||
warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt);
|
||||
|
||||
|
@ -108,7 +108,7 @@ static void * path_run(void *arg)
|
|||
p->skipped += recv - enqueue;
|
||||
}
|
||||
|
||||
enqueued = mpmc_queue_push_many(&p->queue, (void **) smps, enqueue);
|
||||
enqueued = queue_push_many(&p->queue, (void **) smps, enqueue);
|
||||
if (enqueue != enqueued)
|
||||
warn("Failed to enqueue %u samples for path %s", enqueue - enqueued, path_name(p));
|
||||
|
||||
|
@ -219,7 +219,7 @@ int path_prepare(struct path *p)
|
|||
if (ret)
|
||||
error("Failed to allocate memory pool for path");
|
||||
|
||||
ret = mpmc_queue_init(&p->queue, p->queuelen, &memtype_hugepage);
|
||||
ret = queue_init(&p->queue, p->queuelen, &memtype_hugepage);
|
||||
if (ret)
|
||||
error("Failed to initialize queue for path");
|
||||
|
||||
|
@ -233,7 +233,7 @@ void path_destroy(struct path *p)
|
|||
list_destroy(&p->destinations, NULL, false);
|
||||
list_destroy(&p->hooks, NULL, true);
|
||||
|
||||
mpmc_queue_destroy(&p->queue);
|
||||
queue_destroy(&p->queue);
|
||||
pool_destroy(&p->pool);
|
||||
|
||||
free(p->_name);
|
||||
|
|
|
@ -26,17 +26,17 @@ int pool_init(struct pool *p, size_t blocksz, size_t cnt, const struct memtype *
|
|||
else
|
||||
debug(DBG_POOL | 4, "Allocated %#zx bytes for memory pool", p->len);
|
||||
|
||||
mpmc_queue_init(&p->queue, cnt, m);
|
||||
queue_init(&p->queue, cnt, m);
|
||||
|
||||
for (int i = 0; i < cnt; i++)
|
||||
mpmc_queue_push(&p->queue, (char *) p->buffer + i * p->blocksz);
|
||||
queue_push(&p->queue, (char *) p->buffer + i * p->blocksz);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int pool_destroy(struct pool *p)
|
||||
{
|
||||
mpmc_queue_destroy(&p->queue);
|
||||
queue_destroy(&p->queue);
|
||||
|
||||
return memory_free(p->mem, p->buffer, p->len);
|
||||
}
|
22
lib/queue.c
22
lib/queue.c
|
@ -34,7 +34,7 @@
|
|||
#include "queue.h"
|
||||
|
||||
/** Initialize MPMC queue */
|
||||
int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem)
|
||||
int queue_init(struct queue *q, size_t size, const struct memtype *mem)
|
||||
{
|
||||
/* Queue size must be 2 exponent */
|
||||
if ((size < 2) || ((size & (size - 1)) != 0))
|
||||
|
@ -55,7 +55,7 @@ int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem
|
|||
return 0;
|
||||
}
|
||||
|
||||
int mpmc_queue_destroy(struct mpmc_queue *q)
|
||||
int queue_destroy(struct queue *q)
|
||||
{
|
||||
return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(sizeof(q->buffer[0])));
|
||||
}
|
||||
|
@ -65,15 +65,15 @@ int mpmc_queue_destroy(struct mpmc_queue *q)
|
|||
* Note: This is only an estimation and not accurate as long other
|
||||
* threads are performing operations.
|
||||
*/
|
||||
size_t mpmc_queue_available(struct mpmc_queue *q)
|
||||
size_t queue_available(struct queue *q)
|
||||
{
|
||||
return atomic_load_explicit(&q->tail, memory_order_relaxed) -
|
||||
atomic_load_explicit(&q->head, memory_order_relaxed);
|
||||
}
|
||||
|
||||
int mpmc_queue_push(struct mpmc_queue *q, void *ptr)
|
||||
int queue_push(struct queue *q, void *ptr)
|
||||
{
|
||||
struct mpmc_queue_cell *cell;
|
||||
struct queue_cell *cell;
|
||||
size_t pos, seq;
|
||||
intptr_t diff;
|
||||
|
||||
|
@ -99,9 +99,9 @@ int mpmc_queue_push(struct mpmc_queue *q, void *ptr)
|
|||
return 1;
|
||||
}
|
||||
|
||||
int mpmc_queue_pull(struct mpmc_queue *q, void **ptr)
|
||||
int queue_pull(struct queue *q, void **ptr)
|
||||
{
|
||||
struct mpmc_queue_cell *cell;
|
||||
struct queue_cell *cell;
|
||||
size_t pos, seq;
|
||||
intptr_t diff;
|
||||
|
||||
|
@ -128,13 +128,13 @@ int mpmc_queue_pull(struct mpmc_queue *q, void **ptr)
|
|||
return 1;
|
||||
}
|
||||
|
||||
int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt)
|
||||
int queue_push_many(struct queue *q, void *ptr[], size_t cnt)
|
||||
{
|
||||
int ret;
|
||||
size_t i;
|
||||
|
||||
for (i = 0; i < cnt; i++) {
|
||||
ret = mpmc_queue_push(q, ptr[i]);
|
||||
ret = queue_push(q, ptr[i]);
|
||||
if (!ret)
|
||||
break;
|
||||
}
|
||||
|
@ -142,13 +142,13 @@ int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt)
|
|||
return i;
|
||||
}
|
||||
|
||||
int mpmc_queue_pull_many(struct mpmc_queue *q, void *ptr[], size_t cnt)
|
||||
int queue_pull_many(struct queue *q, void *ptr[], size_t cnt)
|
||||
{
|
||||
int ret;
|
||||
size_t i;
|
||||
|
||||
for (i = 0; i < cnt; i++) {
|
||||
ret = mpmc_queue_pull(q, &ptr[i]);
|
||||
ret = queue_pull(q, &ptr[i]);
|
||||
if (!ret)
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue