1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

feat: Add basic tcp connection to socket node type

Signed-off-by: Jitpanu Maneeratpongsuk <jitpanu.maneeratpongsuk@rwth-aachen.de>
This commit is contained in:
Jitpanu Maneeratpongsuk 2024-12-19 15:09:19 +00:00 committed by al3xa23
parent 560af2961a
commit 5df3fabcd1
4 changed files with 90 additions and 6 deletions

View file

@ -22,6 +22,7 @@ class NodeCompat;
struct Socket {
int sd; // The socket descriptor
int clt_sd; // TCP client socket descriptor
int verify_source; // Verify the source address of incoming packets against socket::remote.
enum SocketLayer

View file

@ -35,7 +35,7 @@ union sockaddr_union {
namespace villas {
namespace node {
enum class SocketLayer { ETH, IP, UDP, UNIX };
enum class SocketLayer { ETH, IP, UDP, UNIX, TCP_CLIENT, TCP_SERVER};
/* Generate printable socket address depending on the address family
*

View file

@ -28,6 +28,9 @@
#include <villas/kernel/nl.hpp>
#endif // WITH_NETEM
#define MAX_CONNECTION_RETRIES 5
#define RETRIES_DELAY 2
using namespace villas;
using namespace villas::utils;
using namespace villas::node;
@ -97,6 +100,11 @@ char *villas::node::socket_print(NodeCompat *n) {
case SocketLayer::UNIX:
layer = "unix";
break;
case SocketLayer::TCP_SERVER:
case SocketLayer::TCP_CLIENT:
layer = "tcp";
break;
}
char *local = socket_print_addr((struct sockaddr *)&s->in.saddr);
@ -195,6 +203,11 @@ int villas::node::socket_start(NodeCompat *n) {
s->sd = socket(s->in.saddr.sa.sa_family, SOCK_DGRAM, 0);
break;
case SocketLayer::TCP_SERVER:
case SocketLayer::TCP_CLIENT:
s->sd = socket(s->in.saddr.sa.sa_family, SOCK_STREAM, 0);
break;
default:
throw RuntimeError("Invalid socket type!");
}
@ -233,9 +246,43 @@ int villas::node::socket_start(NodeCompat *n) {
addrlen = sizeof(s->in.saddr);
}
ret = bind(s->sd, (struct sockaddr *)&s->in.saddr, addrlen);
if (ret < 0)
throw SystemError("Failed to bind socket");
if (s->layer == SocketLayer::TCP_CLIENT) {
//Attempt to connect to TCP server
int retries = 0;
while (retries < MAX_CONNECTION_RETRIES) {
n->logger->info("Attempting({}) to connect to server..", retries + 1);
ret = connect(s->sd, (struct sockaddr *)&s->out.saddr, addrlen);
if (ret == 0) {
break;
} else {
retries++;
if (retries < MAX_CONNECTION_RETRIES) {
sleep(RETRIES_DELAY);
}
}
}
} else {
ret = bind(s->sd, (struct sockaddr *)&s->in.saddr, addrlen);
}
if (ret < 0) {
if (s->layer == SocketLayer::TCP_CLIENT) {
throw SystemError("Failed to connect to TCP server");
} else {
throw SystemError("Failed to bind socket");
}
}
//TCP Server listen for client connection
if (s->layer == SocketLayer::TCP_SERVER) {
listen(s->sd, 5);
//Accept client connection and get client socket descriptor
s->clt_sd = accept(s->sd, nullptr, nullptr);
if (s->clt_sd < 0) {
throw SystemError("Failed to accept connection");
}
}
if (s->multicast.enabled) {
ret = setsockopt(s->sd, IPPROTO_IP, IP_MULTICAST_LOOP, &s->multicast.loop,
@ -258,6 +305,8 @@ int villas::node::socket_start(NodeCompat *n) {
int prio;
switch (s->layer) {
case SocketLayer::UDP:
case SocketLayer::TCP_SERVER:
case SocketLayer::TCP_CLIENT:
case SocketLayer::IP:
prio = IPTOS_LOWDELAY;
if (setsockopt(s->sd, IPPROTO_IP, IP_TOS, &prio, sizeof(prio)))
@ -316,7 +365,12 @@ int villas::node::socket_stop(NodeCompat *n) {
}
if (s->sd >= 0) {
//Close client socket descriptor
if (s->layer == SocketLayer::TCP_SERVER)
close(s->clt_sd);
ret = close(s->sd);
if (ret)
return ret;
}
@ -340,7 +394,17 @@ int villas::node::socket_read(NodeCompat *n, struct Sample *const smps[],
socklen_t srclen = sizeof(src);
// Receive next sample
bytes = recvfrom(s->sd, s->in.buf, s->in.buflen, 0, &src.sa, &srclen);
if (s->layer == SocketLayer::TCP_CLIENT) {
//Receive data from server
bytes = recv(s->sd, s->in.buf, s->in.buflen, 0);
} else if (s->layer == SocketLayer::TCP_SERVER) {
//Receive data from client
bytes = recv(s->clt_sd, s->in.buf, s->in.buflen, 0);
} else {
bytes = recvfrom(s->sd, s->in.buf, s->in.buflen, 0, &src.sa, &srclen);
}
if (bytes < 0) {
if (errno == EINTR)
return -1;
@ -445,8 +509,17 @@ retry:
}
retry2:
bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *)&s->out.saddr,
if (s->layer == SocketLayer::TCP_CLIENT) {
//Send data to TCP server
bytes = send(s->sd, s->out.buf, wbytes, 0);
} else if (s->layer == SocketLayer::TCP_SERVER) {
//Send data to TCP client
bytes = send(s->clt_sd, s->out.buf, wbytes, 0);
} else {
bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *)&s->out.saddr,
addrlen);
}
if (bytes < 0) {
if ((errno == EPERM) || (errno == ENOENT && s->layer == SocketLayer::UNIX))
n->logger->warn("Failed sendto(): {}", strerror(errno));
@ -505,6 +578,10 @@ int villas::node::socket_parse(NodeCompat *n, json_t *json) {
s->layer = SocketLayer::UDP;
else if (!strcmp(layer, "unix") || !strcmp(layer, "local"))
s->layer = SocketLayer::UNIX;
else if (!strcmp(layer, "tcp_client"))
s->layer = SocketLayer::TCP_CLIENT;
else if (!strcmp(layer, "tcp_server"))
s->layer = SocketLayer::TCP_SERVER;
else
throw SystemError("Invalid layer '{}'", layer);
}

View file

@ -156,6 +156,12 @@ int villas::node::socket_parse_address(const char *addr, struct sockaddr *saddr,
hint.ai_protocol = IPPROTO_UDP;
break;
case SocketLayer::TCP_CLIENT:
case SocketLayer::TCP_SERVER:
hint.ai_socktype = SOCK_STREAM;
hint.ai_protocol = IPPROTO_TCP;
break;
default:
throw RuntimeError("Invalid address type");
}