Merge branch 'master' of github.com:stv0g/c11-stdatomic-queue
This commit is contained in:
commit
b602dcd2df
17 changed files with 917 additions and 259 deletions
7
.gitignore
vendored
7
.gitignore
vendored
|
@ -3,6 +3,9 @@
|
|||
*.so
|
||||
*~
|
||||
|
||||
spsc_ub_test
|
||||
spsc_test
|
||||
spsc_ub_test_fib
|
||||
spsc_ub_test_default
|
||||
spsc_test_fib
|
||||
mpmc_test
|
||||
mpmc_test_fib
|
||||
gmon.out
|
||||
|
|
15
Makefile
15
Makefile
|
@ -1,5 +1,8 @@
|
|||
TARGETS = spsc_ub_test mpmc_test spsc_test
|
||||
TARGETS = spsc_ub_test_fib spsc_ub_test_default mpmc_test mpmc_test_fib spsc_test_fib
|
||||
CFLAGS = -Wall -std=c11
|
||||
ifeq ($(shell uname), Linux)
|
||||
LIBS = -pthread
|
||||
endif
|
||||
|
||||
DEBUG ?= 1
|
||||
|
||||
|
@ -13,13 +16,19 @@ endif
|
|||
|
||||
all: $(TARGETS)
|
||||
|
||||
spsc_ub_test: spsc_ub_test.o spsc_ub_queue.o memory.o
|
||||
spsc_ub_test_fib: spsc_ub_test_fib.o spsc_ub_queue.o memory.o
|
||||
$(CC) $^ -Wall $(LIBS) -o $@
|
||||
|
||||
spsc_ub_test_default: spsc_ub_test_default.o spsc_ub_queue.o memory.o
|
||||
$(CC) $^ -Wall $(LIBS) -o $@
|
||||
|
||||
mpmc_test: mpmc_test.o mpmc_queue.o memory.o
|
||||
$(CC) $^ -Wall $(LIBS) -o $@
|
||||
|
||||
mpmc_test_fib: mpmc_test_fib.o mpmc_queue.o memory.o
|
||||
$(CC) $^ -Wall $(LIBS) -o $@
|
||||
|
||||
spsc_test: spsc_test.o spsc_queue.o memory.o
|
||||
spsc_test_fib: spsc_test_fib.o spsc_queue.o memory.o
|
||||
$(CC) $^ -Wall $(LIBS) -o $@
|
||||
|
||||
%.o: %.c
|
||||
|
|
26
README.md
26
README.md
|
@ -29,9 +29,31 @@ This Git repository contains lightweight simple implementation of lockless SPSC
|
|||
|
||||
## Credits
|
||||
|
||||
- Umar Farooq
|
||||
- Umar Farooq <umar1.farooq1@gmail.com>
|
||||
- Steffen Vogel <post@steffenvogel.de>
|
||||
|
||||
## Initial Testing Results
|
||||
All tests are using mem_heap. All tests use *_fib test.
|
||||
|
||||
With clock_gettime function for cycle/op measurement:
|
||||
Octopus Machine:
|
||||
MPMC: 52-54 cycles/op --Something wrong, results too good to be true??
|
||||
Bounded SPSC: 230-300 cycles/op
|
||||
Unbounded SPSC: 300-400 cycles/op
|
||||
Side note: MPMC default test gets stuck with N=20000000, runs ok with N=2000000 with 160 cycles/op
|
||||
Ubuntu VM:
|
||||
MPMC: 90 cycles/op --Again result too good to be true in my opinion
|
||||
Bounded SPSC: 60-65 cycles/op
|
||||
Unbounded SPSC: 170-175 cycles/op
|
||||
|
||||
With rdtscp function for cycle/op measurement:
|
||||
Octopus Machine: Doesn't support rdtscp function, illegal instruction error. So alternate clock_gettime was used as above.
|
||||
|
||||
Ubuntu VM:
|
||||
MPMC: 160-200 cycles/op
|
||||
Bounded SPSC: 160-165 cycles/op
|
||||
Unbounded SPSC: 400-420 cycles/op
|
||||
|
||||
## License
|
||||
|
||||
#### BSD 2-Clause License
|
||||
|
@ -53,4 +75,4 @@ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|||
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
```
|
||||
```
|
||||
|
|
6
memory.c
6
memory.c
|
@ -1,9 +1,13 @@
|
|||
/**
|
||||
*
|
||||
*/
|
||||
|
||||
#ifdef __linux__
|
||||
#define _GNU_SOURCE
|
||||
#endif
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <sys/mman.h>
|
||||
#include <stdio.h> //DELETEME
|
||||
|
||||
/* Required to allocate hugepages on Apple OS X */
|
||||
#ifdef __MACH__
|
||||
|
|
5
memory.h
5
memory.h
|
@ -5,6 +5,11 @@
|
|||
#ifndef _MEMORY_H_
|
||||
#define _MEMORY_H_
|
||||
|
||||
#ifdef __linux__
|
||||
#include <stdint.h>
|
||||
#include <stddef.h>
|
||||
#endif
|
||||
|
||||
typedef void *(*memzone_allocator_t)(size_t);
|
||||
typedef int (*memzone_deallocator_t)(void *, size_t);
|
||||
|
||||
|
|
|
@ -39,8 +39,9 @@
|
|||
|
||||
#include "memory.h"
|
||||
|
||||
static size_t const cacheline_size = 64;
|
||||
typedef char cacheline_pad_t[cacheline_size];
|
||||
#define CACHELINE_SIZE 64
|
||||
|
||||
typedef char cacheline_pad_t[CACHELINE_SIZE];
|
||||
|
||||
struct mpmc_queue {
|
||||
cacheline_pad_t _pad0; /**< Shared area: all threads read */
|
||||
|
|
31
mpmc_test.c
31
mpmc_test.c
|
@ -1,7 +1,12 @@
|
|||
#ifdef __linux__
|
||||
#define _GNU_SOURCE
|
||||
#endif
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include <unistd.h>
|
||||
#include <time.h>
|
||||
|
||||
/* There are not many compilers yet which support threads.h natively.
|
||||
* We will fallback to a drop-in replacement which is based on pthreads.h */
|
||||
|
@ -19,7 +24,7 @@
|
|||
|
||||
size_t const thread_count = 4;
|
||||
size_t const batch_size = 10;
|
||||
size_t const iter_count = 20000000;
|
||||
size_t const iter_count = 2000000;
|
||||
size_t const queue_size = 1 << 20;
|
||||
|
||||
int volatile g_start = 0;
|
||||
|
@ -32,9 +37,10 @@ uint64_t thread_get_id()
|
|||
uint64_t id;
|
||||
pthread_threadid_np(pthread_self(), &id);
|
||||
return id;
|
||||
#elif defined(__linux__)
|
||||
return (int) gettid();
|
||||
#elif defined(SYS_gettid)
|
||||
return (int) syscall(SYS_gettid);
|
||||
#endif
|
||||
return -1;
|
||||
}
|
||||
|
||||
/** Get CPU timestep counter */
|
||||
|
@ -49,6 +55,7 @@ __attribute__((always_inline)) static inline uint64_t rdtscp()
|
|||
:
|
||||
: "%rcx", "%rdx", "memory");
|
||||
|
||||
printf("tsc %lu\n", tsc);
|
||||
return tsc;
|
||||
}
|
||||
|
||||
|
@ -96,22 +103,30 @@ int main()
|
|||
thrd_t threads[thread_count];
|
||||
int ret;
|
||||
|
||||
mpmc_queue_init(&queue, queue_size, &memtype_hugepage);
|
||||
mpmc_queue_init(&queue, queue_size, &memtype_heap);
|
||||
|
||||
for (int i = 0; i != thread_count; ++i)
|
||||
thrd_create(&threads[i], thread_func, &queue);
|
||||
|
||||
sleep(1);
|
||||
|
||||
long long starttime, endtime;
|
||||
struct timespec start, end;
|
||||
|
||||
if(clock_gettime(CLOCK_REALTIME, &start))
|
||||
return -1;
|
||||
|
||||
uint64_t start = rdtscp();
|
||||
g_start = 1;
|
||||
|
||||
for (int i = 0; i != thread_count; ++i)
|
||||
thrd_join(threads[i], NULL);
|
||||
|
||||
uint64_t end = rdtscp();
|
||||
if(clock_gettime(CLOCK_REALTIME, &end))
|
||||
return -1;
|
||||
|
||||
printf("cycles/op = %llu\n", (end - start) / (batch_size * iter_count * 2 * thread_count));
|
||||
starttime = start.tv_sec*1000000000LL + start.tv_nsec;
|
||||
endtime = end.tv_sec*1000000000LL + end.tv_nsec;
|
||||
printf("cycles/op = %lld\n", (endtime - starttime) / (batch_size * iter_count * 2 * thread_count));
|
||||
|
||||
size_t used = mpmc_queue_available(&queue);
|
||||
if (used > 0)
|
||||
|
@ -122,4 +137,4 @@ int main()
|
|||
printf("Failed to destroy queue: %d", ret);
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
198
mpmc_test_fib.c
Normal file
198
mpmc_test_fib.c
Normal file
|
@ -0,0 +1,198 @@
|
|||
#ifdef __linux__
|
||||
#define _GNU_SOURCE
|
||||
#endif
|
||||
|
||||
#include <stdio.h>
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include <stddef.h>
|
||||
|
||||
#include "mpmc_queue.h"
|
||||
#include "memory.h"
|
||||
#include "c11threads.h"
|
||||
|
||||
/* Usage example */
|
||||
|
||||
#define N 20000000
|
||||
|
||||
int volatile g_start = 0;
|
||||
|
||||
/** Get thread id as integer
|
||||
* In contrast to pthread_t which is an opaque type */
|
||||
uint64_t thread_get_id()
|
||||
{
|
||||
#ifdef __MACH__
|
||||
uint64_t id;
|
||||
pthread_threadid_np(pthread_self(), &id);
|
||||
return id;
|
||||
#elif defined(SYS_gettid)
|
||||
return (int) syscall(SYS_gettid);
|
||||
#endif
|
||||
return -1;
|
||||
}
|
||||
|
||||
/** Get CPU timestep counter */
|
||||
__attribute__((always_inline)) static inline uint64_t rdtscp()
|
||||
{
|
||||
uint64_t tsc;
|
||||
/** @todo not recommended to use rdtsc on multicore machine */
|
||||
__asm__ ("rdtsc;"
|
||||
"shl $32, %%rdx;"
|
||||
"or %%rdx,%%rax"
|
||||
: "=a" (tsc)
|
||||
:
|
||||
: "%rcx", "%rdx", "memory");
|
||||
|
||||
return tsc;
|
||||
}
|
||||
|
||||
/** Sleep, do nothing */
|
||||
__attribute__((always_inline)) static inline void nop()
|
||||
{
|
||||
__asm__("rep nop;");
|
||||
}
|
||||
|
||||
/* Static global storage */
|
||||
int fibs[N];
|
||||
|
||||
int producer(void *ctx)
|
||||
{
|
||||
//printf("producer\n"); //DELETEME
|
||||
struct mpmc_queue *q = (struct mpmc_queue *) ctx;
|
||||
|
||||
srand((unsigned) time(0) + thread_get_id());
|
||||
size_t pause = rand() % 1000;
|
||||
|
||||
/* Wait for global start signal */
|
||||
while (g_start == 0)
|
||||
thrd_yield();
|
||||
|
||||
/* Wait for a random time */
|
||||
for (size_t i = 0; i != pause; i += 1)
|
||||
nop();
|
||||
|
||||
/* Enqueue */
|
||||
for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) {
|
||||
fibs[count] = n1 + n2;
|
||||
|
||||
void *fibptr = (void *) &fibs[count];
|
||||
|
||||
if (!mpmc_queue_push(q, fibptr)) {
|
||||
printf("Queue push failed at count %lu\n", count);
|
||||
return -1;
|
||||
}
|
||||
|
||||
n1 = n2; n2 = fibs[count];
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int consumer(void *ctx)
|
||||
{
|
||||
//printf("consumer\n"); //DELETEME
|
||||
struct mpmc_queue *q = (struct mpmc_queue *) ctx;
|
||||
|
||||
srand((unsigned) time(0) + thread_get_id());
|
||||
size_t pause = rand() % 1000;
|
||||
|
||||
/* Wait for global start signal */
|
||||
while (g_start == 0)
|
||||
thrd_yield();
|
||||
|
||||
/* Wait for a random time */
|
||||
for (size_t i = 0; i != pause; i += 1)
|
||||
nop();
|
||||
|
||||
/* Dequeue */
|
||||
for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) {
|
||||
int fib = n1 + n2;
|
||||
int *pulled;
|
||||
|
||||
while (!mpmc_queue_pull(q, (void **) &pulled)) {
|
||||
//printf("Queue empty: %d\n", temp);
|
||||
//return -1;
|
||||
}
|
||||
|
||||
if (*pulled != fib) {
|
||||
printf("Pulled != fib\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
n1 = n2; n2 = fib;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_single_threaded(struct mpmc_queue *q)
|
||||
{
|
||||
int resp, resc;
|
||||
g_start = 1;
|
||||
|
||||
resp = producer(q);
|
||||
if (resp)
|
||||
printf("Enqueuing failed\n");
|
||||
|
||||
resc = consumer(q);
|
||||
if (resc)
|
||||
printf("Consumer failed\n");
|
||||
|
||||
if (resc || resp)
|
||||
printf("Single Thread Test Failed\n");
|
||||
else
|
||||
printf("Single Thread Test Complete\n");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_multi_threaded(struct mpmc_queue *q)
|
||||
{
|
||||
thrd_t thrp, thrc;
|
||||
int resp, resc;
|
||||
|
||||
g_start = 0;
|
||||
|
||||
thrd_create(&thrp, consumer, q); /** @todo Why producer thread runs earlier? */
|
||||
thrd_create(&thrc, producer, q);
|
||||
|
||||
sleep(1);
|
||||
|
||||
uint64_t start_tsc_time, end_tsc_time;
|
||||
|
||||
start_tsc_time = rdtscp();
|
||||
g_start = 1;
|
||||
|
||||
thrd_join(thrp, &resp);
|
||||
thrd_join(thrc, &resc);
|
||||
|
||||
end_tsc_time = rdtscp();
|
||||
|
||||
if (resc || resp)
|
||||
printf("Queue Test failed\n");
|
||||
else
|
||||
printf("Two-thread Test Complete\n");
|
||||
|
||||
printf("cycles/op for rdtsc %lu\n", (end_tsc_time - start_tsc_time)/N);
|
||||
|
||||
size_t used = mpmc_queue_available(q);
|
||||
if (used > 0)
|
||||
printf("%zu slots in use? There is something wrong with the test\n", used);
|
||||
|
||||
int ret = mpmc_queue_destroy(q);
|
||||
if (ret)
|
||||
printf("Failed to destroy queue: %d\n", ret);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
struct mpmc_queue q;
|
||||
mpmc_queue_init(&q, 1<<20, &memtype_heap); /** @todo change size>1 in case of bounded queue impl. memtype_hugepage impl for un_spsc */
|
||||
|
||||
test_multi_threaded(&q);
|
||||
|
||||
return 0;
|
||||
}
|
56
spsc_queue.c
56
spsc_queue.c
|
@ -1,7 +1,7 @@
|
|||
/** Lock-free Single-Producer Single-consumer (SPSC) queue.
|
||||
*
|
||||
* @author Umar Farooq
|
||||
* @copyright 2016 Umar Farooq
|
||||
* @copyright 2016 Umar Farooq <umar1.farooq1@gmail.com>
|
||||
* @license BSD 2-Clause License
|
||||
*
|
||||
* All rights reserved.
|
||||
|
@ -30,63 +30,63 @@
|
|||
|
||||
#include "spsc_queue.h"
|
||||
|
||||
void * spsc_queue_init(size_t size, const struct memtype *mem)
|
||||
struct spsc_queue * spsc_queue_init(struct spsc_queue * q, size_t size, const struct memtype *mem)
|
||||
{
|
||||
struct spsc_queue *q;
|
||||
|
||||
if (size < sizeof(struct spsc_queue) + 2 * sizeof(q->pointers[0]))
|
||||
return NULL;
|
||||
|
||||
/* Queue size must be 2 exponent */
|
||||
if ((size < 2) || ((size & (size - 1)) != 0))
|
||||
return NULL;
|
||||
|
||||
q = memory_alloc(mem, sizeof(struct spsc_queue) + sizeof(q->pointers[0]) * size);
|
||||
q = memory_alloc(mem, sizeof(struct spsc_queue) + (sizeof(q->pointers[0]) * size));
|
||||
if (!q)
|
||||
return NULL;
|
||||
|
||||
q->mem = mem;
|
||||
|
||||
q->capacity = size - 1;
|
||||
|
||||
atomic_init(&q->tail, 0);
|
||||
atomic_init(&q->head, 0);
|
||||
|
||||
atomic_init(&q->_tail, 0);
|
||||
atomic_init(&q->_head, 0);
|
||||
|
||||
return q;
|
||||
}
|
||||
|
||||
void spsc_queue_destroy(struct spsc_queue *q)
|
||||
int spsc_queue_destroy(struct spsc_queue *q)
|
||||
{
|
||||
/* Nothing to do here */
|
||||
return;
|
||||
const struct memtype mem = *(q->mem); /** @todo Memory is not being freed properly??? */
|
||||
return memory_free(&mem, q, sizeof(struct spsc_queue) + ((q->capacity + 1) * sizeof(q->pointers[0])));
|
||||
}
|
||||
|
||||
int spsc_queue_get_many(struct spsc_queue *q, void *ptrs[], size_t cnt)
|
||||
int spsc_queue_get_many(struct spsc_queue *q, void **ptrs[], size_t cnt)
|
||||
{
|
||||
if (q->capacity <= spsc_queue_available(q))
|
||||
cnt = 0;
|
||||
else if (cnt > q->capacity - spsc_queue_available(q))
|
||||
cnt = q->capacity - spsc_queue_available(q);
|
||||
|
||||
/**@todo Is atomic_load_explicit needed here for loading q->_head? */
|
||||
for (int i = 0; i < cnt; i++)
|
||||
ptrs[i] = q->pointers[q->head % (q->capacity + 1)];
|
||||
ptrs[i] = &(q->pointers[q->_head % (q->capacity + 1)]);
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
||||
int spsc_queue_push_many(struct spsc_queue *q, void **ptrs, size_t cnt)
|
||||
int spsc_queue_push_many(struct spsc_queue *q, void *ptrs[], size_t cnt)
|
||||
{
|
||||
//int free_slots = q->tail < q->head ? q->head - q->tail - 1 : q->head + (q->capacity - q->tail);
|
||||
//int free_slots = q->_tail < q->_head ? q->_head - q->_tail - 1 : q->_head + (q->capacity - q->_tail);
|
||||
size_t free_slots = spsc_queue_available(q);
|
||||
|
||||
if (cnt > free_slots)
|
||||
cnt = free_slots;
|
||||
|
||||
for (size_t i = 0; i < cnt; i++) {
|
||||
q->pointers[q->tail] = ptrs[i]; //--? alternate use (q->tail + i)%(q->capacity + 1) as index and update q->tail at the end of loop
|
||||
q->tail = (q->tail + 1)%(q->capacity + 1);
|
||||
q->pointers[q->_tail] = ptrs[i];
|
||||
atomic_store_explicit(&q->_tail, (q->_tail + 1)%(q->capacity + 1), memory_order_release);
|
||||
}
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
||||
int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs, size_t cnt)
|
||||
int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs[], size_t cnt)
|
||||
{
|
||||
if (q->capacity <= spsc_queue_available(q))
|
||||
cnt = 0;
|
||||
|
@ -94,17 +94,17 @@ int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs, size_t cnt)
|
|||
cnt = q->capacity - spsc_queue_available(q);
|
||||
|
||||
for (size_t i = 0; i < cnt; i++) {
|
||||
ptrs[i] = q->pointers[q->head];
|
||||
q->head = (q->head + 1)%(q->capacity + 1);
|
||||
*ptrs[i] = q->pointers[q->_head];
|
||||
atomic_store_explicit(&q->_head, (q->_head + 1)%(q->capacity + 1), memory_order_release);
|
||||
}
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
||||
int spsc_queue_available(struct spsc_queue *q) //--? make this func inline
|
||||
int spsc_queue_available(struct spsc_queue *q)
|
||||
{
|
||||
if (q->tail < q->head)
|
||||
return q->head - q->tail - 1;
|
||||
if (atomic_load_explicit(&q->_tail, memory_order_acquire) < atomic_load_explicit(&q->_head, memory_order_acquire))
|
||||
return q->_head - q->_tail - 1;
|
||||
else
|
||||
return q->head + (q->capacity - q->tail);
|
||||
}
|
||||
return q->_head + (q->capacity - q->_tail);
|
||||
}
|
||||
|
|
40
spsc_queue.h
40
spsc_queue.h
|
@ -49,19 +49,25 @@ struct spsc_queue {
|
|||
size_t capacity; /**< Total number of available pointers in queue::array */
|
||||
|
||||
/* Consumer part */
|
||||
atomic_int tail; /**< Tail pointer of queue*/
|
||||
_Atomic int _tail; /**< Tail pointer of queue*/
|
||||
cacheline_pad_t _pad1;
|
||||
/* Producer part */
|
||||
atomic_int head; /**< Head pointer of queue*/
|
||||
_Atomic int _head; /**< Head pointer of queue*/
|
||||
|
||||
void *pointers[]; /**< Circular buffer. */
|
||||
};
|
||||
|
||||
/** Initiliaze a new queue and allocate memory. */
|
||||
void * spsc_queue_init(size_t size, const struct memtype *mem);
|
||||
struct spsc_queue * spsc_queue_init(struct spsc_queue *q, size_t size, const struct memtype *mem);
|
||||
|
||||
/** Release memory of queue. */
|
||||
void spsc_queue_destroy(struct spsc_queue *q);
|
||||
int spsc_queue_destroy(struct spsc_queue *q);
|
||||
|
||||
/** Return the number of free slots in a queue
|
||||
*
|
||||
* Note: This is only an estimate!
|
||||
*/
|
||||
int spsc_queue_available(struct spsc_queue *q);
|
||||
|
||||
/** Enqueue up to \p cnt elements from \p ptrs[] at the queue tail pointed by \p tail.
|
||||
*
|
||||
|
@ -85,33 +91,27 @@ int spsc_queue_push_many(struct spsc_queue *q, void *ptrs[], size_t cnt);
|
|||
* @param[in,out] head A pointer to a queue head. The value will be updated to reflect the new head.
|
||||
* @return The number of elements which have been dequeued.
|
||||
*/
|
||||
int spsc_queue_pull_many(struct spsc_queue *q, void *ptrs[], size_t cnt);
|
||||
int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs[], size_t cnt);
|
||||
|
||||
/** Fill \p ptrs with \p cnt elements of the queue starting at entry \p pos. */
|
||||
int spsc_queue_get_many(struct spsc_queue *q, void *ptrs[], size_t cnt);
|
||||
|
||||
/** Get the first element in the queue */
|
||||
static inline int spsc_queue_get(struct spsc_queue *q, void **ptr)
|
||||
{
|
||||
return spsc_queue_get_many(q, ptr, 1);
|
||||
}
|
||||
int spsc_queue_get_many(struct spsc_queue *q, void **ptrs[], size_t cnt);
|
||||
|
||||
/** Enqueue a new block at the tail of the queue. */
|
||||
static inline int spsc_queue_push(struct spsc_queue *q, void **ptr)
|
||||
static inline int spsc_queue_push(struct spsc_queue *q, void *ptr)
|
||||
{
|
||||
return spsc_queue_push_many(q, ptr, 1);
|
||||
return spsc_queue_push_many(q, &ptr, 1);
|
||||
}
|
||||
|
||||
/** Dequeue the first block at the head of the queue. */
|
||||
static inline int spsc_queue_pull(struct spsc_queue *q, void **ptr)
|
||||
{
|
||||
return spsc_queue_pull_many(q, ptr, 1);
|
||||
return spsc_queue_pull_many(q, &ptr, 1);
|
||||
}
|
||||
|
||||
/** Return the number of free slots in a queue
|
||||
*
|
||||
* Note: This is only an estimate!
|
||||
*/
|
||||
int spsc_queue_available(struct spsc_queue *q);
|
||||
/** Get the first element in the queue */
|
||||
static inline int spsc_queue_get(struct spsc_queue *q, void **ptr)
|
||||
{
|
||||
return spsc_queue_get_many(q, &ptr, 1);
|
||||
}
|
||||
|
||||
#endif /* _SPSC_QUEUE_H_ */
|
131
spsc_test.c
131
spsc_test.c
|
@ -1,131 +0,0 @@
|
|||
#include <sys/mman.h>
|
||||
#include <stdio.h>
|
||||
|
||||
/* Required to allocate hugepages on Apple OS X */
|
||||
#ifdef __MACH__
|
||||
#include <mach/vm_statistics.h>
|
||||
#endif
|
||||
|
||||
/* There are not many compilers yet which support threads.h natively.
|
||||
* We will fallback to a drop-in replacement which is based on pthreads.h */
|
||||
#include "c11threads.h"
|
||||
|
||||
#include "spsc_queue.h"
|
||||
|
||||
#define N 2000000
|
||||
|
||||
/* Static global storage */
|
||||
int fibs[N];
|
||||
|
||||
int producer(void *ctx)
|
||||
{
|
||||
struct spsc_queue *q = (struct spsc_queue *) ctx;
|
||||
|
||||
/* Enqueue */
|
||||
for (int count = 0, n1 = 0, n2 = 1, r; count < N; count++) {
|
||||
fibs[count] = n1 + n2;
|
||||
|
||||
void *fibptr = (void *) &fibs[count];
|
||||
|
||||
r = spsc_queue_push(q, &fibptr);
|
||||
if (r != 1) {
|
||||
printf("Queue push failed\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
n1 = n2; n2 = fibs[count];
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int consumer(void *ctx)
|
||||
{
|
||||
struct spsc_queue *q = (struct spsc_queue *) ctx;
|
||||
|
||||
/* Dequeue */
|
||||
for (int count = 0, n1 = 0, n2 = 1, r; count < N; count++) {
|
||||
int fib = n1 + n2;
|
||||
int *pulled;
|
||||
|
||||
r = spsc_queue_pull(q, (void **) &pulled);
|
||||
if (r != 1) {
|
||||
printf("Queue pull failed: %d\n", r);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (*pulled != fib) {
|
||||
printf("Pulled != fib\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
n1 = n2; n2 = fib;
|
||||
}
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_single_threaded(struct spsc_queue *q)
|
||||
{
|
||||
int resp, resc;
|
||||
|
||||
resp = producer(q);
|
||||
if (resp)
|
||||
printf("Enqueuing failed");
|
||||
|
||||
resc = consumer(q);
|
||||
if (resc)
|
||||
printf("Consumer failed");
|
||||
|
||||
if (resc || resp)
|
||||
printf("Single Thread Test Failed");
|
||||
else
|
||||
printf("Single Thread Test Complete\n");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_multi_threaded(struct spsc_queue *q)
|
||||
{
|
||||
thrd_t thrp, thrc;
|
||||
int resp, resc;
|
||||
|
||||
thrd_create(&thrp, producer, q);
|
||||
thrd_create(&thrc, consumer, q);
|
||||
|
||||
thrd_join(thrp, &resp);
|
||||
thrd_join(thrc, &resc);
|
||||
|
||||
if (resc || resp)
|
||||
printf("Queue Test failed");
|
||||
else
|
||||
printf("Two-thread Test Complete\n");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void queue_info(struct spsc_queue *q)
|
||||
{
|
||||
printf("Queue tail %d, queue head %d, queue capacity %lu, Queue free slots %d\n", q->tail, q->head, q->capacity, spsc_queue_available(q));
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
struct spsc_queue *q;
|
||||
size_t queue_size = 1 << 20; // 16 MiB;
|
||||
|
||||
q = spsc_queue_init(q, queue_size, &memtype_hugepage);
|
||||
|
||||
if (q) { //--? pass &q ....**ptr
|
||||
printf("Error in initialization\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
queue_info(q);
|
||||
|
||||
test_single_threaded(q);
|
||||
test_multi_threaded(q);
|
||||
|
||||
return 0;
|
||||
}
|
198
spsc_test_fib.c
Normal file
198
spsc_test_fib.c
Normal file
|
@ -0,0 +1,198 @@
|
|||
#ifdef __linux__
|
||||
#define _GNU_SOURCE
|
||||
#endif
|
||||
|
||||
#include <stdio.h>
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include <stddef.h>
|
||||
|
||||
#include "spsc_queue.h"
|
||||
#include "memory.h"
|
||||
#include "c11threads.h"
|
||||
|
||||
/* Usage example */
|
||||
|
||||
#define N 20000000
|
||||
|
||||
int volatile g_start = 0;
|
||||
|
||||
/** Get thread id as integer
|
||||
* In contrast to pthread_t which is an opaque type */
|
||||
uint64_t thread_get_id()
|
||||
{
|
||||
#ifdef __MACH__
|
||||
uint64_t id;
|
||||
pthread_threadid_np(pthread_self(), &id);
|
||||
return id;
|
||||
#elif defined(SYS_gettid)
|
||||
return (int) syscall(SYS_gettid);
|
||||
#endif
|
||||
return -1;
|
||||
}
|
||||
|
||||
/** Get CPU timestep counter */
|
||||
__attribute__((always_inline)) static inline uint64_t rdtscp()
|
||||
{
|
||||
uint64_t tsc;
|
||||
/** @todo not recommended to use rdtsc on multicore machine */
|
||||
__asm__ ("rdtsc;"
|
||||
"shl $32, %%rdx;"
|
||||
"or %%rdx,%%rax"
|
||||
: "=a" (tsc)
|
||||
:
|
||||
: "%rcx", "%rdx", "memory");
|
||||
|
||||
return tsc;
|
||||
}
|
||||
|
||||
/** Sleep, do nothing */
|
||||
__attribute__((always_inline)) static inline void nop()
|
||||
{
|
||||
__asm__("rep nop;");
|
||||
}
|
||||
|
||||
/* Static global storage */
|
||||
int fibs[N];
|
||||
|
||||
int producer(void *ctx)
|
||||
{
|
||||
printf("producer\n"); //DELETEME
|
||||
struct spsc_queue *q = (struct spsc_queue *) ctx;
|
||||
|
||||
srand((unsigned) time(0) + thread_get_id());
|
||||
size_t pause = rand() % 1000;
|
||||
|
||||
/* Wait for global start signal */
|
||||
while (g_start == 0)
|
||||
thrd_yield();
|
||||
|
||||
/* Wait for a random time */
|
||||
for (size_t i = 0; i != pause; i += 1)
|
||||
nop();
|
||||
|
||||
/* Enqueue */
|
||||
for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) {
|
||||
fibs[count] = n1 + n2;
|
||||
|
||||
void *fibptr = (void *) &fibs[count];
|
||||
|
||||
if (!spsc_queue_push(q, fibptr)) {
|
||||
printf("Queue push failed at count %lu, %d, free slots %d\n", count, 1<<20, spsc_queue_available(q));
|
||||
return -1;
|
||||
}
|
||||
|
||||
n1 = n2; n2 = fibs[count];
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int consumer(void *ctx)
|
||||
{
|
||||
printf("consumer\n"); //DELETEME
|
||||
struct spsc_queue *q = (struct spsc_queue *) ctx;
|
||||
|
||||
srand((unsigned) time(0) + thread_get_id());
|
||||
size_t pause = rand() % 1000;
|
||||
|
||||
/* Wait for global start signal */
|
||||
while (g_start == 0)
|
||||
thrd_yield();
|
||||
|
||||
/* Wait for a random time */
|
||||
for (size_t i = 0; i != pause; i += 1)
|
||||
nop();
|
||||
|
||||
/* Dequeue */
|
||||
for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) {
|
||||
int fib = n1 + n2;
|
||||
int *pulled;
|
||||
|
||||
while (!spsc_queue_pull(q, (void **) &pulled)) {
|
||||
//printf("Queue empty: %d\n", temp);
|
||||
//return -1;
|
||||
}
|
||||
|
||||
if (*pulled != fib) {
|
||||
printf("Pulled != fib\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
n1 = n2; n2 = fib;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_single_threaded(struct spsc_queue *q)
|
||||
{
|
||||
int resp, resc;
|
||||
g_start = 1;
|
||||
resp = producer(q);
|
||||
if (resp)
|
||||
printf("Enqueuing failed\n");
|
||||
|
||||
resc = consumer(q);
|
||||
if (resc)
|
||||
printf("Consumer failed\n");
|
||||
|
||||
if (resc || resp)
|
||||
printf("Single Thread Test Failed\n");
|
||||
else
|
||||
printf("Single Thread Test Complete\n");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_multi_threaded(struct spsc_queue *q)
|
||||
{
|
||||
thrd_t thrp, thrc;
|
||||
int resp, resc;
|
||||
|
||||
g_start = 0;
|
||||
|
||||
thrd_create(&thrp, consumer, q); /** @todo Why producer thread runs earlier? */
|
||||
thrd_create(&thrc, producer, q);
|
||||
|
||||
sleep(1);
|
||||
|
||||
uint64_t start_tsc_time, end_tsc_time;
|
||||
|
||||
start_tsc_time = rdtscp();
|
||||
g_start = 1;
|
||||
|
||||
thrd_join(thrp, &resp);
|
||||
thrd_join(thrc, &resc);
|
||||
|
||||
end_tsc_time = rdtscp();
|
||||
|
||||
if (resc || resp)
|
||||
printf("Queue Test failed\n");
|
||||
else
|
||||
printf("Two-thread Test Complete\n");
|
||||
|
||||
printf("cycles/op for rdtsc %lu\n", (end_tsc_time - start_tsc_time)/N);
|
||||
|
||||
size_t used = spsc_queue_available(q);
|
||||
if (spsc_queue_available(q) != q->capacity)
|
||||
printf("%zu slots in use? There is something wrong with the test\n", used);
|
||||
|
||||
int ret = spsc_queue_destroy(q);
|
||||
if (ret)
|
||||
printf("Failed to destroy queue: %d\n", ret);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
struct spsc_queue* q = NULL;
|
||||
q = spsc_queue_init(q, 1<<24, &memtype_heap);
|
||||
|
||||
test_multi_threaded(q);
|
||||
//test_single_threaded(q); /** Single threaded test fails with N > queue size*/
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -33,18 +33,24 @@
|
|||
|
||||
#include "spsc_ub_queue.h"
|
||||
|
||||
void spsc_ub_queue_init(struct spsc_ub_queue* q, size_t size, const struct memtype *mem)
|
||||
int spsc_ub_queue_init(struct spsc_ub_queue* q, size_t size, const struct memtype *mem)
|
||||
{
|
||||
struct node* n = memory_alloc(q->mem, sizeof(struct node) * size);
|
||||
|
||||
q->mem = mem;
|
||||
struct node* n = memory_alloc(q->mem, sizeof(struct node) * size);
|
||||
n->_next = NULL;
|
||||
q->_tail = q->_head = q->_first= q->_tailcopy = n;
|
||||
|
||||
return;
|
||||
/** Alloc memory at start for total size for efficiency */
|
||||
void *v = NULL;
|
||||
for(unsigned long i = 0; i < size; i++) /** @todo fix this hack in bounded implementation */
|
||||
spsc_ub_queue_push(q, v);
|
||||
for(unsigned long i = 0; i < size; i++)
|
||||
spsc_ub_queue_pull(q, &v);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void spsc_ub_queue_destroy(struct spsc_ub_queue* q)
|
||||
int spsc_ub_queue_destroy(struct spsc_ub_queue* q)
|
||||
{
|
||||
struct node* n = q->_first;
|
||||
|
||||
|
@ -53,6 +59,8 @@ void spsc_ub_queue_destroy(struct spsc_ub_queue* q)
|
|||
memory_free(q->mem, (void *) n, sizeof(struct node));
|
||||
n = next;
|
||||
} while (n);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct node* spsc_ub_alloc_node(struct spsc_ub_queue* q)
|
||||
|
@ -67,7 +75,7 @@ struct node* spsc_ub_alloc_node(struct spsc_ub_queue* q)
|
|||
}
|
||||
|
||||
//q->_tailcopy = load_consume(q->_tail);
|
||||
q->_tailcopy = atomic_load_explicit(&q->_tail, memory_order_consume);
|
||||
q->_tailcopy = atomic_load_explicit(&q->_tail, memory_order_acquire);
|
||||
|
||||
if (q->_first != q->_tailcopy) {
|
||||
struct node* n = q->_first;
|
||||
|
@ -78,11 +86,12 @@ struct node* spsc_ub_alloc_node(struct spsc_ub_queue* q)
|
|||
return (struct node*) memory_alloc(q->mem, sizeof(struct node));
|
||||
}
|
||||
|
||||
void spsc_ub_enqueue(struct spsc_ub_queue* q, void * v)
|
||||
int spsc_ub_queue_push(struct spsc_ub_queue* q, void * v)
|
||||
{
|
||||
struct node* n = spsc_ub_alloc_node(q);
|
||||
|
||||
n->_next = NULL;
|
||||
atomic_store_explicit(&(n->_next), NULL, memory_order_release);
|
||||
//n->_next = NULL;
|
||||
n->_value = v;
|
||||
|
||||
//store_release(&(q->_head->_next), n);
|
||||
|
@ -90,19 +99,19 @@ void spsc_ub_enqueue(struct spsc_ub_queue* q, void * v)
|
|||
|
||||
q->_head = n;
|
||||
|
||||
return;
|
||||
return 1;
|
||||
}
|
||||
|
||||
int spsc_ub_dequeue(struct spsc_ub_queue* q, void** v)
|
||||
int spsc_ub_queue_pull(struct spsc_ub_queue* q, void** v)
|
||||
{
|
||||
if (atomic_load_explicit(&(q->_tail->_next), memory_order_consume)) {
|
||||
if (atomic_load_explicit(&(q->_tail->_next), memory_order_acquire)) {
|
||||
*v = q->_tail->_next->_value;
|
||||
|
||||
//store_release(&q->_tail, q->_tail->_next);
|
||||
atomic_store_explicit(&q->_tail, q->_tail->_next, memory_order_release);
|
||||
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -38,8 +38,9 @@
|
|||
|
||||
#include "memory.h"
|
||||
|
||||
static size_t const cacheline_size = 64;
|
||||
typedef char cacheline_pad_t[cacheline_size];
|
||||
//static size_t const cacheline_size = 64;
|
||||
#define CACHELINE_SIZE 64
|
||||
typedef char cacheline_pad_t[CACHELINE_SIZE];
|
||||
|
||||
struct node {
|
||||
struct node * _Atomic _next; /**> Single linked list of nodes */
|
||||
|
@ -62,19 +63,33 @@ struct spsc_ub_queue
|
|||
|
||||
/* Producer part
|
||||
* accessed only by producer */
|
||||
struct node* _Atomic _head; /**> Head of the queue. */
|
||||
struct node* _Atomic _first; /**> Last unused node (tail of node cache). */
|
||||
struct node* _Atomic _tailcopy; /**> Helper which points somewhere between _first and _tail */
|
||||
struct node* _head; /**> Head of the queue. */
|
||||
cacheline_pad_t _pad2;
|
||||
struct node* _first; /**> Last unused node (tail of node cache). */
|
||||
cacheline_pad_t _pad3;
|
||||
struct node* _tailcopy; /**> Helper which points somewhere between _first and _tail */
|
||||
cacheline_pad_t _pad4;
|
||||
};
|
||||
|
||||
void spsc_ub_queue_init(struct spsc_ub_queue *q, size_t size, const struct memtype *mem);
|
||||
/** Initialize SPSC queue */
|
||||
int spsc_ub_queue_init(struct spsc_ub_queue *q, size_t size, const struct memtype *mem);
|
||||
|
||||
void spsc_ub_queue_destroy(struct spsc_ub_queue *q);
|
||||
/** Destroy SPSC queue and release memory */
|
||||
int spsc_ub_queue_destroy(struct spsc_ub_queue *q);
|
||||
|
||||
/** Allocate memory for new node. Each node stores a pointer
|
||||
* value pushed to unbounded SPSC queue
|
||||
*/
|
||||
struct node * spsc_ub_alloc_node(struct spsc_ub_queue *q);
|
||||
|
||||
void spsc_ub_enqueue(struct spsc_ub_queue *q, void *v);
|
||||
/** Push a value from unbounded SPSC queue
|
||||
* return : 1 always as its an unbounded queue
|
||||
*/
|
||||
int spsc_ub_queue_push(struct spsc_ub_queue *q, void *v);
|
||||
|
||||
int spsc_ub_dequeue(struct spsc_ub_queue *q, void **v);
|
||||
/** Pull a value from unbounded SPSC queue
|
||||
* return : 1 if success else 0
|
||||
*/
|
||||
int spsc_ub_queue_pull(struct spsc_ub_queue *q, void **v);
|
||||
|
||||
#endif /* _SPSC_UB_QUEUE_H_ */
|
|
@ -1,40 +0,0 @@
|
|||
#include <stdio.h>
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include <stddef.h>
|
||||
|
||||
#include "spsc_ub_queue.h"
|
||||
#include "memory.h"
|
||||
|
||||
/* Usage example */
|
||||
int main()
|
||||
{
|
||||
int *v, b, test[5] = { 1, 2, 3, 4, 5 };
|
||||
struct spsc_ub_queue q;
|
||||
|
||||
spsc_ub_queue_init(&q, 1, &memtype_hugepage); /** @todo change size from 1 in case of bounded queue impl */
|
||||
|
||||
spsc_ub_enqueue(&q, &test[0]);
|
||||
spsc_ub_enqueue(&q, &test[1]);
|
||||
|
||||
b = spsc_ub_dequeue(&q, (void**) &v);
|
||||
printf("dequeue 1 %d, return %d\n", *v, b);
|
||||
|
||||
b = spsc_ub_dequeue(&q, (void**) &v);
|
||||
printf("dequeue 2 %d, return %d\n", *v, b);
|
||||
|
||||
spsc_ub_enqueue(&q, &test[2]);
|
||||
spsc_ub_enqueue(&q, &test[3]);
|
||||
|
||||
b = spsc_ub_dequeue(&q, (void**) &v);
|
||||
printf("dequeue 3 %d, return %d\n", *v, b);
|
||||
|
||||
b = spsc_ub_dequeue(&q, (void**) &v);
|
||||
printf("dequeue 4 %d, return %d\n", *v, b);
|
||||
|
||||
b = spsc_ub_dequeue(&q, (void**) &v);
|
||||
printf("dequeue 5 %d, return %d\n", *v, b);
|
||||
|
||||
return 0;
|
||||
}
|
154
spsc_ub_test_default.c
Normal file
154
spsc_ub_test_default.c
Normal file
|
@ -0,0 +1,154 @@
|
|||
#ifdef __linux__
|
||||
#define _GNU_SOURCE
|
||||
#endif
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include <unistd.h>
|
||||
|
||||
/* There are not many compilers yet which support threads.h natively.
|
||||
* We will fallback to a drop-in replacement which is based on pthreads.h */
|
||||
#include "c11threads.h"
|
||||
|
||||
/* For thread_get_id() */
|
||||
#ifdef __MACH__
|
||||
#include <pthread.h>
|
||||
#elif defined(__linux__)
|
||||
#include <sys/types.h>
|
||||
#endif
|
||||
|
||||
#include "spsc_ub_queue.h"
|
||||
#include "memory.h"
|
||||
|
||||
size_t const thread_count = 1;
|
||||
size_t const batch_size = 10;
|
||||
size_t const iter_count = 20000000;
|
||||
//size_t const queue_size = 1 << 10;
|
||||
|
||||
int volatile g_start = 0;
|
||||
|
||||
/** Get thread id as integer
|
||||
* In contrast to pthread_t which is an opaque type */
|
||||
uint64_t thread_get_id()
|
||||
{
|
||||
#ifdef __MACH__
|
||||
uint64_t id;
|
||||
pthread_threadid_np(pthread_self(), &id);
|
||||
return id;
|
||||
#elif defined(SYS_gettid)
|
||||
return (int) syscall(SYS_gettid);
|
||||
#endif
|
||||
return -1;
|
||||
}
|
||||
|
||||
/** Get CPU timestep counter */
|
||||
__attribute__((always_inline)) static inline uint64_t rdtscp()
|
||||
{
|
||||
uint64_t tsc;
|
||||
|
||||
__asm__ ("rdtscp;"
|
||||
"shl $32, %%rdx;"
|
||||
"or %%rdx,%%rax"
|
||||
: "=a" (tsc)
|
||||
:
|
||||
: "%rcx", "%rdx", "memory");
|
||||
|
||||
return tsc;
|
||||
}
|
||||
|
||||
/** Sleep, do nothing */
|
||||
__attribute__((always_inline)) static inline void nop()
|
||||
{
|
||||
__asm__("rep nop;");
|
||||
}
|
||||
|
||||
int thread_func_push(void *ctx)
|
||||
{
|
||||
struct spsc_ub_queue *q = (struct spsc_ub_queue *) ctx;
|
||||
|
||||
srand((unsigned) time(0) + thread_get_id());
|
||||
size_t pause = rand() % 1000;
|
||||
|
||||
/* Wait for global start signal */
|
||||
while (g_start == 0)
|
||||
thrd_yield();
|
||||
|
||||
/* Wait for a random time */
|
||||
for (size_t i = 0; i != pause; i += 1)
|
||||
nop();
|
||||
|
||||
for (int iter = 0; iter != iter_count; ++iter) {
|
||||
for (size_t i = 0; i != batch_size; i += 1) {
|
||||
void *ptr = (void *) i;
|
||||
spsc_ub_queue_push(q, ptr); //FIXME
|
||||
}
|
||||
}
|
||||
thrd_yield();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int thread_func_pull(void *ctx)
|
||||
{
|
||||
struct spsc_ub_queue *q = (struct spsc_ub_queue *) ctx;
|
||||
|
||||
srand((unsigned) time(0) + thread_get_id());
|
||||
size_t pause = rand() % 1000;
|
||||
|
||||
/* Wait for global start signal */
|
||||
while (g_start == 0)
|
||||
thrd_yield();
|
||||
|
||||
/* Wait for a random time */
|
||||
for (size_t i = 0; i != pause; i += 1)
|
||||
nop();
|
||||
|
||||
for (int iter = 0; iter != iter_count; ++iter) {
|
||||
for (size_t i = 0; i != batch_size; i += 1) {
|
||||
void *ptr;
|
||||
while (!spsc_ub_queue_pull(q, &ptr)){
|
||||
continue;
|
||||
} //FIXME
|
||||
}
|
||||
}
|
||||
thrd_yield(); // queue empty
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
struct spsc_ub_queue queue;
|
||||
thrd_t threads[thread_count + 1];
|
||||
int ret;
|
||||
|
||||
spsc_ub_queue_init(&queue, batch_size * iter_count, &memtype_heap);
|
||||
|
||||
printf("spsc_ub_queue_init successful\n");
|
||||
//for (int i = 0; i != thread_count; ++i)
|
||||
thrd_create(&threads[0], thread_func_push, &queue);
|
||||
thrd_create(&threads[1], thread_func_pull, &queue);
|
||||
|
||||
sleep(1);
|
||||
|
||||
uint64_t start = rdtscp();
|
||||
g_start = 1;
|
||||
|
||||
//for (int i = 0; i != thread_count; ++i)
|
||||
thrd_join(threads[0], NULL);
|
||||
thrd_join(threads[1], NULL);
|
||||
|
||||
uint64_t end = rdtscp();
|
||||
|
||||
printf("cycles/op = %lu\n", (end - start) / (batch_size * iter_count * 2 * thread_count));
|
||||
|
||||
if (queue._tail->_next != NULL)
|
||||
printf("slots in use? There is something wrong with the test\n");
|
||||
|
||||
ret = spsc_ub_queue_destroy(&queue);
|
||||
if (ret)
|
||||
printf("Failed to destroy queue: %d", ret);
|
||||
|
||||
return 0;
|
||||
}
|
196
spsc_ub_test_fib.c
Normal file
196
spsc_ub_test_fib.c
Normal file
|
@ -0,0 +1,196 @@
|
|||
#ifdef __linux__
|
||||
#define _GNU_SOURCE
|
||||
#endif
|
||||
|
||||
#include <stdio.h>
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include <stddef.h>
|
||||
|
||||
#include "spsc_ub_queue.h"
|
||||
#include "memory.h"
|
||||
#include "c11threads.h"
|
||||
|
||||
/* Usage example */
|
||||
|
||||
#define N 20000000
|
||||
|
||||
int volatile g_start = 0;
|
||||
|
||||
/** Get thread id as integer
|
||||
* In contrast to pthread_t which is an opaque type */
|
||||
uint64_t thread_get_id()
|
||||
{
|
||||
#ifdef __MACH__
|
||||
uint64_t id;
|
||||
pthread_threadid_np(pthread_self(), &id);
|
||||
return id;
|
||||
#elif defined(SYS_gettid)
|
||||
return (int) syscall(SYS_gettid);
|
||||
#endif
|
||||
return -1;
|
||||
}
|
||||
|
||||
/** Get CPU timestep counter */
|
||||
__attribute__((always_inline)) static inline uint64_t rdtscp()
|
||||
{
|
||||
uint64_t tsc;
|
||||
/** @todo not recommended to use rdtsc on multicore machine */
|
||||
__asm__ ("rdtsc;"
|
||||
"shl $32, %%rdx;"
|
||||
"or %%rdx,%%rax"
|
||||
: "=a" (tsc)
|
||||
:
|
||||
: "%rcx", "%rdx", "memory");
|
||||
|
||||
return tsc;
|
||||
}
|
||||
|
||||
/** Sleep, do nothing */
|
||||
__attribute__((always_inline)) static inline void nop()
|
||||
{
|
||||
__asm__("rep nop;");
|
||||
}
|
||||
|
||||
/* Static global storage */
|
||||
int fibs[N];
|
||||
|
||||
int producer(void *ctx)
|
||||
{
|
||||
printf("producer\n"); //DELETEME
|
||||
struct spsc_ub_queue *q = (struct spsc_ub_queue *) ctx;
|
||||
|
||||
srand((unsigned) time(0) + thread_get_id());
|
||||
size_t pause = rand() % 1000;
|
||||
|
||||
/* Wait for global start signal */
|
||||
while (g_start == 0)
|
||||
thrd_yield();
|
||||
|
||||
/* Wait for a random time */
|
||||
for (size_t i = 0; i != pause; i += 1)
|
||||
nop();
|
||||
|
||||
/* Enqueue */
|
||||
for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) {
|
||||
fibs[count] = n1 + n2;
|
||||
|
||||
void *fibptr = (void *) &fibs[count];
|
||||
|
||||
if (!spsc_ub_queue_push(q, fibptr)) {
|
||||
printf("Queue push failed\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
n1 = n2; n2 = fibs[count];
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int consumer(void *ctx)
|
||||
{
|
||||
printf("consumer\n"); //DELETEME
|
||||
struct spsc_ub_queue *q = (struct spsc_ub_queue *) ctx;
|
||||
|
||||
srand((unsigned) time(0) + thread_get_id());
|
||||
size_t pause = rand() % 1000;
|
||||
|
||||
/* Wait for global start signal */
|
||||
while (g_start == 0)
|
||||
thrd_yield();
|
||||
|
||||
/* Wait for a random time */
|
||||
for (size_t i = 0; i != pause; i += 1)
|
||||
nop();
|
||||
|
||||
/* Dequeue */
|
||||
for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) {
|
||||
int fib = n1 + n2;
|
||||
int *pulled;
|
||||
|
||||
while (!spsc_ub_queue_pull(q, (void **) &pulled)) {
|
||||
//printf("Queue empty: %d\n", temp);
|
||||
//return -1;
|
||||
}
|
||||
|
||||
if (*pulled != fib) {
|
||||
printf("Pulled != fib\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
n1 = n2; n2 = fib;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_single_threaded(struct spsc_ub_queue *q)
|
||||
{
|
||||
int resp, resc;
|
||||
|
||||
resp = producer(q);
|
||||
if (resp)
|
||||
printf("Enqueuing failed\n");
|
||||
|
||||
resc = consumer(q);
|
||||
if (resc)
|
||||
printf("Consumer failed\n");
|
||||
|
||||
if (resc || resp)
|
||||
printf("Single Thread Test Failed\n");
|
||||
else
|
||||
printf("Single Thread Test Complete\n");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_multi_threaded(struct spsc_ub_queue *q)
|
||||
{
|
||||
thrd_t thrp, thrc;
|
||||
int resp, resc;
|
||||
|
||||
g_start = 0;
|
||||
|
||||
thrd_create(&thrp, consumer, q); /** @todo Why producer thread runs earlier? */
|
||||
thrd_create(&thrc, producer, q);
|
||||
|
||||
sleep(1);
|
||||
|
||||
uint64_t start_tsc_time, end_tsc_time;
|
||||
|
||||
start_tsc_time = rdtscp();
|
||||
g_start = 1;
|
||||
|
||||
thrd_join(thrp, &resp);
|
||||
thrd_join(thrc, &resc);
|
||||
|
||||
end_tsc_time = rdtscp();
|
||||
|
||||
if (resc || resp)
|
||||
printf("Queue Test failed\n");
|
||||
else
|
||||
printf("Two-thread Test Complete\n");
|
||||
|
||||
printf("cycles/op for rdtsc %lu\n", (end_tsc_time - start_tsc_time)/N);
|
||||
|
||||
if (q->_tail->_next != NULL)
|
||||
printf("slots in use? There is something wrong with the test\n");
|
||||
|
||||
int ret = spsc_ub_queue_destroy(q);
|
||||
if (ret)
|
||||
printf("Failed to destroy queue: %d\n", ret);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
struct spsc_ub_queue q;
|
||||
spsc_ub_queue_init(&q, 1<<20, &memtype_heap); /** @todo change size>1 in case of bounded queue impl. memtype_hugepage impl for un_spsc */
|
||||
|
||||
test_multi_threaded(&q);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Add table
Reference in a new issue