diff --git a/tests/unit/queue.c b/tests/unit/queue.c index 5034b9f40..37b90d4c8 100644 --- a/tests/unit/queue.c +++ b/tests/unit/queue.c @@ -310,7 +310,7 @@ ParameterizedTest(struct param *p, queue, multi_threaded, .timeout = 20) for (int i = 0; i < p->thread_count; ++i) pthread_create(&threads[i], NULL, p->thread_func, p); - sleep(1); + sleep(0.2); start_tsc_time = rdtsc(); p->start = 1; @@ -318,8 +318,6 @@ ParameterizedTest(struct param *p, queue, multi_threaded, .timeout = 20) for (int i = 0; i < p->thread_count; ++i) pthread_join(threads[i], NULL); - pthread_barrier_destroy(&barrier); - end_tsc_time = rdtsc(); cycpop = (end_tsc_time - start_tsc_time) / p->iter_count; @@ -333,6 +331,9 @@ ParameterizedTest(struct param *p, queue, multi_threaded, .timeout = 20) ret = queue_destroy(&p->queue); cr_assert_eq(ret, 0, "Failed to destroy queue"); + + ret = pthread_barrier_destroy(&barrier); + cr_assert_eq(ret, 0, "Failed to destroy barrier"); } Test(queue, init_destroy) diff --git a/tests/unit/queue_signalled.c b/tests/unit/queue_signalled.c new file mode 100644 index 000000000..4d79fdb00 --- /dev/null +++ b/tests/unit/queue_signalled.c @@ -0,0 +1,161 @@ +/** Unit tests for queue_signalled + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include + +#include +#include + +#include "utils.h" +#include "memory.h" +#include "queue_signalled.h" + +#define NUM_ELEM 1000 + +struct param { + int flags; + void * (*consumer_func)(void *); +}; + +static void * producer(void * ctx) +{ + int ret; + struct queue_signalled *q = ctx; + + for (intptr_t i = 0; i < NUM_ELEM; i++) { + ret = queue_signalled_push(q, (void *) i); + if (ret != 1) + return (void *) 1; /* Indicates an error to the parent thread */ + + usleep(0.1e-3 * 1e6); /* 1 ms */ + } + + return NULL; +} + +static void * consumer(void * ctx) +{ + int ret; + struct queue_signalled *q = ctx; + + void *data[NUM_ELEM]; + + for (intptr_t i = 0; i < NUM_ELEM;) { + ret = queue_signalled_pull_many(q, data, ARRAY_LEN(data)); + if (ret <= 0) + return (void *) 1; /* Indicates an error to the parent thread */ + + for (intptr_t j = 0; j < ret; j++, i++) { + if ((intptr_t) data[j] != i) + return (void *) 2; /* Indicates an error to the parent thread */ + } + } + + return NULL; +} + + void * polled_consumer(void *ctx) +{ + int ret, fd; + struct queue_signalled *q = ctx; + + fd = queue_signalled_fd(q); + cr_assert_geq(fd, 0); + + struct pollfd pfd = { + .fd = fd, + .events = POLLIN + }; + + for (intptr_t i = 0; i < NUM_ELEM; i++) { +again: ret = poll(&pfd, 1, -1); + if (ret < 0) + return (void *) 3; + else if (ret == 0) + goto again; + + + void *p; + ret = queue_signalled_pull(q, &p); + if (ret != 1) + return (void *) 1; /* Indicates an error to the parent thread */ + + if ((intptr_t) p != i) + return (void *) 2; /* Indicates an error to the parent thread */ + } + + return NULL; +} + +ParameterizedTestParameters(queue_signalled, simple) +{ + static struct param params[] = { + { 0, consumer }, + { QUEUE_SIGNALLED_PTHREAD, consumer }, + { QUEUE_SIGNALLED_PTHREAD, consumer }, + { QUEUE_SIGNALLED_PTHREAD | QUEUE_SIGNALLED_PROCESS_SHARED, consumer }, + { QUEUE_SIGNALLED_POLLING, consumer }, +#ifdef __linux__ + { QUEUE_SIGNALLED_EVENTFD, consumer }, + { QUEUE_SIGNALLED_EVENTFD, polled_consumer } +#endif + }; + + return cr_make_param_array(struct param, params, ARRAY_LEN(params)); +} + +ParameterizedTest(struct param *param, queue_signalled, simple, .timeout = 5) +{ + int ret; + void *r1, *r2; + struct queue_signalled q = { .queue.state = STATE_DESTROYED }; + + pthread_t t1, t2; + + ret = queue_signalled_init(&q, LOG2_CEIL(NUM_ELEM), &memtype_heap, param->flags); + cr_assert_eq(ret, 0, "Failed to initialize queue: flags=%#x, ret=%d", param->flags, ret); + + ret = pthread_create(&t1, NULL, producer, &q); + cr_assert_eq(ret, 0); + + ret = pthread_create(&t2, NULL, param->consumer_func, &q); + cr_assert_eq(ret, 0); + + ret = pthread_join(t1, &r1); + cr_assert_eq(ret, 0); + + ret = pthread_join(t2, &r2); + cr_assert_eq(ret, 0); + + cr_assert_null(r1, "Producer failed: %p", r1); + cr_assert_null(r2, "Consumer failed: %p", r2); + + ret = queue_signalled_available(&q); + cr_assert_eq(ret, 0); + + ret = queue_signalled_close(&q); + cr_assert_eq(ret, 0); + + ret = queue_signalled_destroy(&q); + cr_assert_eq(ret, 0); +} \ No newline at end of file