From 4e2cdc8efd7a4f3503a8ed78b8fee956b7f71a6e Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 22 Aug 2017 12:31:12 +0200 Subject: [PATCH] properly handle failed read / writes to nodes --- lib/node.c | 12 ++++++++++++ src/pipe.c | 16 +++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/lib/node.c b/lib/node.c index 7d89eec18..4299eec24 100644 --- a/lib/node.c +++ b/lib/node.c @@ -199,12 +199,18 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt) if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { while (cnt - nread > 0) { readd = n->_vt->read(n, &smps[nread], MIN(cnt - nread, n->_vt->vectorize)); + if (readd < 0) + return readd; + nread += readd; debug(LOG_NODES | 5, "Received %u samples from node %s", readd, node_name(n)); } } else { nread = n->_vt->read(n, smps, cnt); + if (nread < 0) + return nread; + debug(LOG_NODES | 5, "Received %u samples from node %s", nread, node_name(n)); } @@ -225,12 +231,18 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt) if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { while (cnt - nsent > 0) { sent = n->_vt->write(n, &smps[nsent], MIN(cnt - nsent, n->_vt->vectorize)); + if (sent < 0) + return sent; + nsent += sent; debug(LOG_NODES | 5, "Sent %u samples to node %s", sent, node_name(n)); } } else { nsent = n->_vt->write(n, smps, cnt); + if (nsent < 0) + return nsent; + debug(LOG_NODES | 5, "Sent %u samples to node %s", nsent, node_name(n)); } diff --git a/src/pipe.c b/src/pipe.c index 1cb7fcb30..878ce1d93 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -119,6 +119,10 @@ static void * send_loop(void *ctx) continue; sent = node_write(node, smps, len); + if (sent < 0) { + warn("Failed to sent samples to node %s: reason=%d", node_name(node), sent); + continue; + } cnt += sent; if (sendd.limit > 0 && cnt >= sendd.limit) @@ -145,7 +149,7 @@ leave: if (io_eof(&io)) { static void * recv_loop(void *ctx) { - int ret, cnt = 0; + int recv, ret, cnt = 0; struct sample *smps[node->vectorize]; /* Initialize memory */ @@ -158,7 +162,12 @@ static void * recv_loop(void *ctx) error("Failed to allocate %u samples from receive pool.", node->vectorize); for (;;) { - int recv = node_read(node, smps, node->vectorize); + recv = node_read(node, smps, node->vectorize); + if (recv < 0) { + warn("Failed to receive samples from node %s: reason=%d", node_name(node), recv); + continue; + } + struct timespec now = time_now(); /* Fix timestamps */ @@ -295,6 +304,7 @@ check: if (optarg == endptr) error("Node '%s' does not exist!", nodestr); #ifdef WITH_WEBSOCKET + /* Only start web subsystem if villas-pipe is used with a websocket node */ if (node->_vt->start == websocket_start) web_start(&sn.web); #endif @@ -312,7 +322,7 @@ check: if (optarg == endptr) ret = node_start(node); if (ret) - error("Failed to start node: %s", node_name(node)); + error("Failed to start node %s: reason=%d", node_name(node), ret); /* Start threads */ if (recvv.enabled)