diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index 10dfeae2f..5bf02ec96 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -16,7 +16,6 @@ struct shmem { int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */ int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */ - size_t len; /**< Overall size of shared memory object. */ struct memtype *manager; /**< Manager for the shared memory region. */ int fd; /**< Handle as returned by shm_open().*/ void *base; /**< Pointer to the shared memory region. */ diff --git a/include/villas/shmem.h b/include/villas/shmem.h index 442d05174..543936240 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -11,13 +11,20 @@ union shmem_queue { struct queue_signalled qs; }; -/** The structure that actually resides in the shared memory. TODO better name?*/ +/** The structure that actually resides in the shared memory. */ struct shmem_shared { + size_t len; /**< Total size of the shared memory region.*/ union shmem_queue in; /**< Queue for samples passed from external program to node.*/ + int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */ union shmem_queue out; /**< Queue for samples passed from node to external program.*/ + int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */ struct pool pool; /**< Pool for the samples in the queues. */ }; -struct shmem_shared * shmem_int_open(const char* name, size_t len); +struct shmem_shared * shmem_shared_open(const char* name); + +int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt); + +int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt); size_t shmem_total_size(int insize, int outsize, int sample_size); diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index e1bdc6181..69a1ec689 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -40,20 +40,22 @@ int shmem_open(struct node *n) { serror("Opening shared memory object failed"); shm->fd = r; - shm->len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size); - if (ftruncate(shm->fd, shm->len) < 0) + size_t len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size); + if (ftruncate(shm->fd, len) < 0) serror("Setting size of shared memory object failed"); - /* TODO: we could use huge pages here as well */ - shm->base = mmap(NULL, shm->len, PROT_READ|PROT_WRITE, MAP_SHARED, shm->fd, 0); + shm->base = mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_SHARED, shm->fd, 0); if (shm->base == MAP_FAILED) serror("Mapping shared memory failed"); - shm->manager = memtype_managed_init(shm->base, shm->len); + shm->manager = memtype_managed_init(shm->base, len); shm->shared = memory_alloc(shm->manager, sizeof(struct shmem_shared)); if (!shm->shared) error("Shm shared struct allocation failed (not enough memory?)"); memset(shm->shared, 0, sizeof(struct shmem_shared)); + shm->shared->len = len; + shm->shared->cond_in = shm->cond_in; + shm->shared->cond_out = shm->cond_out; if (shm->cond_in) { if (queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager) < 0) error("Shm queue allocation failed (not enough memory?)"); @@ -76,6 +78,7 @@ int shmem_open(struct node *n) { int shmem_close(struct node *n) { struct shmem* shm = n->_vd; + size_t len = shm->shared->len; if (shm->cond_in) queue_signalled_destroy(&shm->shared->in.qs); else @@ -85,7 +88,7 @@ int shmem_close(struct node *n) { else queue_destroy(&shm->shared->out.q); pool_destroy(&shm->shared->pool); - int r = munmap(shm->base, shm->len); + int r = munmap(shm->base, len); if (r != 0) return r; return shm_unlink(shm->name); diff --git a/lib/shmem.c b/lib/shmem.c index 89d4bc3d1..401405e53 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -24,16 +24,42 @@ size_t shmem_total_size(int insize, int outsize, int sample_size) + 1024; } -struct shmem_shared* shmem_int_open(const char *name, size_t len) +struct shmem_shared* shmem_shared_open(const char *name) { int fd = shm_open(name, O_RDWR, 0); if (fd < 0) return NULL; + /* Only map the first part (shmem_shared) first, read the correct length, + * the map it with this length. */ + size_t len = sizeof(struct memtype) + sizeof(struct memblock) + sizeof(struct shmem_shared); void *base = mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); if (base == MAP_FAILED) return NULL; /* This relies on the behaviour of the node and the allocator; it assumes * that memtype_managed is used and the shmem_shared is the first allocated object */ char *cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); + struct shmem_shared *shm = (struct shmem_shared *) cptr; + size_t newlen = shm->len; + if (munmap(base, len)) + return NULL; + base = mmap(NULL, newlen, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); + if (base == MAP_FAILED) + return NULL; + /* Adress might have moved */ + cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); return (struct shmem_shared *) cptr; } + +int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) { + if (shm->cond_out) + return queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt); + else + return queue_pull_many(&shm->out.q, (void **) smps, cnt); +} + +int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) { + if (shm->cond_in) + return queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt); + else + return queue_pull_many(&shm->in.q, (void **) smps, cnt); +} diff --git a/src/shmem.c b/src/shmem.c index dc9e7fb71..1a0057ab7 100644 --- a/src/shmem.c +++ b/src/shmem.c @@ -40,19 +40,15 @@ int main(int argc, char* argv[]) error("Node '%s' does not exist!", argv[2]); struct shmem* shm = node->_vd; - size_t len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size); - struct shmem_shared *shmem = shmem_int_open(shm->name, len); - if (!shmem) + struct shmem_shared *shared = shmem_shared_open(shm->name); + if (!shared) serror("Failed to open shmem interface"); struct sample *insmps[node->vectorize], *outsmps[node->vectorize]; while (1) { int r, w; - if (shm->cond_out) - r = queue_signalled_pull_many(&shmem->out.qs, (void **) insmps, node->vectorize); - else - r = queue_pull_many(&shmem->out.q, (void **) insmps, node->vectorize); - int avail = sample_alloc(&shmem->pool, outsmps, r); + r = shmem_shared_read(shared, insmps, node->vectorize); + int avail = sample_alloc(&shared->pool, outsmps, r); if (avail < r) warn("pool underrun (%d/%d)\n", avail, r); for (int i = 0; i < r; i++) @@ -66,10 +62,7 @@ int main(int argc, char* argv[]) } for (int i = 0; i < r; i++) sample_put(insmps[i]); - if (shm->cond_in) - w = queue_signalled_push_many(&shmem->in.qs, (void **) outsmps, avail); - else - w = queue_push_many(&shmem->in.q, (void **) outsmps, avail); + w = shmem_shared_write(shared, outsmps, avail); if (w < avail) warn("short write (%d/%d)\n", w, r); }