diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h
index 91676d00b..56d0d6cf4 100644
--- a/include/villas/nodes/shmem.h
+++ b/include/villas/nodes/shmem.h
@@ -34,24 +34,17 @@
#include "pool.h"
#include "queue.h"
#include "config.h"
-
-#define DEFAULT_SHMEM_QUEUELEN 512
-#define DEFAULT_SHMEM_SAMPLELEN DEFAULT_SAMPLELEN
+#include "shmem.h"
/** Node-type for shared memory communication.
* @see node_type
*/
struct shmem {
const char* name; /**< Name of the shm object. */
- int samplelen; /**< Number of data entries for each sample. */
- int queuelen; /**< Size of ingoing and outgoing queue, respectively. */
- int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to outgoing queue. */
+ struct shmem_conf conf; /**< Interface configuration struct. */
char **exec; /**< External program to execute on start. */
- struct memtype *manager; /**< Manager for the shared memory region. */
- int fd; /**< Handle as returned by shm_open().*/
- void *base; /**< Pointer to the shared memory region. */
- struct shmem_shared *shared; /**< Shared datastructure. */
+ struct shmem_int intf; /**< Shmem interface */
};
/** @see node_type::print */
diff --git a/include/villas/shmem.h b/include/villas/shmem.h
index 39d5d5390..3a715066a 100644
--- a/include/villas/shmem.h
+++ b/include/villas/shmem.h
@@ -33,9 +33,14 @@
extern "C" {
#endif
+#include "config.h"
#include "pool.h"
#include "queue.h"
#include "queue_signalled.h"
+#include "sample.h"
+
+#define DEFAULT_SHMEM_QUEUELEN 512
+#define DEFAULT_SHMEM_SAMPLELEN DEFAULT_SAMPLELEN
/** A signalled queue or a regular (polling) queue, depending on the polling setting. */
union shmem_queue {
@@ -43,56 +48,75 @@ union shmem_queue {
struct queue_signalled qs;
};
+#define SHMEM_MIN_SIZE (sizeof(struct memtype) + sizeof(struct memblock) + sizeof(pthread_barrier_t) + sizeof(pthread_barrierattr_t))
+
+/** Struct containing all parameters that need to be known when creating a new
+ * shared memory object. */
+struct shmem_conf {
+ int polling; /*< Whether to use polling instead of POSIX CVs */
+ int queuelen; /*< Size of the queues (in elements) */
+ int samplelen; /*< Maximum number of data entries in a single sample */
+};
+
/** The structure that actually resides in the shared memory. */
struct shmem_shared {
- size_t len; /**< Total size of the shared memory region.*/
+ pthread_barrier_t start_bar; /**< Barrier for synchronizing the start of both programs. */
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */
- union shmem_queue in; /**< Queue for samples passed from external program to node.*/
- union shmem_queue out; /**< Queue for samples passed from node to external program.*/
+ union shmem_queue queue[2]; /**< Queues for samples passed in both directions.
+ 0: primary -> secondary, 1: secondary -> primary */
struct pool pool; /**< Pool for the samples in the queues. */
+};
- pthread_barrier_t start_bar; /**< Barrier for synchronizing the start of both programs. */
- pthread_barrierattr_t start_attr;
- atomic_size_t node_stopped; /**< Set to 1 by VILLASNode if it is stopped/killed. */
- atomic_size_t ext_stopped; /**< Set to 1 by the external program if it is stopped/killed. */
+struct shmem_int {
+ void* base; /**< Base address of the mapping (needed for munmap) */
+ const char* name; /**< Name of the shared memory object */
+ size_t len; /**< Total size of the shared memory region */
+ struct shmem_shared *shared; /**< Pointer to mapped shared structure */
+ int secondary; /**< Set to 1 if this is the secondary user (i.e. not the one
+ that created the object); 0 otherwise. */
};
/** Open the shared memory object and retrieve / initialize the shared data structures.
+ * If the object isn't already present, it is created instead.
* @param[in] name Name of the POSIX shared memory object.
- * @param[inout] base_ptr The base address of the shared memory region is written to this pointer.
- * @retval NULL An error occurred; errno is set appropiately.
+ * @param[inout] shm The shmem_int structure that should be used for following
+ * calls will be written to this pointer.
+ * @param[in] conf Configuration parameters for the interface. This struct is
+ * ignored if the shared memory object is already present.
+ * @retval 1 The object was created successfully.
+ * @retval 0 The existing object was opened successfully.
+ * @retval <0 An error occured; errno is set accordingly.
*/
-struct shmem_shared * shmem_shared_open(const char* name, void **base_ptr);
+int shmem_int_open(const char* name, struct shmem_int* shm, struct shmem_conf* conf);
-/** Close and destroy the shared memory object and related structures.
- * @param shm The shared memory structure.
- * @param base The base address as returned by shmem_shared_open.
+/** Close and destroy the shared memory interface and related structures.
+ * @param shm The shared memory interface.
* @retval 0 Closing successfull.
* @retval <0 An error occurred; errno is set appropiately.
*/
-int shmem_shared_close(struct shmem_shared *shm, void *base);
+int shmem_int_close(struct shmem_int *shm);
-/** Read samples from VILLASNode.
- * @param shm The shared memory structure.
+/** Read samples from the interface.
+ * @param shm The shared memory interface.
* @param smps An array where the pointers to the samples will be written. The samples
* must be freed with sample_put after use.
* @param cnt Number of samples to be read.
* @retval >=0 Number of samples that were read. Can be less than cnt (including 0) in case not enough samples were available.
- * @retval -1 VILLASNode exited; no samples can be read anymore.
+ * @retval -1 The other process closed the interface; no samples can be read anymore.
*/
-int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt);
+int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt);
-/** Write samples to VILLASNode.
- * @param shm The shared memory structure.
+/** Write samples to the interface.
+ * @param shm The shared memory interface.
* @param smps The samples to be written. Must be allocated from shm->pool.
* @param cnt Number of samples to write.
* @retval >=0 Number of samples that were successfully written. Can be less than cnt (including 0) in case of a full queue.
- * @retval -1 VILLASNode exited; no samples can be written anymore.
+ * @retval -1 The write failed for some reason; no more samples can be written.
*/
-int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt);
+int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt);
/** Returns the total size of the shared memory region with the given size of
* the input/output queues (in elements) and the given number of data elements
diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c
index 5f7c38c02..d1338cc7c 100644
--- a/lib/nodes/shmem.c
+++ b/lib/nodes/shmem.c
@@ -30,9 +30,9 @@
#include "kernel/kernel.h"
#include "log.h"
+#include "shmem.h"
#include "nodes/shmem.h"
#include "plugin.h"
-#include "shmem.h"
#include "timing.h"
#include "utils.h"
@@ -42,12 +42,12 @@ int shmem_parse(struct node *n, config_setting_t *cfg)
if (!config_setting_lookup_string(cfg, "name", &shm->name))
cerror(cfg, "Missing shared memory object name");
- if (!config_setting_lookup_int(cfg, "queuelen", &shm->queuelen))
- shm->queuelen = DEFAULT_SHMEM_QUEUELEN;
- if (!config_setting_lookup_int(cfg, "samplelen", &shm->samplelen))
- shm->samplelen = DEFAULT_SHMEM_SAMPLELEN;
- if (!config_setting_lookup_bool(cfg, "polling", &shm->polling))
- shm->polling = false;
+ if (!config_setting_lookup_int(cfg, "queuelen", &shm->conf.queuelen))
+ shm->conf.queuelen = DEFAULT_SHMEM_QUEUELEN;
+ if (!config_setting_lookup_int(cfg, "samplelen", &shm->conf.samplelen))
+ shm->conf.samplelen = DEFAULT_SHMEM_SAMPLELEN;
+ if (!config_setting_lookup_bool(cfg, "polling", &shm->conf.polling))
+ shm->conf.polling = false;
config_setting_t *exec_cfg = config_setting_lookup(cfg, "exec");
if (!exec_cfg)
@@ -77,56 +77,15 @@ int shmem_open(struct node *n)
{
struct shmem *shm = n->_vd;
int ret;
- size_t len;
-
- shm->fd = shm_open(shm->name, O_RDWR | O_CREAT, 0600);
- if (shm->fd < 0)
- serror("Opening shared memory object failed");
-
- len = shmem_total_size(shm->queuelen, shm->queuelen, shm->samplelen);
-
- ret = ftruncate(shm->fd, len);
- if (ret < 0)
- serror("Setting size of shared memory object failed");
-
- shm->base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, shm->fd, 0);
- if (shm->base == MAP_FAILED)
- serror("Mapping shared memory failed");
-
- shm->manager = memtype_managed_init(shm->base, len);
- shm->shared = memory_alloc(shm->manager, sizeof(struct shmem_shared));
- if (!shm->shared)
- error("Shared memory shared struct allocation failed (not enough memory?)");
-
- memset(shm->shared, 0, sizeof(struct shmem_shared));
- shm->shared->len = len;
- shm->shared->polling = shm->polling;
-
- ret = shm->polling ? queue_init(&shm->shared->in.q, shm->queuelen, shm->manager)
- : queue_signalled_init(&shm->shared->in.qs, shm->queuelen, shm->manager);
- if (ret)
- error("Shared memory queue allocation failed (not enough memory?)");
-
- ret = shm->polling ? queue_init(&shm->shared->out.q, shm->queuelen, shm->manager)
- : queue_signalled_init(&shm->shared->out.qs, shm->queuelen, shm->manager);
- if (ret)
- error("Shared memory queue allocation failed (not enough memory?)");
-
- ret = pool_init(&shm->shared->pool, 2 * shm->queuelen, SAMPLE_LEN(shm->samplelen), shm->manager);
- if (ret)
- error("Shared memory pool allocation failed (not enough memory?)");
-
- pthread_barrierattr_init(&shm->shared->start_attr);
- pthread_barrierattr_setpshared(&shm->shared->start_attr, PTHREAD_PROCESS_SHARED);
- pthread_barrier_init(&shm->shared->start_bar, &shm->shared->start_attr, 2);
if (shm->exec) {
ret = spawn(shm->exec[0], shm->exec);
if (!ret)
serror("Failed to spawn external program");
}
-
- pthread_barrier_wait(&shm->shared->start_bar);
+ ret = shmem_int_open(shm->name, &shm->intf, &shm->conf);
+ if (ret < 0)
+ serror("Opening shared memory interface failed");
return 0;
}
@@ -134,33 +93,17 @@ int shmem_open(struct node *n)
int shmem_close(struct node *n)
{
struct shmem* shm = n->_vd;
- int ret;
- if (shm->polling)
- queue_close(&shm->shared->out.q);
- else
- queue_signalled_close(&shm->shared->out.qs);
-
- /* Don't destroy the data structures yet, since the other process might
- * still be using them. Once both processes are done and have unmapped the
- * memory, it will be freed anyway. */
- ret = munmap(shm->base, shm->shared->len);
- if (ret != 0)
- return ret;
-
- return shm_unlink(shm->name);
+ return shmem_int_close(&shm->intf);
}
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct shmem *shm = n->_vd;
-
int recv;
-
struct sample *shared_smps[cnt];
- recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) shared_smps, cnt)
- : queue_signalled_pull_many(&shm->shared->in.qs, (void**) shared_smps, cnt);
+ recv = shmem_int_read(&shm->intf, shared_smps, cnt);
if (recv <= 0)
return recv;
@@ -178,7 +121,7 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */
int avail, pushed, len;
- avail = sample_alloc(&shm->shared->pool, shared_smps, cnt);
+ avail = sample_alloc(&shm->intf.shared->pool, shared_smps, cnt);
if (avail != cnt)
warn("Pool underrun for shmem node %s", shm->name);
@@ -197,9 +140,7 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
shared_smps[i]->length = len;
}
- pushed = shm->polling ? queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail)
- : queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail);
-
+ pushed = shmem_int_write(&shm->intf, shared_smps, avail);
if (pushed != avail)
warn("Outgoing queue overrun for node %s", node_name(n));
@@ -212,7 +153,7 @@ char * shmem_print(struct node *n)
char *buf = NULL;
strcatf(&buf, "name=%s, queuelen=%d, samplelen=%d, polling=%s",
- shm->name, shm->queuelen, shm->samplelen, shm->polling ? "yes" : "no");
+ shm->name, shm->conf.queuelen, shm->conf.samplelen, shm->conf.polling ? "yes" : "no");
if (shm->exec) {
strcatf(&buf, ", exec='");
diff --git a/lib/shmem.c b/lib/shmem.c
index 79a765abc..d192ee05e 100644
--- a/lib/shmem.c
+++ b/lib/shmem.c
@@ -21,6 +21,7 @@
* along with this program. If not, see .
*********************************************************************************/
+#include
#include
#include
#include
@@ -47,70 +48,145 @@ size_t shmem_total_size(int insize, int outsize, int sample_size)
+ 1024;
}
-struct shmem_shared * shmem_shared_open(const char *name, void **base_ptr)
+int shmem_int_open(const char *name, struct shmem_int* shm, struct shmem_conf* conf)
{
- struct shmem_shared *shm;
- size_t len, newlen;
+ struct shmem_shared *shared;
+ pthread_barrierattr_t attr;
+ struct memtype *manager;
+ struct stat stat;
+ size_t len;
void *base;
char *cptr;
int fd, ret;
- fd = shm_open(name, O_RDWR, 0);
- if (fd < 0)
- return NULL;
+ shm->name = name;
+ fd = shm_open(name, O_RDWR|O_CREAT|O_EXCL, 0600);
+ if (fd < 0) {
+ if (errno != EEXIST)
+ return -1;
+ /* Already present; reopen it nonexclusively */
+ fd = shm_open(name, O_RDWR, 0);
+ if (fd < 0)
+ return -1;
+ /* Theoretically, the other process might have created the object, but
+ * isn't done with initializing it yet. So in the creating process,
+ * we only reserve a small amount of memory, just enough for the barrier,
+ * and init the barrier, and then resize the object. Thus, here we first
+ * wait for the object to be resized, then wait on the barrier.
+ */
+ while (1) {
+ if (fstat(fd, &stat) < 0)
+ return -1;
+ if (stat.st_size > SHMEM_MIN_SIZE)
+ break;
+ }
+ len = stat.st_size;
- /* Only map the first part (shmem_shared) first, read the correct length,
- * the map it with this length. */
- len = sizeof(struct memtype) + sizeof(struct memblock) + sizeof(struct shmem_shared);
+ base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (base == MAP_FAILED)
+ return -1;
+ /* This relies on the behaviour of the node and the allocator; it assumes
+ * that memtype_managed is used and the shmem_shared is the first allocated object */
+ cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
+ shared = (struct shmem_shared *) cptr;
+
+ pthread_barrier_wait(&shared->start_bar);
+ shm->base = base;
+ shm->shared = shared;
+ shm->len = 0;
+ shm->secondary = 1;
+
+ return 0;
+ }
+ /* Only map the barrier and init it */
+ ret = ftruncate(fd, SHMEM_MIN_SIZE);
+ if (ret < 0)
+ return -1;
+ base = mmap(NULL, SHMEM_MIN_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (base == MAP_FAILED)
+ return -1;
+
+ /* Again, this assumes that memtype_managed uses first-fit */
+ cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
+ shared = (struct shmem_shared*) cptr;
+ pthread_barrierattr_init(&attr);
+ pthread_barrierattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
+ pthread_barrier_init(&shared->start_bar, &attr, 2);
+
+ /* Remap it with the real size */
+ len = shmem_total_size(conf->queuelen, conf->queuelen, conf->samplelen);
+ if (munmap(base, SHMEM_MIN_SIZE) < 0)
+ return -1;
+ if (ftruncate(fd, len) < 0)
+ return -1;
base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
- return NULL;
+ return -1;
- /* This relies on the behaviour of the node and the allocator; it assumes
- * that memtype_managed is used and the shmem_shared is the first allocated object */
- cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
- shm = (struct shmem_shared *) cptr;
- newlen = shm->len;
+ /* Init everything else */
+ manager = memtype_managed_init(base, len);
+ shared = memory_alloc(manager, sizeof(struct shmem_shared));
+ if (!shared) {
+ errno = ENOMEM;
+ return -1;
+ }
- ret = munmap(base, len);
- if (ret)
- return NULL;
+ memset((char *) shared + sizeof(pthread_barrier_t), 0, sizeof(struct shmem_shared) - sizeof(pthread_barrier_t));
+ shared->polling = conf->polling;
- base = mmap(NULL, newlen, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
- if (base == MAP_FAILED)
- return NULL;
+ ret = shared->polling ? queue_init(&shared->queue[0].q, conf->queuelen, manager)
+ : queue_signalled_init(&shared->queue[0].qs, conf->queuelen, manager);
+ if (ret) {
+ errno = ENOMEM;
+ return -1;
+ }
- /* Adress might have moved */
- cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
- if (base_ptr)
- *base_ptr = base;
+ ret = shared->polling ? queue_init(&shared->queue[1].q, conf->queuelen, manager)
+ : queue_signalled_init(&shared->queue[1].qs, conf->queuelen, manager);
+ if (ret) {
+ errno = ENOMEM;
+ return -1;
+ }
- shm = (struct shmem_shared *) cptr;
-
- pthread_barrier_wait(&shm->start_bar);
-
- return shm;
+ ret = pool_init(&shared->pool, 2 * conf->queuelen, SAMPLE_LEN(conf->samplelen), manager);
+ if (ret) {
+ errno = ENOMEM;
+ return -1;
+ }
+ shm->base = base;
+ shm->len = len;
+ shm->shared = shared;
+ shm->secondary = 0;
+ pthread_barrier_wait(&shared->start_bar);
+ return 1;
}
-int shmem_shared_close(struct shmem_shared *shm, void *base)
+int shmem_int_close(struct shmem_int *shm)
{
- if (shm->polling)
- queue_close(&shm->in.q);
+ union shmem_queue * queue = &shm->shared->queue[shm->secondary];
+ if (shm->shared->polling)
+ queue_close(&queue->q);
else
- queue_signalled_close(&shm->in.qs);
+ queue_signalled_close(&queue->qs);
+ if (!shm->secondary)
+ /* Ignore the error here; the only thing that is really possible is that
+ * the object was deleted already, which we can't do anything about anyway. */
+ shm_unlink(shm->name);
- return munmap(base, shm->len);
+ return munmap(shm->base, shm->len);
}
-int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
+int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
{
- return shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt)
- : queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt);
+ union shmem_queue* queue = &shm->shared->queue[1-shm->secondary];
+ return shm->shared->polling ? queue_pull_many(&queue->q, (void **) smps, cnt)
+ : queue_signalled_pull_many(&queue->qs, (void **) smps, cnt);
}
-int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
+int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
{
- return shm->polling ? queue_push_many(&shm->in.q, (void **) smps, cnt)
- : queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt);
+ union shmem_queue* queue = &shm->shared->queue[shm->secondary];
+ return shm->shared->polling ? queue_push_many(&queue->q, (void **) smps, cnt)
+ : queue_signalled_push_many(&queue->qs, (void **) smps, cnt);
}
diff --git a/src/test-shmem.c b/src/test-shmem.c
index 06b38fab4..bde800e9e 100644
--- a/src/test-shmem.c
+++ b/src/test-shmem.c
@@ -25,7 +25,6 @@
#include
#include
-#include
#include
#include
#include
@@ -34,7 +33,7 @@
#include
void *base;
-struct shmem_shared *shared;
+struct shmem_int shm;
void usage()
{
@@ -45,18 +44,23 @@ void usage()
void quit(int sig)
{
- shmem_shared_close(shared, base);
+ shmem_int_close(&shm);
exit(1);
}
int main(int argc, char* argv[])
{
struct log log;
+ int readcnt, writecnt, avail;
+ struct shmem_conf conf = {
+ .queuelen = DEFAULT_SHMEM_QUEUELEN,
+ .samplelen = DEFAULT_SHMEM_SAMPLELEN,
+ .polling = 0,
+ };
log_init(&log, V, LOG_ALL);
log_start(&log);
- int readcnt, writecnt, avail;
if (argc != 3) {
usage();
@@ -66,8 +70,7 @@ int main(int argc, char* argv[])
char *object = argv[1];
int vectorize = atoi(argv[2]);
- shared = shmem_shared_open(object, &base);
- if (!shared)
+ if (shmem_int_open(object, &shm, &conf) < 0)
serror("Failed to open shmem interface");
signal(SIGINT, quit);
@@ -75,13 +78,13 @@ int main(int argc, char* argv[])
struct sample *insmps[vectorize], *outsmps[vectorize];
while (1) {
- readcnt = shmem_shared_read(shared, insmps, vectorize);
+ readcnt = shmem_int_read(&shm, insmps, vectorize);
if (readcnt == -1) {
printf("Node stopped, exiting");
break;
}
- avail = sample_alloc(&shared->pool, outsmps, readcnt);
+ avail = sample_alloc(&shm.shared->pool, outsmps, readcnt);
if (avail < readcnt)
warn("Pool underrun: %d / %d\n", avail, readcnt);
@@ -98,10 +101,11 @@ int main(int argc, char* argv[])
for (int i = 0; i < readcnt; i++)
sample_put(insmps[i]);
- writecnt = shmem_shared_write(shared, outsmps, avail);
+ writecnt = shmem_int_write(&shm, outsmps, avail);
if (writecnt < avail)
warn("Short write");
info("Read / Write: %d / %d", readcnt, writecnt);
}
+ shmem_int_close(&shm);
}