1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'shmem-rework' into 'develop'

Rework shmem interface to be symmetric

See merge request !23
This commit is contained in:
Steffen Vogel 2017-06-15 15:39:57 +02:00
commit bde2b6ebba
14 changed files with 325 additions and 194 deletions

View file

@ -21,7 +21,8 @@ nodes = {
},
shmem = {
type = "shmem",
name = "/villas1",
out_name = "/villas1-out",
in_name = "/villas1-in",
samplelen = 4,
queuelen = 32,
polling = false,

View file

@ -32,6 +32,8 @@
#include <libconfig.h>
#include "common.h"
#include "kernel/pci.h"
#include "kernel/vfio.h"
/* Forward declarations */
struct fpga_ip;
@ -92,4 +94,4 @@ void fpga_card_dump(struct fpga_card *c);
/** Reset the FPGA to a known state */
int fpga_card_reset(struct fpga_card *c);
/** @} */
/** @} */

View file

@ -34,24 +34,17 @@
#include "pool.h"
#include "queue.h"
#include "config.h"
#define DEFAULT_SHMEM_QUEUELEN 512
#define DEFAULT_SHMEM_SAMPLELEN DEFAULT_SAMPLELEN
#include "shmem.h"
/** Node-type for shared memory communication.
* @see node_type
*/
struct shmem {
const char* name; /**< Name of the shm object. */
int samplelen; /**< Number of data entries for each sample. */
int queuelen; /**< Size of ingoing and outgoing queue, respectively. */
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to outgoing queue. */
char **exec; /**< External program to execute on start. */
struct memtype *manager; /**< Manager for the shared memory region. */
int fd; /**< Handle as returned by shm_open().*/
void *base; /**< Pointer to the shared memory region. */
struct shmem_shared *shared; /**< Shared datastructure. */
const char* out_name; /**< Name of the shm object for the output queue. */
const char* in_name; /**< Name of the shm object for the input queue. */
struct shmem_conf conf; /**< Interface configuration struct. */
char **exec; /**< External program to execute on start. */
struct shmem_int intf; /**< Shmem interface */
};
/** @see node_type::print */

View file

@ -36,6 +36,10 @@ extern "C" {
#include "pool.h"
#include "queue.h"
#include "queue_signalled.h"
#include "sample.h"
#define DEFAULT_SHMEM_QUEUELEN 512
#define DEFAULT_SHMEM_SAMPLELEN 64
/** A signalled queue or a regular (polling) queue, depending on the polling setting. */
union shmem_queue {
@ -43,56 +47,90 @@ union shmem_queue {
struct queue_signalled qs;
};
/** The structure that actually resides in the shared memory. */
struct shmem_shared {
size_t len; /**< Total size of the shared memory region.*/
#define SHMEM_MIN_SIZE (sizeof(struct memtype) + sizeof(struct memblock) + sizeof(pthread_barrier_t) + sizeof(pthread_barrierattr_t))
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */
union shmem_queue in; /**< Queue for samples passed from external program to node.*/
union shmem_queue out; /**< Queue for samples passed from node to external program.*/
struct pool pool; /**< Pool for the samples in the queues. */
pthread_barrier_t start_bar; /**< Barrier for synchronizing the start of both programs. */
pthread_barrierattr_t start_attr;
atomic_size_t node_stopped; /**< Set to 1 by VILLASNode if it is stopped/killed. */
atomic_size_t ext_stopped; /**< Set to 1 by the external program if it is stopped/killed. */
/** Struct containing all parameters that need to be known when creating a new
* shared memory object. */
struct shmem_conf {
int polling; /**< Whether to use polling instead of POSIX CVs */
int queuelen; /**< Size of the queues (in elements) */
int samplelen; /**< Maximum number of data entries in a single sample */
};
/** Open the shared memory object and retrieve / initialize the shared data structures.
* @param[in] name Name of the POSIX shared memory object.
* @param[inout] base_ptr The base address of the shared memory region is written to this pointer.
* @retval NULL An error occurred; errno is set appropiately.
*/
struct shmem_shared * shmem_shared_open(const char* name, void **base_ptr);
/** The structure that actually resides in the shared memory. */
struct shmem_shared {
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */
union shmem_queue queue; /**< Queues for samples passed in both directions. */
struct pool pool; /**< Pool for the samples in the queues. */
};
/** Close and destroy the shared memory object and related structures.
* @param shm The shared memory structure.
* @param base The base address as returned by shmem_shared_open.
/** Relevant information for one direction of the interface. */
struct shmem_dir {
void *base; /**< Base address of the region. */
const char *name; /**< Name of the shmem object. */
size_t len; /**< Total size of the region. */
struct shmem_shared *shared; /**< Actually shared datastructure */
};
/** Main structure representing the shared memory interface. */
struct shmem_int {
struct shmem_dir read, write;
};
/** Open the shared memory objects and retrieve / initialize the shared data structures.
* Blocks until another process connects by opening the same objects.
*
* @param[in] wname Name of the POSIX shared memory object containing the output queue.
* @param[in] rname Name of the POSIX shared memory object containing the input queue.
* @param[inout] shm The shmem_int structure that should be used for following
* calls will be written to this pointer.
* @param[in] conf Configuration parameters for the output queue.
* @retval 0 The objects were opened and initialized successfully.
* @retval <0 An error occured; errno is set accordingly.
*/
int shmem_int_open(const char* wname, const char* rname, struct shmem_int* shm, struct shmem_conf* conf);
/** Close and destroy the shared memory interface and related structures.
*
* @param shm The shared memory interface.
* @retval 0 Closing successfull.
* @retval <0 An error occurred; errno is set appropiately.
*/
int shmem_shared_close(struct shmem_shared *shm, void *base);
int shmem_int_close(struct shmem_int *shm);
/** Read samples from VILLASNode.
* @param shm The shared memory structure.
/** Read samples from the interface.
*
* @param shm The shared memory interface.
* @param smps An array where the pointers to the samples will be written. The samples
* must be freed with sample_put after use.
* @param cnt Number of samples to be read.
* @retval >=0 Number of samples that were read. Can be less than cnt (including 0) in case not enough samples were available.
* @retval -1 VILLASNode exited; no samples can be read anymore.
* @retval -1 The other process closed the interface; no samples can be read anymore.
*/
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt);
int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt);
/** Write samples to VILLASNode.
* @param shm The shared memory structure.
* @param smps The samples to be written. Must be allocated from shm->pool.
/** Write samples to the interface.
*
* @param shm The shared memory interface.
* @param smps The samples to be written. Must be allocated from shm_int_alloc.
* @param cnt Number of samples to write.
* @retval >=0 Number of samples that were successfully written. Can be less than cnt (including 0) in case of a full queue.
* @retval -1 VILLASNode exited; no samples can be written anymore.
* @retval -1 The write failed for some reason; no more samples can be written.
*/
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt);
int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt);
/** Allocate samples to be written to the interface. The writing process must
*
* not free the samples; only the receiving process should free them using
* sample_put after use.
* @param shm The shared memory interface.
* @param smps Array where pointers to newly allocated samples will be returned.
* @param cnt Number of samples to allocate.
* @returns Number of samples that were successfully allocated (may be less then cnt).
*/
inline int shmem_int_alloc(struct shmem_int *shm, struct sample *smps[], unsigned cnt) {
return sample_alloc(&shm->write.shared->pool, smps, cnt);
}
/** Returns the total size of the shared memory region with the given size of
* the input/output queues (in elements) and the given number of data elements

View file

@ -26,7 +26,7 @@ LIBEXT = $(BUILDDIR)/$(LIBEXT_NAME).so.$(LIBEXT_ABI_VERSION)
LIBEXT_SRCS += $(addprefix lib/, sample.c queue.c queue_signalled.c \
memory.c log.c shmem.c utils.c kernel/kernel.c list.c \
timing.c \
timing.c pool.c \
)
LIBEXT_LDFLAGS = -shared

View file

@ -8,6 +8,7 @@
#include <libgen.h>
#include <string.h>
#include <unistd.h>
#include <linux/limits.h>
#include "log.h"
#include "utils.h"
@ -20,7 +21,7 @@ int pci_init(struct pci *p)
struct dirent *entry;
DIR *dp;
FILE *f;
char path[256];
char path[PATH_MAX];
int ret;
snprintf(path, sizeof(path), "%s/bus/pci/devices", SYSFS_PATH);

View file

@ -238,9 +238,9 @@ static int ngsi_parse_mapping(struct list *mapping, config_setting_t *cfg)
struct ngsi_metadata index = {
.name = "index",
.type = "integer",
.value = alloc(8)
.value = alloc(11)
};
snprintf(index.value, 8, "%u", j);
snprintf(index.value, 11, "%u", j);
list_push(&map.metadata, memdup(&index, sizeof(index)));
list_push(&map.metadata, memdup(&source, sizeof(source)));
@ -600,4 +600,4 @@ static struct plugin p = {
}
};
REGISTER_PLUGIN(&p)
REGISTER_PLUGIN(&p)

View file

@ -30,9 +30,9 @@
#include "kernel/kernel.h"
#include "log.h"
#include "shmem.h"
#include "nodes/shmem.h"
#include "plugin.h"
#include "shmem.h"
#include "timing.h"
#include "utils.h"
@ -40,14 +40,20 @@ int shmem_parse(struct node *n, config_setting_t *cfg)
{
struct shmem *shm = n->_vd;
if (!config_setting_lookup_string(cfg, "name", &shm->name))
cerror(cfg, "Missing shared memory object name");
if (!config_setting_lookup_int(cfg, "queuelen", &shm->queuelen))
shm->queuelen = DEFAULT_SHMEM_QUEUELEN;
if (!config_setting_lookup_int(cfg, "samplelen", &shm->samplelen))
shm->samplelen = DEFAULT_SHMEM_SAMPLELEN;
if (!config_setting_lookup_bool(cfg, "polling", &shm->polling))
shm->polling = false;
if (!config_setting_lookup_string(cfg, "out_name", &shm->out_name))
cerror(cfg, "Missing shared memory output queue name");
if (!config_setting_lookup_string(cfg, "in_name", &shm->in_name))
cerror(cfg, "Missing shared memory input queue name");
if (!config_setting_lookup_int(cfg, "queuelen", &shm->conf.queuelen))
shm->conf.queuelen = DEFAULT_SHMEM_QUEUELEN;
if (!config_setting_lookup_int(cfg, "samplelen", &shm->conf.samplelen))
shm->conf.samplelen = DEFAULT_SHMEM_SAMPLELEN;
if (!config_setting_lookup_bool(cfg, "polling", &shm->conf.polling))
shm->conf.polling = false;
config_setting_t *exec_cfg = config_setting_lookup(cfg, "exec");
if (!exec_cfg)
@ -77,48 +83,6 @@ int shmem_open(struct node *n)
{
struct shmem *shm = n->_vd;
int ret;
size_t len;
shm->fd = shm_open(shm->name, O_RDWR | O_CREAT, 0600);
if (shm->fd < 0)
serror("Opening shared memory object failed");
len = shmem_total_size(shm->queuelen, shm->queuelen, shm->samplelen);
ret = ftruncate(shm->fd, len);
if (ret < 0)
serror("Setting size of shared memory object failed");
shm->base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, shm->fd, 0);
if (shm->base == MAP_FAILED)
serror("Mapping shared memory failed");
shm->manager = memtype_managed_init(shm->base, len);
shm->shared = memory_alloc(shm->manager, sizeof(struct shmem_shared));
if (!shm->shared)
error("Shared memory shared struct allocation failed (not enough memory?)");
memset(shm->shared, 0, sizeof(struct shmem_shared));
shm->shared->len = len;
shm->shared->polling = shm->polling;
ret = shm->polling ? queue_init(&shm->shared->in.q, shm->queuelen, shm->manager)
: queue_signalled_init(&shm->shared->in.qs, shm->queuelen, shm->manager);
if (ret)
error("Shared memory queue allocation failed (not enough memory?)");
ret = shm->polling ? queue_init(&shm->shared->out.q, shm->queuelen, shm->manager)
: queue_signalled_init(&shm->shared->out.qs, shm->queuelen, shm->manager);
if (ret)
error("Shared memory queue allocation failed (not enough memory?)");
ret = pool_init(&shm->shared->pool, 2 * shm->queuelen, SAMPLE_LEN(shm->samplelen), shm->manager);
if (ret)
error("Shared memory pool allocation failed (not enough memory?)");
pthread_barrierattr_init(&shm->shared->start_attr);
pthread_barrierattr_setpshared(&shm->shared->start_attr, PTHREAD_PROCESS_SHARED);
pthread_barrier_init(&shm->shared->start_bar, &shm->shared->start_attr, 2);
if (shm->exec) {
ret = spawn(shm->exec[0], shm->exec);
@ -126,7 +90,9 @@ int shmem_open(struct node *n)
serror("Failed to spawn external program");
}
pthread_barrier_wait(&shm->shared->start_bar);
ret = shmem_int_open(shm->out_name, shm->in_name, &shm->intf, &shm->conf);
if (ret < 0)
serror("Opening shared memory interface failed");
return 0;
}
@ -134,41 +100,33 @@ int shmem_open(struct node *n)
int shmem_close(struct node *n)
{
struct shmem* shm = n->_vd;
int ret;
if (shm->polling)
queue_close(&shm->shared->out.q);
else
queue_signalled_close(&shm->shared->out.qs);
/* Don't destroy the data structures yet, since the other process might
* still be using them. Once both processes are done and have unmapped the
* memory, it will be freed anyway. */
ret = munmap(shm->base, shm->shared->len);
if (ret != 0)
return ret;
return shm_unlink(shm->name);
return shmem_int_close(&shm->intf);
}
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct shmem *shm = n->_vd;
int recv;
struct sample *shared_smps[cnt];
recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) shared_smps, cnt)
: queue_signalled_pull_many(&shm->shared->in.qs, (void**) shared_smps, cnt);
recv = shmem_int_read(&shm->intf, shared_smps, cnt);
if (recv < 0)
/* This can only really mean that the other process has exited, so close
* the interface to make sure the shared memory object is unlinked */
shmem_int_close(&shm->intf);
if (recv <= 0)
return recv;
sample_copy_many(smps, shared_smps, recv);
sample_put_many(shared_smps, recv);
struct timespec ts_recv = time_now();
for (int i = 0; i < recv; i++)
smps[i]->ts.received = ts_recv;
return recv;
}
@ -178,9 +136,9 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */
int avail, pushed, len;
avail = sample_alloc(&shm->shared->pool, shared_smps, cnt);
avail = sample_alloc(&shm->intf.write.shared->pool, shared_smps, cnt);
if (avail != cnt)
warn("Pool underrun for shmem node %s", shm->name);
warn("Pool underrun for shmem node %s", shm->out_name);
for (int i = 0; i < avail; i++) {
/* Since the node isn't in shared memory, the source can't be accessed */
@ -197,9 +155,7 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
shared_smps[i]->length = len;
}
pushed = shm->polling ? queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail)
: queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail);
pushed = shmem_int_write(&shm->intf, shared_smps, avail);
if (pushed != avail)
warn("Outgoing queue overrun for node %s", node_name(n));
@ -211,8 +167,8 @@ char * shmem_print(struct node *n)
struct shmem *shm = n->_vd;
char *buf = NULL;
strcatf(&buf, "name=%s, queuelen=%d, samplelen=%d, polling=%s",
shm->name, shm->queuelen, shm->samplelen, shm->polling ? "yes" : "no");
strcatf(&buf, "out_name=%s, in_name=%s, queuelen=%d, samplelen=%d, polling=%s",
shm->out_name, shm->in_name, shm->conf.queuelen, shm->conf.samplelen, shm->conf.polling ? "yes" : "no");
if (shm->exec) {
strcatf(&buf, ", exec='");

View file

@ -21,7 +21,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#include <errno.h>
#include <fcntl.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <sys/stat.h>
@ -47,70 +49,123 @@ size_t shmem_total_size(int insize, int outsize, int sample_size)
+ 1024;
}
struct shmem_shared * shmem_shared_open(const char *name, void **base_ptr)
int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm, struct shmem_conf *conf)
{
struct shmem_shared *shm;
size_t len, newlen;
void *base;
char *cptr;
int fd, ret;
int fd, ret;
size_t len;
void *base;
struct memtype *manager;
struct shmem_shared *shared;
struct stat stat_buf;
sem_t *sem_own, *sem_other;
fd = shm_open(name, O_RDWR, 0);
/* Ensure both semaphores exist */
sem_own = sem_open(wname, O_CREAT, 0600, 0);
if (sem_own == SEM_FAILED)
return -1;
sem_other = sem_open(rname, O_CREAT, 0600, 0);
if (sem_other == SEM_FAILED)
return -1;
/* Open and initialize the shared region for the output queue */
fd = shm_open(wname, O_RDWR|O_CREAT|O_EXCL, 0600);
if (fd < 0)
return NULL;
return -1;
/* Only map the first part (shmem_shared) first, read the correct length,
* the map it with this length. */
len = sizeof(struct memtype) + sizeof(struct memblock) + sizeof(struct shmem_shared);
len = shmem_total_size(conf->queuelen, conf->queuelen, conf->samplelen);
if (ftruncate(fd, len) < 0)
return -1;
base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return NULL;
return -1;
/* This relies on the behaviour of the node and the allocator; it assumes
* that memtype_managed is used and the shmem_shared is the first allocated object */
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
shm = (struct shmem_shared *) cptr;
newlen = shm->len;
close(fd);
ret = munmap(base, len);
if (ret)
return NULL;
manager = memtype_managed_init(base, len);
shared = memory_alloc(manager, sizeof(struct shmem_shared));
if (!shared) {
errno = ENOMEM;
return -1;
}
memset(shared, 0, sizeof(struct shmem_shared));
shared->polling = conf->polling;
ret = shared->polling ? queue_init(&shared->queue.q, conf->queuelen, manager)
: queue_signalled_init(&shared->queue.qs, conf->queuelen, manager);
if (ret) {
errno = ENOMEM;
return -1;
}
ret = pool_init(&shared->pool, conf->queuelen, SAMPLE_LEN(conf->samplelen), manager);
if (ret) {
errno = ENOMEM;
return -1;
}
shm->write.base = base;
shm->write.name = wname;
shm->write.len = len;
shm->write.shared = shared;
/* Post own semaphore and wait on the other one, so both processes know that
* both regions are initialized */
sem_post(sem_own);
sem_wait(sem_other);
/* Open and map the other region */
fd = shm_open(rname, O_RDWR, 0);
if (fd < 0)
return -1;
if (fstat(fd, &stat_buf) < 0)
return -1;
len = stat_buf.st_size;
base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
base = mmap(NULL, newlen, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED)
return NULL;
return -1;
/* Adress might have moved */
cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
if (base_ptr)
*base_ptr = base;
shared = (struct shmem_shared *) cptr;
shm->read.base = base;
shm->read.name = rname;
shm->read.len = len;
shm->read.shared = shared;
shm = (struct shmem_shared *) cptr;
/* Unlink the semaphores; we don't need them anymore */
sem_unlink(wname);
pthread_barrier_wait(&shm->start_bar);
return shm;
return 0;
}
int shmem_shared_close(struct shmem_shared *shm, void *base)
int shmem_int_close(struct shmem_int *shm)
{
if (shm->polling)
queue_close(&shm->in.q);
if (shm->write.shared->polling)
queue_close(&shm->write.shared->queue.q);
else
queue_signalled_close(&shm->in.qs);
queue_signalled_close(&shm->write.shared->queue.qs);
return munmap(base, shm->len);
munmap(shm->read.base, shm->read.len);
munmap(shm->write.base, shm->write.len);
shm_unlink(shm->write.name);
return 0;
}
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
{
return shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt)
: queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt);
return shm->read.shared->polling ? queue_pull_many(&shm->read.shared->queue.q, (void **) smps, cnt)
: queue_signalled_pull_many(&shm->read.shared->queue.qs, (void **) smps, cnt);
}
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
{
return shm->polling ? queue_push_many(&shm->in.q, (void **) smps, cnt)
: queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt);
return shm->write.shared->polling ? queue_push_many(&shm->write.shared->queue.q, (void **) smps, cnt)
: queue_signalled_push_many(&shm->write.shared->queue.qs, (void **) smps, cnt);
}

