mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
make pointers in pool and queue relative
This is necessary for the new shared memory node, since these structs may be stored in the shared memory area which is in general mapped to different virtual addresses in the different processes.
This commit is contained in:
parent
b4ba09d9f1
commit
08a60dcaca
4 changed files with 26 additions and 17 deletions
|
@ -17,7 +17,7 @@
|
|||
|
||||
/** A thread-safe memory pool */
|
||||
struct pool {
|
||||
void *buffer; /**< Address of the underlying memory area */
|
||||
size_t buffer_off; /**< Offset from the struct address to the underlying memory area */
|
||||
struct memtype *mem;
|
||||
|
||||
size_t len; /**< Length of the underlying memory area */
|
||||
|
|
|
@ -42,15 +42,17 @@
|
|||
#define CACHELINE_SIZE 64
|
||||
typedef char cacheline_pad_t[CACHELINE_SIZE];
|
||||
|
||||
struct queue_cell {
|
||||
atomic_size_t sequence;
|
||||
void *data;
|
||||
};
|
||||
|
||||
struct queue {
|
||||
cacheline_pad_t _pad0; /**< Shared area: all threads read */
|
||||
|
||||
struct memtype * mem;
|
||||
size_t buffer_mask;
|
||||
struct queue_cell {
|
||||
atomic_size_t sequence;
|
||||
void *data;
|
||||
} *buffer;
|
||||
size_t buffer_off; /**< Relative pointer to struct queue_cell[] */
|
||||
|
||||
cacheline_pad_t _pad1; /**< Producer area: only producers read & write */
|
||||
|
||||
|
|
10
lib/pool.c
10
lib/pool.c
|
@ -20,16 +20,17 @@ int pool_init(struct pool *p, size_t cnt, size_t blocksz, struct memtype *m)
|
|||
p->len = cnt * p->blocksz;
|
||||
p->mem = m;
|
||||
|
||||
p->buffer = memory_alloc_aligned(m, p->len, p->alignment);
|
||||
if (!p->buffer)
|
||||
void *buffer = memory_alloc_aligned(m, p->len, p->alignment);
|
||||
if (!buffer)
|
||||
serror("Failed to allocate memory for memory pool");
|
||||
p->buffer_off = (char*) buffer - (char*) p;
|
||||
|
||||
ret = queue_init(&p->queue, LOG2_CEIL(cnt), m);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
for (int i = 0; i < cnt; i++)
|
||||
queue_push(&p->queue, (char *) p->buffer + i * p->blocksz);
|
||||
queue_push(&p->queue, (char *) buffer + i * p->blocksz);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -38,5 +39,6 @@ int pool_destroy(struct pool *p)
|
|||
{
|
||||
queue_destroy(&p->queue);
|
||||
|
||||
return memory_free(p->mem, p->buffer, p->len);
|
||||
void *buffer = (char*) p + p->buffer_off;
|
||||
return memory_free(p->mem, buffer, p->len);
|
||||
}
|
||||
|
|
21
lib/queue.c
21
lib/queue.c
|
@ -47,12 +47,14 @@ int queue_init(struct queue *q, size_t size, struct memtype *mem)
|
|||
|
||||
q->mem = mem;
|
||||
q->buffer_mask = size - 1;
|
||||
q->buffer = memory_alloc(q->mem, sizeof(q->buffer[0]) * size);
|
||||
if (!q->buffer)
|
||||
struct queue_cell* buffer = memory_alloc(q->mem, sizeof(struct queue_cell) * size);
|
||||
if (!buffer)
|
||||
return -2;
|
||||
|
||||
q->buffer_off = (char*) buffer - (char*) q;
|
||||
|
||||
for (size_t i = 0; i != size; i += 1)
|
||||
atomic_store_explicit(&q->buffer[i].sequence, i, memory_order_relaxed);
|
||||
atomic_store_explicit(&buffer[i].sequence, i, memory_order_relaxed);
|
||||
|
||||
atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
|
||||
atomic_store_explicit(&q->head, 0, memory_order_relaxed);
|
||||
|
@ -62,7 +64,8 @@ int queue_init(struct queue *q, size_t size, struct memtype *mem)
|
|||
|
||||
int queue_destroy(struct queue *q)
|
||||
{
|
||||
return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(q->buffer[0]));
|
||||
void *buffer = (char*) q + q->buffer_off;
|
||||
return memory_free(q->mem, buffer, (q->buffer_mask + 1) * sizeof(struct queue_cell));
|
||||
}
|
||||
|
||||
/** Return estimation of current queue usage.
|
||||
|
@ -78,13 +81,14 @@ size_t queue_available(struct queue *q)
|
|||
|
||||
int queue_push(struct queue *q, void *ptr)
|
||||
{
|
||||
struct queue_cell *cell;
|
||||
struct queue_cell *cell, *buffer;
|
||||
size_t pos, seq;
|
||||
intptr_t diff;
|
||||
|
||||
buffer = (struct queue_cell*) ((char*) q + q->buffer_off);
|
||||
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
|
||||
for (;;) {
|
||||
cell = &q->buffer[pos & q->buffer_mask];
|
||||
cell = &buffer[pos & q->buffer_mask];
|
||||
seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
|
||||
diff = (intptr_t) seq - (intptr_t) pos;
|
||||
|
||||
|
@ -106,13 +110,14 @@ int queue_push(struct queue *q, void *ptr)
|
|||
|
||||
int queue_pull(struct queue *q, void **ptr)
|
||||
{
|
||||
struct queue_cell *cell;
|
||||
struct queue_cell *cell, *buffer;
|
||||
size_t pos, seq;
|
||||
intptr_t diff;
|
||||
|
||||
buffer = (struct queue_cell*) ((char*) q + q->buffer_off);
|
||||
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
|
||||
for (;;) {
|
||||
cell = &q->buffer[pos & q->buffer_mask];
|
||||
cell = &buffer[pos & q->buffer_mask];
|
||||
seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
|
||||
diff = (intptr_t) seq - (intptr_t) (pos + 1);
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue