Add an asynchronous hostname resolver and make tcp client use it

This commit is contained in:
Andreas Öman 2007-12-03 14:55:55 +00:00
parent 7bf3d600db
commit 5a99416936
5 changed files with 391 additions and 17 deletions

View file

@ -1,7 +1,8 @@
-include ../config.mak
SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \
subscriptions.c tsmux.c tsdemux.c pes.c buffer.c tcp.c plugin.c
subscriptions.c tsmux.c tsdemux.c pes.c buffer.c tcp.c plugin.c \
resolver.c
SRCS += http.c htmlui.c

321
resolver.c Normal file
View file

@ -0,0 +1,321 @@
/*
* Async resolver
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <libhts/htsmsg.h>
#include <libhts/htsmsg_binary.h>
#include <libhts/htsq.h>
#include "resolver.h"
#include "dispatch.h"
static int res_seq_tally;
static int to_thread_fd[2];
static int from_thread_fd[2];
LIST_HEAD(res_list, res);
static struct res_list pending_resolve;
static void *resolver_loop(void *aux);
typedef struct res {
void (*cb)(void *opaque, struct sockaddr *so, const char *error);
void *opaque;
int seqno;
LIST_ENTRY(res) link;
} res_t;
static char response_buf[256];
static int response_buf_ptr;
/**
* Process response from resolver thread and invoke callback
*/
static void
resolver_socket_callback(int events, void *opaque, int fd)
{
htsmsg_t *m;
int r, msglen;
uint32_t seq;
const char *err;
res_t *res;
struct sockaddr_in si;
const void *ipv4;
size_t ipv4len;
if(response_buf_ptr < 4) {
msglen = 4;
} else {
msglen = (response_buf[0] << 24 | response_buf[1] << 16 |
response_buf[2] << 8 | response_buf[3]) + 4;
if(msglen > 250) {
fprintf(stderr, "Internal resolver error, max msglen exceeded\n");
exit(1);
}
}
r = read(from_thread_fd[0], response_buf + response_buf_ptr,
msglen - response_buf_ptr);
if(r < 1) {
fprintf(stderr, "Internal resolver error, reading failed (%d)\n", r);
perror("read");
exit(1);
}
response_buf_ptr += r;
if(msglen > 4 && response_buf_ptr == msglen) {
m = htsmsg_binary_deserialize(response_buf + 4, msglen - 4, NULL);
htsmsg_get_u32(m, "seq", &seq);
LIST_FOREACH(res, &pending_resolve, link)
if(res->seqno == seq)
break;
if(res != NULL) {
err = htsmsg_get_str(m, "error");
if(err) {
res->cb(res->opaque, NULL, err);
} else if(htsmsg_get_bin(m, "ipv4", &ipv4, &ipv4len) == 0) {
memset(&si, 0, sizeof(si));
si.sin_family = AF_INET;
memcpy(&si.sin_addr, ipv4, 4);
res->cb(res->opaque, (struct sockaddr *)&si, NULL);
}
LIST_REMOVE(res, link);
free(res);
}
response_buf_ptr = 0;
}
}
/**
* Setup async resolver
*/
static void
async_resolver_init(void)
{
static int inited;
pthread_t ptid;
int fd;
if(inited)
return;
inited = 1;
if(pipe(to_thread_fd) == -1) {
perror("Async resolver, cannot create pipe");
exit(1);
}
if(pipe(from_thread_fd) == -1) {
perror("Async resolver, cannot create pipe");
exit(1);
}
pthread_create(&ptid, NULL, resolver_loop, NULL);
fd = from_thread_fd[0];
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
dispatch_addfd(fd, resolver_socket_callback, NULL, DISPATCH_READ);
}
/**
* Resolver thread
*/
static void *
resolver_loop(void *aux)
{
htsmsg_t *m;
void *buf;
int r, res, herr;
unsigned int l, len;
struct hostent hostbuf, *hp;
size_t hstbuflen;
char *tmphstbuf;
const char *hostname;
const char *errtxt;
hstbuflen = 1024;
tmphstbuf = malloc(hstbuflen);
while(1) {
r = read(to_thread_fd[0], &l, 4);
if(r != 4) {
fprintf(stderr, "resolver: read error: header, r = %d\n", r);
perror("read");
break;
}
l = ntohl(l);
buf = malloc(l);
if(read(to_thread_fd[0], buf, l) != l) {
free(buf);
fprintf(stderr, "resolver: read error: payload, r = %d\n", r);
perror("read");
break;
}
m = htsmsg_binary_deserialize(buf, l, buf);
free(buf);
if(m == NULL) {
fprintf(stderr, "resolver: cannot deserialize\n");
continue;
}
hostname = htsmsg_get_str(m, "hostname");
if(hostname == NULL) {
fprintf(stderr, "resolver: missing hostname\n");
break;
}
while((res = gethostbyname_r(hostname, &hostbuf, tmphstbuf, hstbuflen,
&hp, &herr)) == ERANGE) {
hstbuflen *= 2;
tmphstbuf = realloc (tmphstbuf, hstbuflen);
}
if(res != 0) {
htsmsg_add_str(m, "error", "internal error");
} else if(herr != 0) {
switch(herr) {
case HOST_NOT_FOUND:
errtxt = "The specified host is unknown";
break;
case NO_ADDRESS:
errtxt = "The requested name is valid but does not have an IP address";
break;
case NO_RECOVERY:
errtxt = "A non-recoverable name server error occurred";
break;
case TRY_AGAIN:
errtxt = "A temporary error occurred on an authoritative name server";
break;
default:
errtxt = "Unknown error";
break;
}
htsmsg_add_str(m, "error", errtxt);
} else if(hp == NULL) {
htsmsg_add_str(m, "error", "internal error");
} else {
switch(hp->h_addrtype) {
case AF_INET:
htsmsg_add_bin(m, "ipv4", hp->h_addr, 4);
break;
default:
htsmsg_add_str(m, "error", "Unsupported address family");
break;
}
}
if(htsmsg_binary_serialize(m, &buf, &len, -1) < 0) {
fprintf(stderr, "Resolver: serialization error\n");
break;
}
write(from_thread_fd[1], buf, len);
free(buf);
}
fprintf(stderr, "Internal resolver error\n");
exit(1);
}
/**
* Public function for resolving a hostname with a callback and opaque
*/
void *
async_resolve(const char *hostname,
void (*cb)(void *opaque, struct sockaddr *so, const char *error),
void *opaque)
{
htsmsg_t *m;
void *buf;
unsigned int len;
int r;
res_t *res;
async_resolver_init();
res_seq_tally++;
m = htsmsg_create();
htsmsg_add_str(m, "hostname", hostname);
htsmsg_add_u32(m, "seq", res_seq_tally);
if(htsmsg_binary_serialize(m, &buf, &len, -1) < 0) {
htsmsg_destroy(m);
return NULL;
}
r = write(to_thread_fd[1], buf, len);
free(buf);
htsmsg_destroy(m);
if(r < 0)
return NULL;
res = calloc(1, sizeof(res_t));
res->cb = cb;
res->opaque = opaque;
res->seqno = res_seq_tally;
LIST_INSERT_HEAD(&pending_resolve, res, link);
return res;
}
/**
* Cancel a pending resolve
*/
void
async_resolve_cancel(void *ar)
{
res_t *res = ar;
LIST_REMOVE(res, link);
free(res);
}

