diff --git a/.gitignore b/.gitignore index 86eafd9..2c0ecf2 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,9 @@ *.so *~ -spsc_ub_test -spsc_test +spsc_ub_test_fib +spsc_ub_test_default +spsc_test_fib mpmc_test +mpmc_test_fib +gmon.out diff --git a/Makefile b/Makefile index 11794e5..059c682 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,8 @@ -TARGETS = spsc_ub_test mpmc_test spsc_test +TARGETS = spsc_ub_test_fib spsc_ub_test_default mpmc_test mpmc_test_fib spsc_test_fib CFLAGS = -Wall -std=c11 +ifeq ($(shell uname), Linux) + LIBS = -pthread +endif DEBUG ?= 1 @@ -13,13 +16,19 @@ endif all: $(TARGETS) -spsc_ub_test: spsc_ub_test.o spsc_ub_queue.o memory.o +spsc_ub_test_fib: spsc_ub_test_fib.o spsc_ub_queue.o memory.o + $(CC) $^ -Wall $(LIBS) -o $@ + +spsc_ub_test_default: spsc_ub_test_default.o spsc_ub_queue.o memory.o $(CC) $^ -Wall $(LIBS) -o $@ mpmc_test: mpmc_test.o mpmc_queue.o memory.o $(CC) $^ -Wall $(LIBS) -o $@ + +mpmc_test_fib: mpmc_test_fib.o mpmc_queue.o memory.o + $(CC) $^ -Wall $(LIBS) -o $@ -spsc_test: spsc_test.o spsc_queue.o memory.o +spsc_test_fib: spsc_test_fib.o spsc_queue.o memory.o $(CC) $^ -Wall $(LIBS) -o $@ %.o: %.c diff --git a/README.md b/README.md index 7a8c9fb..66b7319 100644 --- a/README.md +++ b/README.md @@ -29,9 +29,31 @@ This Git repository contains lightweight simple implementation of lockless SPSC ## Credits -- Umar Farooq +- Umar Farooq <umar1.farooq1@gmail.com> - Steffen Vogel <post@steffenvogel.de> +## Initial Testing Results +All tests are using mem_heap. All tests use *_fib test. + +With clock_gettime function for cycle/op measurement: +Octopus Machine: + MPMC: 52-54 cycles/op --Something wrong, results too good to be true?? + Bounded SPSC: 230-300 cycles/op + Unbounded SPSC: 300-400 cycles/op + Side note: MPMC default test gets stuck with N=20000000, runs ok with N=2000000 with 160 cycles/op +Ubuntu VM: + MPMC: 90 cycles/op --Again result too good to be true in my opinion + Bounded SPSC: 60-65 cycles/op + Unbounded SPSC: 170-175 cycles/op + +With rdtscp function for cycle/op measurement: +Octopus Machine: Doesn't support rdtscp function, illegal instruction error. So alternate clock_gettime was used as above. + +Ubuntu VM: + MPMC: 160-200 cycles/op + Bounded SPSC: 160-165 cycles/op + Unbounded SPSC: 400-420 cycles/op + ## License #### BSD 2-Clause License @@ -53,4 +75,4 @@ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -``` \ No newline at end of file +``` diff --git a/memory.c b/memory.c index e7c01e3..dc1b379 100644 --- a/memory.c +++ b/memory.c @@ -1,9 +1,13 @@ /** * */ - +#ifdef __linux__ + #define _GNU_SOURCE +#endif + #include <stdlib.h> #include <sys/mman.h> +#include <stdio.h> //DELETEME /* Required to allocate hugepages on Apple OS X */ #ifdef __MACH__ diff --git a/memory.h b/memory.h index 19711f7..22edabd 100644 --- a/memory.h +++ b/memory.h @@ -5,6 +5,11 @@ #ifndef _MEMORY_H_ #define _MEMORY_H_ +#ifdef __linux__ + #include <stdint.h> + #include <stddef.h> +#endif + typedef void *(*memzone_allocator_t)(size_t); typedef int (*memzone_deallocator_t)(void *, size_t); diff --git a/mpmc_queue.h b/mpmc_queue.h index 6663f2b..172fb41 100644 --- a/mpmc_queue.h +++ b/mpmc_queue.h @@ -39,8 +39,9 @@ #include "memory.h" -static size_t const cacheline_size = 64; -typedef char cacheline_pad_t[cacheline_size]; +#define CACHELINE_SIZE 64 + +typedef char cacheline_pad_t[CACHELINE_SIZE]; struct mpmc_queue { cacheline_pad_t _pad0; /**< Shared area: all threads read */ diff --git a/mpmc_test.c b/mpmc_test.c index b79c2bc..e6dac4c 100644 --- a/mpmc_test.c +++ b/mpmc_test.c @@ -1,7 +1,12 @@ +#ifdef __linux__ + #define _GNU_SOURCE +#endif + #include <stdio.h> #include <stdlib.h> #include <stdint.h> #include <unistd.h> +#include <time.h> /* There are not many compilers yet which support threads.h natively. * We will fallback to a drop-in replacement which is based on pthreads.h */ @@ -19,7 +24,7 @@ size_t const thread_count = 4; size_t const batch_size = 10; -size_t const iter_count = 20000000; +size_t const iter_count = 2000000; size_t const queue_size = 1 << 20; int volatile g_start = 0; @@ -32,9 +37,10 @@ uint64_t thread_get_id() uint64_t id; pthread_threadid_np(pthread_self(), &id); return id; -#elif defined(__linux__) - return (int) gettid(); +#elif defined(SYS_gettid) + return (int) syscall(SYS_gettid); #endif + return -1; } /** Get CPU timestep counter */ @@ -49,6 +55,7 @@ __attribute__((always_inline)) static inline uint64_t rdtscp() : : "%rcx", "%rdx", "memory"); + printf("tsc %lu\n", tsc); return tsc; } @@ -96,22 +103,30 @@ int main() thrd_t threads[thread_count]; int ret; - mpmc_queue_init(&queue, queue_size, &memtype_hugepage); + mpmc_queue_init(&queue, queue_size, &memtype_heap); for (int i = 0; i != thread_count; ++i) thrd_create(&threads[i], thread_func, &queue); sleep(1); + + long long starttime, endtime; + struct timespec start, end; + + if(clock_gettime(CLOCK_REALTIME, &start)) + return -1; - uint64_t start = rdtscp(); g_start = 1; for (int i = 0; i != thread_count; ++i) thrd_join(threads[i], NULL); - uint64_t end = rdtscp(); + if(clock_gettime(CLOCK_REALTIME, &end)) + return -1; - printf("cycles/op = %llu\n", (end - start) / (batch_size * iter_count * 2 * thread_count)); + starttime = start.tv_sec*1000000000LL + start.tv_nsec; + endtime = end.tv_sec*1000000000LL + end.tv_nsec; + printf("cycles/op = %lld\n", (endtime - starttime) / (batch_size * iter_count * 2 * thread_count)); size_t used = mpmc_queue_available(&queue); if (used > 0) @@ -122,4 +137,4 @@ int main() printf("Failed to destroy queue: %d", ret); return 0; -} \ No newline at end of file +} diff --git a/mpmc_test_fib.c b/mpmc_test_fib.c new file mode 100644 index 0000000..7b5e5f1 --- /dev/null +++ b/mpmc_test_fib.c @@ -0,0 +1,198 @@ +#ifdef __linux__ + #define _GNU_SOURCE +#endif + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <stdint.h> +#include <stddef.h> + +#include "mpmc_queue.h" +#include "memory.h" +#include "c11threads.h" + +/* Usage example */ + +#define N 20000000 + +int volatile g_start = 0; + +/** Get thread id as integer + * In contrast to pthread_t which is an opaque type */ +uint64_t thread_get_id() +{ +#ifdef __MACH__ + uint64_t id; + pthread_threadid_np(pthread_self(), &id); + return id; +#elif defined(SYS_gettid) + return (int) syscall(SYS_gettid); +#endif + return -1; +} + +/** Get CPU timestep counter */ +__attribute__((always_inline)) static inline uint64_t rdtscp() +{ + uint64_t tsc; + /** @todo not recommended to use rdtsc on multicore machine */ + __asm__ ("rdtsc;" + "shl $32, %%rdx;" + "or %%rdx,%%rax" + : "=a" (tsc) + : + : "%rcx", "%rdx", "memory"); + + return tsc; +} + +/** Sleep, do nothing */ +__attribute__((always_inline)) static inline void nop() +{ + __asm__("rep nop;"); +} + +/* Static global storage */ +int fibs[N]; + +int producer(void *ctx) +{ + //printf("producer\n"); //DELETEME + struct mpmc_queue *q = (struct mpmc_queue *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t pause = rand() % 1000; + + /* Wait for global start signal */ + while (g_start == 0) + thrd_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != pause; i += 1) + nop(); + + /* Enqueue */ + for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) { + fibs[count] = n1 + n2; + + void *fibptr = (void *) &fibs[count]; + + if (!mpmc_queue_push(q, fibptr)) { + printf("Queue push failed at count %lu\n", count); + return -1; + } + + n1 = n2; n2 = fibs[count]; + } + + return 0; +} + +int consumer(void *ctx) +{ + //printf("consumer\n"); //DELETEME + struct mpmc_queue *q = (struct mpmc_queue *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t pause = rand() % 1000; + + /* Wait for global start signal */ + while (g_start == 0) + thrd_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != pause; i += 1) + nop(); + + /* Dequeue */ + for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) { + int fib = n1 + n2; + int *pulled; + + while (!mpmc_queue_pull(q, (void **) &pulled)) { + //printf("Queue empty: %d\n", temp); + //return -1; + } + + if (*pulled != fib) { + printf("Pulled != fib\n"); + return -1; + } + + n1 = n2; n2 = fib; + } + + return 0; +} + +int test_single_threaded(struct mpmc_queue *q) +{ + int resp, resc; + g_start = 1; + + resp = producer(q); + if (resp) + printf("Enqueuing failed\n"); + + resc = consumer(q); + if (resc) + printf("Consumer failed\n"); + + if (resc || resp) + printf("Single Thread Test Failed\n"); + else + printf("Single Thread Test Complete\n"); + + return 0; +} + +int test_multi_threaded(struct mpmc_queue *q) +{ + thrd_t thrp, thrc; + int resp, resc; + + g_start = 0; + + thrd_create(&thrp, consumer, q); /** @todo Why producer thread runs earlier? */ + thrd_create(&thrc, producer, q); + + sleep(1); + + uint64_t start_tsc_time, end_tsc_time; + + start_tsc_time = rdtscp(); + g_start = 1; + + thrd_join(thrp, &resp); + thrd_join(thrc, &resc); + + end_tsc_time = rdtscp(); + + if (resc || resp) + printf("Queue Test failed\n"); + else + printf("Two-thread Test Complete\n"); + + printf("cycles/op for rdtsc %lu\n", (end_tsc_time - start_tsc_time)/N); + + size_t used = mpmc_queue_available(q); + if (used > 0) + printf("%zu slots in use? There is something wrong with the test\n", used); + + int ret = mpmc_queue_destroy(q); + if (ret) + printf("Failed to destroy queue: %d\n", ret); + + return 0; +} + +int main() +{ + struct mpmc_queue q; + mpmc_queue_init(&q, 1<<20, &memtype_heap); /** @todo change size>1 in case of bounded queue impl. memtype_hugepage impl for un_spsc */ + + test_multi_threaded(&q); + + return 0; +} diff --git a/spsc_queue.c b/spsc_queue.c index 774cbe0..fa2cb37 100644 --- a/spsc_queue.c +++ b/spsc_queue.c @@ -1,7 +1,7 @@ /** Lock-free Single-Producer Single-consumer (SPSC) queue. * * @author Umar Farooq - * @copyright 2016 Umar Farooq + * @copyright 2016 Umar Farooq <umar1.farooq1@gmail.com> * @license BSD 2-Clause License * * All rights reserved. @@ -30,63 +30,63 @@ #include "spsc_queue.h" -void * spsc_queue_init(size_t size, const struct memtype *mem) +struct spsc_queue * spsc_queue_init(struct spsc_queue * q, size_t size, const struct memtype *mem) { - struct spsc_queue *q; - - if (size < sizeof(struct spsc_queue) + 2 * sizeof(q->pointers[0])) - return NULL; - /* Queue size must be 2 exponent */ if ((size < 2) || ((size & (size - 1)) != 0)) return NULL; - q = memory_alloc(mem, sizeof(struct spsc_queue) + sizeof(q->pointers[0]) * size); + q = memory_alloc(mem, sizeof(struct spsc_queue) + (sizeof(q->pointers[0]) * size)); + if (!q) + return NULL; + + q->mem = mem; q->capacity = size - 1; - - atomic_init(&q->tail, 0); - atomic_init(&q->head, 0); + + atomic_init(&q->_tail, 0); + atomic_init(&q->_head, 0); return q; } -void spsc_queue_destroy(struct spsc_queue *q) +int spsc_queue_destroy(struct spsc_queue *q) { - /* Nothing to do here */ - return; + const struct memtype mem = *(q->mem); /** @todo Memory is not being freed properly??? */ + return memory_free(&mem, q, sizeof(struct spsc_queue) + ((q->capacity + 1) * sizeof(q->pointers[0]))); } -int spsc_queue_get_many(struct spsc_queue *q, void *ptrs[], size_t cnt) +int spsc_queue_get_many(struct spsc_queue *q, void **ptrs[], size_t cnt) { if (q->capacity <= spsc_queue_available(q)) cnt = 0; else if (cnt > q->capacity - spsc_queue_available(q)) cnt = q->capacity - spsc_queue_available(q); + /**@todo Is atomic_load_explicit needed here for loading q->_head? */ for (int i = 0; i < cnt; i++) - ptrs[i] = q->pointers[q->head % (q->capacity + 1)]; + ptrs[i] = &(q->pointers[q->_head % (q->capacity + 1)]); return cnt; } -int spsc_queue_push_many(struct spsc_queue *q, void **ptrs, size_t cnt) +int spsc_queue_push_many(struct spsc_queue *q, void *ptrs[], size_t cnt) { - //int free_slots = q->tail < q->head ? q->head - q->tail - 1 : q->head + (q->capacity - q->tail); + //int free_slots = q->_tail < q->_head ? q->_head - q->_tail - 1 : q->_head + (q->capacity - q->_tail); size_t free_slots = spsc_queue_available(q); if (cnt > free_slots) cnt = free_slots; for (size_t i = 0; i < cnt; i++) { - q->pointers[q->tail] = ptrs[i]; //--? alternate use (q->tail + i)%(q->capacity + 1) as index and update q->tail at the end of loop - q->tail = (q->tail + 1)%(q->capacity + 1); + q->pointers[q->_tail] = ptrs[i]; + atomic_store_explicit(&q->_tail, (q->_tail + 1)%(q->capacity + 1), memory_order_release); } return cnt; } -int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs, size_t cnt) +int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs[], size_t cnt) { if (q->capacity <= spsc_queue_available(q)) cnt = 0; @@ -94,17 +94,17 @@ int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs, size_t cnt) cnt = q->capacity - spsc_queue_available(q); for (size_t i = 0; i < cnt; i++) { - ptrs[i] = q->pointers[q->head]; - q->head = (q->head + 1)%(q->capacity + 1); + *ptrs[i] = q->pointers[q->_head]; + atomic_store_explicit(&q->_head, (q->_head + 1)%(q->capacity + 1), memory_order_release); } return cnt; } -int spsc_queue_available(struct spsc_queue *q) //--? make this func inline +int spsc_queue_available(struct spsc_queue *q) { - if (q->tail < q->head) - return q->head - q->tail - 1; + if (atomic_load_explicit(&q->_tail, memory_order_acquire) < atomic_load_explicit(&q->_head, memory_order_acquire)) + return q->_head - q->_tail - 1; else - return q->head + (q->capacity - q->tail); -} \ No newline at end of file + return q->_head + (q->capacity - q->_tail); +} diff --git a/spsc_queue.h b/spsc_queue.h index a42a0f1..b2ef939 100644 --- a/spsc_queue.h +++ b/spsc_queue.h @@ -49,19 +49,25 @@ struct spsc_queue { size_t capacity; /**< Total number of available pointers in queue::array */ /* Consumer part */ - atomic_int tail; /**< Tail pointer of queue*/ + _Atomic int _tail; /**< Tail pointer of queue*/ cacheline_pad_t _pad1; /* Producer part */ - atomic_int head; /**< Head pointer of queue*/ + _Atomic int _head; /**< Head pointer of queue*/ void *pointers[]; /**< Circular buffer. */ }; /** Initiliaze a new queue and allocate memory. */ -void * spsc_queue_init(size_t size, const struct memtype *mem); +struct spsc_queue * spsc_queue_init(struct spsc_queue *q, size_t size, const struct memtype *mem); /** Release memory of queue. */ -void spsc_queue_destroy(struct spsc_queue *q); +int spsc_queue_destroy(struct spsc_queue *q); + +/** Return the number of free slots in a queue + * + * Note: This is only an estimate! + */ +int spsc_queue_available(struct spsc_queue *q); /** Enqueue up to \p cnt elements from \p ptrs[] at the queue tail pointed by \p tail. * @@ -85,33 +91,27 @@ int spsc_queue_push_many(struct spsc_queue *q, void *ptrs[], size_t cnt); * @param[in,out] head A pointer to a queue head. The value will be updated to reflect the new head. * @return The number of elements which have been dequeued. */ -int spsc_queue_pull_many(struct spsc_queue *q, void *ptrs[], size_t cnt); +int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs[], size_t cnt); /** Fill \p ptrs with \p cnt elements of the queue starting at entry \p pos. */ -int spsc_queue_get_many(struct spsc_queue *q, void *ptrs[], size_t cnt); - -/** Get the first element in the queue */ -static inline int spsc_queue_get(struct spsc_queue *q, void **ptr) -{ - return spsc_queue_get_many(q, ptr, 1); -} +int spsc_queue_get_many(struct spsc_queue *q, void **ptrs[], size_t cnt); /** Enqueue a new block at the tail of the queue. */ -static inline int spsc_queue_push(struct spsc_queue *q, void **ptr) +static inline int spsc_queue_push(struct spsc_queue *q, void *ptr) { - return spsc_queue_push_many(q, ptr, 1); + return spsc_queue_push_many(q, &ptr, 1); } /** Dequeue the first block at the head of the queue. */ static inline int spsc_queue_pull(struct spsc_queue *q, void **ptr) { - return spsc_queue_pull_many(q, ptr, 1); + return spsc_queue_pull_many(q, &ptr, 1); } -/** Return the number of free slots in a queue - * - * Note: This is only an estimate! - */ -int spsc_queue_available(struct spsc_queue *q); +/** Get the first element in the queue */ +static inline int spsc_queue_get(struct spsc_queue *q, void **ptr) +{ + return spsc_queue_get_many(q, &ptr, 1); +} #endif /* _SPSC_QUEUE_H_ */ \ No newline at end of file diff --git a/spsc_test.c b/spsc_test.c deleted file mode 100644 index a278261..0000000 --- a/spsc_test.c +++ /dev/null @@ -1,131 +0,0 @@ -#include <sys/mman.h> -#include <stdio.h> - -/* Required to allocate hugepages on Apple OS X */ -#ifdef __MACH__ - #include <mach/vm_statistics.h> -#endif - -/* There are not many compilers yet which support threads.h natively. - * We will fallback to a drop-in replacement which is based on pthreads.h */ -#include "c11threads.h" - -#include "spsc_queue.h" - -#define N 2000000 - -/* Static global storage */ -int fibs[N]; - -int producer(void *ctx) -{ - struct spsc_queue *q = (struct spsc_queue *) ctx; - - /* Enqueue */ - for (int count = 0, n1 = 0, n2 = 1, r; count < N; count++) { - fibs[count] = n1 + n2; - - void *fibptr = (void *) &fibs[count]; - - r = spsc_queue_push(q, &fibptr); - if (r != 1) { - printf("Queue push failed\n"); - return -1; - } - - n1 = n2; n2 = fibs[count]; - } - - return 0; -} - -int consumer(void *ctx) -{ - struct spsc_queue *q = (struct spsc_queue *) ctx; - - /* Dequeue */ - for (int count = 0, n1 = 0, n2 = 1, r; count < N; count++) { - int fib = n1 + n2; - int *pulled; - - r = spsc_queue_pull(q, (void **) &pulled); - if (r != 1) { - printf("Queue pull failed: %d\n", r); - return -1; - } - - if (*pulled != fib) { - printf("Pulled != fib\n"); - return -1; - } - - n1 = n2; n2 = fib; - } - - - return 0; -} - -int test_single_threaded(struct spsc_queue *q) -{ - int resp, resc; - - resp = producer(q); - if (resp) - printf("Enqueuing failed"); - - resc = consumer(q); - if (resc) - printf("Consumer failed"); - - if (resc || resp) - printf("Single Thread Test Failed"); - else - printf("Single Thread Test Complete\n"); - - return 0; -} - -int test_multi_threaded(struct spsc_queue *q) -{ - thrd_t thrp, thrc; - int resp, resc; - - thrd_create(&thrp, producer, q); - thrd_create(&thrc, consumer, q); - - thrd_join(thrp, &resp); - thrd_join(thrc, &resc); - - if (resc || resp) - printf("Queue Test failed"); - else - printf("Two-thread Test Complete\n"); - - return 0; -} - -void queue_info(struct spsc_queue *q) -{ - printf("Queue tail %d, queue head %d, queue capacity %lu, Queue free slots %d\n", q->tail, q->head, q->capacity, spsc_queue_available(q)); -} - -int main(int argc, char *argv[]) -{ - struct spsc_queue *q; - size_t queue_size = 1 << 20; // 16 MiB; - - q = spsc_queue_init(q, queue_size, &memtype_hugepage); - - if (q) { //--? pass &q ....**ptr - printf("Error in initialization\n"); - return -1; - } - - queue_info(q); - - test_single_threaded(q); - test_multi_threaded(q); - - return 0; -} \ No newline at end of file diff --git a/spsc_test_fib.c b/spsc_test_fib.c new file mode 100644 index 0000000..d039623 --- /dev/null +++ b/spsc_test_fib.c @@ -0,0 +1,198 @@ +#ifdef __linux__ + #define _GNU_SOURCE +#endif + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <stdint.h> +#include <stddef.h> + +#include "spsc_queue.h" +#include "memory.h" +#include "c11threads.h" + +/* Usage example */ + +#define N 20000000 + +int volatile g_start = 0; + +/** Get thread id as integer + * In contrast to pthread_t which is an opaque type */ +uint64_t thread_get_id() +{ +#ifdef __MACH__ + uint64_t id; + pthread_threadid_np(pthread_self(), &id); + return id; +#elif defined(SYS_gettid) + return (int) syscall(SYS_gettid); +#endif + return -1; +} + +/** Get CPU timestep counter */ +__attribute__((always_inline)) static inline uint64_t rdtscp() +{ + uint64_t tsc; + /** @todo not recommended to use rdtsc on multicore machine */ + __asm__ ("rdtsc;" + "shl $32, %%rdx;" + "or %%rdx,%%rax" + : "=a" (tsc) + : + : "%rcx", "%rdx", "memory"); + + return tsc; +} + +/** Sleep, do nothing */ +__attribute__((always_inline)) static inline void nop() +{ + __asm__("rep nop;"); +} + +/* Static global storage */ +int fibs[N]; + +int producer(void *ctx) +{ + printf("producer\n"); //DELETEME + struct spsc_queue *q = (struct spsc_queue *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t pause = rand() % 1000; + + /* Wait for global start signal */ + while (g_start == 0) + thrd_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != pause; i += 1) + nop(); + + /* Enqueue */ + for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) { + fibs[count] = n1 + n2; + + void *fibptr = (void *) &fibs[count]; + + if (!spsc_queue_push(q, fibptr)) { + printf("Queue push failed at count %lu, %d, free slots %d\n", count, 1<<20, spsc_queue_available(q)); + return -1; + } + + n1 = n2; n2 = fibs[count]; + } + + return 0; +} + +int consumer(void *ctx) +{ + printf("consumer\n"); //DELETEME + struct spsc_queue *q = (struct spsc_queue *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t pause = rand() % 1000; + + /* Wait for global start signal */ + while (g_start == 0) + thrd_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != pause; i += 1) + nop(); + + /* Dequeue */ + for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) { + int fib = n1 + n2; + int *pulled; + + while (!spsc_queue_pull(q, (void **) &pulled)) { + //printf("Queue empty: %d\n", temp); + //return -1; + } + + if (*pulled != fib) { + printf("Pulled != fib\n"); + return -1; + } + + n1 = n2; n2 = fib; + } + + return 0; +} + +int test_single_threaded(struct spsc_queue *q) +{ + int resp, resc; + g_start = 1; + resp = producer(q); + if (resp) + printf("Enqueuing failed\n"); + + resc = consumer(q); + if (resc) + printf("Consumer failed\n"); + + if (resc || resp) + printf("Single Thread Test Failed\n"); + else + printf("Single Thread Test Complete\n"); + + return 0; +} + +int test_multi_threaded(struct spsc_queue *q) +{ + thrd_t thrp, thrc; + int resp, resc; + + g_start = 0; + + thrd_create(&thrp, consumer, q); /** @todo Why producer thread runs earlier? */ + thrd_create(&thrc, producer, q); + + sleep(1); + + uint64_t start_tsc_time, end_tsc_time; + + start_tsc_time = rdtscp(); + g_start = 1; + + thrd_join(thrp, &resp); + thrd_join(thrc, &resc); + + end_tsc_time = rdtscp(); + + if (resc || resp) + printf("Queue Test failed\n"); + else + printf("Two-thread Test Complete\n"); + + printf("cycles/op for rdtsc %lu\n", (end_tsc_time - start_tsc_time)/N); + + size_t used = spsc_queue_available(q); + if (spsc_queue_available(q) != q->capacity) + printf("%zu slots in use? There is something wrong with the test\n", used); + + int ret = spsc_queue_destroy(q); + if (ret) + printf("Failed to destroy queue: %d\n", ret); + + return 0; +} + +int main() +{ + struct spsc_queue* q = NULL; + q = spsc_queue_init(q, 1<<24, &memtype_heap); + + test_multi_threaded(q); + //test_single_threaded(q); /** Single threaded test fails with N > queue size*/ + + return 0; +} diff --git a/spsc_ub_queue.c b/spsc_ub_queue.c index a4c9713..e776310 100644 --- a/spsc_ub_queue.c +++ b/spsc_ub_queue.c @@ -33,18 +33,24 @@ #include "spsc_ub_queue.h" -void spsc_ub_queue_init(struct spsc_ub_queue* q, size_t size, const struct memtype *mem) +int spsc_ub_queue_init(struct spsc_ub_queue* q, size_t size, const struct memtype *mem) { - struct node* n = memory_alloc(q->mem, sizeof(struct node) * size); - q->mem = mem; + struct node* n = memory_alloc(q->mem, sizeof(struct node) * size); n->_next = NULL; q->_tail = q->_head = q->_first= q->_tailcopy = n; - return; + /** Alloc memory at start for total size for efficiency */ + void *v = NULL; + for(unsigned long i = 0; i < size; i++) /** @todo fix this hack in bounded implementation */ + spsc_ub_queue_push(q, v); + for(unsigned long i = 0; i < size; i++) + spsc_ub_queue_pull(q, &v); + + return 0; } -void spsc_ub_queue_destroy(struct spsc_ub_queue* q) +int spsc_ub_queue_destroy(struct spsc_ub_queue* q) { struct node* n = q->_first; @@ -53,6 +59,8 @@ void spsc_ub_queue_destroy(struct spsc_ub_queue* q) memory_free(q->mem, (void *) n, sizeof(struct node)); n = next; } while (n); + + return 0; } struct node* spsc_ub_alloc_node(struct spsc_ub_queue* q) @@ -67,7 +75,7 @@ struct node* spsc_ub_alloc_node(struct spsc_ub_queue* q) } //q->_tailcopy = load_consume(q->_tail); - q->_tailcopy = atomic_load_explicit(&q->_tail, memory_order_consume); + q->_tailcopy = atomic_load_explicit(&q->_tail, memory_order_acquire); if (q->_first != q->_tailcopy) { struct node* n = q->_first; @@ -78,11 +86,12 @@ struct node* spsc_ub_alloc_node(struct spsc_ub_queue* q) return (struct node*) memory_alloc(q->mem, sizeof(struct node)); } -void spsc_ub_enqueue(struct spsc_ub_queue* q, void * v) +int spsc_ub_queue_push(struct spsc_ub_queue* q, void * v) { struct node* n = spsc_ub_alloc_node(q); - n->_next = NULL; + atomic_store_explicit(&(n->_next), NULL, memory_order_release); + //n->_next = NULL; n->_value = v; //store_release(&(q->_head->_next), n); @@ -90,19 +99,19 @@ void spsc_ub_enqueue(struct spsc_ub_queue* q, void * v) q->_head = n; - return; + return 1; } -int spsc_ub_dequeue(struct spsc_ub_queue* q, void** v) +int spsc_ub_queue_pull(struct spsc_ub_queue* q, void** v) { - if (atomic_load_explicit(&(q->_tail->_next), memory_order_consume)) { + if (atomic_load_explicit(&(q->_tail->_next), memory_order_acquire)) { *v = q->_tail->_next->_value; //store_release(&q->_tail, q->_tail->_next); atomic_store_explicit(&q->_tail, q->_tail->_next, memory_order_release); - return 0; + return 1; } - return -1; + return 0; } diff --git a/spsc_ub_queue.h b/spsc_ub_queue.h index 870428e..8635d41 100644 --- a/spsc_ub_queue.h +++ b/spsc_ub_queue.h @@ -38,8 +38,9 @@ #include "memory.h" -static size_t const cacheline_size = 64; -typedef char cacheline_pad_t[cacheline_size]; +//static size_t const cacheline_size = 64; +#define CACHELINE_SIZE 64 +typedef char cacheline_pad_t[CACHELINE_SIZE]; struct node { struct node * _Atomic _next; /**> Single linked list of nodes */ @@ -62,19 +63,33 @@ struct spsc_ub_queue /* Producer part * accessed only by producer */ - struct node* _Atomic _head; /**> Head of the queue. */ - struct node* _Atomic _first; /**> Last unused node (tail of node cache). */ - struct node* _Atomic _tailcopy; /**> Helper which points somewhere between _first and _tail */ + struct node* _head; /**> Head of the queue. */ + cacheline_pad_t _pad2; + struct node* _first; /**> Last unused node (tail of node cache). */ + cacheline_pad_t _pad3; + struct node* _tailcopy; /**> Helper which points somewhere between _first and _tail */ + cacheline_pad_t _pad4; }; -void spsc_ub_queue_init(struct spsc_ub_queue *q, size_t size, const struct memtype *mem); +/** Initialize SPSC queue */ +int spsc_ub_queue_init(struct spsc_ub_queue *q, size_t size, const struct memtype *mem); -void spsc_ub_queue_destroy(struct spsc_ub_queue *q); +/** Destroy SPSC queue and release memory */ +int spsc_ub_queue_destroy(struct spsc_ub_queue *q); +/** Allocate memory for new node. Each node stores a pointer + * value pushed to unbounded SPSC queue + */ struct node * spsc_ub_alloc_node(struct spsc_ub_queue *q); -void spsc_ub_enqueue(struct spsc_ub_queue *q, void *v); +/** Push a value from unbounded SPSC queue + * return : 1 always as its an unbounded queue + */ +int spsc_ub_queue_push(struct spsc_ub_queue *q, void *v); -int spsc_ub_dequeue(struct spsc_ub_queue *q, void **v); +/** Pull a value from unbounded SPSC queue + * return : 1 if success else 0 + */ +int spsc_ub_queue_pull(struct spsc_ub_queue *q, void **v); #endif /* _SPSC_UB_QUEUE_H_ */ \ No newline at end of file diff --git a/spsc_ub_test.c b/spsc_ub_test.c deleted file mode 100644 index 3428c61..0000000 --- a/spsc_ub_test.c +++ /dev/null @@ -1,40 +0,0 @@ -#include <stdio.h> -#include <unistd.h> -#include <stdlib.h> -#include <stdint.h> -#include <stddef.h> - -#include "spsc_ub_queue.h" -#include "memory.h" - -/* Usage example */ -int main() -{ - int *v, b, test[5] = { 1, 2, 3, 4, 5 }; - struct spsc_ub_queue q; - - spsc_ub_queue_init(&q, 1, &memtype_hugepage); /** @todo change size from 1 in case of bounded queue impl */ - - spsc_ub_enqueue(&q, &test[0]); - spsc_ub_enqueue(&q, &test[1]); - - b = spsc_ub_dequeue(&q, (void**) &v); - printf("dequeue 1 %d, return %d\n", *v, b); - - b = spsc_ub_dequeue(&q, (void**) &v); - printf("dequeue 2 %d, return %d\n", *v, b); - - spsc_ub_enqueue(&q, &test[2]); - spsc_ub_enqueue(&q, &test[3]); - - b = spsc_ub_dequeue(&q, (void**) &v); - printf("dequeue 3 %d, return %d\n", *v, b); - - b = spsc_ub_dequeue(&q, (void**) &v); - printf("dequeue 4 %d, return %d\n", *v, b); - - b = spsc_ub_dequeue(&q, (void**) &v); - printf("dequeue 5 %d, return %d\n", *v, b); - - return 0; -} diff --git a/spsc_ub_test_default.c b/spsc_ub_test_default.c new file mode 100644 index 0000000..5993527 --- /dev/null +++ b/spsc_ub_test_default.c @@ -0,0 +1,154 @@ +#ifdef __linux__ + #define _GNU_SOURCE +#endif + +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> +#include <unistd.h> + +/* There are not many compilers yet which support threads.h natively. + * We will fallback to a drop-in replacement which is based on pthreads.h */ +#include "c11threads.h" + +/* For thread_get_id() */ +#ifdef __MACH__ + #include <pthread.h> +#elif defined(__linux__) + #include <sys/types.h> +#endif + +#include "spsc_ub_queue.h" +#include "memory.h" + +size_t const thread_count = 1; +size_t const batch_size = 10; +size_t const iter_count = 20000000; +//size_t const queue_size = 1 << 10; + +int volatile g_start = 0; + +/** Get thread id as integer + * In contrast to pthread_t which is an opaque type */ +uint64_t thread_get_id() +{ +#ifdef __MACH__ + uint64_t id; + pthread_threadid_np(pthread_self(), &id); + return id; +#elif defined(SYS_gettid) + return (int) syscall(SYS_gettid); +#endif + return -1; +} + +/** Get CPU timestep counter */ +__attribute__((always_inline)) static inline uint64_t rdtscp() +{ + uint64_t tsc; + + __asm__ ("rdtscp;" + "shl $32, %%rdx;" + "or %%rdx,%%rax" + : "=a" (tsc) + : + : "%rcx", "%rdx", "memory"); + + return tsc; +} + +/** Sleep, do nothing */ +__attribute__((always_inline)) static inline void nop() +{ + __asm__("rep nop;"); +} + +int thread_func_push(void *ctx) +{ + struct spsc_ub_queue *q = (struct spsc_ub_queue *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t pause = rand() % 1000; + + /* Wait for global start signal */ + while (g_start == 0) + thrd_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != pause; i += 1) + nop(); + + for (int iter = 0; iter != iter_count; ++iter) { + for (size_t i = 0; i != batch_size; i += 1) { + void *ptr = (void *) i; + spsc_ub_queue_push(q, ptr); //FIXME + } + } + thrd_yield(); + + return 0; +} + +int thread_func_pull(void *ctx) +{ + struct spsc_ub_queue *q = (struct spsc_ub_queue *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t pause = rand() % 1000; + + /* Wait for global start signal */ + while (g_start == 0) + thrd_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != pause; i += 1) + nop(); + + for (int iter = 0; iter != iter_count; ++iter) { + for (size_t i = 0; i != batch_size; i += 1) { + void *ptr; + while (!spsc_ub_queue_pull(q, &ptr)){ + continue; + } //FIXME + } + } + thrd_yield(); // queue empty + + return 0; +} + +int main() +{ + struct spsc_ub_queue queue; + thrd_t threads[thread_count + 1]; + int ret; + + spsc_ub_queue_init(&queue, batch_size * iter_count, &memtype_heap); + + printf("spsc_ub_queue_init successful\n"); + //for (int i = 0; i != thread_count; ++i) + thrd_create(&threads[0], thread_func_push, &queue); + thrd_create(&threads[1], thread_func_pull, &queue); + + sleep(1); + + uint64_t start = rdtscp(); + g_start = 1; + + //for (int i = 0; i != thread_count; ++i) + thrd_join(threads[0], NULL); + thrd_join(threads[1], NULL); + + uint64_t end = rdtscp(); + + printf("cycles/op = %lu\n", (end - start) / (batch_size * iter_count * 2 * thread_count)); + + if (queue._tail->_next != NULL) + printf("slots in use? There is something wrong with the test\n"); + + ret = spsc_ub_queue_destroy(&queue); + if (ret) + printf("Failed to destroy queue: %d", ret); + + return 0; +} \ No newline at end of file diff --git a/spsc_ub_test_fib.c b/spsc_ub_test_fib.c new file mode 100644 index 0000000..78ddabd --- /dev/null +++ b/spsc_ub_test_fib.c @@ -0,0 +1,196 @@ +#ifdef __linux__ + #define _GNU_SOURCE +#endif + +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <stdint.h> +#include <stddef.h> + +#include "spsc_ub_queue.h" +#include "memory.h" +#include "c11threads.h" + +/* Usage example */ + +#define N 20000000 + +int volatile g_start = 0; + +/** Get thread id as integer + * In contrast to pthread_t which is an opaque type */ +uint64_t thread_get_id() +{ +#ifdef __MACH__ + uint64_t id; + pthread_threadid_np(pthread_self(), &id); + return id; +#elif defined(SYS_gettid) + return (int) syscall(SYS_gettid); +#endif + return -1; +} + +/** Get CPU timestep counter */ +__attribute__((always_inline)) static inline uint64_t rdtscp() +{ + uint64_t tsc; + /** @todo not recommended to use rdtsc on multicore machine */ + __asm__ ("rdtsc;" + "shl $32, %%rdx;" + "or %%rdx,%%rax" + : "=a" (tsc) + : + : "%rcx", "%rdx", "memory"); + + return tsc; +} + +/** Sleep, do nothing */ +__attribute__((always_inline)) static inline void nop() +{ + __asm__("rep nop;"); +} + +/* Static global storage */ +int fibs[N]; + +int producer(void *ctx) +{ + printf("producer\n"); //DELETEME + struct spsc_ub_queue *q = (struct spsc_ub_queue *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t pause = rand() % 1000; + + /* Wait for global start signal */ + while (g_start == 0) + thrd_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != pause; i += 1) + nop(); + + /* Enqueue */ + for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) { + fibs[count] = n1 + n2; + + void *fibptr = (void *) &fibs[count]; + + if (!spsc_ub_queue_push(q, fibptr)) { + printf("Queue push failed\n"); + return -1; + } + + n1 = n2; n2 = fibs[count]; + } + + return 0; +} + +int consumer(void *ctx) +{ + printf("consumer\n"); //DELETEME + struct spsc_ub_queue *q = (struct spsc_ub_queue *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t pause = rand() % 1000; + + /* Wait for global start signal */ + while (g_start == 0) + thrd_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != pause; i += 1) + nop(); + + /* Dequeue */ + for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) { + int fib = n1 + n2; + int *pulled; + + while (!spsc_ub_queue_pull(q, (void **) &pulled)) { + //printf("Queue empty: %d\n", temp); + //return -1; + } + + if (*pulled != fib) { + printf("Pulled != fib\n"); + return -1; + } + + n1 = n2; n2 = fib; + } + + return 0; +} + +int test_single_threaded(struct spsc_ub_queue *q) +{ + int resp, resc; + + resp = producer(q); + if (resp) + printf("Enqueuing failed\n"); + + resc = consumer(q); + if (resc) + printf("Consumer failed\n"); + + if (resc || resp) + printf("Single Thread Test Failed\n"); + else + printf("Single Thread Test Complete\n"); + + return 0; +} + +int test_multi_threaded(struct spsc_ub_queue *q) +{ + thrd_t thrp, thrc; + int resp, resc; + + g_start = 0; + + thrd_create(&thrp, consumer, q); /** @todo Why producer thread runs earlier? */ + thrd_create(&thrc, producer, q); + + sleep(1); + + uint64_t start_tsc_time, end_tsc_time; + + start_tsc_time = rdtscp(); + g_start = 1; + + thrd_join(thrp, &resp); + thrd_join(thrc, &resc); + + end_tsc_time = rdtscp(); + + if (resc || resp) + printf("Queue Test failed\n"); + else + printf("Two-thread Test Complete\n"); + + printf("cycles/op for rdtsc %lu\n", (end_tsc_time - start_tsc_time)/N); + + if (q->_tail->_next != NULL) + printf("slots in use? There is something wrong with the test\n"); + + int ret = spsc_ub_queue_destroy(q); + if (ret) + printf("Failed to destroy queue: %d\n", ret); + + return 0; +} + +int main() +{ + struct spsc_ub_queue q; + spsc_ub_queue_init(&q, 1<<20, &memtype_heap); /** @todo change size>1 in case of bounded queue impl. memtype_hugepage impl for un_spsc */ + + test_multi_threaded(&q); + + return 0; +}