View file

@ -7,6 +7,11 @@
#include <stdio.h>
#include <sys/utsname.h>
#include <fpga/card.h>
#include <fpga/ip.h>
#include <fpga/ips/switch.h>
#include <fpga/ips/intc.h>
#include <utils.h>
#include <villas/log.h>
/* Some hard-coded configuration for the FPGA benchmarks */

View file

@ -60,8 +60,6 @@ pthread_t ptid; /**< Parent thread id */
static void quit(int signal, siginfo_t *sinfo, void *ctx)
{
int ret;
if (recvv.started) {
pthread_cancel(recvv.thread);
pthread_join(recvv.thread, NULL);
@ -74,11 +72,13 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
pool_destroy(&sendd.pool);
}
ret = super_node_stop(&sn);
if (ret)
error("Failed to stop super-node");
super_node_destroy(&sn);
node_stop(node);
node_destroy(node);
if (node->_vt->start == websocket_start) {
web_stop(&sn.web);
api_stop(&sn.api);
}
info(GRN("Goodbye!"));
exit(EXIT_SUCCESS);

View file

@ -25,7 +25,6 @@
#include <villas/log.h>
#include <villas/node.h>
#include <villas/nodes/shmem.h>
#include <villas/pool.h>
#include <villas/sample.h>
#include <villas/shmem.h>
@ -34,40 +33,46 @@
#include <string.h>
void *base;
struct shmem_shared *shared;
struct shmem_int shm;
void usage()
{
printf("Usage: villas-test-shmem SHM_NAME VECTORIZE\n");
printf(" SHMNAME name of the shared memory object\n");
printf("Usage: villas-test-shmem WNAME VECTORIZE\n");
printf(" WNAME name of the shared memory object for the output queue\n");
printf(" RNAME name of the shared memory object for the input queue\n");
printf(" VECTORIZE maximum number of samples to read/write at a time\n");
}
void quit(int sig)
{
shmem_shared_close(shared, base);
shmem_int_close(&shm);
exit(1);
}
int main(int argc, char* argv[])
{
struct log log;
int readcnt, writecnt, avail;
struct shmem_conf conf = {
.queuelen = DEFAULT_SHMEM_QUEUELEN,
.samplelen = DEFAULT_SHMEM_SAMPLELEN,
.polling = 0,
};
log_init(&log, V, LOG_ALL);
log_start(&log);
int readcnt, writecnt, avail;
if (argc != 3) {
if (argc != 4) {
usage();
return 1;
}
char *object = argv[1];
int vectorize = atoi(argv[2]);
char *wname = argv[1];
char *rname = argv[2];
int vectorize = atoi(argv[3]);
shared = shmem_shared_open(object, &base);
if (!shared)
if (shmem_int_open(wname, rname, &shm, &conf) < 0)
serror("Failed to open shmem interface");
signal(SIGINT, quit);
@ -75,13 +80,13 @@ int main(int argc, char* argv[])
struct sample *insmps[vectorize], *outsmps[vectorize];
while (1) {
readcnt = shmem_shared_read(shared, insmps, vectorize);
readcnt = shmem_int_read(&shm, insmps, vectorize);
if (readcnt == -1) {
printf("Node stopped, exiting");
break;
}
avail = sample_alloc(&shared->pool, outsmps, readcnt);
avail = shmem_int_alloc(&shm, outsmps, readcnt);
if (avail < readcnt)
warn("Pool underrun: %d / %d\n", avail, readcnt);
@ -98,10 +103,11 @@ int main(int argc, char* argv[])
for (int i = 0; i < readcnt; i++)
sample_put(insmps[i]);
writecnt = shmem_shared_write(shared, outsmps, avail);
writecnt = shmem_int_write(&shm, outsmps, avail);
if (writecnt < avail)
warn("Short write");
info("Read / Write: %d / %d", readcnt, writecnt);
}
shmem_int_close(&shm);
}

View file

@ -0,0 +1,72 @@
#!/bin/bash
#
# Integration loopback test for villas-pipe.
#
# @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
# @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
# @license GNU General Public License (version 3)
#
# VILLASnode
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
##################################################################################
CONFIG_FILE=$(mktemp)
INPUT_FILE=$(mktemp)
OUTPUT_FILE=$(mktemp)
for POLLING in true false; do
for VECTORIZE in 1 5 20; do
cat > ${CONFIG_FILE} << EOF
nodes = {
node1 = {
type = "shmem";
out_name = "/villas-test";
in_name = "/villas-test";
samplelen = 4;
queuelen = 32;
polling = ${POLLING};
vectorize = ${VECTORIZE}
}
}
EOF
# Generate test data
villas-signal random -l 20 -n > ${INPUT_FILE}
# We delay EOF of the INPUT_FILE by 1 second in order to wait for incoming data to be received
villas-pipe ${CONFIG_FILE} node1 > ${OUTPUT_FILE} < <(cat ${INPUT_FILE}; sleep 1; echo -n)
# Comapre data
villas-test-cmp ${INPUT_FILE} ${OUTPUT_FILE}
RC=$?
if (( ${RC} != 0 )); then
echo "=========== Sub-test failed for: polling=${POLLING}, vecotrize=${VECTORIZE}"
cat ${CONFIG_FILE}
echo
cat ${INPUT_FILE}
echo
cat ${OUTPUT_FILE}
exit ${RC}
else
echo "=========== Sub-test succeeded for: polling=${POLLING}, vecotrize=${VECTORIZE}"
fi
done; done
rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE}
exit $RC

View file

@ -30,7 +30,7 @@ THEORIES=$(mktemp)
# Generate test data
villas-signal random -l 10 -n > ${INPUT_FILE}
for LAYER in udp ip eth; do
for LAYER in udp ip eth; do
for HEADER in none default; do
for ENDIAN in big little; do
for VERIFY_SOURCE in true false; do
@ -93,11 +93,13 @@ EOF
villas-pipe -r ${CONFIG_FILE} node2 > ${OUTPUT_FILE} &
PID=$!
sleep 0.1
sleep 0.5
# We delay EOF of the INPUT_FILE by 1 second in order to wait for incoming data to be received
villas-pipe -s ${CONFIG_FILE} node1 < ${INPUT_FILE}
sleep 0.5
kill ${PID}
# Comapre data
@ -105,7 +107,7 @@ villas-test-cmp ${INPUT_FILE} ${OUTPUT_FILE}
RC=$?
if (( ${RC} != 0 )); then
echo "=========== Sub-test failed for: ${LAYER} ${HEADER} ${ENDIAN} ${VERIFY_SOURCE}"
echo "=========== Sub-test failed for: layer=${LAYER}, header=${HEADER}, endian=${ENDIAN} verify_source=${VERIFY_SOURCE}"
cat ${CONFIG_FILE}
echo
cat ${INPUT_FILE}
@ -113,7 +115,7 @@ if (( ${RC} != 0 )); then
cat ${OUTPUT_FILE}
exit ${RC}
else
echo "=========== Sub-test succeeded for: ${LAYER} ${HEADER} ${ENDIAN} ${VERIFY_SOURCE}"
echo "=========== Sub-test succeeded for: layer=${LAYER}, header=${HEADER}, endian=${ENDIAN} verify_source=${VERIFY_SOURCE}"
fi
done; done; done; done