/** Wrapper around queue that uses POSIX CV's for signalling writes. * * @file * @author Georg Martin Reinke * @copyright 2014-2022, Institute for Automation of Complex Power Systems, EONERC * @license Apache 2.0 *********************************************************************************/ #include #include #ifdef HAS_EVENTFD #include #endif using namespace villas::node; static void queue_signalled_cleanup(void *p) { struct CQueueSignalled *qs = (struct CQueueSignalled *) p; if (qs->mode == QueueSignalledMode::PTHREAD) pthread_mutex_unlock(&qs->pthread.mutex); } int villas::node::queue_signalled_init(struct CQueueSignalled *qs, size_t size, struct memory::Type *mem, enum QueueSignalledMode mode, int flags) { int ret; qs->mode = mode; if (qs->mode == QueueSignalledMode::AUTO) { #ifdef __linux__ if (flags & (int) QueueSignalledFlags::PROCESS_SHARED) qs->mode = QueueSignalledMode::PTHREAD; else { #ifdef HAS_EVENTFD qs->mode = QueueSignalledMode::EVENTFD; #else qs->mode = QueueSignalledMode::PTHREAD; #endif } #else qs->mode = QueueSignalledMode::PTHREAD; #endif } ret = queue_init(&qs->queue, size, mem); if (ret < 0) return ret; if (qs->mode == QueueSignalledMode::PTHREAD) { pthread_condattr_t cvattr; pthread_mutexattr_t mtattr; pthread_mutexattr_init(&mtattr); pthread_condattr_init(&cvattr); if (flags & (int) QueueSignalledFlags::PROCESS_SHARED) { pthread_mutexattr_setpshared(&mtattr, PTHREAD_PROCESS_SHARED); pthread_condattr_setpshared(&cvattr, PTHREAD_PROCESS_SHARED); } pthread_mutex_init(&qs->pthread.mutex, &mtattr); pthread_cond_init(&qs->pthread.ready, &cvattr); pthread_mutexattr_destroy(&mtattr); pthread_condattr_destroy(&cvattr); } else if (qs->mode == QueueSignalledMode::POLLING) { /* Nothing todo */ } #ifdef HAS_EVENTFD else if (qs->mode == QueueSignalledMode::EVENTFD) { qs->eventfd = eventfd(0, 0); if (qs->eventfd < 0) return -2; } #endif else return -1; return 0; } int villas::node::queue_signalled_destroy(struct CQueueSignalled *qs) { int ret; ret = queue_destroy(&qs->queue); if (ret < 0) return ret; if (qs->mode == QueueSignalledMode::PTHREAD) { pthread_cond_destroy(&qs->pthread.ready); pthread_mutex_destroy(&qs->pthread.mutex); } else if (qs->mode == QueueSignalledMode::POLLING) { /* Nothing todo */ } #ifdef HAS_EVENTFD else if (qs->mode == QueueSignalledMode::EVENTFD) { ret = close(qs->eventfd); if (ret) return ret; } #endif else return -1; return 0; } int villas::node::queue_signalled_push(struct CQueueSignalled *qs, void *ptr) { int pushed; pushed = queue_push(&qs->queue, ptr); if (pushed <= 0) return pushed; if (qs->mode == QueueSignalledMode::PTHREAD) { pthread_mutex_lock(&qs->pthread.mutex); pthread_cond_broadcast(&qs->pthread.ready); pthread_mutex_unlock(&qs->pthread.mutex); } else if (qs->mode == QueueSignalledMode::POLLING) { /* Nothing todo */ } #ifdef HAS_EVENTFD else if (qs->mode == QueueSignalledMode::EVENTFD) { int ret; uint64_t incr = 1; ret = write(qs->eventfd, &incr, sizeof(incr)); if (ret < 0) return ret; } #endif else return -1; return pushed; } int villas::node::queue_signalled_push_many(struct CQueueSignalled *qs, void *ptr[], size_t cnt) { int pushed; pushed = queue_push_many(&qs->queue, ptr, cnt); if (pushed <= 0) return pushed; if (qs->mode == QueueSignalledMode::PTHREAD) { pthread_mutex_lock(&qs->pthread.mutex); pthread_cond_broadcast(&qs->pthread.ready); pthread_mutex_unlock(&qs->pthread.mutex); } else if (qs->mode == QueueSignalledMode::POLLING) { /* Nothing todo */ } #ifdef HAS_EVENTFD else if (qs->mode == QueueSignalledMode::EVENTFD) { int ret; uint64_t incr = 1; ret = write(qs->eventfd, &incr, sizeof(incr)); if (ret < 0) return ret; } #endif else return -1; return pushed; } int villas::node::queue_signalled_pull(struct CQueueSignalled *qs, void **ptr) { int pulled = 0; /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ pthread_cleanup_push(queue_signalled_cleanup, qs); if (qs->mode == QueueSignalledMode::PTHREAD) pthread_mutex_lock(&qs->pthread.mutex); while (!pulled) { pulled = queue_pull(&qs->queue, ptr); if (pulled < 0) break; else if (pulled == 0) { if (qs->mode == QueueSignalledMode::PTHREAD) pthread_cond_wait(&qs->pthread.ready, &qs->pthread.mutex); else if (qs->mode == QueueSignalledMode::POLLING) continue; /* Try again */ #ifdef HAS_EVENTFD else if (qs->mode == QueueSignalledMode::EVENTFD) { int ret; uint64_t cntr; ret = read(qs->eventfd, &cntr, sizeof(cntr)); if (ret < 0) break; } #endif else break; } } if (qs->mode == QueueSignalledMode::PTHREAD) pthread_mutex_unlock(&qs->pthread.mutex); pthread_cleanup_pop(0); return pulled; } int villas::node::queue_signalled_pull_many(struct CQueueSignalled *qs, void *ptr[], size_t cnt) { int pulled = 0; /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ pthread_cleanup_push(queue_signalled_cleanup, qs); if (qs->mode == QueueSignalledMode::PTHREAD) pthread_mutex_lock(&qs->pthread.mutex); while (!pulled) { pulled = queue_pull_many(&qs->queue, ptr, cnt); if (pulled < 0) break; else if (pulled == 0) { if (qs->mode == QueueSignalledMode::PTHREAD) pthread_cond_wait(&qs->pthread.ready, &qs->pthread.mutex); else if (qs->mode == QueueSignalledMode::POLLING) continue; /* Try again */ #ifdef HAS_EVENTFD else if (qs->mode == QueueSignalledMode::EVENTFD) { int ret; uint64_t cntr; ret = read(qs->eventfd, &cntr, sizeof(cntr)); if (ret < 0) break; } #endif else break; } } if (qs->mode == QueueSignalledMode::PTHREAD) pthread_mutex_unlock(&qs->pthread.mutex); pthread_cleanup_pop(0); return pulled; } int villas::node::queue_signalled_close(struct CQueueSignalled *qs) { int ret; if (qs->mode == QueueSignalledMode::PTHREAD) pthread_mutex_lock(&qs->pthread.mutex); ret = queue_close(&qs->queue); if (qs->mode == QueueSignalledMode::PTHREAD) { pthread_cond_broadcast(&qs->pthread.ready); pthread_mutex_unlock(&qs->pthread.mutex); } else if (qs->mode == QueueSignalledMode::POLLING) { /* Nothing todo */ } #ifdef HAS_EVENTFD else if (qs->mode == QueueSignalledMode::EVENTFD) { int ret; uint64_t incr = 1; ret = write(qs->eventfd, &incr, sizeof(incr)); if (ret < 0) return ret; } #endif else return -1; return ret; } int villas::node::queue_signalled_fd(struct CQueueSignalled *qs) { switch (qs->mode) { #ifdef HAS_EVENTFD case QueueSignalledMode::EVENTFD: return qs->eventfd; #endif default: { } } return -1; }