1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

shmem: use one region for each direction

This commit is contained in:
Georg Reinke 2017-06-14 13:00:43 +02:00
parent e2353e937a
commit 6265c39ce2
6 changed files with 103 additions and 131 deletions

View file

@ -21,7 +21,8 @@ nodes = {
},
shmem = {
type = "shmem",
name = "/villas1",
out_name = "/villas1-out",
in_name = "/villas1-in",
samplelen = 4,
queuelen = 32,
polling = false,

View file

@ -40,7 +40,8 @@
* @see node_type
*/
struct shmem {
const char* name; /**< Name of the shm object. */
const char* out_name; /**< Name of the shm object for the output queue. */
const char* in_name; /**< Name of the shm object for the input queue. */
struct shmem_conf conf; /**< Interface configuration struct. */
char **exec; /**< External program to execute on start. */
struct shmem_int intf; /**< Shmem interface */

View file

@ -59,38 +59,35 @@ struct shmem_conf {
/** The structure that actually resides in the shared memory. */
struct shmem_shared {
pthread_barrier_t start_bar; /**< Barrier for synchronizing the start of both programs. */
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */
union shmem_queue queue; /**< Queues for samples passed in both directions. */
union shmem_queue queue[2]; /**< Queues for samples passed in both directions.
0: primary -> secondary, 1: secondary -> primary */
struct pool pool; /**< Pool for the samples in the queues. */
};
struct pool pool; /**< Pool for the samples in the queues. */
struct shmem_dir {
void *base;
const char *name;
size_t len;
struct shmem_shared *shared;
};
/** Main structure representing the shared memory interface. */
struct shmem_int {
void* base; /**< Base address of the mapping (needed for munmap) */
const char* name; /**< Name of the shared memory object */
size_t len; /**< Total size of the shared memory region */
struct shmem_shared *shared; /**< Pointer to mapped shared structure */
int secondary; /**< Set to 1 if this is the secondary user (i.e. not the one
that created the object); 0 otherwise. */
struct shmem_dir read, write;
};
/** Open the shared memory object and retrieve / initialize the shared data structures.
* If the object isn't already present, it is created instead.
* @param[in] name Name of the POSIX shared memory object.
/** Open the shared memory objects and retrieve / initialize the shared data structures.
* @param[in] wname Name of the POSIX shared memory object containing the output queue.
* @param[in] rname Name of the POSIX shared memory object containing the input queue.
* @param[inout] shm The shmem_int structure that should be used for following
* calls will be written to this pointer.
* @param[in] conf Configuration parameters for the interface. This struct is
* ignored if the shared memory object is already present.
* @retval 1 The object was created successfully.
* @retval 0 The existing object was opened successfully.
* @param[in] conf Configuration parameters for the output queue.
* @retval 0 The objects were opened and initialized successfully.
* @retval <0 An error occured; errno is set accordingly.
*/
int shmem_int_open(const char* name, struct shmem_int* shm, struct shmem_conf* conf);
int shmem_int_open(const char* wname, const char* rname, struct shmem_int* shm, struct shmem_conf* conf);
/** Close and destroy the shared memory interface and related structures.
* @param shm The shared memory interface.

View file

@ -40,8 +40,10 @@ int shmem_parse(struct node *n, config_setting_t *cfg)
{
struct shmem *shm = n->_vd;
if (!config_setting_lookup_string(cfg, "name", &shm->name))
cerror(cfg, "Missing shared memory object name");
if (!config_setting_lookup_string(cfg, "out_name", &shm->out_name))
cerror(cfg, "Missing shared memory output queue name");
if (!config_setting_lookup_string(cfg, "in_name", &shm->in_name))
cerror(cfg, "Missing shared memory input queue name");
if (!config_setting_lookup_int(cfg, "queuelen", &shm->conf.queuelen))
shm->conf.queuelen = DEFAULT_SHMEM_QUEUELEN;
if (!config_setting_lookup_int(cfg, "samplelen", &shm->conf.samplelen))
@ -84,7 +86,7 @@ int shmem_open(struct node *n)
serror("Failed to spawn external program");
}
ret = shmem_int_open(shm->name, &shm->intf, &shm->conf);
ret = shmem_int_open(shm->out_name, shm->in_name, &shm->intf, &shm->conf);
if (ret < 0)
serror("Opening shared memory interface failed");
@ -105,6 +107,10 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
struct sample *shared_smps[cnt];
recv = shmem_int_read(&shm->intf, shared_smps, cnt);
if (recv < 0)
/* This can only really mean that the other process has exited, so close
* the interface to make sure the shared memory object is unlinked */
shmem_int_close(&shm->intf);
if (recv <= 0)
return recv;
@ -122,9 +128,9 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */
int avail, pushed, len;
avail = sample_alloc(&shm->intf.shared->pool, shared_smps, cnt);
avail = sample_alloc(&shm->intf.write.shared->pool, shared_smps, cnt);
if (avail != cnt)
warn("Pool underrun for shmem node %s", shm->name);
warn("Pool underrun for shmem node %s", shm->out_name);
for (int i = 0; i < avail; i++) {
/* Since the node isn't in shared memory, the source can't be accessed */
@ -153,8 +159,8 @@ char * shmem_print(struct node *n)
struct shmem *shm = n->_vd;
char *buf = NULL;
strcatf(&buf, "name=%s, queuelen=%d, samplelen=%d, polling=%s",
shm->name, shm->conf.queuelen, shm->conf.samplelen, shm->conf.polling ? "yes" : "no");
strcatf(&buf, "out_name=%s, in_name=%s, queuelen=%d, samplelen=%d, polling=%s",
shm->out_name, shm->in_name, shm->conf.queuelen, shm->conf.samplelen, shm->conf.polling ? "yes" : "no");
if (shm->exec) {
strcatf(&buf, ", exec='");

View file

@ -23,6 +23,7 @@
#include <errno.h>
#include <fcntl.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <sys/stat.h>
@ -48,85 +49,37 @@ size_t shmem_total_size(int insize, int outsize, int sample_size)
+ 1024;
}
int shmem_int_open(const char *name, struct shmem_int* shm, struct shmem_conf* conf)
int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm, struct shmem_conf *conf)
{
struct shmem_shared *shared;
pthread_barrierattr_t attr;
struct memtype *manager;
struct stat stat;
char *cptr;
int fd, ret;
size_t len;
void *base;
char *cptr;
int fd, ret;
struct memtype *manager;
struct shmem_shared *shared;
struct stat stat_buf;
sem_t *sem_own, *sem_other;
shm->name = name;
fd = shm_open(name, O_RDWR|O_CREAT|O_EXCL, 0600);
if (fd < 0) {
if (errno != EEXIST)
return -1;
/* Already present; reopen it nonexclusively */
fd = shm_open(name, O_RDWR, 0);
if (fd < 0)
return -1;
/* Theoretically, the other process might have created the object, but
* isn't done with initializing it yet. So in the creating process,
* we only reserve a small amount of memory, just enough for the barrier,
* and init the barrier, and then resize the object. Thus, here we first
* wait for the object to be resized, then wait on the barrier.
*/
while (1) {
if (fstat(fd, &stat) < 0)
return -1;
if (stat.st_size > SHMEM_MIN_SIZE)
break;
}
len = stat.st_size;
base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return -1;
/* This relies on the behaviour of the node and the allocator; it assumes
* that memtype_managed is used and the shmem_shared is the first allocated object */
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
shared = (struct shmem_shared *) cptr;
pthread_barrier_wait(&shared->start_bar);
shm->base = base;
shm->shared = shared;
shm->len = 0;
shm->secondary = 1;
return 0;
}
/* Only map the barrier and init it */
ret = ftruncate(fd, SHMEM_MIN_SIZE);
if (ret < 0)
/* Ensure both semaphores exist */
sem_own = sem_open(wname, O_CREAT, 0600, 0);
if (sem_own == SEM_FAILED)
return -1;
base = mmap(NULL, SHMEM_MIN_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
sem_other = sem_open(rname, O_CREAT, 0600, 0);
if (sem_other == SEM_FAILED)
return -1;
/* Again, this assumes that memtype_managed uses first-fit */
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
shared = (struct shmem_shared*) cptr;
pthread_barrierattr_init(&attr);
pthread_barrierattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
pthread_barrier_init(&shared->start_bar, &attr, 2);
/* Remap it with the real size */
/* Open and initialize the shared region for the output queue */
fd = shm_open(wname, O_RDWR|O_CREAT|O_EXCL, 0600);
if (fd < 0)
return -1;
len = shmem_total_size(conf->queuelen, conf->queuelen, conf->samplelen);
if (munmap(base, SHMEM_MIN_SIZE) < 0)
return -1;
if (ftruncate(fd, len) < 0)
return -1;
base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return -1;
close(fd);
/* Init everything else */
manager = memtype_managed_init(base, len);
shared = memory_alloc(manager, sizeof(struct shmem_shared));
if (!shared) {
@ -134,64 +87,76 @@ int shmem_int_open(const char *name, struct shmem_int* shm, struct shmem_conf* c
return -1;
}
memset((char *) shared + sizeof(pthread_barrier_t), 0, sizeof(struct shmem_shared) - sizeof(pthread_barrier_t));
memset(shared, 0, sizeof(struct shmem_shared));
shared->polling = conf->polling;
ret = shared->polling ? queue_init(&shared->queue[0].q, conf->queuelen, manager)
: queue_signalled_init(&shared->queue[0].qs, conf->queuelen, manager);
ret = shared->polling ? queue_init(&shared->queue.q, conf->queuelen, manager)
: queue_signalled_init(&shared->queue.qs, conf->queuelen, manager);
if (ret) {
errno = ENOMEM;
return -1;
}
ret = shared->polling ? queue_init(&shared->queue[1].q, conf->queuelen, manager)
: queue_signalled_init(&shared->queue[1].qs, conf->queuelen, manager);
ret = pool_init(&shared->pool, conf->queuelen, SAMPLE_LEN(conf->samplelen), manager);
if (ret) {
errno = ENOMEM;
return -1;
}
ret = pool_init(&shared->pool, 2 * conf->queuelen, SAMPLE_LEN(conf->samplelen), manager);
if (ret) {
errno = ENOMEM;
shm->write.base = base;
shm->write.name = wname;
shm->write.len = len;
shm->write.shared = shared;
/* Post own semaphore and wait on the other one, so both processes know that
* both regions are initialized */
sem_post(sem_own);
sem_wait(sem_other);
/* Open and map the other region */
fd = shm_open(rname, O_RDWR, 0);
if (fd < 0)
return -1;
if (fstat(fd, &stat_buf) < 0)
return -1;
len = stat_buf.st_size;
base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return -1;
}
shm->base = base;
shm->len = len;
shm->shared = shared;
shm->secondary = 0;
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
shared = (struct shmem_shared *) cptr;
shm->read.base = base;
shm->read.name = rname;
shm->read.len = len;
shm->read.shared = shared;
pthread_barrier_wait(&shared->start_bar);
return 1;
/* Unlink the semaphores; we don't need them anymore */
sem_unlink(wname);
return 0;
}
int shmem_int_close(struct shmem_int *shm)
{
union shmem_queue * queue = &shm->shared->queue[shm->secondary];
if (shm->shared->polling)
queue_close(&queue->q);
if (shm->write.shared->polling)
queue_close(&shm->write.shared->queue.q);
else
queue_signalled_close(&queue->qs);
if (!shm->secondary)
/* Ignore the error here; the only thing that is really possible is that
* the object was deleted already, which we can't do anything about anyway. */
shm_unlink(shm->name);
queue_signalled_close(&shm->write.shared->queue.qs);
return munmap(shm->base, shm->len);
munmap(shm->read.base, shm->read.len);
munmap(shm->write.base, shm->write.len);
shm_unlink(shm->write.name);
return 0;
}
int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
{
union shmem_queue* queue = &shm->shared->queue[1-shm->secondary];
return shm->shared->polling ? queue_pull_many(&queue->q, (void **) smps, cnt)
: queue_signalled_pull_many(&queue->qs, (void **) smps, cnt);
return shm->read.shared->polling ? queue_pull_many(&shm->read.shared->queue.q, (void **) smps, cnt)
: queue_signalled_pull_many(&shm->read.shared->queue.qs, (void **) smps, cnt);
}
int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
{
union shmem_queue* queue = &shm->shared->queue[shm->secondary];
return shm->shared->polling ? queue_push_many(&queue->q, (void **) smps, cnt)
: queue_signalled_push_many(&queue->qs, (void **) smps, cnt);
return shm->write.shared->polling ? queue_push_many(&shm->write.shared->queue.q, (void **) smps, cnt)
: queue_signalled_push_many(&shm->write.shared->queue.qs, (void **) smps, cnt);
}

View file

@ -37,8 +37,9 @@ struct shmem_int shm;
void usage()
{
printf("Usage: villas-test-shmem SHM_NAME VECTORIZE\n");
printf(" SHMNAME name of the shared memory object\n");
printf("Usage: villas-test-shmem WNAME VECTORIZE\n");
printf(" WNAME name of the shared memory object for the output queue\n");
printf(" RNAME name of the shared memory object for the input queue\n");
printf(" VECTORIZE maximum number of samples to read/write at a time\n");
}
@ -62,15 +63,16 @@ int main(int argc, char* argv[])
log_start(&log);
if (argc != 3) {
if (argc != 4) {
usage();
return 1;
}
char *object = argv[1];
int vectorize = atoi(argv[2]);
char *wname = argv[1];
char *rname = argv[2];
int vectorize = atoi(argv[3]);
if (shmem_int_open(object, &shm, &conf) < 0)
if (shmem_int_open(wname, rname, &shm, &conf) < 0)
serror("Failed to open shmem interface");
signal(SIGINT, quit);
@ -84,7 +86,7 @@ int main(int argc, char* argv[])
break;
}
avail = sample_alloc(&shm.shared->pool, outsmps, readcnt);
avail = sample_alloc(&shm.write.shared->pool, outsmps, readcnt);
if (avail < readcnt)
warn("Pool underrun: %d / %d\n", avail, readcnt);