1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00
VILLASnode/tests/unit/queue.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

326 lines
8.4 KiB
C++
Raw Permalink Normal View History

/* Unit tests for queue.
*
2022-03-15 09:18:01 -04:00
* Author: Steffen Vogel <post@steffenvogel.de>
2022-03-15 09:28:57 -04:00
* SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
2022-07-04 18:20:03 +02:00
* SPDX-License-Identifier: Apache-2.0
*/
#include <cstdint>
#include <cstdlib>
#include <ctime>
2016-10-19 01:25:52 -04:00
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
2016-10-06 17:56:55 -04:00
#include <criterion/criterion.h>
2016-10-19 01:25:52 -04:00
#include <criterion/parameterized.h>
2016-10-06 17:56:55 -04:00
#include <villas/log.hpp>
#include <villas/node/memory.hpp>
2018-03-26 12:50:15 +02:00
#include <villas/queue.h>
#include <villas/tsc.hpp>
2018-10-20 14:24:51 +02:00
#include <villas/utils.hpp>
using namespace villas;
using namespace villas::node;
extern void init_memory();
2016-10-06 17:56:55 -04:00
2016-10-19 01:25:52 -04:00
#define SIZE (1 << 10)
static struct CQueue q;
2016-10-19 01:25:52 -04:00
#if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS > 0
2017-03-29 04:26:16 +02:00
static pthread_barrier_t barrier;
#endif
2017-03-29 04:26:16 +02:00
2016-10-19 01:25:52 -04:00
struct param {
int iter_count;
2018-08-23 13:32:44 +02:00
int queue_size;
int thread_count;
2018-10-21 13:00:50 +01:00
bool many;
2018-08-23 13:32:44 +02:00
int batch_size;
struct memory::Type *mt;
2018-08-23 13:32:44 +02:00
volatile int start;
struct CQueue queue;
2016-10-19 01:25:52 -04:00
};
/* Get thread id as integer
* In contrast to pthread_t which is an opaque type */
#ifdef __linux__
#include <sys/syscall.h>
#endif
uint64_t thread_get_id() {
#ifdef __MACH__
uint64_t id;
pthread_threadid_np(pthread_self(), &id);
return id;
#elif defined(SYS_gettid)
return (int)syscall(SYS_gettid);
#endif
return -1;
}
// Sleep, do nothing
__attribute__((always_inline)) static inline void nop() { __asm__("rep nop;"); }
static void *producer(void *ctx) {
int ret;
2018-08-23 13:32:44 +02:00
struct param *p = (struct param *)ctx;
2016-10-19 01:25:52 -04:00
srand((unsigned)time(0) + thread_get_id());
size_t nops = rand() % 1000;
2016-10-19 01:25:52 -04:00
// Wait for global start signal
while (p->start == 0)
sched_yield();
2016-10-19 01:25:52 -04:00
// Wait for a random time
for (size_t i = 0; i != nops; i += 1)
nop();
2016-10-19 01:25:52 -04:00
// Enqueue
2018-08-23 13:32:44 +02:00
for (intptr_t count = 0; count < p->iter_count; count++) {
2016-10-19 01:25:52 -04:00
do {
ret = queue_push(&p->queue, (void *)count);
sched_yield();
2016-10-19 01:25:52 -04:00
} while (ret != 1);
}
2018-08-27 11:21:57 +02:00
return nullptr;
2016-10-19 01:25:52 -04:00
}
static void *consumer(void *ctx) {
int ret;
2018-08-23 13:32:44 +02:00
struct param *p = (struct param *)ctx;
2016-10-19 01:25:52 -04:00
srand((unsigned)time(0) + thread_get_id());
size_t nops = rand() % 1000;
2016-10-19 01:25:52 -04:00
// Wait for global start signal
while (p->start == 0)
sched_yield();
2016-10-19 01:25:52 -04:00
// Wait for a random time
for (size_t i = 0; i != nops; i += 1)
nop();
2016-10-19 01:25:52 -04:00
// Dequeue
2018-08-23 13:32:44 +02:00
for (intptr_t count = 0; count < p->iter_count; count++) {
intptr_t ptr;
2016-10-19 01:25:52 -04:00
do {
2018-08-23 13:32:44 +02:00
ret = queue_pull(&p->queue, (void **)&ptr);
2016-10-19 01:25:52 -04:00
} while (ret != 1);
2018-10-20 14:24:51 +02:00
//logger->info("consumer: {}", count);
2016-10-19 01:25:52 -04:00
//cr_assert_eq((intptr_t) ptr, count);
}
2018-08-27 11:21:57 +02:00
return nullptr;
2016-10-19 01:25:52 -04:00
}
#if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS > 0
2016-10-19 01:25:52 -04:00
void *producer_consumer(void *ctx) {
2018-08-23 13:32:44 +02:00
struct param *p = (struct param *)ctx;
2016-10-19 01:25:52 -04:00
srand((unsigned)time(0) + thread_get_id());
size_t nops = rand() % 1000;
2016-10-19 01:25:52 -04:00
// Wait for global start signal
while (p->start == 0)
sched_yield();
2016-10-19 01:25:52 -04:00
// Wait for a random time
for (size_t i = 0; i != nops; i += 1)
nop();
2016-10-19 01:25:52 -04:00
for (int iter = 0; iter < p->iter_count; ++iter) {
2017-03-29 04:26:16 +02:00
pthread_barrier_wait(&barrier);
2018-08-23 13:32:44 +02:00
for (intptr_t i = 0; i < p->batch_size; i++) {
2016-10-19 01:25:52 -04:00
void *ptr = (void *)(iter * p->batch_size + i);
while (!queue_push(&p->queue, ptr))
sched_yield(); // queue full, let other threads proceed
2016-10-19 01:25:52 -04:00
}
2018-08-23 13:32:44 +02:00
for (intptr_t i = 0; i < p->batch_size; i++) {
2016-10-19 01:25:52 -04:00
void *ptr;
while (!queue_pull(&p->queue, &ptr))
sched_yield(); // queue empty, let other threads proceed
2016-10-19 01:25:52 -04:00
}
}
2016-10-19 01:25:52 -04:00
return 0;
}
void *producer_consumer_many(void *ctx) {
2018-08-23 13:32:44 +02:00
struct param *p = (struct param *)ctx;
2016-10-19 01:25:52 -04:00
srand((unsigned)time(0) + thread_get_id());
size_t nops = rand() % 1000;
// Wait for global start signal
while (p->start == 0)
sched_yield();
2016-10-19 01:25:52 -04:00
// Wait for a random time
for (size_t i = 0; i != nops; i += 1)
nop();
2016-10-19 01:25:52 -04:00
void *ptrs[p->batch_size];
for (int iter = 0; iter < p->iter_count; ++iter) {
2018-08-23 13:32:44 +02:00
for (intptr_t i = 0; i < p->batch_size; i++)
2016-10-19 01:25:52 -04:00
ptrs[i] = (void *)(iter * p->batch_size + i);
2017-03-29 04:26:16 +02:00
pthread_barrier_wait(&barrier);
2016-10-19 01:25:52 -04:00
int pushed = 0;
do {
pushed +=
queue_push_many(&p->queue, &ptrs[pushed], p->batch_size - pushed);
if (pushed != p->batch_size)
sched_yield(); // queue full, let other threads proceed
2016-10-19 01:25:52 -04:00
} while (pushed < p->batch_size);
int pulled = 0;
do {
pulled +=
queue_pull_many(&p->queue, &ptrs[pulled], p->batch_size - pulled);
if (pulled != p->batch_size)
sched_yield(); // queue empty, let other threads proceed
2016-10-19 01:25:52 -04:00
} while (pulled < p->batch_size);
}
return 0;
}
#endif // _POSIX_BARRIERS
2016-10-19 01:25:52 -04:00
Test(queue, single_threaded, .init = init_memory) {
2016-10-19 01:25:52 -04:00
int ret;
struct param p;
p.iter_count = 1 << 8;
p.queue_size = 1 << 10;
p.start = 1; // we start immeadiatly
ret = queue_init(&p.queue, p.queue_size, &memory::heap);
2016-10-19 01:25:52 -04:00
cr_assert_eq(ret, 0, "Failed to create queue");
2016-10-19 01:25:52 -04:00
producer(&p);
consumer(&p);
2016-10-19 01:25:52 -04:00
cr_assert_eq(queue_available(&q), 0);
2016-10-19 01:25:52 -04:00
ret = queue_destroy(&p.queue);
cr_assert_eq(ret, 0, "Failed to create queue");
}
#if defined(_POSIX_BARRIERS) && _POSIX_BARRIERS > 0
2016-10-19 01:25:52 -04:00
ParameterizedTestParameters(queue, multi_threaded) {
static struct param params[] = {{.iter_count = 1 << 12,
.queue_size = 1 << 9,
.thread_count = 32,
2018-10-21 13:00:50 +01:00
.many = true,
2016-10-19 01:25:52 -04:00
.batch_size = 10,
.mt = &memory::heap},
2016-10-19 01:25:52 -04:00
{.iter_count = 1 << 8,
.queue_size = 1 << 9,
.thread_count = 4,
2018-10-21 13:00:50 +01:00
.many = true,
2016-10-19 01:25:52 -04:00
.batch_size = 100,
.mt = &memory::heap},
2016-10-19 01:25:52 -04:00
{.iter_count = 1 << 16,
2017-03-29 04:26:16 +02:00
.queue_size = 1 << 14,
2016-10-19 01:25:52 -04:00
.thread_count = 16,
2018-10-21 13:00:50 +01:00
.many = true,
2016-10-19 01:25:52 -04:00
.batch_size = 100,
.mt = &memory::heap},
2016-10-19 01:25:52 -04:00
{.iter_count = 1 << 8,
.queue_size = 1 << 9,
.thread_count = 4,
2018-10-21 13:00:50 +01:00
.many = true,
2016-10-19 01:25:52 -04:00
.batch_size = 10,
.mt = &memory::heap},
2016-10-19 01:25:52 -04:00
{.iter_count = 1 << 16,
.queue_size = 1 << 9,
.thread_count = 16,
2018-10-21 13:00:50 +01:00
.many = false,
2016-10-19 01:25:52 -04:00
.batch_size = 10,
.mt = &memory::mmap_hugetlb}};
2016-10-19 01:25:52 -04:00
return cr_make_param_array(struct param, params, ARRAY_LEN(params));
}
ParameterizedTest(struct param *p, queue, multi_threaded, .timeout = 20,
.init = init_memory) {
2016-10-19 01:25:52 -04:00
int ret, cycpop;
struct Tsc tsc;
2018-10-20 14:24:51 +02:00
Logger logger = logging.get("test:queue:multi_threaded");
if (!utils::isPrivileged() && p->mt == &memory::mmap_hugetlb)
cr_skip_test("Skipping memory_mmap_hugetlb tests allocatpr because we are "
"running in an unprivileged environment.");
2016-10-19 01:25:52 -04:00
pthread_t threads[p->thread_count];
2016-10-19 01:25:52 -04:00
p->start = 0;
2019-10-26 13:28:29 +02:00
ret = queue_init(&p->queue, p->queue_size, p->mt);
2016-10-19 01:25:52 -04:00
cr_assert_eq(ret, 0, "Failed to create queue");
uint64_t start_tsc_time, end_tsc_time;
2018-08-27 11:21:57 +02:00
pthread_barrier_init(&barrier, nullptr, p->thread_count);
2016-10-19 01:25:52 -04:00
for (int i = 0; i < p->thread_count; ++i)
2018-08-27 11:21:57 +02:00
pthread_create(&threads[i], nullptr,
p->many ? producer_consumer_many : producer_consumer, p);
2016-10-19 01:25:52 -04:00
sleep(0.2);
2016-10-19 01:25:52 -04:00
ret = tsc_init(&tsc);
cr_assert(!ret);
start_tsc_time = tsc_now(&tsc);
2016-10-19 01:25:52 -04:00
p->start = 1;
for (int i = 0; i < p->thread_count; ++i)
2018-08-27 11:21:57 +02:00
pthread_join(threads[i], nullptr);
end_tsc_time = tsc_now(&tsc);
2016-10-19 01:25:52 -04:00
cycpop = (end_tsc_time - start_tsc_time) / p->iter_count;
2016-10-19 01:25:52 -04:00
if (cycpop < 400)
2021-02-16 14:15:14 +01:00
logger->debug("Cycles/op: {}", cycpop);
2016-10-19 01:25:52 -04:00
else
2021-02-16 14:15:14 +01:00
logger->warn(
"Cycles/op are very high ({}). Are you running on a hypervisor?",
cycpop);
2016-10-19 01:25:52 -04:00
2017-03-25 21:23:48 +01:00
ret = queue_available(&q);
cr_assert_eq(ret, 0);
2016-10-19 01:25:52 -04:00
ret = queue_destroy(&p->queue);
2017-03-25 21:23:48 +01:00
cr_assert_eq(ret, 0, "Failed to destroy queue");
ret = pthread_barrier_destroy(&barrier);
cr_assert_eq(ret, 0, "Failed to destroy barrier");
2016-10-19 01:25:52 -04:00
}
#endif // _POSIX_BARRIERS
2016-10-19 01:25:52 -04:00
Test(queue, init_destroy, .init = init_memory) {
2016-10-19 01:25:52 -04:00
int ret;
struct CQueue q;
ret = queue_init(&q, 1024, &memory::heap);
2016-10-19 01:25:52 -04:00
cr_assert_eq(ret, 0); // Should succeed
2016-10-19 01:25:52 -04:00
ret = queue_destroy(&q);
cr_assert_eq(ret, 0); // Should succeed
}