1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

rework shmem interface to be "symmetric"

The notion of "VILLASnode" / "external program" is removed from the
interface. Now there's only a distiction regarding which process opened
the object first, and this is handled transparently by the functions.

This enables some new uses for the interface, for example connecting
two instances of VILLASnode or two instances of DPsim directly.
This commit is contained in:
Georg Reinke 2017-06-08 12:43:24 +02:00
parent 1d011e6d8f
commit 2ba3c1cbe7
5 changed files with 194 additions and 156 deletions

View file

@ -34,24 +34,17 @@
#include "pool.h"
#include "queue.h"
#include "config.h"
#define DEFAULT_SHMEM_QUEUELEN 512
#define DEFAULT_SHMEM_SAMPLELEN DEFAULT_SAMPLELEN
#include "shmem.h"
/** Node-type for shared memory communication.
* @see node_type
*/
struct shmem {
const char* name; /**< Name of the shm object. */
int samplelen; /**< Number of data entries for each sample. */
int queuelen; /**< Size of ingoing and outgoing queue, respectively. */
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to outgoing queue. */
struct shmem_conf conf; /**< Interface configuration struct. */
char **exec; /**< External program to execute on start. */
struct memtype *manager; /**< Manager for the shared memory region. */
int fd; /**< Handle as returned by shm_open().*/
void *base; /**< Pointer to the shared memory region. */
struct shmem_shared *shared; /**< Shared datastructure. */
struct shmem_int intf; /**< Shmem interface */
};
/** @see node_type::print */

View file

