diff --git a/include/queue.h b/include/queue.h new file mode 100644 index 000000000..cbe9ee3ea --- /dev/null +++ b/include/queue.h @@ -0,0 +1,93 @@ +/** Lock-free Single-Producer Single-consumer (SPSC) queue. + * + * This datastructure queues void pointers in a FIFO style. + * Every queue element has an associated reference count which is + * used to tell wheater the queue is full or empty. + * + * Queue head and tail pointers are not part of this datastructure. + * In combination with the reference counting, this enables multiple + * readers each with its own head pointer. + * Each reader will read the same data. + * + * @file + * @author Steffen Vogel + * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC + * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + */ + +#ifndef _QUEUE_H_ +#define _QUEUE_H_ + +#include +#include +#include + +typedef uintmax_t qptr_t; + +struct queue { + size_t size; /**< Number of pointers in queue::array */ + _Atomic int *refcnts; /**< Reference counts to blocks in queue::array */ + _Atomic int readers; /**< Initial reference count for pushed blocks */ + + void **pointers; /**< Pointers to queue data */ +}; + +/** Initiliaze a new queue and allocate memory. */ +int queue_init(struct queue *q, size_t size); + +/** Release memory of queue. */ +void queue_destroy(struct queue *q); + +/** Increment the number of readers for this queue. + * + * Important: To garuantee thread-safety this function must by called by + * the (only) writer which holds \p tail. + */ +void queue_reader_add(struct queue *q, qptr_t head, qptr_t tail); + +/** Decrement the number of readers for this queue. + * + * Important: To garuantee thread-safety this function must by called by + * the (only) writer which holds \p tail. + */ +void queue_reader_remove(struct queue *q, qptr_t head, qptr_t tail); + +/** Enqueue up to \p cnt elements from \p ptrs[] at the queue tail pointed by \p tail. + * + * It may happen that the queue is (nearly) full and there is no more + * space to enqueue more elments. + * In this case a call to this function will return a value which is smaller than \p cnt + * or even zero if the queue was already full. + * + * @param q A pointer to the queue datastructure. + * @param[in] ptrs An array of void-pointers which should be enqueued. + * @param cnt The length of the pointer array \p ptrs. + * @param[in,out] tail A pointer to the current tail of the queue. The tail will be updated to new tail after eqeuing. + * @return The function returns the number of successfully enqueued elements from \p ptrs. + */ +int queue_push_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t *tail); + +/** Dequeue up to \p cnt elements from the queue and place them into the array \p ptrs[]. + * + * @param q A pointer to the queue datastructure. + * @param[out] ptrs An array with space at least \cnt elements which will receive pointers to the released elements. + * @param cnt The maximum number of elements which should be dequeued. It defines the size of \p ptrs. + * @param[in,out] head A pointer to a queue head. The value will be updated to reflect the new head. + * @return The number of elements which have been dequeued and whose reference counts have reached zero. + */ +int queue_pull_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t *head); + +/** Fill \p ptrs with \p cnt elements of the queue starting at entry \p pos. */ +int queue_get_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t pos); + + +int queue_get(struct queue *q, void **ptr, qptr_t pos); + +/** Enqueue a new block at the tail of the queue which is given by the qptr_t. */ +int queue_push(struct queue *q, void *ptr, qptr_t *tail); + +/** Dequeue the first block at the head of the queue. */ +int queue_pull(struct queue *q, void **ptr, qptr_t *head); + +#endif /* _QUEUE_H_ */ \ No newline at end of file diff --git a/lib/queue.c b/lib/queue.c new file mode 100644 index 000000000..ab6790d6d --- /dev/null +++ b/lib/queue.c @@ -0,0 +1,133 @@ +/** Lock-free Single-Producer Single-consumer (SPSC) queue. + * + * @author Steffen Vogel + * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC + * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + */ + +#include "queue.h" +#include "utils.h" + +int queue_init(struct queue *q, size_t size) +{ + q->size = size; + + q->pointers = alloc(size * sizeof(void *)); + q->refcnts = alloc(size * sizeof(atomic_uint)); + q->readers = 0; + + return 0; +} + +void queue_destroy(struct queue *q) +{ + free(q->pointers); + free(q->refcnts); +} + +void queue_reader_add(struct queue *q, qptr_t head, qptr_t tail) +{ + assert (head <= tail); + + atomic_fetch_add(&q->readers, 1); + + for (qptr_t i = head; i < tail; i++) + atomic_fetch_add(&q->refcnts[i % q->size], 1); +} + +void queue_reader_remove(struct queue *q, qptr_t head, qptr_t tail) +{ + assert (head <= tail); + + atomic_fetch_sub(&q->readers, 1); + + for (qptr_t i = head; i < tail; i++) + atomic_fetch_sub(&q->refcnts[i % q->size], 1); +} + +int queue_get(struct queue *q, void **ptr, qptr_t pos) +{ + int refcnt = atomic_load(&q->refcnts[pos % q->size]); + if (refcnt == 0) + return 0; + + *ptr = q->pointers[pos % q->size]; + + return 1; +} + +int queue_push(struct queue *q, void *ptr, qptr_t *tail) +{ + int refcnt; + + do { + refcnt = atomic_load(&q->refcnts[*tail % q->size]); + if (refcnt != 0) + return 0; /* Queue is full */ + + q->pointers[*tail % q->size] = ptr; + } while (!atomic_compare_exchange_weak(&q->refcnts[*tail % q->size], &refcnt, q->readers)); + + *tail = *tail + 1; + + return 1; +} + +int queue_pull(struct queue *q, void **ptr, qptr_t *head) +{ + int refcnt; + + do { + refcnt = atomic_load(&q->refcnts[*head % q->size]); + if (refcnt == 0) + return -1; /* Queue is empty */ + + *ptr = q->pointers[*head % q->size]; + } while (!atomic_compare_exchange_weak(&q->refcnts[*head % q->size], &refcnt, refcnt - 1)); + + *head = *head + 1; + + return refcnt == 1 ? 1 : 0; +} + +int queue_get_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t pos) +{ + int ret, i; + + for (i = 0; i < cnt; i++) { + ret = queue_get(q, &ptrs[i], pos + i); + if (ret == 0) + break; + } + + return i; +} + +int queue_push_many(struct queue *q, void **ptrs, size_t cnt, qptr_t *tail) +{ + int ret, i; + + for (i = 0; i < cnt; i++) { + ret = queue_push(q, ptrs[i], tail); + if (ret == 0) + break; + } + + return i; +} + +int queue_pull_many(struct queue *q, void **ptrs, size_t cnt, qptr_t *head) +{ + int released = 0, ret; + + for (int i = 0; i < cnt; i++) { + ret = queue_pull(q, &ptrs[i], head); + if (ret < 0) /* empty */ + break; + if (ret == 1) + released++; + } + + return released; +} \ No newline at end of file