1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

mpmc_queue => queue

This commit is contained in:
Steffen Vogel 2016-10-16 02:33:36 -04:00
parent 641d5ea7ed
commit 3839262d8d
6 changed files with 33 additions and 33 deletions

View file

@ -42,7 +42,7 @@ struct path
struct node *in; /**< Pointer to the incoming node */
struct mpmc_queue queue; /**< A ring buffer for all received messages (unmodified) */
struct queue queue; /**< A ring buffer for all received messages (unmodified) */
struct pool pool; /**< Memory pool for messages / samples. */
struct list destinations; /**< List of all outgoing nodes */

View file

@ -28,7 +28,7 @@ struct pool {
size_t blocksz; /**< Length of a block in bytes */
size_t alignment; /**< Alignment of a block in bytes */
struct mpmc_queue queue; /**< The queue which is used to keep track of free blocks */
struct queue queue; /**< The queue which is used to keep track of free blocks */
};
#define INLINE static inline __attribute__((unused))
@ -42,26 +42,26 @@ int pool_destroy(struct pool *p);
/** Pop cnt values from the stack an place them in the array blocks */
INLINE ssize_t pool_get_many(struct pool *p, void *blocks[], size_t cnt)
{
return mpmc_queue_pull_many(&p->queue, blocks, cnt);
return queue_pull_many(&p->queue, blocks, cnt);
}
/** Push cnt values which are giving by the array values to the stack. */
INLINE ssize_t pool_put_many(struct pool *p, void *blocks[], size_t cnt)
{
return mpmc_queue_push_many(&p->queue, blocks, cnt);
return queue_push_many(&p->queue, blocks, cnt);
}
/** Get a free memory block from pool. */
INLINE void * pool_get(struct pool *p)
{
void *ptr;
return mpmc_queue_pull(&p->queue, &ptr) == 1 ? ptr : NULL;
return queue_pull(&p->queue, &ptr) == 1 ? ptr : NULL;
}
/** Release a memory block back to the pool. */
INLINE int pool_put(struct pool *p, void *buf)
{
return mpmc_queue_push(&p->queue, buf);
return queue_push(&p->queue, buf);
}
#endif /* _POOL_H_ */

View file

@ -42,12 +42,12 @@
#define CACHELINE_SIZE 64
typedef char cacheline_pad_t[CACHELINE_SIZE];
struct mpmc_queue {
struct queue {
cacheline_pad_t _pad0; /**< Shared area: all threads read */
struct memtype const * mem;
size_t buffer_mask;
struct mpmc_queue_cell {
struct queue_cell {
atomic_size_t sequence;
void *data;
} *buffer;
@ -64,24 +64,24 @@ struct mpmc_queue {
};
/** Initialize MPMC queue */
int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem);
int queue_init(struct queue *q, size_t size, const struct memtype *mem);
/** Desroy MPMC queue and release memory */
int mpmc_queue_destroy(struct mpmc_queue *q);
int queue_destroy(struct queue *q);
/** Return estimation of current queue usage.
*
* Note: This is only an estimation and not accurate as long other
* threads are performing operations.
*/
size_t mpmc_queue_available(struct mpmc_queue *q);
size_t queue_available(struct queue *q);
int mpmc_queue_push(struct mpmc_queue *q, void *ptr);
int queue_push(struct queue *q, void *ptr);
int mpmc_queue_pull(struct mpmc_queue *q, void **ptr);
int queue_pull(struct queue *q, void **ptr);
int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt);
int queue_push_many(struct queue *q, void *ptr[], size_t cnt);
int mpmc_queue_pull_many(struct mpmc_queue *q, void *ptr[], size_t cnt);
int queue_pull_many(struct queue *q, void *ptr[], size_t cnt);
#endif /* _MPMC_QUEUE_H_ */

View file

@ -26,7 +26,7 @@ static void path_write(struct path *p, bool resend)
int sent, tosend, available, released;
struct sample *smps[n->vectorize];
available = mpmc_queue_pull_many(&p->queue, (void **) smps, cnt);
available = queue_pull_many(&p->queue, (void **) smps, cnt);
if (available < cnt)
warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt);
@ -108,7 +108,7 @@ static void * path_run(void *arg)
p->skipped += recv - enqueue;
}
enqueued = mpmc_queue_push_many(&p->queue, (void **) smps, enqueue);
enqueued = queue_push_many(&p->queue, (void **) smps, enqueue);
if (enqueue != enqueued)
warn("Failed to enqueue %u samples for path %s", enqueue - enqueued, path_name(p));
@ -219,7 +219,7 @@ int path_prepare(struct path *p)
if (ret)
error("Failed to allocate memory pool for path");
ret = mpmc_queue_init(&p->queue, p->queuelen, &memtype_hugepage);
ret = queue_init(&p->queue, p->queuelen, &memtype_hugepage);
if (ret)
error("Failed to initialize queue for path");
@ -233,7 +233,7 @@ void path_destroy(struct path *p)
list_destroy(&p->destinations, NULL, false);
list_destroy(&p->hooks, NULL, true);
mpmc_queue_destroy(&p->queue);
queue_destroy(&p->queue);
pool_destroy(&p->pool);
free(p->_name);

