mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
replaced old queue with new MPMC implementation
This commit is contained in:
parent
e83065e916
commit
d0dc7e216e
2 changed files with 175 additions and 163 deletions
|
@ -1,93 +1,81 @@
|
|||
/** Lock-free Single-Producer Single-consumer (SPSC) queue.
|
||||
/** Lock-free Multiple-Producer Multiple-consumer (MPMC) queue.
|
||||
*
|
||||
* This datastructure queues void pointers in a FIFO style.
|
||||
* Every queue element has an associated reference count which is
|
||||
* used to tell wheater the queue is full or empty.
|
||||
* Based on Dmitry Vyukov#s Bounded MPMC queue:
|
||||
* http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
|
||||
*
|
||||
* Queue head and tail pointers are not part of this datastructure.
|
||||
* In combination with the reference counting, this enables multiple
|
||||
* readers each with its own head pointer.
|
||||
* Each reader will read the same data.
|
||||
*
|
||||
* @file
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
|
||||
* This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential.
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* @author Steffen Vogel <post@steffenvogel.de>
|
||||
* @copyright 2016 Steffen Vogel
|
||||
* @license BSD 2-Clause License
|
||||
*
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modiffication, are permitted provided that the following conditions are met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright notice, this
|
||||
* list of conditions and the following disclaimer.
|
||||
*
|
||||
* * Redistributions in binary form must reproduce the above copyright notice,
|
||||
* this list of conditions and the following disclaimer in the documentation
|
||||
* and/or other materials provided with the distribution.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#ifndef _QUEUE_H_
|
||||
#define _QUEUE_H_
|
||||
#ifndef _MPMC_QUEUE_H_
|
||||
#define _MPMC_QUEUE_H_
|
||||
|
||||
#include <stddef.h>
|
||||
#include <stdint.h>
|
||||
#include <stdatomic.h>
|
||||
|
||||
typedef uintmax_t qptr_t;
|
||||
#include "memory.h"
|
||||
|
||||
struct queue {
|
||||
size_t size; /**< Number of pointers in queue::array */
|
||||
_Atomic int *refcnts; /**< Reference counts to blocks in queue::array */
|
||||
_Atomic int readers; /**< Initial reference count for pushed blocks */
|
||||
static size_t const cacheline_size = 64;
|
||||
typedef char cacheline_pad_t[cacheline_size];
|
||||
|
||||
void **pointers; /**< Pointers to queue data */
|
||||
struct mpmc_queue {
|
||||
cacheline_pad_t _pad0; /**< Shared area: all threads read */
|
||||
struct memtype const * mem;
|
||||
size_t buffer_mask;
|
||||
struct mpmc_queue_cell {
|
||||
atomic_size_t sequence;
|
||||
void *data;
|
||||
} *buffer;
|
||||
cacheline_pad_t _pad1; /**> Producer area: only producers read & write */
|
||||
atomic_size_t tail; /**> Queue tail pointer */
|
||||
cacheline_pad_t _pad2; /**> Consumer area: only consumers read & write */
|
||||
atomic_size_t head; /**> Queue head pointer */
|
||||
cacheline_pad_t _pad3; /**> @todo Why needed? */
|
||||
};
|
||||
|
||||
/** Initiliaze a new queue and allocate memory. */
|
||||
int queue_init(struct queue *q, size_t size);
|
||||
/** Initialize MPMC queue */
|
||||
int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem);
|
||||
|
||||
/** Release memory of queue. */
|
||||
void queue_destroy(struct queue *q);
|
||||
/** Desroy MPMC queue and release memory */
|
||||
int mpmc_queue_destroy(struct mpmc_queue *q);
|
||||
|
||||
/** Increment the number of readers for this queue.
|
||||
/** Return estimation of current queue usage.
|
||||
*
|
||||
* Important: To garuantee thread-safety this function must by called by
|
||||
* the (only) writer which holds \p tail.
|
||||
*/
|
||||
void queue_reader_add(struct queue *q, qptr_t head, qptr_t tail);
|
||||
* Note: This is only an estimation and not accurate as long other
|
||||
* threads are performing operations.
|
||||
*/
|
||||
size_t mpmc_queue_available(struct mpmc_queue *q);
|
||||
|
||||
/** Decrement the number of readers for this queue.
|
||||
*
|
||||
* Important: To garuantee thread-safety this function must by called by
|
||||
* the (only) writer which holds \p tail.
|
||||
*/
|
||||
void queue_reader_remove(struct queue *q, qptr_t head, qptr_t tail);
|
||||
int mpmc_queue_push(struct mpmc_queue *q, void *ptr);
|
||||
|
||||
/** Enqueue up to \p cnt elements from \p ptrs[] at the queue tail pointed by \p tail.
|
||||
*
|
||||
* It may happen that the queue is (nearly) full and there is no more
|
||||
* space to enqueue more elments.
|
||||
* In this case a call to this function will return a value which is smaller than \p cnt
|
||||
* or even zero if the queue was already full.
|
||||
*
|
||||
* @param q A pointer to the queue datastructure.
|
||||
* @param[in] ptrs An array of void-pointers which should be enqueued.
|
||||
* @param cnt The length of the pointer array \p ptrs.
|
||||
* @param[in,out] tail A pointer to the current tail of the queue. The tail will be updated to new tail after eqeuing.
|
||||
* @return The function returns the number of successfully enqueued elements from \p ptrs.
|
||||
*/
|
||||
int queue_push_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t *tail);
|
||||
int mpmc_queue_pull(struct mpmc_queue *q, void **ptr);
|
||||
|
||||
/** Dequeue up to \p cnt elements from the queue and place them into the array \p ptrs[].
|
||||
*
|
||||
* @param q A pointer to the queue datastructure.
|
||||
* @param[out] ptrs An array with space at least \cnt elements which will receive pointers to the released elements.
|
||||
* @param cnt The maximum number of elements which should be dequeued. It defines the size of \p ptrs.
|
||||
* @param[in,out] head A pointer to a queue head. The value will be updated to reflect the new head.
|
||||
* @return The number of elements which have been dequeued <b>and whose reference counts have reached zero</b>.
|
||||
*/
|
||||
int queue_pull_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t *head);
|
||||
int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt);
|
||||
|
||||
/** Fill \p ptrs with \p cnt elements of the queue starting at entry \p pos. */
|
||||
int queue_get_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t pos);
|
||||
int mpmc_queue_pull_many(struct mpmc_queue *q, void **ptr[], size_t cnt);
|
||||
|
||||
|
||||
int queue_get(struct queue *q, void **ptr, qptr_t pos);
|
||||
|
||||
/** Enqueue a new block at the tail of the queue which is given by the qptr_t. */
|
||||
int queue_push(struct queue *q, void *ptr, qptr_t *tail);
|
||||
|
||||
/** Dequeue the first block at the head of the queue. */
|
||||
int queue_pull(struct queue *q, void **ptr, qptr_t *head);
|
||||
|
||||
#endif /* _QUEUE_H_ */
|
||||
#endif /* _MPMC_QUEUE_H_ */
|
204
lib/queue.c
204
lib/queue.c
|
@ -1,133 +1,157 @@
|
|||
/** Lock-free Single-Producer Single-consumer (SPSC) queue.
|
||||
/** Lock-free Multiple-Producer Multiple-consumer (MPMC) queue.
|
||||
*
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
|
||||
* This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential.
|
||||
* Unauthorized copying of this file, via any medium is strictly prohibited.
|
||||
* Based on Dmitry Vyukov#s Bounded MPMC queue:
|
||||
* http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
|
||||
*
|
||||
* @author Steffen Vogel <post@steffenvogel.de>
|
||||
* @copyright 2016 Steffen Vogel
|
||||
* @license BSD 2-Clause License
|
||||
*
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modiffication, are permitted provided that the following conditions are met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright notice, this
|
||||
* list of conditions and the following disclaimer.
|
||||
*
|
||||
* * Redistributions in binary form must reproduce the above copyright notice,
|
||||
* this list of conditions and the following disclaimer in the documentation
|
||||
* and/or other materials provided with the distribution.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#include "queue.h"
|
||||
#include "utils.h"
|
||||
#include "mpmc_queue.h"
|
||||
|
||||
int queue_init(struct queue *q, size_t size)
|
||||
/** Initialize MPMC queue */
|
||||
int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem)
|
||||
{
|
||||
q->size = size;
|
||||
/* Queue size must be 2 exponent */
|
||||
if ((size < 2) || ((size & (size - 1)) != 0))
|
||||
return -1;
|
||||
|
||||
q->pointers = alloc(size * sizeof(void *));
|
||||
q->refcnts = alloc(size * sizeof(atomic_uint));
|
||||
q->readers = 0;
|
||||
q->mem = mem;
|
||||
q->buffer_mask = size - 1;
|
||||
q->buffer = memory_alloc(q->mem, sizeof(q->buffer[0]) * size);
|
||||
if (!q->buffer)
|
||||
return -2;
|
||||
|
||||
for (size_t i = 0; i != size; i += 1)
|
||||
atomic_store_explicit(&q->buffer[i].sequence, i, memory_order_relaxed);
|
||||
|
||||
atomic_store_explicit(&q->tail, 0, memory_order_relaxed);
|
||||
atomic_store_explicit(&q->head, 0, memory_order_relaxed);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void queue_destroy(struct queue *q)
|
||||
int mpmc_queue_destroy(struct mpmc_queue *q)
|
||||
{
|
||||
free(q->pointers);
|
||||
free(q->refcnts);
|
||||
return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(sizeof(q->buffer[0])));
|
||||
}
|
||||
|
||||
void queue_reader_add(struct queue *q, qptr_t head, qptr_t tail)
|
||||
/** Return estimation of current queue usage.
|
||||
*
|
||||
* Note: This is only an estimation and not accurate as long other
|
||||
* threads are performing operations.
|
||||
*/
|
||||
size_t mpmc_queue_available(struct mpmc_queue *q)
|
||||
{
|
||||
assert (head <= tail);
|
||||
|
||||
atomic_fetch_add(&q->readers, 1);
|
||||
|
||||
for (qptr_t i = head; i < tail; i++)
|
||||
atomic_fetch_add(&q->refcnts[i % q->size], 1);
|
||||
return atomic_load_explicit(&q->tail, memory_order_relaxed) -
|
||||
atomic_load_explicit(&q->head, memory_order_relaxed);
|
||||
}
|
||||
|
||||
void queue_reader_remove(struct queue *q, qptr_t head, qptr_t tail)
|
||||
int mpmc_queue_push(struct mpmc_queue *q, void *ptr)
|
||||
{
|
||||
assert (head <= tail);
|
||||
struct mpmc_queue_cell *cell;
|
||||
size_t pos, seq;
|
||||
intptr_t diff;
|
||||
|
||||
atomic_fetch_sub(&q->readers, 1);
|
||||
|
||||
for (qptr_t i = head; i < tail; i++)
|
||||
atomic_fetch_sub(&q->refcnts[i % q->size], 1);
|
||||
}
|
||||
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
|
||||
for (;;) {
|
||||
cell = &q->buffer[pos & q->buffer_mask];
|
||||
seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
|
||||
diff = (intptr_t) seq - (intptr_t) pos;
|
||||
|
||||
int queue_get(struct queue *q, void **ptr, qptr_t pos)
|
||||
{
|
||||
int refcnt = atomic_load(&q->refcnts[pos % q->size]);
|
||||
if (refcnt == 0)
|
||||
return 0;
|
||||
|
||||
*ptr = q->pointers[pos % q->size];
|
||||
if (diff == 0) {
|
||||
if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, memory_order_relaxed, memory_order_seq_cst))
|
||||
break;
|
||||
}
|
||||
else if (diff < 0)
|
||||
return 0;
|
||||
else
|
||||
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
|
||||
}
|
||||
|
||||
cell->data = ptr;
|
||||
atomic_store_explicit(&cell->sequence, pos + 1, memory_order_release);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int queue_push(struct queue *q, void *ptr, qptr_t *tail)
|
||||
int mpmc_queue_pull(struct mpmc_queue *q, void **ptr)
|
||||
{
|
||||
int refcnt;
|
||||
|
||||
do {
|
||||
refcnt = atomic_load(&q->refcnts[*tail % q->size]);
|
||||
if (refcnt != 0)
|
||||
return 0; /* Queue is full */
|
||||
|
||||
q->pointers[*tail % q->size] = ptr;
|
||||
} while (!atomic_compare_exchange_weak(&q->refcnts[*tail % q->size], &refcnt, q->readers));
|
||||
|
||||
*tail = *tail + 1;
|
||||
struct mpmc_queue_cell *cell;
|
||||
size_t pos, seq;
|
||||
intptr_t diff;
|
||||
|
||||
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
|
||||
for (;;) {
|
||||
cell = &q->buffer[pos & q->buffer_mask];
|
||||
|
||||
seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
|
||||
diff = (intptr_t) seq - (intptr_t) (pos + 1);
|
||||
|
||||
if (diff == 0) {
|
||||
if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, memory_order_relaxed, memory_order_seq_cst))
|
||||
break;
|
||||
}
|
||||
else if (diff < 0)
|
||||
return 0;
|
||||
else
|
||||
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
|
||||
}
|
||||
|
||||
*ptr = cell->data;
|
||||
atomic_store_explicit(&cell->sequence, pos + q->buffer_mask + 1, memory_order_release);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int queue_pull(struct queue *q, void **ptr, qptr_t *head)
|
||||
int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt)
|
||||
{
|
||||
int refcnt;
|
||||
|
||||
do {
|
||||
refcnt = atomic_load(&q->refcnts[*head % q->size]);
|
||||
if (refcnt == 0)
|
||||
return -1; /* Queue is empty */
|
||||
|
||||
*ptr = q->pointers[*head % q->size];
|
||||
} while (!atomic_compare_exchange_weak(&q->refcnts[*head % q->size], &refcnt, refcnt - 1));
|
||||
|
||||
*head = *head + 1;
|
||||
|
||||
return refcnt == 1 ? 1 : 0;
|
||||
}
|
||||
|
||||
int queue_get_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t pos)
|
||||
{
|
||||
int ret, i;
|
||||
int ret;
|
||||
size_t i;
|
||||
|
||||
for (i = 0; i < cnt; i++) {
|
||||
ret = queue_get(q, &ptrs[i], pos + i);
|
||||
if (ret == 0)
|
||||
ret = mpmc_queue_push(q, ptr[i]);
|
||||
if (!ret)
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
return i;
|
||||
}
|
||||
|
||||
int queue_push_many(struct queue *q, void **ptrs, size_t cnt, qptr_t *tail)
|
||||
int mpmc_queue_pull_many(struct mpmc_queue *q, void **ptr[], size_t cnt)
|
||||
{
|
||||
int ret, i;
|
||||
int ret;
|
||||
size_t i;
|
||||
|
||||
for (i = 0; i < cnt; i++) {
|
||||
ret = queue_push(q, ptrs[i], tail);
|
||||
if (ret == 0)
|
||||
ret = mpmc_queue_pull(q, ptr[i]);
|
||||
if (!ret)
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
return i;
|
||||
}
|
||||
|
||||
int queue_pull_many(struct queue *q, void **ptrs, size_t cnt, qptr_t *head)
|
||||
{
|
||||
int released = 0, ret;
|
||||
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
ret = queue_pull(q, &ptrs[i], head);
|
||||
if (ret < 0) /* empty */
|
||||
break;
|
||||
if (ret == 1)
|
||||
released++;
|
||||
}
|
||||
|
||||
return released;
|
||||
}
|
Loading…
Add table
Reference in a new issue