diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index 23357e11f..b307b4ae4 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -22,17 +22,17 @@ #define DEFAULT_SHMEM_QUEUESIZE 512 struct shmem { - const char* name; /**< Name of the shm object. */ - int sample_size; /**< Number of data entries for each sample. */ - int insize, outsize; /**< Size of ingoing and outgoing queue, respectively. */ - int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */ - int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */ - char **exec; /**< External program to execute on start. */ + const char* name; /**< Name of the shm object. */ + int sample_size; /**< Number of data entries for each sample. */ + int insize, outsize; /**< Size of ingoing and outgoing queue, respectively. */ + int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */ + int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */ + const char * const exec; /**< External program to execute on start. */ - struct memtype *manager; /**< Manager for the shared memory region. */ - int fd; /**< Handle as returned by shm_open().*/ - void *base; /**< Pointer to the shared memory region. */ - struct shmem_shared *shared; /**< Shared datastructure. */ + struct memtype *manager; /**< Manager for the shared memory region. */ + int fd; /**< Handle as returned by shm_open().*/ + void *base; /**< Pointer to the shared memory region. */ + struct shmem_shared *shared; /**< Shared datastructure. */ }; char *shmem_print(struct node *n); diff --git a/include/villas/queue_signalled.h b/include/villas/queue_signalled.h index 27baebbd9..740fb2e75 100644 --- a/include/villas/queue_signalled.h +++ b/include/villas/queue_signalled.h @@ -13,10 +13,10 @@ /** Wrapper around queue that uses POSIX CV's for signalling writes. */ struct queue_signalled { - struct queue q; /**< Actual underlying queue. */ - pthread_cond_t ready; /**< Condition variable to signal writes to the queue. */ + struct queue q; /**< Actual underlying queue. */ + pthread_cond_t ready; /**< Condition variable to signal writes to the queue. */ pthread_condattr_t readyattr; - pthread_mutex_t mt; /**< Mutex for ready. */ + pthread_mutex_t mt; /**< Mutex for ready. */ pthread_mutexattr_t mtattr; }; diff --git a/include/villas/shmem.h b/include/villas/shmem.h index 9ab8b05eb..d10149db3 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -1,4 +1,3 @@ -#pragma once /** Shared-memory interface: The interface functions that the external program should use. * * @file @@ -6,6 +5,7 @@ * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC *********************************************************************************/ +#pragma once #include "pool.h" #include "queue.h" @@ -18,12 +18,16 @@ union shmem_queue { /** The structure that actually resides in the shared memory. */ struct shmem_shared { - size_t len; /**< Total size of the shared memory region.*/ - union shmem_queue in; /**< Queue for samples passed from external program to node.*/ - int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */ - union shmem_queue out; /**< Queue for samples passed from node to external program.*/ - int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */ - struct pool pool; /**< Pool for the samples in the queues. */ + size_t len; /**< Total size of the shared memory region.*/ + + int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */ + int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */ + + union shmem_queue in; /**< Queue for samples passed from external program to node.*/ + union shmem_queue out; /**< Queue for samples passed from node to external program.*/ + + struct pool pool; /**< Pool for the samples in the queues. */ + pthread_barrier_t start_bar; pthread_barrierattr_t start_attr; atomic_size_t node_stopped; diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 5a42f5150..b78cebd68 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -18,12 +18,12 @@ #include "shmem.h" #include "utils.h" -int shmem_parse(struct node *n, config_setting_t *cfg) { +int shmem_parse(struct node *n, config_setting_t *cfg) +{ struct shmem *shm = n->_vd; if (!config_setting_lookup_string(cfg, "name", &shm->name)) - cerror(cfg, "Missing shm object name"); - + cerror(cfg, "Missing shared memory object name"); if (!config_setting_lookup_int(cfg, "insize", &shm->insize)) shm->insize = DEFAULT_SHMEM_QUEUESIZE; if (!config_setting_lookup_int(cfg, "outsize", &shm->outsize)) @@ -34,147 +34,181 @@ int shmem_parse(struct node *n, config_setting_t *cfg) { shm->cond_out = false; if (!config_setting_lookup_bool(cfg, "cond_in", &shm->cond_in)) shm->cond_in = false; - config_setting_t *exec_setting = config_setting_lookup(cfg, "exec"); - if (!exec_setting) { + + config_setting_t *exec_cfg = config_setting_lookup(cfg, "exec"); + if (!exec_cfg) shm->exec = NULL; - } else { - if (!config_setting_is_array(exec_setting)) - cerror(exec_setting, "Invalid format for exec"); - shm->exec = malloc(sizeof(char*) * (config_setting_length(exec_setting) + 1)); + else { + if (!config_setting_is_array(exec_cfg)) + cerror(exec_cfg, "Invalid format for exec"); + + shm->exec = alloc(sizeof(char *) * (config_setting_length(exec_cfg) + 1)); + int i; - for (i = 0; i < config_setting_length(exec_setting); i++) { - const char* elm = config_setting_get_string_elem(exec_setting, i); - if (!elm) - cerror(exec_setting, "Invalid format for exec"); - shm->exec[i] = strdup(elm); + for (i = 0; i < config_setting_length(exec_cfg); i++) { + shm->exec[i] = config_setting_get_string_elem(exec_cfg, i); + if (!shm->exec[i]) + cerror(exec_cfg, "Invalid format for exec"); } + shm->exec[i] = NULL; } return 0; } -int shmem_open(struct node *n) { +int shmem_open(struct node *n) +{ struct shmem *shm = n->_vd; - - int r = shm_open(shm->name, O_RDWR|O_CREAT, 0600); - if (r < 0) + int ret; + size_t len; + + shm->fd = shm_open(shm->name, O_RDWR | O_CREAT, 0600); + if (shm->fd < 0) serror("Opening shared memory object failed"); - - shm->fd = r; - size_t len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size); - if (ftruncate(shm->fd, len) < 0) + + len = shmem_total_size(shm->insize, shm->outsize, shm->sample_size); + + ret = ftruncate(shm->fd, len); + if (ret < 0) serror("Setting size of shared memory object failed"); - shm->base = mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_SHARED, shm->fd, 0); + shm->base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, shm->fd, 0); if (shm->base == MAP_FAILED) serror("Mapping shared memory failed"); shm->manager = memtype_managed_init(shm->base, len); shm->shared = memory_alloc(shm->manager, sizeof(struct shmem_shared)); if (!shm->shared) - error("Shm shared struct allocation failed (not enough memory?)"); + error("Shared memory shared struct allocation failed (not enough memory?)"); + memset(shm->shared, 0, sizeof(struct shmem_shared)); shm->shared->len = len; shm->shared->cond_in = shm->cond_in; shm->shared->cond_out = shm->cond_out; - if (shm->cond_in) { - if (queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager) < 0) - error("Shm queue allocation failed (not enough memory?)"); - } else { - if (queue_init(&shm->shared->in.q, shm->insize, shm->manager) < 0) - error("Shm queue allocation failed (not enough memory?)"); - } - if (shm->cond_out) { - if (queue_signalled_init(&shm->shared->out.qs, shm->outsize, shm->manager) < 0) - error("Shm queue allocation failed (not enough memory?)"); - } else { - if (queue_init(&shm->shared->out.q, shm->outsize, shm->manager) < 0) - error("Shm queue allocation failed (not enough memory?)"); - } - if (pool_init(&shm->shared->pool, shm->insize+shm->outsize, SAMPLE_LEN(shm->sample_size), shm->manager) < 0) - error("Shm pool allocation failed (not enough memory?)"); + + ret = shm->cond_in ? queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager) + : queue_init(&shm->shared->in.q, shm->insize, shm->manager); + if (ret) + error("Shared memory queue allocation failed (not enough memory?)"); + + ret = shm->cond_out ? queue_signalled_init(&shm->shared->out.qs, shm->outsize, shm->manager) + : queue_init(&shm->shared->out.q, shm->outsize, shm->manager); + if (ret) + error("Shared memory queue allocation failed (not enough memory?)"); + + ret = pool_init(&shm->shared->pool, shm->insize+shm->outsize, SAMPLE_LEN(shm->sample_size), shm->manager); + if (ret) + error("Shared memory pool allocation failed (not enough memory?)"); pthread_barrierattr_init(&shm->shared->start_attr); pthread_barrierattr_setpshared(&shm->shared->start_attr, PTHREAD_PROCESS_SHARED); pthread_barrier_init(&shm->shared->start_bar, &shm->shared->start_attr, 2); - if (shm->exec && !spawn(shm->exec[0], shm->exec)) - serror("Failed to spawn external program"); + if (shm->exec) { + ret = spawn(shm->exec[0], shm->exec); + if (!ret) + serror("Failed to spawn external program"); + } pthread_barrier_wait(&shm->shared->start_bar); + return 0; } -int shmem_close(struct node *n) { +int shmem_close(struct node *n) +{ struct shmem* shm = n->_vd; - size_t len = shm->shared->len; + int ret; + atomic_store_explicit(&shm->shared->node_stopped, 1, memory_order_relaxed); + if (shm->cond_out) { pthread_mutex_lock(&shm->shared->out.qs.mt); pthread_cond_broadcast(&shm->shared->out.qs.ready); pthread_mutex_unlock(&shm->shared->out.qs.mt); } + /* Don't destroy the data structures yet, since the other process might * still be using them. Once both processes are done and have unmapped the * memory, it will be freed anyway. */ - int r = munmap(shm->base, len); - if (r != 0) - return r; + ret = munmap(shm->base, shm->shared->len); + if (ret != 0) + return ret; + return shm_unlink(shm->name); } -int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) { +int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) +{ struct shmem *shm = n->_vd; - int r; - if (shm->cond_in) - r = queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt); - else - r = queue_pull_many(&shm->shared->in.q, (void**) smps, cnt); - if (!r && atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) + + int ret; + + ret = shm->cond_in ? queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt) + : queue_pull_many(&shm->shared->in.q, (void**) smps, cnt); + + if (ret <= 0) + return ret; + + /* Check if remote process is still running */ + ret = atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed); + if (ret) return -1; - return r; + + return ret; } -int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) { +int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) +{ struct shmem *shm = n->_vd; - - /* Samples need to be copied to the shared pool first */ - struct sample *shared_smps[cnt]; - int avail = sample_alloc(&shm->shared->pool, shared_smps, cnt); + struct sample *shared_smps[cnt]; /**< Samples need to be copied to the shared pool first */ + int avail, pushed, len; + + avail = sample_alloc(&shm->shared->pool, shared_smps, cnt); if (avail != cnt) warn("Pool underrun for shmem node %s", shm->name); + for (int i = 0; i < avail; i++) { /* Since the node isn't in shared memory, the source can't be accessed */ shared_smps[i]->source = NULL; shared_smps[i]->sequence = smps[i]->sequence; shared_smps[i]->ts = smps[i]->ts; - int len = MIN(smps[i]->length, shared_smps[i]->capacity); + + len = MIN(smps[i]->length, shared_smps[i]->capacity); if (len != smps[i]->length) - warn("Losing data because of sample capacity mismatch in shmem node %s", shm->name); + warn("Losing data because of sample capacity mismatch in node %s", node_name(n)); + memcpy(shared_smps[i]->data, smps[i]->data, len*sizeof(smps[0]->data[0])); + shared_smps[i]->length = len; + sample_get(shared_smps[i]); } - int pushed; + if (atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) { for (int i = 0; i < avail; i++) sample_put(shared_smps[i]); + return -1; } - if (shm->cond_out) - pushed = queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail); - else - pushed = queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail); + + pushed = shm->cond_out ? queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail) + : queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail); + if (pushed != avail) - warn("Outqueue overrun for shmem node %s", shm->name); + warn("Outgoing queue overrun for node %s", node_name(n)); + return pushed; } -char *shmem_print(struct node *n) { +char * shmem_print(struct node *n) +{ struct shmem *shm = n->_vd; char *buf = NULL; + strcatf(&buf, "name=%s, insize=%d, outsize=%d, sample_size=%d", shm->name, shm->insize, shm->outsize, shm->sample_size); + return buf; }; diff --git a/lib/path.c b/lib/path.c index bf643ac3e..78befd321 100644 --- a/lib/path.c +++ b/lib/path.c @@ -453,7 +453,7 @@ int path_reverse(struct path *p, struct path *r) for (size_t i = 0; i < list_length(&p->hooks); i++) { struct hook *h = list_at(&p->hooks, i); - struct hook hc = {.state = STATE_DESTROYED}; + struct hook hc = { .state = STATE_DESTROYED }; ret = hook_init(&hc, h->_vt, p); if (ret) diff --git a/lib/queue_signalled.c b/lib/queue_signalled.c index 5e0c59a1b..f6f535e36 100644 --- a/lib/queue_signalled.c +++ b/lib/queue_signalled.c @@ -9,37 +9,49 @@ int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem) { - int r = queue_init(&qs->q, size, mem); - if (r < 0) - return r; + int ret; + + ret = queue_init(&qs->q, size, mem); + if (ret < 0) + return ret; + pthread_mutexattr_init(&qs->mtattr); pthread_mutexattr_setpshared(&qs->mtattr, PTHREAD_PROCESS_SHARED); pthread_condattr_init(&qs->readyattr); pthread_condattr_setpshared(&qs->readyattr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(&qs->mt, &qs->mtattr); pthread_cond_init(&qs->ready, &qs->readyattr); + return 0; } int queue_signalled_destroy(struct queue_signalled *qs) { - int r = queue_destroy(&qs->q); - if (r < 0) - return r; + int ret; + + ret = queue_destroy(&qs->q); + if (ret < 0) + return ret; + pthread_cond_destroy(&qs->ready); pthread_mutex_destroy(&qs->mt); + return 0; } int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt) { - int r = queue_push_many(&qs->q, ptr, cnt); - if (r > 0) { - pthread_mutex_lock(&qs->mt); - pthread_cond_broadcast(&qs->ready); - pthread_mutex_unlock(&qs->mt); - } - return r; + int ret; + + ret = queue_push_many(&qs->q, ptr, cnt); + if (ret < 0) + return ret; + + pthread_mutex_lock(&qs->mt); + pthread_cond_broadcast(&qs->ready); + pthread_mutex_unlock(&qs->mt); + + return ret; } int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt) @@ -50,5 +62,6 @@ int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cn pthread_cond_wait(&qs->ready, &qs->mt); pthread_mutex_unlock(&qs->mt); pthread_cleanup_pop(0); + return queue_pull_many(&qs->q, ptr, cnt); } diff --git a/lib/shmem.c b/lib/shmem.c index c796a05f8..3b44eb0f6 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -17,79 +17,100 @@ size_t shmem_total_size(int insize, int outsize, int sample_size) { - // we have the constant const of the memtype header + /* We have the constant const of the memtype header */ return sizeof(struct memtype) - // and the shared struct itself + /* and the shared struct itself */ + sizeof(struct shmem_shared) - // the size of the 2 queues and the queue for the pool - + (insize + outsize) * (2*sizeof(struct queue_cell)) - // the size of the pool + /* the size of the 2 queues and the queue for the pool */ + + (insize + outsize) * (2 * sizeof(struct queue_cell)) + /* the size of the pool */ + (insize + outsize) * kernel_get_cacheline_size() * CEIL(SAMPLE_LEN(sample_size), kernel_get_cacheline_size()) - // a memblock for each allocation (1 shmem_shared, 3 queues, 1 pool) + /* a memblock for each allocation (1 shmem_shared, 3 queues, 1 pool) */ + 5 * sizeof(struct memblock) - // and some extra buffer for alignment + /* and some extra buffer for alignment */ + 1024; } -struct shmem_shared* shmem_shared_open(const char *name, void **base_ptr) +struct shmem_shared * shmem_shared_open(const char *name, void **base_ptr) { - int fd = shm_open(name, O_RDWR, 0); + struct shmem_shared *shm; + size_t len, newlen; + void *base; + char *cptr; + int fd, ret; + + fd = shm_open(name, O_RDWR, 0); if (fd < 0) return NULL; + /* Only map the first part (shmem_shared) first, read the correct length, * the map it with this length. */ - size_t len = sizeof(struct memtype) + sizeof(struct memblock) + sizeof(struct shmem_shared); - void *base = mmap(NULL, len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); + len = sizeof(struct memtype) + sizeof(struct memblock) + sizeof(struct shmem_shared); + + base = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (base == MAP_FAILED) return NULL; + /* This relies on the behaviour of the node and the allocator; it assumes * that memtype_managed is used and the shmem_shared is the first allocated object */ - char *cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); - struct shmem_shared *shm = (struct shmem_shared *) cptr; - size_t newlen = shm->len; - if (munmap(base, len)) + cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); + shm = (struct shmem_shared *) cptr; + newlen = shm->len; + + ret = munmap(base, len); + if (ret) return NULL; - base = mmap(NULL, newlen, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); + + base = mmap(NULL, newlen, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (base == MAP_FAILED) return NULL; + /* Adress might have moved */ cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); if (base_ptr) *base_ptr = base; + shm = (struct shmem_shared *) cptr; + pthread_barrier_wait(&shm->start_bar); + return shm; } int shmem_shared_close(struct shmem_shared *shm, void *base) { atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed); + if (shm->cond_in) { pthread_mutex_lock(&shm->in.qs.mt); pthread_cond_broadcast(&shm->in.qs.ready); pthread_mutex_unlock(&shm->in.qs.mt); } + return munmap(base, shm->len); } int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) { - int r; - if (shm->cond_out) - r = queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt); - else - r = queue_pull_many(&shm->out.q, (void **) smps, cnt); - if (!r && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed)) + int ret; + + ret = shm->cond_out ? queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt) + : queue_pull_many(&shm->out.q, (void **) smps, cnt); + + if (!ret && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed)) return -1; - return r; + + return ret; } int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) { - if (atomic_load_explicit(&shm->node_stopped, memory_order_relaxed)) + int ret; + + ret = atomic_load_explicit(&shm->node_stopped, memory_order_relaxed); + if (ret) return -1; - if (shm->cond_in) - return queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt); - else - return queue_push_many(&shm->in.q, (void **) smps, cnt); + + return shm->cond_in ? queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt) + : queue_push_many(&shm->in.q, (void **) smps, cnt); } diff --git a/lib/super_node.c b/lib/super_node.c index 62a390282..2d05216d1 100644 --- a/lib/super_node.c +++ b/lib/super_node.c @@ -270,7 +270,7 @@ int super_node_parse(struct super_node *sn, config_setting_t *cfg) list_push(&sn->paths, memdup(&p, sizeof(p))); if (p.reverse) { - struct path r = {.state = STATE_DESTROYED}; + struct path r = { .state = STATE_DESTROYED }; ret = path_init(&r, sn); if (ret) diff --git a/lib/utils.c b/lib/utils.c index 50ba3756a..2a9025990 100644 --- a/lib/utils.c +++ b/lib/utils.c @@ -293,10 +293,12 @@ void signals_init(void (*cb)(int signal, siginfo_t *sinfo, void *ctx)) sigemptyset(&sa_quit.sa_mask); sigaction(SIGINT, &sa_quit, NULL); sigaction(SIGTERM, &sa_quit, NULL); + struct sigaction sa_chld = { .sa_flags = 0, .sa_handler = SIG_IGN }; + sigaction(SIGCHLD, &sa_chld, NULL); } diff --git a/src/shmem.c b/src/shmem.c index 33f349020..1ab347261 100644 --- a/src/shmem.c +++ b/src/shmem.c @@ -38,6 +38,8 @@ void quit(int sig) int main(int argc, char* argv[]) { + int readcnt, writecnt, avail; + if (argc != 2) { usage(); return 1; @@ -51,30 +53,37 @@ int main(int argc, char* argv[]) signal(SIGTERM, quit); struct sample *insmps[VECTORIZE], *outsmps[VECTORIZE]; while (1) { - int r, w; - r = shmem_shared_read(shared, insmps, VECTORIZE); - if (r == -1) { - printf("node stopped, exiting\n"); + + readcnt = shmem_shared_read(shared, insmps, VECTORIZE); + if (readcnt == -1) { + printf("Node stopped, exiting\n"); break; } - int avail = sample_alloc(&shared->pool, outsmps, r); - if (avail < r) - warn("pool underrun (%d/%d)\n", avail, r); - for (int i = 0; i < r; i++) { - printf("got sample: seq %d recv %ld.%ld\n", insmps[i]->sequence, - insmps[i]->ts.received.tv_sec, insmps[i]->ts.received.tv_nsec); - } + + avail = sample_alloc(&shared->pool, outsmps, readcnt); + if (avail < readcnt) + warn("Pool underrun: %d / %d\n", avail, readcnt); + + for (int i = 0; i < readcnt; i++) + sample_io_villas_fprint(stdout, insmps[i], SAMPLE_IO_ALL); + for (int i = 0; i < avail; i++) { outsmps[i]->sequence = insmps[i]->sequence; outsmps[i]->ts = insmps[i]->ts; + int len = MIN(insmps[i]->length, outsmps[i]->capacity); - memcpy(outsmps[i]->data, insmps[i]->data, len*sizeof(insmps[0]->data[0])); + memcpy(outsmps[i]->data, insmps[i]->data, SAMPLE_DATA_LEN(len)); + outsmps[i]->length = len; } - for (int i = 0; i < r; i++) + + for (int i = 0; i < readcnt; i++) sample_put(insmps[i]); - w = shmem_shared_write(shared, outsmps, avail); - if (w < avail) - warn("short write (%d/%d)\n", w, r); + + writecnt = shmem_shared_write(shared, outsmps, avail); + if (writecnt < avail) + warn("Short write"); + + info("Read / Write: %d / %d", readcnt, writecnt); } }