/** Unit tests for queue * * @author Steffen Vogel * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC * @license GNU General Public License (version 3) * * VILLASnode * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . *********************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #define SIZE (1 << 10) static struct queue q = { .state = STATE_DESTROYED }; #if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS > 0 static pthread_barrier_t barrier; #endif struct param { int volatile start; int thread_count; int queue_size; int iter_count; int batch_size; void * (*thread_func)(void *); struct queue queue; const struct memory_type *memory_type; }; /** Get thread id as integer * In contrast to pthread_t which is an opaque type */ #ifdef __linux__ #include #endif uint64_t thread_get_id() { #ifdef __MACH__ uint64_t id; pthread_threadid_np(pthread_self(), &id); return id; #elif defined(SYS_gettid) return (int) syscall(SYS_gettid); #endif return -1; } /** Sleep, do nothing */ __attribute__((always_inline)) static inline void nop() { __asm__("rep nop;"); } static void * producer(void *ctx) { int ret; struct param *p = ctx; srand((unsigned) time(0) + thread_get_id()); size_t nops = rand() % 1000; /** @todo Criterion cr_log() is broken for multi-threaded programs */ //cr_log_info("producer: tid = %lu", thread_get_id()); #ifdef __APPLE__ #define pthread_yield pthread_yield_np #endif /* Wait for global start signal */ while (p->start == 0) pthread_yield(); //cr_log_info("producer: wait for %zd nops", nops); /* Wait for a random time */ for (size_t i = 0; i != nops; i += 1) nop(); //cr_log_info("producer: start pushing"); /* Enqueue */ for (unsigned long count = 0; count < p->iter_count; count++) { do { ret = queue_push(&p->queue, (void *) count); pthread_yield(); } while (ret != 1); } //cr_log_info("producer: finished"); return NULL; } static void * consumer(void *ctx) { int ret; struct param *p = ctx; srand((unsigned) time(0) + thread_get_id()); size_t nops = rand() % 1000; //cr_log_info("consumer: tid = %lu", thread_get_id()); /* Wait for global start signal */ while (p->start == 0) pthread_yield(); //cr_log_info("consumer: wait for %zd nops", nops); /* Wait for a random time */ for (size_t i = 0; i != nops; i += 1) nop(); //cr_log_info("consumer: start pulling"); /* Dequeue */ for (unsigned long count = 0; count < p->iter_count; count++) { void *ptr; do { ret = queue_pull(&p->queue, &ptr); } while (ret != 1); //cr_log_info("consumer: %lu\n", count); //cr_assert_eq((intptr_t) ptr, count); } //cr_log_info("consumer: finished"); return NULL; } #if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS > 0 void * producer_consumer(void *ctx) { struct param *p = ctx; srand((unsigned) time(0) + thread_get_id()); size_t nops = rand() % 1000; /* Wait for global start signal */ while (p->start == 0) pthread_yield(); /* Wait for a random time */ for (size_t i = 0; i != nops; i += 1) nop(); for (int iter = 0; iter < p->iter_count; ++iter) { pthread_barrier_wait(&barrier); for (size_t i = 0; i < p->batch_size; i++) { void *ptr = (void *) (iter * p->batch_size + i); while (!queue_push(&p->queue, ptr)) pthread_yield(); /* queue full, let other threads proceed */ } for (size_t i = 0; i < p->batch_size; i++) { void *ptr; while (!queue_pull(&p->queue, &ptr)) pthread_yield(); /* queue empty, let other threads proceed */ } } return 0; } void * producer_consumer_many(void *ctx) { struct param *p = ctx; srand((unsigned) time(0) + thread_get_id()); size_t nops = rand() % 1000; /* Wait for global start signal */ while (p->start == 0) pthread_yield(); /* Wait for a random time */ for (size_t i = 0; i != nops; i += 1) nop(); void *ptrs[p->batch_size]; for (int iter = 0; iter < p->iter_count; ++iter) { for (size_t i = 0; i < p->batch_size; i++) ptrs[i] = (void *) (iter * p->batch_size + i); pthread_barrier_wait(&barrier); int pushed = 0; do { pushed += queue_push_many(&p->queue, &ptrs[pushed], p->batch_size - pushed); if (pushed != p->batch_size) pthread_yield(); /* queue full, let other threads proceed */ } while (pushed < p->batch_size); int pulled = 0; do { pulled += queue_pull_many(&p->queue, &ptrs[pulled], p->batch_size - pulled); if (pulled != p->batch_size) pthread_yield(); /* queue empty, let other threads proceed */ } while (pulled < p->batch_size); } return 0; } #endif /* _POSIX_BARRIERS */ Test(queue, single_threaded) { int ret; struct param p = { .iter_count = 1 << 8, .queue_size = 1 << 10, .start = 1 /* we start immeadiatly */ }; ret = queue_init(&p.queue, p.queue_size, &memory_type_heap); cr_assert_eq(ret, 0, "Failed to create queue"); producer(&p); consumer(&p); cr_assert_eq(queue_available(&q), 0); ret = queue_destroy(&p.queue); cr_assert_eq(ret, 0, "Failed to create queue"); } #if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS > 0 ParameterizedTestParameters(queue, multi_threaded) { static struct param params[] = { { .iter_count = 1 << 12, .queue_size = 1 << 9, .thread_count = 32, .thread_func = producer_consumer_many, .batch_size = 10, .memory_type = &memory_type_heap }, { .iter_count = 1 << 8, .queue_size = 1 << 9, .thread_count = 4, .thread_func = producer_consumer_many, .batch_size = 100, .memory_type = &memory_type_heap }, { .iter_count = 1 << 16, .queue_size = 1 << 14, .thread_count = 16, .thread_func = producer_consumer_many, .batch_size = 100, .memory_type = &memory_type_heap }, { .iter_count = 1 << 8, .queue_size = 1 << 9, .thread_count = 4, .thread_func = producer_consumer_many, .batch_size = 10, .memory_type = &memory_type_heap }, { .iter_count = 1 << 16, .queue_size = 1 << 9, .thread_count = 16, .thread_func = producer_consumer, .batch_size = 10, .memory_type = &memory_hugepage } }; return cr_make_param_array(struct param, params, ARRAY_LEN(params)); } ParameterizedTest(struct param *p, queue, multi_threaded, .timeout = 20) { int ret, cycpop; pthread_t threads[p->thread_count]; p->start = 0; ret = queue_init(&p->queue, p->queue_size, &memory_type_heap); cr_assert_eq(ret, 0, "Failed to create queue"); uint64_t start_tsc_time, end_tsc_time; pthread_barrier_init(&barrier, NULL, p->thread_count); for (int i = 0; i < p->thread_count; ++i) pthread_create(&threads[i], NULL, p->thread_func, p); sleep(0.2); start_tsc_time = rdtsc(); p->start = 1; for (int i = 0; i < p->thread_count; ++i) pthread_join(threads[i], NULL); end_tsc_time = rdtsc(); cycpop = (end_tsc_time - start_tsc_time) / p->iter_count; if (cycpop < 400) cr_log_info("cycles/op: %u\n", cycpop); else cr_log_warn("cycles/op are very high (%u). Are you running on a hypervisor?\n", cycpop); ret = queue_available(&q); cr_assert_eq(ret, 0); ret = queue_destroy(&p->queue); cr_assert_eq(ret, 0, "Failed to destroy queue"); ret = pthread_barrier_destroy(&barrier); cr_assert_eq(ret, 0, "Failed to destroy barrier"); } #endif /* _POSIX_BARRIERS */ Test(queue, init_destroy) { int ret; struct queue q = { .state = STATE_DESTROYED }; ret = queue_init(&q, 1024, &memory_type_heap); cr_assert_eq(ret, 0); /* Should succeed */ ret = queue_destroy(&q); cr_assert_eq(ret, 0); /* Should succeed */ }