@ -33,9 +33,14 @@
extern "C" {
#endif
#include "config.h"
#include "pool.h"
#include "queue.h"
#include "queue_signalled.h"
#include "sample.h"
#define DEFAULT_SHMEM_QUEUELEN 512
#define DEFAULT_SHMEM_SAMPLELEN DEFAULT_SAMPLELEN
/** A signalled queue or a regular (polling) queue, depending on the polling setting. */
union shmem_queue {
@ -43,56 +48,75 @@ union shmem_queue {
struct queue_signalled qs;
};
#define SHMEM_MIN_SIZE (sizeof(struct memtype) + sizeof(struct memblock) + sizeof(pthread_barrier_t) + sizeof(pthread_barrierattr_t))
/** Struct containing all parameters that need to be known when creating a new
* shared memory object. */
struct shmem_conf {
int polling; /*< Whether to use polling instead of POSIX CVs */
int queuelen; /*< Size of the queues (in elements) */
int samplelen; /*< Maximum number of data entries in a single sample */
};
/** The structure that actually resides in the shared memory. */
struct shmem_shared {
size_t len; /**< Total size of the shared memory region.*/
pthread_barrier_t start_bar; /**< Barrier for synchronizing the start of both programs. */
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */
union shmem_queue in; /**< Queue for samples passed from external program to node.*/
union shmem_queue out; /**< Queue for samples passed from node to external program.*/
union shmem_queue queue[2]; /**< Queues for samples passed in both directions.
0: primary -> secondary, 1: secondary -> primary */
struct pool pool; /**< Pool for the samples in the queues. */
};
pthread_barrier_t start_bar; /**< Barrier for synchronizing the start of both programs. */
pthread_barrierattr_t start_attr;
atomic_size_t node_stopped; /**< Set to 1 by VILLASNode if it is stopped/killed. */
atomic_size_t ext_stopped; /**< Set to 1 by the external program if it is stopped/killed. */
struct shmem_int {
void* base; /**< Base address of the mapping (needed for munmap) */
const char* name; /**< Name of the shared memory object */
size_t len; /**< Total size of the shared memory region */
struct shmem_shared *shared; /**< Pointer to mapped shared structure */
int secondary; /**< Set to 1 if this is the secondary user (i.e. not the one
that created the object); 0 otherwise. */
};
/** Open the shared memory object and retrieve / initialize the shared data structures.
* If the object isn't already present, it is created instead.
* @param[in] name Name of the POSIX shared memory object.
* @param[inout] base_ptr The base address of the shared memory region is written to this pointer.
* @retval NULL An error occurred; errno is set appropiately.
* @param[inout] shm The shmem_int structure that should be used for following
* calls will be written to this pointer.
* @param[in] conf Configuration parameters for the interface. This struct is
* ignored if the shared memory object is already present.
* @retval 1 The object was created successfully.
* @retval 0 The existing object was opened successfully.
* @retval <0 An error occured; errno is set accordingly.
*/
struct shmem_shared * shmem_shared_open(const char* name, void **base_ptr);
int shmem_int_open(const char* name, struct shmem_int* shm, struct shmem_conf* conf);
/** Close and destroy the shared memory object and related structures.
* @param shm The shared memory structure.
* @param base The base address as returned by shmem_shared_open.
/** Close and destroy the shared memory interface and related structures.
* @param shm The shared memory interface.
* @retval 0 Closing successfull.
* @retval <0 An error occurred; errno is set appropiately.
*/
int shmem_shared_close(struct shmem_shared *shm, void *base);
int shmem_int_close(struct shmem_int *shm);
/** Read samples from VILLASNode.
* @param shm The shared memory structure.
/** Read samples from the interface.
* @param shm The shared memory interface.
* @param smps An array where the pointers to the samples will be written. The samples
* must be freed with sample_put after use.
* @param cnt Number of samples to be read.
* @retval >=0 Number of samples that were read. Can be less than cnt (including 0) in case not enough samples were available.
* @retval -1 VILLASNode exited; no samples can be read anymore.
* @retval -1 The other process closed the interface; no samples can be read anymore.
*/
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt);
int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt);
/** Write samples to VILLASNode.
* @param shm The shared memory structure.
/** Write samples to the interface.
* @param shm The shared memory interface.
* @param smps The samples to be written. Must be allocated from shm->pool.
* @param cnt Number of samples to write.
* @retval >=0 Number of samples that were successfully written. Can be less than cnt (including 0) in case of a full queue.
* @retval -1 VILLASNode exited; no samples can be written anymore.
* @retval -1 The write failed for some reason; no more samples can be written.
*/
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt);
int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt);
/** Returns the total size of the shared memory region with the given size of
* the input/output queues (in elements) and the given number of data elements

View file

@ -30,9 +30,9 @@
#include "kernel/kernel.h"
#include "log.h"
#include "shmem.h"
#include "nodes/shmem.h"
#include "plugin.h"
#include "shmem.h"
#include "timing.h"
#include "utils.h"
@ -42,12 +42,12 @@ int shmem_parse(struct node *n, config_setting_t *cfg)
if (!config_setting_lookup_string(cfg, "name", &shm->name))
cerror(cfg, "Missing shared memory object name");
if (!config_setting_lookup_int(cfg, "queuelen", &shm->queuelen))
shm->queuelen = DEFAULT_SHMEM_QUEUELEN;
if (!config_setting_lookup_int(cfg, "samplelen", &shm->samplelen))
shm->samplelen = DEFAULT_SHMEM_SAMPLELEN;
if (!config_setting_lookup_bool(cfg, "polling", &shm->polling))
shm->polling = false;
if (!config_setting_lookup_int(cfg, "queuelen", &shm->conf.queuelen))
shm->conf.queuelen = DEFAULT_SHMEM_QUEUELEN;
if (!config_setting_lookup_int(cfg, "samplelen", &shm->conf.samplelen))
shm->conf.samplelen = DEFAULT_SHMEM_SAMPLELEN;
if (!config_setting_lookup_bool(cfg, "polling", &shm->conf.polling))
shm->conf.polling = false;
config_setting_t *exec_cfg = config_setting_lookup(cfg, "exec");
if (!exec_cfg)
@ -77,56 +77,15 @@ int shmem_open(struct node *n)
{
struct shmem *shm = n->_vd;
int ret;
size_t len;
shm->fd = shm_open(shm->name, O_RDWR | O_CREAT, 0600);
if (shm->fd < 0)
serror("Opening shared memory object failed");
len = shmem_total_size(shm->queuelen, shm->queuelen, shm->samplelen);
ret = ftruncate(shm->fd, len);
if (ret < 0)
serror("Setting size of shared memory object failed");
shm->base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, shm->fd, 0);
if (shm->base == MAP_FAILED)
serror("Mapping shared memory failed");
shm->manager = memtype_managed_init(shm->base, len);
shm->shared = memory_alloc(shm->manager, sizeof(struct shmem_shared));
if (!shm->shared)
error("Shared memory shared struct allocation failed (not enough memory?)");
memset(shm->shared, 0, sizeof(struct shmem_shared));
shm->shared->len = len;
shm->shared->polling = shm->polling;
ret = shm->polling ? queue_init(&shm->shared->in.q, shm->queuelen, shm->manager)
: queue_signalled_init(&shm->shared->in.qs, shm->queuelen, shm->manager);
if (ret)
error("Shared memory queue allocation failed (not enough memory?)");
ret = shm->polling ? queue_init(&shm->shared->out.q, shm->queuelen, shm->manager)
: queue_signalled_init(&shm->shared->out.qs, shm->queuelen, shm->manager);
if (ret)
error("Shared memory queue allocation failed (not enough memory?)");
ret = pool_init(&shm->shared->pool, 2 * shm->queuelen, SAMPLE_LEN(shm->samplelen), shm->manager);
if (ret)
error("Shared memory pool allocation failed (not enough memory?)");
pthread_barrierattr_init(&shm->shared->start_attr);
pthread_barrierattr_setpshared(&shm->shared->start_attr, PTHREAD_PROCESS_SHARED);
pthread_barrier_init(&shm->shared->start_bar, &shm->shared->start_attr, 2);
if (shm->exec) {
ret = spawn(shm->exec[0], shm->exec);
if (!ret)
serror("Failed to spawn external program");
}
pthread_barrier_wait(&shm->shared->start_bar);
ret = shmem_int_open(shm->name, &shm->intf, &shm->conf);
if (ret < 0)
serror("Opening shared memory interface failed");
return 0;
}
@ -134,33 +93,17 @@ int shmem_open(struct node *n)
int shmem_close(struct node *n)
{
struct shmem* shm = n->_vd;
int ret;
if (shm->polling)
queue_close(&shm->shared->out.q);
else
queue_signalled_close(&shm->shared->out.qs);
/* Don't destroy the data structures yet, since the other process might
* still be using them. Once both processes are done and have unmapped the
* memory, it will be freed anyway. */
ret = munmap(shm->base, shm->shared->len);
if (ret != 0)
return ret;
return shm_unlink(shm->name);
return shmem_int_close(&shm->intf);
}
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct shmem *shm = n->_vd;
int recv;
struct sample *shared_smps[cnt];
recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) shared_smps, cnt)
: queue_signalled_pull_many(&shm->shared->in.qs, (void**) shared_smps, cnt);
recv = shmem_int_read(&shm->intf, shared_smps, cnt);
if (recv <= 0)
return recv;
@ -178,7 +121,7 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */
int avail, pushed, len;
avail = sample_alloc(&shm->shared->pool, shared_smps, cnt);
avail = sample_alloc(&shm->intf.shared->pool, shared_smps, cnt);
if (avail != cnt)
warn("Pool underrun for shmem node %s", shm->name);
@ -197,9 +140,7 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
shared_smps[i]->length = len;
}
pushed = shm->polling ? queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail)
: queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail);
pushed = shmem_int_write(&shm->intf, shared_smps, avail);
if (pushed != avail)
warn("Outgoing queue overrun for node %s", node_name(n));
@ -212,7 +153,7 @@ char * shmem_print(struct node *n)
char *buf = NULL;
strcatf(&buf, "name=%s, queuelen=%d, samplelen=%d, polling=%s",
shm->name, shm->queuelen, shm->samplelen, shm->polling ? "yes" : "no");
shm->name, shm->conf.queuelen, shm->conf.samplelen, shm->conf.polling ? "yes" : "no");
if (shm->exec) {
strcatf(&buf, ", exec='");

View file

@ -21,6 +21,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <errno.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
@ -47,70 +48,145 @@ size_t shmem_total_size(int insize, int outsize, int sample_size)
+ 1024;
}
struct shmem_shared * shmem_shared_open(const char *name, void **base_ptr)
int shmem_int_open(const char *name, struct shmem_int* shm, struct shmem_conf* conf)
{
struct shmem_shared *shm;
size_t len, newlen;
struct shmem_shared *shared;
pthread_barrierattr_t attr;
struct memtype *manager;
struct stat stat;
size_t len;
void *base;
char *cptr;
int fd, ret;
fd = shm_open(name, O_RDWR, 0);
if (fd < 0)
return NULL;
shm->name = name;
fd = shm_open(name, O_RDWR|O_CREAT|O_EXCL, 0600);
if (fd < 0) {
if (errno != EEXIST)
return -1;
/* Already present; reopen it nonexclusively */
fd = shm_open(name, O_RDWR, 0);
if (fd < 0)
return -1;
/* Theoretically, the other process might have created the object, but
* isn't done with initializing it yet. So in the creating process,
* we only reserve a small amount of memory, just enough for the barrier,
* and init the barrier, and then resize the object. Thus, here we first
* wait for the object to be resized, then wait on the barrier.
*/
while (1) {
if (fstat(fd, &stat) < 0)
return -1;
if (stat.st_size > SHMEM_MIN_SIZE)
break;
}
len = stat.st_size;
/* Only map the first part (shmem_shared) first, read the correct length,
* the map it with this length. */
len = sizeof(struct memtype) + sizeof(struct memblock) + sizeof(struct shmem_shared);
base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return -1;
/* This relies on the behaviour of the node and the allocator; it assumes
* that memtype_managed is used and the shmem_shared is the first allocated object */
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
shared = (struct shmem_shared *) cptr;
pthread_barrier_wait(&shared->start_bar);
shm->base = base;
shm->shared = shared;
shm->len = 0;
shm->secondary = 1;
return 0;
}
/* Only map the barrier and init it */
ret = ftruncate(fd, SHMEM_MIN_SIZE);
if (ret < 0)
return -1;
base = mmap(NULL, SHMEM_MIN_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return -1;
/* Again, this assumes that memtype_managed uses first-fit */
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
shared = (struct shmem_shared*) cptr;
pthread_barrierattr_init(&attr);
pthread_barrierattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
pthread_barrier_init(&shared->start_bar, &attr, 2);
/* Remap it with the real size */
len = shmem_total_size(conf->queuelen, conf->queuelen, conf->samplelen);
if (munmap(base, SHMEM_MIN_SIZE) < 0)
return -1;
if (ftruncate(fd, len) < 0)
return -1;
base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return NULL;
return -1;
/* This relies on the behaviour of the node and the allocator; it assumes
* that memtype_managed is used and the shmem_shared is the first allocated object */
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
shm = (struct shmem_shared *) cptr;
newlen = shm->len;
/* Init everything else */
manager = memtype_managed_init(base, len);
shared = memory_alloc(manager, sizeof(struct shmem_shared));
if (!shared) {
errno = ENOMEM;
return -1;
}
ret = munmap(base, len);
if (ret)
return NULL;
memset((char *) shared + sizeof(pthread_barrier_t), 0, sizeof(struct shmem_shared) - sizeof(pthread_barrier_t));
shared->polling = conf->polling;
base = mmap(NULL, newlen, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return NULL;
ret = shared->polling ? queue_init(&shared->queue[0].q, conf->queuelen, manager)
: queue_signalled_init(&shared->queue[0].qs, conf->queuelen, manager);
if (ret) {
errno = ENOMEM;
return -1;
}
/* Adress might have moved */
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
if (base_ptr)
*base_ptr = base;
ret = shared->polling ? queue_init(&shared->queue[1].q, conf->queuelen, manager)
: queue_signalled_init(&shared->queue[1].qs, conf->queuelen, manager);
if (ret) {
errno = ENOMEM;
return -1;
}
shm = (struct shmem_shared *) cptr;
pthread_barrier_wait(&shm->start_bar);
return shm;
ret = pool_init(&shared->pool, 2 * conf->queuelen, SAMPLE_LEN(conf->samplelen), manager);
if (ret) {
errno = ENOMEM;
return -1;
}
shm->base = base;
shm->len = len;
shm->shared = shared;
shm->secondary = 0;
pthread_barrier_wait(&shared->start_bar);
return 1;
}
int shmem_shared_close(struct shmem_shared *shm, void *base)
int shmem_int_close(struct shmem_int *shm)
{
if (shm->polling)
queue_close(&shm->in.q);
union shmem_queue * queue = &shm->shared->queue[shm->secondary];
if (shm->shared->polling)
queue_close(&queue->q);
else
queue_signalled_close(&shm->in.qs);
queue_signalled_close(&queue->qs);
if (!shm->secondary)
/* Ignore the error here; the only thing that is really possible is that
* the object was deleted already, which we can't do anything about anyway. */
shm_unlink(shm->name);
return munmap(base, shm->len);
return munmap(shm->base, shm->len);
}
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
{
return shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt)
: queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt);
union shmem_queue* queue = &shm->shared->queue[1-shm->secondary];
return shm->shared->polling ? queue_pull_many(&queue->q, (void **) smps, cnt)
: queue_signalled_pull_many(&queue->qs, (void **) smps, cnt);
}
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
{
return shm->polling ? queue_push_many(&shm->in.q, (void **) smps, cnt)
: queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt);
union shmem_queue* queue = &shm->shared->queue[shm->secondary];
return shm->shared->polling ? queue_push_many(&queue->q, (void **) smps, cnt)
: queue_signalled_push_many(&queue->qs, (void **) smps, cnt);
}

