From c9c1afcf35d84cd4416271fb5c70899be3f91dcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Sun, 2 Dec 2007 11:09:48 +0000 Subject: [PATCH] Add TCP client --- tcp.c | 246 +++++++++++++++++++++++++++++++++++++++++++++++----------- tcp.h | 13 +++- 2 files changed, 213 insertions(+), 46 deletions(-) diff --git a/tcp.c b/tcp.c index d7b64bf5..2f38ac97 100644 --- a/tcp.c +++ b/tcp.c @@ -32,6 +32,8 @@ #include "dispatch.h" #include "tcp.h" +static void tcp_client_reconnect_timeout(void *aux, int64_t now); + /* * printf data on a TCP queue @@ -314,18 +316,23 @@ tcp_output_queue(tcp_session_t *ses, tcp_queue_t *dst, tcp_queue_t *src) void tcp_disconnect(tcp_session_t *ses, int err) { - tcp_server_t *srv = ses->tcp_server; - tcp_flush_queue(&ses->tcp_q_low); tcp_flush_queue(&ses->tcp_q_hi); - srv->tcp_callback(TCP_DISCONNECT, ses); + ses->tcp_callback(TCP_DISCONNECT, ses); syslog(LOG_INFO, "%s: %s: disconnected -- %s", - srv->tcp_server_name, ses->tcp_peer_txt, strerror(err)); + ses->tcp_name, ses->tcp_peer_txt, strerror(err)); close(dispatch_delfd(ses->tcp_dispatch_handle)); - free(ses); + + if(ses->tcp_server != NULL) { + free(ses->tcp_name); + free(ses); + } else { + /* Try to reconnect in 5 seconds */ + dtimer_arm(&ses->tcp_timer, tcp_client_reconnect_timeout, ses, 5); + } } @@ -336,7 +343,6 @@ static void tcp_socket_callback(int events, void *opaque, int fd) { tcp_session_t *ses = opaque; - tcp_server_t *srv = ses->tcp_server; if(events & DISPATCH_ERR) { tcp_disconnect(ses, ECONNRESET); @@ -344,7 +350,7 @@ tcp_socket_callback(int events, void *opaque, int fd) } if(events & DISPATCH_READ) - srv->tcp_callback(TCP_INPUT, ses); + ses->tcp_callback(TCP_INPUT, ses); if(events & DISPATCH_WRITE) tcp_transmit(ses); @@ -353,15 +359,193 @@ tcp_socket_callback(int events, void *opaque, int fd) /** - * TCP connect callback + * Setup a TCP connection, common code between server and client + */ + +static void +tcp_start_session(tcp_session_t *ses) +{ + int val; + struct sockaddr_in *si = (struct sockaddr_in *)&ses->tcp_peer_addr; + + fcntl(ses->tcp_fd, F_SETFL, fcntl(ses->tcp_fd, F_GETFL) | O_NONBLOCK); + + val = 1; + setsockopt(ses->tcp_fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)); + + val = 30; + setsockopt(ses->tcp_fd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)); + + val = 15; + setsockopt(ses->tcp_fd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)); + + val = 5; + setsockopt(ses->tcp_fd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)); + + val = 1; + setsockopt(ses->tcp_fd, SOL_TCP, TCP_NODELAY, &val, sizeof(val)); + + tcp_init_queue(&ses->tcp_q_hi, 20 * 1000 * 1000); + tcp_init_queue(&ses->tcp_q_low, 20 * 1000 * 1000); + + snprintf(ses->tcp_peer_txt, sizeof(ses->tcp_peer_txt), "%s:%d", + inet_ntoa(si->sin_addr), ntohs(si->sin_port)); + + syslog(LOG_INFO, "%s: %s: connected", ses->tcp_name, ses->tcp_peer_txt); + + + ses->tcp_dispatch_handle = dispatch_addfd(ses->tcp_fd, tcp_socket_callback, + ses, DISPATCH_READ); + ses->tcp_callback(TCP_CONNECT, ses); +} + + + +/** + * */ static void -tcp_connect_callback(int events, void *opaque, int fd) +tcp_client_connected(tcp_session_t *c) +{ + dtimer_disarm(&c->tcp_timer); + tcp_start_session(c); +} + + +/** + * + */ +static void +tcp_client_connect_fail(tcp_session_t *c, int error) +{ + struct sockaddr_in *si = (struct sockaddr_in *)&c->tcp_peer_addr; + + + syslog(LOG_ERR, "%s: Unable to connect to %s:%d -- %s", + c->tcp_name, inet_ntoa(si->sin_addr), + ntohs(si->sin_port), strerror(error)); + + /* Try to reconnect in 10 seconds */ + + dtimer_arm(&c->tcp_timer, tcp_client_reconnect_timeout, c, 10); +} + + +/** + * + */ +static void +tcp_client_connect_callback(int events, void *opaque, int fd) +{ + int err; + size_t errlen = sizeof(int); + tcp_session_t *c = opaque; + + dispatch_delfd(c->tcp_dispatch_handle); + + getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen); + + if(err == 0) { + tcp_client_connected(c); + return; + } + + close(c->tcp_fd); + tcp_client_connect_fail(c, errno); +} + +/** + * We dont want to wait for connect() to time out, so we have our + * own timeout + */ +static void +tcp_client_connect_timeout(void *aux, int64_t now) +{ + tcp_session_t *c = aux; + + close(c->tcp_fd); + tcp_client_connect_fail(c, ETIMEDOUT); +} + +/** + * + */ +static void +tcp_session_try_connect(tcp_session_t *c) +{ + c->tcp_fd = socket(AF_INET, SOCK_STREAM, 0); + fcntl(c->tcp_fd, F_SETFL, fcntl(c->tcp_fd, F_GETFL) | O_NONBLOCK); + + if(connect(c->tcp_fd, (struct sockaddr *)&c->tcp_peer_addr, + sizeof(struct sockaddr_in)) == 0) { + tcp_client_connected(c); + return; + } + + if(errno == EINPROGRESS) { + + dtimer_arm(&c->tcp_timer, tcp_client_connect_timeout, c, 10); + + c->tcp_dispatch_handle = + dispatch_addfd(c->tcp_fd, tcp_client_connect_callback, c, + DISPATCH_WRITE); + return; + } + + close(c->tcp_fd); + tcp_client_connect_fail(c, errno); +} + + +/** + * We come here after a failed connection attempt or if we disconnected + */ +static void +tcp_client_reconnect_timeout(void *aux, int64_t now) +{ + tcp_session_t *c = aux; + tcp_session_try_connect(c); +} + + +/** + * Create a TCP based client + */ +void * +tcp_create_client(struct in_addr ip, int port, size_t session_size, + const char *name, tcp_callback_t *cb) +{ + struct sockaddr_in *si; + tcp_session_t *c = calloc(1, session_size); + + c->tcp_callback = cb; + c->tcp_name = strdup(name); + + si = (struct sockaddr_in *)&c->tcp_peer_addr; + + memset(si, 0, sizeof(struct sockaddr_in)); + si->sin_family = AF_INET; + si->sin_port = htons(port); + si->sin_addr = ip; + + tcp_session_try_connect(c); + + return c; +} + + + + + +/** + * TCP server connect callback + */ +static void +tcp_server_callback(int events, void *opaque, int fd) { struct sockaddr_in from; socklen_t socklen = sizeof(struct sockaddr_in); int newfd; - int val; tcp_session_t *ses; tcp_server_t *srv = opaque; @@ -369,44 +553,16 @@ tcp_connect_callback(int events, void *opaque, int fd) return; if((newfd = accept(fd, (struct sockaddr *)&from, &socklen)) == -1) - return; + return; /* XXX: Something more clever must be done here */ - fcntl(newfd, F_SETFL, fcntl(newfd, F_GETFL) | O_NONBLOCK); - - val = 1; - setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)); - - val = 30; - setsockopt(newfd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)); - - val = 15; - setsockopt(newfd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)); - - val = 5; - setsockopt(newfd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)); - - val = 1; - setsockopt(newfd, SOL_TCP, TCP_NODELAY, &val, sizeof(val)); - - ses = calloc(1, srv->tcp_session_size); - tcp_init_queue(&ses->tcp_q_hi, 20 * 1000 * 1000); - tcp_init_queue(&ses->tcp_q_low, 20 * 1000 * 1000); - - ses->tcp_server = srv; - - memcpy(&ses->tcp_peer_addr, &from, socklen); - snprintf(ses->tcp_peer_txt, sizeof(ses->tcp_peer_txt), "%s:%d", - inet_ntoa(from.sin_addr), ntohs(from.sin_port)); - - syslog(LOG_INFO, "%s: %s: connected", - srv->tcp_server_name, ses->tcp_peer_txt); - ses->tcp_fd = newfd; + ses->tcp_name = strdup(srv->tcp_server_name); + ses->tcp_callback = srv->tcp_callback; + ses->tcp_server = srv; + memcpy(&ses->tcp_peer_addr, &from, socklen); - ses->tcp_dispatch_handle = dispatch_addfd(newfd, tcp_socket_callback, - ses, DISPATCH_READ); - srv->tcp_callback(TCP_CONNECT, ses); + tcp_start_session(ses); } @@ -449,7 +605,7 @@ tcp_create_server(int port, size_t session_size, const char *name, } listen(s->tcp_fd, 1); - dispatch_addfd(s->tcp_fd, tcp_connect_callback, s, DISPATCH_READ); + dispatch_addfd(s->tcp_fd, tcp_server_callback, s, DISPATCH_READ); s->tcp_server_name = strdup(name); } diff --git a/tcp.h b/tcp.h index 13006bbe..bf58a989 100644 --- a/tcp.h +++ b/tcp.h @@ -57,7 +57,15 @@ typedef struct tcp_session { int tcp_fd; struct sockaddr_storage tcp_peer_addr; char tcp_peer_txt[100]; - tcp_server_t *tcp_server; + tcp_callback_t *tcp_callback; + tcp_server_t *tcp_server; /* if this is NULL, then we are spawned + as a client */ + + /* These are only used when we spawn as a client */ + + dtimer_t tcp_timer; + char *tcp_name; + /* Output queueing */ @@ -96,4 +104,7 @@ void tcp_qprintf(tcp_queue_t *tq, const char *fmt, ...); void tcp_output_queue(tcp_session_t *ses, tcp_queue_t *dst, tcp_queue_t *src); +void *tcp_create_client(struct in_addr ip, int port, size_t session_size, + const char *name, tcp_callback_t *cb); + #endif /* TCP_H_ */