SPSC bounded without atomic functions

This commit is contained in:
Umar Farooq 2016-09-18 21:41:16 +02:00
parent e4d59650ed
commit 064ebc6fbe
6 changed files with 259 additions and 183 deletions

2
.gitignore vendored
View file

@ -5,6 +5,6 @@
spsc_ub_test_fib
spsc_ub_test_default
spsc_test
spsc_test_fib
mpmc_test
mpmc_test_fib

View file

@ -1,5 +1,4 @@
#TARGETS = spsc_ub_test_fib spsc_ub_test_default mpmc_test spsc_test
TARGETS = spsc_ub_test_fib spsc_ub_test_default mpmc_test mpmc_test_fib
TARGETS = spsc_ub_test_fib spsc_ub_test_default mpmc_test mpmc_test_fib spsc_test_fib
CFLAGS = -Wall -std=c11
ifeq ($(shell uname), Linux)
LIBS = -pthread
@ -29,8 +28,8 @@ mpmc_test: mpmc_test.o mpmc_queue.o memory.o
mpmc_test_fib: mpmc_test_fib.o mpmc_queue.o memory.o
$(CC) $^ -Wall $(LIBS) -o $@
#spsc_test: spsc_test.o spsc_queue.o memory.o
# $(CC) $^ -Wall $(LIBS) -o $@
spsc_test_fib: spsc_test_fib.o spsc_queue.o memory.o
$(CC) $^ -Wall $(LIBS) -o $@
%.o: %.c
$(CC) $(CFLAGS) -c $< -o $@

View file

@ -1,7 +1,7 @@
/** Lock-free Single-Producer Single-consumer (SPSC) queue.
*
* @author Umar Farooq
* @copyright 2016 Umar Farooq
* @copyright 2016 Umar Farooq <umar1.farooq1@gmail.com>
* @license BSD 2-Clause License
*
* All rights reserved.
@ -30,34 +30,33 @@
#include "spsc_queue.h"
void * spsc_queue_init(size_t size, const struct memtype *mem)
struct spsc_queue * spsc_queue_init(struct spsc_queue * q, size_t size, const struct memtype *mem)
{
struct spsc_queue *q;
if (size < sizeof(struct spsc_queue) + 2 * sizeof(q->pointers[0]))
return NULL;
/* Queue size must be 2 exponent */
if ((size < 2) || ((size & (size - 1)) != 0))
return NULL;
q = memory_alloc(mem, sizeof(struct spsc_queue) + sizeof(q->pointers[0]) * size);
q = memory_alloc(mem, sizeof(struct spsc_queue) + (sizeof(q->pointers[0]) * size));
if (!q)
return NULL;
q->mem = mem;
q->capacity = size - 1;
atomic_init(&q->tail, 0);
atomic_init(&q->head, 0);
atomic_init(&q->_tail, 0);
atomic_init(&q->_head, 0);
return q;
}
void spsc_queue_destroy(struct spsc_queue *q)
int spsc_queue_destroy(struct spsc_queue *q)
{
/* Nothing to do here */
return;
const struct memtype mem = *(q->mem); /** @todo Memory is not being freed properly */
return memory_free(&mem, q, sizeof(struct spsc_queue) + ((q->capacity + 1) * sizeof(q->pointers[0])));
}
int spsc_queue_get_many(struct spsc_queue *q, void *ptrs[], size_t cnt)
int spsc_queue_get_many(struct spsc_queue *q, void **ptrs[], size_t cnt)
{
int filled_slots = q->capacity - spsc_queue_available(q);
@ -65,28 +64,28 @@ int spsc_queue_get_many(struct spsc_queue *q, void *ptrs[], size_t cnt)
cnt = filled_slots;
for (int i = 0; i < cnt; i++)
ptrs[i] = q->pointers[q->head % (q->capacity + 1)];
ptrs[i] = &(q->pointers[q->_head % (q->capacity + 1)]);
return cnt;
}
int spsc_queue_push_many(struct spsc_queue *q, void **ptrs, size_t cnt)
int spsc_queue_push_many(struct spsc_queue *q, void *ptrs[], size_t cnt)
{
//int free_slots = q->tail < q->head ? q->head - q->tail - 1 : q->head + (q->capacity - q->tail);
//int free_slots = q->_tail < q->_head ? q->_head - q->_tail - 1 : q->_head + (q->capacity - q->_tail);
int free_slots = spsc_queue_available(q);
if (cnt > free_slots)
cnt = free_slots;
for (int i = 0; i < cnt; i++) {
q->pointers[q->tail] = ptrs[i]; //--? alternate use (q->tail + i)%(q->capacity + 1) as index and update q->tail at the end of loop
q->tail = (q->tail + 1)%(q->capacity + 1);
q->pointers[q->_tail] = ptrs[i]; //--? or (q->_tail + i)%(q->capacity + 1) as index and update q->_tail at end of loop
q->_tail = (q->_tail + 1)%(q->capacity + 1);
}
return cnt;
}
int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs, size_t cnt)
int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs[], size_t cnt)
{
int filled_slots = q->capacity - spsc_queue_available(q);
@ -94,8 +93,8 @@ int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs, size_t cnt)
cnt = filled_slots;
for (int i = 0; i < cnt; i++) {
ptrs[i] = q->pointers[q->head];
q->head = (q->head + 1)%(q->capacity + 1);
*ptrs[i] = q->pointers[q->_head];
q->_head = (q->_head + 1)%(q->capacity + 1);
}
return cnt;
@ -103,8 +102,20 @@ int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs, size_t cnt)
int spsc_queue_available(struct spsc_queue *q) //--? make this func inline
{
if (q->tail < q->head)
return q->head - q->tail - 1;
if (q->_tail < q->_head)
return q->_head - q->_tail - 1;
else
return q->head + (q->capacity - q->tail);
}
return q->_head + (q->capacity - q->_tail);
}
/**
int spsc_debug(struct spsc_queue *q)
{
printf("q->_tail %d, q->_head %d, q->capacity %lu\n", q->_tail, q->_head, q->capacity);
int temp_count = q->_head;
for (int i = 0; i < q->capacity - spsc_queue_available(q); i++) {
printf("Value of stored pointer %d: %p\n", i, q->pointers[q->_head]);
temp_count = (temp_count + 1)%(q->capacity + 1);
}
return 0;
} */