View file

@ -25,7 +25,6 @@
#include <villas/log.h>
#include <villas/node.h>
#include <villas/nodes/shmem.h>
#include <villas/pool.h>
#include <villas/sample.h>
#include <villas/shmem.h>
@ -34,7 +33,7 @@
#include <string.h>
void *base;
struct shmem_shared *shared;
struct shmem_int shm;
void usage()
{
@ -45,18 +44,23 @@ void usage()
void quit(int sig)
{
shmem_shared_close(shared, base);
shmem_int_close(&shm);
exit(1);
}
int main(int argc, char* argv[])
{
struct log log;
int readcnt, writecnt, avail;
struct shmem_conf conf = {
.queuelen = DEFAULT_SHMEM_QUEUELEN,
.samplelen = DEFAULT_SHMEM_SAMPLELEN,
.polling = 0,
};
log_init(&log, V, LOG_ALL);
log_start(&log);
int readcnt, writecnt, avail;
if (argc != 3) {
usage();
@ -66,8 +70,7 @@ int main(int argc, char* argv[])
char *object = argv[1];
int vectorize = atoi(argv[2]);
shared = shmem_shared_open(object, &base);
if (!shared)
if (shmem_int_open(object, &shm, &conf) < 0)
serror("Failed to open shmem interface");
signal(SIGINT, quit);
@ -75,13 +78,13 @@ int main(int argc, char* argv[])
struct sample *insmps[vectorize], *outsmps[vectorize];
while (1) {
readcnt = shmem_shared_read(shared, insmps, vectorize);
readcnt = shmem_int_read(&shm, insmps, vectorize);
if (readcnt == -1) {
printf("Node stopped, exiting");
break;
}
avail = sample_alloc(&shared->pool, outsmps, readcnt);
avail = sample_alloc(&shm.shared->pool, outsmps, readcnt);
if (avail < readcnt)
warn("Pool underrun: %d / %d\n", avail, readcnt);
@ -98,10 +101,11 @@ int main(int argc, char* argv[])
for (int i = 0; i < readcnt; i++)
sample_put(insmps[i]);
writecnt = shmem_shared_write(shared, outsmps, avail);
writecnt = shmem_int_write(&shm, outsmps, avail);
if (writecnt < avail)
warn("Short write");
info("Read / Write: %d / %d", readcnt, writecnt);
}
shmem_int_close(&shm);
}