mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
shm node: signal the other program if one exits
This commit is contained in:
parent
1365f9d026
commit
39d7c451fb
4 changed files with 72 additions and 22 deletions
|
@ -19,9 +19,13 @@ struct shmem_shared {
|
|||
union shmem_queue out; /**< Queue for samples passed from node to external program.*/
|
||||
int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */
|
||||
struct pool pool; /**< Pool for the samples in the queues. */
|
||||
atomic_size_t node_stopped;
|
||||
atomic_size_t ext_stopped;
|
||||
};
|
||||
|
||||
struct shmem_shared * shmem_shared_open(const char* name);
|
||||
struct shmem_shared * shmem_shared_open(const char* name, void **base_ptr);
|
||||
|
||||
int shmem_shared_close(struct shmem_shared *shm, void *base);
|
||||
|
||||
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt);
|
||||
|
||||
|
|
|
@ -79,15 +79,15 @@ int shmem_open(struct node *n) {
|
|||
int shmem_close(struct node *n) {
|
||||
struct shmem* shm = n->_vd;
|
||||
size_t len = shm->shared->len;
|
||||
if (shm->cond_in)
|
||||
queue_signalled_destroy(&shm->shared->in.qs);
|
||||
else
|
||||
queue_destroy(&shm->shared->in.q);
|
||||
if (shm->cond_out)
|
||||
queue_signalled_destroy(&shm->shared->out.qs);
|
||||
else
|
||||
queue_destroy(&shm->shared->out.q);
|
||||
pool_destroy(&shm->shared->pool);
|
||||
atomic_store_explicit(&shm->shared->node_stopped, 1, memory_order_relaxed);
|
||||
if (shm->cond_out) {
|
||||
pthread_mutex_lock(&shm->shared->out.qs.mt);
|
||||
pthread_cond_broadcast(&shm->shared->out.qs.ready);
|
||||
pthread_mutex_unlock(&shm->shared->out.qs.mt);
|
||||
}
|
||||
/* Don't destroy the data structures yet, since the other process might
|
||||
* still be using them. Once both processes are done and have unmapped the
|
||||
* memory, it will be freed anyway. */
|
||||
int r = munmap(shm->base, len);
|
||||
if (r != 0)
|
||||
return r;
|
||||
|
@ -96,9 +96,14 @@ int shmem_close(struct node *n) {
|
|||
|
||||
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) {
|
||||
struct shmem *shm = n->_vd;
|
||||
int r;
|
||||
if (shm->cond_in)
|
||||
return queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt);
|
||||
return queue_pull_many(&shm->shared->in.q, (void**) smps, cnt);
|
||||
r = queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt);
|
||||
else
|
||||
r = queue_pull_many(&shm->shared->in.q, (void**) smps, cnt);
|
||||
if (!r && atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed))
|
||||
return -1;
|
||||
return r;
|
||||
}
|
||||
|
||||
int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) {
|
||||
|
@ -122,6 +127,11 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) {
|
|||
sample_get(shared_smps[i]);
|
||||
}
|
||||
int pushed;
|
||||
if (atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) {
|
||||
for (int i = 0; i < avail; i++)
|
||||
sample_put(shared_smps[i]);
|
||||
return -1;
|
||||
}
|
||||
if (shm->cond_out)
|
||||
pushed = queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail);
|
||||
else
|
||||
|
|
37
lib/shmem.c
37
lib/shmem.c
|
@ -24,7 +24,7 @@ size_t shmem_total_size(int insize, int outsize, int sample_size)
|
|||
+ 1024;
|
||||
}
|
||||
|
||||
struct shmem_shared* shmem_shared_open(const char *name)
|
||||
struct shmem_shared* shmem_shared_open(const char *name, void **base_ptr)
|
||||
{
|
||||
int fd = shm_open(name, O_RDWR, 0);
|
||||
if (fd < 0)
|
||||
|
@ -47,19 +47,40 @@ struct shmem_shared* shmem_shared_open(const char *name)
|
|||
return NULL;
|
||||
/* Adress might have moved */
|
||||
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
|
||||
if (base_ptr)
|
||||
*base_ptr = base;
|
||||
return (struct shmem_shared *) cptr;
|
||||
}
|
||||
|
||||
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) {
|
||||
if (shm->cond_out)
|
||||
return queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt);
|
||||
else
|
||||
return queue_pull_many(&shm->out.q, (void **) smps, cnt);
|
||||
int shmem_shared_close(struct shmem_shared *shm, void *base)
|
||||
{
|
||||
atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed);
|
||||
if (shm->cond_in) {
|
||||
pthread_mutex_lock(&shm->in.qs.mt);
|
||||
pthread_cond_broadcast(&shm->in.qs.ready);
|
||||
pthread_mutex_unlock(&shm->in.qs.mt);
|
||||
}
|
||||
return munmap(base, shm->len);
|
||||
}
|
||||
|
||||
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) {
|
||||
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int r;
|
||||
if (shm->cond_out)
|
||||
r = queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt);
|
||||
else
|
||||
r = queue_pull_many(&shm->out.q, (void **) smps, cnt);
|
||||
if (!r && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed))
|
||||
return -1;
|
||||
return r;
|
||||
}
|
||||
|
||||
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
if (atomic_load_explicit(&shm->node_stopped, memory_order_relaxed))
|
||||
return -1;
|
||||
if (shm->cond_in)
|
||||
return queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt);
|
||||
else
|
||||
return queue_pull_many(&shm->in.q, (void **) smps, cnt);
|
||||
return queue_push_many(&shm->in.q, (void **) smps, cnt);
|
||||
}
|
||||
|
|
19
src/shmem.c
19
src/shmem.c
|
@ -7,7 +7,7 @@
|
|||
#include "node.h"
|
||||
#include "nodes/shmem.h"
|
||||
#include "pool.h"
|
||||
#include "queue.h"
|
||||
#include "queue_signalled.h"
|
||||
#include "sample.h"
|
||||
#include "sample_io.h"
|
||||
#include "shmem.h"
|
||||
|
@ -17,12 +17,21 @@
|
|||
|
||||
#define VECTORIZE 8
|
||||
|
||||
void *base;
|
||||
struct shmem_shared *shared;
|
||||
|
||||
void usage()
|
||||
{
|
||||
printf("Usage: villas-shmem SHM_NAME\n");
|
||||
printf(" SHMNAME name of the shared memory object\n");
|
||||
}
|
||||
|
||||
void quit(int sig)
|
||||
{
|
||||
shmem_shared_close(shared, base);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
if (argc != 2) {
|
||||
|
@ -30,14 +39,20 @@ int main(int argc, char* argv[])
|
|||
return 1;
|
||||
}
|
||||
|
||||
struct shmem_shared *shared = shmem_shared_open(argv[1]);
|
||||
shared = shmem_shared_open(argv[1], &base);
|
||||
if (!shared)
|
||||
serror("Failed to open shmem interface");
|
||||
|
||||
signal(SIGINT, quit);
|
||||
signal(SIGTERM, quit);
|
||||
struct sample *insmps[VECTORIZE], *outsmps[VECTORIZE];
|
||||
while (1) {
|
||||
int r, w;
|
||||
r = shmem_shared_read(shared, insmps, VECTORIZE);
|
||||
if (r == -1) {
|
||||
printf("node stopped, exiting\n");
|
||||
break;
|
||||
}
|
||||
int avail = sample_alloc(&shared->pool, outsmps, r);
|
||||
if (avail < r)
|
||||
warn("pool underrun (%d/%d)\n", avail, r);
|
||||
|
|
Loading…
Add table
Reference in a new issue