View file

@ -49,19 +49,25 @@ struct spsc_queue {
size_t capacity; /**< Total number of available pointers in queue::array */
/* Consumer part */
atomic_int tail; /**< Tail pointer of queue*/
_Atomic int _tail; /**< Tail pointer of queue*/
cacheline_pad_t _pad1;
/* Producer part */
atomic_int head; /**< Head pointer of queue*/
_Atomic int _head; /**< Head pointer of queue*/
void *pointers[]; /**< Circular buffer. */
};
/** Initiliaze a new queue and allocate memory. */
void * spsc_queue_init(size_t size, const struct memtype *mem);
struct spsc_queue * spsc_queue_init(struct spsc_queue *q, size_t size, const struct memtype *mem);
/** Release memory of queue. */
void spsc_queue_destroy(struct spsc_queue *q);
int spsc_queue_destroy(struct spsc_queue *q);
/** Return the number of free slots in a queue
*
* Note: This is only an estimate!
*/
int spsc_queue_available(struct spsc_queue *q);
/** Enqueue up to \p cnt elements from \p ptrs[] at the queue tail pointed by \p tail.
*
@ -85,33 +91,29 @@ int spsc_queue_push_many(struct spsc_queue *q, void *ptrs[], size_t cnt);
* @param[in,out] head A pointer to a queue head. The value will be updated to reflect the new head.
* @return The number of elements which have been dequeued.
*/
int spsc_queue_pull_many(struct spsc_queue *q, void *ptrs[], size_t cnt);
int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs[], size_t cnt);
/** Fill \p ptrs with \p cnt elements of the queue starting at entry \p pos. */
int spsc_queue_get_many(struct spsc_queue *q, void *ptrs[], size_t cnt);
/** Get the first element in the queue */
static inline int spsc_queue_get(struct spsc_queue *q, void **ptr)
{
return spsc_queue_get_many(q, ptr, 1);
}
int spsc_queue_get_many(struct spsc_queue *q, void **ptrs[], size_t cnt);
/** Enqueue a new block at the tail of the queue. */
static inline int spsc_queue_push(struct spsc_queue *q, void **ptr)
static inline int spsc_queue_push(struct spsc_queue *q, void *ptr)
{
return spsc_queue_push_many(q, ptr, 1);
return spsc_queue_push_many(q, &ptr, 1);
}
/** Dequeue the first block at the head of the queue. */
static inline int spsc_queue_pull(struct spsc_queue *q, void **ptr)
{
return spsc_queue_pull_many(q, ptr, 1);
return spsc_queue_pull_many(q, &ptr, 1);
}
/** Return the number of free slots in a queue
*
* Note: This is only an estimate!
*/
int spsc_queue_available(struct spsc_queue *q);
/** Get the first element in the queue */
static inline int spsc_queue_get(struct spsc_queue *q, void **ptr)
{
return spsc_queue_get_many(q, &ptr, 1);
}
//int spsc_debug(struct spsc_queue *q);
#endif /* _SPSC_QUEUE_H_ */