29
resolver.h Normal file
View file

@ -0,0 +1,29 @@
/*
* Async resolver
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef RESOLVER_H
#define RESOLVER_H
void *async_resolve(const char *hostname,
void (*cb)(void *opaque, struct sockaddr *so,
const char *error),
void *opaque);
void async_resolve_cancel(void *ar);
#endif /* RESOLVER_H */

50
tcp.c
View file

@ -31,6 +31,7 @@
#include "dispatch.h"
#include "tcp.h"
#include "resolver.h"
static void tcp_client_reconnect_timeout(void *aux, int64_t now);
@ -391,7 +392,9 @@ tcp_start_session(tcp_session_t *ses)
snprintf(ses->tcp_peer_txt, sizeof(ses->tcp_peer_txt), "%s:%d",
inet_ntoa(si->sin_addr), ntohs(si->sin_port));
syslog(LOG_INFO, "%s: %s: connected", ses->tcp_name, ses->tcp_peer_txt);
syslog(LOG_INFO, "%s: %s%sConnected to %s", ses->tcp_name,
ses->tcp_hostname ?: "", ses->tcp_hostname ? ": " : "",
ses->tcp_peer_txt);
ses->tcp_dispatch_handle = dispatch_addfd(ses->tcp_fd, tcp_socket_callback,
@ -421,8 +424,8 @@ tcp_client_connect_fail(tcp_session_t *c, int error)
struct sockaddr_in *si = (struct sockaddr_in *)&c->tcp_peer_addr;
syslog(LOG_ERR, "%s: Unable to connect to %s:%d -- %s",
c->tcp_name, inet_ntoa(si->sin_addr),
syslog(LOG_ERR, "%s: Unable to connect to \"%s\" (%s) : %d -- %s",
c->tcp_name, c->tcp_hostname, inet_ntoa(si->sin_addr),
ntohs(si->sin_port), strerror(error));
/* Try to reconnect in 10 seconds */
@ -471,11 +474,28 @@ tcp_client_connect_timeout(void *aux, int64_t now)
*
*/
static void
tcp_session_try_connect(tcp_session_t *c)
tcp_session_peer_resolved(void *aux, struct sockaddr *so, const char *error)
{
tcp_session_t *c = aux;
struct sockaddr_in *si;
if(error != NULL) {
syslog(LOG_ERR, "%s: Unable to resolve \"%s\" -- %s",
c->tcp_name, c->tcp_hostname, error);
/* Try again in 30 seconds */
dtimer_arm(&c->tcp_timer, tcp_client_reconnect_timeout, c, 30);
return;
}
c->tcp_fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(c->tcp_fd, F_SETFL, fcntl(c->tcp_fd, F_GETFL) | O_NONBLOCK);
si = (struct sockaddr_in *)&c->tcp_peer_addr;
memcpy(si, so, sizeof(struct sockaddr_in));
si->sin_port = htons(c->tcp_port);
if(connect(c->tcp_fd, (struct sockaddr *)&c->tcp_peer_addr,
sizeof(struct sockaddr_in)) == 0) {
tcp_client_connected(c);
@ -496,6 +516,15 @@ tcp_session_try_connect(tcp_session_t *c)
tcp_client_connect_fail(c, errno);
}
/**
* Start by resolving hostname
*/
static void
tcp_session_try_connect(tcp_session_t *c)
{
async_resolve(c->tcp_hostname, tcp_session_peer_resolved, c);
}
/**
* We come here after a failed connection attempt or if we disconnected
@ -512,24 +541,17 @@ tcp_client_reconnect_timeout(void *aux, int64_t now)
* Create a TCP based client
*/
void *
tcp_create_client(struct in_addr ip, int port, size_t session_size,
tcp_create_client(const char *hostname, int port, size_t session_size,
const char *name, tcp_callback_t *cb)
{
struct sockaddr_in *si;
tcp_session_t *c = calloc(1, session_size);
c->tcp_callback = cb;
c->tcp_name = strdup(name);
si = (struct sockaddr_in *)&c->tcp_peer_addr;
memset(si, 0, sizeof(struct sockaddr_in));
si->sin_family = AF_INET;
si->sin_port = htons(port);
si->sin_addr = ip;
c->tcp_port = port;
c->tcp_hostname = strdup(hostname);
tcp_session_try_connect(c);
return c;
}

5
tcp.h
View file

@ -65,7 +65,8 @@ typedef struct tcp_session {
dtimer_t tcp_timer;
char *tcp_name;
int tcp_port;
char *tcp_hostname;
/* Output queueing */
@ -104,7 +105,7 @@ void tcp_qprintf(tcp_queue_t *tq, const char *fmt, ...);
void tcp_output_queue(tcp_session_t *ses, tcp_queue_t *dst, tcp_queue_t *src);
void *tcp_create_client(struct in_addr ip, int port, size_t session_size,
void *tcp_create_client(const char *hostname, int port, size_t session_size,
const char *name, tcp_callback_t *cb);
#endif /* TCP_H_ */