udp: create multirecv interface

This commit is contained in:
Jaroslav Kysela 2014-04-18 23:41:20 +02:00
parent 140b6983a5
commit a71b2e3001
3 changed files with 129 additions and 72 deletions

View file

@ -17,8 +17,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _GNU_SOURCE
#include <sys/socket.h>
#include <fcntl.h>
#include "tvheadend.h"
#include "tvhpoll.h"
@ -26,57 +24,6 @@
#include "http.h"
#include "satip_private.h"
#define PKTS 64
#ifndef CONFIG_RECVMMSG
#ifdef __linux__
/* define the syscall - works only for linux */
#include <linux/unistd.h>
struct mmsghdr {
struct msghdr msg_hdr;
unsigned int msg_len;
};
int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
unsigned int flags, struct timespec *timeout);
#ifdef __NR_recvmmsg
int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
unsigned int flags, struct timespec *timeout)
{
return syscall(__NR_recvmmsg, sockfd, msgvec, vlen, flags, timeout);
}
#else
#undef PKTS
#define PKTS 1
/* receive only single packet */
int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
unsigned int flags, struct timespec *timeout)
{
ssize_t r = recvmsg(sockfd, &msgvec->msg_hdr, flags);
if (r < 0)
return r;
msgvec->msg_len = r;
return 1;
}
#endif
#else /* not __linux__ */
#error "Add recvmmsg() support for your platform!!!"
#endif
#endif /* !CONFIG_RECVMMSG */
static int
satip_frontend_tune1
( satip_frontend_t *lfe, mpegts_mux_instance_t *mmi );
@ -777,18 +724,18 @@ satip_frontend_pid_changed( http_client_t *rtsp,
static void *
satip_frontend_input_thread ( void *aux )
{
#define RTP_PKTS 64
#define RTP_PKT_SIZE 1472 /* this is maximum UDP payload (standard ethernet) */
#define HTTP_CMD_NONE 9874
satip_frontend_t *lfe = aux;
mpegts_mux_instance_t *mmi = lfe->sf_mmi;
http_client_t *rtsp;
dvb_mux_t *lm;
char buf[256];
uint8_t tsb[PKTS][1356 + 128];
struct iovec *iovec;
uint8_t rtcp[2048];
uint8_t *p;
sbuf_t sb;
struct iovec iov[PKTS];
struct mmsghdr msg[PKTS];
int pos, nfds, i, r;
size_t c;
int tc;
@ -796,6 +743,7 @@ satip_frontend_input_thread ( void *aux )
tvhpoll_t *efd;
int changing = 0, ms = -1, fatal = 0;
uint32_t seq = -1, nseq;
udp_multirecv_t um;
lfe->mi_display_name((mpegts_input_t*)lfe, buf, sizeof(buf));
@ -827,16 +775,6 @@ satip_frontend_input_thread ( void *aux )
tvhpoll_add(efd, ev, 4);
rtsp->hc_efd = efd;
/* Read */
memset(&msg, 0, sizeof(msg));
for (i = 0; i < PKTS; i++) {
msg[i].msg_hdr.msg_iov = &iov[i];
msg[i].msg_hdr.msg_iovlen = 1;
iov[i].iov_base = tsb[i];
iov[i].iov_len = sizeof(tsb[0]);
}
r = satip_rtsp_setup(rtsp,
lfe->sf_position, lfe->sf_number,
lfe->sf_rtp_port, &lm->lm_tuning,
@ -846,6 +784,7 @@ satip_frontend_input_thread ( void *aux )
return NULL;
}
udp_multirecv_init(&um, RTP_PKTS, RTP_PKT_SIZE);
sbuf_init_fixed(&sb, 18800);
while (tvheadend_running && !fatal) {
@ -853,8 +792,8 @@ satip_frontend_input_thread ( void *aux )
nfds = tvhpoll_wait(efd, ev, 1, ms);
if (nfds > 0 && ev[0].data.ptr == NULL) {
c = read(lfe->sf_dvr_pipe.rd, tsb[0], 1);
if (c == 1 && tsb[0][0] == 'c') {
c = read(lfe->sf_dvr_pipe.rd, rtcp, 1);
if (c == 1 && rtcp[0] == 'c') {
ms = 20;
changing = 1;
continue;
@ -932,7 +871,7 @@ satip_frontend_input_thread ( void *aux )
if (ev[0].data.ptr != lfe->sf_rtp)
continue;
tc = recvmmsg(lfe->sf_rtp->fd, msg, PKTS, MSG_DONTWAIT, NULL);
tc = udp_multirecv_read(&um, lfe->sf_rtp->fd, RTP_PKTS, &iovec);
if (tc < 0) {
if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)
@ -941,14 +880,14 @@ satip_frontend_input_thread ( void *aux )
tvhlog(LOG_WARNING, "satip", "%s - recvmsg() EOVERFLOW", buf);
continue;
}
tvhlog(LOG_ERR, "satip", "%s - recv() error %d (%s)",
tvhlog(LOG_ERR, "satip", "%s - multirecv error %d (%s)",
buf, errno, strerror(errno));
break;
}
for (i = 0; i < tc; i++) {
p = tsb[i];
c = msg[i].msg_len;
p = iovec[i].iov_base;
c = iovec[i].iov_len;
/* Strip RTP header */
if (c < 12)
@ -980,6 +919,7 @@ satip_frontend_input_thread ( void *aux )
}
sbuf_free(&sb);
udp_multirecv_free(&um);
ev[0].events = TVHPOLL_IN;
ev[0].fd = lfe->sf_rtp->fd;

100
src/udp.c
View file

@ -18,6 +18,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _GNU_SOURCE
#include "tvheadend.h"
#include "udp.h"
@ -380,3 +381,102 @@ udp_write_queue( udp_connection_t *uc, htsbuf_queue_t *q,
q->hq_size = 0;
return r;
}
/*
* UDP multi packet receive support
*/
#ifndef CONFIG_RECVMMSG
#ifdef __linux__
/* define the syscall - works only for linux */
#include <linux/unistd.h>
struct mmsghdr {
struct msghdr msg_hdr;
unsigned int msg_len;
};
int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
unsigned int flags, struct timespec *timeout);
#ifdef __NR_recvmmsg
int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
unsigned int flags, struct timespec *timeout)
{
return syscall(__NR_recvmmsg, sockfd, msgvec, vlen, flags, timeout);
}
#else
#undef PKTS
#define PKTS 1
/* receive only single packet */
int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
unsigned int flags, struct timespec *timeout)
{
ssize_t r = recvmsg(sockfd, &msgvec->msg_hdr, flags);
if (r < 0)
return r;
msgvec->msg_len = r;
return 1;
}
#endif
#else /* not __linux__ */
#error "Add recvmmsg() support for your platform!!!"
#endif
#endif /* !CONFIG_RECVMMSG */
void
udp_multirecv_init( udp_multirecv_t *um, int packets, int psize )
{
int i;
um->um_psize = psize;
um->um_packets = packets;
um->um_data = malloc(packets * psize);
um->um_iovec = malloc(packets * sizeof(struct iovec));
um->um_riovec = malloc(packets * sizeof(struct iovec));
um->um_msg = calloc(packets, sizeof(struct mmsghdr));
for (i = 0; i < packets; i++) {
((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iov = &um->um_iovec[i];
((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iovlen = 1;
um->um_iovec[i].iov_base = /* follow thru */
um->um_riovec[i].iov_base = um->um_data + i * psize;
um->um_iovec[i].iov_len = psize;
}
}
void
udp_multirecv_free( udp_multirecv_t *um )
{
free(um->um_msg); um->um_msg = NULL;
free(um->um_iovec); um->um_iovec = NULL;
free(um->um_data); um->um_data = NULL;
um->um_psize = 0;
um->um_packets = 0;
}
int
udp_multirecv_read( udp_multirecv_t *um, int fd, int packets,
struct iovec **iovec )
{
int n, i;
if (packets > um->um_packets)
packets = um->um_packets;
n = recvmmsg(fd, (struct mmsghdr *)um->um_msg, packets, MSG_DONTWAIT, NULL);
if (n > 0) {
for (i = 0; i < n; i++)
um->um_riovec[i].iov_len = ((struct mmsghdr *)um->um_msg)[i].msg_len;
*iovec = um->um_riovec;
}
return n;
}

View file

@ -70,5 +70,22 @@ int
udp_write_queue( udp_connection_t *uc, htsbuf_queue_t *q,
struct sockaddr_storage *storage );
typedef struct udp_multirecv {
int um_psize;
int um_packets;
uint8_t *um_data;
struct iovec *um_iovec;
struct iovec *um_riovec;
struct mmsghdr *um_msg;
} udp_multirecv_t;
void
udp_multirecv_init( udp_multirecv_t *um, int packets, int psize );
void
udp_multirecv_free( udp_multirecv_t *um );
int
udp_multirecv_read( udp_multirecv_t *um, int fd, int packets,
struct iovec **iovec );
#endif /* UDP_H_ */