From 91e5c9f7a8c01e89e623739caafdc2eebd243705 Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Wed, 9 Apr 2014 19:28:16 +0200 Subject: [PATCH] Reorganize the UDP code from IPTV to the shared location --- Makefile | 1 + src/input/mpegts/iptv/iptv.c | 3 +- src/input/mpegts/iptv/iptv_private.h | 2 + src/input/mpegts/iptv/iptv_udp.c | 132 +--------- src/udp.c | 344 +++++++++++++++++++++++++++ src/udp.h | 69 ++++++ 6 files changed, 427 insertions(+), 124 deletions(-) create mode 100644 src/udp.c create mode 100644 src/udp.h diff --git a/Makefile b/Makefile index 466d6405..ca7f8a55 100644 --- a/Makefile +++ b/Makefile @@ -76,6 +76,7 @@ SRCS = src/version.c \ src/access.c \ src/dtable.c \ src/tcp.c \ + src/udp.c \ src/url.c \ src/http.c \ src/notify.c \ diff --git a/src/input/mpegts/iptv/iptv.c b/src/input/mpegts/iptv/iptv.c index 4fd059c1..274cc893 100644 --- a/src/input/mpegts/iptv/iptv.c +++ b/src/input/mpegts/iptv/iptv.c @@ -239,7 +239,8 @@ iptv_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) /* Close file */ if (im->mm_iptv_fd > 0) { - close(im->mm_iptv_fd); // removes from poll + udp_close(im->mm_iptv_connection); // removes from poll + im->mm_iptv_connection = NULL; im->mm_iptv_fd = -1; } diff --git a/src/input/mpegts/iptv/iptv_private.h b/src/input/mpegts/iptv/iptv_private.h index edfabb5e..966a2746 100644 --- a/src/input/mpegts/iptv/iptv_private.h +++ b/src/input/mpegts/iptv/iptv_private.h @@ -23,6 +23,7 @@ #include "input.h" #include "htsbuf.h" #include "url.h" +#include "udp.h" #define IPTV_PKT_SIZE (300*188) @@ -72,6 +73,7 @@ struct iptv_mux mpegts_mux_t; int mm_iptv_fd; + udp_connection_t *mm_iptv_connection; char *mm_iptv_url; char *mm_iptv_interface; diff --git a/src/input/mpegts/iptv/iptv_udp.c b/src/input/mpegts/iptv/iptv_udp.c index ea8cac0b..c001204f 100644 --- a/src/input/mpegts/iptv/iptv_udp.c +++ b/src/input/mpegts/iptv/iptv_udp.c @@ -45,137 +45,23 @@ static int iptv_udp_start ( iptv_mux_t *im, const url_t *url ) { - int fd, solip, rxsize, reuse = 1, ipv6 = 0; - struct ifreq ifr; - struct in_addr saddr; - struct in6_addr s6addr; - char name[256], buf[256]; + char name[256]; + udp_connection_t *conn; im->mm_display_name((mpegts_mux_t*)im, name, sizeof(name)); - /* Determine if this is IPv6 */ - if (!inet_pton(AF_INET, url->host, &saddr)) { - ipv6 = 1; - if (!inet_pton(AF_INET6, url->host, &s6addr)) { - tvherror("iptv", "%s - failed to process host", name); - return SM_CODE_TUNING_FAILED; - } - } - - /* Open socket */ - if ((fd = tvh_socket(ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0)) == -1) { - tvherror("iptv", "%s - failed to create socket [%s]", - name, strerror(errno)); + conn = udp_bind("iptv", name, url->host, url->port, + im->mm_iptv_interface, IPTV_PKT_SIZE); + if (conn == UDP_FATAL_ERROR) return SM_CODE_TUNING_FAILED; - } - - /* Mark reuse address */ - setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); - - /* Bind to interface */ - memset(&ifr, 0, sizeof(ifr)); - if (im->mm_iptv_interface && *im->mm_iptv_interface) { - snprintf(ifr.ifr_name, IFNAMSIZ, "%s", im->mm_iptv_interface); - if (ioctl(fd, SIOCGIFINDEX, &ifr)) { - tvherror("iptv", "%s - could not find interface %s", - name, im->mm_iptv_interface); - goto error; - } - } - - /* IPv4 */ - if (!ipv6) { - struct ip_mreqn m; - struct sockaddr_in sin; - memset(&m, 0, sizeof(m)); - memset(&sin, 0, sizeof(sin)); - - /* Bind */ - sin.sin_family = AF_INET; - sin.sin_port = htons(url->port); - sin.sin_addr = saddr; - if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) == -1) { - inet_ntop(AF_INET, &sin.sin_addr, buf, sizeof(buf)); - tvherror("iptv", "%s - cannot bind %s:%hd [e=%s]", - name, buf, ntohs(sin.sin_port), strerror(errno)); - goto error; - } - - /* Join group */ - m.imr_multiaddr = sin.sin_addr; - m.imr_address.s_addr = 0; -#if defined(PLATFORM_LINUX) - m.imr_ifindex = ifr.ifr_ifindex; -#elif defined(PLATFORM_FREEBSD) - m.imr_ifindex = ifr.ifr_index; -#endif -#ifdef SOL_IP - solip = SOL_IP; -#else - { - struct protoent *pent; - pent = getprotobyname("ip"); - solip = (pent != NULL) ? pent->p_proto : 0; - } -#endif - - if (setsockopt(fd, solip, IP_ADD_MEMBERSHIP, &m, sizeof(m))) { - inet_ntop(AF_INET, &m.imr_multiaddr, buf, sizeof(buf)); - tvhwarn("iptv", "%s - cannot join %s [%s]", - name, buf, strerror(errno)); - } - - /* Bind to IPv6 group */ - } else { - struct ipv6_mreq m; - struct sockaddr_in6 sin; - memset(&m, 0, sizeof(m)); - memset(&sin, 0, sizeof(sin)); - - /* Bind */ - sin.sin6_family = AF_INET6; - sin.sin6_port = htons(url->port); - sin.sin6_addr = s6addr; - if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) == -1) { - inet_ntop(AF_INET6, &sin.sin6_addr, buf, sizeof(buf)); - tvherror("iptv", "%s - cannot bind %s:%hd [e=%s]", - name, buf, ntohs(sin.sin6_port), strerror(errno)); - goto error; - } - - /* Join group */ - m.ipv6mr_multiaddr = sin.sin6_addr; -#if defined(PLATFORM_LINUX) - m.ipv6mr_interface = ifr.ifr_ifindex; -#elif defined(PLATFORM_FREEBSD) - m.ipv6mr_interface = ifr.ifr_index; -#endif -#ifdef SOL_IPV6 - if (setsockopt(fd, SOL_IPV6, IPV6_ADD_MEMBERSHIP, &m, sizeof(m))) { - inet_ntop(AF_INET, &m.ipv6mr_multiaddr, buf, sizeof(buf)); - tvhwarn("iptv", "%s - cannot join %s [%s]", - name, buf, strerror(errno)); - } -#else - tvherror("iptv", "IPv6 multicast not supported"); - goto error; -#endif - } - - /* Increase RX buffer size */ - rxsize = IPTV_PKT_SIZE; - if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rxsize, sizeof(rxsize)) == -1) - tvhwarn("iptv", "%s - cannot increase UDP rx buffer size [%s]", - name, strerror(errno)); + if (conn == NULL) + return -1; /* Done */ - im->mm_iptv_fd = fd; + im->mm_iptv_fd = conn->fd; + im->mm_iptv_connection = conn; iptv_input_mux_started(im); return 0; - -error: - close(fd); - return -1; } static ssize_t diff --git a/src/udp.c b/src/udp.c new file mode 100644 index 00000000..a52d84e1 --- /dev/null +++ b/src/udp.c @@ -0,0 +1,344 @@ +/* + * TVHeadend - UDP common routines + * + * Copyright (C) 2013 Adam Sutton + * Copyright (C) 2014 Jaroslav Kysela + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "tvheadend.h" +#include "udp.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#if defined(PLATFORM_LINUX) +#include +#elif defined(PLATFORM_FREEBSD) +# include +# ifndef IPV6_ADD_MEMBERSHIP +# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP +# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP +# endif +#endif + +extern int tcp_preferred_address_family; + +static int +udp_resolve( udp_connection_t *uc, int receiver ) +{ + struct addrinfo hints, *res, *ressave, *use = NULL; + char port_buf[6]; + int x; + + snprintf(port_buf, 6, "%d", uc->port); + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_flags = receiver ? AI_PASSIVE : 0; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + + x = getaddrinfo(uc->host ? uc->host : "*", port_buf, &hints, &res); + if (x < 0) { + tvhlog(LOG_ERR, uc->subsystem, "getaddrinfo: %s: %s", + uc->host != NULL ? uc->host : "*", + x == EAI_SYSTEM ? strerror(errno) : gai_strerror(x)); + return -1; + } + + ressave = res; + while (res) { + if (res->ai_family == tcp_preferred_address_family) { + use = res; + break; + } else if (use == NULL) { + use = res; + } + res = res->ai_next; + } + if (use->ai_family == AF_INET6) { + uc->ip.ss_family = AF_INET6; + IP_AS_V6(uc->ip, port) = htons(uc->port); + memcpy(&IP_AS_V6(uc->ip, addr), &((struct sockaddr_in6 *)use->ai_addr)->sin6_addr, + sizeof(struct in6_addr)); + uc->multicast = !!IN6_IS_ADDR_MULTICAST(&IP_AS_V6(uc->ip, addr)); + } else if (use->ai_family == AF_INET) { + uc->ip.ss_family = AF_INET; + IP_AS_V4(uc->ip, port) = htons(uc->port); + IP_AS_V4(uc->ip, addr) = ((struct sockaddr_in *)use->ai_addr)->sin_addr; + uc->multicast = !!IN_MULTICAST(ntohl(IP_AS_V4(uc->ip, addr.s_addr))); + } + freeaddrinfo(ressave); + if (uc->ip.ss_family != AF_INET && uc->ip.ss_family != AF_INET6) { + tvherror(uc->subsystem, "%s - failed to process host '%s'", uc->name, uc->host); + return -1; + } + return 0; +} + +udp_connection_t * +udp_bind ( const char *subsystem, const char *name, + const char *bindaddr, int port, + const char *ifname, int rxsize ) +{ + int fd, solip, reuse = 1; + struct ifreq ifr; + udp_connection_t *uc; + char buf[256]; + socklen_t addrlen; + + uc = calloc(1, sizeof(udp_connection_t)); + uc->fd = -1; + uc->host = bindaddr ? strdup(bindaddr) : NULL; + uc->port = port; + uc->ifname = ifname ? strdup(ifname) : NULL; + uc->subsystem = subsystem ? strdup(subsystem) : NULL; + uc->name = name ? strdup(name) : NULL; + uc->rxtxsize = rxsize; + + if (udp_resolve(uc, 1) < 0) { + udp_close(uc); + return UDP_FATAL_ERROR; + } + + /* Open socket */ + if ((fd = tvh_socket(uc->ip.ss_family, SOCK_DGRAM, 0)) == -1) { + tvherror(subsystem, "%s - failed to create socket [%s]", + name, strerror(errno)); + udp_close(uc); + return UDP_FATAL_ERROR; + } + + /* Mark reuse address */ + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); + + /* Bind to interface */ + memset(&ifr, 0, sizeof(ifr)); + if (ifname && *ifname) { + snprintf(ifr.ifr_name, IFNAMSIZ, "%s", ifname); + if (ioctl(fd, SIOCGIFINDEX, &ifr)) { + tvherror(subsystem, "%s - could not find interface %s", + name, ifname); + goto error; + } + } + + /* IPv4 */ + if (uc->ip.ss_family == AF_INET) { + struct ip_mreqn m; + memset(&m, 0, sizeof(m)); + + /* Bind */ + if (bind(fd, (struct sockaddr *)&uc->ip, sizeof(struct sockaddr_in)) == -1) { + inet_ntop(AF_INET, &IP_AS_V4(uc->ip, addr), buf, sizeof(buf)); + tvherror(subsystem, "%s - cannot bind %s:%hu [e=%s]", + name, buf, ntohs(IP_AS_V4(uc->ip, port)), strerror(errno)); + goto error; + } + + if (uc->multicast) { + /* Join group */ + m.imr_multiaddr = IP_AS_V4(uc->ip, addr); + m.imr_address.s_addr = 0; +#if defined(PLATFORM_LINUX) + m.imr_ifindex = ifr.ifr_ifindex; +#elif defined(PLATFORM_FREEBSD) + m.imr_ifindex = ifr.ifr_index; +#endif +#ifdef SOL_IP + solip = SOL_IP; +#else + { + struct protoent *pent; + pent = getprotobyname("ip"); + solip = (pent != NULL) ? pent->p_proto : 0; + } +#endif + + if (setsockopt(fd, solip, IP_ADD_MEMBERSHIP, &m, sizeof(m))) { + inet_ntop(AF_INET, &m.imr_multiaddr, buf, sizeof(buf)); + tvhwarn("iptv", "%s - cannot join %s [%s]", + name, buf, strerror(errno)); + } + } + + /* Bind to IPv6 group */ + } else { + struct ipv6_mreq m; + memset(&m, 0, sizeof(m)); + + /* Bind */ + if (bind(fd, (struct sockaddr *)&uc->ip, sizeof(struct sockaddr_in6)) == -1) { + inet_ntop(AF_INET6, &IP_AS_V6(uc->ip, addr), buf, sizeof(buf)); + tvherror(subsystem, "%s - cannot bind %s:%hu [e=%s]", + name, buf, ntohs(IP_AS_V6(uc->ip, port)), strerror(errno)); + goto error; + } + + if (uc->multicast) { + /* Join group */ + m.ipv6mr_multiaddr = IP_AS_V6(uc->ip, addr); +#if defined(PLATFORM_LINUX) + m.ipv6mr_interface = ifr.ifr_ifindex; +#elif defined(PLATFORM_FREEBSD) + m.ipv6mr_interface = ifr.ifr_index; +#endif +#ifdef SOL_IPV6 + if (setsockopt(fd, SOL_IPV6, IPV6_ADD_MEMBERSHIP, &m, sizeof(m))) { + inet_ntop(AF_INET, &m.ipv6mr_multiaddr, buf, sizeof(buf)); + tvhwarn(subsystem, "%s - cannot join %s [%s]", + name, buf, strerror(errno)); + } +#else + tvherror("iptv", "IPv6 multicast not supported"); + goto error; +#endif + } + } + + addrlen = sizeof(uc->ip); + getsockname(fd, (struct sockaddr *)&uc->ip, &addrlen); + + /* Increase RX buffer size */ + if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rxsize, sizeof(rxsize)) == -1) + tvhwarn("iptv", "%s - cannot increase UDP rx buffer size [%s]", + name, strerror(errno)); + + uc->fd = fd; + return uc; + +error: + udp_close(uc); + return NULL; +} + +udp_connection_t * +udp_connect ( const char *subsystem, const char *name, + const char *host, int port, + const char *ifname, int txsize ) +{ + int fd; + struct ifreq ifr; + udp_connection_t *uc; + + uc = calloc(1, sizeof(udp_connection_t)); + uc->fd = -1; + uc->host = host ? strdup(host) : NULL; + uc->port = port; + uc->ifname = ifname ? strdup(ifname) : NULL; + uc->subsystem = subsystem ? strdup(subsystem) : NULL; + uc->name = name ? strdup(name) : NULL; + uc->rxtxsize = txsize; + + if (udp_resolve(uc, 1) < 0) { + udp_close(uc); + return UDP_FATAL_ERROR; + } + + /* Open socket */ + if ((fd = tvh_socket(uc->ip.ss_family, SOCK_DGRAM, 0)) == -1) { + tvherror(subsystem, "%s - failed to create socket [%s]", + name, strerror(errno)); + udp_close(uc); + return UDP_FATAL_ERROR; + } + + /* Bind to interface */ + memset(&ifr, 0, sizeof(ifr)); + if (ifname && *ifname) { + snprintf(ifr.ifr_name, IFNAMSIZ, "%s", ifname); + if (ioctl(fd, SIOCGIFINDEX, &ifr)) { + tvherror(subsystem, "%s - could not find interface %s", + name, ifname); + goto error; + } + } + + /* Increase TX buffer size */ + if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &txsize, sizeof(txsize)) == -1) + tvhwarn("iptv", "%s - cannot increase UDP tx buffer size [%s]", + name, strerror(errno)); + + uc->fd = fd; + return uc; + +error: + udp_close(uc); + return NULL; +} + +void +udp_close( udp_connection_t *uc ) +{ + if (uc == NULL || uc == UDP_FATAL_ERROR) + return; + if (uc->fd >= 0) + close(uc->fd); + free(uc->host); + free(uc->ifname); + free(uc->subsystem); + free(uc->name); + free(uc); +} + +int +udp_write( udp_connection_t *uc, const void *buf, size_t len, + struct sockaddr_storage *storage ) +{ + int r; + + if (storage == NULL) + storage = &uc->ip; + while (len) { + r = sendto(uc->fd, buf, len, 0, (struct sockaddr*)storage, + storage->ss_family == AF_INET6 ? + sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); + if (r < 0) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + usleep(100); + continue; + } + break; + } + len -= r; + buf += r; + } + return len; +} + +int +udp_write_queue( udp_connection_t *uc, htsbuf_queue_t *q, + struct sockaddr_storage *storage ) +{ + htsbuf_data_t *hd; + int l, r = 0; + void *p; + + while ((hd = TAILQ_FIRST(&q->hq_q)) != NULL) { + if (!r) { + l = hd->hd_data_len - hd->hd_data_off; + p = hd->hd_data + hd->hd_data_off; + r = udp_write(uc, p, l, storage); + } + htsbuf_data_free(q, hd); + } + q->hq_size = 0; + return r; +} diff --git a/src/udp.h b/src/udp.h new file mode 100644 index 00000000..fe302e16 --- /dev/null +++ b/src/udp.h @@ -0,0 +1,69 @@ +/* + * tvheadend, UDP interface + * Copyright (C) 2013 Adam Sutton + * Copyright (C) 2014 Jaroslav Kysela + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef UDP_H_ +#define UDP_H_ + +#include +#include "tcp.h" + +#define UDP_FATAL_ERROR ((void *)-1) + +#define IP_AS_V4(storage, f) ((struct sockaddr_in *)&(storage))->sin_##f +#define IP_AS_V6(storage, f) ((struct sockaddr_in6 *)&(storage))->sin6_##f +#define IP_IN_ADDR(storage) \ + ((storage).ss_family == AF_INET6 ? \ + &((struct sockaddr_in6 *)&(storage))->sin6_addr : \ + (void *)&((struct sockaddr_in *)&(storage))->sin_addr) +#define IP_PORT(storage) \ + ((storage).ss_family == AF_INET6 ? \ + ((struct sockaddr_in6 *)&(storage))->sin6_port : \ + ((struct sockaddr_in *)&(storage))->sin_port) + +typedef struct udp_connection { + char *host; + int port; + int multicast; + char *ifname; + struct sockaddr_storage ip; + int fd; + char *subsystem; + char *name; + int rxtxsize; +} udp_connection_t; + +udp_connection_t * +udp_bind ( const char *subsystem, const char *name, + const char *bindaddr, int port, + const char *ifname, int rxsize ); +udp_connection_t * +udp_connect ( const char *subsystem, const char *name, + const char *host, int port, + const char *ifname, int txsize ); +void +udp_close ( udp_connection_t *uc ); +int +udp_write( udp_connection_t *uc, const void *buf, size_t len, + struct sockaddr_storage *storage ); +int +udp_write_queue( udp_connection_t *uc, htsbuf_queue_t *q, + struct sockaddr_storage *storage ); + + +#endif /* UDP_H_ */