Mirror of https://git.rwth-aachen.de/acs/public/villas/node/

Several smaller cleanups: coding style, variable names, whitespace, intermediate variables

Steffen Vogel 2017-04-15 18:59:22 +02:00
parent 12d43eebd9
commit b23000e2a8
10 changed files with 231 additions and 148 deletions

View file

@@ -22,17 +22,17 @@
#define DEFAULT_SHMEM_QUEUESIZE 512
struct shmem {
const char* name; /**< Name of the shm object. */
int sample_size; /**< Number of data entries for each sample. */
int insize, outsize; /**< Size of ingoing and outgoing queue, respectively. */
int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */
int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */
char **exec; /**< External program to execute on start. */
const char* name; /**< Name of the shm object. */
int sample_size; /**< Number of data entries for each sample. */
int insize, outsize; /**< Size of ingoing and outgoing queue, respectively. */
int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */
int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */
const char * const exec; /**< External program to execute on start. */
struct memtype *manager; /**< Manager for the shared memory region. */
int fd; /**< Handle as returned by shm_open().*/
void *base; /**< Pointer to the shared memory region. */
struct shmem_shared *shared; /**< Shared datastructure. */
struct memtype *manager; /**< Manager for the shared memory region. */
int fd; /**< Handle as returned by shm_open().*/
void *base; /**< Pointer to the shared memory region. */
struct shmem_shared *shared; /**< Shared datastructure. */
};
char *shmem_print(struct node *n);

View file

@@ -13,10 +13,10 @@
/** Wrapper around queue that uses POSIX CV's for signalling writes. */
struct queue_signalled {
struct queue q; /**< Actual underlying queue. */
pthread_cond_t ready; /**< Condition variable to signal writes to the queue. */
struct queue q; /**< Actual underlying queue. */
pthread_cond_t ready; /**< Condition variable to signal writes to the queue. */
pthread_condattr_t readyattr;
pthread_mutex_t mt; /**< Mutex for ready. */
pthread_mutex_t mt; /**< Mutex for ready. */
pthread_mutexattr_t mtattr;
};

View file

@@ -1,4 +1,3 @@
#pragma once
/** Shared-memory interface: The interface functions that the external program should use.
*
* @file
@@ -6,6 +5,7 @@
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
*********************************************************************************/
#pragma once
#include "pool.h"
#include "queue.h"
@@ -18,12 +18,16 @@ union shmem_queue {
/** The structure that actually resides in the shared memory. */
struct shmem_shared {
size_t len; /**< Total size of the shared memory region.*/
union shmem_queue in; /**< Queue for samples passed from external program to node.*/
int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */
union shmem_queue out; /**< Queue for samples passed from node to external program.*/
int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */
struct pool pool; /**< Pool for the samples in the queues. */
size_t len; /**< Total size of the shared memory region.*/
int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */
int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */
union shmem_queue in; /**< Queue for samples passed from external program to node.*/
union shmem_queue out; /**< Queue for samples passed from node to external program.*/
struct pool pool; /**< Pool for the samples in the queues. */
pthread_barrier_t start_bar;
pthread_barrierattr_t start_attr;
atomic_size_t node_stopped;
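
The union shmem_queue used for the in and out fields above is declared outside this hunk; judging from how the .c files below access it (.q for the plain queue, .qs when the corresponding cond_* flag is set), it presumably looks roughly like this sketch:

#include "queue.h"
#include "queue_signalled.h"

union shmem_queue {
	struct queue q;             /* plain queue, used when the cond_* flag is 0 */
	struct queue_signalled qs;  /* queue signalled via pthread_cond_t, used when the flag is 1 */
};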

View file

@@ -18,12 +18,12 @@
#include "shmem.h"
#include "utils.h"
int shmem_parse(struct node *n, config_setting_t *cfg) {
int shmem_parse(struct node *n, config_setting_t *cfg)
{
struct shmem *shm = n->_vd;
if (!config_setting_lookup_string(cfg, "name", &shm->name))
cerror(cfg, "Missing shm object name");
cerror(cfg, "Missing shared memory object name");
if (!config_setting_lookup_int(cfg, "insize", &shm->insize))
shm->insize = DEFAULT_SHMEM_QUEUESIZE;
if (!config_setting_lookup_int(cfg, "outsize", &shm->outsize))
@@ -34,147 +34,181 @@ int shmem_parse(struct node *n, config_setting_t *cfg) {
shm->cond_out = false;
if (!config_setting_lookup_bool(cfg, "cond_in", &shm->cond_in))
shm->cond_in = false;
config_setting_t *exec_setting = config_setting_lookup(cfg, "exec");
if (!exec_setting) {
config_setting_t *exec_cfg = config_setting_lookup(cfg, "exec");
if (!exec_cfg)
shm->exec = NULL;
} else {
if (!config_setting_is_array(exec_setting))
cerror(exec_setting, "Invalid format for exec");
shm->exec = malloc(sizeof(char*) * (config_setting_length(exec_setting) + 1));
else {
if (!config_setting_is_array(exec_cfg))
cerror(exec_cfg, "Invalid format for exec");
shm->exec = alloc(sizeof(char *) * (config_setting_length(exec_cfg) + 1));
int i;
for (i = 0; i < config_setting_length(exec_setting); i++) {
const char* elm = config_setting_get_string_elem(exec_setting, i);
if (!elm)
cerror(exec_setting, "Invalid format for exec");
shm->exec[i] = strdup(elm);
for (i = 0; i < config_setting_length(exec_cfg); i++) {
shm->exec[i] = config_setting_get_string_elem(exec_cfg, i);
if (!shm->exec[i])
cerror(exec_cfg, "Invalid format for exec");
}
shm->exec[i] = NULL;
}
return 0;
}
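
For illustration (hypothetical values): given a config entry such as exec = [ "client", "/villas0" ], the loop above builds an argv-style, NULL-terminated vector suitable for spawn() in shmem_open() below:

const char *exec_example[] = { "client", "/villas0", NULL };
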
int shmem_open(struct node *n) {
int shmem_open(struct node *n)
{
struct shmem *shm = n->_vd;
int r = shm_open(shm->name, O_RDWR|O_CREAT, 0600);
if (r < 0)
int ret;
size_t len;
shm->fd = shm_open(shm->name, O_RDWR | O_CREAT, 0600);
if (shm->fd < 0)
serror("Opening shared memory object failed");
shm->fd = r;
size_t len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size);
if (ftruncate(shm->fd, len) < 0)
len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size);
ret = ftruncate(shm->fd, len);
if (ret < 0)
serror("Setting size of shared memory object failed");
shm->base = mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_SHARED, shm->fd, 0);
shm->base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, shm->fd, 0);
if (shm->base == MAP_FAILED)
serror("Mapping shared memory failed");
shm->manager = memtype_managed_init(shm->base, len);
shm->shared = memory_alloc(shm->manager, sizeof(struct shmem_shared));
if (!shm->shared)
error("Shm shared struct allocation failed (not enough memory?)");
error("Shared memory shared struct allocation failed (not enough memory?)");
memset(shm->shared, 0, sizeof(struct shmem_shared));
shm->shared->len = len;
shm->shared->cond_in = shm->cond_in;
shm->shared->cond_out = shm->cond_out;
if (shm->cond_in) {
if (queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager) < 0)
error("Shm queue allocation failed (not enough memory?)");
} else {
if (queue_init(&shm->shared->in.q, shm->insize, shm->manager) < 0)
error("Shm queue allocation failed (not enough memory?)");
}
if (shm->cond_out) {
if (queue_signalled_init(&shm->shared->out.qs, shm->outsize, shm->manager) < 0)
error("Shm queue allocation failed (not enough memory?)");
} else {
if (queue_init(&shm->shared->out.q, shm->outsize, shm->manager) < 0)
error("Shm queue allocation failed (not enough memory?)");
}
if (pool_init(&shm->shared->pool, shm->insize+shm->outsize, SAMPLE_LEN(shm->sample_size), shm->manager) < 0)
error("Shm pool allocation failed (not enough memory?)");
ret = shm->cond_in ? queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager)
: queue_init(&shm->shared->in.q, shm->insize, shm->manager);
if (ret)
error("Shared memory queue allocation failed (not enough memory?)");
ret = shm->cond_out ? queue_signalled_init(&shm->shared->out.qs, shm->outsize, shm->manager)
: queue_init(&shm->shared->out.q, shm->outsize, shm->manager);
if (ret)
error("Shared memory queue allocation failed (not enough memory?)");
ret = pool_init(&shm->shared->pool, shm->insize+shm->outsize, SAMPLE_LEN(shm->sample_size), shm->manager);
if (ret)
error("Shared memory pool allocation failed (not enough memory?)");
pthread_barrierattr_init(&shm->shared->start_attr);
pthread_barrierattr_setpshared(&shm->shared->start_attr, PTHREAD_PROCESS_SHARED);
pthread_barrier_init(&shm->shared->start_bar, &shm->shared->start_attr, 2);
if (shm->exec && !spawn(shm->exec[0], shm->exec))
serror("Failed to spawn external program");
if (shm->exec) {
ret = spawn(shm->exec[0], shm->exec);
if (!ret)
serror("Failed to spawn external program");
}
pthread_barrier_wait(&shm->shared->start_bar);
return 0;
}
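
The layout that shmem_open() establishes matters for the external side: the memtype manager sits at the start of the mapping and shmem_shared is the very first object allocated from it, so it can be found at a fixed offset. shmem_shared_open() further below relies on exactly this; a hypothetical helper expressing that contract:

static inline struct shmem_shared * shmem_shared_from_base(void *base)
{
	/* skip the manager header and the memblock header of the first allocation */
	return (struct shmem_shared *) ((char *) base + sizeof(struct memtype) + sizeof(struct memblock));
}
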
int shmem_close(struct node *n) {
int shmem_close(struct node *n)
{
struct shmem* shm = n->_vd;
size_t len = shm->shared->len;
int ret;
atomic_store_explicit(&shm->shared->node_stopped, 1, memory_order_relaxed);
if (shm->cond_out) {
pthread_mutex_lock(&shm->shared->out.qs.mt);
pthread_cond_broadcast(&shm->shared->out.qs.ready);
pthread_mutex_unlock(&shm->shared->out.qs.mt);
}
/* Don't destroy the data structures yet, since the other process might
* still be using them. Once both processes are done and have unmapped the
* memory, it will be freed anyway. */
int r = munmap(shm->base, len);
if (r != 0)
return r;
ret = munmap(shm->base, shm->shared->len);
if (ret != 0)
return ret;
return shm_unlink(shm->name);
}
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) {
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct shmem *shm = n->_vd;
int r;
if (shm->cond_in)
r = queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt);
else
r = queue_pull_many(&shm->shared->in.q, (void**) smps, cnt);
if (!r && atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed))
int ret;
ret = shm->cond_in ? queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt)
: queue_pull_many(&shm->shared->in.q, (void**) smps, cnt);
if (ret <= 0)
return ret;
/* Check if remote process is still running */
ret = atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed);
if (ret)
return -1;
return r;
return ret;
}
int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) {
int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct shmem *shm = n->_vd;
/* Samples need to be copied to the shared pool first */
struct sample *shared_smps[cnt];
int avail = sample_alloc(&shm->shared->pool, shared_smps, cnt);
struct sample *shared_smps[cnt]; /**< Samples need to be copied to the shared pool first */
int avail, pushed, len;
avail = sample_alloc(&shm->shared->pool, shared_smps, cnt);
if (avail != cnt)
warn("Pool underrun for shmem node %s", shm->name);
for (int i = 0; i < avail; i++) {
/* Since the node isn't in shared memory, the source can't be accessed */
shared_smps[i]->source = NULL;
shared_smps[i]->sequence = smps[i]->sequence;
shared_smps[i]->ts = smps[i]->ts;
int len = MIN(smps[i]->length, shared_smps[i]->capacity);
len = MIN(smps[i]->length, shared_smps[i]->capacity);
if (len != smps[i]->length)
warn("Losing data because of sample capacity mismatch in shmem node %s", shm->name);
warn("Losing data because of sample capacity mismatch in node %s", node_name(n));
memcpy(shared_smps[i]->data, smps[i]->data, len*sizeof(smps[0]->data[0]));
shared_smps[i]->length = len;
sample_get(shared_smps[i]);
}
int pushed;
if (atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) {
for (int i = 0; i < avail; i++)
sample_put(shared_smps[i]);
return -1;
}
if (shm->cond_out)
pushed = queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail);
else
pushed = queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail);
pushed = shm->cond_out ? queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail)
: queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail);
if (pushed != avail)
warn("Outqueue overrun for shmem node %s", shm->name);
warn("Outgoing queue overrun for node %s", node_name(n));
return pushed;
}
char *shmem_print(struct node *n) {
char * shmem_print(struct node *n)
{
struct shmem *shm = n->_vd;
char *buf = NULL;
strcatf(&buf, "name=%s, insize=%d, outsize=%d, sample_size=%d", shm->name, shm->insize, shm->outsize, shm->sample_size);
return buf;
};
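
spawn(), called by shmem_open() above, is a VILLASnode utility defined elsewhere. Judging from its use here (first argument exec[0], second the NULL-terminated argument vector, a zero return indicating failure) it behaves roughly like the following sketch; this is an assumption for illustration, not the actual implementation:

#include <unistd.h>
#include <sys/types.h>

static pid_t spawn_sketch(const char *name, char * const argv[])
{
	pid_t pid = fork();
	if (pid < 0)
		return 0;		/* failure is reported as 0, matching the !ret check above */

	if (pid == 0) {
		execvp(name, argv);
		_exit(127);		/* only reached if execvp() itself fails */
	}

	return pid;
}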

View file

@@ -453,7 +453,7 @@ int path_reverse(struct path *p, struct path *r)
for (size_t i = 0; i < list_length(&p->hooks); i++) {
struct hook *h = list_at(&p->hooks, i);
struct hook hc = {.state = STATE_DESTROYED};
struct hook hc = { .state = STATE_DESTROYED };
ret = hook_init(&hc, h->_vt, p);
if (ret)

View file

@@ -9,37 +9,49 @@
int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem)
{
int r = queue_init(&qs->q, size, mem);
if (r < 0)
return r;
int ret;
ret = queue_init(&qs->q, size, mem);
if (ret < 0)
return ret;
pthread_mutexattr_init(&qs->mtattr);
pthread_mutexattr_setpshared(&qs->mtattr, PTHREAD_PROCESS_SHARED);
pthread_condattr_init(&qs->readyattr);
pthread_condattr_setpshared(&qs->readyattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&qs->mt, &qs->mtattr);
pthread_cond_init(&qs->ready, &qs->readyattr);
return 0;
}
int queue_signalled_destroy(struct queue_signalled *qs)
{
int r = queue_destroy(&qs->q);
if (r < 0)
return r;
int ret;
ret = queue_destroy(&qs->q);
if (ret < 0)
return ret;
pthread_cond_destroy(&qs->ready);
pthread_mutex_destroy(&qs->mt);
return 0;
}
int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt)
{
int r = queue_push_many(&qs->q, ptr, cnt);
if (r > 0) {
pthread_mutex_lock(&qs->mt);
pthread_cond_broadcast(&qs->ready);
pthread_mutex_unlock(&qs->mt);
}
return r;
int ret;
ret = queue_push_many(&qs->q, ptr, cnt);
if (ret < 0)
return ret;
pthread_mutex_lock(&qs->mt);
pthread_cond_broadcast(&qs->ready);
pthread_mutex_unlock(&qs->mt);
return ret;
}
int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt)
@@ -50,5 +62,6 @@ int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt)
pthread_cond_wait(&qs->ready, &qs->mt);
pthread_mutex_unlock(&qs->mt);
pthread_cleanup_pop(0);
return queue_pull_many(&qs->q, ptr, cnt);
}
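
The hunk above only shows the tail of queue_signalled_pull_many(); reconstructed from the visible lines, the complete function presumably reads roughly as follows (a sketch, not part of this diff):

int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt)
{
	pthread_mutex_lock(&qs->mt);
	/* Make sure the mutex is released if the thread is cancelled while waiting. */
	pthread_cleanup_push((void (*)(void *)) pthread_mutex_unlock, &qs->mt);

	/* Sleep until a writer broadcasts qs->ready in queue_signalled_push_many(). */
	pthread_cond_wait(&qs->ready, &qs->mt);

	pthread_mutex_unlock(&qs->mt);
	pthread_cleanup_pop(0);

	return queue_pull_many(&qs->q, ptr, cnt);
}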

View file

@@ -17,79 +17,100 @@
size_t shmem_total_size(int insize, int outsize, int sample_size)
{
// we have the constant cost of the memtype header
/* We have the constant cost of the memtype header */
return sizeof(struct memtype)
// and the shared struct itself
/* and the shared struct itself */
+ sizeof(struct shmem_shared)
// the size of the 2 queues and the queue for the pool
+ (insize + outsize) * (2*sizeof(struct queue_cell))
// the size of the pool
/* the size of the 2 queues and the queue for the pool */
+ (insize + outsize) * (2 * sizeof(struct queue_cell))
/* the size of the pool */
+ (insize + outsize) * kernel_get_cacheline_size() * CEIL(SAMPLE_LEN(sample_size), kernel_get_cacheline_size())
// a memblock for each allocation (1 shmem_shared, 3 queues, 1 pool)
/* a memblock for each allocation (1 shmem_shared, 3 queues, 1 pool) */
+ 5 * sizeof(struct memblock)
// and some extra buffer for alignment
/* and some extra buffer for alignment */
+ 1024;
}
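
A quick way to check this arithmetic for a concrete configuration (a sketch; the include is an assumption and the result depends on the platform's cache-line size and struct sizes):

#include <stdio.h>

#include "shmem_shared.h"	/* assumed to declare shmem_total_size() */

int main(void)
{
	/* e.g. two queues of 512 entries each and 64 data values per sample */
	printf("shared region size: %zu bytes\n", shmem_total_size(512, 512, 64));
	return 0;
}
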
struct shmem_shared* shmem_shared_open(const char *name, void **base_ptr)
struct shmem_shared * shmem_shared_open(const char *name, void **base_ptr)
{
int fd = shm_open(name, O_RDWR, 0);
struct shmem_shared *shm;
size_t len, newlen;
void *base;
char *cptr;
int fd, ret;
fd = shm_open(name, O_RDWR, 0);
if (fd < 0)
return NULL;
/* Only map the first part (shmem_shared) at first, read the correct length,
* then map it with this length. */
size_t len = sizeof(struct memtype) + sizeof(struct memblock) + sizeof(struct shmem_shared);
void *base = mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
len = sizeof(struct memtype) + sizeof(struct memblock) + sizeof(struct shmem_shared);
base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return NULL;
/* This relies on the behaviour of the node and the allocator; it assumes
* that memtype_managed is used and the shmem_shared is the first allocated object */
char *cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
struct shmem_shared *shm = (struct shmem_shared *) cptr;
size_t newlen = shm->len;
if (munmap(base, len))
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
shm = (struct shmem_shared *) cptr;
newlen = shm->len;
ret = munmap(base, len);
if (ret)
return NULL;
base = mmap(NULL, newlen, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
base = mmap(NULL, newlen, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return NULL;
/* Address might have moved */
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
if (base_ptr)
*base_ptr = base;
shm = (struct shmem_shared *) cptr;
pthread_barrier_wait(&shm->start_bar);
return shm;
}
int shmem_shared_close(struct shmem_shared *shm, void *base)
{
atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed);
if (shm->cond_in) {
pthread_mutex_lock(&shm->in.qs.mt);
pthread_cond_broadcast(&shm->in.qs.ready);
pthread_mutex_unlock(&shm->in.qs.mt);
}
return munmap(base, shm->len);
}
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
{
int r;
if (shm->cond_out)
r = queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt);
else
r = queue_pull_many(&shm->out.q, (void **) smps, cnt);
if (!r && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed))
int ret;
ret = shm->cond_out ? queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt)
: queue_pull_many(&shm->out.q, (void **) smps, cnt);
if (!ret && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed))
return -1;
return r;
return ret;
}
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
{
if (atomic_load_explicit(&shm->node_stopped, memory_order_relaxed))
int ret;
ret = atomic_load_explicit(&shm->node_stopped, memory_order_relaxed);
if (ret)
return -1;
if (shm->cond_in)
return queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt);
else
return queue_push_many(&shm->in.q, (void **) smps, cnt);
return shm->cond_in ? queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt)
: queue_push_many(&shm->in.q, (void **) smps, cnt);
}
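
Taken together, these functions form the external program's side of the interface. A minimal consumer skeleton, as a sketch only (the full, working example is the test client in the last file of this commit):

#include "shmem_shared.h"	/* assumed header for the external-program interface */
#include "sample.h"

int external_consumer_sketch(const char *object_name)
{
	void *base;
	struct sample *smps[8];
	int ret;

	struct shmem_shared *shm = shmem_shared_open(object_name, &base);
	if (!shm)
		return -1;			/* object missing or mapping failed */

	for (;;) {
		ret = shmem_shared_read(shm, smps, 8);	/* samples written by the node */
		if (ret < 0)
			break;				/* node has stopped */

		for (int i = 0; i < ret; i++)
			sample_put(smps[i]);		/* hand the samples back to the shared pool */
	}

	return shmem_shared_close(shm, base);
}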

View file

@@ -270,7 +270,7 @@ int super_node_parse(struct super_node *sn, config_setting_t *cfg)
list_push(&sn->paths, memdup(&p, sizeof(p)));
if (p.reverse) {
struct path r = {.state = STATE_DESTROYED};
struct path r = { .state = STATE_DESTROYED };
ret = path_init(&r, sn);
if (ret)

View file

@@ -293,10 +293,12 @@ void signals_init(void (*cb)(int signal, siginfo_t *sinfo, void *ctx))
sigemptyset(&sa_quit.sa_mask);
sigaction(SIGINT, &sa_quit, NULL);
sigaction(SIGTERM, &sa_quit, NULL);
struct sigaction sa_chld = {
.sa_flags = 0,
.sa_handler = SIG_IGN
};
sigaction(SIGCHLD, &sa_chld, NULL);
}

View file

@@ -38,6 +38,8 @@ void quit(int sig)
int main(int argc, char* argv[])
{
int readcnt, writecnt, avail;
if (argc != 2) {
usage();
return 1;
@@ -51,30 +53,37 @@ int main(int argc, char* argv[])
signal(SIGTERM, quit);
struct sample *insmps[VECTORIZE], *outsmps[VECTORIZE];
while (1) {
int r, w;
r = shmem_shared_read(shared, insmps, VECTORIZE);
if (r == -1) {
printf("node stopped, exiting\n");
readcnt = shmem_shared_read(shared, insmps, VECTORIZE);
if (readcnt == -1) {
printf("Node stopped, exiting\n");
break;
}
int avail = sample_alloc(&shared->pool, outsmps, r);
if (avail < r)
warn("pool underrun (%d/%d)\n", avail, r);
for (int i = 0; i < r; i++) {
printf("got sample: seq %d recv %ld.%ld\n", insmps[i]->sequence,
insmps[i]->ts.received.tv_sec, insmps[i]->ts.received.tv_nsec);
}
avail = sample_alloc(&shared->pool, outsmps, readcnt);
if (avail < readcnt)
warn("Pool underrun: %d / %d\n", avail, readcnt);
for (int i = 0; i < readcnt; i++)
sample_io_villas_fprint(stdout, insmps[i], SAMPLE_IO_ALL);
for (int i = 0; i < avail; i++) {
outsmps[i]->sequence = insmps[i]->sequence;
outsmps[i]->ts = insmps[i]->ts;
int len = MIN(insmps[i]->length, outsmps[i]->capacity);
memcpy(outsmps[i]->data, insmps[i]->data, len*sizeof(insmps[0]->data[0]));
memcpy(outsmps[i]->data, insmps[i]->data, SAMPLE_DATA_LEN(len));
outsmps[i]->length = len;
}
for (int i = 0; i < r; i++)
for (int i = 0; i < readcnt; i++)
sample_put(insmps[i]);
w = shmem_shared_write(shared, outsmps, avail);
if (w < avail)
warn("short write (%d/%d)\n", w, r);
writecnt = shmem_shared_write(shared, outsmps, avail);
if (writecnt < avail)
warn("Short write");
info("Read / Write: %d / %d", readcnt, writecnt);
}
}