View file

@ -1,131 +0,0 @@
#include <sys/mman.h>
#include <stdio.h>
/* Required to allocate hugepages on Apple OS X */
#ifdef __MACH__
#include <mach/vm_statistics.h>
#endif
/* There are not many compilers yet which support threads.h natively.
* We will fallback to a drop-in replacement which is based on pthreads.h */
#include "c11threads.h"
#include "spsc_queue.h"
#define N 2000000
/* Static global storage */
int fibs[N];
int producer(void *ctx)
{
struct spsc_queue *q = (struct spsc_queue *) ctx;
/* Enqueue */
for (int count = 0, n1 = 0, n2 = 1, r; count < N; count++) {
fibs[count] = n1 + n2;
void *fibptr = (void *) &fibs[count];
r = spsc_queue_push(q, &fibptr);
if (r != 1) {
printf("Queue push failed\n");
return -1;
}
n1 = n2; n2 = fibs[count];
}
return 0;
}
int consumer(void *ctx)
{
struct spsc_queue *q = (struct spsc_queue *) ctx;
/* Dequeue */
for (int count = 0, n1 = 0, n2 = 1, r; count < N; count++) {
int fib = n1 + n2;
int *pulled;
r = spsc_queue_pull(q, (void **) &pulled);
if (r != 1) {
printf("Queue pull failed: %d\n", r);
return -1;
}
if (*pulled != fib) {
printf("Pulled != fib\n");
return -1;
}
n1 = n2; n2 = fib;
}
return 0;
}
int test_single_threaded(struct spsc_queue *q)
{
int resp, resc;
resp = producer(q);
if (resp)
printf("Enqueuing failed");
resc = consumer(q);
if (resc)
printf("Consumer failed");
if (resc || resp)
printf("Single Thread Test Failed");
else
printf("Single Thread Test Complete\n");
return 0;
}
int test_multi_threaded(struct spsc_queue *q)
{
thrd_t thrp, thrc;
int resp, resc;
thrd_create(&thrp, producer, q);
thrd_create(&thrc, consumer, q);
thrd_join(thrp, &resp);
thrd_join(thrc, &resc);
if (resc || resp)
printf("Queue Test failed");
else
printf("Two-thread Test Complete\n");
return 0;
}
void queue_info(struct spsc_queue *q)
{
printf("Queue tail %d, queue head %d, queue capacity %lu, Queue free slots %d\n", q->tail, q->head, q->capacity, spsc_queue_available(q));
}
int main(int argc, char *argv[])
{
struct spsc_queue *q;
size_t queue_size = 1 << 20; // 16 MiB;
q = spsc_queue_init(q, queue_size, &memtype_hugepage);
if (q) { //--? pass &q ....**ptr
printf("Error in initialization\n");
return -1;
}
queue_info(q);
test_single_threaded(q);
test_multi_threaded(q);
return 0;
}

195
spsc_test_fib.c Normal file
View file

