From 5a994169362d0c39dac69991d55fb8debe459688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Mon, 3 Dec 2007 14:55:55 +0000 Subject: [PATCH] Add an asynchronous hostname resolver and make tcp client use it --- Makefile | 3 +- resolver.c | 321 +++++++++++++++++++++++++++++++++++++++++++++++++++++ resolver.h | 29 +++++ tcp.c | 50 ++++++--- tcp.h | 5 +- 5 files changed, 391 insertions(+), 17 deletions(-) create mode 100644 resolver.c create mode 100644 resolver.h diff --git a/Makefile b/Makefile index 129683b6..f3e1abe2 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,8 @@ -include ../config.mak SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \ - subscriptions.c tsmux.c tsdemux.c pes.c buffer.c tcp.c plugin.c + subscriptions.c tsmux.c tsdemux.c pes.c buffer.c tcp.c plugin.c \ + resolver.c SRCS += http.c htmlui.c diff --git a/resolver.c b/resolver.c new file mode 100644 index 00000000..81d11485 --- /dev/null +++ b/resolver.c @@ -0,0 +1,321 @@ +/* + * Async resolver + * Copyright (C) 2007 Andreas Öman + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include "resolver.h" +#include "dispatch.h" + +static int res_seq_tally; + +static int to_thread_fd[2]; +static int from_thread_fd[2]; + +LIST_HEAD(res_list, res); + +static struct res_list pending_resolve; + +static void *resolver_loop(void *aux); + +typedef struct res { + void (*cb)(void *opaque, struct sockaddr *so, const char *error); + void *opaque; + int seqno; + LIST_ENTRY(res) link; +} res_t; + +static char response_buf[256]; +static int response_buf_ptr; + +/** + * Process response from resolver thread and invoke callback + */ +static void +resolver_socket_callback(int events, void *opaque, int fd) +{ + htsmsg_t *m; + int r, msglen; + uint32_t seq; + const char *err; + res_t *res; + struct sockaddr_in si; + const void *ipv4; + size_t ipv4len; + + if(response_buf_ptr < 4) { + msglen = 4; + } else { + msglen = (response_buf[0] << 24 | response_buf[1] << 16 | + response_buf[2] << 8 | response_buf[3]) + 4; + if(msglen > 250) { + fprintf(stderr, "Internal resolver error, max msglen exceeded\n"); + exit(1); + } + } + + r = read(from_thread_fd[0], response_buf + response_buf_ptr, + msglen - response_buf_ptr); + if(r < 1) { + fprintf(stderr, "Internal resolver error, reading failed (%d)\n", r); + perror("read"); + exit(1); + } + + response_buf_ptr += r; + + if(msglen > 4 && response_buf_ptr == msglen) { + m = htsmsg_binary_deserialize(response_buf + 4, msglen - 4, NULL); + + htsmsg_get_u32(m, "seq", &seq); + + LIST_FOREACH(res, &pending_resolve, link) + if(res->seqno == seq) + break; + + if(res != NULL) { + + err = htsmsg_get_str(m, "error"); + if(err) { + res->cb(res->opaque, NULL, err); + } else if(htsmsg_get_bin(m, "ipv4", &ipv4, &ipv4len) == 0) { + memset(&si, 0, sizeof(si)); + si.sin_family = AF_INET; + memcpy(&si.sin_addr, ipv4, 4); + res->cb(res->opaque, (struct sockaddr *)&si, NULL); + } + + LIST_REMOVE(res, link); + free(res); + } + response_buf_ptr = 0; + } +} + + + + +/** + * Setup async resolver + */ +static void +async_resolver_init(void) +{ + static int inited; + pthread_t ptid; + int fd; + + if(inited) + return; + + inited = 1; + + if(pipe(to_thread_fd) == -1) { + perror("Async resolver, cannot create pipe"); + exit(1); + } + + if(pipe(from_thread_fd) == -1) { + perror("Async resolver, cannot create pipe"); + exit(1); + } + + pthread_create(&ptid, NULL, resolver_loop, NULL); + + fd = from_thread_fd[0]; + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK); + dispatch_addfd(fd, resolver_socket_callback, NULL, DISPATCH_READ); +} + + + +/** + * Resolver thread + */ + +static void * +resolver_loop(void *aux) +{ + htsmsg_t *m; + void *buf; + int r, res, herr; + unsigned int l, len; + struct hostent hostbuf, *hp; + size_t hstbuflen; + char *tmphstbuf; + const char *hostname; + const char *errtxt; + + hstbuflen = 1024; + tmphstbuf = malloc(hstbuflen); + + while(1) { + + r = read(to_thread_fd[0], &l, 4); + if(r != 4) { + fprintf(stderr, "resolver: read error: header, r = %d\n", r); + perror("read"); + break; + } + + l = ntohl(l); + buf = malloc(l); + + if(read(to_thread_fd[0], buf, l) != l) { + free(buf); + fprintf(stderr, "resolver: read error: payload, r = %d\n", r); + perror("read"); + break; + } + + m = htsmsg_binary_deserialize(buf, l, buf); + free(buf); + if(m == NULL) { + fprintf(stderr, "resolver: cannot deserialize\n"); + continue; + } + hostname = htsmsg_get_str(m, "hostname"); + if(hostname == NULL) { + fprintf(stderr, "resolver: missing hostname\n"); + break; + } + while((res = gethostbyname_r(hostname, &hostbuf, tmphstbuf, hstbuflen, + &hp, &herr)) == ERANGE) { + hstbuflen *= 2; + tmphstbuf = realloc (tmphstbuf, hstbuflen); + } + + if(res != 0) { + htsmsg_add_str(m, "error", "internal error"); + } else if(herr != 0) { + switch(herr) { + case HOST_NOT_FOUND: + errtxt = "The specified host is unknown"; + break; + case NO_ADDRESS: + errtxt = "The requested name is valid but does not have an IP address"; + break; + + case NO_RECOVERY: + errtxt = "A non-recoverable name server error occurred"; + break; + + case TRY_AGAIN: + errtxt = "A temporary error occurred on an authoritative name server"; + break; + + default: + errtxt = "Unknown error"; + break; + } + htsmsg_add_str(m, "error", errtxt); + } else if(hp == NULL) { + htsmsg_add_str(m, "error", "internal error"); + } else { + switch(hp->h_addrtype) { + + case AF_INET: + htsmsg_add_bin(m, "ipv4", hp->h_addr, 4); + break; + + default: + htsmsg_add_str(m, "error", "Unsupported address family"); + break; + } + } + + if(htsmsg_binary_serialize(m, &buf, &len, -1) < 0) { + fprintf(stderr, "Resolver: serialization error\n"); + break; + } + + write(from_thread_fd[1], buf, len); +free(buf); + } + fprintf(stderr, "Internal resolver error\n"); + exit(1); +} + + +/** + * Public function for resolving a hostname with a callback and opaque + */ +void * +async_resolve(const char *hostname, + void (*cb)(void *opaque, struct sockaddr *so, const char *error), + void *opaque) +{ + htsmsg_t *m; + void *buf; + unsigned int len; + int r; + res_t *res; + + async_resolver_init(); + + res_seq_tally++; + + m = htsmsg_create(); + htsmsg_add_str(m, "hostname", hostname); + htsmsg_add_u32(m, "seq", res_seq_tally); + + if(htsmsg_binary_serialize(m, &buf, &len, -1) < 0) { + htsmsg_destroy(m); + return NULL; + } + + r = write(to_thread_fd[1], buf, len); + + free(buf); + htsmsg_destroy(m); + + if(r < 0) + return NULL; + + res = calloc(1, sizeof(res_t)); + res->cb = cb; + res->opaque = opaque; + res->seqno = res_seq_tally; + + LIST_INSERT_HEAD(&pending_resolve, res, link); + return res; +} + +/** + * Cancel a pending resolve + */ +void +async_resolve_cancel(void *ar) +{ + res_t *res = ar; + + LIST_REMOVE(res, link); + free(res); +} diff --git a/resolver.h b/resolver.h new file mode 100644 index 00000000..3c9e6817 --- /dev/null +++ b/resolver.h @@ -0,0 +1,29 @@ +/* + * Async resolver + * Copyright (C) 2007 Andreas Öman + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef RESOLVER_H +#define RESOLVER_H + +void *async_resolve(const char *hostname, + void (*cb)(void *opaque, struct sockaddr *so, + const char *error), + void *opaque); + +void async_resolve_cancel(void *ar); + +#endif /* RESOLVER_H */ diff --git a/tcp.c b/tcp.c index f5e750fb..df02755f 100644 --- a/tcp.c +++ b/tcp.c @@ -31,6 +31,7 @@ #include "dispatch.h" #include "tcp.h" +#include "resolver.h" static void tcp_client_reconnect_timeout(void *aux, int64_t now); @@ -391,7 +392,9 @@ tcp_start_session(tcp_session_t *ses) snprintf(ses->tcp_peer_txt, sizeof(ses->tcp_peer_txt), "%s:%d", inet_ntoa(si->sin_addr), ntohs(si->sin_port)); - syslog(LOG_INFO, "%s: %s: connected", ses->tcp_name, ses->tcp_peer_txt); + syslog(LOG_INFO, "%s: %s%sConnected to %s", ses->tcp_name, + ses->tcp_hostname ?: "", ses->tcp_hostname ? ": " : "", + ses->tcp_peer_txt); ses->tcp_dispatch_handle = dispatch_addfd(ses->tcp_fd, tcp_socket_callback, @@ -421,8 +424,8 @@ tcp_client_connect_fail(tcp_session_t *c, int error) struct sockaddr_in *si = (struct sockaddr_in *)&c->tcp_peer_addr; - syslog(LOG_ERR, "%s: Unable to connect to %s:%d -- %s", - c->tcp_name, inet_ntoa(si->sin_addr), + syslog(LOG_ERR, "%s: Unable to connect to \"%s\" (%s) : %d -- %s", + c->tcp_name, c->tcp_hostname, inet_ntoa(si->sin_addr), ntohs(si->sin_port), strerror(error)); /* Try to reconnect in 10 seconds */ @@ -471,11 +474,28 @@ tcp_client_connect_timeout(void *aux, int64_t now) * */ static void -tcp_session_try_connect(tcp_session_t *c) +tcp_session_peer_resolved(void *aux, struct sockaddr *so, const char *error) { + tcp_session_t *c = aux; + struct sockaddr_in *si; + + if(error != NULL) { + syslog(LOG_ERR, "%s: Unable to resolve \"%s\" -- %s", + c->tcp_name, c->tcp_hostname, error); + /* Try again in 30 seconds */ + dtimer_arm(&c->tcp_timer, tcp_client_reconnect_timeout, c, 30); + return; + } + + c->tcp_fd = socket(AF_INET, SOCK_STREAM, 0); fcntl(c->tcp_fd, F_SETFL, fcntl(c->tcp_fd, F_GETFL) | O_NONBLOCK); + + si = (struct sockaddr_in *)&c->tcp_peer_addr; + memcpy(si, so, sizeof(struct sockaddr_in)); + si->sin_port = htons(c->tcp_port); + if(connect(c->tcp_fd, (struct sockaddr *)&c->tcp_peer_addr, sizeof(struct sockaddr_in)) == 0) { tcp_client_connected(c); @@ -496,6 +516,15 @@ tcp_session_try_connect(tcp_session_t *c) tcp_client_connect_fail(c, errno); } +/** + * Start by resolving hostname + */ +static void +tcp_session_try_connect(tcp_session_t *c) +{ + async_resolve(c->tcp_hostname, tcp_session_peer_resolved, c); +} + /** * We come here after a failed connection attempt or if we disconnected @@ -512,24 +541,17 @@ tcp_client_reconnect_timeout(void *aux, int64_t now) * Create a TCP based client */ void * -tcp_create_client(struct in_addr ip, int port, size_t session_size, +tcp_create_client(const char *hostname, int port, size_t session_size, const char *name, tcp_callback_t *cb) { - struct sockaddr_in *si; tcp_session_t *c = calloc(1, session_size); c->tcp_callback = cb; c->tcp_name = strdup(name); - - si = (struct sockaddr_in *)&c->tcp_peer_addr; - - memset(si, 0, sizeof(struct sockaddr_in)); - si->sin_family = AF_INET; - si->sin_port = htons(port); - si->sin_addr = ip; + c->tcp_port = port; + c->tcp_hostname = strdup(hostname); tcp_session_try_connect(c); - return c; } diff --git a/tcp.h b/tcp.h index bf58a989..5300fc1b 100644 --- a/tcp.h +++ b/tcp.h @@ -65,7 +65,8 @@ typedef struct tcp_session { dtimer_t tcp_timer; char *tcp_name; - + int tcp_port; + char *tcp_hostname; /* Output queueing */ @@ -104,7 +105,7 @@ void tcp_qprintf(tcp_queue_t *tq, const char *fmt, ...); void tcp_output_queue(tcp_session_t *ses, tcp_queue_t *dst, tcp_queue_t *src); -void *tcp_create_client(struct in_addr ip, int port, size_t session_size, +void *tcp_create_client(const char *hostname, int port, size_t session_size, const char *name, tcp_callback_t *cb); #endif /* TCP_H_ */