diff --git a/include/villas/queue.h b/include/villas/queue.h index feee51e4f..6663f2b09 100644 --- a/include/villas/queue.h +++ b/include/villas/queue.h @@ -1,93 +1,81 @@ -/** Lock-free Single-Producer Single-consumer (SPSC) queue. +/** Lock-free Multiple-Producer Multiple-consumer (MPMC) queue. * - * This datastructure queues void pointers in a FIFO style. - * Every queue element has an associated reference count which is - * used to tell wheater the queue is full or empty. + * Based on Dmitry Vyukov#s Bounded MPMC queue: + * http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue * - * Queue head and tail pointers are not part of this datastructure. - * In combination with the reference counting, this enables multiple - * readers each with its own head pointer. - * Each reader will read the same data. - * - * @file - * @author Steffen Vogel - * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC - * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. - * Unauthorized copying of this file, via any medium is strictly prohibited. + * @author Steffen Vogel + * @copyright 2016 Steffen Vogel + * @license BSD 2-Clause License + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modiffication, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#ifndef _QUEUE_H_ -#define _QUEUE_H_ +#ifndef _MPMC_QUEUE_H_ +#define _MPMC_QUEUE_H_ -#include #include #include -typedef uintmax_t qptr_t; +#include "memory.h" -struct queue { - size_t size; /**< Number of pointers in queue::array */ - _Atomic int *refcnts; /**< Reference counts to blocks in queue::array */ - _Atomic int readers; /**< Initial reference count for pushed blocks */ +static size_t const cacheline_size = 64; +typedef char cacheline_pad_t[cacheline_size]; - void **pointers; /**< Pointers to queue data */ +struct mpmc_queue { + cacheline_pad_t _pad0; /**< Shared area: all threads read */ + struct memtype const * mem; + size_t buffer_mask; + struct mpmc_queue_cell { + atomic_size_t sequence; + void *data; + } *buffer; + cacheline_pad_t _pad1; /**> Producer area: only producers read & write */ + atomic_size_t tail; /**> Queue tail pointer */ + cacheline_pad_t _pad2; /**> Consumer area: only consumers read & write */ + atomic_size_t head; /**> Queue head pointer */ + cacheline_pad_t _pad3; /**> @todo Why needed? */ }; -/** Initiliaze a new queue and allocate memory. */ -int queue_init(struct queue *q, size_t size); +/** Initialize MPMC queue */ +int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem); -/** Release memory of queue. */ -void queue_destroy(struct queue *q); +/** Desroy MPMC queue and release memory */ +int mpmc_queue_destroy(struct mpmc_queue *q); -/** Increment the number of readers for this queue. +/** Return estimation of current queue usage. * - * Important: To garuantee thread-safety this function must by called by - * the (only) writer which holds \p tail. - */ -void queue_reader_add(struct queue *q, qptr_t head, qptr_t tail); + * Note: This is only an estimation and not accurate as long other + * threads are performing operations. + */ +size_t mpmc_queue_available(struct mpmc_queue *q); -/** Decrement the number of readers for this queue. - * - * Important: To garuantee thread-safety this function must by called by - * the (only) writer which holds \p tail. - */ -void queue_reader_remove(struct queue *q, qptr_t head, qptr_t tail); +int mpmc_queue_push(struct mpmc_queue *q, void *ptr); -/** Enqueue up to \p cnt elements from \p ptrs[] at the queue tail pointed by \p tail. - * - * It may happen that the queue is (nearly) full and there is no more - * space to enqueue more elments. - * In this case a call to this function will return a value which is smaller than \p cnt - * or even zero if the queue was already full. - * - * @param q A pointer to the queue datastructure. - * @param[in] ptrs An array of void-pointers which should be enqueued. - * @param cnt The length of the pointer array \p ptrs. - * @param[in,out] tail A pointer to the current tail of the queue. The tail will be updated to new tail after eqeuing. - * @return The function returns the number of successfully enqueued elements from \p ptrs. - */ -int queue_push_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t *tail); +int mpmc_queue_pull(struct mpmc_queue *q, void **ptr); -/** Dequeue up to \p cnt elements from the queue and place them into the array \p ptrs[]. - * - * @param q A pointer to the queue datastructure. - * @param[out] ptrs An array with space at least \cnt elements which will receive pointers to the released elements. - * @param cnt The maximum number of elements which should be dequeued. It defines the size of \p ptrs. - * @param[in,out] head A pointer to a queue head. The value will be updated to reflect the new head. - * @return The number of elements which have been dequeued and whose reference counts have reached zero. - */ -int queue_pull_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t *head); +int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt); -/** Fill \p ptrs with \p cnt elements of the queue starting at entry \p pos. */ -int queue_get_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t pos); +int mpmc_queue_pull_many(struct mpmc_queue *q, void **ptr[], size_t cnt); - -int queue_get(struct queue *q, void **ptr, qptr_t pos); - -/** Enqueue a new block at the tail of the queue which is given by the qptr_t. */ -int queue_push(struct queue *q, void *ptr, qptr_t *tail); - -/** Dequeue the first block at the head of the queue. */ -int queue_pull(struct queue *q, void **ptr, qptr_t *head); - -#endif /* _QUEUE_H_ */ \ No newline at end of file +#endif /* _MPMC_QUEUE_H_ */ \ No newline at end of file diff --git a/lib/queue.c b/lib/queue.c index 40b420693..cfa46ee5d 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -1,133 +1,157 @@ -/** Lock-free Single-Producer Single-consumer (SPSC) queue. +/** Lock-free Multiple-Producer Multiple-consumer (MPMC) queue. * - * @author Steffen Vogel - * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC - * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. - * Unauthorized copying of this file, via any medium is strictly prohibited. + * Based on Dmitry Vyukov#s Bounded MPMC queue: + * http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + * + * @author Steffen Vogel + * @copyright 2016 Steffen Vogel + * @license BSD 2-Clause License + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modiffication, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#include "queue.h" -#include "utils.h" +#include "mpmc_queue.h" -int queue_init(struct queue *q, size_t size) +/** Initialize MPMC queue */ +int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem) { - q->size = size; + /* Queue size must be 2 exponent */ + if ((size < 2) || ((size & (size - 1)) != 0)) + return -1; - q->pointers = alloc(size * sizeof(void *)); - q->refcnts = alloc(size * sizeof(atomic_uint)); - q->readers = 0; + q->mem = mem; + q->buffer_mask = size - 1; + q->buffer = memory_alloc(q->mem, sizeof(q->buffer[0]) * size); + if (!q->buffer) + return -2; + + for (size_t i = 0; i != size; i += 1) + atomic_store_explicit(&q->buffer[i].sequence, i, memory_order_relaxed); + atomic_store_explicit(&q->tail, 0, memory_order_relaxed); + atomic_store_explicit(&q->head, 0, memory_order_relaxed); + return 0; } -void queue_destroy(struct queue *q) +int mpmc_queue_destroy(struct mpmc_queue *q) { - free(q->pointers); - free(q->refcnts); + return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(sizeof(q->buffer[0]))); } -void queue_reader_add(struct queue *q, qptr_t head, qptr_t tail) +/** Return estimation of current queue usage. + * + * Note: This is only an estimation and not accurate as long other + * threads are performing operations. + */ +size_t mpmc_queue_available(struct mpmc_queue *q) { - assert (head <= tail); - - atomic_fetch_add(&q->readers, 1); - - for (qptr_t i = head; i < tail; i++) - atomic_fetch_add(&q->refcnts[i % q->size], 1); + return atomic_load_explicit(&q->tail, memory_order_relaxed) - + atomic_load_explicit(&q->head, memory_order_relaxed); } -void queue_reader_remove(struct queue *q, qptr_t head, qptr_t tail) +int mpmc_queue_push(struct mpmc_queue *q, void *ptr) { - assert (head <= tail); + struct mpmc_queue_cell *cell; + size_t pos, seq; + intptr_t diff; - atomic_fetch_sub(&q->readers, 1); - - for (qptr_t i = head; i < tail; i++) - atomic_fetch_sub(&q->refcnts[i % q->size], 1); -} + pos = atomic_load_explicit(&q->tail, memory_order_relaxed); + for (;;) { + cell = &q->buffer[pos & q->buffer_mask]; + seq = atomic_load_explicit(&cell->sequence, memory_order_acquire); + diff = (intptr_t) seq - (intptr_t) pos; -int queue_get(struct queue *q, void **ptr, qptr_t pos) -{ - int refcnt = atomic_load(&q->refcnts[pos % q->size]); - if (refcnt == 0) - return 0; - - *ptr = q->pointers[pos % q->size]; + if (diff == 0) { + if (atomic_compare_exchange_weak_explicit(&q->tail, &pos, pos + 1, memory_order_relaxed, memory_order_seq_cst)) + break; + } + else if (diff < 0) + return 0; + else + pos = atomic_load_explicit(&q->tail, memory_order_relaxed); + } + + cell->data = ptr; + atomic_store_explicit(&cell->sequence, pos + 1, memory_order_release); return 1; } -int queue_push(struct queue *q, void *ptr, qptr_t *tail) +int mpmc_queue_pull(struct mpmc_queue *q, void **ptr) { - int refcnt; - - do { - refcnt = atomic_load(&q->refcnts[*tail % q->size]); - if (refcnt != 0) - return 0; /* Queue is full */ - - q->pointers[*tail % q->size] = ptr; - } while (!atomic_compare_exchange_weak(&q->refcnts[*tail % q->size], &refcnt, q->readers)); - - *tail = *tail + 1; + struct mpmc_queue_cell *cell; + size_t pos, seq; + intptr_t diff; + pos = atomic_load_explicit(&q->head, memory_order_relaxed); + for (;;) { + cell = &q->buffer[pos & q->buffer_mask]; + + seq = atomic_load_explicit(&cell->sequence, memory_order_acquire); + diff = (intptr_t) seq - (intptr_t) (pos + 1); + + if (diff == 0) { + if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1, memory_order_relaxed, memory_order_seq_cst)) + break; + } + else if (diff < 0) + return 0; + else + pos = atomic_load_explicit(&q->head, memory_order_relaxed); + } + + *ptr = cell->data; + atomic_store_explicit(&cell->sequence, pos + q->buffer_mask + 1, memory_order_release); + return 1; } -int queue_pull(struct queue *q, void **ptr, qptr_t *head) +int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt) { - int refcnt; - - do { - refcnt = atomic_load(&q->refcnts[*head % q->size]); - if (refcnt == 0) - return -1; /* Queue is empty */ - - *ptr = q->pointers[*head % q->size]; - } while (!atomic_compare_exchange_weak(&q->refcnts[*head % q->size], &refcnt, refcnt - 1)); - - *head = *head + 1; - - return refcnt == 1 ? 1 : 0; -} - -int queue_get_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t pos) -{ - int ret, i; + int ret; + size_t i; for (i = 0; i < cnt; i++) { - ret = queue_get(q, &ptrs[i], pos + i); - if (ret == 0) + ret = mpmc_queue_push(q, ptr[i]); + if (!ret) break; } - + return i; } -int queue_push_many(struct queue *q, void **ptrs, size_t cnt, qptr_t *tail) +int mpmc_queue_pull_many(struct mpmc_queue *q, void **ptr[], size_t cnt) { - int ret, i; + int ret; + size_t i; for (i = 0; i < cnt; i++) { - ret = queue_push(q, ptrs[i], tail); - if (ret == 0) + ret = mpmc_queue_pull(q, ptr[i]); + if (!ret) break; } - + return i; } - -int queue_pull_many(struct queue *q, void **ptrs, size_t cnt, qptr_t *head) -{ - int released = 0, ret; - - for (int i = 0; i < cnt; i++) { - ret = queue_pull(q, &ptrs[i], head); - if (ret < 0) /* empty */ - break; - if (ret == 1) - released++; - } - - return released; -} \ No newline at end of file