2017-04-15 21:22:19 +02:00
|
|
|
/* Wrapper around queue that signals writes via POSIX condition variables, eventfd or polling.
|
|
|
|
*
|
|
|
|
* Author: Georg Martin Reinke <georg.reinke@rwth-aachen.de>
|
2022-03-15 09:28:57 -04:00
|
|
|
* SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
|
2022-07-04 18:20:03 +02:00
|
|
|
* SPDX-License-Identifier: Apache-2.0
|
2017-04-15 21:22:19 +02:00
|
|
|
*/
|
2017-04-07 12:18:08 +02:00
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
#include <villas/node/config.hpp>
|
2017-12-09 02:19:28 +08:00
|
|
|
#include <villas/queue_signalled.h>
|
2017-04-07 12:18:08 +02:00
|
|
|
|
2018-06-21 09:37:30 +02:00
|
|
|
#ifdef HAS_EVENTFD
|
2023-09-07 11:46:39 +02:00
|
|
|
#include <sys/eventfd.h>
|
2017-08-30 12:35:47 +02:00
|
|
|
#endif
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
using namespace villas::node;
|
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
static void queue_signalled_cleanup(void *p) {
|
|
|
|
struct CQueueSignalled *qs = (struct CQueueSignalled *)p;
|
2017-09-05 10:11:23 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
if (qs->mode == QueueSignalledMode::PTHREAD)
|
|
|
|
pthread_mutex_unlock(&qs->pthread.mutex);
|
2017-08-30 12:35:47 +02:00
|
|
|
}
|
2017-04-15 23:12:53 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
int villas::node::queue_signalled_init(struct CQueueSignalled *qs, size_t size,
|
|
|
|
struct memory::Type *mem,
|
|
|
|
enum QueueSignalledMode mode,
|
|
|
|
int flags) {
|
|
|
|
int ret;
|
2017-09-05 10:11:23 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
qs->mode = mode;
|
2017-09-05 10:11:23 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
if (qs->mode == QueueSignalledMode::AUTO) {
|
2017-08-30 12:35:47 +02:00
|
|
|
#ifdef __linux__
|
2023-09-07 11:46:39 +02:00
|
|
|
if (flags & (int)QueueSignalledFlags::PROCESS_SHARED)
|
|
|
|
qs->mode = QueueSignalledMode::PTHREAD;
|
|
|
|
else {
|
2018-08-08 08:53:36 +02:00
|
|
|
#ifdef HAS_EVENTFD
|
2023-09-07 11:46:39 +02:00
|
|
|
qs->mode = QueueSignalledMode::EVENTFD;
|
2018-08-08 08:53:36 +02:00
|
|
|
#else
|
2023-09-07 11:46:39 +02:00
|
|
|
qs->mode = QueueSignalledMode::PTHREAD;
|
2018-08-08 08:53:36 +02:00
|
|
|
#endif
|
2023-09-07 11:46:39 +02:00
|
|
|
}
|
2017-08-30 12:35:47 +02:00
|
|
|
#else
|
2023-09-07 11:46:39 +02:00
|
|
|
qs->mode = QueueSignalledMode::PTHREAD;
|
2017-08-30 12:35:47 +02:00
|
|
|
#endif
|
2023-09-07 11:46:39 +02:00
|
|
|
}
|
2017-04-15 23:14:14 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
ret = queue_init(&qs->queue, size, mem);
|
|
|
|
if (ret < 0)
|
|
|
|
return ret;
|
2017-04-15 18:59:22 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
if (qs->mode == QueueSignalledMode::PTHREAD) {
|
|
|
|
pthread_condattr_t cvattr;
|
|
|
|
pthread_mutexattr_t mtattr;
|
2017-09-05 10:11:23 +02:00
|
|
|
|
2022-03-15 12:13:30 -04:00
|
|
|
ret = pthread_mutexattr_init(&mtattr);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
|
|
|
|
|
|
|
ret = pthread_condattr_init(&cvattr);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
2017-04-15 23:12:53 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
if (flags & (int)QueueSignalledFlags::PROCESS_SHARED) {
|
2022-03-15 12:13:30 -04:00
|
|
|
ret = pthread_mutexattr_setpshared(&mtattr, PTHREAD_PROCESS_SHARED);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
|
|
|
|
|
|
|
ret = pthread_condattr_setpshared(&cvattr, PTHREAD_PROCESS_SHARED);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
2023-09-07 11:46:39 +02:00
|
|
|
}
|
2017-04-15 23:12:53 +02:00
|
|
|
|
2022-03-15 12:13:30 -04:00
|
|
|
ret = pthread_mutex_init(&qs->pthread.mutex, &mtattr);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
|
|
|
|
|
|
|
ret = pthread_cond_init(&qs->pthread.ready, &cvattr);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
|
|
|
|
|
|
|
ret = pthread_mutexattr_destroy(&mtattr);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2022-03-15 12:13:30 -04:00
|
|
|
ret = pthread_condattr_destroy(&cvattr);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
2023-09-07 11:46:39 +02:00
|
|
|
} else if (qs->mode == QueueSignalledMode::POLLING) {
|
2022-03-15 12:13:30 -04:00
|
|
|
/* Nothing todo */
|
2023-09-07 11:46:39 +02:00
|
|
|
}
|
2018-06-21 09:37:30 +02:00
|
|
|
#ifdef HAS_EVENTFD
|
2023-09-07 11:46:39 +02:00
|
|
|
else if (qs->mode == QueueSignalledMode::EVENTFD) {
|
|
|
|
qs->eventfd = eventfd(0, 0);
|
|
|
|
if (qs->eventfd < 0)
|
|
|
|
return -2;
|
|
|
|
}
|
2017-08-30 12:35:47 +02:00
|
|
|
#endif
|
2023-09-07 11:46:39 +02:00
|
|
|
else
|
|
|
|
return -1;
|
2017-04-15 18:59:22 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
return 0;
|
2017-04-07 12:18:08 +02:00
|
|
|
}
|
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
// Destroy a signalled queue.
//
// The underlying queue is destroyed first; a negative result from
// queue_destroy() is returned immediately. Afterwards the signalling
// primitive of the queue's mode is released.
//
// Returns 0 on success, the result of close() for a failing eventfd close,
// or -1 if the mode is not supported by this build.
int villas::node::queue_signalled_destroy(struct CQueueSignalled *qs) {
  const int status = queue_destroy(&qs->queue);
  if (status < 0)
    return status;

  switch (qs->mode) {
  case QueueSignalledMode::PTHREAD:
    pthread_cond_destroy(&qs->pthread.ready);
    pthread_mutex_destroy(&qs->pthread.mutex);
    return 0;

  case QueueSignalledMode::POLLING:
    // No signalling primitive to release
    return 0;

#ifdef HAS_EVENTFD
  case QueueSignalledMode::EVENTFD:
    // A zero result from close() doubles as the success return value
    return close(qs->eventfd);
#endif

  default:
    return -1;
  }
}
|
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
// Push a single element and notify waiting readers.
//
// If queue_push() reports zero or a negative value, that value is returned
// and no notification is sent. Otherwise readers are notified according to
// the queue's mode: a broadcast on the condition variable (PTHREAD), a write
// of 1 to the eventfd (EVENTFD), or nothing at all (POLLING).
//
// Returns the result of queue_push(), the negative result of a failing
// write(), or -1 if the mode is not supported by this build.
int villas::node::queue_signalled_push(struct CQueueSignalled *qs, void *ptr) {
  const int pushed = queue_push(&qs->queue, ptr);
  if (pushed <= 0)
    return pushed;

  switch (qs->mode) {
  case QueueSignalledMode::PTHREAD:
    pthread_mutex_lock(&qs->pthread.mutex);
    pthread_cond_broadcast(&qs->pthread.ready);
    pthread_mutex_unlock(&qs->pthread.mutex);
    break;

  case QueueSignalledMode::POLLING:
    // Readers poll the queue themselves
    break;

#ifdef HAS_EVENTFD
  case QueueSignalledMode::EVENTFD: {
    uint64_t one = 1;
    int written = write(qs->eventfd, &one, sizeof(one));
    if (written < 0)
      return written;
    break;
  }
#endif

  default:
    return -1;
  }

  return pushed;
}
|
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
// Push up to cnt elements from ptr[] and notify waiting readers.
//
// If queue_push_many() reports zero or a negative value, that value is
// returned and no notification is sent. Otherwise a single notification is
// issued according to the queue's mode: a broadcast on the condition variable
// (PTHREAD), a write of 1 to the eventfd (EVENTFD), or nothing (POLLING).
//
// Returns the result of queue_push_many(), the negative result of a failing
// write(), or -1 if the mode is not supported by this build.
int villas::node::queue_signalled_push_many(struct CQueueSignalled *qs,
                                            void *ptr[], size_t cnt) {
  const int pushed = queue_push_many(&qs->queue, ptr, cnt);
  if (pushed <= 0)
    return pushed;

  switch (qs->mode) {
  case QueueSignalledMode::PTHREAD:
    pthread_mutex_lock(&qs->pthread.mutex);
    pthread_cond_broadcast(&qs->pthread.ready);
    pthread_mutex_unlock(&qs->pthread.mutex);
    break;

  case QueueSignalledMode::POLLING:
    // Readers poll the queue themselves
    break;

#ifdef HAS_EVENTFD
  case QueueSignalledMode::EVENTFD: {
    uint64_t one = 1;
    int written = write(qs->eventfd, &one, sizeof(one));
    if (written < 0)
      return written;
    break;
  }
#endif

  default:
    return -1;
  }

  return pushed;
}
|
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
int villas::node::queue_signalled_pull(struct CQueueSignalled *qs, void **ptr) {
|
|
|
|
int pulled = 0;
|
|
|
|
|
|
|
|
// Make sure that qs->mutex is unlocked if this thread gets cancelled
|
|
|
|
pthread_cleanup_push(queue_signalled_cleanup, qs);
|
|
|
|
|
|
|
|
if (qs->mode == QueueSignalledMode::PTHREAD)
|
|
|
|
pthread_mutex_lock(&qs->pthread.mutex);
|
|
|
|
|
|
|
|
while (!pulled) {
|
|
|
|
pulled = queue_pull(&qs->queue, ptr);
|
|
|
|
if (pulled < 0)
|
|
|
|
break;
|
|
|
|
else if (pulled == 0) {
|
|
|
|
if (qs->mode == QueueSignalledMode::PTHREAD)
|
|
|
|
pthread_cond_wait(&qs->pthread.ready, &qs->pthread.mutex);
|
|
|
|
else if (qs->mode == QueueSignalledMode::POLLING)
|
|
|
|
continue; // Try again
|
2018-06-21 09:37:30 +02:00
|
|
|
#ifdef HAS_EVENTFD
|
2023-09-07 11:46:39 +02:00
|
|
|
else if (qs->mode == QueueSignalledMode::EVENTFD) {
|
|
|
|
int ret;
|
|
|
|
uint64_t cntr;
|
|
|
|
ret = read(qs->eventfd, &cntr, sizeof(cntr));
|
|
|
|
if (ret < 0)
|
|
|
|
break;
|
|
|
|
}
|
2017-08-30 12:35:47 +02:00
|
|
|
#endif
|
2023-09-07 11:46:39 +02:00
|
|
|
else
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2017-05-14 11:35:49 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
if (qs->mode == QueueSignalledMode::PTHREAD)
|
|
|
|
pthread_mutex_unlock(&qs->pthread.mutex);
|
2017-08-30 12:35:47 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
pthread_cleanup_pop(0);
|
2017-05-14 11:35:49 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
return pulled;
|
2017-04-15 23:14:02 +02:00
|
|
|
}
|
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
int villas::node::queue_signalled_pull_many(struct CQueueSignalled *qs,
|
|
|
|
void *ptr[], size_t cnt) {
|
|
|
|
int pulled = 0;
|
|
|
|
|
|
|
|
// Make sure that qs->mutex is unlocked if this thread gets cancelled
|
|
|
|
pthread_cleanup_push(queue_signalled_cleanup, qs);
|
|
|
|
|
|
|
|
if (qs->mode == QueueSignalledMode::PTHREAD)
|
|
|
|
pthread_mutex_lock(&qs->pthread.mutex);
|
|
|
|
|
|
|
|
while (!pulled) {
|
|
|
|
pulled = queue_pull_many(&qs->queue, ptr, cnt);
|
|
|
|
if (pulled < 0)
|
|
|
|
break;
|
|
|
|
else if (pulled == 0) {
|
|
|
|
if (qs->mode == QueueSignalledMode::PTHREAD)
|
|
|
|
pthread_cond_wait(&qs->pthread.ready, &qs->pthread.mutex);
|
|
|
|
else if (qs->mode == QueueSignalledMode::POLLING)
|
|
|
|
continue; // Try again
|
2018-06-21 09:37:30 +02:00
|
|
|
#ifdef HAS_EVENTFD
|
2023-09-07 11:46:39 +02:00
|
|
|
else if (qs->mode == QueueSignalledMode::EVENTFD) {
|
|
|
|
int ret;
|
|
|
|
uint64_t cntr;
|
|
|
|
ret = read(qs->eventfd, &cntr, sizeof(cntr));
|
|
|
|
if (ret < 0)
|
|
|
|
break;
|
|
|
|
}
|
2017-08-30 12:35:47 +02:00
|
|
|
#endif
|
2023-09-07 11:46:39 +02:00
|
|
|
else
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2017-05-14 11:35:49 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
if (qs->mode == QueueSignalledMode::PTHREAD)
|
|
|
|
pthread_mutex_unlock(&qs->pthread.mutex);
|
2017-09-05 10:11:23 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
pthread_cleanup_pop(0);
|
2017-05-14 11:35:49 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
return pulled;
|
2017-05-12 13:08:34 +02:00
|
|
|
}
|
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
int villas::node::queue_signalled_close(struct CQueueSignalled *qs) {
|
|
|
|
int ret;
|
2017-05-05 19:24:16 +00:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
if (qs->mode == QueueSignalledMode::PTHREAD)
|
|
|
|
pthread_mutex_lock(&qs->pthread.mutex);
|
2017-05-14 11:35:49 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
ret = queue_close(&qs->queue);
|
2017-09-05 10:11:23 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
if (qs->mode == QueueSignalledMode::PTHREAD) {
|
|
|
|
pthread_cond_broadcast(&qs->pthread.ready);
|
|
|
|
pthread_mutex_unlock(&qs->pthread.mutex);
|
|
|
|
} else if (qs->mode == QueueSignalledMode::POLLING) {
|
|
|
|
// Nothing todo
|
|
|
|
}
|
2018-06-21 09:37:30 +02:00
|
|
|
#ifdef HAS_EVENTFD
|
2023-09-07 11:46:39 +02:00
|
|
|
else if (qs->mode == QueueSignalledMode::EVENTFD) {
|
|
|
|
int ret;
|
|
|
|
uint64_t incr = 1;
|
|
|
|
|
|
|
|
ret = write(qs->eventfd, &incr, sizeof(incr));
|
|
|
|
if (ret < 0)
|
|
|
|
return ret;
|
|
|
|
}
|
2017-08-30 12:35:47 +02:00
|
|
|
#endif
|
2023-09-07 11:46:39 +02:00
|
|
|
else
|
|
|
|
return -1;
|
2017-05-14 11:35:49 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
return ret;
|
2017-04-07 12:18:08 +02:00
|
|
|
}
|
2017-08-30 12:35:47 +02:00
|
|
|
|
2023-09-07 11:46:39 +02:00
|
|
|
// Return the file descriptor used for signalling.
//
// Only a queue in EVENTFD mode (on builds with HAS_EVENTFD) has one; in that
// case qs->eventfd is returned. For every other mode the result is -1.
int villas::node::queue_signalled_fd(struct CQueueSignalled *qs) {
#ifdef HAS_EVENTFD
  return qs->mode == QueueSignalledMode::EVENTFD ? qs->eventfd : -1;
#else
  (void)qs; // No mode provides a descriptor in this build
  return -1;
#endif
}
|