diff --git a/etc/shmem.conf b/etc/shmem.conf index 9d3779506..b5aa3e4b4 100644 --- a/etc/shmem.conf +++ b/etc/shmem.conf @@ -21,7 +21,8 @@ nodes = { }, shmem = { type = "shmem", - name = "/villas1", + out_name = "/villas1-out", + in_name = "/villas1-in", samplelen = 4, queuelen = 32, polling = false, diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index 791e4b6ba..fa697bbd8 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -40,7 +40,8 @@ * @see node_type */ struct shmem { - const char* name; /**< Name of the shm object. */ + const char* out_name; /**< Name of the shm object for the output queue. */ + const char* in_name; /**< Name of the shm object for the input queue. */ struct shmem_conf conf; /**< Interface configuration struct. */ char **exec; /**< External program to execute on start. */ struct shmem_int intf; /**< Shmem interface */ diff --git a/include/villas/shmem.h b/include/villas/shmem.h index 7fe8420f0..03bd91927 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -59,38 +59,35 @@ struct shmem_conf { /** The structure that actually resides in the shared memory. */ struct shmem_shared { - pthread_barrier_t start_bar; /**< Barrier for synchronizing the start of both programs. */ + int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */ - int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */ + union shmem_queue queue; /**< Queues for samples passed in both directions. */ - union shmem_queue queue[2]; /**< Queues for samples passed in both directions. - 0: primary -> secondary, 1: secondary -> primary */ + struct pool pool; /**< Pool for the samples in the queues. */ +}; - struct pool pool; /**< Pool for the samples in the queues. */ +struct shmem_dir { + void *base; + const char *name; + size_t len; + struct shmem_shared *shared; }; /** Main structure representing the shared memory interface. */ struct shmem_int { - void* base; /**< Base address of the mapping (needed for munmap) */ - const char* name; /**< Name of the shared memory object */ - size_t len; /**< Total size of the shared memory region */ - struct shmem_shared *shared; /**< Pointer to mapped shared structure */ - int secondary; /**< Set to 1 if this is the secondary user (i.e. not the one - that created the object); 0 otherwise. */ + struct shmem_dir read, write; }; -/** Open the shared memory object and retrieve / initialize the shared data structures. - * If the object isn't already present, it is created instead. - * @param[in] name Name of the POSIX shared memory object. +/** Open the shared memory objects and retrieve / initialize the shared data structures. + * @param[in] wname Name of the POSIX shared memory object containing the output queue. + * @param[in] rname Name of the POSIX shared memory object containing the input queue. * @param[inout] shm The shmem_int structure that should be used for following * calls will be written to this pointer. - * @param[in] conf Configuration parameters for the interface. This struct is - * ignored if the shared memory object is already present. - * @retval 1 The object was created successfully. - * @retval 0 The existing object was opened successfully. + * @param[in] conf Configuration parameters for the output queue. + * @retval 0 The objects were opened and initialized successfully. * @retval <0 An error occured; errno is set accordingly. */ -int shmem_int_open(const char* name, struct shmem_int* shm, struct shmem_conf* conf); +int shmem_int_open(const char* wname, const char* rname, struct shmem_int* shm, struct shmem_conf* conf); /** Close and destroy the shared memory interface and related structures. * @param shm The shared memory interface. diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 306902f0c..e19c32196 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -40,8 +40,10 @@ int shmem_parse(struct node *n, config_setting_t *cfg) { struct shmem *shm = n->_vd; - if (!config_setting_lookup_string(cfg, "name", &shm->name)) - cerror(cfg, "Missing shared memory object name"); + if (!config_setting_lookup_string(cfg, "out_name", &shm->out_name)) + cerror(cfg, "Missing shared memory output queue name"); + if (!config_setting_lookup_string(cfg, "in_name", &shm->in_name)) + cerror(cfg, "Missing shared memory input queue name"); if (!config_setting_lookup_int(cfg, "queuelen", &shm->conf.queuelen)) shm->conf.queuelen = DEFAULT_SHMEM_QUEUELEN; if (!config_setting_lookup_int(cfg, "samplelen", &shm->conf.samplelen)) @@ -84,7 +86,7 @@ int shmem_open(struct node *n) serror("Failed to spawn external program"); } - ret = shmem_int_open(shm->name, &shm->intf, &shm->conf); + ret = shmem_int_open(shm->out_name, shm->in_name, &shm->intf, &shm->conf); if (ret < 0) serror("Opening shared memory interface failed"); @@ -105,6 +107,10 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) struct sample *shared_smps[cnt]; recv = shmem_int_read(&shm->intf, shared_smps, cnt); + if (recv < 0) + /* This can only really mean that the other process has exited, so close + * the interface to make sure the shared memory object is unlinked */ + shmem_int_close(&shm->intf); if (recv <= 0) return recv; @@ -122,9 +128,9 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */ int avail, pushed, len; - avail = sample_alloc(&shm->intf.shared->pool, shared_smps, cnt); + avail = sample_alloc(&shm->intf.write.shared->pool, shared_smps, cnt); if (avail != cnt) - warn("Pool underrun for shmem node %s", shm->name); + warn("Pool underrun for shmem node %s", shm->out_name); for (int i = 0; i < avail; i++) { /* Since the node isn't in shared memory, the source can't be accessed */ @@ -153,8 +159,8 @@ char * shmem_print(struct node *n) struct shmem *shm = n->_vd; char *buf = NULL; - strcatf(&buf, "name=%s, queuelen=%d, samplelen=%d, polling=%s", - shm->name, shm->conf.queuelen, shm->conf.samplelen, shm->conf.polling ? "yes" : "no"); + strcatf(&buf, "out_name=%s, in_name=%s, queuelen=%d, samplelen=%d, polling=%s", + shm->out_name, shm->in_name, shm->conf.queuelen, shm->conf.samplelen, shm->conf.polling ? "yes" : "no"); if (shm->exec) { strcatf(&buf, ", exec='"); diff --git a/lib/shmem.c b/lib/shmem.c index 45c443990..c22e54a5b 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -48,85 +49,37 @@ size_t shmem_total_size(int insize, int outsize, int sample_size) + 1024; } -int shmem_int_open(const char *name, struct shmem_int* shm, struct shmem_conf* conf) +int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm, struct shmem_conf *conf) { - struct shmem_shared *shared; - pthread_barrierattr_t attr; - struct memtype *manager; - struct stat stat; + char *cptr; + int fd, ret; size_t len; void *base; - char *cptr; - int fd, ret; + struct memtype *manager; + struct shmem_shared *shared; + struct stat stat_buf; + sem_t *sem_own, *sem_other; - shm->name = name; - fd = shm_open(name, O_RDWR|O_CREAT|O_EXCL, 0600); - if (fd < 0) { - if (errno != EEXIST) - return -1; - /* Already present; reopen it nonexclusively */ - fd = shm_open(name, O_RDWR, 0); - if (fd < 0) - return -1; - - /* Theoretically, the other process might have created the object, but - * isn't done with initializing it yet. So in the creating process, - * we only reserve a small amount of memory, just enough for the barrier, - * and init the barrier, and then resize the object. Thus, here we first - * wait for the object to be resized, then wait on the barrier. - */ - while (1) { - if (fstat(fd, &stat) < 0) - return -1; - if (stat.st_size > SHMEM_MIN_SIZE) - break; - } - - len = stat.st_size; - base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (base == MAP_FAILED) - return -1; - - /* This relies on the behaviour of the node and the allocator; it assumes - * that memtype_managed is used and the shmem_shared is the first allocated object */ - cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); - shared = (struct shmem_shared *) cptr; - - pthread_barrier_wait(&shared->start_bar); - - shm->base = base; - shm->shared = shared; - shm->len = 0; - shm->secondary = 1; - - return 0; - } - /* Only map the barrier and init it */ - ret = ftruncate(fd, SHMEM_MIN_SIZE); - if (ret < 0) + /* Ensure both semaphores exist */ + sem_own = sem_open(wname, O_CREAT, 0600, 0); + if (sem_own == SEM_FAILED) return -1; - base = mmap(NULL, SHMEM_MIN_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (base == MAP_FAILED) + sem_other = sem_open(rname, O_CREAT, 0600, 0); + if (sem_other == SEM_FAILED) return -1; - /* Again, this assumes that memtype_managed uses first-fit */ - cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); - shared = (struct shmem_shared*) cptr; - pthread_barrierattr_init(&attr); - pthread_barrierattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); - pthread_barrier_init(&shared->start_bar, &attr, 2); - - /* Remap it with the real size */ + /* Open and initialize the shared region for the output queue */ + fd = shm_open(wname, O_RDWR|O_CREAT|O_EXCL, 0600); + if (fd < 0) + return -1; len = shmem_total_size(conf->queuelen, conf->queuelen, conf->samplelen); - if (munmap(base, SHMEM_MIN_SIZE) < 0) - return -1; if (ftruncate(fd, len) < 0) return -1; base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (base == MAP_FAILED) return -1; + close(fd); - /* Init everything else */ manager = memtype_managed_init(base, len); shared = memory_alloc(manager, sizeof(struct shmem_shared)); if (!shared) { @@ -134,64 +87,76 @@ int shmem_int_open(const char *name, struct shmem_int* shm, struct shmem_conf* c return -1; } - memset((char *) shared + sizeof(pthread_barrier_t), 0, sizeof(struct shmem_shared) - sizeof(pthread_barrier_t)); + memset(shared, 0, sizeof(struct shmem_shared)); shared->polling = conf->polling; - ret = shared->polling ? queue_init(&shared->queue[0].q, conf->queuelen, manager) - : queue_signalled_init(&shared->queue[0].qs, conf->queuelen, manager); + ret = shared->polling ? queue_init(&shared->queue.q, conf->queuelen, manager) + : queue_signalled_init(&shared->queue.qs, conf->queuelen, manager); if (ret) { errno = ENOMEM; return -1; } - ret = shared->polling ? queue_init(&shared->queue[1].q, conf->queuelen, manager) - : queue_signalled_init(&shared->queue[1].qs, conf->queuelen, manager); + ret = pool_init(&shared->pool, conf->queuelen, SAMPLE_LEN(conf->samplelen), manager); if (ret) { errno = ENOMEM; return -1; } - ret = pool_init(&shared->pool, 2 * conf->queuelen, SAMPLE_LEN(conf->samplelen), manager); - if (ret) { - errno = ENOMEM; + shm->write.base = base; + shm->write.name = wname; + shm->write.len = len; + shm->write.shared = shared; + + /* Post own semaphore and wait on the other one, so both processes know that + * both regions are initialized */ + sem_post(sem_own); + sem_wait(sem_other); + + /* Open and map the other region */ + fd = shm_open(rname, O_RDWR, 0); + if (fd < 0) + return -1; + if (fstat(fd, &stat_buf) < 0) + return -1; + len = stat_buf.st_size; + base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (base == MAP_FAILED) return -1; - } - shm->base = base; - shm->len = len; - shm->shared = shared; - shm->secondary = 0; + cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); + shared = (struct shmem_shared *) cptr; + shm->read.base = base; + shm->read.name = rname; + shm->read.len = len; + shm->read.shared = shared; - pthread_barrier_wait(&shared->start_bar); - - return 1; + /* Unlink the semaphores; we don't need them anymore */ + sem_unlink(wname); + return 0; } int shmem_int_close(struct shmem_int *shm) { - union shmem_queue * queue = &shm->shared->queue[shm->secondary]; - if (shm->shared->polling) - queue_close(&queue->q); + if (shm->write.shared->polling) + queue_close(&shm->write.shared->queue.q); else - queue_signalled_close(&queue->qs); - if (!shm->secondary) - /* Ignore the error here; the only thing that is really possible is that - * the object was deleted already, which we can't do anything about anyway. */ - shm_unlink(shm->name); + queue_signalled_close(&shm->write.shared->queue.qs); - return munmap(shm->base, shm->len); + munmap(shm->read.base, shm->read.len); + munmap(shm->write.base, shm->write.len); + shm_unlink(shm->write.name); + return 0; } int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt) { - union shmem_queue* queue = &shm->shared->queue[1-shm->secondary]; - return shm->shared->polling ? queue_pull_many(&queue->q, (void **) smps, cnt) - : queue_signalled_pull_many(&queue->qs, (void **) smps, cnt); + return shm->read.shared->polling ? queue_pull_many(&shm->read.shared->queue.q, (void **) smps, cnt) + : queue_signalled_pull_many(&shm->read.shared->queue.qs, (void **) smps, cnt); } int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt) { - union shmem_queue* queue = &shm->shared->queue[shm->secondary]; - return shm->shared->polling ? queue_push_many(&queue->q, (void **) smps, cnt) - : queue_signalled_push_many(&queue->qs, (void **) smps, cnt); + return shm->write.shared->polling ? queue_push_many(&shm->write.shared->queue.q, (void **) smps, cnt) + : queue_signalled_push_many(&shm->write.shared->queue.qs, (void **) smps, cnt); } diff --git a/src/test-shmem.c b/src/test-shmem.c index bde800e9e..74c3a50a3 100644 --- a/src/test-shmem.c +++ b/src/test-shmem.c @@ -37,8 +37,9 @@ struct shmem_int shm; void usage() { - printf("Usage: villas-test-shmem SHM_NAME VECTORIZE\n"); - printf(" SHMNAME name of the shared memory object\n"); + printf("Usage: villas-test-shmem WNAME VECTORIZE\n"); + printf(" WNAME name of the shared memory object for the output queue\n"); + printf(" RNAME name of the shared memory object for the input queue\n"); printf(" VECTORIZE maximum number of samples to read/write at a time\n"); } @@ -62,15 +63,16 @@ int main(int argc, char* argv[]) log_start(&log); - if (argc != 3) { + if (argc != 4) { usage(); return 1; } - char *object = argv[1]; - int vectorize = atoi(argv[2]); + char *wname = argv[1]; + char *rname = argv[2]; + int vectorize = atoi(argv[3]); - if (shmem_int_open(object, &shm, &conf) < 0) + if (shmem_int_open(wname, rname, &shm, &conf) < 0) serror("Failed to open shmem interface"); signal(SIGINT, quit); @@ -84,7 +86,7 @@ int main(int argc, char* argv[]) break; } - avail = sample_alloc(&shm.shared->pool, outsmps, readcnt); + avail = sample_alloc(&shm.write.shared->pool, outsmps, readcnt); if (avail < readcnt) warn("Pool underrun: %d / %d\n", avail, readcnt);