From b1890be20e5f39038d59810fb1f975ec1df9fd11 Mon Sep 17 00:00:00 2001 From: Umar Farooq Date: Sun, 18 Sep 2016 22:45:10 +0200 Subject: [PATCH] Add atomic functions, memory not freed at the end due to unknown reason --- spsc_queue.c | 25 +++++++------------------ spsc_queue.h | 2 -- spsc_test_fib.c | 22 ++++++++++++---------- 3 files changed, 19 insertions(+), 30 deletions(-) diff --git a/spsc_queue.c b/spsc_queue.c index 1abd2bc..610f53d 100644 --- a/spsc_queue.c +++ b/spsc_queue.c @@ -52,7 +52,7 @@ struct spsc_queue * spsc_queue_init(struct spsc_queue * q, size_t size, const st int spsc_queue_destroy(struct spsc_queue *q) { - const struct memtype mem = *(q->mem); /** @todo Memory is not being freed properly */ + const struct memtype mem = *(q->mem); /** @todo Memory is not being freed properly??? */ return memory_free(&mem, q, sizeof(struct spsc_queue) + ((q->capacity + 1) * sizeof(q->pointers[0]))); } @@ -63,6 +63,7 @@ int spsc_queue_get_many(struct spsc_queue *q, void **ptrs[], size_t cnt) if (cnt > filled_slots) cnt = filled_slots; + /**@todo Is atomic_load_explicit needed here for loading q->_head? */ for (int i = 0; i < cnt; i++) ptrs[i] = &(q->pointers[q->_head % (q->capacity + 1)]); @@ -78,8 +79,8 @@ int spsc_queue_push_many(struct spsc_queue *q, void *ptrs[], size_t cnt) cnt = free_slots; for (int i = 0; i < cnt; i++) { - q->pointers[q->_tail] = ptrs[i]; //--? or (q->_tail + i)%(q->capacity + 1) as index and update q->_tail at end of loop - q->_tail = (q->_tail + 1)%(q->capacity + 1); + q->pointers[q->_tail] = ptrs[i]; + atomic_store_explicit(&q->_tail, (q->_tail + 1)%(q->capacity + 1), memory_order_release); } return cnt; @@ -94,28 +95,16 @@ int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs[], size_t cnt) for (int i = 0; i < cnt; i++) { *ptrs[i] = q->pointers[q->_head]; - q->_head = (q->_head + 1)%(q->capacity + 1); + atomic_store_explicit(&q->_head, (q->_head + 1)%(q->capacity + 1), memory_order_release); } return cnt; } -int spsc_queue_available(struct spsc_queue *q) //--? make this func inline +int spsc_queue_available(struct spsc_queue *q) { - if (q->_tail < q->_head) + if (atomic_load_explicit(&q->_tail, memory_order_consume) < atomic_load_explicit(&q->_head, memory_order_consume)) return q->_head - q->_tail - 1; else return q->_head + (q->capacity - q->_tail); } -/** -int spsc_debug(struct spsc_queue *q) -{ - printf("q->_tail %d, q->_head %d, q->capacity %lu\n", q->_tail, q->_head, q->capacity); - int temp_count = q->_head; - for (int i = 0; i < q->capacity - spsc_queue_available(q); i++) { - printf("Value of stored pointer %d: %p\n", i, q->pointers[q->_head]); - temp_count = (temp_count + 1)%(q->capacity + 1); - } - - return 0; -} */ \ No newline at end of file diff --git a/spsc_queue.h b/spsc_queue.h index b44109e..b2ef939 100644 --- a/spsc_queue.h +++ b/spsc_queue.h @@ -114,6 +114,4 @@ static inline int spsc_queue_get(struct spsc_queue *q, void **ptr) return spsc_queue_get_many(q, &ptr, 1); } -//int spsc_debug(struct spsc_queue *q); - #endif /* _SPSC_QUEUE_H_ */ \ No newline at end of file diff --git a/spsc_test_fib.c b/spsc_test_fib.c index 0d2d53c..836e7c5 100644 --- a/spsc_test_fib.c +++ b/spsc_test_fib.c @@ -58,7 +58,7 @@ int fibs[N]; int producer(void *ctx) { - printf("producer\n"); //DELETEME + printf("producer\n"); struct spsc_queue *q = (struct spsc_queue *) ctx; srand((unsigned) time(0) + thread_get_id()); @@ -79,7 +79,7 @@ int producer(void *ctx) void *fibptr = (void *) &fibs[count]; if (!spsc_queue_push(q, fibptr)) { - printf("Queue push failed\n"); + printf("Queue push failed at count %lu\n", count); return -1; } @@ -91,7 +91,7 @@ int producer(void *ctx) int consumer(void *ctx) { - printf("consumer\n"); //DELETEME + printf("consumer\n"); struct spsc_queue *q = (struct spsc_queue *) ctx; srand((unsigned) time(0) + thread_get_id()); @@ -134,14 +134,14 @@ int test_single_threaded(struct spsc_queue *q) resp = producer(q); if (resp) - printf("Enqueuing failed"); + printf("Enqueuing failed\n"); resc = consumer(q); if (resc) - printf("Consumer failed"); + printf("Consumer failed\n"); if (resc || resp) - printf("Single Thread Test Failed"); + printf("Single Thread Test Failed\n"); else printf("Single Thread Test Complete\n"); @@ -150,6 +150,8 @@ int test_single_threaded(struct spsc_queue *q) int test_multi_threaded(struct spsc_queue *q) { + g_start = 0; + thrd_t thrp, thrc; int resp, resc; @@ -167,7 +169,7 @@ int test_multi_threaded(struct spsc_queue *q) uint64_t end = rdtscp(); if (resc || resp) - printf("Queue Test failed"); + printf("Queue Test failed\n"); else printf("Two-thread Test Complete\n"); @@ -182,14 +184,14 @@ int test_multi_threaded(struct spsc_queue *q) int main() { struct spsc_queue * q = NULL; - q = spsc_queue_init(q, 1<<20, &memtype_hugepage); /** @todo change size>1 in case of bounded queue impl. memtype_hugepage impl for un_spsc */ + q = spsc_queue_init(q, 1<<20, &memtype_hugepage); - //test_single_threaded(q); + //test_single_threaded(q); /** Single threaded test fails with N > queue size*/ test_multi_threaded(q); int ret = spsc_queue_destroy(q); if (ret) - printf("Failed to destroy queue: %d", ret); + printf("Failed to destroy queue: %d\n", ret); return 0; }