Reorganize the UDP code from IPTV to the shared location

This commit is contained in:
Jaroslav Kysela 2014-04-09 19:28:16 +02:00
parent 0e720dc62b
commit 91e5c9f7a8
6 changed files with 427 additions and 124 deletions

View file

@ -76,6 +76,7 @@ SRCS = src/version.c \
src/access.c \
src/dtable.c \
src/tcp.c \
src/udp.c \
src/url.c \
src/http.c \
src/notify.c \

View file

@ -239,7 +239,8 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
/* Close file */
if (im->mm_iptv_fd > 0) {
close(im->mm_iptv_fd); // removes from poll
udp_close(im->mm_iptv_connection); // removes from poll
im->mm_iptv_connection = NULL;
im->mm_iptv_fd = -1;
}

View file

@ -23,6 +23,7 @@
#include "input.h"
#include "htsbuf.h"
#include "url.h"
#include "udp.h"
#define IPTV_PKT_SIZE (300*188)
@ -72,6 +73,7 @@ struct iptv_mux
mpegts_mux_t;
int mm_iptv_fd;
udp_connection_t *mm_iptv_connection;
char *mm_iptv_url;
char *mm_iptv_interface;

View file

@ -45,137 +45,23 @@
static int
iptv_udp_start ( iptv_mux_t *im, const url_t *url )
{
int fd, solip, rxsize, reuse = 1, ipv6 = 0;
struct ifreq ifr;
struct in_addr saddr;
struct in6_addr s6addr;
char name[256], buf[256];
char name[256];
udp_connection_t *conn;
im->mm_display_name((mpegts_mux_t*)im, name, sizeof(name));
/* Determine if this is IPv6 */
if (!inet_pton(AF_INET, url->host, &saddr)) {
ipv6 = 1;
if (!inet_pton(AF_INET6, url->host, &s6addr)) {
tvherror("iptv", "%s - failed to process host", name);
return SM_CODE_TUNING_FAILED;
}
}
/* Open socket */
if ((fd = tvh_socket(ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0)) == -1) {
tvherror("iptv", "%s - failed to create socket [%s]",
name, strerror(errno));
conn = udp_bind("iptv", name, url->host, url->port,
im->mm_iptv_interface, IPTV_PKT_SIZE);
if (conn == UDP_FATAL_ERROR)
return SM_CODE_TUNING_FAILED;
}
/* Mark reuse address */
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
/* Bind to interface */
memset(&ifr, 0, sizeof(ifr));
if (im->mm_iptv_interface && *im->mm_iptv_interface) {
snprintf(ifr.ifr_name, IFNAMSIZ, "%s", im->mm_iptv_interface);
if (ioctl(fd, SIOCGIFINDEX, &ifr)) {
tvherror("iptv", "%s - could not find interface %s",
name, im->mm_iptv_interface);
goto error;
}
}
/* IPv4 */
if (!ipv6) {
struct ip_mreqn m;
struct sockaddr_in sin;
memset(&m, 0, sizeof(m));
memset(&sin, 0, sizeof(sin));
/* Bind */
sin.sin_family = AF_INET;
sin.sin_port = htons(url->port);
sin.sin_addr = saddr;
if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) == -1) {
inet_ntop(AF_INET, &sin.sin_addr, buf, sizeof(buf));
tvherror("iptv", "%s - cannot bind %s:%hd [e=%s]",
name, buf, ntohs(sin.sin_port), strerror(errno));
goto error;
}
/* Join group */
m.imr_multiaddr = sin.sin_addr;
m.imr_address.s_addr = 0;
#if defined(PLATFORM_LINUX)
m.imr_ifindex = ifr.ifr_ifindex;
#elif defined(PLATFORM_FREEBSD)
m.imr_ifindex = ifr.ifr_index;
#endif
#ifdef SOL_IP
solip = SOL_IP;
#else
{
struct protoent *pent;
pent = getprotobyname("ip");
solip = (pent != NULL) ? pent->p_proto : 0;
}
#endif
if (setsockopt(fd, solip, IP_ADD_MEMBERSHIP, &m, sizeof(m))) {
inet_ntop(AF_INET, &m.imr_multiaddr, buf, sizeof(buf));
tvhwarn("iptv", "%s - cannot join %s [%s]",
name, buf, strerror(errno));
}
/* Bind to IPv6 group */
} else {
struct ipv6_mreq m;
struct sockaddr_in6 sin;
memset(&m, 0, sizeof(m));
memset(&sin, 0, sizeof(sin));
/* Bind */
sin.sin6_family = AF_INET6;
sin.sin6_port = htons(url->port);
sin.sin6_addr = s6addr;
if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) == -1) {
inet_ntop(AF_INET6, &sin.sin6_addr, buf, sizeof(buf));
tvherror("iptv", "%s - cannot bind %s:%hd [e=%s]",
name, buf, ntohs(sin.sin6_port), strerror(errno));
goto error;
}
/* Join group */
m.ipv6mr_multiaddr = sin.sin6_addr;
#if defined(PLATFORM_LINUX)
m.ipv6mr_interface = ifr.ifr_ifindex;
#elif defined(PLATFORM_FREEBSD)
m.ipv6mr_interface = ifr.ifr_index;
#endif
#ifdef SOL_IPV6
if (setsockopt(fd, SOL_IPV6, IPV6_ADD_MEMBERSHIP, &m, sizeof(m))) {
inet_ntop(AF_INET, &m.ipv6mr_multiaddr, buf, sizeof(buf));
tvhwarn("iptv", "%s - cannot join %s [%s]",
name, buf, strerror(errno));
}
#else
tvherror("iptv", "IPv6 multicast not supported");
goto error;
#endif
}
/* Increase RX buffer size */
rxsize = IPTV_PKT_SIZE;
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rxsize, sizeof(rxsize)) == -1)
tvhwarn("iptv", "%s - cannot increase UDP rx buffer size [%s]",
name, strerror(errno));
if (conn == NULL)
return -1;
/* Done */
im->mm_iptv_fd = fd;
im->mm_iptv_fd = conn->fd;
im->mm_iptv_connection = conn;
iptv_input_mux_started(im);
return 0;
error:
close(fd);
return -1;
}
static ssize_t

