1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

added new circular buffer queue data structure

This commit is contained in:
Steffen Vogel 2016-06-08 22:26:12 +02:00
parent 1b24f29088
commit 7820e66cee
2 changed files with 226 additions and 0 deletions

93
include/queue.h Normal file
View file

@ -0,0 +1,93 @@
/** Lock-free Single-Producer Single-consumer (SPSC) queue.
*
* This datastructure queues void pointers in a FIFO style.
* Every queue element has an associated reference count which is
* used to tell wheater the queue is full or empty.
*
* Queue head and tail pointers are not part of this datastructure.
* In combination with the reference counting, this enables multiple
* readers each with its own head pointer.
* Each reader will read the same data.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*/
#ifndef _QUEUE_H_
#define _QUEUE_H_
#include <stddef.h>
#include <stdint.h>
#include <stdatomic.h>
typedef uintmax_t qptr_t;
struct queue {
size_t size; /**< Number of pointers in queue::array */
_Atomic int *refcnts; /**< Reference counts to blocks in queue::array */
_Atomic int readers; /**< Initial reference count for pushed blocks */
void **pointers; /**< Pointers to queue data */
};
/** Initiliaze a new queue and allocate memory. */
int queue_init(struct queue *q, size_t size);
/** Release memory of queue. */
void queue_destroy(struct queue *q);
/** Increment the number of readers for this queue.
*
* Important: To garuantee thread-safety this function must by called by
* the (only) writer which holds \p tail.
*/
void queue_reader_add(struct queue *q, qptr_t head, qptr_t tail);
/** Decrement the number of readers for this queue.
*
* Important: To garuantee thread-safety this function must by called by
* the (only) writer which holds \p tail.
*/
void queue_reader_remove(struct queue *q, qptr_t head, qptr_t tail);
/** Enqueue up to \p cnt elements from \p ptrs[] at the queue tail pointed by \p tail.
*
* It may happen that the queue is (nearly) full and there is no more
* space to enqueue more elments.
* In this case a call to this function will return a value which is smaller than \p cnt
* or even zero if the queue was already full.
*
* @param q A pointer to the queue datastructure.
* @param[in] ptrs An array of void-pointers which should be enqueued.
* @param cnt The length of the pointer array \p ptrs.
* @param[in,out] tail A pointer to the current tail of the queue. The tail will be updated to new tail after eqeuing.
* @return The function returns the number of successfully enqueued elements from \p ptrs.
*/
int queue_push_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t *tail);
/** Dequeue up to \p cnt elements from the queue and place them into the array \p ptrs[].
*
* @param q A pointer to the queue datastructure.
* @param[out] ptrs An array with space at least \cnt elements which will receive pointers to the released elements.
* @param cnt The maximum number of elements which should be dequeued. It defines the size of \p ptrs.
* @param[in,out] head A pointer to a queue head. The value will be updated to reflect the new head.
* @return The number of elements which have been dequeued <b>and whose reference counts have reached zero</b>.
*/
int queue_pull_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t *head);
/** Fill \p ptrs with \p cnt elements of the queue starting at entry \p pos. */
int queue_get_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t pos);
int queue_get(struct queue *q, void **ptr, qptr_t pos);
/** Enqueue a new block at the tail of the queue which is given by the qptr_t. */
int queue_push(struct queue *q, void *ptr, qptr_t *tail);
/** Dequeue the first block at the head of the queue. */
int queue_pull(struct queue *q, void **ptr, qptr_t *head);
#endif /* _QUEUE_H_ */

133
lib/queue.c Normal file
View file

@ -0,0 +1,133 @@
/** Lock-free Single-Producer Single-consumer (SPSC) queue.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*/
#include "queue.h"
#include "utils.h"
int queue_init(struct queue *q, size_t size)
{
q->size = size;
q->pointers = alloc(size * sizeof(void *));
q->refcnts = alloc(size * sizeof(atomic_uint));
q->readers = 0;
return 0;
}
void queue_destroy(struct queue *q)
{
free(q->pointers);
free(q->refcnts);
}
void queue_reader_add(struct queue *q, qptr_t head, qptr_t tail)
{
assert (head <= tail);
atomic_fetch_add(&q->readers, 1);
for (qptr_t i = head; i < tail; i++)
atomic_fetch_add(&q->refcnts[i % q->size], 1);
}
void queue_reader_remove(struct queue *q, qptr_t head, qptr_t tail)
{
assert (head <= tail);
atomic_fetch_sub(&q->readers, 1);
for (qptr_t i = head; i < tail; i++)
atomic_fetch_sub(&q->refcnts[i % q->size], 1);
}
int queue_get(struct queue *q, void **ptr, qptr_t pos)
{
int refcnt = atomic_load(&q->refcnts[pos % q->size]);
if (refcnt == 0)
return 0;
*ptr = q->pointers[pos % q->size];
return 1;
}
int queue_push(struct queue *q, void *ptr, qptr_t *tail)
{
int refcnt;
do {
refcnt = atomic_load(&q->refcnts[*tail % q->size]);
if (refcnt != 0)
return 0; /* Queue is full */
q->pointers[*tail % q->size] = ptr;
} while (!atomic_compare_exchange_weak(&q->refcnts[*tail % q->size], &refcnt, q->readers));
*tail = *tail + 1;
return 1;
}
int queue_pull(struct queue *q, void **ptr, qptr_t *head)
{
int refcnt;
do {
refcnt = atomic_load(&q->refcnts[*head % q->size]);
if (refcnt == 0)
return -1; /* Queue is empty */
*ptr = q->pointers[*head % q->size];
} while (!atomic_compare_exchange_weak(&q->refcnts[*head % q->size], &refcnt, refcnt - 1));
*head = *head + 1;
return refcnt == 1 ? 1 : 0;
}
int queue_get_many(struct queue *q, void *ptrs[], size_t cnt, qptr_t pos)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = queue_get(q, &ptrs[i], pos + i);
if (ret == 0)
break;
}
return i;
}
int queue_push_many(struct queue *q, void **ptrs, size_t cnt, qptr_t *tail)
{
int ret, i;
for (i = 0; i < cnt; i++) {
ret = queue_push(q, ptrs[i], tail);
if (ret == 0)
break;
}
return i;
}
int queue_pull_many(struct queue *q, void **ptrs, size_t cnt, qptr_t *head)
{
int released = 0, ret;
for (int i = 0; i < cnt; i++) {
ret = queue_pull(q, &ptrs[i], head);
if (ret < 0) /* empty */
break;
if (ret == 1)
released++;
}
return released;
}