diff --git a/include/sample.h b/include/sample.h index a73c016ce..3a1ff945c 100644 --- a/include/sample.h +++ b/include/sample.h @@ -4,7 +4,7 @@ * @author Steffen Vogel * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. - * Unauthorized copying of this file, via any medium is strictly prohibited. + * Unauthorized copying of this file, via any medium is strictly prohibited. */ #ifndef _SAMPLE_H_ @@ -16,6 +16,9 @@ #include #include +/* Forward declarations */ +struct pool; + /** The length of a sample datastructure with \p values values in bytes. */ #define SAMPLE_LEN(values) (sizeof(struct sample) + SAMPLE_DATA_LEN(values)) @@ -35,8 +38,9 @@ enum sample_flags { }; struct sample { - int length; /**< The number of values in sample::values. */ int sequence; /**< The sequence number of this sample. */ + int length; /**< The number of values in sample::values which are valid. */ + int capacity; /**< The number of values in sample::values for which memory is reserved. */ /** All timestamps are seconds / nano seconds after 1.1.1970 UTC */ struct { @@ -44,7 +48,7 @@ struct sample { struct timespec received; /**< The point in time when this data was received. */ struct timespec sent; /**< The point in time this data was send for the last time. */ } ts; - + /** The values. */ union { float f; /**< Floating point values (note msg::endian) */ @@ -52,6 +56,9 @@ struct sample { } values[]; }; +/** Request \p cnt samples from memory pool \p p and initialize them. */ +int sample_get_many(struct pool *p, struct sample *smps[], int cnt); + /** Print a sample in human readable form to a file stream. * * @param buf A character buffer of len bytes. diff --git a/lib/sample.c b/lib/sample.c index 8206bb97e..cd5be7873 100644 --- a/lib/sample.c +++ b/lib/sample.c @@ -8,9 +8,23 @@ #include +#include "pool.h" #include "sample.h" #include "timing.h" +int sample_get_many(struct pool *p, struct sample *smps[], int cnt) { + int ret; + + ret = pool_get_many(p, (void **) smps, cnt); + if (ret < 0) + return ret; + + for (int i = 0; i < ret; i++) + smps[i]->capacity = (p->blocksz - sizeof(**smps)) / sizeof(smps[0]->values[0]); + + return ret; +} + int sample_print(char *buf, size_t len, struct sample *s, int flags) { size_t off = snprintf(buf, len, "%llu", (unsigned long long) s->ts.origin.tv_sec); @@ -91,11 +105,11 @@ int sample_scan(const char *line, struct sample *s, int *fl) end++; } - for (s->length = 0, ptr = end; ; - s->length++, ptr = end) { + for (ptr = end, s->length = 0; + s->length < s->capacity; + ptr = end, s->length++) { - /** @todo We only support floating point values at the moment */ - s->values[s->length].f = strtod(ptr, &end); + s->values[s->length].f = strtod(ptr, &end); /** @todo We only support floating point values at the moment */ if (end == ptr) /* there are no valid FP values anymore */ break; diff --git a/lib/socket.c b/lib/socket.c index 2bd56fc60..cbfafd1bf 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -94,24 +94,23 @@ int socket_deinit() char * socket_print(struct node *n) { struct socket *s = n->_vd; - char *layer = NULL, *hdr = NULL, *buf; + char *layer = NULL, *header = NULL, *buf; switch (s->layer) { - case SOCKET_LAYER_UDP: layer = "udp"; break; + case SOCKET_LAYER_UDP: layer = "udp"; break; case SOCKET_LAYER_IP: layer = "ip"; break; case SOCKET_LAYER_ETH: layer = "eth"; break; } switch (s->header) { - case SOCKET_HEADER_GTNET_SKT: hdr = "RTDS GTNETv2-SKT"; break; - case SOCKET_HEADER_DEFAULT: - default: hdr = "VILLASnode"; break; + case SOCKET_HEADER_GTNET_SKT: header = "gtnet-skt"; break; + case SOCKET_HEADER_DEFAULT: header = "villas"; break; } char *local = socket_print_addr((struct sockaddr *) &s->local); char *remote = socket_print_addr((struct sockaddr *) &s->remote); - buf = strf("layer=%s, header=%s, local=%s, remote=%s", layer, hdr, local, remote); + buf = strf("layer=%s, header=%s, local=%s, remote=%s", layer, header, local, remote); free(local); free(remote); @@ -212,7 +211,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; - int samples, ret, received; + int samples, ret, received, length; ssize_t bytes; if (s->header == SOCKET_HEADER_GTNET_SKT) { @@ -224,28 +223,32 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) struct sample *smp = smps[0]; /* Receive next sample */ - bytes = recv(s->sd, &smp->values[0], SAMPLE_DATA_LEN(smp->length), MSG_PEEK | MSG_TRUNC); + bytes = recv(s->sd, &smp->values[0], SAMPLE_DATA_LEN(smp->capacity), MSG_TRUNC); if (bytes == 0) error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */ else if (bytes < 0) serror("Failed recv from node %s", node_name(n)); else if (bytes % 4 != 0) { - warn("Packet size is invalid. Must be multiple of 4 bytes."); + warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes); recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; } - received = bytes / sizeof(smp->values[0]); - if (received > smp->length) { - warn("Node %s received more samples than supported. Dropping %u samples", node_name(n), received - smp->length); - received = smp->length; + debug(3, "Received %zd bytes", bytes); + + length = bytes / SAMPLE_DATA_LEN(1); + if (length > smp->capacity) { + warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); + length = smp->capacity; } /** @todo Should we generate sequence no here manually? * Or maybe optinally use the first data value as a sequence? * However this would require the RTDS model to be changed. */ smp->sequence = 0; - smp->length = received; + smp->length = length; + + received = 1; /* GTNET-SKT sends every sample in a single packet */ } else { struct msg msgs[cnt]; @@ -258,8 +261,8 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) /* Peak into message header of the first sample and to get total packet size. */ bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); if (bytes < sizeof(struct msg) || bytes % 4 != 0) { - warn("Packet size is invalid. Must be multiple of 4 bytes."); - recv(s->sd, &hdr, sizeof(struct msg), 0); /* empty receive buffer */ + warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes); + recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; } @@ -275,7 +278,6 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) msg_swap(&hdr); samples = bytes / MSG_LEN(hdr.values); - if (samples > cnt) { warn("Node %s received more samples than supported. Dropping %u samples", node_name(n), samples - cnt); samples = cnt; @@ -290,6 +292,9 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values); mhdr.msg_iovlen += 2; + + if (hdr.values > smps[i]->capacity) + error("Node %s received more values than supported. Dropping %d values.", node_name(n), hdr.values - smps[i]->capacity); } /* Receive message from socket */ diff --git a/src/pipe.c b/src/pipe.c index 732ea8aee..4a651fefd 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -79,7 +79,7 @@ void * send_loop(void *ctx) if (ret < 0) error("Failed to allocate memory for receive pool."); - ret = pool_get_many(&send_pool, (void **) smps, node->vectorize); + ret = sample_get_many(&send_pool, smps, node->vectorize); if (ret < 0) error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret); @@ -118,7 +118,7 @@ void * recv_loop(void *ctx) if (ret < 0) error("Failed to allocate memory for receive pool."); - ret = pool_get_many(&recv_pool, (void **) smps, node->vectorize); + ret = sample_get_many(&recv_pool, smps, node->vectorize); if (ret < 0) error("Failed to get %u samples out of receive pool (%d).", node->vectorize, ret);