/* * tvheadend, TCP common functions * Copyright (C) 2007 Andreas Öman * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "tcp.h" #if 0 #include "dispatch.h" #include "resolver.h" static void tcp_client_reconnect_timeout(void *aux, int64_t now); /** * Read max 'n' bytes of data from line parser. Used to consume binary data * for mixed line / binary protocols (HTTP) * * Returns bytes read */ int tcp_line_drain(tcp_session_t *ses, void *buf, int n) { if(n > ses->tcp_input_buf_ptr) n = ses->tcp_input_buf_ptr; memcpy(buf, ses->tcp_input_buf, n); memmove(ses->tcp_input_buf, ses->tcp_input_buf + n, sizeof(ses->tcp_input_buf) - n); ses->tcp_input_buf_ptr -= n; return n; } /** * Line parser for TCP based connections. Note that callback cannot * destroy the session on its own. It must return an error to perform * disconnect. */ void tcp_line_read(tcp_session_t *ses, tcp_line_input_t *callback) { int space = sizeof(ses->tcp_input_buf) - ses->tcp_input_buf_ptr - 1; int r, cr = 0, i, err; char buf[TCP_MAX_LINE_LEN]; if(space < 1) { tcp_disconnect(ses, EBADMSG); return; } r = read(ses->tcp_fd, ses->tcp_input_buf + ses->tcp_input_buf_ptr, space); if(r < 1) { tcp_disconnect(ses, r == 0 ? ECONNRESET : errno); return; } ses->tcp_input_buf_ptr += r; ses->tcp_input_buf[ses->tcp_input_buf_ptr] = 0; while(1) { cr = 0; for(i = 0; i < ses->tcp_input_buf_ptr; i++) if(ses->tcp_input_buf[i] == 0xa) break; if(i == ses->tcp_input_buf_ptr) break; memcpy(buf, ses->tcp_input_buf, i); buf[i] = 0; i++; memmove(ses->tcp_input_buf, ses->tcp_input_buf + i, sizeof(ses->tcp_input_buf) - i); ses->tcp_input_buf_ptr -= i; i = strlen(buf); while(i > 0 && buf[i-1] < 32) buf[--i] = 0; if((err = callback(ses, buf)) != 0) { tcp_disconnect(ses, err < 0 ? 0 : err); break; } } } /** * Transmit data from any of the queues * Select hi-pri queue first if possible, but always stick on the same * queue until a whole message has been sent */ static void tcp_transmit(tcp_session_t *ses) { htsbuf_queue_t *hq = ses->tcp_q_current; htsbuf_data_t *hd; int r; again: if(hq == NULL) { if(ses->tcp_q[1].hq_size) hq = &ses->tcp_q[1]; else if(ses->tcp_q[0].hq_size) hq = &ses->tcp_q[0]; } while(hq != NULL) { hd = TAILQ_FIRST(&hq->hq_q); r = write(ses->tcp_fd, hd->hd_data + hd->hd_data_off, hd->hd_data_len - hd->hd_data_off); if(r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) r = 0; if(r == 0) break; if(r == -1) { tcp_disconnect(ses, errno); return; } hq->hq_size -= r; hd->hd_data_off += r; if(hd->hd_data_off == hd->hd_data_len) { htsbuf_data_free(hq, hd); hq = NULL; goto again; } } if(hq == NULL) { if(ses->tcp_blocked) { dispatch_clr(ses->tcp_dispatch_handle, DISPATCH_WRITE); ses->tcp_blocked = 0; } } else { if(!ses->tcp_blocked) { dispatch_set(ses->tcp_dispatch_handle, DISPATCH_WRITE); ses->tcp_blocked = 1; } } ses->tcp_q_current = hq; } /** * Enqueue a message and start transmission if nothing currently is * being sent. */ int tcp_send_msg(tcp_session_t *ses, int hiprio, void *data, size_t len) { htsbuf_queue_t *hq = &ses->tcp_q[!!hiprio]; htsbuf_data_t *hd; if(len > hq->hq_maxsize - hq->hq_size) { free(data); return -1; } hd = malloc(sizeof(htsbuf_data_t)); hd->hd_data_off = 0; hd->hd_data_len = len; hd->hd_data = data; TAILQ_INSERT_TAIL(&hq->hq_q, hd, hd_link); hq->hq_size += len; if(!ses->tcp_blocked) tcp_transmit(ses); return 0; } /** * */ void tcp_printf(tcp_session_t *ses, const char *fmt, ...) { va_list ap; va_start(ap, fmt); htsbuf_vqprintf(&ses->tcp_q[0], fmt, ap); va_end(ap); if(!ses->tcp_blocked) tcp_transmit(ses); } /** * Move a tcp queue onto a session * * Coalesce smaller chunks into bigger ones for more efficient I/O */ void tcp_output_queue(tcp_session_t *ses, int hiprio, htsbuf_queue_t *src) { htsbuf_data_t *hd; htsbuf_queue_t *dst = &ses->tcp_q[!!hiprio]; while((hd = TAILQ_FIRST(&src->hq_q)) != NULL) { TAILQ_REMOVE(&src->hq_q, hd, hd_link); TAILQ_INSERT_TAIL(&dst->hq_q, hd, hd_link); dst->hq_size += hd->hd_data_len; } src->hq_size = 0; if(!ses->tcp_blocked) tcp_transmit(ses); } /** * Disconnect handler */ void tcp_disconnect(tcp_session_t *ses, int err) { htsbuf_queue_flush(&ses->tcp_q[0]); htsbuf_queue_flush(&ses->tcp_q[1]); ses->tcp_callback(TCP_DISCONNECT, ses); if(ses->tcp_server == NULL) tvhlog(LOG_INFO, "tcp", "%s: %s: disconnected -- %s", ses->tcp_name, ses->tcp_peer_txt, strerror(err)); close(dispatch_delfd(ses->tcp_dispatch_handle)); ses->tcp_dispatch_handle = NULL; if(ses->tcp_server != NULL) { free(ses->tcp_name); free(ses); } else if(ses->tcp_enabled) { /* Try to reconnect in 2 seconds */ dtimer_arm(&ses->tcp_timer, tcp_client_reconnect_timeout, ses, 2); } } /** * Dispatcher callback */ static void tcp_socket_callback(int events, void *opaque, int fd) { tcp_session_t *ses = opaque; if(events & DISPATCH_ERR) { tcp_disconnect(ses, ECONNRESET); return; } if(events & DISPATCH_READ) ses->tcp_callback(TCP_INPUT, ses); if(events & DISPATCH_WRITE) tcp_transmit(ses); } /** * Setup a TCP connection, common code between server and client */ static void tcp_start_session(tcp_session_t *ses) { int val; struct sockaddr_in *si = (struct sockaddr_in *)&ses->tcp_peer_addr; fcntl(ses->tcp_fd, F_SETFL, fcntl(ses->tcp_fd, F_GETFL) | O_NONBLOCK); val = 1; setsockopt(ses->tcp_fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)); val = 30; setsockopt(ses->tcp_fd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)); val = 15; setsockopt(ses->tcp_fd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)); val = 5; setsockopt(ses->tcp_fd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)); val = 1; setsockopt(ses->tcp_fd, SOL_TCP, TCP_NODELAY, &val, sizeof(val)); htsbuf_queue_init(&ses->tcp_q[0], 20 * 1000 * 1000); htsbuf_queue_init(&ses->tcp_q[1], 20 * 1000 * 1000); snprintf(ses->tcp_peer_txt, sizeof(ses->tcp_peer_txt), "%s:%d", inet_ntoa(si->sin_addr), ntohs(si->sin_port)); if(ses->tcp_server == NULL) tvhlog(LOG_INFO, "tcp", "%s: %s%sConnected to %s", ses->tcp_name, ses->tcp_hostname ?: "", ses->tcp_hostname ? ": " : "", ses->tcp_peer_txt); ses->tcp_dispatch_handle = dispatch_addfd(ses->tcp_fd, tcp_socket_callback, ses, DISPATCH_READ); ses->tcp_callback(TCP_CONNECT, ses); } /** * */ static void tcp_client_connected(tcp_session_t *c) { dtimer_disarm(&c->tcp_timer); tcp_start_session(c); } /** * */ static void tcp_client_connect_fail(tcp_session_t *c, int error) { struct sockaddr_in *si = (struct sockaddr_in *)&c->tcp_peer_addr; tvhlog(LOG_ERR, "tcp", "%s: Unable to connect to \"%s\" (%s) : %d -- %s", c->tcp_name, c->tcp_hostname, inet_ntoa(si->sin_addr), ntohs(si->sin_port), strerror(error)); /* Try to reconnect in 10 seconds */ dtimer_arm(&c->tcp_timer, tcp_client_reconnect_timeout, c, 10); } /** * */ static void tcp_client_connect_callback(int events, void *opaque, int fd) { int err; socklen_t errlen = sizeof(int); tcp_session_t *c = opaque; dispatch_delfd(c->tcp_dispatch_handle); c->tcp_dispatch_handle = NULL; getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen); if(err == 0) { tcp_client_connected(c); return; } close(c->tcp_fd); tcp_client_connect_fail(c, err); } /** * We dont want to wait for connect() to time out, so we have our * own timeout */ static void tcp_client_connect_timeout(void *aux, int64_t now) { tcp_session_t *c = aux; close(c->tcp_fd); tcp_client_connect_fail(c, ETIMEDOUT); } /** * */ static void tcp_session_peer_resolved(void *aux, struct sockaddr *so, const char *error) { tcp_session_t *c = aux; struct sockaddr_in *si; c->tcp_resolver = NULL; if(error != NULL) { tvhlog(LOG_ERR, "tcp", "%s: Unable to resolve \"%s\" -- %s", c->tcp_name, c->tcp_hostname, error); /* Try again in 30 seconds */ dtimer_arm(&c->tcp_timer, tcp_client_reconnect_timeout, c, 30); return; } c->tcp_fd = socket(AF_INET, SOCK_STREAM, 0); fcntl(c->tcp_fd, F_SETFL, fcntl(c->tcp_fd, F_GETFL) | O_NONBLOCK); si = (struct sockaddr_in *)&c->tcp_peer_addr; memcpy(si, so, sizeof(struct sockaddr_in)); si->sin_port = htons(c->tcp_port); if(connect(c->tcp_fd, (struct sockaddr *)&c->tcp_peer_addr, sizeof(struct sockaddr_in)) == 0) { tcp_client_connected(c); return; } if(errno == EINPROGRESS) { dtimer_arm(&c->tcp_timer, tcp_client_connect_timeout, c, 10); c->tcp_dispatch_handle = dispatch_addfd(c->tcp_fd, tcp_client_connect_callback, c, DISPATCH_WRITE); return; } close(c->tcp_fd); tcp_client_connect_fail(c, errno); } /** * Start by resolving hostname */ static void tcp_session_try_connect(tcp_session_t *c) { c->tcp_resolver = async_resolve(c->tcp_hostname, tcp_session_peer_resolved, c); } /** * We come here after a failed connection attempt or if we disconnected */ static void tcp_client_reconnect_timeout(void *aux, int64_t now) { tcp_session_t *c = aux; tcp_session_try_connect(c); } /** * Create a TCP based client */ void * tcp_create_client(const char *hostname, int port, size_t session_size, const char *name, tcp_callback_t *cb, int enabled) { tcp_session_t *c = calloc(1, session_size); c->tcp_callback = cb; c->tcp_name = strdup(name); c->tcp_port = port; c->tcp_hostname = hostname ? strdup(hostname) : NULL; c->tcp_enabled = enabled; if(c->tcp_enabled && c->tcp_hostname) tcp_session_try_connect(c); return c; } /** * TCP server connect callback */ static void tcp_server_callback(int events, void *opaque, int fd) { struct sockaddr_in from; socklen_t socklen = sizeof(struct sockaddr_in); int newfd; tcp_session_t *ses; tcp_server_t *srv = opaque; if(!(events & DISPATCH_READ)) return; if((newfd = accept(fd, (struct sockaddr *)&from, &socklen)) == -1) return; /* XXX: Something more clever must be done here */ ses = calloc(1, srv->tcp_session_size); ses->tcp_fd = newfd; ses->tcp_name = strdup(srv->tcp_server_name); ses->tcp_callback = srv->tcp_callback; ses->tcp_server = srv; memcpy(&ses->tcp_peer_addr, &from, socklen); socklen = sizeof(struct sockaddr_storage); if(getsockname(newfd, (struct sockaddr *)&ses->tcp_self_addr, &socklen)) memset(&ses->tcp_self_addr, 0, sizeof(struct sockaddr_storage)); tcp_start_session(ses); } /** * Create a TCP based server */ void tcp_create_server(int port, size_t session_size, const char *name, tcp_callback_t *cb) { struct sockaddr_in sin; int one = 1; tcp_server_t *s = malloc(sizeof(tcp_server_t)); s->tcp_fd = socket(AF_INET, SOCK_STREAM, 0); s->tcp_session_size = session_size; s->tcp_callback = cb; memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(port); setsockopt(s->tcp_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)); fcntl(s->tcp_fd, F_SETFL, fcntl(s->tcp_fd, F_GETFL) | O_NONBLOCK); tvhlog(LOG_INFO, "tcp", "%s: Listening for TCP connections on %s:%d", name, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); if(bind(s->tcp_fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) { tvhlog(LOG_ERR, "tcp", "%s: Unable to bind socket for incomming TCP connections, " "%s:%d -- %s", name, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), strerror(errno)); return; } listen(s->tcp_fd, 1); dispatch_addfd(s->tcp_fd, tcp_server_callback, s, DISPATCH_READ); s->tcp_server_name = strdup(name); } /** * */ void tcp_destroy_client(tcp_session_t *ses) { if(ses->tcp_resolver != NULL) async_resolve_cancel(ses->tcp_resolver); if(ses->tcp_dispatch_handle != NULL) tcp_disconnect(ses, 0); dtimer_disarm(&ses->tcp_timer); free(ses->tcp_name); free(ses->tcp_hostname); free(ses); } /** * */ void tcp_enable_disable(tcp_session_t *ses, int enabled) { if(ses->tcp_enabled == enabled) return; ses->tcp_enabled = enabled; if(enabled && ses->tcp_hostname != NULL) { tcp_session_try_connect(ses); } else { if(ses->tcp_resolver != NULL) { async_resolve_cancel(ses->tcp_resolver); ses->tcp_resolver = NULL; } if(ses->tcp_dispatch_handle != NULL) tcp_disconnect(ses, 0); dtimer_disarm(&ses->tcp_timer); } } /** * */ void tcp_set_hostname(tcp_session_t *ses, const char *hostname) { if(ses->tcp_hostname != NULL) free(ses->tcp_hostname); ses->tcp_hostname = strdup(hostname); } #endif int tcp_write_queue(int fd, htsbuf_queue_t *q) { htsbuf_data_t *hd; int l, r; while((hd = TAILQ_FIRST(&q->hq_q)) != NULL) { TAILQ_REMOVE(&q->hq_q, hd, hd_link); l = hd->hd_data_len - hd->hd_data_off; r = write(fd, hd->hd_data + hd->hd_data_off, l); if(l != r) { fprintf(stderr, "write error\n"); } free(hd->hd_data); free(hd); } q->hq_size = 0; return 0; } /** * */ static int tcp_fill_htsbuf_from_fd(int fd, htsbuf_queue_t *hq) { htsbuf_data_t *hd = TAILQ_LAST(&hq->hq_q, htsbuf_data_queue); int c; if(hd != NULL) { /* Fill out any previous buffer */ c = hd->hd_data_size - hd->hd_data_len; if(c > 0) { c = read(fd, hd->hd_data + hd->hd_data_len, c); if(c < 1) return -1; hd->hd_data_len += c; hq->hq_size += c; return 0; } } hd = malloc(sizeof(htsbuf_data_t)); hd->hd_data_size = 1000; hd->hd_data = malloc(hd->hd_data_size); c = read(fd, hd->hd_data, hd->hd_data_size); if(c < 1) { free(hd->hd_data); free(hd); return -1; } hd->hd_data_len = c; hd->hd_data_off = 0; TAILQ_INSERT_TAIL(&hq->hq_q, hd, hd_link); hq->hq_size += c; return 0; } /** * */ int tcp_read_line(int fd, char *buf, const size_t bufsize, htsbuf_queue_t *spill) { int len; while(1) { len = htsbuf_find(spill, 0xa); if(len == -1) { if(tcp_fill_htsbuf_from_fd(fd, spill) < 0) return -1; continue; } if(len >= bufsize - 1) return -1; htsbuf_read(spill, buf, len); buf[len] = 0; while(len > 0 && buf[len - 1] < 32) buf[--len] = 0; htsbuf_drop(spill, 1); /* Drop the \n */ return 0; } } /** * */ int tcp_read_data(int fd, char *buf, const size_t bufsize, htsbuf_queue_t *spill) { int x, tot = htsbuf_read(spill, buf, bufsize); if(tot == bufsize) return 0; x = recv(fd, buf + tot, bufsize - tot, MSG_WAITALL); if(x != bufsize - tot) return -1; return 0; } /** * */ static int tcp_server_epoll_fd; typedef struct tcp_server { tcp_server_callback_t *start; void *opaque; int serverfd; } tcp_server_t; typedef struct tcp_server_launch_t { tcp_server_callback_t *start; void *opaque; int fd; struct sockaddr_in source; } tcp_server_launch_t; /** * */ static void * tcp_server_start(void *aux) { tcp_server_launch_t *tsl = aux; int val; val = 1; setsockopt(tsl->fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)); val = 30; setsockopt(tsl->fd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)); val = 15; setsockopt(tsl->fd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)); val = 5; setsockopt(tsl->fd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)); val = 1; setsockopt(tsl->fd, SOL_TCP, TCP_NODELAY, &val, sizeof(val)); tsl->start(tsl->fd, tsl->opaque, &tsl->source); free(tsl); return NULL; } /** * */ static void * tcp_server_loop(void *aux) { int r, i; struct epoll_event ev[1]; tcp_server_t *ts; tcp_server_launch_t *tsl; pthread_attr_t attr; pthread_t tid; socklen_t slen; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); while(1) { r = epoll_wait(tcp_server_epoll_fd, ev, sizeof(ev) / sizeof(ev[0]), -1); if(r == -1) { perror("tcp_server: epoll_wait"); continue; } for(i = 0; i < r; i++) { ts = ev[i].data.ptr; if(ev[i].events & EPOLLHUP) { close(ts->serverfd); free(ts); continue; } if(ev[i].events & EPOLLIN) { tsl = malloc(sizeof(tcp_server_launch_t)); tsl->start = ts->start; tsl->opaque = ts->opaque; slen = sizeof(struct sockaddr_in); tsl->fd = accept(ts->serverfd, (struct sockaddr *)&tsl->source, &slen); if(tsl->fd == -1) { perror("accept"); free(tsl); sleep(1); continue; } pthread_create(&tid, &attr, tcp_server_start, tsl); } } } } /** * */ void * tcp_server_create(int port, tcp_server_callback_t *start, void *opaque) { int fd, x; struct epoll_event e; tcp_server_t *ts; struct sockaddr_in s; int one = 1; memset(&e, 0, sizeof(e)); fd = socket(AF_INET, SOCK_STREAM, 0); if(fd == -1) return NULL; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)); memset(&s, 0, sizeof(s)); s.sin_family = AF_INET; s.sin_port = htons(port); x = bind(fd, (struct sockaddr *)&s, sizeof(s)); if(x < 0) { close(fd); return NULL; } listen(fd, 1); ts = malloc(sizeof(tcp_server_t)); ts->serverfd = fd; ts->start = start; ts->opaque = opaque; e.events = EPOLLIN; e.data.ptr = ts; epoll_ctl(tcp_server_epoll_fd, EPOLL_CTL_ADD, fd, &e); printf("Adding fd %d, listening on port %d\n", fd, port); return ts; } /** * */ void tcp_server_init(void) { pthread_t tid; tcp_server_epoll_fd = epoll_create(10); pthread_create(&tid, NULL, tcp_server_loop, NULL); }