Add atomic functions, memory not freed at the end due to unknown reason
This commit is contained in:
parent
064ebc6fbe
commit
b1890be20e
3 changed files with 19 additions and 30 deletions
25
spsc_queue.c
25
spsc_queue.c
|
@ -52,7 +52,7 @@ struct spsc_queue * spsc_queue_init(struct spsc_queue * q, size_t size, const st
|
|||
|
||||
int spsc_queue_destroy(struct spsc_queue *q)
|
||||
{
|
||||
const struct memtype mem = *(q->mem); /** @todo Memory is not being freed properly */
|
||||
const struct memtype mem = *(q->mem); /** @todo Memory is not being freed properly??? */
|
||||
return memory_free(&mem, q, sizeof(struct spsc_queue) + ((q->capacity + 1) * sizeof(q->pointers[0])));
|
||||
}
|
||||
|
||||
|
@ -63,6 +63,7 @@ int spsc_queue_get_many(struct spsc_queue *q, void **ptrs[], size_t cnt)
|
|||
if (cnt > filled_slots)
|
||||
cnt = filled_slots;
|
||||
|
||||
/**@todo Is atomic_load_explicit needed here for loading q->_head? */
|
||||
for (int i = 0; i < cnt; i++)
|
||||
ptrs[i] = &(q->pointers[q->_head % (q->capacity + 1)]);
|
||||
|
||||
|
@ -78,8 +79,8 @@ int spsc_queue_push_many(struct spsc_queue *q, void *ptrs[], size_t cnt)
|
|||
cnt = free_slots;
|
||||
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
q->pointers[q->_tail] = ptrs[i]; //--? or (q->_tail + i)%(q->capacity + 1) as index and update q->_tail at end of loop
|
||||
q->_tail = (q->_tail + 1)%(q->capacity + 1);
|
||||
q->pointers[q->_tail] = ptrs[i];
|
||||
atomic_store_explicit(&q->_tail, (q->_tail + 1)%(q->capacity + 1), memory_order_release);
|
||||
}
|
||||
|
||||
return cnt;
|
||||
|
@ -94,28 +95,16 @@ int spsc_queue_pull_many(struct spsc_queue *q, void **ptrs[], size_t cnt)
|
|||
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
*ptrs[i] = q->pointers[q->_head];
|
||||
q->_head = (q->_head + 1)%(q->capacity + 1);
|
||||
atomic_store_explicit(&q->_head, (q->_head + 1)%(q->capacity + 1), memory_order_release);
|
||||
}
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
||||
int spsc_queue_available(struct spsc_queue *q) //--? make this func inline
|
||||
int spsc_queue_available(struct spsc_queue *q)
|
||||
{
|
||||
if (q->_tail < q->_head)
|
||||
if (atomic_load_explicit(&q->_tail, memory_order_consume) < atomic_load_explicit(&q->_head, memory_order_consume))
|
||||
return q->_head - q->_tail - 1;
|
||||
else
|
||||
return q->_head + (q->capacity - q->_tail);
|
||||
}
|
||||
/**
|
||||
int spsc_debug(struct spsc_queue *q)
|
||||
{
|
||||
printf("q->_tail %d, q->_head %d, q->capacity %lu\n", q->_tail, q->_head, q->capacity);
|
||||
int temp_count = q->_head;
|
||||
for (int i = 0; i < q->capacity - spsc_queue_available(q); i++) {
|
||||
printf("Value of stored pointer %d: %p\n", i, q->pointers[q->_head]);
|
||||
temp_count = (temp_count + 1)%(q->capacity + 1);
|
||||
}
|
||||
|
||||
return 0;
|
||||
} */
|
|
@ -114,6 +114,4 @@ static inline int spsc_queue_get(struct spsc_queue *q, void **ptr)
|
|||
return spsc_queue_get_many(q, &ptr, 1);
|
||||
}
|
||||
|
||||
//int spsc_debug(struct spsc_queue *q);
|
||||
|
||||
#endif /* _SPSC_QUEUE_H_ */
|
|
@ -58,7 +58,7 @@ int fibs[N];
|
|||
|
||||
int producer(void *ctx)
|
||||
{
|
||||
printf("producer\n"); //DELETEME
|
||||
printf("producer\n");
|
||||
struct spsc_queue *q = (struct spsc_queue *) ctx;
|
||||
|
||||
srand((unsigned) time(0) + thread_get_id());
|
||||
|
@ -79,7 +79,7 @@ int producer(void *ctx)
|
|||
void *fibptr = (void *) &fibs[count];
|
||||
|
||||
if (!spsc_queue_push(q, fibptr)) {
|
||||
printf("Queue push failed\n");
|
||||
printf("Queue push failed at count %lu\n", count);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -91,7 +91,7 @@ int producer(void *ctx)
|
|||
|
||||
int consumer(void *ctx)
|
||||
{
|
||||
printf("consumer\n"); //DELETEME
|
||||
printf("consumer\n");
|
||||
struct spsc_queue *q = (struct spsc_queue *) ctx;
|
||||
|
||||
srand((unsigned) time(0) + thread_get_id());
|
||||
|
@ -134,14 +134,14 @@ int test_single_threaded(struct spsc_queue *q)
|
|||
|
||||
resp = producer(q);
|
||||
if (resp)
|
||||
printf("Enqueuing failed");
|
||||
printf("Enqueuing failed\n");
|
||||
|
||||
resc = consumer(q);
|
||||
if (resc)
|
||||
printf("Consumer failed");
|
||||
printf("Consumer failed\n");
|
||||
|
||||
if (resc || resp)
|
||||
printf("Single Thread Test Failed");
|
||||
printf("Single Thread Test Failed\n");
|
||||
else
|
||||
printf("Single Thread Test Complete\n");
|
||||
|
||||
|
@ -150,6 +150,8 @@ int test_single_threaded(struct spsc_queue *q)
|
|||
|
||||
int test_multi_threaded(struct spsc_queue *q)
|
||||
{
|
||||
g_start = 0;
|
||||
|
||||
thrd_t thrp, thrc;
|
||||
int resp, resc;
|
||||
|
||||
|
@ -167,7 +169,7 @@ int test_multi_threaded(struct spsc_queue *q)
|
|||
uint64_t end = rdtscp();
|
||||
|
||||
if (resc || resp)
|
||||
printf("Queue Test failed");
|
||||
printf("Queue Test failed\n");
|
||||
else
|
||||
printf("Two-thread Test Complete\n");
|
||||
|
||||
|
@ -182,14 +184,14 @@ int test_multi_threaded(struct spsc_queue *q)
|
|||
int main()
|
||||
{
|
||||
struct spsc_queue * q = NULL;
|
||||
q = spsc_queue_init(q, 1<<20, &memtype_hugepage); /** @todo change size>1 in case of bounded queue impl. memtype_hugepage impl for un_spsc */
|
||||
q = spsc_queue_init(q, 1<<20, &memtype_hugepage);
|
||||
|
||||
//test_single_threaded(q);
|
||||
//test_single_threaded(q); /** Single threaded test fails with N > queue size*/
|
||||
test_multi_threaded(q);
|
||||
|
||||
int ret = spsc_queue_destroy(q);
|
||||
if (ret)
|
||||
printf("Failed to destroy queue: %d", ret);
|
||||
printf("Failed to destroy queue: %d\n", ret);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue