From 17b16d7a76243ac07f980c0222abe2edd1e5699e Mon Sep 17 00:00:00 2001 From: Bernhard Froehlich Date: Sat, 4 May 2013 16:45:51 +0200 Subject: [PATCH] tvhpoll: update all code to use new tvhpoll wrapper rather than epoll --- src/dvb/dvb.h | 3 +- src/dvb/dvb_adapter.c | 29 +++++----- src/dvb/dvb_input_filtered.c | 98 ++++++++++++++++---------------- src/iptv_input.c | 20 ++++--- src/tcp.c | 96 +++++++++++++++---------------- src/timeshift/timeshift_reader.c | 20 ++++--- 6 files changed, 136 insertions(+), 130 deletions(-) diff --git a/src/dvb/dvb.h b/src/dvb/dvb.h index 127d676f..eb6d2b79 100644 --- a/src/dvb/dvb.h +++ b/src/dvb/dvb.h @@ -24,6 +24,7 @@ #include #include "htsmsg.h" #include "psi.h" +#include "tvhpoll.h" struct service; struct th_dvb_table; @@ -204,7 +205,7 @@ typedef struct th_dvb_adapter { th_dvb_mux_instance_t *tda_mux_epg; - int tda_table_epollfd; + tvhpoll_t *tda_table_pd; uint32_t tda_enabled; diff --git a/src/dvb/dvb_adapter.c b/src/dvb/dvb_adapter.c index 00a53704..93f06cdb 100644 --- a/src/dvb/dvb_adapter.c +++ b/src/dvb/dvb_adapter.c @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include @@ -48,6 +47,7 @@ #include "epggrab.h" #include "diseqc.h" #include "atomic.h" +#include "tvhpoll.h" struct th_dvb_adapter_queue dvb_adapters; struct th_dvb_mux_instance_tree dvb_muxes; @@ -1020,11 +1020,11 @@ static void * dvb_adapter_input_dvr(void *aux) { th_dvb_adapter_t *tda = aux; - int fd = -1, i, r, c, efd, nfds, dmx = -1; + int fd = -1, i, r, c, nfds, dmx = -1; uint8_t tsb[188 * 10]; service_t *t; - struct epoll_event ev; - int delay = 10; + tvhpoll_t *pd; + tvhpoll_event_t ev[2]; /* Install RAW demux */ if (tda->tda_rawmode) { @@ -1040,26 +1040,25 @@ dvb_adapter_input_dvr(void *aux) return NULL; } - /* Create poll */ - efd = epoll_create(2); - memset(&ev, 0, sizeof(ev)); - ev.events = EPOLLIN; - ev.data.fd = tda->tda_dvr_pipe.rd; - epoll_ctl(efd, EPOLL_CTL_ADD, tda->tda_dvr_pipe.rd, &ev); - ev.data.fd = fd; - epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev); + pd = tvhpoll_create(2); + memset(ev, 0, sizeof(ev)); + ev[0].data.fd = ev[0].fd = tda->tda_dvr_pipe.rd; + ev[0].events = TVHPOLL_IN; + ev[1].data.fd = ev[1].fd = fd; + ev[1].events = TVHPOLL_IN; + tvhpoll_add(pd, ev, 2); r = i = 0; while(1) { /* Wait for input */ - nfds = epoll_wait(efd, &ev, 1, delay); + nfds = tvhpoll_wait(pd, ev, 1, -1); /* No data */ if (nfds < 1) continue; /* Exit */ - if (ev.data.fd != fd) break; + if (ev[0].data.fd != fd) break; /* Read data */ c = read(fd, tsb+r, sizeof(tsb)-r); @@ -1140,7 +1139,7 @@ dvb_adapter_input_dvr(void *aux) if(dmx != -1) close(dmx); - close(efd); + tvhpoll_destroy(pd); close(fd); return NULL; } diff --git a/src/dvb/dvb_input_filtered.c b/src/dvb/dvb_input_filtered.c index a4dddf26..0de91d07 100644 --- a/src/dvb/dvb_input_filtered.c +++ b/src/dvb/dvb_input_filtered.c @@ -27,11 +27,11 @@ #include #include #include -#include #include "tvheadend.h" #include "dvb.h" #include "service.h" +#include "tvhpoll.h" /** * Install filters for a service @@ -58,9 +58,9 @@ open_service(th_dvb_adapter_t *tda, service_t *s) if(fd == -1) { st->es_demuxer_fd = -1; tvhlog(LOG_ERR, "dvb", - "\"%s\" unable to open demuxer \"%s\" for pid %d -- %s", - s->s_identifier, tda->tda_demux_path, - st->es_pid, strerror(errno)); + "\"%s\" unable to open demuxer \"%s\" for pid %d -- %s", + s->s_identifier, tda->tda_demux_path, + st->es_pid, strerror(errno)); continue; } @@ -73,9 +73,9 @@ open_service(th_dvb_adapter_t *tda, service_t *s) if(ioctl(fd, DMX_SET_PES_FILTER, &dmx_param)) { tvhlog(LOG_ERR, "dvb", - "\"%s\" unable to configure demuxer \"%s\" for pid %d -- %s", - s->s_identifier, tda->tda_demux_path, - st->es_pid, strerror(errno)); + "\"%s\" unable to configure demuxer \"%s\" for pid %d -- %s", + s->s_identifier, tda->tda_demux_path, + st->es_pid, strerror(errno)); close(fd); fd = -1; } @@ -113,7 +113,7 @@ static void open_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt) { th_dvb_adapter_t *tda = tdmi->tdmi_adapter; - struct epoll_event e; + tvhpoll_event_t ev; static int tdt_id_tally; tdt->tdt_fd = tvh_open(tda->tda_demux_path, O_RDWR, 0); @@ -122,10 +122,11 @@ open_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt) tdt->tdt_id = ++tdt_id_tally; - e.events = EPOLLIN; - e.data.u64 = ((uint64_t)tdt->tdt_fd << 32) | tdt->tdt_id; + ev.fd = tdt->tdt_fd; + ev.events = TVHPOLL_IN; + ev.data.u64 = ((uint64_t)tdt->tdt_fd << 32) | tdt->tdt_id; - if(epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_ADD, tdt->tdt_fd, &e)) { + if(tvhpoll_add(tda->tda_table_pd, &ev, 1) != 0) { close(tdt->tdt_fd); tdt->tdt_fd = -1; } else { @@ -136,13 +137,13 @@ open_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt) fp.filter.mask[0] = tdt->tdt_mask; if(tdt->tdt_flags & TDT_CRC) - fp.flags |= DMX_CHECK_CRC; + fp.flags |= DMX_CHECK_CRC; fp.flags |= DMX_IMMEDIATE_START; fp.pid = tdt->tdt_pid; if(ioctl(tdt->tdt_fd, DMX_SET_FILTER, &fp)) { - close(tdt->tdt_fd); - tdt->tdt_fd = -1; + close(tdt->tdt_fd); + tdt->tdt_fd = -1; } } } @@ -159,10 +160,12 @@ static void tdt_close_fd(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt) { th_dvb_adapter_t *tda = tdmi->tdmi_adapter; + tvhpoll_event_t ev; assert(tdt->tdt_fd != -1); - epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_DEL, tdt->tdt_fd, NULL); + ev.fd = tdt->tdt_fd; + tvhpoll_rem(tda->tda_table_pd, &ev, 1); close(tdt->tdt_fd); tdt->tdt_fd = -1; @@ -178,51 +181,48 @@ static void * dvb_table_input(void *aux) { th_dvb_adapter_t *tda = aux; - int r, i, tid, fd, x; - struct epoll_event ev[1]; + int r, tid, fd, x; uint8_t sec[4096]; th_dvb_mux_instance_t *tdmi; th_dvb_table_t *tdt; int64_t cycle_barrier = 0; + tvhpoll_event_t ev; while(1) { - x = epoll_wait(tda->tda_table_epollfd, ev, sizeof(ev) / sizeof(ev[0]), -1); + x = tvhpoll_wait(tda->tda_table_pd, &ev, 1, -1); + if (x != 1) continue; - for(i = 0; i < x; i++) { - - tid = ev[i].data.u64 & 0xffffffff; - fd = ev[i].data.u64 >> 32; + tid = ev.data.u64 & 0xffffffff; + fd = ev.data.u64 >> 32; - if(!(ev[i].events & EPOLLIN)) - continue; + if(!(ev.events & TVHPOLL_IN)) + continue; - if((r = read(fd, sec, sizeof(sec))) < 3) - continue; + if((r = read(fd, sec, sizeof(sec))) < 3) + continue; - pthread_mutex_lock(&global_lock); - if((tdmi = tda->tda_mux_current) != NULL) { - LIST_FOREACH(tdt, &tdmi->tdmi_tables, tdt_link) - if(tdt->tdt_id == tid) - break; + pthread_mutex_lock(&global_lock); + if((tdmi = tda->tda_mux_current) != NULL) { + LIST_FOREACH(tdt, &tdmi->tdmi_tables, tdt_link) + if(tdt->tdt_id == tid) + break; - if(tdt != NULL) { - dvb_table_dispatch(sec, r, tdt); + if(tdt != NULL) { + dvb_table_dispatch(sec, r, tdt); - /* Any tables pending (that wants a filter/fd), close this one */ - if(TAILQ_FIRST(&tdmi->tdmi_table_queue) != NULL && - cycle_barrier < getmonoclock()) { - tdt_close_fd(tdmi, tdt); - cycle_barrier = getmonoclock() + 100000; - tdt = TAILQ_FIRST(&tdmi->tdmi_table_queue); - assert(tdt != NULL); - TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link); - - open_table(tdmi, tdt); - } - } + /* Any tables pending (that wants a filter/fd), close this one */ + if(TAILQ_FIRST(&tdmi->tdmi_table_queue) != NULL && + cycle_barrier < getmonoclock()) { + tdt_close_fd(tdmi, tdt); + cycle_barrier = getmonoclock() + 100000; + tdt = TAILQ_FIRST(&tdmi->tdmi_table_queue); + assert(tdt != NULL); + TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link); + open_table(tdmi, tdt); + } } - pthread_mutex_unlock(&global_lock); } + pthread_mutex_unlock(&global_lock); } return NULL; } @@ -232,11 +232,13 @@ static void close_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt) { th_dvb_adapter_t *tda = tdmi->tdmi_adapter; + tvhpoll_event_t ev; if(tdt->tdt_fd == -1) { TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link); } else { - epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_DEL, tdt->tdt_fd, NULL); + ev.fd = tdt->tdt_fd; + tvhpoll_rem(tda->tda_table_pd, &ev, 1); close(tdt->tdt_fd); } } @@ -253,7 +255,7 @@ dvb_input_filtered_setup(th_dvb_adapter_t *tda) tda->tda_close_table = close_table; pthread_t ptid; - tda->tda_table_epollfd = epoll_create(50); + tda->tda_table_pd = tvhpoll_create(50); pthread_create(&ptid, NULL, dvb_table_input, tda); } diff --git a/src/iptv_input.c b/src/iptv_input.c index 7ca96f6d..1b232e81 100644 --- a/src/iptv_input.c +++ b/src/iptv_input.c @@ -21,7 +21,6 @@ #include #include #include -#include #include #include @@ -40,6 +39,7 @@ #include "tsdemux.h" #include "psi.h" #include "settings.h" +#include "tvhpoll.h" #if defined(PLATFORM_LINUX) #include @@ -52,8 +52,8 @@ # endif #endif -static int iptv_thread_running; -static int iptv_epollfd; +static int iptv_thread_running; +static tvhpoll_t *iptv_poll; static pthread_mutex_t iptv_recvmutex; struct service_list iptv_all_services; /* All IPTV services */ @@ -137,11 +137,11 @@ iptv_thread(void *aux) { int nfds, fd, r, j, hlen; uint8_t tsb[65536], *buf; - struct epoll_event ev; + tvhpoll_event_t ev; service_t *t; while(1) { - nfds = epoll_wait(iptv_epollfd, &ev, 1, -1); + nfds = tvhpoll_wait(iptv_poll, &ev, 1, -1); if(nfds == -1) { tvhlog(LOG_ERR, "IPTV", "epoll() error -- %s, sleeping 1 second", strerror(errno)); @@ -220,13 +220,13 @@ iptv_service_start(service_t *t, unsigned int weight, int force_start) struct sockaddr_in sin; struct sockaddr_in6 sin6; struct ifreq ifr; - struct epoll_event ev; + tvhpoll_event_t ev; assert(t->s_iptv_fd == -1); if(iptv_thread_running == 0) { iptv_thread_running = 1; - iptv_epollfd = epoll_create(10); + iptv_poll = tvhpoll_create(10); pthread_create(&tid, NULL, iptv_thread, NULL); } @@ -345,9 +345,10 @@ iptv_service_start(service_t *t, unsigned int weight, int force_start) resize, strerror(errno)); memset(&ev, 0, sizeof(ev)); - ev.events = EPOLLIN; + ev.events = TVHPOLL_IN; + ev.fd = fd; ev.data.fd = fd; - if(epoll_ctl(iptv_epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) { + if(tvhpoll_add(iptv_poll, &ev, 1) == -1) { tvhlog(LOG_ERR, "IPTV", "\"%s\" cannot add to epoll set -- %s", t->s_identifier, strerror(errno)); close(fd); @@ -451,6 +452,7 @@ iptv_service_stop(service_t *t) #endif } close(t->s_iptv_fd); // Automatically removes fd from epoll set + // TODO: this is an issue t->s_iptv_fd = -1; } diff --git a/src/tcp.c b/src/tcp.c index fc134e4f..a176bdac 100644 --- a/src/tcp.c +++ b/src/tcp.c @@ -18,7 +18,6 @@ #include #include -#include #include #include #include @@ -34,6 +33,7 @@ #include "tcp.h" #include "tvheadend.h" +#include "tvhpoll.h" int tcp_preferred_address_family = AF_INET; @@ -144,16 +144,16 @@ tcp_connect(const char *hostname, int port, char *errbuf, size_t errbufsize, r = poll(&pfd, 1, timeout * 1000); if(r == 0) { - /* Timeout */ - snprintf(errbuf, errbufsize, "Connection attempt timed out"); - close(fd); - return -1; + /* Timeout */ + snprintf(errbuf, errbufsize, "Connection attempt timed out"); + close(fd); + return -1; } if(r == -1) { - snprintf(errbuf, errbufsize, "poll() error: %s", strerror(errno)); - close(fd); - return -1; + snprintf(errbuf, errbufsize, "poll() error: %s", strerror(errno)); + close(fd); + return -1; } getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen); @@ -331,7 +331,7 @@ tcp_read_timeout(int fd, void *buf, size_t len, int timeout) x = recv(fd, buf + tot, len - tot, MSG_DONTWAIT); if(x == -1) { if(errno == EAGAIN) - continue; + continue; return errno; } @@ -373,7 +373,7 @@ tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen) /** * */ -static int tcp_server_epoll_fd; +static tvhpoll_t *tcp_server_poll; typedef struct tcp_server { tcp_server_callback_t *start; @@ -438,8 +438,8 @@ tcp_server_start(void *aux) static void * tcp_server_loop(void *aux) { - int r, i; - struct epoll_event ev[1]; + int r; + tvhpoll_event_t ev; tcp_server_t *ts; tcp_server_launch_t *tsl; pthread_attr_t attr; @@ -450,46 +450,45 @@ tcp_server_loop(void *aux) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); while(1) { - r = epoll_wait(tcp_server_epoll_fd, ev, sizeof(ev) / sizeof(ev[0]), -1); + r = tvhpoll_wait(tcp_server_poll, &ev, 1, -1); if(r == -1) { - perror("tcp_server: epoll_wait"); + perror("tcp_server: tchpoll_wait"); continue; } - for(i = 0; i < r; i++) { - ts = ev[i].data.ptr; + if (r == 0) continue; - if(ev[i].events & EPOLLHUP) { - close(ts->serverfd); - free(ts); - continue; - } + ts = ev.data.ptr; - if(ev[i].events & EPOLLIN) { - tsl = malloc(sizeof(tcp_server_launch_t)); - tsl->start = ts->start; - tsl->opaque = ts->opaque; - slen = sizeof(struct sockaddr_storage); + if(ev.events & TVHPOLL_HUP) { + close(ts->serverfd); + free(ts); + continue; + } - tsl->fd = accept(ts->serverfd, - (struct sockaddr *)&tsl->peer, &slen); - if(tsl->fd == -1) { - perror("accept"); - free(tsl); - sleep(1); - continue; - } + if(ev.events & TVHPOLL_IN) { + tsl = malloc(sizeof(tcp_server_launch_t)); + tsl->start = ts->start; + tsl->opaque = ts->opaque; + slen = sizeof(struct sockaddr_storage); + tsl->fd = accept(ts->serverfd, + (struct sockaddr *)&tsl->peer, &slen); + if(tsl->fd == -1) { + perror("accept"); + free(tsl); + sleep(1); + continue; + } - slen = sizeof(struct sockaddr_storage); - if(getsockname(tsl->fd, (struct sockaddr *)&tsl->self, &slen)) { - close(tsl->fd); - free(tsl); - continue; - } + slen = sizeof(struct sockaddr_storage); + if(getsockname(tsl->fd, (struct sockaddr *)&tsl->self, &slen)) { + close(tsl->fd); + free(tsl); + continue; + } - pthread_create(&tid, &attr, tcp_server_start, tsl); - } + pthread_create(&tid, &attr, tcp_server_start, tsl); } } return NULL; @@ -502,14 +501,14 @@ void * tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, void *opaque) { int fd, x; - struct epoll_event e; + tvhpoll_event_t ev; tcp_server_t *ts; struct addrinfo hints, *res, *ressave, *use = NULL; char *portBuf = (char*)malloc(6); int one = 1; int zero = 0; - memset(&e, 0, sizeof(e)); + memset(&ev, 0, sizeof(ev)); snprintf(portBuf, 6, "%d", port); @@ -570,9 +569,10 @@ tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, ts->start = start; ts->opaque = opaque; - e.events = EPOLLIN; - e.data.ptr = ts; - epoll_ctl(tcp_server_epoll_fd, EPOLL_CTL_ADD, fd, &e); + ev.fd = fd; + ev.events = TVHPOLL_IN; + ev.data.ptr = ts; + tvhpoll_add(tcp_server_poll, &ev, 1); return ts; } @@ -588,7 +588,7 @@ tcp_server_init(int opt_ipv6) if(opt_ipv6) tcp_preferred_address_family = AF_INET6; - tcp_server_epoll_fd = epoll_create(10); + tcp_server_poll = tvhpoll_create(10); pthread_create(&tid, NULL, tcp_server_loop, NULL); } diff --git a/src/timeshift/timeshift_reader.c b/src/timeshift/timeshift_reader.c index e9b3ca53..3eac00a6 100644 --- a/src/timeshift/timeshift_reader.c +++ b/src/timeshift/timeshift_reader.c @@ -21,6 +21,7 @@ #include "timeshift.h" #include "timeshift/private.h" #include "atomic.h" +#include "tvhpoll.h" #include #include @@ -399,7 +400,7 @@ static int _timeshift_flush_to_live void *timeshift_reader ( void *p ) { timeshift_t *ts = p; - int efd, nfds, end, fd = -1, run = 1, wait = -1; + int nfds, end, fd = -1, run = 1, wait = -1; timeshift_file_t *cur_file = NULL; off_t cur_off = 0; int cur_speed = 100, keyframe_mode = 0; @@ -409,13 +410,13 @@ void *timeshift_reader ( void *p ) timeshift_index_iframe_t *tsi = NULL; streaming_skip_t *skip = NULL; time_t last_status = 0; + tvhpoll_t *pd; + tvhpoll_event_t ev = { 0 }; - /* Poll */ - struct epoll_event ev = { 0 }; - efd = epoll_create(1); - ev.events = EPOLLIN; - ev.data.fd = ts->rd_pipe.rd; - epoll_ctl(efd, EPOLL_CTL_ADD, ev.data.fd, &ev); + pd = tvhpoll_create(1); + ev.fd = ts->rd_pipe.rd; + ev.events = TVHPOLL_IN; + tvhpoll_add(pd, &ev, 1); /* Output */ while (run) { @@ -427,7 +428,7 @@ void *timeshift_reader ( void *p ) /* Wait for data */ if(wait) - nfds = epoll_wait(efd, &ev, 1, wait); + nfds = tvhpoll_wait(pd, &ev, 1, wait); else nfds = 0; wait = -1; @@ -438,7 +439,7 @@ void *timeshift_reader ( void *p ) /* Control */ pthread_mutex_lock(&ts->state_mutex); if (nfds == 1) { - if (_read_msg(ev.data.fd, &ctrl) > 0) { + if (_read_msg(ts->rd_pipe.rd, &ctrl) > 0) { /* Exit */ if (ctrl->sm_type == SMT_EXIT) { @@ -802,6 +803,7 @@ void *timeshift_reader ( void *p ) } /* Cleanup */ + tvhpoll_destroy(pd); if (fd != -1) close(fd); if (sm) streaming_msg_free(sm); if (ctrl) streaming_msg_free(ctrl);