From 7befa753900bc9b4bceef627e164f30ce00520a4 Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Thu, 9 Jan 2025 15:10:25 +0000 Subject: [PATCH] fix: Add reconnect attempt after the connection is broken. Move the establishment of tcp connection out of socket_start so that it will not block other node. Signed-off-by: Jitpanu Maneeratpongsuk --- include/villas/nodes/socket.hpp | 3 ++ lib/nodes/socket.cpp | 85 +++++++++++++++++++-------------- 2 files changed, 51 insertions(+), 37 deletions(-) diff --git a/include/villas/nodes/socket.hpp b/include/villas/nodes/socket.hpp index 671d508f4..1c78de648 100644 --- a/include/villas/nodes/socket.hpp +++ b/include/villas/nodes/socket.hpp @@ -24,6 +24,7 @@ struct Socket { int sd; // The socket descriptor int clt_sd; // TCP client socket descriptor int verify_source; // Verify the source address of incoming packets against socket::remote. + bool tcp_connect = false; // TCP connection status bit enum SocketLayer layer; // The OSI / IP layer which should be used for this socket @@ -69,6 +70,8 @@ int socket_read(NodeCompat *n, struct Sample *const smps[], unsigned cnt); int socket_parse(NodeCompat *n, json_t *json); +void socket_tcp_connection(NodeCompat *n, Socket *s); + char *socket_print(NodeCompat *n); } // namespace node diff --git a/lib/nodes/socket.cpp b/lib/nodes/socket.cpp index 47983df17..6050c1208 100644 --- a/lib/nodes/socket.cpp +++ b/lib/nodes/socket.cpp @@ -28,7 +28,7 @@ #include #endif // WITH_NETEM -#define MAX_CONNECTION_RETRIES 5 +#define MAX_CONNECTION_RETRIES 40 #define RETRIES_DELAY 2 using namespace villas; @@ -249,43 +249,11 @@ int villas::node::socket_start(NodeCompat *n) { addrlen = sizeof(s->in.saddr); } - if (s->layer == SocketLayer::TCP_CLIENT) { - // Attempt to connect to TCP server. - int retries = 0; - while (retries < MAX_CONNECTION_RETRIES) { - n->logger->info("Attempting to connect to server: attempt={}...", retries + 1); - ret = connect(s->sd, (struct sockaddr *)&s->out.saddr, addrlen); - if (ret == 0) { - break; - } else { - retries++; - if (retries < MAX_CONNECTION_RETRIES) { - sleep(RETRIES_DELAY); - } - } - } - - } else { + if (s->layer != SocketLayer::TCP_CLIENT) ret = bind(s->sd, (struct sockaddr *)&s->in.saddr, addrlen); - } - if (ret < 0) { - if (s->layer == SocketLayer::TCP_CLIENT) { - throw SystemError("Failed to connect to TCP server"); - } else { - throw SystemError("Failed to bind socket"); - } - } - - // TCP Server listen for client connection. - if (s->layer == SocketLayer::TCP_SERVER) { - listen(s->sd, 5); - // Accept client connection and get client socket descriptor. - s->clt_sd = accept(s->sd, nullptr, nullptr); - if (s->clt_sd < 0) { - throw SystemError("Failed to accept connection"); - } - } + if (ret < 0) + throw SystemError("Failed to bind socket"); if (s->multicast.enabled) { ret = setsockopt(s->sd, IPPROTO_IP, IP_MULTICAST_LOOP, &s->multicast.loop, @@ -400,9 +368,15 @@ int villas::node::socket_read(NodeCompat *n, struct Sample *const smps[], if (s->layer == SocketLayer::TCP_CLIENT) { // Receive data from server. + if (!s->tcp_connect) + villas::node::socket_tcp_connection(n, s); + bytes = recv(s->sd, s->in.buf, s->in.buflen, 0); } else if (s->layer == SocketLayer::TCP_SERVER) { // Receive data from client. + if (!s->tcp_connect) + villas::node::socket_tcp_connection(n, s); + bytes = recv(s->clt_sd, s->in.buf, s->in.buflen, 0); } else { bytes = recvfrom(s->sd, s->in.buf, s->in.buflen, 0, &src.sa, &srclen); @@ -413,8 +387,12 @@ int villas::node::socket_read(NodeCompat *n, struct Sample *const smps[], return -1; throw SystemError("Failed recvfrom()"); - } else if (bytes == 0) + } else if (bytes == 0) { + if (s->layer == SocketLayer::TCP_CLIENT || s->layer == SocketLayer::TCP_SERVER) + s->tcp_connect = false; + return 0; + } ptr = s->in.buf; @@ -456,6 +434,39 @@ int villas::node::socket_read(NodeCompat *n, struct Sample *const smps[], return ret; } +void villas::node::socket_tcp_connection(NodeCompat *n, Socket *s) { + int ret; + if (s->layer == SocketLayer::TCP_CLIENT) { + close(s->sd); + s->sd = socket(s->in.saddr.sa.sa_family, SOCK_STREAM, 0); + // Attemp to connect to TCP server. + int retries = 0; + while (retries < MAX_CONNECTION_RETRIES) { + n->logger->info("Attempting to connect to server: attempt={}...", retries + 1); + ret = connect(s->sd, (struct sockaddr *)&s->out.saddr, sizeof(s->in.saddr)); + if (ret == 0) { + s->tcp_connect = true; + break; + } else { + retries++; + if (retries < MAX_CONNECTION_RETRIES) { + sleep(RETRIES_DELAY); + } + } + } + if (ret < 0) + throw SystemError("Failed to conenct to TCP server"); + } else if (s->layer == SocketLayer::TCP_SERVER) { + listen(s->sd, 5); + // Accept client connection and get client socket descriptor. + s->clt_sd = accept(s->sd, nullptr, nullptr); + if (s->clt_sd < 0) { + throw SystemError("Failed to accept connection"); + } + s->tcp_connect = true; + } +} + int villas::node::socket_write(NodeCompat *n, struct Sample *const smps[], unsigned cnt) { auto *s = n->getData();