344
src/udp.c Normal file
View file

@ -0,0 +1,344 @@
/*
* TVHeadend - UDP common routines
*
* Copyright (C) 2013 Adam Sutton
* Copyright (C) 2014 Jaroslav Kysela
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tvheadend.h"
#include "udp.h"
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <assert.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netdb.h>
#if defined(PLATFORM_LINUX)
#include <linux/netdevice.h>
#elif defined(PLATFORM_FREEBSD)
# include <net/if.h>
# ifndef IPV6_ADD_MEMBERSHIP
# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
# endif
#endif
extern int tcp_preferred_address_family;
static int
udp_resolve( udp_connection_t *uc, int receiver )
{
struct addrinfo hints, *res, *ressave, *use = NULL;
char port_buf[6];
int x;
snprintf(port_buf, 6, "%d", uc->port);
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_flags = receiver ? AI_PASSIVE : 0;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
x = getaddrinfo(uc->host ? uc->host : "*", port_buf, &hints, &res);
if (x < 0) {
tvhlog(LOG_ERR, uc->subsystem, "getaddrinfo: %s: %s",
uc->host != NULL ? uc->host : "*",
x == EAI_SYSTEM ? strerror(errno) : gai_strerror(x));
return -1;
}
ressave = res;
while (res) {
if (res->ai_family == tcp_preferred_address_family) {
use = res;
break;
} else if (use == NULL) {
use = res;
}
res = res->ai_next;
}
if (use->ai_family == AF_INET6) {
uc->ip.ss_family = AF_INET6;
IP_AS_V6(uc->ip, port) = htons(uc->port);
memcpy(&IP_AS_V6(uc->ip, addr), &((struct sockaddr_in6 *)use->ai_addr)->sin6_addr,
sizeof(struct in6_addr));
uc->multicast = !!IN6_IS_ADDR_MULTICAST(&IP_AS_V6(uc->ip, addr));
} else if (use->ai_family == AF_INET) {
uc->ip.ss_family = AF_INET;
IP_AS_V4(uc->ip, port) = htons(uc->port);
IP_AS_V4(uc->ip, addr) = ((struct sockaddr_in *)use->ai_addr)->sin_addr;
uc->multicast = !!IN_MULTICAST(ntohl(IP_AS_V4(uc->ip, addr.s_addr)));
}
freeaddrinfo(ressave);
if (uc->ip.ss_family != AF_INET && uc->ip.ss_family != AF_INET6) {
tvherror(uc->subsystem, "%s - failed to process host '%s'", uc->name, uc->host);
return -1;
}
return 0;
}
udp_connection_t *
udp_bind ( const char *subsystem, const char *name,
const char *bindaddr, int port,
const char *ifname, int rxsize )
{
int fd, solip, reuse = 1;
struct ifreq ifr;
udp_connection_t *uc;
char buf[256];
socklen_t addrlen;
uc = calloc(1, sizeof(udp_connection_t));
uc->fd = -1;
uc->host = bindaddr ? strdup(bindaddr) : NULL;
uc->port = port;
uc->ifname = ifname ? strdup(ifname) : NULL;
uc->subsystem = subsystem ? strdup(subsystem) : NULL;
uc->name = name ? strdup(name) : NULL;
uc->rxtxsize = rxsize;
if (udp_resolve(uc, 1) < 0) {
udp_close(uc);
return UDP_FATAL_ERROR;
}
/* Open socket */
if ((fd = tvh_socket(uc->ip.ss_family, SOCK_DGRAM, 0)) == -1) {
tvherror(subsystem, "%s - failed to create socket [%s]",
name, strerror(errno));
udp_close(uc);
return UDP_FATAL_ERROR;
}
/* Mark reuse address */
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
/* Bind to interface */
memset(&ifr, 0, sizeof(ifr));
if (ifname && *ifname) {
snprintf(ifr.ifr_name, IFNAMSIZ, "%s", ifname);
if (ioctl(fd, SIOCGIFINDEX, &ifr)) {
tvherror(subsystem, "%s - could not find interface %s",
name, ifname);
goto error;
}
}
/* IPv4 */
if (uc->ip.ss_family == AF_INET) {
struct ip_mreqn m;
memset(&m, 0, sizeof(m));
/* Bind */
if (bind(fd, (struct sockaddr *)&uc->ip, sizeof(struct sockaddr_in)) == -1) {
inet_ntop(AF_INET, &IP_AS_V4(uc->ip, addr), buf, sizeof(buf));
tvherror(subsystem, "%s - cannot bind %s:%hu [e=%s]",
name, buf, ntohs(IP_AS_V4(uc->ip, port)), strerror(errno));
goto error;
}
if (uc->multicast) {
/* Join group */
m.imr_multiaddr = IP_AS_V4(uc->ip, addr);
m.imr_address.s_addr = 0;
#if defined(PLATFORM_LINUX)
m.imr_ifindex = ifr.ifr_ifindex;
#elif defined(PLATFORM_FREEBSD)
m.imr_ifindex = ifr.ifr_index;
#endif
#ifdef SOL_IP
solip = SOL_IP;
#else
{
struct protoent *pent;
pent = getprotobyname("ip");
solip = (pent != NULL) ? pent->p_proto : 0;
}
#endif
if (setsockopt(fd, solip, IP_ADD_MEMBERSHIP, &m, sizeof(m))) {
inet_ntop(AF_INET, &m.imr_multiaddr, buf, sizeof(buf));
tvhwarn("iptv", "%s - cannot join %s [%s]",
name, buf, strerror(errno));
}
}
/* Bind to IPv6 group */
} else {
struct ipv6_mreq m;
memset(&m, 0, sizeof(m));
/* Bind */
if (bind(fd, (struct sockaddr *)&uc->ip, sizeof(struct sockaddr_in6)) == -1) {
inet_ntop(AF_INET6, &IP_AS_V6(uc->ip, addr), buf, sizeof(buf));
tvherror(subsystem, "%s - cannot bind %s:%hu [e=%s]",
name, buf, ntohs(IP_AS_V6(uc->ip, port)), strerror(errno));
goto error;
}
if (uc->multicast) {
/* Join group */
m.ipv6mr_multiaddr = IP_AS_V6(uc->ip, addr);
#if defined(PLATFORM_LINUX)
m.ipv6mr_interface = ifr.ifr_ifindex;
#elif defined(PLATFORM_FREEBSD)
m.ipv6mr_interface = ifr.ifr_index;
#endif
#ifdef SOL_IPV6
if (setsockopt(fd, SOL_IPV6, IPV6_ADD_MEMBERSHIP, &m, sizeof(m))) {
inet_ntop(AF_INET, &m.ipv6mr_multiaddr, buf, sizeof(buf));
tvhwarn(subsystem, "%s - cannot join %s [%s]",
name, buf, strerror(errno));
}
#else
tvherror("iptv", "IPv6 multicast not supported");
goto error;
#endif
}
}
addrlen = sizeof(uc->ip);
getsockname(fd, (struct sockaddr *)&uc->ip, &addrlen);
/* Increase RX buffer size */
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rxsize, sizeof(rxsize)) == -1)
tvhwarn("iptv", "%s - cannot increase UDP rx buffer size [%s]",
name, strerror(errno));
uc->fd = fd;
return uc;
error:
udp_close(uc);
return NULL;
}
udp_connection_t *
udp_connect ( const char *subsystem, const char *name,
const char *host, int port,
const char *ifname, int txsize )
{
int fd;
struct ifreq ifr;
udp_connection_t *uc;
uc = calloc(1, sizeof(udp_connection_t));
uc->fd = -1;
uc->host = host ? strdup(host) : NULL;
uc->port = port;
uc->ifname = ifname ? strdup(ifname) : NULL;
uc->subsystem = subsystem ? strdup(subsystem) : NULL;
uc->name = name ? strdup(name) : NULL;
uc->rxtxsize = txsize;
if (udp_resolve(uc, 1) < 0) {
udp_close(uc);
return UDP_FATAL_ERROR;
}
/* Open socket */
if ((fd = tvh_socket(uc->ip.ss_family, SOCK_DGRAM, 0)) == -1) {
tvherror(subsystem, "%s - failed to create socket [%s]",
name, strerror(errno));
udp_close(uc);
return UDP_FATAL_ERROR;
}
/* Bind to interface */
memset(&ifr, 0, sizeof(ifr));
if (ifname && *ifname) {
snprintf(ifr.ifr_name, IFNAMSIZ, "%s", ifname);
if (ioctl(fd, SIOCGIFINDEX, &ifr)) {
tvherror(subsystem, "%s - could not find interface %s",
name, ifname);
goto error;
}
}
/* Increase TX buffer size */
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &txsize, sizeof(txsize)) == -1)
tvhwarn("iptv", "%s - cannot increase UDP tx buffer size [%s]",
name, strerror(errno));
uc->fd = fd;
return uc;
error:
udp_close(uc);
return NULL;
}
void
udp_close( udp_connection_t *uc )
{
if (uc == NULL || uc == UDP_FATAL_ERROR)
return;
if (uc->fd >= 0)
close(uc->fd);
free(uc->host);
free(uc->ifname);
free(uc->subsystem);
free(uc->name);
free(uc);
}
int
udp_write( udp_connection_t *uc, const void *buf, size_t len,
struct sockaddr_storage *storage )
{
int r;
if (storage == NULL)
storage = &uc->ip;
while (len) {
r = sendto(uc->fd, buf, len, 0, (struct sockaddr*)storage,
storage->ss_family == AF_INET6 ?
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
if (r < 0) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
usleep(100);
continue;
}
break;
}
len -= r;
buf += r;
}
return len;
}
int
udp_write_queue( udp_connection_t *uc, htsbuf_queue_t *q,
struct sockaddr_storage *storage )
{
htsbuf_data_t *hd;
int l, r = 0;
void *p;
while ((hd = TAILQ_FIRST(&q->hq_q)) != NULL) {
if (!r) {
l = hd->hd_data_len - hd->hd_data_off;
p = hd->hd_data + hd->hd_data_off;
r = udp_write(uc, p, l, storage);
}
htsbuf_data_free(q, hd);
}
q->hq_size = 0;
return r;
}