View file

@ -26,17 +26,17 @@ int pool_init(struct pool *p, size_t blocksz, size_t cnt, const struct memtype *
else
debug(DBG_POOL | 4, "Allocated %#zx bytes for memory pool", p->len);
mpmc_queue_init(&p->queue, cnt, m);
queue_init(&p->queue, cnt, m);
for (int i = 0; i < cnt; i++)
mpmc_queue_push(&p->queue, (char *) p->buffer + i * p->blocksz);
queue_push(&p->queue, (char *) p->buffer + i * p->blocksz);
return 0;
}
int pool_destroy(struct pool *p)
{
mpmc_queue_destroy(&p->queue);
queue_destroy(&p->queue);
return memory_free(p->mem, p->buffer, p->len);
}

View file

@ -34,7 +34,7 @@
#include "queue.h"
/** Initialize MPMC queue */
int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem)
int queue_init(struct queue *q, size_t size, const struct memtype *mem)
{
/* Queue size must be 2 exponent */
if ((size < 2) || ((size & (size - 1)) != 0))
@ -55,7 +55,7 @@ int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem
return 0;
}
int mpmc_queue_destroy(struct mpmc_queue *q)
int queue_destroy(struct queue *q)
{
return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(sizeof(q->buffer[0])));
}
@ -65,15 +65,15 @@ int mpmc_queue_destroy(struct mpmc_queue *q)
* Note: This is only an estimation and not accurate as long other
* threads are performing operations.
*/
size_t mpmc_queue_available(struct mpmc_queue *q)
size_t queue_available(struct queue *q)
{
return atomic_load_explicit(&q->tail, memory_order_relaxed) -
atomic_load_explicit(&q->head, memory_order_relaxed);
}
int mpmc_queue_push(struct mpmc_queue *q, void *ptr)
int queue_push(struct queue *q, void *ptr)
{
struct mpmc_queue_cell *cell;
struct queue_cell *cell;
size_t pos, seq;
intptr_t diff;
@ -99,9 +99,9 @@ int mpmc_queue_push(struct mpmc_queue *q, void *ptr)
return 1;
}
int mpmc_queue_pull(struct mpmc_queue *q, void **ptr)
int queue_pull(struct queue *q, void **ptr)
{
struct mpmc_queue_cell *cell;
struct queue_cell *cell;
size_t pos, seq;
intptr_t diff;
@ -128,13 +128,13 @@ int mpmc_queue_pull(struct mpmc_queue *q, void **ptr)
return 1;
}
int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt)
int queue_push_many(struct queue *q, void *ptr[], size_t cnt)
{
int ret;
size_t i;
for (i = 0; i < cnt; i++) {
ret = mpmc_queue_push(q, ptr[i]);
ret = queue_push(q, ptr[i]);
if (!ret)
break;
}
@ -142,13 +142,13 @@ int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt)
return i;
}
int mpmc_queue_pull_many(struct mpmc_queue *q, void *ptr[], size_t cnt)
int queue_pull_many(struct queue *q, void *ptr[], size_t cnt)
{
int ret;
size_t i;
for (i = 0; i < cnt; i++) {
ret = mpmc_queue_pull(q, &ptr[i]);
ret = queue_pull(q, &ptr[i]);
if (!ret)
break;
}