factor out common TCP code to a file of its own

This commit is contained in:
Andreas Öman 2007-10-31 16:10:40 +00:00
parent 45d09db30a
commit 08ee05a41e
5 changed files with 534 additions and 383 deletions

View file

@ -1,7 +1,7 @@
-include ../config.mak
SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \
subscriptions.c tsmux.c tsdemux.c pes.c buffer.c
subscriptions.c tsmux.c tsdemux.c pes.c buffer.c tcp.c
SRCS += pvr.c

View file

@ -37,9 +37,9 @@
#include "teletext.h"
#include "dispatch.h"
#include "dvb.h"
#include "strtab.h"
#include "buffer.h"
#include "tsmux.h"
#include "tcp.h"
LIST_HEAD(client_list, client);
@ -51,9 +51,9 @@ struct client_list all_clients;
*/
typedef struct client {
tcp_session_t c_tcp_session;
LIST_ENTRY(client) c_global_link;
int c_fd;
int c_streamfd;
pthread_t c_ptid;
@ -64,15 +64,6 @@ typedef struct client {
struct ref_update_queue c_refq;
int c_pkt_maxsiz;
char c_input_buf[100];
int c_input_buf_ptr;
char *c_title;
void *c_dispatch_handle;
dtimer_t c_status_timer;
void *c_muxer;
@ -83,18 +74,7 @@ typedef struct client {
static void client_status_update(void *aux, int64_t now);
static void
cprintf(client_t *c, const char *fmt, ...)
{
va_list ap;
char buf[5000];
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
write(c->c_fd, buf, strlen(buf));
}
#define cprintf(c, fmt...) tcp_printf(&(c)->c_tcp_session, fmt)
void
@ -524,7 +504,7 @@ cr_streamport(client_t *c, char **argv, int argc)
c->c_port = atoi(argv[1]);
syslog(LOG_INFO, "%s registers UDP stream target %s:%d",
c->c_title, inet_ntoa(c->c_ipaddr), c->c_port);
tcp_logname(&c->c_tcp_session), inet_ntoa(c->c_ipaddr), c->c_port);
return 0;
}
@ -593,14 +573,6 @@ cr_event_info(client_t *c, char **argv, int argc)
*
*/
static struct strtab recoptab[] = {
{ "once", RECOP_ONCE },
{ "daily", RECOP_DAILY },
{ "weekly", RECOP_WEEKLY },
{ "cancel", RECOP_CANCEL },
{ "toggle", RECOP_TOGGLE }
};
static int
cr_event_record(client_t *c, char **argv, int argc)
{
@ -610,7 +582,7 @@ cr_event_record(client_t *c, char **argv, int argc)
if(argc < 2)
return 1;
op = str2val(argv[1], recoptab);
op = pvr_op2int(argv[1]);
if(op == -1)
return 1;
@ -745,9 +717,10 @@ const struct {
};
static void
client_req(client_t *c, char *buf)
static int
client_req(void *aux, char *buf)
{
client_t *c = aux;
int i, l, x;
const char *n;
char *argv[40];
@ -776,28 +749,23 @@ client_req(client_t *c, char *buf)
if(x >= 0)
cprintf(c, "eom %s\n", x ? "error" : "ok");
return;
return 0;
}
}
cprintf(c, "eom nocommand\n");
return 0;
}
/*
* client error
* client disconnect
*/
static void
client_teardown(client_t *c, int err)
client_disconnect(client_t *c)
{
th_subscription_t *s;
syslog(LOG_INFO, "%s disconnected -- %s", c->c_title, strerror(err));
dtimer_disarm(&c->c_status_timer);
dispatch_delfd(c->c_dispatch_handle);
close(c->c_fd);
if(c->c_streamfd != -1)
close(c->c_streamfd);
@ -807,142 +775,36 @@ client_teardown(client_t *c, int err)
LIST_REMOVE(s, ths_subscriber_link);
subscription_unsubscribe(s);
}
free(c->c_title);
free(c);
}
/*
* data available on socket
*/
static void
client_data_read(client_t *c)
{
int space = sizeof(c->c_input_buf) - c->c_input_buf_ptr - 1;
int r, cr = 0, i;
char buf[100];
if(space < 1) {
client_teardown(c, EBADMSG);
return;
}
r = read(c->c_fd, c->c_input_buf + c->c_input_buf_ptr, space);
if(r < 0) {
client_teardown(c, errno);
return;
}
if(r == 0) {
client_teardown(c, ECONNRESET);
return;
}
c->c_input_buf_ptr += r;
c->c_input_buf[c->c_input_buf_ptr] = 0;
while(1) {
cr = 0;
for(i = 0; i < c->c_input_buf_ptr; i++)
if(c->c_input_buf[i] == 0xa)
break;
if(i == c->c_input_buf_ptr)
break;
memcpy(buf, c->c_input_buf, i);
buf[i] = 0;
i++;
memmove(c->c_input_buf, c->c_input_buf + i, sizeof(c->c_input_buf) - i);
c->c_input_buf_ptr -= i;
i = strlen(buf);
while(i > 0 && buf[i-1] < 32)
buf[--i] = 0;
// printf("buf = |%s|\n", buf);
client_req(c, buf);
}
}
/*
* dispatcher callback
*/
static void
client_socket_callback(int events, void *opaque, int fd)
{
client_t *c = opaque;
if(events & DISPATCH_ERR) {
client_teardown(c, ECONNRESET);
return;
}
if(events & DISPATCH_READ)
client_data_read(c);
}
/*
*
*/
static void
client_connect_callback(int events, void *opaque, int fd)
htsclient_tcp_callback(tcpevent_t event, void *tcpsession)
{
struct sockaddr_in from;
socklen_t socklen = sizeof(struct sockaddr_in);
int newfd;
int val;
client_t *c;
char txt[30];
client_t *c = tcpsession;
if(!(events & DISPATCH_READ))
return;
switch(event) {
case TCP_CONNECT:
TAILQ_INIT(&c->c_refq);
LIST_INSERT_HEAD(&all_clients, c, c_global_link);
c->c_streamfd = -1;
dtimer_arm(&c->c_status_timer, client_status_update, c, 1);
break;
newfd = accept(fd, (struct sockaddr *)&from, &socklen);
if(newfd == -1)
return;
fcntl(newfd, F_SETFL, fcntl(newfd, F_GETFL) | O_NONBLOCK);
case TCP_DISCONNECT:
client_disconnect(c);
break;
val = 1;
setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
val = 30;
setsockopt(newfd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val));
val = 15;
setsockopt(newfd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val));
val = 5;
setsockopt(newfd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val));
val = 1;
setsockopt(newfd, SOL_TCP, TCP_NODELAY, &val, sizeof(val));
c = calloc(1, sizeof(client_t));
c->c_fd = newfd;
c->c_pkt_maxsiz = 188 * 7;
TAILQ_INIT(&c->c_refq);
LIST_INSERT_HEAD(&all_clients, c, c_global_link);
c->c_streamfd = -1; //socket(AF_INET, SOCK_DGRAM, 0);
snprintf(txt, sizeof(txt), "%s:%d",
inet_ntoa(from.sin_addr), ntohs(from.sin_port));
c->c_title = strdup(txt);
syslog(LOG_INFO, "Got TCP connection from %s", c->c_title);
c->c_dispatch_handle = dispatch_addfd(newfd, client_socket_callback, c,
DISPATCH_READ);
dtimer_arm(&c->c_status_timer, client_status_update, c, 1);
case TCP_INPUT:
tcp_line_read(&c->c_tcp_session, client_req);
break;
}
}
/*
* Fire up client handling
*/
@ -950,32 +812,8 @@ client_connect_callback(int events, void *opaque, int fd)
void
client_start(void)
{
struct sockaddr_in sin;
int s;
int one = 1;
s = socket(AF_INET, SOCK_STREAM, 0);
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(9909);
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int));
fcntl(s, F_SETFL, fcntl(s, F_GETFL) | O_NONBLOCK);
syslog(LOG_INFO, "Listening for TCP connections on %s:%d",
inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
if(bind(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
syslog(LOG_ERR, "Unable to bind socket for incomming TCP connections"
"%s:%d -- %s",
inet_ntoa(sin.sin_addr), ntohs(sin.sin_port),
strerror(errno));
return;
}
listen(s, 1);
dispatch_addfd(s, client_connect_callback, NULL, DISPATCH_READ);
tcp_create_server(9909, sizeof(client_t), "htsclient",
htsclient_tcp_callback);
}

221
rtsp.c
View file

@ -39,11 +39,14 @@
#include "strtab.h"
#include "rtp.h"
#include "tsmux.h"
#include "tcp.h"
#include <ffmpeg/avformat.h>
#include <ffmpeg/rtspcodes.h>
#include <ffmpeg/random.h>
#define rcprintf(c, fmt...) tcp_printf(&(rc)->rc_tcp_session, fmt)
static AVRandomState rtsp_rnd;
#define RTSP_MAX_LINE_LEN 1000
@ -82,8 +85,7 @@ typedef struct rtsp_arg {
typedef struct rtsp_connection {
int rc_fd;
void *rc_dispatch_handle;
tcp_session_t rc_tcp_session; /* Must be first */
char *rc_url;
LIST_HEAD(, rtsp_arg) rc_args;
@ -106,14 +108,6 @@ typedef struct rtsp_connection {
} rc_cmd;
struct rtsp_session_head rc_sessions;
int rc_input_buf_ptr;
char rc_input_buf[RTSP_MAX_LINE_LEN];
struct sockaddr_in rc_from;
char *rc_logname; /* Printable name used when logging stuff related
to this connection */
} rtsp_connection_t;
@ -261,21 +255,6 @@ rtsp_session_destroy(rtsp_session_t *rs)
free(rs);
}
/*
* Prints data on rtsp connection
*/
static void
rcprintf(rtsp_connection_t *rc, const char *fmt, ...)
{
va_list ap;
char buf[5000];
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
write(rc->rc_fd, buf, strlen(buf));
}
/*
@ -315,7 +294,6 @@ static void
rtsp_con_set_arg(rtsp_connection_t *rc, char *key, char *val)
{
rtsp_arg_t *ra;
char buf[100];
LIST_FOREACH(ra, &rc->rc_args, link)
if(!strcasecmp(ra->key, key))
@ -329,7 +307,7 @@ rtsp_con_set_arg(rtsp_connection_t *rc, char *key, char *val)
free(ra->val);
}
ra->val = strdup(val);
#if 0
if(!strcasecmp(key, "User-Agent")) {
free(rc->rc_logname);
@ -338,6 +316,7 @@ rtsp_con_set_arg(rtsp_connection_t *rc, char *key, char *val)
val);
rc->rc_logname = strdup(buf);
}
#endif
}
@ -402,7 +381,7 @@ rtsp_reply_error(rtsp_connection_t *rc, int error, const char *errstr)
if(errstr == NULL)
errstr = rtsp_err2str(error);
syslog(LOG_INFO, "rtsp: %s: %s", rc->rc_logname, errstr);
syslog(LOG_INFO, "rtsp: %s: %s", tcp_logname(&rc->rc_tcp_session), errstr);
rcprintf(rc, "RTSP/1.0 %d %s\r\n", error, errstr);
if((c = rtsp_con_get_arg(rc, "cseq")) != NULL)
@ -474,7 +453,7 @@ rtsp_cmd_play(rtsp_connection_t *rc)
ts_muxer_play(rs->rs_muxer, start);
syslog(LOG_INFO, "rtsp: %s: Starting playback of %s",
rc->rc_logname, rs->rs_s->ths_channel->ch_name);
tcp_logname(&rc->rc_tcp_session), rs->rs_s->ths_channel->ch_name);
rcprintf(rc,
"RTSP/1.0 200 OK\r\n"
@ -510,7 +489,7 @@ rtsp_cmd_pause(rtsp_connection_t *rc)
ts_muxer_pause(rs->rs_muxer);
syslog(LOG_INFO, "rtsp: %s: Pausing playback of %s",
rc->rc_logname, rs->rs_s->ths_channel->ch_name);
tcp_logname(&rc->rc_tcp_session), rs->rs_s->ths_channel->ch_name);
rcprintf(rc,
"RTSP/1.0 200 OK\r\n"
@ -594,7 +573,7 @@ rtsp_cmd_setup(rtsp_connection_t *rc)
return;
}
dst = rc->rc_from;
memcpy(&dst, &rc->rc_tcp_session.tcp_peer_addr, sizeof(struct sockaddr_in));
dst.sin_port = htons(client_ports[0]);
if((rs = rtsp_session_create(ch, &dst)) == NULL) {
@ -713,8 +692,9 @@ rtsp_cmd_teardown(rtsp_connection_t *rc)
* RTSP connection state machine & parser
*/
static int
rtsp_con_parse(rtsp_connection_t *rc, char *buf)
rtsp_con_parse(void *aux, char *buf)
{
rtsp_connection_t *rc = aux;
int n;
char *argv[3], *c;
@ -777,158 +757,45 @@ rtsp_con_parse(rtsp_connection_t *rc, char *buf)
/*
* client error, teardown connection
* disconnect
*/
static void
rtsp_con_teardown(rtsp_connection_t *rc, int err)
rtsp_disconnect(rtsp_connection_t *rc)
{
rtsp_session_t *rs;
syslog(LOG_INFO, "rtsp: %s: disconnected -- %s",
rc->rc_logname, strerror(err));
rtsp_con_flush_args(rc);
while((rs = LIST_FIRST(&rc->rc_sessions)) != NULL)
rtsp_session_destroy(rs);
close(dispatch_delfd(rc->rc_dispatch_handle));
free(rc->rc_url);
free(rc->rc_logname);
rtsp_con_flush_args(rc);
free(rc);
}
/*
* data available on socket
*/
static void
rtsp_con_data_read(rtsp_connection_t *rc)
{
int space = sizeof(rc->rc_input_buf) - rc->rc_input_buf_ptr - 1;
int r, cr = 0, i, err;
char buf[RTSP_MAX_LINE_LEN];
if(space < 1) {
rtsp_con_teardown(rc, EBADMSG);
return;
}
r = read(rc->rc_fd, rc->rc_input_buf + rc->rc_input_buf_ptr, space);
if(r < 0) {
rtsp_con_teardown(rc, errno);
return;
}
if(r == 0) {
rtsp_con_teardown(rc, ECONNRESET);
return;
}
rc->rc_input_buf_ptr += r;
rc->rc_input_buf[rc->rc_input_buf_ptr] = 0;
while(1) {
cr = 0;
for(i = 0; i < rc->rc_input_buf_ptr; i++)
if(rc->rc_input_buf[i] == 0xa)
break;
if(i == rc->rc_input_buf_ptr)
break;
memcpy(buf, rc->rc_input_buf, i);
buf[i] = 0;
i++;
memmove(rc->rc_input_buf, rc->rc_input_buf + i,
sizeof(rc->rc_input_buf) - i);
rc->rc_input_buf_ptr -= i;
i = strlen(buf);
while(i > 0 && buf[i-1] < 32)
buf[--i] = 0;
if((err = rtsp_con_parse(rc, buf)) != 0) {
rtsp_con_teardown(rc, err);
break;
}
}
}
/*
* dispatcher callback
*/
static void
rtsp_con_socket_callback(int events, void *opaque, int fd)
{
rtsp_connection_t *rc = opaque;
if(events & DISPATCH_ERR) {
rtsp_con_teardown(rc, ECONNRESET);
return;
}
if(events & DISPATCH_READ)
rtsp_con_data_read(rc);
}
/*
*
*/
static void
rtsp_connect_callback(int events, void *opaque, int fd)
rtsp_tcp_callback(tcpevent_t event, void *tcpsession)
{
struct sockaddr_in from;
socklen_t socklen = sizeof(struct sockaddr_in);
int newfd;
int val;
rtsp_connection_t *rc;
char txt[30];
rtsp_connection_t *rc = tcpsession;
if(!(events & DISPATCH_READ))
return;
switch(event) {
case TCP_CONNECT:
break;
newfd = accept(fd, (struct sockaddr *)&from, &socklen);
if(newfd == -1)
return;
fcntl(newfd, F_SETFL, fcntl(newfd, F_GETFL) | O_NONBLOCK);
case TCP_DISCONNECT:
rtsp_disconnect(rc);
break;
val = 1;
setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
val = 30;
setsockopt(newfd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val));
val = 15;
setsockopt(newfd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val));
val = 5;
setsockopt(newfd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val));
val = 1;
setsockopt(newfd, SOL_TCP, TCP_NODELAY, &val, sizeof(val));
rc = calloc(1, sizeof(rtsp_connection_t));
rc->rc_fd = newfd;
snprintf(txt, sizeof(txt), "%s:%d",
inet_ntoa(from.sin_addr), ntohs(from.sin_port));
rc->rc_logname = strdup(txt);
syslog(LOG_INFO, "rtsp: %s: connected", rc->rc_logname);
rc->rc_from = from;
rc->rc_dispatch_handle = dispatch_addfd(newfd, rtsp_con_socket_callback, rc,
DISPATCH_READ);
case TCP_INPUT:
tcp_line_read(&rc->rc_tcp_session, rtsp_con_parse);
break;
}
}
/*
* Fire up RTSP server
*/
@ -936,33 +803,7 @@ rtsp_connect_callback(int events, void *opaque, int fd)
void
rtsp_start(void)
{
struct sockaddr_in sin;
int s;
int one = 1;
s = socket(AF_INET, SOCK_STREAM, 0);
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(9908);
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int));
fcntl(s, F_SETFL, fcntl(s, F_GETFL) | O_NONBLOCK);
syslog(LOG_INFO, "rtsp: Listening for RTSP/TCP connections on %s:%d",
inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
if(bind(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
syslog(LOG_ERR,
"rtsp: Unable to bind socket for incomming RTSP/TCP connections"
"%s:%d -- %s",
inet_ntoa(sin.sin_addr), ntohs(sin.sin_port),
strerror(errno));
return;
}
av_init_random(time(NULL), &rtsp_rnd);
listen(s, 1);
dispatch_addfd(s, rtsp_connect_callback, NULL, DISPATCH_READ);
tcp_create_server(9908, sizeof(rtsp_connection_t), "rtsp",
rtsp_tcp_callback);
}

381
tcp.c Normal file
View file

@ -0,0 +1,381 @@
/*
* tvheadend, TCP common functions
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <fcntl.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include "dispatch.h"
#include "tcp.h"
/*
* printfs data on a TCP connection
*/
void
tcp_printf(tcp_session_t *ses, const char *fmt, ...)
{
va_list ap;
char buf[5000];
void *out;
int l;
va_start(ap, fmt);
l = vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
out = malloc(l);
memcpy(out, buf, l);
tcp_send_msg(ses, &ses->tcp_q_hi, out, l);
}
/**
* Line parser for TCP based connections. Note that callback cannot
* destroy the session on its own. It must return an error to perform
* disconnect.
*/
void
tcp_line_read(tcp_session_t *ses, tcp_line_input_t *callback)
{
int space = sizeof(ses->tcp_input_buf) - ses->tcp_input_buf_ptr - 1;
int r, cr = 0, i, err;
char buf[TCP_MAX_LINE_LEN];
if(space < 1) {
tcp_disconnect(ses, EBADMSG);
return;
}
r = read(ses->tcp_fd, ses->tcp_input_buf + ses->tcp_input_buf_ptr, space);
if(r < 1) {
tcp_disconnect(ses, r == 0 ? ECONNRESET : errno);
return;
}
ses->tcp_input_buf_ptr += r;
ses->tcp_input_buf[ses->tcp_input_buf_ptr] = 0;
while(1) {
cr = 0;
for(i = 0; i < ses->tcp_input_buf_ptr; i++)
if(ses->tcp_input_buf[i] == 0xa)
break;
if(i == ses->tcp_input_buf_ptr)
break;
memcpy(buf, ses->tcp_input_buf, i);
buf[i] = 0;
i++;
memmove(ses->tcp_input_buf, ses->tcp_input_buf + i,
sizeof(ses->tcp_input_buf) - i);
ses->tcp_input_buf_ptr -= i;
i = strlen(buf);
while(i > 0 && buf[i-1] < 32)
buf[--i] = 0;
if((err = callback(ses, buf)) != 0) {
tcp_disconnect(ses, err);
break;
}
}
}
/**
* Create an output queue
*/
static void
tcp_init_queue(tcp_queue_t *tq, int maxdepth)
{
TAILQ_INIT(&tq->tq_messages);
tq->tq_depth = 0;
tq->tq_maxdepth = maxdepth;
}
/**
* Flusing all pending data from a queue
*/
static void
tcp_flush_queue(tcp_queue_t *tq)
{
tcp_data_t *td;
while((td = TAILQ_FIRST(&tq->tq_messages)) != NULL) {
TAILQ_REMOVE(&tq->tq_messages, td, td_link);
free((void *)td->td_data);
free(td);
}
}
/**
* Transmit data from any of the queues
* Select hi-pri queue first if possible, but always stick on the same
* queue until a whole message has been sent
*/
static void
tcp_transmit(tcp_session_t *ses)
{
tcp_queue_t *q = ses->tcp_q_current;
tcp_data_t *hd;
int r;
again:
if(q == NULL) {
if(ses->tcp_q_hi.tq_depth)
q = &ses->tcp_q_hi;
if(ses->tcp_q_low.tq_depth)
q = &ses->tcp_q_low;
}
while(q != NULL) {
hd = TAILQ_FIRST(&q->tq_messages);
r = write(ses->tcp_fd, hd->td_data + hd->td_offset,
hd->td_datalen - hd->td_offset);
if(r == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
r = 0;
if(r == 0)
break;
if(r == -1) {
tcp_disconnect(ses, errno);
return;
}
q->tq_depth -= r;
hd->td_offset += r;
if(hd->td_offset == hd->td_datalen) {
TAILQ_REMOVE(&q->tq_messages, hd, td_link);
free((void *)hd->td_data);
free(hd);
q = NULL;
goto again;
}
}
if(q == NULL) {
if(ses->tcp_blocked) {
dispatch_clr(ses->tcp_dispatch_handle, DISPATCH_WRITE);
ses->tcp_blocked = 0;
}
} else {
if(!ses->tcp_blocked) {
dispatch_set(ses->tcp_dispatch_handle, DISPATCH_WRITE);
ses->tcp_blocked = 1;
}
}
ses->tcp_q_current = q;
}
/**
* Enqueue a message and start transmission if nothing currently is
* being sent.
*/
int
tcp_send_msg(tcp_session_t *ses, tcp_queue_t *tq, const void *data,
size_t len)
{
tcp_data_t *td;
if(tq == NULL)
tq = &ses->tcp_q_low;
if(len > tq->tq_maxdepth - tq->tq_depth) {
free((void *)data);
return -1;
}
td = malloc(sizeof(tcp_data_t));
td->td_offset = 0;
td->td_datalen = len;
td->td_data = data;
TAILQ_INSERT_TAIL(&tq->tq_messages, td, td_link);
tq->tq_depth += td->td_datalen;
if(!ses->tcp_blocked)
tcp_transmit(ses);
return 0;
}
/**
* Disconnect handler
*/
void
tcp_disconnect(tcp_session_t *ses, int err)
{
tcp_server_t *srv = ses->tcp_server;
tcp_flush_queue(&ses->tcp_q_low);
tcp_flush_queue(&ses->tcp_q_hi);
srv->tcp_callback(TCP_DISCONNECT, ses);
syslog(LOG_INFO, "%s: %s: disconnected -- %s",
srv->tcp_server_name, ses->tcp_peer_txt, strerror(err));
close(dispatch_delfd(ses->tcp_dispatch_handle));
free(ses);
}
/**
* Dispatcher callback
*/
static void
tcp_socket_callback(int events, void *opaque, int fd)
{
tcp_session_t *ses = opaque;
tcp_server_t *srv = ses->tcp_server;
if(events & DISPATCH_ERR) {
tcp_disconnect(ses, ECONNRESET);
return;
}
if(events & DISPATCH_READ)
srv->tcp_callback(TCP_INPUT, ses);
if(events & DISPATCH_WRITE)
tcp_transmit(ses);
}
/**
* TCP connect callback
*/
static void
tcp_connect_callback(int events, void *opaque, int fd)
{
struct sockaddr_in from;
socklen_t socklen = sizeof(struct sockaddr_in);
int newfd;
int val;
tcp_session_t *ses;
tcp_server_t *srv = opaque;
if(!(events & DISPATCH_READ))
return;
if((newfd = accept(fd, (struct sockaddr *)&from, &socklen)) == -1)
return;
fcntl(newfd, F_SETFL, fcntl(newfd, F_GETFL) | O_NONBLOCK);
val = 1;
setsockopt(newfd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
val = 30;
setsockopt(newfd, SOL_TCP, TCP_KEEPIDLE, &val, sizeof(val));
val = 15;
setsockopt(newfd, SOL_TCP, TCP_KEEPINTVL, &val, sizeof(val));
val = 5;
setsockopt(newfd, SOL_TCP, TCP_KEEPCNT, &val, sizeof(val));
val = 1;
setsockopt(newfd, SOL_TCP, TCP_NODELAY, &val, sizeof(val));
ses = calloc(1, srv->tcp_session_size);
tcp_init_queue(&ses->tcp_q_hi, 20 * 1000 * 1000);
tcp_init_queue(&ses->tcp_q_low, 20 * 1000 * 1000);
ses->tcp_server = srv;
memcpy(&ses->tcp_peer_addr, &from, socklen);
snprintf(ses->tcp_peer_txt, sizeof(ses->tcp_peer_txt), "%s:%d",
inet_ntoa(from.sin_addr), ntohs(from.sin_port));
syslog(LOG_INFO, "%s: %s: connected",
srv->tcp_server_name, ses->tcp_peer_txt);
ses->tcp_fd = newfd;
ses->tcp_dispatch_handle = dispatch_addfd(newfd, tcp_socket_callback,
ses, DISPATCH_READ);
srv->tcp_callback(TCP_CONNECT, ses);
}
/**
* Create a TCP based server
*/
void
tcp_create_server(int port, size_t session_size, const char *name,
tcp_callback_t *cb)
{
struct sockaddr_in sin;
int one = 1;
tcp_server_t *s = malloc(sizeof(tcp_server_t));
s->tcp_fd = socket(AF_INET, SOCK_STREAM, 0);
s->tcp_session_size = session_size;
s->tcp_callback = cb;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(port);
setsockopt(s->tcp_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int));
fcntl(s->tcp_fd, F_SETFL, fcntl(s->tcp_fd, F_GETFL) | O_NONBLOCK);
syslog(LOG_INFO, "%s: Listening for TCP connections on %s:%d",
name, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
if(bind(s->tcp_fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
syslog(LOG_ERR,
"%s: Unable to bind socket for incomming TCP connections, "
"%s:%d -- %s",
name,
inet_ntoa(sin.sin_addr), ntohs(sin.sin_port),
strerror(errno));
return;
}
listen(s->tcp_fd, 1);
dispatch_addfd(s->tcp_fd, tcp_connect_callback, s, DISPATCH_READ);
s->tcp_server_name = strdup(name);
}

91
tcp.h Normal file
View file

@ -0,0 +1,91 @@
/*
* tvheadend, TCP common functions
* Copyright (C) 2007 Andreas Öman
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TCP_H_
#define TCP_H_
TAILQ_HEAD(tcp_data_queue, tcp_data);
typedef struct tcp_data {
TAILQ_ENTRY(tcp_data) td_link;
const void *td_data;
unsigned int td_datalen;
int td_offset;
} tcp_data_t;
typedef struct tcp_queue {
struct tcp_data_queue tq_messages;
int tq_depth;
int tq_maxdepth;
} tcp_queue_t;
typedef enum {
TCP_CONNECT,
TCP_DISCONNECT,
TCP_INPUT,
} tcpevent_t;
typedef void (tcp_callback_t)(tcpevent_t event, void *tcpsession);
typedef int (tcp_line_input_t)(void *tcpsession, char *line);
typedef struct tcpserver {
tcp_callback_t *tcp_callback;
int tcp_fd;
size_t tcp_session_size;
const char *tcp_server_name;
} tcp_server_t;
#define TCP_MAX_LINE_LEN 256
typedef struct tcp_session {
void *tcp_dispatch_handle;
int tcp_fd;
struct sockaddr_storage tcp_peer_addr;
char tcp_peer_txt[100];
tcp_server_t *tcp_server;
/* Output queueing */
int tcp_blocked;
tcp_queue_t tcp_q_hi;
tcp_queue_t tcp_q_low;
tcp_queue_t *tcp_q_current;
/* Input line parser */
int tcp_input_buf_ptr;
char tcp_input_buf[TCP_MAX_LINE_LEN];
} tcp_session_t;
void tcp_disconnect(tcp_session_t *ses, int err);
void tcp_create_server(int port, size_t session_size, const char *name,
tcp_callback_t *cb);
void tcp_line_read(tcp_session_t *ses, tcp_line_input_t *callback);
#define tcp_logname(ses) ((ses)->tcp_peer_txt)
int tcp_send_msg(tcp_session_t *ses, tcp_queue_t *tq, const void *data,
size_t len);
void tcp_printf(tcp_session_t *ses, const char *fmt, ...);
#endif /* TCP_H_ */