69
src/udp.h Normal file
View file

@ -0,0 +1,69 @@
/*
* tvheadend, UDP interface
* Copyright (C) 2013 Adam Sutton
* Copyright (C) 2014 Jaroslav Kysela
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef UDP_H_
#define UDP_H_
#include <netinet/in.h>
#include "tcp.h"
#define UDP_FATAL_ERROR ((void *)-1)
#define IP_AS_V4(storage, f) ((struct sockaddr_in *)&(storage))->sin_##f
#define IP_AS_V6(storage, f) ((struct sockaddr_in6 *)&(storage))->sin6_##f
#define IP_IN_ADDR(storage) \
((storage).ss_family == AF_INET6 ? \
&((struct sockaddr_in6 *)&(storage))->sin6_addr : \
(void *)&((struct sockaddr_in *)&(storage))->sin_addr)
#define IP_PORT(storage) \
((storage).ss_family == AF_INET6 ? \
((struct sockaddr_in6 *)&(storage))->sin6_port : \
((struct sockaddr_in *)&(storage))->sin_port)
typedef struct udp_connection {
char *host;
int port;
int multicast;
char *ifname;
struct sockaddr_storage ip;
int fd;
char *subsystem;
char *name;
int rxtxsize;
} udp_connection_t;
udp_connection_t *
udp_bind ( const char *subsystem, const char *name,
const char *bindaddr, int port,
const char *ifname, int rxsize );
udp_connection_t *
udp_connect ( const char *subsystem, const char *name,
const char *host, int port,
const char *ifname, int txsize );
void
udp_close ( udp_connection_t *uc );
int
udp_write( udp_connection_t *uc, const void *buf, size_t len,
struct sockaddr_storage *storage );
int
udp_write_queue( udp_connection_t *uc, htsbuf_queue_t *q,
struct sockaddr_storage *storage );
#endif /* UDP_H_ */