diff --git a/etc/shmem.conf b/etc/shmem.conf index 9d3779506..b5aa3e4b4 100644 --- a/etc/shmem.conf +++ b/etc/shmem.conf @@ -21,7 +21,8 @@ nodes = { }, shmem = { type = "shmem", - name = "/villas1", + out_name = "/villas1-out", + in_name = "/villas1-in", samplelen = 4, queuelen = 32, polling = false, diff --git a/include/villas/fpga/card.h b/include/villas/fpga/card.h index 5dcd7a0d9..ba2b3ce3f 100644 --- a/include/villas/fpga/card.h +++ b/include/villas/fpga/card.h @@ -32,6 +32,8 @@ #include #include "common.h" +#include "kernel/pci.h" +#include "kernel/vfio.h" /* Forward declarations */ struct fpga_ip; @@ -92,4 +94,4 @@ void fpga_card_dump(struct fpga_card *c); /** Reset the FPGA to a known state */ int fpga_card_reset(struct fpga_card *c); -/** @} */ \ No newline at end of file +/** @} */ diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index 91676d00b..c001d1d22 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -34,24 +34,17 @@ #include "pool.h" #include "queue.h" #include "config.h" - -#define DEFAULT_SHMEM_QUEUELEN 512 -#define DEFAULT_SHMEM_SAMPLELEN DEFAULT_SAMPLELEN +#include "shmem.h" /** Node-type for shared memory communication. * @see node_type */ struct shmem { - const char* name; /**< Name of the shm object. */ - int samplelen; /**< Number of data entries for each sample. */ - int queuelen; /**< Size of ingoing and outgoing queue, respectively. */ - int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to outgoing queue. */ - char **exec; /**< External program to execute on start. */ - - struct memtype *manager; /**< Manager for the shared memory region. */ - int fd; /**< Handle as returned by shm_open().*/ - void *base; /**< Pointer to the shared memory region. */ - struct shmem_shared *shared; /**< Shared datastructure. */ + const char* out_name; /**< Name of the shm object for the output queue. */ + const char* in_name; /**< Name of the shm object for the input queue. */ + struct shmem_conf conf; /**< Interface configuration struct. */ + char **exec; /**< External program to execute on start. */ + struct shmem_int intf; /**< Shmem interface */ }; /** @see node_type::print */ diff --git a/include/villas/shmem.h b/include/villas/shmem.h index 39d5d5390..0171402d1 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -36,6 +36,10 @@ extern "C" { #include "pool.h" #include "queue.h" #include "queue_signalled.h" +#include "sample.h" + +#define DEFAULT_SHMEM_QUEUELEN 512 +#define DEFAULT_SHMEM_SAMPLELEN 64 /** A signalled queue or a regular (polling) queue, depending on the polling setting. */ union shmem_queue { @@ -43,56 +47,90 @@ union shmem_queue { struct queue_signalled qs; }; -/** The structure that actually resides in the shared memory. */ -struct shmem_shared { - size_t len; /**< Total size of the shared memory region.*/ +#define SHMEM_MIN_SIZE (sizeof(struct memtype) + sizeof(struct memblock) + sizeof(pthread_barrier_t) + sizeof(pthread_barrierattr_t)) - int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */ - - union shmem_queue in; /**< Queue for samples passed from external program to node.*/ - union shmem_queue out; /**< Queue for samples passed from node to external program.*/ - - struct pool pool; /**< Pool for the samples in the queues. */ - - pthread_barrier_t start_bar; /**< Barrier for synchronizing the start of both programs. */ - pthread_barrierattr_t start_attr; - atomic_size_t node_stopped; /**< Set to 1 by VILLASNode if it is stopped/killed. */ - atomic_size_t ext_stopped; /**< Set to 1 by the external program if it is stopped/killed. */ +/** Struct containing all parameters that need to be known when creating a new + * shared memory object. */ +struct shmem_conf { + int polling; /**< Whether to use polling instead of POSIX CVs */ + int queuelen; /**< Size of the queues (in elements) */ + int samplelen; /**< Maximum number of data entries in a single sample */ }; -/** Open the shared memory object and retrieve / initialize the shared data structures. - * @param[in] name Name of the POSIX shared memory object. - * @param[inout] base_ptr The base address of the shared memory region is written to this pointer. - * @retval NULL An error occurred; errno is set appropiately. - */ -struct shmem_shared * shmem_shared_open(const char* name, void **base_ptr); +/** The structure that actually resides in the shared memory. */ +struct shmem_shared { + int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */ + union shmem_queue queue; /**< Queues for samples passed in both directions. */ + struct pool pool; /**< Pool for the samples in the queues. */ +}; -/** Close and destroy the shared memory object and related structures. - * @param shm The shared memory structure. - * @param base The base address as returned by shmem_shared_open. +/** Relevant information for one direction of the interface. */ +struct shmem_dir { + void *base; /**< Base address of the region. */ + const char *name; /**< Name of the shmem object. */ + size_t len; /**< Total size of the region. */ + struct shmem_shared *shared; /**< Actually shared datastructure */ +}; + +/** Main structure representing the shared memory interface. */ +struct shmem_int { + struct shmem_dir read, write; +}; + +/** Open the shared memory objects and retrieve / initialize the shared data structures. + * Blocks until another process connects by opening the same objects. + * + * @param[in] wname Name of the POSIX shared memory object containing the output queue. + * @param[in] rname Name of the POSIX shared memory object containing the input queue. + * @param[inout] shm The shmem_int structure that should be used for following + * calls will be written to this pointer. + * @param[in] conf Configuration parameters for the output queue. + * @retval 0 The objects were opened and initialized successfully. + * @retval <0 An error occured; errno is set accordingly. + */ +int shmem_int_open(const char* wname, const char* rname, struct shmem_int* shm, struct shmem_conf* conf); + +/** Close and destroy the shared memory interface and related structures. + * + * @param shm The shared memory interface. * @retval 0 Closing successfull. * @retval <0 An error occurred; errno is set appropiately. */ -int shmem_shared_close(struct shmem_shared *shm, void *base); +int shmem_int_close(struct shmem_int *shm); -/** Read samples from VILLASNode. - * @param shm The shared memory structure. +/** Read samples from the interface. + * + * @param shm The shared memory interface. * @param smps An array where the pointers to the samples will be written. The samples * must be freed with sample_put after use. * @param cnt Number of samples to be read. * @retval >=0 Number of samples that were read. Can be less than cnt (including 0) in case not enough samples were available. - * @retval -1 VILLASNode exited; no samples can be read anymore. + * @retval -1 The other process closed the interface; no samples can be read anymore. */ -int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt); +int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt); -/** Write samples to VILLASNode. - * @param shm The shared memory structure. - * @param smps The samples to be written. Must be allocated from shm->pool. +/** Write samples to the interface. + * + * @param shm The shared memory interface. + * @param smps The samples to be written. Must be allocated from shm_int_alloc. * @param cnt Number of samples to write. * @retval >=0 Number of samples that were successfully written. Can be less than cnt (including 0) in case of a full queue. - * @retval -1 VILLASNode exited; no samples can be written anymore. + * @retval -1 The write failed for some reason; no more samples can be written. */ -int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt); +int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt); + +/** Allocate samples to be written to the interface. The writing process must + * + * not free the samples; only the receiving process should free them using + * sample_put after use. + * @param shm The shared memory interface. + * @param smps Array where pointers to newly allocated samples will be returned. + * @param cnt Number of samples to allocate. + * @returns Number of samples that were successfully allocated (may be less then cnt). + */ +inline int shmem_int_alloc(struct shmem_int *shm, struct sample *smps[], unsigned cnt) { + return sample_alloc(&shm->write.shared->pool, smps, cnt); +} /** Returns the total size of the shared memory region with the given size of * the input/output queues (in elements) and the given number of data elements diff --git a/lib/Makefile.villas-ext.inc b/lib/Makefile.villas-ext.inc index 85473535b..1168f2fe2 100644 --- a/lib/Makefile.villas-ext.inc +++ b/lib/Makefile.villas-ext.inc @@ -26,7 +26,7 @@ LIBEXT = $(BUILDDIR)/$(LIBEXT_NAME).so.$(LIBEXT_ABI_VERSION) LIBEXT_SRCS += $(addprefix lib/, sample.c queue.c queue_signalled.c \ memory.c log.c shmem.c utils.c kernel/kernel.c list.c \ - timing.c \ + timing.c pool.c \ ) LIBEXT_LDFLAGS = -shared diff --git a/lib/kernel/pci.c b/lib/kernel/pci.c index f981201ad..8ca007587 100644 --- a/lib/kernel/pci.c +++ b/lib/kernel/pci.c @@ -8,6 +8,7 @@ #include #include #include +#include #include "log.h" #include "utils.h" @@ -20,7 +21,7 @@ int pci_init(struct pci *p) struct dirent *entry; DIR *dp; FILE *f; - char path[256]; + char path[PATH_MAX]; int ret; snprintf(path, sizeof(path), "%s/bus/pci/devices", SYSFS_PATH); diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.c index 0581e9e9d..e80f5bbf8 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.c @@ -238,9 +238,9 @@ static int ngsi_parse_mapping(struct list *mapping, config_setting_t *cfg) struct ngsi_metadata index = { .name = "index", .type = "integer", - .value = alloc(8) + .value = alloc(11) }; - snprintf(index.value, 8, "%u", j); + snprintf(index.value, 11, "%u", j); list_push(&map.metadata, memdup(&index, sizeof(index))); list_push(&map.metadata, memdup(&source, sizeof(source))); @@ -600,4 +600,4 @@ static struct plugin p = { } }; -REGISTER_PLUGIN(&p) \ No newline at end of file +REGISTER_PLUGIN(&p) diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 5f7c38c02..c7fffc112 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -30,9 +30,9 @@ #include "kernel/kernel.h" #include "log.h" +#include "shmem.h" #include "nodes/shmem.h" #include "plugin.h" -#include "shmem.h" #include "timing.h" #include "utils.h" @@ -40,14 +40,20 @@ int shmem_parse(struct node *n, config_setting_t *cfg) { struct shmem *shm = n->_vd; - if (!config_setting_lookup_string(cfg, "name", &shm->name)) - cerror(cfg, "Missing shared memory object name"); - if (!config_setting_lookup_int(cfg, "queuelen", &shm->queuelen)) - shm->queuelen = DEFAULT_SHMEM_QUEUELEN; - if (!config_setting_lookup_int(cfg, "samplelen", &shm->samplelen)) - shm->samplelen = DEFAULT_SHMEM_SAMPLELEN; - if (!config_setting_lookup_bool(cfg, "polling", &shm->polling)) - shm->polling = false; + if (!config_setting_lookup_string(cfg, "out_name", &shm->out_name)) + cerror(cfg, "Missing shared memory output queue name"); + + if (!config_setting_lookup_string(cfg, "in_name", &shm->in_name)) + cerror(cfg, "Missing shared memory input queue name"); + + if (!config_setting_lookup_int(cfg, "queuelen", &shm->conf.queuelen)) + shm->conf.queuelen = DEFAULT_SHMEM_QUEUELEN; + + if (!config_setting_lookup_int(cfg, "samplelen", &shm->conf.samplelen)) + shm->conf.samplelen = DEFAULT_SHMEM_SAMPLELEN; + + if (!config_setting_lookup_bool(cfg, "polling", &shm->conf.polling)) + shm->conf.polling = false; config_setting_t *exec_cfg = config_setting_lookup(cfg, "exec"); if (!exec_cfg) @@ -77,48 +83,6 @@ int shmem_open(struct node *n) { struct shmem *shm = n->_vd; int ret; - size_t len; - - shm->fd = shm_open(shm->name, O_RDWR | O_CREAT, 0600); - if (shm->fd < 0) - serror("Opening shared memory object failed"); - - len = shmem_total_size(shm->queuelen, shm->queuelen, shm->samplelen); - - ret = ftruncate(shm->fd, len); - if (ret < 0) - serror("Setting size of shared memory object failed"); - - shm->base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, shm->fd, 0); - if (shm->base == MAP_FAILED) - serror("Mapping shared memory failed"); - - shm->manager = memtype_managed_init(shm->base, len); - shm->shared = memory_alloc(shm->manager, sizeof(struct shmem_shared)); - if (!shm->shared) - error("Shared memory shared struct allocation failed (not enough memory?)"); - - memset(shm->shared, 0, sizeof(struct shmem_shared)); - shm->shared->len = len; - shm->shared->polling = shm->polling; - - ret = shm->polling ? queue_init(&shm->shared->in.q, shm->queuelen, shm->manager) - : queue_signalled_init(&shm->shared->in.qs, shm->queuelen, shm->manager); - if (ret) - error("Shared memory queue allocation failed (not enough memory?)"); - - ret = shm->polling ? queue_init(&shm->shared->out.q, shm->queuelen, shm->manager) - : queue_signalled_init(&shm->shared->out.qs, shm->queuelen, shm->manager); - if (ret) - error("Shared memory queue allocation failed (not enough memory?)"); - - ret = pool_init(&shm->shared->pool, 2 * shm->queuelen, SAMPLE_LEN(shm->samplelen), shm->manager); - if (ret) - error("Shared memory pool allocation failed (not enough memory?)"); - - pthread_barrierattr_init(&shm->shared->start_attr); - pthread_barrierattr_setpshared(&shm->shared->start_attr, PTHREAD_PROCESS_SHARED); - pthread_barrier_init(&shm->shared->start_bar, &shm->shared->start_attr, 2); if (shm->exec) { ret = spawn(shm->exec[0], shm->exec); @@ -126,7 +90,9 @@ int shmem_open(struct node *n) serror("Failed to spawn external program"); } - pthread_barrier_wait(&shm->shared->start_bar); + ret = shmem_int_open(shm->out_name, shm->in_name, &shm->intf, &shm->conf); + if (ret < 0) + serror("Opening shared memory interface failed"); return 0; } @@ -134,41 +100,33 @@ int shmem_open(struct node *n) int shmem_close(struct node *n) { struct shmem* shm = n->_vd; - int ret; - if (shm->polling) - queue_close(&shm->shared->out.q); - else - queue_signalled_close(&shm->shared->out.qs); - - /* Don't destroy the data structures yet, since the other process might - * still be using them. Once both processes are done and have unmapped the - * memory, it will be freed anyway. */ - ret = munmap(shm->base, shm->shared->len); - if (ret != 0) - return ret; - - return shm_unlink(shm->name); + return shmem_int_close(&shm->intf); } int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) { struct shmem *shm = n->_vd; - int recv; - struct sample *shared_smps[cnt]; - recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) shared_smps, cnt) - : queue_signalled_pull_many(&shm->shared->in.qs, (void**) shared_smps, cnt); + + recv = shmem_int_read(&shm->intf, shared_smps, cnt); + if (recv < 0) + /* This can only really mean that the other process has exited, so close + * the interface to make sure the shared memory object is unlinked */ + shmem_int_close(&shm->intf); if (recv <= 0) return recv; sample_copy_many(smps, shared_smps, recv); sample_put_many(shared_smps, recv); + struct timespec ts_recv = time_now(); + for (int i = 0; i < recv; i++) smps[i]->ts.received = ts_recv; + return recv; } @@ -178,9 +136,9 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */ int avail, pushed, len; - avail = sample_alloc(&shm->shared->pool, shared_smps, cnt); + avail = sample_alloc(&shm->intf.write.shared->pool, shared_smps, cnt); if (avail != cnt) - warn("Pool underrun for shmem node %s", shm->name); + warn("Pool underrun for shmem node %s", shm->out_name); for (int i = 0; i < avail; i++) { /* Since the node isn't in shared memory, the source can't be accessed */ @@ -197,9 +155,7 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) shared_smps[i]->length = len; } - pushed = shm->polling ? queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail) - : queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail); - + pushed = shmem_int_write(&shm->intf, shared_smps, avail); if (pushed != avail) warn("Outgoing queue overrun for node %s", node_name(n)); @@ -211,8 +167,8 @@ char * shmem_print(struct node *n) struct shmem *shm = n->_vd; char *buf = NULL; - strcatf(&buf, "name=%s, queuelen=%d, samplelen=%d, polling=%s", - shm->name, shm->queuelen, shm->samplelen, shm->polling ? "yes" : "no"); + strcatf(&buf, "out_name=%s, in_name=%s, queuelen=%d, samplelen=%d, polling=%s", + shm->out_name, shm->in_name, shm->conf.queuelen, shm->conf.samplelen, shm->conf.polling ? "yes" : "no"); if (shm->exec) { strcatf(&buf, ", exec='"); diff --git a/lib/shmem.c b/lib/shmem.c index 79a765abc..fe9b5c8ab 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -21,7 +21,9 @@ * along with this program. If not, see . *********************************************************************************/ +#include #include +#include #include #include @@ -47,70 +49,123 @@ size_t shmem_total_size(int insize, int outsize, int sample_size) + 1024; } -struct shmem_shared * shmem_shared_open(const char *name, void **base_ptr) +int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm, struct shmem_conf *conf) { - struct shmem_shared *shm; - size_t len, newlen; - void *base; char *cptr; - int fd, ret; + int fd, ret; + size_t len; + void *base; + struct memtype *manager; + struct shmem_shared *shared; + struct stat stat_buf; + sem_t *sem_own, *sem_other; - fd = shm_open(name, O_RDWR, 0); + /* Ensure both semaphores exist */ + sem_own = sem_open(wname, O_CREAT, 0600, 0); + if (sem_own == SEM_FAILED) + return -1; + + sem_other = sem_open(rname, O_CREAT, 0600, 0); + if (sem_other == SEM_FAILED) + return -1; + + /* Open and initialize the shared region for the output queue */ + fd = shm_open(wname, O_RDWR|O_CREAT|O_EXCL, 0600); if (fd < 0) - return NULL; + return -1; - /* Only map the first part (shmem_shared) first, read the correct length, - * the map it with this length. */ - len = sizeof(struct memtype) + sizeof(struct memblock) + sizeof(struct shmem_shared); + len = shmem_total_size(conf->queuelen, conf->queuelen, conf->samplelen); + if (ftruncate(fd, len) < 0) + return -1; base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (base == MAP_FAILED) - return NULL; + return -1; - /* This relies on the behaviour of the node and the allocator; it assumes - * that memtype_managed is used and the shmem_shared is the first allocated object */ - cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); - shm = (struct shmem_shared *) cptr; - newlen = shm->len; + close(fd); - ret = munmap(base, len); - if (ret) - return NULL; + manager = memtype_managed_init(base, len); + shared = memory_alloc(manager, sizeof(struct shmem_shared)); + if (!shared) { + errno = ENOMEM; + return -1; + } + + memset(shared, 0, sizeof(struct shmem_shared)); + shared->polling = conf->polling; + + ret = shared->polling ? queue_init(&shared->queue.q, conf->queuelen, manager) + : queue_signalled_init(&shared->queue.qs, conf->queuelen, manager); + if (ret) { + errno = ENOMEM; + return -1; + } + + ret = pool_init(&shared->pool, conf->queuelen, SAMPLE_LEN(conf->samplelen), manager); + if (ret) { + errno = ENOMEM; + return -1; + } + + shm->write.base = base; + shm->write.name = wname; + shm->write.len = len; + shm->write.shared = shared; + + /* Post own semaphore and wait on the other one, so both processes know that + * both regions are initialized */ + sem_post(sem_own); + sem_wait(sem_other); + + /* Open and map the other region */ + fd = shm_open(rname, O_RDWR, 0); + if (fd < 0) + return -1; + + if (fstat(fd, &stat_buf) < 0) + return -1; + + len = stat_buf.st_size; + base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - base = mmap(NULL, newlen, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (base == MAP_FAILED) - return NULL; + return -1; - /* Adress might have moved */ cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); - if (base_ptr) - *base_ptr = base; + shared = (struct shmem_shared *) cptr; + shm->read.base = base; + shm->read.name = rname; + shm->read.len = len; + shm->read.shared = shared; - shm = (struct shmem_shared *) cptr; + /* Unlink the semaphores; we don't need them anymore */ + sem_unlink(wname); - pthread_barrier_wait(&shm->start_bar); - - return shm; + return 0; } -int shmem_shared_close(struct shmem_shared *shm, void *base) +int shmem_int_close(struct shmem_int *shm) { - if (shm->polling) - queue_close(&shm->in.q); + if (shm->write.shared->polling) + queue_close(&shm->write.shared->queue.q); else - queue_signalled_close(&shm->in.qs); + queue_signalled_close(&shm->write.shared->queue.qs); - return munmap(base, shm->len); + munmap(shm->read.base, shm->read.len); + munmap(shm->write.base, shm->write.len); + shm_unlink(shm->write.name); + + return 0; } -int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) +int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt) { - return shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt) - : queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt); + return shm->read.shared->polling ? queue_pull_many(&shm->read.shared->queue.q, (void **) smps, cnt) + : queue_signalled_pull_many(&shm->read.shared->queue.qs, (void **) smps, cnt); } -int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) +int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt) { - return shm->polling ? queue_push_many(&shm->in.q, (void **) smps, cnt) - : queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt); + return shm->write.shared->polling ? queue_push_many(&shm->write.shared->queue.q, (void **) smps, cnt) + : queue_signalled_push_many(&shm->write.shared->queue.qs, (void **) smps, cnt); } diff --git a/src/fpga-bench-overruns.c b/src/fpga-bench-overruns.c index 4fc6b44f0..4e82a2dd5 100644 --- a/src/fpga-bench-overruns.c +++ b/src/fpga-bench-overruns.c @@ -7,6 +7,11 @@ #include #include +#include +#include +#include +#include +#include #include /* Some hard-coded configuration for the FPGA benchmarks */ diff --git a/src/pipe.c b/src/pipe.c index 97557f527..eb93e5428 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -60,8 +60,6 @@ pthread_t ptid; /**< Parent thread id */ static void quit(int signal, siginfo_t *sinfo, void *ctx) { - int ret; - if (recvv.started) { pthread_cancel(recvv.thread); pthread_join(recvv.thread, NULL); @@ -74,11 +72,13 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) pool_destroy(&sendd.pool); } - ret = super_node_stop(&sn); - if (ret) - error("Failed to stop super-node"); - - super_node_destroy(&sn); + node_stop(node); + node_destroy(node); + + if (node->_vt->start == websocket_start) { + web_stop(&sn.web); + api_stop(&sn.api); + } info(GRN("Goodbye!")); exit(EXIT_SUCCESS); diff --git a/src/test-shmem.c b/src/test-shmem.c index 06b38fab4..14e195ce1 100644 --- a/src/test-shmem.c +++ b/src/test-shmem.c @@ -25,7 +25,6 @@ #include #include -#include #include #include #include @@ -34,40 +33,46 @@ #include void *base; -struct shmem_shared *shared; +struct shmem_int shm; void usage() { - printf("Usage: villas-test-shmem SHM_NAME VECTORIZE\n"); - printf(" SHMNAME name of the shared memory object\n"); + printf("Usage: villas-test-shmem WNAME VECTORIZE\n"); + printf(" WNAME name of the shared memory object for the output queue\n"); + printf(" RNAME name of the shared memory object for the input queue\n"); printf(" VECTORIZE maximum number of samples to read/write at a time\n"); } void quit(int sig) { - shmem_shared_close(shared, base); + shmem_int_close(&shm); exit(1); } int main(int argc, char* argv[]) { struct log log; + int readcnt, writecnt, avail; + struct shmem_conf conf = { + .queuelen = DEFAULT_SHMEM_QUEUELEN, + .samplelen = DEFAULT_SHMEM_SAMPLELEN, + .polling = 0, + }; log_init(&log, V, LOG_ALL); log_start(&log); - int readcnt, writecnt, avail; - if (argc != 3) { + if (argc != 4) { usage(); return 1; } - char *object = argv[1]; - int vectorize = atoi(argv[2]); + char *wname = argv[1]; + char *rname = argv[2]; + int vectorize = atoi(argv[3]); - shared = shmem_shared_open(object, &base); - if (!shared) + if (shmem_int_open(wname, rname, &shm, &conf) < 0) serror("Failed to open shmem interface"); signal(SIGINT, quit); @@ -75,13 +80,13 @@ int main(int argc, char* argv[]) struct sample *insmps[vectorize], *outsmps[vectorize]; while (1) { - readcnt = shmem_shared_read(shared, insmps, vectorize); + readcnt = shmem_int_read(&shm, insmps, vectorize); if (readcnt == -1) { printf("Node stopped, exiting"); break; } - avail = sample_alloc(&shared->pool, outsmps, readcnt); + avail = shmem_int_alloc(&shm, outsmps, readcnt); if (avail < readcnt) warn("Pool underrun: %d / %d\n", avail, readcnt); @@ -98,10 +103,11 @@ int main(int argc, char* argv[]) for (int i = 0; i < readcnt; i++) sample_put(insmps[i]); - writecnt = shmem_shared_write(shared, outsmps, avail); + writecnt = shmem_int_write(&shm, outsmps, avail); if (writecnt < avail) warn("Short write"); info("Read / Write: %d / %d", readcnt, writecnt); } + shmem_int_close(&shm); } diff --git a/tests/integration/pipe-loopback-shmem.sh b/tests/integration/pipe-loopback-shmem.sh new file mode 100755 index 000000000..81db13082 --- /dev/null +++ b/tests/integration/pipe-loopback-shmem.sh @@ -0,0 +1,72 @@ +#!/bin/bash +# +# Integration loopback test for villas-pipe. +# +# @author Steffen Vogel +# @copyright 2017, Institute for Automation of Complex Power Systems, EONERC +# @license GNU General Public License (version 3) +# +# VILLASnode +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +################################################################################## + +CONFIG_FILE=$(mktemp) +INPUT_FILE=$(mktemp) +OUTPUT_FILE=$(mktemp) + +for POLLING in true false; do +for VECTORIZE in 1 5 20; do + +cat > ${CONFIG_FILE} << EOF +nodes = { + node1 = { + type = "shmem"; + out_name = "/villas-test"; + in_name = "/villas-test"; + samplelen = 4; + queuelen = 32; + polling = ${POLLING}; + vectorize = ${VECTORIZE} + } +} +EOF + +# Generate test data +villas-signal random -l 20 -n > ${INPUT_FILE} + +# We delay EOF of the INPUT_FILE by 1 second in order to wait for incoming data to be received +villas-pipe ${CONFIG_FILE} node1 > ${OUTPUT_FILE} < <(cat ${INPUT_FILE}; sleep 1; echo -n) + +# Comapre data +villas-test-cmp ${INPUT_FILE} ${OUTPUT_FILE} +RC=$? + +if (( ${RC} != 0 )); then + echo "=========== Sub-test failed for: polling=${POLLING}, vecotrize=${VECTORIZE}" + cat ${CONFIG_FILE} + echo + cat ${INPUT_FILE} + echo + cat ${OUTPUT_FILE} + exit ${RC} +else + echo "=========== Sub-test succeeded for: polling=${POLLING}, vecotrize=${VECTORIZE}" +fi + +done; done + +rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} + +exit $RC diff --git a/tests/integration/pipe-loopback-socket-full.sh b/tests/integration/pipe-loopback-socket-full.sh index f30677a15..0bc3df499 100755 --- a/tests/integration/pipe-loopback-socket-full.sh +++ b/tests/integration/pipe-loopback-socket-full.sh @@ -30,7 +30,7 @@ THEORIES=$(mktemp) # Generate test data villas-signal random -l 10 -n > ${INPUT_FILE} -for LAYER in udp ip eth; do +for LAYER in udp ip eth; do for HEADER in none default; do for ENDIAN in big little; do for VERIFY_SOURCE in true false; do @@ -93,11 +93,13 @@ EOF villas-pipe -r ${CONFIG_FILE} node2 > ${OUTPUT_FILE} & PID=$! -sleep 0.1 +sleep 0.5 # We delay EOF of the INPUT_FILE by 1 second in order to wait for incoming data to be received villas-pipe -s ${CONFIG_FILE} node1 < ${INPUT_FILE} +sleep 0.5 + kill ${PID} # Comapre data @@ -105,7 +107,7 @@ villas-test-cmp ${INPUT_FILE} ${OUTPUT_FILE} RC=$? if (( ${RC} != 0 )); then - echo "=========== Sub-test failed for: ${LAYER} ${HEADER} ${ENDIAN} ${VERIFY_SOURCE}" + echo "=========== Sub-test failed for: layer=${LAYER}, header=${HEADER}, endian=${ENDIAN} verify_source=${VERIFY_SOURCE}" cat ${CONFIG_FILE} echo cat ${INPUT_FILE} @@ -113,7 +115,7 @@ if (( ${RC} != 0 )); then cat ${OUTPUT_FILE} exit ${RC} else - echo "=========== Sub-test succeeded for: ${LAYER} ${HEADER} ${ENDIAN} ${VERIFY_SOURCE}" + echo "=========== Sub-test succeeded for: layer=${LAYER}, header=${HEADER}, endian=${ENDIAN} verify_source=${VERIFY_SOURCE}" fi done; done; done; done