Add TCP client

This commit is contained in:
Andreas Öman 2007-12-02 11:09:48 +00:00
parent a7efa76276
commit c9c1afcf35
2 changed files with 213 additions and 46 deletions

246
tcp.c
View file

@ -32,6 +32,8 @@
#include "dispatch.h"
#include "tcp.h"
static void tcp_client_reconnect_timeout(void *aux, int64_t now);
/*
* printf data on a TCP queue
@ -314,18 +316,23 @@ tcp_output_queue(tcp_session_t *ses, tcp_queue_t *dst, tcp_queue_t *src)
void
tcp_disconnect(tcp_session_t *ses, int err)
{
tcp_server_t *srv = ses->tcp_server;
tcp_flush_queue(&ses->tcp_q_low);
tcp_flush_queue(&ses->tcp_q_hi);
srv->tcp_callback(TCP_DISCONNECT, ses);
ses->tcp_callback(TCP_DISCONNECT, ses);
syslog(LOG_INFO, "%s: %s: disconnected -- %s",
srv->tcp_server_name, ses->tcp_peer_txt, strerror(err));
ses->tcp_name, ses->tcp_peer_txt, strerror(err));
close(dispatch_delfd(ses->tcp_dispatch_handle));
free(ses);
if(ses->tcp_server != NULL) {
free(ses->tcp_name);
free(ses);
} else {
/* Try to reconnect in 5 seconds */
dtimer_arm(&ses->tcp_timer, tcp_client_reconnect_timeout, ses, 5);
}
}
@ -336,7 +343,6 @@ static void
tcp_socket_callback(int events, void *opaque, int fd)
{
tcp_session_t *ses = opaque;
tcp_server_t *srv = ses->tcp_server;
if(events & DISPATCH_ERR) {
tcp_disconnect(ses, ECONNRESET);
@ -344,7 +350,7 @@ tcp_socket_callback(int events, void *opaque, int fd)
}
if(events & DISPATCH_READ)
srv->tcp_callback(TCP_INPUT, ses);
ses->tcp_callback(TCP_INPUT, ses);
if(events & DISPATCH_WRITE)
tcp_transmit(ses);
@ -353,15 +359,193 @@ tcp_socket_callback(int events, void *opaque, int fd)
/**
* TCP connect callback
* Setup a TCP connection, common code between server and client
*/
static void
tcp_start_session(tcp_session_t *ses)
{
int val;
struct sockaddr_in *si = (struct sockaddr_in *)&ses->tcp_peer_addr;
fcntl(ses->tcp_fd, F_SETFL, fcntl(ses->tcp_fd, F_GETFL) | O_NONBLOCK);
val = 1;
setsockopt(ses->tcp_fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
val = 30;
setsockopt(ses->tcp_fd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val));
val = 15;
setsockopt(ses->tcp_fd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val));
val = 5;
setsockopt(ses->tcp_fd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val));
val = 1;
setsockopt(ses->tcp_fd, SOL_TCP, TCP_NODELAY, &val, sizeof(val));
tcp_init_queue(&ses->tcp_q_hi, 20 * 1000 * 1000);
tcp_init_queue(&ses->tcp_q_low, 20 * 1000 * 1000);
snprintf(ses->tcp_peer_txt, sizeof(ses->tcp_peer_txt), "%s:%d",
inet_ntoa(si->sin_addr), ntohs(si->sin_port));
syslog(LOG_INFO, "%s: %s: connected", ses->tcp_name, ses->tcp_peer_txt);
ses->tcp_dispatch_handle = dispatch_addfd(ses->tcp_fd, tcp_socket_callback,
ses, DISPATCH_READ);
ses->tcp_callback(TCP_CONNECT, ses);
}
/**
*
*/
static void
tcp_connect_callback(int events, void *opaque, int fd)
tcp_client_connected(tcp_session_t *c)
{
dtimer_disarm(&c->tcp_timer);
tcp_start_session(c);
}
/**
*
*/
static void
tcp_client_connect_fail(tcp_session_t *c, int error)
{
struct sockaddr_in *si = (struct sockaddr_in *)&c->tcp_peer_addr;
syslog(LOG_ERR, "%s: Unable to connect to %s:%d -- %s",
c->tcp_name, inet_ntoa(si->sin_addr),
ntohs(si->sin_port), strerror(error));
/* Try to reconnect in 10 seconds */
dtimer_arm(&c->tcp_timer, tcp_client_reconnect_timeout, c, 10);
}
/**
*
*/
static void
tcp_client_connect_callback(int events, void *opaque, int fd)
{
int err;
size_t errlen = sizeof(int);
tcp_session_t *c = opaque;
dispatch_delfd(c->tcp_dispatch_handle);
getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
if(err == 0) {
tcp_client_connected(c);
return;
}
close(c->tcp_fd);
tcp_client_connect_fail(c, errno);
}
/**
* We dont want to wait for connect() to time out, so we have our
* own timeout
*/
static void
tcp_client_connect_timeout(void *aux, int64_t now)
{
tcp_session_t *c = aux;
close(c->tcp_fd);
tcp_client_connect_fail(c, ETIMEDOUT);
}
/**
*
*/
static void
tcp_session_try_connect(tcp_session_t *c)
{
c->tcp_fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(c->tcp_fd, F_SETFL, fcntl(c->tcp_fd, F_GETFL) | O_NONBLOCK);
if(connect(c->tcp_fd, (struct sockaddr *)&c->tcp_peer_addr,
sizeof(struct sockaddr_in)) == 0) {
tcp_client_connected(c);
return;
}
if(errno == EINPROGRESS) {
dtimer_arm(&c->tcp_timer, tcp_client_connect_timeout, c, 10);
c->tcp_dispatch_handle =
dispatch_addfd(c->tcp_fd, tcp_client_connect_callback, c,
DISPATCH_WRITE);
return;
}
close(c->tcp_fd);
tcp_client_connect_fail(c, errno);
}
/**
* We come here after a failed connection attempt or if we disconnected
*/
static void
tcp_client_reconnect_timeout(void *aux, int64_t now)
{
tcp_session_t *c = aux;
tcp_session_try_connect(c);
}
/**
* Create a TCP based client
*/
void *
tcp_create_client(struct in_addr ip, int port, size_t session_size,
const char *name, tcp_callback_t *cb)
{
struct sockaddr_in *si;
tcp_session_t *c = calloc(1, session_size);
c->tcp_callback = cb;
c->tcp_name = strdup(name);
si = (struct sockaddr_in *)&c->tcp_peer_addr;
memset(si, 0, sizeof(struct sockaddr_in));
si->sin_family = AF_INET;
si->sin_port = htons(port);
si->sin_addr = ip;
tcp_session_try_connect(c);
return c;
}
/**
* TCP server connect callback
*/
static void
tcp_server_callback(int events, void *opaque, int fd)
{
struct sockaddr_in from;
socklen_t socklen = sizeof(struct sockaddr_in);
int newfd;
int val;
tcp_session_t *ses;
tcp_server_t *srv = opaque;
@ -369,44 +553,16 @@ tcp_connect_callback(int events, void *opaque, int fd)
return;
if((newfd = accept(fd, (struct sockaddr *)&from, &socklen)) == -1)
return;
return; /* XXX: Something more clever must be done here */
fcntl(newfd, F_SETFL, fcntl(newfd, F_GETFL) | O_NONBLOCK);
val = 1;
setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
val = 30;
setsockopt(newfd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val));
val = 15;
setsockopt(newfd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val));
val = 5;
setsockopt(newfd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val));
val = 1;
setsockopt(newfd, SOL_TCP, TCP_NODELAY, &val, sizeof(val));
ses = calloc(1, srv->tcp_session_size);
tcp_init_queue(&ses->tcp_q_hi, 20 * 1000 * 1000);
tcp_init_queue(&ses->tcp_q_low, 20 * 1000 * 1000);
ses->tcp_server = srv;
memcpy(&ses->tcp_peer_addr, &from, socklen);
snprintf(ses->tcp_peer_txt, sizeof(ses->tcp_peer_txt), "%s:%d",
inet_ntoa(from.sin_addr), ntohs(from.sin_port));
syslog(LOG_INFO, "%s: %s: connected",
srv->tcp_server_name, ses->tcp_peer_txt);
ses->tcp_fd = newfd;
ses->tcp_name = strdup(srv->tcp_server_name);
ses->tcp_callback = srv->tcp_callback;
ses->tcp_server = srv;
memcpy(&ses->tcp_peer_addr, &from, socklen);
ses->tcp_dispatch_handle = dispatch_addfd(newfd, tcp_socket_callback,
ses, DISPATCH_READ);
srv->tcp_callback(TCP_CONNECT, ses);
tcp_start_session(ses);
}
@ -449,7 +605,7 @@ tcp_create_server(int port, size_t session_size, const char *name,
}
listen(s->tcp_fd, 1);
dispatch_addfd(s->tcp_fd, tcp_connect_callback, s, DISPATCH_READ);
dispatch_addfd(s->tcp_fd, tcp_server_callback, s, DISPATCH_READ);
s->tcp_server_name = strdup(name);
}

13
tcp.h
View file

@ -57,7 +57,15 @@ typedef struct tcp_session {
int tcp_fd;
struct sockaddr_storage tcp_peer_addr;
char tcp_peer_txt[100];
tcp_server_t *tcp_server;
tcp_callback_t *tcp_callback;
tcp_server_t *tcp_server; /* if this is NULL, then we are spawned
as a client */
/* These are only used when we spawn as a client */
dtimer_t tcp_timer;
char *tcp_name;
/* Output queueing */
@ -96,4 +104,7 @@ void tcp_qprintf(tcp_queue_t *tq, const char *fmt, ...);
void tcp_output_queue(tcp_session_t *ses, tcp_queue_t *dst, tcp_queue_t *src);
void *tcp_create_client(struct in_addr ip, int port, size_t session_size,
const char *name, tcp_callback_t *cb);
#endif /* TCP_H_ */