mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
move more information to shared part of shmem node
This commit is contained in:
parent
3a7af08799
commit
61b10ed4f1
5 changed files with 50 additions and 22 deletions
|
@ -16,7 +16,6 @@ struct shmem {
|
|||
int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */
|
||||
int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */
|
||||
|
||||
size_t len; /**< Overall size of shared memory object. */
|
||||
struct memtype *manager; /**< Manager for the shared memory region. */
|
||||
int fd; /**< Handle as returned by shm_open().*/
|
||||
void *base; /**< Pointer to the shared memory region. */
|
||||
|
|
|
@ -11,13 +11,20 @@ union shmem_queue {
|
|||
struct queue_signalled qs;
|
||||
};
|
||||
|
||||
/** The structure that actually resides in the shared memory. TODO better name?*/
|
||||
/** The structure that actually resides in the shared memory. */
|
||||
struct shmem_shared {
|
||||
size_t len; /**< Total size of the shared memory region.*/
|
||||
union shmem_queue in; /**< Queue for samples passed from external program to node.*/
|
||||
int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */
|
||||
union shmem_queue out; /**< Queue for samples passed from node to external program.*/
|
||||
int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */
|
||||
struct pool pool; /**< Pool for the samples in the queues. */
|
||||
};
|
||||
|
||||
struct shmem_shared * shmem_int_open(const char* name, size_t len);
|
||||
struct shmem_shared * shmem_shared_open(const char* name);
|
||||
|
||||
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt);
|
||||
|
||||
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt);
|
||||
|
||||
size_t shmem_total_size(int insize, int outsize, int sample_size);
|
||||
|
|
|
@ -40,20 +40,22 @@ int shmem_open(struct node *n) {
|
|||
serror("Opening shared memory object failed");
|
||||
|
||||
shm->fd = r;
|
||||
shm->len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size);
|
||||
if (ftruncate(shm->fd, shm->len) < 0)
|
||||
size_t len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size);
|
||||
if (ftruncate(shm->fd, len) < 0)
|
||||
serror("Setting size of shared memory object failed");
|
||||
/* TODO: we could use huge pages here as well */
|
||||
shm->base = mmap(NULL, shm->len, PROT_READ|PROT_WRITE, MAP_SHARED, shm->fd, 0);
|
||||
shm->base = mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_SHARED, shm->fd, 0);
|
||||
|
||||
if (shm->base == MAP_FAILED)
|
||||
serror("Mapping shared memory failed");
|
||||
|
||||
shm->manager = memtype_managed_init(shm->base, shm->len);
|
||||
shm->manager = memtype_managed_init(shm->base, len);
|
||||
shm->shared = memory_alloc(shm->manager, sizeof(struct shmem_shared));
|
||||
if (!shm->shared)
|
||||
error("Shm shared struct allocation failed (not enough memory?)");
|
||||
memset(shm->shared, 0, sizeof(struct shmem_shared));
|
||||
shm->shared->len = len;
|
||||
shm->shared->cond_in = shm->cond_in;
|
||||
shm->shared->cond_out = shm->cond_out;
|
||||
if (shm->cond_in) {
|
||||
if (queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager) < 0)
|
||||
error("Shm queue allocation failed (not enough memory?)");
|
||||
|
@ -76,6 +78,7 @@ int shmem_open(struct node *n) {
|
|||
|
||||
int shmem_close(struct node *n) {
|
||||
struct shmem* shm = n->_vd;
|
||||
size_t len = shm->shared->len;
|
||||
if (shm->cond_in)
|
||||
queue_signalled_destroy(&shm->shared->in.qs);
|
||||
else
|
||||
|
@ -85,7 +88,7 @@ int shmem_close(struct node *n) {
|
|||
else
|
||||
queue_destroy(&shm->shared->out.q);
|
||||
pool_destroy(&shm->shared->pool);
|
||||
int r = munmap(shm->base, shm->len);
|
||||
int r = munmap(shm->base, len);
|
||||
if (r != 0)
|
||||
return r;
|
||||
return shm_unlink(shm->name);
|
||||
|
|
28
lib/shmem.c
28
lib/shmem.c
|
@ -24,16 +24,42 @@ size_t shmem_total_size(int insize, int outsize, int sample_size)
|
|||
+ 1024;
|
||||
}
|
||||
|
||||
struct shmem_shared* shmem_int_open(const char *name, size_t len)
|
||||
struct shmem_shared* shmem_shared_open(const char *name)
|
||||
{
|
||||
int fd = shm_open(name, O_RDWR, 0);
|
||||
if (fd < 0)
|
||||
return NULL;
|
||||
/* Only map the first part (shmem_shared) first, read the correct length,
|
||||
* the map it with this length. */
|
||||
size_t len = sizeof(struct memtype) + sizeof(struct memblock) + sizeof(struct shmem_shared);
|
||||
void *base = mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
|
||||
if (base == MAP_FAILED)
|
||||
return NULL;
|
||||
/* This relies on the behaviour of the node and the allocator; it assumes
|
||||
* that memtype_managed is used and the shmem_shared is the first allocated object */
|
||||
char *cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
|
||||
struct shmem_shared *shm = (struct shmem_shared *) cptr;
|
||||
size_t newlen = shm->len;
|
||||
if (munmap(base, len))
|
||||
return NULL;
|
||||
base = mmap(NULL, newlen, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
|
||||
if (base == MAP_FAILED)
|
||||
return NULL;
|
||||
/* Adress might have moved */
|
||||
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
|
||||
return (struct shmem_shared *) cptr;
|
||||
}
|
||||
|
||||
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) {
|
||||
if (shm->cond_out)
|
||||
return queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt);
|
||||
else
|
||||
return queue_pull_many(&shm->out.q, (void **) smps, cnt);
|
||||
}
|
||||
|
||||
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) {
|
||||
if (shm->cond_in)
|
||||
return queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt);
|
||||
else
|
||||
return queue_pull_many(&shm->in.q, (void **) smps, cnt);
|
||||
}
|
||||
|
|
17
src/shmem.c
17
src/shmem.c
|
@ -40,19 +40,15 @@ int main(int argc, char* argv[])
|
|||
error("Node '%s' does not exist!", argv[2]);
|
||||
|
||||
struct shmem* shm = node->_vd;
|
||||
size_t len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size);
|
||||
struct shmem_shared *shmem = shmem_int_open(shm->name, len);
|
||||
if (!shmem)
|
||||
struct shmem_shared *shared = shmem_shared_open(shm->name);
|
||||
if (!shared)
|
||||
serror("Failed to open shmem interface");
|
||||
|
||||
struct sample *insmps[node->vectorize], *outsmps[node->vectorize];
|
||||
while (1) {
|
||||
int r, w;
|
||||
if (shm->cond_out)
|
||||
r = queue_signalled_pull_many(&shmem->out.qs, (void **) insmps, node->vectorize);
|
||||
else
|
||||
r = queue_pull_many(&shmem->out.q, (void **) insmps, node->vectorize);
|
||||
int avail = sample_alloc(&shmem->pool, outsmps, r);
|
||||
r = shmem_shared_read(shared, insmps, node->vectorize);
|
||||
int avail = sample_alloc(&shared->pool, outsmps, r);
|
||||
if (avail < r)
|
||||
warn("pool underrun (%d/%d)\n", avail, r);
|
||||
for (int i = 0; i < r; i++)
|
||||
|
@ -66,10 +62,7 @@ int main(int argc, char* argv[])
|
|||
}
|
||||
for (int i = 0; i < r; i++)
|
||||
sample_put(insmps[i]);
|
||||
if (shm->cond_in)
|
||||
w = queue_signalled_push_many(&shmem->in.qs, (void **) outsmps, avail);
|
||||
else
|
||||
w = queue_push_many(&shmem->in.q, (void **) outsmps, avail);
|
||||
w = shmem_shared_write(shared, outsmps, avail);
|
||||
if (w < avail)
|
||||
warn("short write (%d/%d)\n", w, r);
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue