1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

properly handle failed read / writes to nodes

This commit is contained in:
Steffen Vogel 2017-08-22 12:31:12 +02:00
parent b9f5f350a8
commit 4e2cdc8efd
2 changed files with 25 additions and 3 deletions

View file

@ -199,12 +199,18 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt)
if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) {
while (cnt - nread > 0) {
readd = n->_vt->read(n, &smps[nread], MIN(cnt - nread, n->_vt->vectorize));
if (readd < 0)
return readd;
nread += readd;
debug(LOG_NODES | 5, "Received %u samples from node %s", readd, node_name(n));
}
}
else {
nread = n->_vt->read(n, smps, cnt);
if (nread < 0)
return nread;
debug(LOG_NODES | 5, "Received %u samples from node %s", nread, node_name(n));
}
@ -225,12 +231,18 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt)
if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) {
while (cnt - nsent > 0) {
sent = n->_vt->write(n, &smps[nsent], MIN(cnt - nsent, n->_vt->vectorize));
if (sent < 0)
return sent;
nsent += sent;
debug(LOG_NODES | 5, "Sent %u samples to node %s", sent, node_name(n));
}
}
else {
nsent = n->_vt->write(n, smps, cnt);
if (nsent < 0)
return nsent;
debug(LOG_NODES | 5, "Sent %u samples to node %s", nsent, node_name(n));
}

View file

@ -119,6 +119,10 @@ static void * send_loop(void *ctx)
continue;
sent = node_write(node, smps, len);
if (sent < 0) {
warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent);
continue;
}
cnt += sent;
if (sendd.limit > 0 && cnt >= sendd.limit)
@ -145,7 +149,7 @@ leave: if (io_eof(&io)) {
static void * recv_loop(void *ctx)
{
int ret, cnt = 0;
int recv, ret, cnt = 0;
struct sample *smps[node->vectorize];
/* Initialize memory */
@ -158,7 +162,12 @@ static void * recv_loop(void *ctx)
error("Failed to allocate %u samples from receive pool.", node->vectorize);
for (;;) {
int recv = node_read(node, smps, node->vectorize);
recv = node_read(node, smps, node->vectorize);
if (recv < 0) {
warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv);
continue;
}
struct timespec now = time_now();
/* Fix timestamps */
@ -295,6 +304,7 @@ check: if (optarg == endptr)
error("Node '%s' does not exist!", nodestr);
#ifdef WITH_WEBSOCKET
/* Only start web subsystem if villas-pipe is used with a websocket node */
if (node->_vt->start == websocket_start)
web_start(&sn.web);
#endif
@ -312,7 +322,7 @@ check: if (optarg == endptr)
ret = node_start(node);
if (ret)
error("Failed to start node: %s", node_name(node));
error("Failed to start node %s: reason=%d", node_name(node), ret);
/* Start threads */
if (recvv.enabled)