tvhpoll: update all code to use new tvhpoll wrapper rather than epoll

This commit is contained in:
Bernhard Froehlich 2013-05-04 16:45:51 +02:00 committed by Adam Sutton
parent 7d122d8dea
commit 17b16d7a76
6 changed files with 136 additions and 130 deletions

View file

@ -24,6 +24,7 @@
#include <pthread.h>
#include "htsmsg.h"
#include "psi.h"
#include "tvhpoll.h"
struct service;
struct th_dvb_table;
@ -204,7 +205,7 @@ typedef struct th_dvb_adapter {
th_dvb_mux_instance_t *tda_mux_epg;
int tda_table_epollfd;
tvhpoll_t *tda_table_pd;
uint32_t tda_enabled;

View file

@ -23,7 +23,6 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <dirent.h>
#include <fcntl.h>
@ -48,6 +47,7 @@
#include "epggrab.h"
#include "diseqc.h"
#include "atomic.h"
#include "tvhpoll.h"
struct th_dvb_adapter_queue dvb_adapters;
struct th_dvb_mux_instance_tree dvb_muxes;
@ -1020,11 +1020,11 @@ static void *
dvb_adapter_input_dvr(void *aux)
{
th_dvb_adapter_t *tda = aux;
int fd = -1, i, r, c, efd, nfds, dmx = -1;
int fd = -1, i, r, c, nfds, dmx = -1;
uint8_t tsb[188 * 10];
service_t *t;
struct epoll_event ev;
int delay = 10;
tvhpoll_t *pd;
tvhpoll_event_t ev[2];
/* Install RAW demux */
if (tda->tda_rawmode) {
@ -1040,26 +1040,25 @@ dvb_adapter_input_dvr(void *aux)
return NULL;
}
/* Create poll */
efd = epoll_create(2);
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
ev.data.fd = tda->tda_dvr_pipe.rd;
epoll_ctl(efd, EPOLL_CTL_ADD, tda->tda_dvr_pipe.rd, &ev);
ev.data.fd = fd;
epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev);
pd = tvhpoll_create(2);
memset(ev, 0, sizeof(ev));
ev[0].data.fd = ev[0].fd = tda->tda_dvr_pipe.rd;
ev[0].events = TVHPOLL_IN;
ev[1].data.fd = ev[1].fd = fd;
ev[1].events = TVHPOLL_IN;
tvhpoll_add(pd, ev, 2);
r = i = 0;
while(1) {
/* Wait for input */
nfds = epoll_wait(efd, &ev, 1, delay);
nfds = tvhpoll_wait(pd, ev, 1, -1);
/* No data */
if (nfds < 1) continue;
/* Exit */
if (ev.data.fd != fd) break;
if (ev[0].data.fd != fd) break;
/* Read data */
c = read(fd, tsb+r, sizeof(tsb)-r);
@ -1140,7 +1139,7 @@ dvb_adapter_input_dvr(void *aux)
if(dmx != -1)
close(dmx);
close(efd);
tvhpoll_destroy(pd);
close(fd);
return NULL;
}

View file

@ -27,11 +27,11 @@
#include <fcntl.h>
#include <linux/dvb/frontend.h>
#include <linux/dvb/dmx.h>
#include <sys/epoll.h>
#include "tvheadend.h"
#include "dvb.h"
#include "service.h"
#include "tvhpoll.h"
/**
* Install filters for a service
@ -58,9 +58,9 @@ open_service(th_dvb_adapter_t *tda, service_t *s)
if(fd == -1) {
st->es_demuxer_fd = -1;
tvhlog(LOG_ERR, "dvb",
"\"%s\" unable to open demuxer \"%s\" for pid %d -- %s",
s->s_identifier, tda->tda_demux_path,
st->es_pid, strerror(errno));
"\"%s\" unable to open demuxer \"%s\" for pid %d -- %s",
s->s_identifier, tda->tda_demux_path,
st->es_pid, strerror(errno));
continue;
}
@ -73,9 +73,9 @@ open_service(th_dvb_adapter_t *tda, service_t *s)
if(ioctl(fd, DMX_SET_PES_FILTER, &dmx_param)) {
tvhlog(LOG_ERR, "dvb",
"\"%s\" unable to configure demuxer \"%s\" for pid %d -- %s",
s->s_identifier, tda->tda_demux_path,
st->es_pid, strerror(errno));
"\"%s\" unable to configure demuxer \"%s\" for pid %d -- %s",
s->s_identifier, tda->tda_demux_path,
st->es_pid, strerror(errno));
close(fd);
fd = -1;
}
@ -113,7 +113,7 @@ static void
open_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
{
th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
struct epoll_event e;
tvhpoll_event_t ev;
static int tdt_id_tally;
tdt->tdt_fd = tvh_open(tda->tda_demux_path, O_RDWR, 0);
@ -122,10 +122,11 @@ open_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
tdt->tdt_id = ++tdt_id_tally;
e.events = EPOLLIN;
e.data.u64 = ((uint64_t)tdt->tdt_fd << 32) | tdt->tdt_id;
ev.fd = tdt->tdt_fd;
ev.events = TVHPOLL_IN;
ev.data.u64 = ((uint64_t)tdt->tdt_fd << 32) | tdt->tdt_id;
if(epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_ADD, tdt->tdt_fd, &e)) {
if(tvhpoll_add(tda->tda_table_pd, &ev, 1) != 0) {
close(tdt->tdt_fd);
tdt->tdt_fd = -1;
} else {
@ -136,13 +137,13 @@ open_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
fp.filter.mask[0] = tdt->tdt_mask;
if(tdt->tdt_flags & TDT_CRC)
fp.flags |= DMX_CHECK_CRC;
fp.flags |= DMX_CHECK_CRC;
fp.flags |= DMX_IMMEDIATE_START;
fp.pid = tdt->tdt_pid;
if(ioctl(tdt->tdt_fd, DMX_SET_FILTER, &fp)) {
close(tdt->tdt_fd);
tdt->tdt_fd = -1;
close(tdt->tdt_fd);
tdt->tdt_fd = -1;
}
}
}
@ -159,10 +160,12 @@ static void
tdt_close_fd(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
{
th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
tvhpoll_event_t ev;
assert(tdt->tdt_fd != -1);
epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_DEL, tdt->tdt_fd, NULL);
ev.fd = tdt->tdt_fd;
tvhpoll_rem(tda->tda_table_pd, &ev, 1);
close(tdt->tdt_fd);
tdt->tdt_fd = -1;
@ -178,51 +181,48 @@ static void *
dvb_table_input(void *aux)
{
th_dvb_adapter_t *tda = aux;
int r, i, tid, fd, x;
struct epoll_event ev[1];
int r, tid, fd, x;
uint8_t sec[4096];
th_dvb_mux_instance_t *tdmi;
th_dvb_table_t *tdt;
int64_t cycle_barrier = 0;
tvhpoll_event_t ev;
while(1) {
x = epoll_wait(tda->tda_table_epollfd, ev, sizeof(ev) / sizeof(ev[0]), -1);
x = tvhpoll_wait(tda->tda_table_pd, &ev, 1, -1);
if (x != 1) continue;
for(i = 0; i < x; i++) {
tid = ev[i].data.u64 & 0xffffffff;
fd = ev[i].data.u64 >> 32;
tid = ev.data.u64 & 0xffffffff;
fd = ev.data.u64 >> 32;
if(!(ev[i].events & EPOLLIN))
continue;
if(!(ev.events & TVHPOLL_IN))
continue;
if((r = read(fd, sec, sizeof(sec))) < 3)
continue;
if((r = read(fd, sec, sizeof(sec))) < 3)
continue;
pthread_mutex_lock(&global_lock);
if((tdmi = tda->tda_mux_current) != NULL) {
LIST_FOREACH(tdt, &tdmi->tdmi_tables, tdt_link)
if(tdt->tdt_id == tid)
break;
pthread_mutex_lock(&global_lock);
if((tdmi = tda->tda_mux_current) != NULL) {
LIST_FOREACH(tdt, &tdmi->tdmi_tables, tdt_link)
if(tdt->tdt_id == tid)
break;
if(tdt != NULL) {
dvb_table_dispatch(sec, r, tdt);
if(tdt != NULL) {
dvb_table_dispatch(sec, r, tdt);
/* Any tables pending (that wants a filter/fd), close this one */
if(TAILQ_FIRST(&tdmi->tdmi_table_queue) != NULL &&
cycle_barrier < getmonoclock()) {
tdt_close_fd(tdmi, tdt);
cycle_barrier = getmonoclock() + 100000;
tdt = TAILQ_FIRST(&tdmi->tdmi_table_queue);
assert(tdt != NULL);
TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link);
open_table(tdmi, tdt);
}
}
/* Any tables pending (that wants a filter/fd), close this one */
if(TAILQ_FIRST(&tdmi->tdmi_table_queue) != NULL &&
cycle_barrier < getmonoclock()) {
tdt_close_fd(tdmi, tdt);
cycle_barrier = getmonoclock() + 100000;
tdt = TAILQ_FIRST(&tdmi->tdmi_table_queue);
assert(tdt != NULL);
TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link);
open_table(tdmi, tdt);
}
}
pthread_mutex_unlock(&global_lock);
}
pthread_mutex_unlock(&global_lock);
}
return NULL;
}
@ -232,11 +232,13 @@ static void
close_table(th_dvb_mux_instance_t *tdmi, th_dvb_table_t *tdt)
{
th_dvb_adapter_t *tda = tdmi->tdmi_adapter;
tvhpoll_event_t ev;
if(tdt->tdt_fd == -1) {
TAILQ_REMOVE(&tdmi->tdmi_table_queue, tdt, tdt_pending_link);
} else {
epoll_ctl(tda->tda_table_epollfd, EPOLL_CTL_DEL, tdt->tdt_fd, NULL);
ev.fd = tdt->tdt_fd;
tvhpoll_rem(tda->tda_table_pd, &ev, 1);
close(tdt->tdt_fd);
}
}
@ -253,7 +255,7 @@ dvb_input_filtered_setup(th_dvb_adapter_t *tda)
tda->tda_close_table = close_table;
pthread_t ptid;
tda->tda_table_epollfd = epoll_create(50);
tda->tda_table_pd = tvhpoll_create(50);
pthread_create(&ptid, NULL, dvb_table_input, tda);
}

View file

@ -21,7 +21,6 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <assert.h>
@ -40,6 +39,7 @@
#include "tsdemux.h"
#include "psi.h"
#include "settings.h"
#include "tvhpoll.h"
#if defined(PLATFORM_LINUX)
#include <linux/netdevice.h>
@ -52,8 +52,8 @@
# endif
#endif
static int iptv_thread_running;
static int iptv_epollfd;
static int iptv_thread_running;
static tvhpoll_t *iptv_poll;
static pthread_mutex_t iptv_recvmutex;
struct service_list iptv_all_services; /* All IPTV services */
@ -137,11 +137,11 @@ iptv_thread(void *aux)
{
int nfds, fd, r, j, hlen;
uint8_t tsb[65536], *buf;
struct epoll_event ev;
tvhpoll_event_t ev;
service_t *t;
while(1) {
nfds = epoll_wait(iptv_epollfd, &ev, 1, -1);
nfds = tvhpoll_wait(iptv_poll, &ev, 1, -1);
if(nfds == -1) {
tvhlog(LOG_ERR, "IPTV", "epoll() error -- %s, sleeping 1 second",
strerror(errno));
@ -220,13 +220,13 @@ iptv_service_start(service_t *t, unsigned int weight, int force_start)
struct sockaddr_in sin;
struct sockaddr_in6 sin6;
struct ifreq ifr;
struct epoll_event ev;
tvhpoll_event_t ev;
assert(t->s_iptv_fd == -1);
if(iptv_thread_running == 0) {
iptv_thread_running = 1;
iptv_epollfd = epoll_create(10);
iptv_poll = tvhpoll_create(10);
pthread_create(&tid, NULL, iptv_thread, NULL);
}
@ -345,9 +345,10 @@ iptv_service_start(service_t *t, unsigned int weight, int force_start)
resize, strerror(errno));
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
ev.events = TVHPOLL_IN;
ev.fd = fd;
ev.data.fd = fd;
if(epoll_ctl(iptv_epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
if(tvhpoll_add(iptv_poll, &ev, 1) == -1) {
tvhlog(LOG_ERR, "IPTV", "\"%s\" cannot add to epoll set -- %s",
t->s_identifier, strerror(errno));
close(fd);
@ -451,6 +452,7 @@ iptv_service_stop(service_t *t)
#endif
}
close(t->s_iptv_fd); // Automatically removes fd from epoll set
// TODO: this is an issue
t->s_iptv_fd = -1;
}

View file

@ -18,7 +18,6 @@
#include <pthread.h>
#include <netdb.h>
#include <sys/epoll.h>
#include <poll.h>
#include <assert.h>
#include <stdio.h>
@ -34,6 +33,7 @@
#include "tcp.h"
#include "tvheadend.h"
#include "tvhpoll.h"
int tcp_preferred_address_family = AF_INET;
@ -144,16 +144,16 @@ tcp_connect(const char *hostname, int port, char *errbuf, size_t errbufsize,
r = poll(&pfd, 1, timeout * 1000);
if(r == 0) {
/* Timeout */
snprintf(errbuf, errbufsize, "Connection attempt timed out");
close(fd);
return -1;
/* Timeout */
snprintf(errbuf, errbufsize, "Connection attempt timed out");
close(fd);
return -1;
}
if(r == -1) {
snprintf(errbuf, errbufsize, "poll() error: %s", strerror(errno));
close(fd);
return -1;
snprintf(errbuf, errbufsize, "poll() error: %s", strerror(errno));
close(fd);
return -1;
}
getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
@ -331,7 +331,7 @@ tcp_read_timeout(int fd, void *buf, size_t len, int timeout)
x = recv(fd, buf + tot, len - tot, MSG_DONTWAIT);
if(x == -1) {
if(errno == EAGAIN)
continue;
continue;
return errno;
}
@ -373,7 +373,7 @@ tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen)
/**
*
*/
static int tcp_server_epoll_fd;
static tvhpoll_t *tcp_server_poll;
typedef struct tcp_server {
tcp_server_callback_t *start;
@ -438,8 +438,8 @@ tcp_server_start(void *aux)
static void *
tcp_server_loop(void *aux)
{
int r, i;
struct epoll_event ev[1];
int r;
tvhpoll_event_t ev;
tcp_server_t *ts;
tcp_server_launch_t *tsl;
pthread_attr_t attr;
@ -450,46 +450,45 @@ tcp_server_loop(void *aux)
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
while(1) {
r = epoll_wait(tcp_server_epoll_fd, ev, sizeof(ev) / sizeof(ev[0]), -1);
r = tvhpoll_wait(tcp_server_poll, &ev, 1, -1);
if(r == -1) {
perror("tcp_server: epoll_wait");
perror("tcp_server: tchpoll_wait");
continue;
}
for(i = 0; i < r; i++) {
ts = ev[i].data.ptr;
if (r == 0) continue;
if(ev[i].events & EPOLLHUP) {
close(ts->serverfd);
free(ts);
continue;
}
ts = ev.data.ptr;
if(ev[i].events & EPOLLIN) {
tsl = malloc(sizeof(tcp_server_launch_t));
tsl->start = ts->start;
tsl->opaque = ts->opaque;
slen = sizeof(struct sockaddr_storage);
if(ev.events & TVHPOLL_HUP) {
close(ts->serverfd);
free(ts);
continue;
}
tsl->fd = accept(ts->serverfd,
(struct sockaddr *)&tsl->peer, &slen);
if(tsl->fd == -1) {
perror("accept");
free(tsl);
sleep(1);
continue;
}
if(ev.events & TVHPOLL_IN) {
tsl = malloc(sizeof(tcp_server_launch_t));
tsl->start = ts->start;
tsl->opaque = ts->opaque;
slen = sizeof(struct sockaddr_storage);
tsl->fd = accept(ts->serverfd,
(struct sockaddr *)&tsl->peer, &slen);
if(tsl->fd == -1) {
perror("accept");
free(tsl);
sleep(1);
continue;
}
slen = sizeof(struct sockaddr_storage);
if(getsockname(tsl->fd, (struct sockaddr *)&tsl->self, &slen)) {
close(tsl->fd);
free(tsl);
continue;
}
slen = sizeof(struct sockaddr_storage);
if(getsockname(tsl->fd, (struct sockaddr *)&tsl->self, &slen)) {
close(tsl->fd);
free(tsl);
continue;
}
pthread_create(&tid, &attr, tcp_server_start, tsl);
}
pthread_create(&tid, &attr, tcp_server_start, tsl);
}
}
return NULL;
@ -502,14 +501,14 @@ void *
tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start, void *opaque)
{
int fd, x;
struct epoll_event e;
tvhpoll_event_t ev;
tcp_server_t *ts;
struct addrinfo hints, *res, *ressave, *use = NULL;
char *portBuf = (char*)malloc(6);
int one = 1;
int zero = 0;
memset(&e, 0, sizeof(e));
memset(&ev, 0, sizeof(ev));
snprintf(portBuf, 6, "%d", port);
@ -570,9 +569,10 @@ tcp_server_create(const char *bindaddr, int port, tcp_server_callback_t *start,
ts->start = start;
ts->opaque = opaque;
e.events = EPOLLIN;
e.data.ptr = ts;
epoll_ctl(tcp_server_epoll_fd, EPOLL_CTL_ADD, fd, &e);
ev.fd = fd;
ev.events = TVHPOLL_IN;
ev.data.ptr = ts;
tvhpoll_add(tcp_server_poll, &ev, 1);
return ts;
}
@ -588,7 +588,7 @@ tcp_server_init(int opt_ipv6)
if(opt_ipv6)
tcp_preferred_address_family = AF_INET6;
tcp_server_epoll_fd = epoll_create(10);
tcp_server_poll = tvhpoll_create(10);
pthread_create(&tid, NULL, tcp_server_loop, NULL);
}

View file

@ -21,6 +21,7 @@
#include "timeshift.h"
#include "timeshift/private.h"
#include "atomic.h"
#include "tvhpoll.h"
#include <sys/types.h>
#include <sys/stat.h>
@ -399,7 +400,7 @@ static int _timeshift_flush_to_live
void *timeshift_reader ( void *p )
{
timeshift_t *ts = p;
int efd, nfds, end, fd = -1, run = 1, wait = -1;
int nfds, end, fd = -1, run = 1, wait = -1;
timeshift_file_t *cur_file = NULL;
off_t cur_off = 0;
int cur_speed = 100, keyframe_mode = 0;
@ -409,13 +410,13 @@ void *timeshift_reader ( void *p )
timeshift_index_iframe_t *tsi = NULL;
streaming_skip_t *skip = NULL;
time_t last_status = 0;
tvhpoll_t *pd;
tvhpoll_event_t ev = { 0 };
/* Poll */
struct epoll_event ev = { 0 };
efd = epoll_create(1);
ev.events = EPOLLIN;
ev.data.fd = ts->rd_pipe.rd;
epoll_ctl(efd, EPOLL_CTL_ADD, ev.data.fd, &ev);
pd = tvhpoll_create(1);
ev.fd = ts->rd_pipe.rd;
ev.events = TVHPOLL_IN;
tvhpoll_add(pd, &ev, 1);
/* Output */
while (run) {
@ -427,7 +428,7 @@ void *timeshift_reader ( void *p )
/* Wait for data */
if(wait)
nfds = epoll_wait(efd, &ev, 1, wait);
nfds = tvhpoll_wait(pd, &ev, 1, wait);
else
nfds = 0;
wait = -1;
@ -438,7 +439,7 @@ void *timeshift_reader ( void *p )
/* Control */
pthread_mutex_lock(&ts->state_mutex);
if (nfds == 1) {
if (_read_msg(ev.data.fd, &ctrl) > 0) {
if (_read_msg(ts->rd_pipe.rd, &ctrl) > 0) {
/* Exit */
if (ctrl->sm_type == SMT_EXIT) {
@ -802,6 +803,7 @@ void *timeshift_reader ( void *p )
}
/* Cleanup */
tvhpoll_destroy(pd);
if (fd != -1) close(fd);
if (sm) streaming_msg_free(sm);
if (ctrl) streaming_msg_free(ctrl);