From 064ebc6fbe6940755727578fc46319362532b6e3 Mon Sep 17 00:00:00 2001 From: Umar Farooq Date: Sun, 18 Sep 2016 21:41:16 +0200 Subject: [PATCH] SPSC bounded without atomic functions --- .gitignore | 2 +- Makefile | 7 +- spsc_queue.c | 65 +++++++++------- spsc_queue.h | 42 ++++++----- spsc_test.c | 131 -------------------------------- spsc_test_fib.c | 195 ++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 259 insertions(+), 183 deletions(-) delete mode 100644 spsc_test.c create mode 100644 spsc_test_fib.c diff --git a/.gitignore b/.gitignore index 3f5a4c3..5bab3a5 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,6 @@ spsc_ub_test_fib spsc_ub_test_default -spsc_test +spsc_test_fib mpmc_test mpmc_test_fib diff --git a/Makefile b/Makefile index 4c30a86..059c682 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,4 @@ -#TARGETS = spsc_ub_test_fib spsc_ub_test_default mpmc_test spsc_test -TARGETS = spsc_ub_test_fib spsc_ub_test_default mpmc_test mpmc_test_fib +TARGETS = spsc_ub_test_fib spsc_ub_test_default mpmc_test mpmc_test_fib spsc_test_fib CFLAGS = -Wall -std=c11 ifeq ($(shell uname), Linux) LIBS = -pthread @@ -29,8 +28,8 @@ mpmc_test: mpmc_test.o mpmc_queue.o memory.o mpmc_test_fib: mpmc_test_fib.o mpmc_queue.o memory.o $(CC) $^ -Wall $(LIBS) -o $@ -#spsc_test: spsc_test.o spsc_queue.o memory.o -# $(CC) $^ -Wall $(LIBS) -o $@ +spsc_test_fib: spsc_test_fib.o spsc_queue.o memory.o + $(CC) $^ -Wall $(LIBS) -o $@ %.o: %.c $(CC) $(CFLAGS) -c $< -o $@ diff --git a/spsc_queue.c b/spsc_queue.c index 33c2647..1abd2bc 100644 --- a/spsc_queue.c +++ b/spsc_queue.c @@ -1,7 +1,7 @@ /** Lock-free Single-Producer Single-consumer (SPSC) queue. * * @author Umar Farooq - * @copyright 2016 Umar Farooq + * @copyright 2016 Umar Farooq * @license BSD 2-Clause License * * All rights reserved. @@ -30,34 +30,33 @@ #include "spsc_queue.h" -void * spsc_queue_init(size_t size, const struct memtype *mem) +struct spsc_queue * spsc_queue_init(struct spsc_queue * q, size_t size, const struct memtype *mem) { - struct spsc_queue *q; - - if (size < sizeof(struct spsc_queue) + 2 * sizeof(q->pointers[0])) - return NULL; - /* Queue size must be 2 exponent */ if ((size < 2) || ((size & (size - 1)) != 0)) return NULL; - q = memory_alloc(mem, sizeof(struct spsc_queue) + sizeof(q->pointers[0]) * size); + q = memory_alloc(mem, sizeof(struct spsc_queue) + (sizeof(q->pointers[0]) * size)); + if (!q) + return NULL; + + q->mem = mem; q->capacity = size - 1; - - atomic_init(&q->tail, 0); - atomic_init(&q->head, 0); + + atomic_init(&q->_tail, 0); + atomic_init(&q->_head, 0); return q; } -void spsc_queue_destroy(struct spsc_queue *q) +int spsc_queue_destroy(struct spsc_queue *q) { - /* Nothing to do here */ - return; + const struct memtype mem = *(q->mem); /** @todo Memory is not being freed properly */ + return memory_free(&mem, q, sizeof(struct spsc_queue) + ((q->capacity + 1) * sizeof(q->pointers[0]))); } -int spsc_queue_get_many(struct spsc_queue *q, void *ptrs[], size_t cnt) +int spsc_queue_get_many(struct spsc_queue *q, void **ptrs[], size_t cnt) { int filled_slots = q->capacity - spsc_queue_available(q); @@ -65,28 +64,28 @@ int spsc_queue_get_many(struct spsc_queue *q, void *ptrs[], size_t cnt) cnt = filled_slots; for (int i = 0; i < cnt; i++) - ptrs[i] = q->pointers[q->head % (q->capacity + 1)]; + ptrs[i] = &(q->pointers[q->_head % (q->capacity + 1)]); return cnt; } -int spsc_queue_push_many(struct spsc_queue *q, void **ptrs, size_t cnt) +int spsc_queue_push_many(struct spsc_queue *q, void *ptrs[], size_t cnt) { - //int free_slots = q->tail < q->head ? q->head - q->tail - 1 : q->head + (q->capacity - q->tail); + //int free_slots = q->_tail < q->_head ? q->_head - q->_tail - 1 : q->_head + (q->capacity - q->_tail); int free_slots = spsc_queue_available(q); if (cnt > free_slots) cnt = free_slots; for (int i = 0; i < cnt; i++) { - q->pointers[q->tail] = ptrs[i]; //--? alternate use (q->tail + i)%(q->capacity + 1) as index and update q->tail at the end of loop - q->tail = (q->tail + 1)%(q->capacity + 1); + q->pointers[q->_tail] = ptrs[i]; //--? or (q->_tail + i)%(q->capacity + 1) as index and update q->_tail at end of loop + q->_tail = (q->_tail + 1)%(q->capacity + 1); } return cnt; } -int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs, size_t cnt) +int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs[], size_t cnt) { int filled_slots = q->capacity - spsc_queue_available(q); @@ -94,8 +93,8 @@ int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs, size_t cnt) cnt = filled_slots; for (int i = 0; i < cnt; i++) { - ptrs[i] = q->pointers[q->head]; - q->head = (q->head + 1)%(q->capacity + 1); + *ptrs[i] = q->pointers[q->_head]; + q->_head = (q->_head + 1)%(q->capacity + 1); } return cnt; @@ -103,8 +102,20 @@ int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs, size_t cnt) int spsc_queue_available(struct spsc_queue *q) //--? make this func inline { - if (q->tail < q->head) - return q->head - q->tail - 1; + if (q->_tail < q->_head) + return q->_head - q->_tail - 1; else - return q->head + (q->capacity - q->tail); -} \ No newline at end of file + return q->_head + (q->capacity - q->_tail); +} +/** +int spsc_debug(struct spsc_queue *q) +{ + printf("q->_tail %d, q->_head %d, q->capacity %lu\n", q->_tail, q->_head, q->capacity); + int temp_count = q->_head; + for (int i = 0; i < q->capacity - spsc_queue_available(q); i++) { + printf("Value of stored pointer %d: %p\n", i, q->pointers[q->_head]); + temp_count = (temp_count + 1)%(q->capacity + 1); + } + + return 0; +} */ \ No newline at end of file diff --git a/spsc_queue.h b/spsc_queue.h index a42a0f1..b44109e 100644 --- a/spsc_queue.h +++ b/spsc_queue.h @@ -49,19 +49,25 @@ struct spsc_queue { size_t capacity; /**< Total number of available pointers in queue::array */ /* Consumer part */ - atomic_int tail; /**< Tail pointer of queue*/ + _Atomic int _tail; /**< Tail pointer of queue*/ cacheline_pad_t _pad1; /* Producer part */ - atomic_int head; /**< Head pointer of queue*/ + _Atomic int _head; /**< Head pointer of queue*/ void *pointers[]; /**< Circular buffer. */ }; /** Initiliaze a new queue and allocate memory. */ -void * spsc_queue_init(size_t size, const struct memtype *mem); +struct spsc_queue * spsc_queue_init(struct spsc_queue *q, size_t size, const struct memtype *mem); /** Release memory of queue. */ -void spsc_queue_destroy(struct spsc_queue *q); +int spsc_queue_destroy(struct spsc_queue *q); + +/** Return the number of free slots in a queue + * + * Note: This is only an estimate! + */ +int spsc_queue_available(struct spsc_queue *q); /** Enqueue up to \p cnt elements from \p ptrs[] at the queue tail pointed by \p tail. * @@ -85,33 +91,29 @@ int spsc_queue_push_many(struct spsc_queue *q, void *ptrs[], size_t cnt); * @param[in,out] head A pointer to a queue head. The value will be updated to reflect the new head. * @return The number of elements which have been dequeued. */ -int spsc_queue_pull_many(struct spsc_queue *q, void *ptrs[], size_t cnt); +int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs[], size_t cnt); /** Fill \p ptrs with \p cnt elements of the queue starting at entry \p pos. */ -int spsc_queue_get_many(struct spsc_queue *q, void *ptrs[], size_t cnt); - -/** Get the first element in the queue */ -static inline int spsc_queue_get(struct spsc_queue *q, void **ptr) -{ - return spsc_queue_get_many(q, ptr, 1); -} +int spsc_queue_get_many(struct spsc_queue *q, void **ptrs[], size_t cnt); /** Enqueue a new block at the tail of the queue. */ -static inline int spsc_queue_push(struct spsc_queue *q, void **ptr) +static inline int spsc_queue_push(struct spsc_queue *q, void *ptr) { - return spsc_queue_push_many(q, ptr, 1); + return spsc_queue_push_many(q, &ptr, 1); } /** Dequeue the first block at the head of the queue. */ static inline int spsc_queue_pull(struct spsc_queue *q, void **ptr) { - return spsc_queue_pull_many(q, ptr, 1); + return spsc_queue_pull_many(q, &ptr, 1); } -/** Return the number of free slots in a queue - * - * Note: This is only an estimate! - */ -int spsc_queue_available(struct spsc_queue *q); +/** Get the first element in the queue */ +static inline int spsc_queue_get(struct spsc_queue *q, void **ptr) +{ + return spsc_queue_get_many(q, &ptr, 1); +} + +//int spsc_debug(struct spsc_queue *q); #endif /* _SPSC_QUEUE_H_ */ \ No newline at end of file diff --git a/spsc_test.c b/spsc_test.c deleted file mode 100644 index a278261..0000000 --- a/spsc_test.c +++ /dev/null @@ -1,131 +0,0 @@ -#include -#include - -/* Required to allocate hugepages on Apple OS X */ -#ifdef __MACH__ - #include -#endif - -/* There are not many compilers yet which support threads.h natively. - * We will fallback to a drop-in replacement which is based on pthreads.h */ -#include "c11threads.h" - -#include "spsc_queue.h" - -#define N 2000000 - -/* Static global storage */ -int fibs[N]; - -int producer(void *ctx) -{ - struct spsc_queue *q = (struct spsc_queue *) ctx; - - /* Enqueue */ - for (int count = 0, n1 = 0, n2 = 1, r; count < N; count++) { - fibs[count] = n1 + n2; - - void *fibptr = (void *) &fibs[count]; - - r = spsc_queue_push(q, &fibptr); - if (r != 1) { - printf("Queue push failed\n"); - return -1; - } - - n1 = n2; n2 = fibs[count]; - } - - return 0; -} - -int consumer(void *ctx) -{ - struct spsc_queue *q = (struct spsc_queue *) ctx; - - /* Dequeue */ - for (int count = 0, n1 = 0, n2 = 1, r; count < N; count++) { - int fib = n1 + n2; - int *pulled; - - r = spsc_queue_pull(q, (void **) &pulled); - if (r != 1) { - printf("Queue pull failed: %d\n", r); - return -1; - } - - if (*pulled != fib) { - printf("Pulled != fib\n"); - return -1; - } - - n1 = n2; n2 = fib; - } - - - return 0; -} - -int test_single_threaded(struct spsc_queue *q) -{ - int resp, resc; - - resp = producer(q); - if (resp) - printf("Enqueuing failed"); - - resc = consumer(q); - if (resc) - printf("Consumer failed"); - - if (resc || resp) - printf("Single Thread Test Failed"); - else - printf("Single Thread Test Complete\n"); - - return 0; -} - -int test_multi_threaded(struct spsc_queue *q) -{ - thrd_t thrp, thrc; - int resp, resc; - - thrd_create(&thrp, producer, q); - thrd_create(&thrc, consumer, q); - - thrd_join(thrp, &resp); - thrd_join(thrc, &resc); - - if (resc || resp) - printf("Queue Test failed"); - else - printf("Two-thread Test Complete\n"); - - return 0; -} - -void queue_info(struct spsc_queue *q) -{ - printf("Queue tail %d, queue head %d, queue capacity %lu, Queue free slots %d\n", q->tail, q->head, q->capacity, spsc_queue_available(q)); -} - -int main(int argc, char *argv[]) -{ - struct spsc_queue *q; - size_t queue_size = 1 << 20; // 16 MiB; - - q = spsc_queue_init(q, queue_size, &memtype_hugepage); - - if (q) { //--? pass &q ....**ptr - printf("Error in initialization\n"); - return -1; - } - - queue_info(q); - - test_single_threaded(q); - test_multi_threaded(q); - - return 0; -} \ No newline at end of file diff --git a/spsc_test_fib.c b/spsc_test_fib.c new file mode 100644 index 0000000..0d2d53c --- /dev/null +++ b/spsc_test_fib.c @@ -0,0 +1,195 @@ +#ifdef __linux__ + #define _GNU_SOURCE +#endif + +#include +#include +#include +#include +#include + +#include "spsc_queue.h" +#include "memory.h" +#include "c11threads.h" + +/* Usage example */ + +#define N 20000000 + +int volatile g_start = 0; + +/** Get thread id as integer + * In contrast to pthread_t which is an opaque type */ +uint64_t thread_get_id() +{ +#ifdef __MACH__ + uint64_t id; + pthread_threadid_np(pthread_self(), &id); + return id; +#elif defined(SYS_gettid) + return (int) syscall(SYS_gettid); +#endif + return -1; +} + +/** Get CPU timestep counter */ +__attribute__((always_inline)) static inline uint64_t rdtscp() +{ + uint64_t tsc; + + __asm__ ("rdtscp;" + "shl $32, %%rdx;" + "or %%rdx,%%rax" + : "=a" (tsc) + : + : "%rcx", "%rdx", "memory"); + + return tsc; +} + +/** Sleep, do nothing */ +__attribute__((always_inline)) static inline void nop() +{ + __asm__("rep nop;"); +} + +/* Static global storage */ +int fibs[N]; + +int producer(void *ctx) +{ + printf("producer\n"); //DELETEME + struct spsc_queue *q = (struct spsc_queue *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t pause = rand() % 1000; + + /* Wait for global start signal */ + while (g_start == 0) + thrd_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != pause; i += 1) + nop(); + + /* Enqueue */ + for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) { + fibs[count] = n1 + n2; + + void *fibptr = (void *) &fibs[count]; + + if (!spsc_queue_push(q, fibptr)) { + printf("Queue push failed\n"); + return -1; + } + + n1 = n2; n2 = fibs[count]; + } + + return 0; +} + +int consumer(void *ctx) +{ + printf("consumer\n"); //DELETEME + struct spsc_queue *q = (struct spsc_queue *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t pause = rand() % 1000; + + /* Wait for global start signal */ + while (g_start == 0) + thrd_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != pause; i += 1) + nop(); + + /* Dequeue */ + for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) { + int fib = n1 + n2; + int *pulled; + + while (!spsc_queue_pull(q, (void **) &pulled)) { + //printf("Queue empty: %d\n", temp); + //return -1; + } + + if (*pulled != fib) { + printf("Pulled != fib\n"); + return -1; + } + + n1 = n2; n2 = fib; + } + + return 0; +} + +int test_single_threaded(struct spsc_queue *q) +{ + int resp, resc; + + g_start = 1; + + resp = producer(q); + if (resp) + printf("Enqueuing failed"); + + resc = consumer(q); + if (resc) + printf("Consumer failed"); + + if (resc || resp) + printf("Single Thread Test Failed"); + else + printf("Single Thread Test Complete\n"); + + return 0; +} + +int test_multi_threaded(struct spsc_queue *q) +{ + thrd_t thrp, thrc; + int resp, resc; + + thrd_create(&thrp, consumer, q); /** @todo Why producer thread runs earlier? */ + thrd_create(&thrc, producer, q); + + sleep(1); + + uint64_t start = rdtscp(); + g_start = 1; + + thrd_join(thrp, &resp); + thrd_join(thrc, &resc); + + uint64_t end = rdtscp(); + + if (resc || resp) + printf("Queue Test failed"); + else + printf("Two-thread Test Complete\n"); + + printf("cycles/op = %lu\n", (end - start) / N ); + + if (spsc_queue_available(q) != q->capacity) + printf("slots in use? There is something wrong with the test %d\n", spsc_queue_available(q)); + + return 0; +} + +int main() +{ + struct spsc_queue * q = NULL; + q = spsc_queue_init(q, 1<<20, &memtype_hugepage); /** @todo change size>1 in case of bounded queue impl. memtype_hugepage impl for un_spsc */ + + //test_single_threaded(q); + test_multi_threaded(q); + + int ret = spsc_queue_destroy(q); + if (ret) + printf("Failed to destroy queue: %d", ret); + + return 0; +}