Change MPMC queue to compile on Linux

This commit is contained in:
Umar Farooq 2016-09-09 18:38:17 +02:00
parent 77bcb86ca3
commit 8b9a5680bd
6 changed files with 23 additions and 132 deletions

View file

@ -1,5 +1,5 @@
#TARGETS = spsc_ub_test mpmc_test spsc_test
TARGETS = spsc_ub_test
#TARGETS = spsc_ub_test_fib spsc_ub_test_default mpmc_test spsc_test
TARGETS = spsc_ub_test_fib spsc_ub_test_default mpmc_test
CFLAGS = -Wall -std=c11
ifeq ($(shell uname), Linux)
LIBS = -pthread
@ -17,11 +17,14 @@ endif
all: $(TARGETS)
spsc_ub_test: spsc_ub_test.o spsc_ub_queue.o memory.o
spsc_ub_test_fib: spsc_ub_test_fib.o spsc_ub_queue.o memory.o
$(CC) $^ -Wall $(LIBS) -o $@
#mpmc_test: mpmc_test.o mpmc_queue.o memory.o
# $(CC) $^ -Wall $(LIBS) -o $@
spsc_ub_test_default: spsc_ub_test_default.o spsc_ub_queue.o memory.o
$(CC) $^ -Wall $(LIBS) -o $@
mpmc_test: mpmc_test.o mpmc_queue.o memory.o
$(CC) $^ -Wall $(LIBS) -o $@
#spsc_test: spsc_test.o spsc_queue.o memory.o
# $(CC) $^ -Wall $(LIBS) -o $@

View file

@ -39,8 +39,9 @@
#include "memory.h"
static size_t const cacheline_size = 64;
typedef char cacheline_pad_t[cacheline_size];
#define CACHELINE_SIZE 64
typedef char cacheline_pad_t[CACHELINE_SIZE];
struct mpmc_queue {
cacheline_pad_t _pad0; /**< Shared area: all threads read */

View file

@ -1,3 +1,7 @@
#ifdef __linux__
#define _GNU_SOURCE
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
@ -32,9 +36,10 @@ uint64_t thread_get_id()
uint64_t id;
pthread_threadid_np(pthread_self(), &id);
return id;
#elif defined(__linux__)
return (int) gettid();
#elif defined(SYS_gettid)
return (int) syscall(SYS_gettid);
#endif
return -1;
}
/** Get CPU timestep counter */
@ -111,7 +116,7 @@ int main()
uint64_t end = rdtscp();
printf("cycles/op = %llu\n", (end - start) / (batch_size * iter_count * 2 * thread_count));
printf("cycles/op = %lu\n", (end - start) / (batch_size * iter_count * 2 * thread_count));
size_t used = mpmc_queue_available(&queue);
if (used > 0)

View file

@ -79,7 +79,7 @@ struct node* spsc_ub_alloc_node(struct spsc_ub_queue* q)
return (struct node*) memory_alloc(q->mem, sizeof(struct node));
}
int spsc_ub_push(struct spsc_ub_queue* q, void * v)
int spsc_ub_queue_push(struct spsc_ub_queue* q, void * v)
{
struct node* n = spsc_ub_alloc_node(q);
@ -94,7 +94,7 @@ int spsc_ub_push(struct spsc_ub_queue* q, void * v)
return 0;
}
int spsc_ub_pull(struct spsc_ub_queue* q, void** v)
int spsc_ub_queue_pull(struct spsc_ub_queue* q, void** v)
{
if (atomic_load_explicit(&(q->_tail->_next), memory_order_consume)) {
*v = q->_tail->_next->_value;

View file

@ -80,9 +80,9 @@ int spsc_ub_queue_destroy(struct spsc_ub_queue *q);
struct node * spsc_ub_alloc_node(struct spsc_ub_queue *q);
/** Push a value to unbounded SPSC queue */
int spsc_ub_push(struct spsc_ub_queue *q, void *v);
int spsc_ub_queue_push(struct spsc_ub_queue *q, void *v);
/** Pull a value from unbounded SPSC queue */
int spsc_ub_pull(struct spsc_ub_queue *q, void **v);
int spsc_ub_queue_pull(struct spsc_ub_queue *q, void **v);
#endif /* _SPSC_UB_QUEUE_H_ */

View file

@ -1,118 +0,0 @@
#ifdef __linux__
#define _GNU_SOURCE
#endif
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <stddef.h>
#include "spsc_ub_queue.h"
#include "memory.h"
#include "c11threads.h"
/* Usage example */
#define N 20000000
/* Static global storage */
int fibs[N];
int producer(void *ctx)
{
printf("producer\n"); //DELETEME
struct spsc_ub_queue *q = (struct spsc_ub_queue *) ctx;
/* Enqueue */
for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) {
fibs[count] = n1 + n2;
void *fibptr = (void *) &fibs[count];
if (spsc_ub_push(q, fibptr)) {
printf("Queue push failed\n");
return -1;
}
n1 = n2; n2 = fibs[count];
}
return 0;
}
int consumer(void *ctx)
{
printf("consumer\n"); //DELETEME
struct spsc_ub_queue *q = (struct spsc_ub_queue *) ctx;
/* Dequeue */
for (unsigned long count = 0, n1 = 0, n2 = 1; count < N; count++) {
int fib = n1 + n2;
int *pulled;
while (spsc_ub_pull(q, (void **) &pulled)) {
//printf("Queue empty: %d\n", temp);
//return -1;
}
if (*pulled != fib) {
printf("Pulled != fib\n");
return -1;
}
n1 = n2; n2 = fib;
}
return 0;
}
int test_single_threaded(struct spsc_ub_queue *q)
{
int resp, resc;
resp = producer(q);
if (resp)
printf("Enqueuing failed");
resc = consumer(q);
if (resc)
printf("Consumer failed");
if (resc || resp)
printf("Single Thread Test Failed");
else
printf("Single Thread Test Complete\n");
return 0;
}
int test_multi_threaded(struct spsc_ub_queue *q)
{
thrd_t thrp, thrc;
int resp, resc;
thrd_create(&thrp, producer, q); /** @todo Why producer thread runs earlier? */
thrd_create(&thrc, consumer, q);
thrd_join(thrp, &resp);
thrd_join(thrc, &resc);
if (resc || resp)
printf("Queue Test failed");
else
printf("Two-thread Test Complete\n");
return 0;
}
int main()
{
struct spsc_ub_queue q;
spsc_ub_queue_init(&q, 1, &memtype_heap); /** @todo change size>1 in case of bounded queue impl. memtype_hugepage impl for un_spsc */
test_single_threaded(&q);
test_multi_threaded(&q);
return 0;
}