@ -0,0 +1,195 @@
#ifdef __linux__
#define _GNU_SOURCE
#endif
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <stddef.h>
#include "spsc_queue.h"
#include "memory.h"
#include "c11threads.h"
/* Usage example */
#define N 20000000
int volatile g_start = 0;
/** Get thread id as integer
* In contrast to pthread_t which is an opaque type */
uint64_t thread_get_id()
{
#ifdef __MACH__
uint64_t id;
pthread_threadid_np(pthread_self(), &id);
return id;
#elif defined(SYS_gettid)
return (int) syscall(SYS_gettid);
#endif
return -1;
}
/** Get CPU timestep counter */
__attribute__((always_inline)) static inline uint64_t rdtscp()
{
uint64_t tsc;
__asm__ ("rdtscp;"
"shl $32, %%rdx;"
"or %%rdx,%%rax"
: "=a" (tsc)
:
: "%rcx", "%rdx", "memory");
return tsc;
}
/** Sleep, do nothing */
__attribute__((always_inline)) static inline void nop()
{
__asm__("rep nop;");
}
/* Static global storage */
int fibs[N];
int producer(void *ctx)
{
printf("producer\n"); //DELETEME
struct spsc_queue *q = (struct spsc_queue *) ctx;
srand((unsigned) time(0) + thread_get_id());
size_t pause = rand() % 1000;
/* Wait for global start signal */
while (g_start == 0)
thrd_yield();
/* Wait for a random time */
for (size_t i = 0; i != pause; i += 1)
nop();
/* Enqueue */
for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) {
fibs[count] = n1 + n2;
void *fibptr = (void *) &fibs[count];
if (!spsc_queue_push(q, fibptr)) {
printf("Queue push failed\n");
return -1;
}
n1 = n2; n2 = fibs[count];
}
return 0;
}
int consumer(void *ctx)
{
printf("consumer\n"); //DELETEME
struct spsc_queue *q = (struct spsc_queue *) ctx;
srand((unsigned) time(0) + thread_get_id());
size_t pause = rand() % 1000;
/* Wait for global start signal */
while (g_start == 0)
thrd_yield();
/* Wait for a random time */
for (size_t i = 0; i != pause; i += 1)
nop();
/* Dequeue */
for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) {
int fib = n1 + n2;
int *pulled;
while (!spsc_queue_pull(q, (void **) &pulled)) {
//printf("Queue empty: %d\n", temp);
//return -1;
}
if (*pulled != fib) {
printf("Pulled != fib\n");
return -1;
}
n1 = n2; n2 = fib;
}
return 0;
}
int test_single_threaded(struct spsc_queue *q)
{
int resp, resc;
g_start = 1;
resp = producer(q);
if (resp)
printf("Enqueuing failed");
resc = consumer(q);
if (resc)
printf("Consumer failed");
if (resc || resp)
printf("Single Thread Test Failed");
else
printf("Single Thread Test Complete\n");
return 0;
}
int test_multi_threaded(struct spsc_queue *q)
{
thrd_t thrp, thrc;
int resp, resc;
thrd_create(&thrp, consumer, q); /** @todo Why producer thread runs earlier? */
thrd_create(&thrc, producer, q);
sleep(1);
uint64_t start = rdtscp();
g_start = 1;
thrd_join(thrp, &resp);
thrd_join(thrc, &resc);
uint64_t end = rdtscp();
if (resc || resp)
printf("Queue Test failed");
else
printf("Two-thread Test Complete\n");
printf("cycles/op = %lu\n", (end - start) / N );
if (spsc_queue_available(q) != q->capacity)
printf("slots in use? There is something wrong with the test %d\n", spsc_queue_available(q));
return 0;
}
int main()
{
struct spsc_queue * q = NULL;
q = spsc_queue_init(q, 1<<20, &memtype_hugepage); /** @todo change size>1 in case of bounded queue impl. memtype_hugepage impl for un_spsc */
//test_single_threaded(q);
test_multi_threaded(q);
int ret = spsc_queue_destroy(q);
if (ret)
printf("Failed to destroy queue: %d", ret);
return 0;
}