diff --git a/etc/shmem.conf b/etc/shmem.conf new file mode 100644 index 000000000..1e4cf01ef --- /dev/null +++ b/etc/shmem.conf @@ -0,0 +1,50 @@ +# Example configuration for testing the shared memory node. +# Samples are read from a file, passed to the external program +# via shared memory, and written back to an output file. + +stats = 1; +debug = 10; + +nodes = { + infile = { + type = "file", + in = { + path = "input.log", + mode = "r", + rate = 2.0 + }, + vectorize = 1 + }, + outfile = { + type = "file", + out = { + path = "output.log", + mode = "w" + }, + vectorize = 1 + }, + shmem = { + type = "shmem", + name = "/villas1", + insize = 32, + outsize = 32, + cond_in = true, + cond_out = true, + sample_size = 4, + vectorize = 1 + } +}; + +paths = ( + { + in = "infile", + out = "shmem", + rate = 2, + hook = ["print"] + }, + { + in = "shmem", + out = "outfile", + hook = ["print"], + } +); diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h new file mode 100644 index 000000000..de66ff2c8 --- /dev/null +++ b/include/villas/nodes/shmem.h @@ -0,0 +1,59 @@ +#ifndef _SHMEM_H_ +#define _SHMEM_H_ + +#include "node.h" +#include "memory.h" +#include "pool.h" +#include "queue.h" + +#define DEFAULT_SHMEM_QUEUESIZE 512 + +/** Per-direction shared datastructure for a shmem node. */ +struct shmem_queue { + struct queue queue; /**< Actual queue where the samples are passed */ + pthread_cond_t ready; /**< Condition variable to signal writes to the queue */ + pthread_condattr_t readyattr; + pthread_mutex_t mt; /**< Mutex for ready */ + pthread_mutexattr_t mtattr; +}; + +/** The structure that actually resides in the shared memory. TODO better name?*/ +struct shmem_shared { + struct shmem_queue in; /**< Queue for samples passed from external program to node.*/ + struct shmem_queue out; /**< Queue for samples passed from node to external program.*/ + struct pool pool; /**< Pool for the samples in the queues. */ +}; + +struct shmem { + const char* name; /**< Name of the shm object. */ + int sample_size; /**< Number of data entries for each sample. */ + int insize, outsize; /**< Size of ingoing and outgoing queue, respectively. */ + int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */ + int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */ + + size_t len; /**< Overall size of shared memory object. */ + struct memtype *manager; /**< Manager for the shared memory region. */ + int fd; /**< Handle as returned by shm_open().*/ + void *base; /**< Pointer to the shared memory region. */ + struct shmem_shared *shared; /**< Shared datastructure. */ +}; + +char *shmem_print(struct node *n); + +int shmem_parse(struct node *n, config_setting_t *cfg); + +int shmem_open(struct node *n); + +int shmem_close(struct node *n); + +int shmem_read(struct node *n, struct sample *smps[], unsigned cnt); + +int shmem_write(struct node *n, struct sample *smps[], unsigned cnt); + +/* The interface functions that the external program should use. TODO put this + * in another file? */ + +struct shmem_shared * shmem_int_open(const char* name, size_t len); + +size_t shmem_total_size(int insize, int outsize, int sample_size); +#endif /* _SHMEM_H_ */ diff --git a/lib/Makefile.inc b/lib/Makefile.inc index a36c1bf9f..9a3543ed4 100644 --- a/lib/Makefile.inc +++ b/lib/Makefile.inc @@ -2,7 +2,7 @@ LIBS = $(BUILDDIR)/libvillas.so # Object files for libvillas -LIB_SRCS = $(addprefix lib/nodes/, file.c cbuilder.c) \ +LIB_SRCS = $(addprefix lib/nodes/, file.c cbuilder.c shmem.c) \ $(addprefix lib/kernel/, kernel.c rt.c) \ $(addprefix lib/, sample.c path.c node.c hook.c \ log.c utils.c cfg.c hist.c timing.c pool.c list.c \ diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c new file mode 100644 index 000000000..2405e55aa --- /dev/null +++ b/lib/nodes/shmem.c @@ -0,0 +1,174 @@ +#include +#include +#include +#include +#include +#include + +#include "kernel/kernel.h" +#include "log.h" +#include "nodes/shmem.h" + +int shmem_parse(struct node *n, config_setting_t *cfg) { + struct shmem *shm = n->_vd; + + if (!config_setting_lookup_string(cfg, "name", &shm->name)) + cerror(cfg, "Missing shm object name"); + + if (!config_setting_lookup_int(cfg, "insize", &shm->insize)) + shm->insize = DEFAULT_SHMEM_QUEUESIZE; + if (!config_setting_lookup_int(cfg, "outsize", &shm->outsize)) + shm->outsize = DEFAULT_SHMEM_QUEUESIZE; + if (!config_setting_lookup_int(cfg, "sample_size", &shm->sample_size)) + cerror(cfg, "Missing sample size setting"); + if (!config_setting_lookup_bool(cfg, "cond_out", &shm->cond_out)) + shm->cond_out = false; + if (!config_setting_lookup_bool(cfg, "cond_in", &shm->cond_in)) + shm->cond_in = false; + + return 0; +} + +/* Helper for initializing condition variables / mutexes. */ +void shmem_cond_init(struct shmem_queue *queue) { + pthread_mutexattr_init(&queue->mtattr); + pthread_mutexattr_setpshared(&queue->mtattr, PTHREAD_PROCESS_SHARED); + pthread_condattr_init(&queue->readyattr); + pthread_condattr_setpshared(&queue->readyattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&queue->mt, &queue->mtattr); + pthread_cond_init(&queue->ready, &queue->readyattr); +} + +int shmem_open(struct node *n) { + struct shmem *shm = n->_vd; + + int r = shm_open(shm->name, O_RDWR|O_CREAT, 0600); + if (r < 0) + serror("Opening shared memory object failed"); + + shm->fd = r; + shm->len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size); + if (ftruncate(shm->fd, shm->len) < 0) + serror("Setting size of shared memory object failed"); + /* TODO: we could use huge pages here as well */ + shm->base = mmap(NULL, shm->len, PROT_READ|PROT_WRITE, MAP_SHARED, shm->fd, 0); + + if (shm->base == MAP_FAILED) + serror("Mapping shared memory failed"); + + shm->manager = memtype_managed_init(shm->base, shm->len); + shm->shared = memory_alloc(shm->manager, sizeof(struct shmem_shared)); + if (!shm->shared) + error("Shm shared struct allocation failed (not enough memory?"); + queue_init(&shm->shared->in.queue, shm->insize, shm->manager); + queue_init(&shm->shared->out.queue, shm->outsize, shm->manager); + pool_init(&shm->shared->pool, shm->insize+shm->outsize, SAMPLE_LEN(shm->sample_size), shm->manager); + if (shm->cond_out) + shmem_cond_init(&shm->shared->out); + if (shm->cond_in) + shmem_cond_init(&shm->shared->in); + + return 0; +} + +int shmem_close(struct node *n) { + struct shmem* shm = n->_vd; + int r = munmap(shm->base, shm->len); + if (r != 0) + return r; + return shm_unlink(shm->name); +} + +int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) { + struct shmem *shm = n->_vd; + if (shm->cond_in) { + pthread_mutex_lock(&shm->shared->in.mt); + pthread_cond_wait(&shm->shared->in.ready, &shm->shared->in.mt); + pthread_mutex_unlock(&shm->shared->in.mt); + } + int r = queue_pull_many(&shm->shared->in.queue, (void**) smps, cnt); + return r; +} + +int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) { + struct shmem *shm = n->_vd; + + /* Samples need to be copied to the shared pool first */ + struct sample *shared_smps[cnt]; + int avail = sample_alloc(&shm->shared->pool, shared_smps, cnt); + if (avail != cnt) + warn("Pool underrun for shmem node %s", shm->name); + for (int i = 0; i < avail; i++) { + /* Since the node isn't in shared memory, the source can't be accessed */ + shared_smps[i]->source = NULL; + shared_smps[i]->sequence = smps[i]->sequence; + shared_smps[i]->ts = smps[i]->ts; + int len = MIN(smps[i]->length, shared_smps[i]->capacity); + if (len != smps[i]->length) + warn("Losing data because of sample capacity mismatch in shmem node %s", shm->name); + memcpy(shared_smps[i]->data, smps[i]->data, len*sizeof(smps[0]->data[0])); + shared_smps[i]->length = len; + sample_get(shared_smps[i]); + } + int pushed = queue_push_many(&shm->shared->out.queue, (void**) shared_smps, avail); + if (pushed != avail) + warn("Outqueue overrun for shmem node %s", shm->name); + if (pushed && shm->cond_out) { + pthread_mutex_lock(&shm->shared->out.mt); + pthread_cond_broadcast(&shm->shared->out.ready); + pthread_mutex_unlock(&shm->shared->out.mt); + } + return pushed; +} + +char *shmem_print(struct node *n) { + struct shmem *shm = n->_vd; + char *buf = NULL; + strcatf(&buf, "name=%s, insize=%d, outsize=%d, sample_size=%d", shm->name, shm->insize, shm->outsize, shm->sample_size); + return buf; +}; + +static struct node_type vt = { + .name = "shmem", + .description = "use POSIX shared memory to interface with other programs", + .vectorize = 1, + .size = sizeof(struct shmem), + .parse = shmem_parse, + .print = shmem_print, + .open = shmem_open, + .close = shmem_close, + .read = shmem_read, + .write = shmem_write +}; + +REGISTER_NODE_TYPE(&vt) + +size_t shmem_total_size(int insize, int outsize, int sample_size) +{ + // we have the constant const of the memtype header + return sizeof(struct memtype) + // and the shared struct itself + + sizeof(struct shmem_shared) + // the size of the 2 queues and the queue for the pool + + (insize + outsize) * (2*sizeof(struct queue_cell)) + // the size of the pool + + (insize + outsize) * kernel_get_cacheline_size() * CEIL(SAMPLE_LEN(sample_size), kernel_get_cacheline_size()) + // a memblock for each allocation (1 shmem_shared, 3 queues, 1 pool) + + 5 * sizeof(struct memblock) + // and some extra buffer for alignment + + 1024; +} + +struct shmem_shared* shmem_int_open(const char *name, size_t len) +{ + int fd = shm_open(name, O_RDWR, 0); + if (fd < 0) + return NULL; + void *base = mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); + if (base == MAP_FAILED) + return NULL; + /* This relies on the behaviour of the node and the allocator; it assumes + * that memtype_managed is used and the shmem_shared is the first allocated object */ + char *cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); + return (struct shmem_shared *) cptr; +} diff --git a/src/Makefile.inc b/src/Makefile.inc index 94cd8fcdb..a96a216c4 100644 --- a/src/Makefile.inc +++ b/src/Makefile.inc @@ -2,6 +2,7 @@ TARGETS = $(BUILDDIR)/villas-node \ $(BUILDDIR)/villas-pipe \ $(BUILDDIR)/villas-signal \ + $(BUILDDIR)/villas-shmem \ $(BUILDDIR)/villas-test \ $(BUILDDIR)/villas-hook @@ -47,4 +48,4 @@ install-src: src clean-src: rm -rf $(BUILDDIR)/src $(TARGETS) -.PHONY: src src-tests src-tests \ No newline at end of file +.PHONY: src src-tests src-tests diff --git a/src/shmem.c b/src/shmem.c new file mode 100644 index 000000000..9f6753ee3 --- /dev/null +++ b/src/shmem.c @@ -0,0 +1,76 @@ +/* Test "client" for the shared memory interface. + * Busy waits on the incoming queue, prints received samples and writes them + * back to the other queue. */ + +#include "config.h" +#include "cfg.h" +#include "log.h" +#include "nodes/shmem.h" +#include "pool.h" +#include "queue.h" +#include "sample.h" + +#include + +void usage() +{ + printf("Usage: villas-shmem CONFIG NODE\n"); + printf(" CONFIG path to a configuration file\n"); + printf(" NODE the name of the node which samples should be read from\n"); +} + +int main(int argc, char* argv[]) +{ + if (argc != 3) { + usage(); + return 1; + } + struct list nodes; + struct settings settings; + config_t config; + + list_init(&nodes); + cfg_parse(argv[1], &config, &settings, &nodes, NULL); + + struct node *node = list_lookup(&nodes, argv[2]); + if (!node) + error("Node '%s' does not exist!", argv[2]); + + struct shmem* shm = node->_vd; + size_t len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size); + struct shmem_shared *shmem = shmem_int_open(shm->name, len); + if (!shmem) + serror("Failed to open shmem interface"); + + struct sample *insmps[node->vectorize], *outsmps[node->vectorize]; + while (1) { + if (shm->cond_out) { + pthread_mutex_lock(&shmem->out.mt); + pthread_cond_wait(&shmem->out.ready, &shmem->out.mt); + pthread_mutex_unlock(&shmem->out.mt); + } + int r = queue_pull_many(&shmem->out.queue, (void **) insmps, node->vectorize); + int avail = sample_alloc(&shmem->pool, outsmps, r); + if (avail < r) + warn("pool underrun (%d/%d)\n", avail, r); + for (int i = 0; i < r; i++) + sample_fprint(stdout, insmps[i], SAMPLE_ALL); + for (int i = 0; i < avail; i++) { + outsmps[i]->sequence = insmps[i]->sequence; + outsmps[i]->ts = insmps[i]->ts; + int len = MIN(insmps[i]->length, outsmps[i]->capacity); + memcpy(outsmps[i]->data, insmps[i]->data, len*sizeof(insmps[0]->data[0])); + outsmps[i]->length = len; + } + for (int i = 0; i < r; i++) + sample_put(insmps[i]); + int w = queue_push_many(&shmem->in.queue, (void **) outsmps, avail); + if (w < avail) + warn("short write (%d/%d)\n", w, r); + if (shm->cond_in && w) { + pthread_mutex_lock(&shmem->in.mt); + pthread_cond_broadcast(&shmem->in.ready); + pthread_mutex_unlock(&shmem->in.mt); + } + } +}