diff --git a/Makefile b/Makefile index f604195d..571e2a2d 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ -include ../config.mak SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \ - subscriptions.c tsmux.c tsdemux.c pes.c buffer.c + subscriptions.c tsmux.c tsdemux.c pes.c buffer.c tcp.c SRCS += pvr.c diff --git a/htsclient.c b/htsclient.c index 789058d7..e13b15d0 100644 --- a/htsclient.c +++ b/htsclient.c @@ -37,9 +37,9 @@ #include "teletext.h" #include "dispatch.h" #include "dvb.h" -#include "strtab.h" #include "buffer.h" #include "tsmux.h" +#include "tcp.h" LIST_HEAD(client_list, client); @@ -51,9 +51,9 @@ struct client_list all_clients; */ typedef struct client { + tcp_session_t c_tcp_session; LIST_ENTRY(client) c_global_link; - int c_fd; int c_streamfd; pthread_t c_ptid; @@ -64,15 +64,6 @@ typedef struct client { struct ref_update_queue c_refq; - int c_pkt_maxsiz; - - char c_input_buf[100]; - int c_input_buf_ptr; - - char *c_title; - - void *c_dispatch_handle; - dtimer_t c_status_timer; void *c_muxer; @@ -83,18 +74,7 @@ typedef struct client { static void client_status_update(void *aux, int64_t now); -static void -cprintf(client_t *c, const char *fmt, ...) -{ - va_list ap; - char buf[5000]; - - va_start(ap, fmt); - vsnprintf(buf, sizeof(buf), fmt, ap); - va_end(ap); - - write(c->c_fd, buf, strlen(buf)); -} +#define cprintf(c, fmt...) tcp_printf(&(c)->c_tcp_session, fmt) void @@ -524,7 +504,7 @@ cr_streamport(client_t *c, char **argv, int argc) c->c_port = atoi(argv[1]); syslog(LOG_INFO, "%s registers UDP stream target %s:%d", - c->c_title, inet_ntoa(c->c_ipaddr), c->c_port); + tcp_logname(&c->c_tcp_session), inet_ntoa(c->c_ipaddr), c->c_port); return 0; } @@ -593,14 +573,6 @@ cr_event_info(client_t *c, char **argv, int argc) * */ -static struct strtab recoptab[] = { - { "once", RECOP_ONCE }, - { "daily", RECOP_DAILY }, - { "weekly", RECOP_WEEKLY }, - { "cancel", RECOP_CANCEL }, - { "toggle", RECOP_TOGGLE } -}; - static int cr_event_record(client_t *c, char **argv, int argc) { @@ -610,7 +582,7 @@ cr_event_record(client_t *c, char **argv, int argc) if(argc < 2) return 1; - op = str2val(argv[1], recoptab); + op = pvr_op2int(argv[1]); if(op == -1) return 1; @@ -745,9 +717,10 @@ const struct { }; -static void -client_req(client_t *c, char *buf) +static int +client_req(void *aux, char *buf) { + client_t *c = aux; int i, l, x; const char *n; char *argv[40]; @@ -776,28 +749,23 @@ client_req(client_t *c, char *buf) if(x >= 0) cprintf(c, "eom %s\n", x ? "error" : "ok"); - return; + return 0; } } cprintf(c, "eom nocommand\n"); + return 0; } /* - * client error + * client disconnect */ static void -client_teardown(client_t *c, int err) +client_disconnect(client_t *c) { th_subscription_t *s; - syslog(LOG_INFO, "%s disconnected -- %s", c->c_title, strerror(err)); - dtimer_disarm(&c->c_status_timer); - dispatch_delfd(c->c_dispatch_handle); - - close(c->c_fd); - if(c->c_streamfd != -1) close(c->c_streamfd); @@ -807,142 +775,36 @@ client_teardown(client_t *c, int err) LIST_REMOVE(s, ths_subscriber_link); subscription_unsubscribe(s); } - free(c->c_title); - free(c); } -/* - * data available on socket - */ -static void -client_data_read(client_t *c) -{ - int space = sizeof(c->c_input_buf) - c->c_input_buf_ptr - 1; - int r, cr = 0, i; - char buf[100]; - - if(space < 1) { - client_teardown(c, EBADMSG); - return; - } - - r = read(c->c_fd, c->c_input_buf + c->c_input_buf_ptr, space); - if(r < 0) { - client_teardown(c, errno); - return; - } - - if(r == 0) { - client_teardown(c, ECONNRESET); - return; - } - - c->c_input_buf_ptr += r; - c->c_input_buf[c->c_input_buf_ptr] = 0; - - while(1) { - cr = 0; - - for(i = 0; i < c->c_input_buf_ptr; i++) - if(c->c_input_buf[i] == 0xa) - break; - - if(i == c->c_input_buf_ptr) - break; - - memcpy(buf, c->c_input_buf, i); - buf[i] = 0; - i++; - memmove(c->c_input_buf, c->c_input_buf + i, sizeof(c->c_input_buf) - i); - c->c_input_buf_ptr -= i; - - i = strlen(buf); - while(i > 0 && buf[i-1] < 32) - buf[--i] = 0; - // printf("buf = |%s|\n", buf); - client_req(c, buf); - } -} - - -/* - * dispatcher callback - */ -static void -client_socket_callback(int events, void *opaque, int fd) -{ - client_t *c = opaque; - - if(events & DISPATCH_ERR) { - client_teardown(c, ECONNRESET); - return; - } - - if(events & DISPATCH_READ) - client_data_read(c); -} - - /* * */ static void -client_connect_callback(int events, void *opaque, int fd) +htsclient_tcp_callback(tcpevent_t event, void *tcpsession) { - struct sockaddr_in from; - socklen_t socklen = sizeof(struct sockaddr_in); - int newfd; - int val; - client_t *c; - char txt[30]; + client_t *c = tcpsession; - if(!(events & DISPATCH_READ)) - return; + switch(event) { + case TCP_CONNECT: + TAILQ_INIT(&c->c_refq); + LIST_INSERT_HEAD(&all_clients, c, c_global_link); + c->c_streamfd = -1; + dtimer_arm(&c->c_status_timer, client_status_update, c, 1); + break; - newfd = accept(fd, (struct sockaddr *)&from, &socklen); - if(newfd == -1) - return; - - fcntl(newfd, F_SETFL, fcntl(newfd, F_GETFL) | O_NONBLOCK); + case TCP_DISCONNECT: + client_disconnect(c); + break; - val = 1; - setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)); - - val = 30; - setsockopt(newfd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)); - - val = 15; - setsockopt(newfd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)); - - val = 5; - setsockopt(newfd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)); - - val = 1; - setsockopt(newfd, SOL_TCP, TCP_NODELAY, &val, sizeof(val)); - - c = calloc(1, sizeof(client_t)); - c->c_fd = newfd; - c->c_pkt_maxsiz = 188 * 7; - TAILQ_INIT(&c->c_refq); - LIST_INSERT_HEAD(&all_clients, c, c_global_link); - c->c_streamfd = -1; //socket(AF_INET, SOCK_DGRAM, 0); - - snprintf(txt, sizeof(txt), "%s:%d", - inet_ntoa(from.sin_addr), ntohs(from.sin_port)); - - c->c_title = strdup(txt); - - syslog(LOG_INFO, "Got TCP connection from %s", c->c_title); - - c->c_dispatch_handle = dispatch_addfd(newfd, client_socket_callback, c, - DISPATCH_READ); - - dtimer_arm(&c->c_status_timer, client_status_update, c, 1); + case TCP_INPUT: + tcp_line_read(&c->c_tcp_session, client_req); + break; + } } - /* * Fire up client handling */ @@ -950,32 +812,8 @@ client_connect_callback(int events, void *opaque, int fd) void client_start(void) { - struct sockaddr_in sin; - int s; - int one = 1; - s = socket(AF_INET, SOCK_STREAM, 0); - memset(&sin, 0, sizeof(sin)); - - sin.sin_family = AF_INET; - sin.sin_port = htons(9909); - - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)); - - fcntl(s, F_SETFL, fcntl(s, F_GETFL) | O_NONBLOCK); - - syslog(LOG_INFO, "Listening for TCP connections on %s:%d", - inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); - - if(bind(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) { - syslog(LOG_ERR, "Unable to bind socket for incomming TCP connections" - "%s:%d -- %s", - inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), - strerror(errno)); - return; - } - - listen(s, 1); - dispatch_addfd(s, client_connect_callback, NULL, DISPATCH_READ); + tcp_create_server(9909, sizeof(client_t), "htsclient", + htsclient_tcp_callback); } diff --git a/rtsp.c b/rtsp.c index 1a0b8389..063defec 100644 --- a/rtsp.c +++ b/rtsp.c @@ -39,11 +39,14 @@ #include "strtab.h" #include "rtp.h" #include "tsmux.h" +#include "tcp.h" #include #include #include +#define rcprintf(c, fmt...) tcp_printf(&(rc)->rc_tcp_session, fmt) + static AVRandomState rtsp_rnd; #define RTSP_MAX_LINE_LEN 1000 @@ -82,8 +85,7 @@ typedef struct rtsp_arg { typedef struct rtsp_connection { - int rc_fd; - void *rc_dispatch_handle; + tcp_session_t rc_tcp_session; /* Must be first */ char *rc_url; LIST_HEAD(, rtsp_arg) rc_args; @@ -106,14 +108,6 @@ typedef struct rtsp_connection { } rc_cmd; struct rtsp_session_head rc_sessions; - - int rc_input_buf_ptr; - char rc_input_buf[RTSP_MAX_LINE_LEN]; - struct sockaddr_in rc_from; - - char *rc_logname; /* Printable name used when logging stuff related - to this connection */ - } rtsp_connection_t; @@ -261,21 +255,6 @@ rtsp_session_destroy(rtsp_session_t *rs) free(rs); } -/* - * Prints data on rtsp connection - */ -static void -rcprintf(rtsp_connection_t *rc, const char *fmt, ...) -{ - va_list ap; - char buf[5000]; - - va_start(ap, fmt); - vsnprintf(buf, sizeof(buf), fmt, ap); - va_end(ap); - - write(rc->rc_fd, buf, strlen(buf)); -} /* @@ -315,7 +294,6 @@ static void rtsp_con_set_arg(rtsp_connection_t *rc, char *key, char *val) { rtsp_arg_t *ra; - char buf[100]; LIST_FOREACH(ra, &rc->rc_args, link) if(!strcasecmp(ra->key, key)) @@ -329,7 +307,7 @@ rtsp_con_set_arg(rtsp_connection_t *rc, char *key, char *val) free(ra->val); } ra->val = strdup(val); - +#if 0 if(!strcasecmp(key, "User-Agent")) { free(rc->rc_logname); @@ -338,6 +316,7 @@ rtsp_con_set_arg(rtsp_connection_t *rc, char *key, char *val) val); rc->rc_logname = strdup(buf); } +#endif } @@ -402,7 +381,7 @@ rtsp_reply_error(rtsp_connection_t *rc, int error, const char *errstr) if(errstr == NULL) errstr = rtsp_err2str(error); - syslog(LOG_INFO, "rtsp: %s: %s", rc->rc_logname, errstr); + syslog(LOG_INFO, "rtsp: %s: %s", tcp_logname(&rc->rc_tcp_session), errstr); rcprintf(rc, "RTSP/1.0 %d %s\r\n", error, errstr); if((c = rtsp_con_get_arg(rc, "cseq")) != NULL) @@ -474,7 +453,7 @@ rtsp_cmd_play(rtsp_connection_t *rc) ts_muxer_play(rs->rs_muxer, start); syslog(LOG_INFO, "rtsp: %s: Starting playback of %s", - rc->rc_logname, rs->rs_s->ths_channel->ch_name); + tcp_logname(&rc->rc_tcp_session), rs->rs_s->ths_channel->ch_name); rcprintf(rc, "RTSP/1.0 200 OK\r\n" @@ -510,7 +489,7 @@ rtsp_cmd_pause(rtsp_connection_t *rc) ts_muxer_pause(rs->rs_muxer); syslog(LOG_INFO, "rtsp: %s: Pausing playback of %s", - rc->rc_logname, rs->rs_s->ths_channel->ch_name); + tcp_logname(&rc->rc_tcp_session), rs->rs_s->ths_channel->ch_name); rcprintf(rc, "RTSP/1.0 200 OK\r\n" @@ -594,7 +573,7 @@ rtsp_cmd_setup(rtsp_connection_t *rc) return; } - dst = rc->rc_from; + memcpy(&dst, &rc->rc_tcp_session.tcp_peer_addr, sizeof(struct sockaddr_in)); dst.sin_port = htons(client_ports[0]); if((rs = rtsp_session_create(ch, &dst)) == NULL) { @@ -713,8 +692,9 @@ rtsp_cmd_teardown(rtsp_connection_t *rc) * RTSP connection state machine & parser */ static int -rtsp_con_parse(rtsp_connection_t *rc, char *buf) +rtsp_con_parse(void *aux, char *buf) { + rtsp_connection_t *rc = aux; int n; char *argv[3], *c; @@ -777,158 +757,45 @@ rtsp_con_parse(rtsp_connection_t *rc, char *buf) /* - * client error, teardown connection + * disconnect */ static void -rtsp_con_teardown(rtsp_connection_t *rc, int err) +rtsp_disconnect(rtsp_connection_t *rc) { rtsp_session_t *rs; - syslog(LOG_INFO, "rtsp: %s: disconnected -- %s", - rc->rc_logname, strerror(err)); - + + rtsp_con_flush_args(rc); + while((rs = LIST_FIRST(&rc->rc_sessions)) != NULL) rtsp_session_destroy(rs); - close(dispatch_delfd(rc->rc_dispatch_handle)); - free(rc->rc_url); - free(rc->rc_logname); - rtsp_con_flush_args(rc); - free(rc); } -/* - * data available on socket - */ -static void -rtsp_con_data_read(rtsp_connection_t *rc) -{ - int space = sizeof(rc->rc_input_buf) - rc->rc_input_buf_ptr - 1; - int r, cr = 0, i, err; - char buf[RTSP_MAX_LINE_LEN]; - - if(space < 1) { - rtsp_con_teardown(rc, EBADMSG); - return; - } - - r = read(rc->rc_fd, rc->rc_input_buf + rc->rc_input_buf_ptr, space); - if(r < 0) { - rtsp_con_teardown(rc, errno); - return; - } - - if(r == 0) { - rtsp_con_teardown(rc, ECONNRESET); - return; - } - - rc->rc_input_buf_ptr += r; - rc->rc_input_buf[rc->rc_input_buf_ptr] = 0; - - while(1) { - cr = 0; - - for(i = 0; i < rc->rc_input_buf_ptr; i++) - if(rc->rc_input_buf[i] == 0xa) - break; - - if(i == rc->rc_input_buf_ptr) - break; - - memcpy(buf, rc->rc_input_buf, i); - buf[i] = 0; - i++; - memmove(rc->rc_input_buf, rc->rc_input_buf + i, - sizeof(rc->rc_input_buf) - i); - rc->rc_input_buf_ptr -= i; - - i = strlen(buf); - while(i > 0 && buf[i-1] < 32) - buf[--i] = 0; - - if((err = rtsp_con_parse(rc, buf)) != 0) { - rtsp_con_teardown(rc, err); - break; - } - } -} - - -/* - * dispatcher callback - */ -static void -rtsp_con_socket_callback(int events, void *opaque, int fd) -{ - rtsp_connection_t *rc = opaque; - - if(events & DISPATCH_ERR) { - rtsp_con_teardown(rc, ECONNRESET); - return; - } - - if(events & DISPATCH_READ) - rtsp_con_data_read(rc); -} - - /* * */ static void -rtsp_connect_callback(int events, void *opaque, int fd) +rtsp_tcp_callback(tcpevent_t event, void *tcpsession) { - struct sockaddr_in from; - socklen_t socklen = sizeof(struct sockaddr_in); - int newfd; - int val; - rtsp_connection_t *rc; - char txt[30]; + rtsp_connection_t *rc = tcpsession; - if(!(events & DISPATCH_READ)) - return; + switch(event) { + case TCP_CONNECT: + break; - newfd = accept(fd, (struct sockaddr *)&from, &socklen); - if(newfd == -1) - return; - - fcntl(newfd, F_SETFL, fcntl(newfd, F_GETFL) | O_NONBLOCK); + case TCP_DISCONNECT: + rtsp_disconnect(rc); + break; - val = 1; - setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)); - - val = 30; - setsockopt(newfd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)); - - val = 15; - setsockopt(newfd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)); - - val = 5; - setsockopt(newfd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)); - - val = 1; - setsockopt(newfd, SOL_TCP, TCP_NODELAY, &val, sizeof(val)); - - rc = calloc(1, sizeof(rtsp_connection_t)); - rc->rc_fd = newfd; - - snprintf(txt, sizeof(txt), "%s:%d", - inet_ntoa(from.sin_addr), ntohs(from.sin_port)); - - rc->rc_logname = strdup(txt); - - syslog(LOG_INFO, "rtsp: %s: connected", rc->rc_logname); - - rc->rc_from = from; - - rc->rc_dispatch_handle = dispatch_addfd(newfd, rtsp_con_socket_callback, rc, - DISPATCH_READ); + case TCP_INPUT: + tcp_line_read(&rc->rc_tcp_session, rtsp_con_parse); + break; + } } - /* * Fire up RTSP server */ @@ -936,33 +803,7 @@ rtsp_connect_callback(int events, void *opaque, int fd) void rtsp_start(void) { - struct sockaddr_in sin; - int s; - int one = 1; - s = socket(AF_INET, SOCK_STREAM, 0); - memset(&sin, 0, sizeof(sin)); - - sin.sin_family = AF_INET; - sin.sin_port = htons(9908); - - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)); - - fcntl(s, F_SETFL, fcntl(s, F_GETFL) | O_NONBLOCK); - - syslog(LOG_INFO, "rtsp: Listening for RTSP/TCP connections on %s:%d", - inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); - - if(bind(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) { - syslog(LOG_ERR, - "rtsp: Unable to bind socket for incomming RTSP/TCP connections" - "%s:%d -- %s", - inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), - strerror(errno)); - return; - } - av_init_random(time(NULL), &rtsp_rnd); - - listen(s, 1); - dispatch_addfd(s, rtsp_connect_callback, NULL, DISPATCH_READ); + tcp_create_server(9908, sizeof(rtsp_connection_t), "rtsp", + rtsp_tcp_callback); } diff --git a/tcp.c b/tcp.c new file mode 100644 index 00000000..b7ef52b3 --- /dev/null +++ b/tcp.c @@ -0,0 +1,381 @@ +/* + * tvheadend, TCP common functions + * Copyright (C) 2007 Andreas Öman + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "dispatch.h" +#include "tcp.h" + + +/* + * printfs data on a TCP connection + */ +void +tcp_printf(tcp_session_t *ses, const char *fmt, ...) +{ + va_list ap; + char buf[5000]; + void *out; + int l; + + va_start(ap, fmt); + l = vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + + out = malloc(l); + memcpy(out, buf, l); + + tcp_send_msg(ses, &ses->tcp_q_hi, out, l); +} + + + +/** + * Line parser for TCP based connections. Note that callback cannot + * destroy the session on its own. It must return an error to perform + * disconnect. + */ +void +tcp_line_read(tcp_session_t *ses, tcp_line_input_t *callback) +{ + int space = sizeof(ses->tcp_input_buf) - ses->tcp_input_buf_ptr - 1; + int r, cr = 0, i, err; + char buf[TCP_MAX_LINE_LEN]; + + if(space < 1) { + tcp_disconnect(ses, EBADMSG); + return; + } + + r = read(ses->tcp_fd, ses->tcp_input_buf + ses->tcp_input_buf_ptr, space); + if(r < 1) { + tcp_disconnect(ses, r == 0 ? ECONNRESET : errno); + return; + } + + ses->tcp_input_buf_ptr += r; + ses->tcp_input_buf[ses->tcp_input_buf_ptr] = 0; + + while(1) { + cr = 0; + + for(i = 0; i < ses->tcp_input_buf_ptr; i++) + if(ses->tcp_input_buf[i] == 0xa) + break; + + if(i == ses->tcp_input_buf_ptr) + break; + + memcpy(buf, ses->tcp_input_buf, i); + buf[i] = 0; + i++; + memmove(ses->tcp_input_buf, ses->tcp_input_buf + i, + sizeof(ses->tcp_input_buf) - i); + ses->tcp_input_buf_ptr -= i; + + i = strlen(buf); + while(i > 0 && buf[i-1] < 32) + buf[--i] = 0; + + if((err = callback(ses, buf)) != 0) { + tcp_disconnect(ses, err); + break; + } + } +} + + + +/** + * Create an output queue + */ +static void +tcp_init_queue(tcp_queue_t *tq, int maxdepth) +{ + TAILQ_INIT(&tq->tq_messages); + tq->tq_depth = 0; + tq->tq_maxdepth = maxdepth; +} + +/** + * Flusing all pending data from a queue + */ +static void +tcp_flush_queue(tcp_queue_t *tq) +{ + tcp_data_t *td; + + while((td = TAILQ_FIRST(&tq->tq_messages)) != NULL) { + TAILQ_REMOVE(&tq->tq_messages, td, td_link); + free((void *)td->td_data); + free(td); + } +} + + +/** + * Transmit data from any of the queues + * Select hi-pri queue first if possible, but always stick on the same + * queue until a whole message has been sent + */ +static void +tcp_transmit(tcp_session_t *ses) +{ + tcp_queue_t *q = ses->tcp_q_current; + tcp_data_t *hd; + int r; + + again: + if(q == NULL) { + if(ses->tcp_q_hi.tq_depth) + q = &ses->tcp_q_hi; + if(ses->tcp_q_low.tq_depth) + q = &ses->tcp_q_low; + } + + while(q != NULL) { + hd = TAILQ_FIRST(&q->tq_messages); + + r = write(ses->tcp_fd, hd->td_data + hd->td_offset, + hd->td_datalen - hd->td_offset); + + if(r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) + r = 0; + + if(r == 0) + break; + + if(r == -1) { + tcp_disconnect(ses, errno); + return; + } + q->tq_depth -= r; + hd->td_offset += r; + + if(hd->td_offset == hd->td_datalen) { + TAILQ_REMOVE(&q->tq_messages, hd, td_link); + free((void *)hd->td_data); + free(hd); + q = NULL; + goto again; + } + } + + if(q == NULL) { + if(ses->tcp_blocked) { + dispatch_clr(ses->tcp_dispatch_handle, DISPATCH_WRITE); + ses->tcp_blocked = 0; + } + } else { + if(!ses->tcp_blocked) { + dispatch_set(ses->tcp_dispatch_handle, DISPATCH_WRITE); + ses->tcp_blocked = 1; + } + } + ses->tcp_q_current = q; +} + +/** + * Enqueue a message and start transmission if nothing currently is + * being sent. + */ +int +tcp_send_msg(tcp_session_t *ses, tcp_queue_t *tq, const void *data, + size_t len) +{ + tcp_data_t *td; + + if(tq == NULL) + tq = &ses->tcp_q_low; + + if(len > tq->tq_maxdepth - tq->tq_depth) { + free((void *)data); + return -1; + } + + td = malloc(sizeof(tcp_data_t)); + td->td_offset = 0; + td->td_datalen = len; + td->td_data = data; + TAILQ_INSERT_TAIL(&tq->tq_messages, td, td_link); + tq->tq_depth += td->td_datalen; + + if(!ses->tcp_blocked) + tcp_transmit(ses); + + return 0; +} + + + +/** + * Disconnect handler + */ +void +tcp_disconnect(tcp_session_t *ses, int err) +{ + tcp_server_t *srv = ses->tcp_server; + + tcp_flush_queue(&ses->tcp_q_low); + tcp_flush_queue(&ses->tcp_q_hi); + + srv->tcp_callback(TCP_DISCONNECT, ses); + + syslog(LOG_INFO, "%s: %s: disconnected -- %s", + srv->tcp_server_name, ses->tcp_peer_txt, strerror(err)); + + close(dispatch_delfd(ses->tcp_dispatch_handle)); + free(ses); +} + + +/** + * Dispatcher callback + */ +static void +tcp_socket_callback(int events, void *opaque, int fd) +{ + tcp_session_t *ses = opaque; + tcp_server_t *srv = ses->tcp_server; + + if(events & DISPATCH_ERR) { + tcp_disconnect(ses, ECONNRESET); + return; + } + + if(events & DISPATCH_READ) + srv->tcp_callback(TCP_INPUT, ses); + + if(events & DISPATCH_WRITE) + tcp_transmit(ses); +} + + + +/** + * TCP connect callback + */ +static void +tcp_connect_callback(int events, void *opaque, int fd) +{ + struct sockaddr_in from; + socklen_t socklen = sizeof(struct sockaddr_in); + int newfd; + int val; + tcp_session_t *ses; + tcp_server_t *srv = opaque; + + if(!(events & DISPATCH_READ)) + return; + + if((newfd = accept(fd, (struct sockaddr *)&from, &socklen)) == -1) + return; + + fcntl(newfd, F_SETFL, fcntl(newfd, F_GETFL) | O_NONBLOCK); + + val = 1; + setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)); + + val = 30; + setsockopt(newfd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val)); + + val = 15; + setsockopt(newfd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val)); + + val = 5; + setsockopt(newfd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val)); + + val = 1; + setsockopt(newfd, SOL_TCP, TCP_NODELAY, &val, sizeof(val)); + + + ses = calloc(1, srv->tcp_session_size); + tcp_init_queue(&ses->tcp_q_hi, 20 * 1000 * 1000); + tcp_init_queue(&ses->tcp_q_low, 20 * 1000 * 1000); + + ses->tcp_server = srv; + + memcpy(&ses->tcp_peer_addr, &from, socklen); + snprintf(ses->tcp_peer_txt, sizeof(ses->tcp_peer_txt), "%s:%d", + inet_ntoa(from.sin_addr), ntohs(from.sin_port)); + + syslog(LOG_INFO, "%s: %s: connected", + srv->tcp_server_name, ses->tcp_peer_txt); + + ses->tcp_fd = newfd; + + ses->tcp_dispatch_handle = dispatch_addfd(newfd, tcp_socket_callback, + ses, DISPATCH_READ); + srv->tcp_callback(TCP_CONNECT, ses); +} + + + +/** + * Create a TCP based server + */ +void +tcp_create_server(int port, size_t session_size, const char *name, + tcp_callback_t *cb) +{ + struct sockaddr_in sin; + int one = 1; + + tcp_server_t *s = malloc(sizeof(tcp_server_t)); + + s->tcp_fd = socket(AF_INET, SOCK_STREAM, 0); + s->tcp_session_size = session_size; + s->tcp_callback = cb; + + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + + setsockopt(s->tcp_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int)); + + fcntl(s->tcp_fd, F_SETFL, fcntl(s->tcp_fd, F_GETFL) | O_NONBLOCK); + + syslog(LOG_INFO, "%s: Listening for TCP connections on %s:%d", + name, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); + + if(bind(s->tcp_fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) { + syslog(LOG_ERR, + "%s: Unable to bind socket for incomming TCP connections, " + "%s:%d -- %s", + name, + inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), + strerror(errno)); + return; + } + + listen(s->tcp_fd, 1); + dispatch_addfd(s->tcp_fd, tcp_connect_callback, s, DISPATCH_READ); + + s->tcp_server_name = strdup(name); +} diff --git a/tcp.h b/tcp.h new file mode 100644 index 00000000..47bf2297 --- /dev/null +++ b/tcp.h @@ -0,0 +1,91 @@ +/* + * tvheadend, TCP common functions + * Copyright (C) 2007 Andreas Öman + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef TCP_H_ +#define TCP_H_ + +TAILQ_HEAD(tcp_data_queue, tcp_data); + +typedef struct tcp_data { + TAILQ_ENTRY(tcp_data) td_link; + const void *td_data; + unsigned int td_datalen; + int td_offset; +} tcp_data_t; + +typedef struct tcp_queue { + struct tcp_data_queue tq_messages; + int tq_depth; + int tq_maxdepth; +} tcp_queue_t; + +typedef enum { + TCP_CONNECT, + TCP_DISCONNECT, + TCP_INPUT, +} tcpevent_t; + +typedef void (tcp_callback_t)(tcpevent_t event, void *tcpsession); +typedef int (tcp_line_input_t)(void *tcpsession, char *line); + +typedef struct tcpserver { + tcp_callback_t *tcp_callback; + int tcp_fd; + size_t tcp_session_size; + const char *tcp_server_name; +} tcp_server_t; + +#define TCP_MAX_LINE_LEN 256 + +typedef struct tcp_session { + void *tcp_dispatch_handle; + int tcp_fd; + struct sockaddr_storage tcp_peer_addr; + char tcp_peer_txt[100]; + tcp_server_t *tcp_server; + + /* Output queueing */ + + int tcp_blocked; + tcp_queue_t tcp_q_hi; + tcp_queue_t tcp_q_low; + + tcp_queue_t *tcp_q_current; + + /* Input line parser */ + + int tcp_input_buf_ptr; + char tcp_input_buf[TCP_MAX_LINE_LEN]; + +} tcp_session_t; + +void tcp_disconnect(tcp_session_t *ses, int err); + +void tcp_create_server(int port, size_t session_size, const char *name, + tcp_callback_t *cb); + +void tcp_line_read(tcp_session_t *ses, tcp_line_input_t *callback); + +#define tcp_logname(ses) ((ses)->tcp_peer_txt) + +int tcp_send_msg(tcp_session_t *ses, tcp_queue_t *tq, const void *data, + size_t len); + +void tcp_printf(tcp_session_t *ses, const char *fmt, ...); + +#endif /* TCP_H_ */