diff --git a/include/villas/queue_signalled.h b/include/villas/queue_signalled.h index 3b3c97717..4f9e012b4 100644 --- a/include/villas/queue_signalled.h +++ b/include/villas/queue_signalled.h @@ -34,9 +34,11 @@ enum queue_signalled_flags { QUEUE_SIGNALLED_POLLING = (2 << 0), #ifdef __linux__ QUEUE_SIGNALLED_EVENTFD = (3 << 0), +#elif defined(__APPLE__) + QUEUE_SIGNALLED_PIPE = (3 << 0), #endif QUEUE_SIGNALLED_MASK = 0xf, - + /* Other flags */ QUEUE_SIGNALLED_PROCESS_SHARED = (1 << 4) }; @@ -44,9 +46,9 @@ enum queue_signalled_flags { /** Wrapper around queue that uses POSIX CV's for signalling writes. */ struct queue_signalled { struct queue queue; /**< Actual underlying queue. */ - + enum queue_signalled_flags mode; - + union { struct { pthread_cond_t ready; /**< Condition variable to signal writes to the queue. */ @@ -54,6 +56,8 @@ struct queue_signalled { } pthread; #ifdef __linux__ int eventfd; +#elif defined(__APPLE__) + int pipe[2]; #endif }; }; diff --git a/lib/queue_signalled.c b/lib/queue_signalled.c index 48357ff37..a37caa316 100644 --- a/lib/queue_signalled.c +++ b/lib/queue_signalled.c @@ -48,6 +48,11 @@ int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype qs->mode = QUEUE_SIGNALLED_PTHREAD; else qs->mode = QUEUE_SIGNALLED_EVENTFD; +#elif defined(__APPLE__) + if (flags & QUEUE_SIGNALLED_PROCESS_SHARED) + qs->mode = QUEUE_SIGNALLED_PTHREAD; + else + qs->mode = QUEUE_SIGNALLED_PIPE; #else qs->mode = QUEUE_SIGNALLED_PTHREAD; #endif @@ -84,6 +89,12 @@ int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype if (qs->eventfd < 0) return -2; } +#elif defined(__APPLE__) + else if (qs->mode == QUEUE_SIGNALLED_PIPE) { + ret = pipe(qs->pipe); + if (ret < 0) + return -2; + } #endif else return -1; @@ -112,6 +123,12 @@ int queue_signalled_destroy(struct queue_signalled *qs) if (ret) return ret; } +#elif defined(__APPLE__) + else if (qs->mode == QUEUE_SIGNALLED_PIPE) { + ret = close(qs->pipe[0]) + close(qs->pipe[1]); + if (ret) + return ret; + } #endif else return -1; @@ -143,6 +160,14 @@ int queue_signalled_push(struct queue_signalled *qs, void *ptr) if (ret < 0) return ret; } +#elif defined(__APPLE__) + else if (qs->mode == QUEUE_SIGNALLED_PIPE) { + int ret; + uint8_t incr = 1; + ret = write(qs->pipe[1], &incr, sizeof(incr)); + if (ret < 0) + return ret; + } #endif else return -1; @@ -174,6 +199,14 @@ int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cn if (ret < 0) return ret; } +#elif defined(__APPLE__) + else if (qs->mode == QUEUE_SIGNALLED_PIPE) { + int ret; + uint8_t incr = 1; + ret = write(qs->pipe[1], &incr, sizeof(incr)); + if (ret < 0) + return ret; + } #endif else return -1; @@ -208,6 +241,14 @@ int queue_signalled_pull(struct queue_signalled *qs, void **ptr) if (ret < 0) break; } +#elif defined(__APPLE__) + else if (qs->mode == QUEUE_SIGNALLED_PIPE) { + int ret; + uint8_t incr = 1; + ret = read(qs->pipe[0], &incr, sizeof(incr)); + if (ret < 0) + break; + } #endif else break; @@ -249,6 +290,14 @@ int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cn if (ret < 0) break; } +#elif defined(__APPLE__) + else if (qs->mode == QUEUE_SIGNALLED_PIPE) { + int ret; + uint8_t incr = 1; + ret = read(qs->pipe[0], &incr, sizeof(incr)); + if (ret < 0) + break; + } #endif else break; @@ -288,6 +337,15 @@ int queue_signalled_close(struct queue_signalled *qs) if (ret < 0) return ret; } +#elif defined(__APPLE__) + else if (qs->mode == QUEUE_SIGNALLED_PIPE) { + int ret; + uint64_t incr = 1; + + ret = write(qs->pipe[1], &incr, sizeof(incr)); + if (ret < 0) + return ret; + } #endif else return -1; @@ -301,6 +359,9 @@ int queue_signalled_fd(struct queue_signalled *qs) #ifdef __linux__ case QUEUE_SIGNALLED_EVENTFD: return qs->eventfd; +#elif defined(__APPLE__) + case QUEUE_SIGNALLED_PIPE: + return qs->pipe[0]; #endif default: { } }