From a71b2e3001ab8e48dfc272a51467a81da2e87aec Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Fri, 18 Apr 2014 23:41:20 +0200 Subject: [PATCH] udp: create multirecv interface --- src/input/mpegts/satip/satip_frontend.c | 84 +++----------------- src/udp.c | 100 ++++++++++++++++++++++++ src/udp.h | 17 ++++ 3 files changed, 129 insertions(+), 72 deletions(-) diff --git a/src/input/mpegts/satip/satip_frontend.c b/src/input/mpegts/satip/satip_frontend.c index 19c53c8e..430a0cfc 100644 --- a/src/input/mpegts/satip/satip_frontend.c +++ b/src/input/mpegts/satip/satip_frontend.c @@ -17,8 +17,6 @@ * along with this program. If not, see . */ -#define _GNU_SOURCE -#include #include #include "tvheadend.h" #include "tvhpoll.h" @@ -26,57 +24,6 @@ #include "http.h" #include "satip_private.h" -#define PKTS 64 - -#ifndef CONFIG_RECVMMSG - -#ifdef __linux__ - -/* define the syscall - works only for linux */ - -#include - -struct mmsghdr { - struct msghdr msg_hdr; - unsigned int msg_len; -}; - -int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, - unsigned int flags, struct timespec *timeout); - -#ifdef __NR_recvmmsg - -int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, - unsigned int flags, struct timespec *timeout) -{ - return syscall(__NR_recvmmsg, sockfd, msgvec, vlen, flags, timeout); -} - -#else - -#undef PKTS -#define PKTS 1 -/* receive only single packet */ -int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, - unsigned int flags, struct timespec *timeout) -{ - ssize_t r = recvmsg(sockfd, &msgvec->msg_hdr, flags); - if (r < 0) - return r; - msgvec->msg_len = r; - return 1; -} - -#endif - -#else /* not __linux__ */ - -#error "Add recvmmsg() support for your platform!!!" - -#endif - -#endif /* !CONFIG_RECVMMSG */ - static int satip_frontend_tune1 ( satip_frontend_t *lfe, mpegts_mux_instance_t *mmi ); @@ -777,18 +724,18 @@ satip_frontend_pid_changed( http_client_t *rtsp, static void * satip_frontend_input_thread ( void *aux ) { +#define RTP_PKTS 64 +#define RTP_PKT_SIZE 1472 /* this is maximum UDP payload (standard ethernet) */ #define HTTP_CMD_NONE 9874 satip_frontend_t *lfe = aux; mpegts_mux_instance_t *mmi = lfe->sf_mmi; http_client_t *rtsp; dvb_mux_t *lm; char buf[256]; - uint8_t tsb[PKTS][1356 + 128]; + struct iovec *iovec; uint8_t rtcp[2048]; uint8_t *p; sbuf_t sb; - struct iovec iov[PKTS]; - struct mmsghdr msg[PKTS]; int pos, nfds, i, r; size_t c; int tc; @@ -796,6 +743,7 @@ satip_frontend_input_thread ( void *aux ) tvhpoll_t *efd; int changing = 0, ms = -1, fatal = 0; uint32_t seq = -1, nseq; + udp_multirecv_t um; lfe->mi_display_name((mpegts_input_t*)lfe, buf, sizeof(buf)); @@ -827,16 +775,6 @@ satip_frontend_input_thread ( void *aux ) tvhpoll_add(efd, ev, 4); rtsp->hc_efd = efd; - /* Read */ - memset(&msg, 0, sizeof(msg)); - for (i = 0; i < PKTS; i++) { - msg[i].msg_hdr.msg_iov = &iov[i]; - msg[i].msg_hdr.msg_iovlen = 1; - iov[i].iov_base = tsb[i]; - iov[i].iov_len = sizeof(tsb[0]); - } - - r = satip_rtsp_setup(rtsp, lfe->sf_position, lfe->sf_number, lfe->sf_rtp_port, &lm->lm_tuning, @@ -846,6 +784,7 @@ satip_frontend_input_thread ( void *aux ) return NULL; } + udp_multirecv_init(&um, RTP_PKTS, RTP_PKT_SIZE); sbuf_init_fixed(&sb, 18800); while (tvheadend_running && !fatal) { @@ -853,8 +792,8 @@ satip_frontend_input_thread ( void *aux ) nfds = tvhpoll_wait(efd, ev, 1, ms); if (nfds > 0 && ev[0].data.ptr == NULL) { - c = read(lfe->sf_dvr_pipe.rd, tsb[0], 1); - if (c == 1 && tsb[0][0] == 'c') { + c = read(lfe->sf_dvr_pipe.rd, rtcp, 1); + if (c == 1 && rtcp[0] == 'c') { ms = 20; changing = 1; continue; @@ -932,7 +871,7 @@ satip_frontend_input_thread ( void *aux ) if (ev[0].data.ptr != lfe->sf_rtp) continue; - tc = recvmmsg(lfe->sf_rtp->fd, msg, PKTS, MSG_DONTWAIT, NULL); + tc = udp_multirecv_read(&um, lfe->sf_rtp->fd, RTP_PKTS, &iovec); if (tc < 0) { if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK) @@ -941,14 +880,14 @@ satip_frontend_input_thread ( void *aux ) tvhlog(LOG_WARNING, "satip", "%s - recvmsg() EOVERFLOW", buf); continue; } - tvhlog(LOG_ERR, "satip", "%s - recv() error %d (%s)", + tvhlog(LOG_ERR, "satip", "%s - multirecv error %d (%s)", buf, errno, strerror(errno)); break; } for (i = 0; i < tc; i++) { - p = tsb[i]; - c = msg[i].msg_len; + p = iovec[i].iov_base; + c = iovec[i].iov_len; /* Strip RTP header */ if (c < 12) @@ -980,6 +919,7 @@ satip_frontend_input_thread ( void *aux ) } sbuf_free(&sb); + udp_multirecv_free(&um); ev[0].events = TVHPOLL_IN; ev[0].fd = lfe->sf_rtp->fd; diff --git a/src/udp.c b/src/udp.c index 335e0f25..a0789f09 100644 --- a/src/udp.c +++ b/src/udp.c @@ -18,6 +18,7 @@ * along with this program. If not, see . */ +#define _GNU_SOURCE #include "tvheadend.h" #include "udp.h" @@ -380,3 +381,102 @@ udp_write_queue( udp_connection_t *uc, htsbuf_queue_t *q, q->hq_size = 0; return r; } + +/* + * UDP multi packet receive support + */ + +#ifndef CONFIG_RECVMMSG + +#ifdef __linux__ + +/* define the syscall - works only for linux */ + +#include + +struct mmsghdr { + struct msghdr msg_hdr; + unsigned int msg_len; +}; + +int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, + unsigned int flags, struct timespec *timeout); + +#ifdef __NR_recvmmsg + +int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, + unsigned int flags, struct timespec *timeout) +{ + return syscall(__NR_recvmmsg, sockfd, msgvec, vlen, flags, timeout); +} + +#else + +#undef PKTS +#define PKTS 1 +/* receive only single packet */ +int recvmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, + unsigned int flags, struct timespec *timeout) +{ + ssize_t r = recvmsg(sockfd, &msgvec->msg_hdr, flags); + if (r < 0) + return r; + msgvec->msg_len = r; + return 1; +} + +#endif + +#else /* not __linux__ */ + +#error "Add recvmmsg() support for your platform!!!" + +#endif + +#endif /* !CONFIG_RECVMMSG */ + +void +udp_multirecv_init( udp_multirecv_t *um, int packets, int psize ) +{ + int i; + + um->um_psize = psize; + um->um_packets = packets; + um->um_data = malloc(packets * psize); + um->um_iovec = malloc(packets * sizeof(struct iovec)); + um->um_riovec = malloc(packets * sizeof(struct iovec)); + um->um_msg = calloc(packets, sizeof(struct mmsghdr)); + for (i = 0; i < packets; i++) { + ((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iov = &um->um_iovec[i]; + ((struct mmsghdr *)um->um_msg)[i].msg_hdr.msg_iovlen = 1; + um->um_iovec[i].iov_base = /* follow thru */ + um->um_riovec[i].iov_base = um->um_data + i * psize; + um->um_iovec[i].iov_len = psize; + } +} + +void +udp_multirecv_free( udp_multirecv_t *um ) +{ + free(um->um_msg); um->um_msg = NULL; + free(um->um_iovec); um->um_iovec = NULL; + free(um->um_data); um->um_data = NULL; + um->um_psize = 0; + um->um_packets = 0; +} + +int +udp_multirecv_read( udp_multirecv_t *um, int fd, int packets, + struct iovec **iovec ) +{ + int n, i; + if (packets > um->um_packets) + packets = um->um_packets; + n = recvmmsg(fd, (struct mmsghdr *)um->um_msg, packets, MSG_DONTWAIT, NULL); + if (n > 0) { + for (i = 0; i < n; i++) + um->um_riovec[i].iov_len = ((struct mmsghdr *)um->um_msg)[i].msg_len; + *iovec = um->um_riovec; + } + return n; +} diff --git a/src/udp.h b/src/udp.h index 6f2fd904..c5fa2eef 100644 --- a/src/udp.h +++ b/src/udp.h @@ -70,5 +70,22 @@ int udp_write_queue( udp_connection_t *uc, htsbuf_queue_t *q, struct sockaddr_storage *storage ); +typedef struct udp_multirecv { + int um_psize; + int um_packets; + uint8_t *um_data; + struct iovec *um_iovec; + struct iovec *um_riovec; + struct mmsghdr *um_msg; +} udp_multirecv_t; + +void +udp_multirecv_init( udp_multirecv_t *um, int packets, int psize ); +void +udp_multirecv_free( udp_multirecv_t *um ); +int +udp_multirecv_read( udp_multirecv_t *um, int fd, int packets, + struct iovec **iovec ); + #endif /* UDP_H_ */