main: add kqueue support

This commit is contained in:
Richard Aas 2015-06-29 06:51:05 +00:00
parent 144eb24b55
commit 87f8a8b1a4
5 changed files with 194 additions and 7 deletions

View file

@ -59,6 +59,7 @@ enum poll_method {
METHOD_SELECT,
METHOD_EPOLL,
METHOD_ACTSCHED,
METHOD_KQUEUE,
/* sep */
METHOD_MAX
};

View file

@ -259,6 +259,7 @@ endif
AR := ar
AFLAGS := cru
LIB_SUFFIX := .dylib
HAVE_KQUEUE := 1
endif
ifeq ($(OS),netbsd)
CFLAGS += -fPIC -DNETBSD
@ -268,6 +269,7 @@ ifeq ($(OS),netbsd)
APP_LFLAGS += -rdynamic
AR := ar
AFLAGS := cru
HAVE_KQUEUE := 1
endif
ifeq ($(OS),freebsd)
CFLAGS += -fPIC -DFREEBSD
@ -277,6 +279,7 @@ ifeq ($(OS),freebsd)
APP_LFLAGS += -rdynamic
AR := ar
AFLAGS := cru
HAVE_KQUEUE := 1
endif
ifeq ($(OS),openbsd)
CFLAGS += -fPIC -DOPENBSD
@ -286,6 +289,7 @@ ifeq ($(OS),openbsd)
APP_LFLAGS += -rdynamic
AR := ar
AFLAGS := cru
HAVE_KQUEUE := 1
endif
ifeq ($(OS),win32)
CFLAGS += -DWIN32 -D_WIN32_WINNT=0x0501
@ -563,6 +567,9 @@ CFLAGS += -DHAVE_SYS_TIME_H
ifneq ($(HAVE_EPOLL),)
CFLAGS += -DHAVE_EPOLL
endif
ifneq ($(HAVE_KQUEUE),)
CFLAGS += -DHAVE_KQUEUE
endif
CFLAGS += -DHAVE_UNAME
CFLAGS += -DHAVE_UNISTD_H
ifneq ($(OS),cygwin)

View file

@ -29,6 +29,13 @@
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef HAVE_KQUEUE
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#undef LIST_INIT
#undef LIST_FOREACH
#endif
#include <re_types.h>
#include <re_fmt.h>
#include <re_mem.h>
@ -104,6 +111,11 @@ struct re {
int epfd; /**< epoll control file descriptor */
#endif
#ifdef HAVE_KQUEUE
struct kevent *evlist;
int kqfd;
#endif
#ifdef HAVE_PTHREAD
pthread_mutex_t mutex; /**< Mutex for thread synchronization */
pthread_mutex_t *mutexp; /**< Pointer to active mutex */
@ -126,6 +138,10 @@ static struct re global_re = {
NULL,
-1,
#endif
#ifdef HAVE_KQUEUE
NULL,
-1,
#endif
#ifdef HAVE_PTHREAD
#if MAIN_DEBUG && defined (PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP)
PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP,
@ -270,7 +286,7 @@ static int set_epoll_fds(struct re *re, int fd, int flags)
struct epoll_event event;
int err = 0;
if (re->epfd <= 0)
if (re->epfd < 0)
return EBADFD;
memset(&event, 0, sizeof(event));
@ -323,6 +339,46 @@ static int set_epoll_fds(struct re *re, int fd, int flags)
#endif
#ifdef HAVE_KQUEUE
static int set_kqueue_fds(struct re *re, int fd, int flags)
{
struct kevent kev[2];
int r, n = 0;
memset(kev, 0, sizeof(kev));
/* always delete the events */
EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
EV_SET(&kev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
kevent(re->kqfd, kev, 2, NULL, 0, NULL);
memset(kev, 0, sizeof(kev));
if (flags & FD_READ) {
EV_SET(&kev[n], fd, EVFILT_READ, EV_ADD, 0, 0, 0);
++n;
}
if (flags & FD_WRITE) {
EV_SET(&kev[n], fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
++n;
}
if (n) {
r = kevent(re->kqfd, kev, n, NULL, 0, NULL);
if (r < 0) {
int err = errno;
DEBUG_WARNING("set: [fd=%d, flags=%x] kevent: %m\n",
fd, flags, err);
return err;
}
}
return 0;
}
#endif
/**
* Rebuild the file descriptor mapping table. This must be done whenever
* the polling method is changed.
@ -350,9 +406,19 @@ static int rebuild_fds(struct re *re)
err = set_epoll_fds(re, i, re->fhs[i].flags);
break;
#endif
#ifdef HAVE_KQUEUE
case METHOD_KQUEUE:
err = set_kqueue_fds(re, i, re->fhs[i].flags);
break;
#endif
default:
break;
}
if (err)
break;
}
return err;
@ -391,15 +457,39 @@ static int poll_init(struct re *re)
return ENOMEM;
}
if (re->epfd <= 0
if (re->epfd < 0
&& -1 == (re->epfd = epoll_create(re->maxfds))) {
int err = errno;
DEBUG_WARNING("epoll_create: %m (maxfds=%d)\n",
errno, re->maxfds);
return errno;
err, re->maxfds);
return err;
}
DEBUG_INFO("init: epoll_create() epfd=%d\n", re->epfd);
break;
#endif
#ifdef HAVE_KQUEUE
case METHOD_KQUEUE:
if (!re->evlist) {
size_t sz = re->maxfds * sizeof(*re->evlist);
re->evlist = mem_zalloc(sz, NULL);
if (!re->evlist)
return ENOMEM;
}
if (re->kqfd < 0) {
re->kqfd = kqueue();
if (re->kqfd < 0)
return errno;
DEBUG_INFO("kqueue: fd=%d\n", re->kqfd);
}
break;
#endif
default:
break;
}
@ -421,13 +511,22 @@ static void poll_close(struct re *re)
#ifdef HAVE_EPOLL
DEBUG_INFO("poll_close: epfd=%d\n", re->epfd);
if (re->epfd > 0) {
if (re->epfd >= 0) {
(void)close(re->epfd);
re->epfd = -1;
}
re->events = mem_deref(re->events);
#endif
#ifdef HAVE_KQUEUE
if (re->kqfd >= 0) {
close(re->kqfd);
re->kqfd = -1;
}
re->evlist = mem_deref(re->evlist);
#endif
}
@ -514,11 +613,18 @@ int fd_listen(int fd, int flags, fd_h *fh, void *arg)
#ifdef HAVE_EPOLL
case METHOD_EPOLL:
if (re->epfd <= 0)
if (re->epfd < 0)
return EBADFD;
err = set_epoll_fds(re, fd, flags);
break;
#endif
#ifdef HAVE_KQUEUE
case METHOD_KQUEUE:
err = set_kqueue_fds(re, fd, flags);
break;
#endif
default:
break;
}
@ -615,6 +721,21 @@ static int fd_poll(struct re *re)
break;
#endif
#ifdef HAVE_KQUEUE
case METHOD_KQUEUE: {
struct timespec timeout;
timeout.tv_sec = to / 1000;
timeout.tv_nsec = (to % 1000) * 1000000;
re_unlock(re);
n = kevent(re->kqfd, NULL, 0, re->evlist, re->maxfds,
to ? &timeout : NULL);
re_lock(re);
}
break;
#endif
default:
(void)to;
DEBUG_WARNING("no polling method set\n");
@ -678,6 +799,44 @@ static int fd_poll(struct re *re)
break;
#endif
#ifdef HAVE_KQUEUE
case METHOD_KQUEUE: {
struct kevent *kev = &re->evlist[i];
fd = (int)kev->ident;
if (fd >= re->maxfds) {
DEBUG_WARNING("large fd=%d\n", fd);
break;
}
if (kev->filter == EVFILT_READ)
flags |= FD_READ;
else if (kev->filter == EVFILT_WRITE)
flags |= FD_WRITE;
else {
DEBUG_WARNING("kqueue: unhandled "
"filter %x\n",
kev->filter);
}
if (kev->flags & EV_EOF) {
flags |= FD_EXCEPT;
}
if (kev->flags & EV_ERROR) {
DEBUG_WARNING("kqueue: EV_ERROR on fd %d\n",
fd);
}
if (!flags) {
DEBUG_WARNING("kqueue: no flags fd=%d\n", fd);
}
}
break;
#endif
default:
return EINVAL;
}
@ -937,6 +1096,10 @@ int poll_method_set(enum poll_method method)
#ifdef HAVE_ACTSCHED
case METHOD_ACTSCHED:
break;
#endif
#ifdef HAVE_KQUEUE
case METHOD_KQUEUE:
break;
#endif
default:
DEBUG_WARNING("poll method not supported: '%s'\n",
@ -989,6 +1152,10 @@ int re_thread_init(void)
re->epfd = -1;
#endif
#ifdef HAVE_KQUEUE
re->kqfd = -1;
#endif
pthread_setspecific(pt_key, re);
return 0;
#else

View file

@ -14,6 +14,7 @@ static const char str_poll[] = "poll"; /**< POSIX.1-2001 poll */
static const char str_select[] = "select"; /**< POSIX.1-2001 select */
static const char str_epoll[] = "epoll"; /**< Linux epoll */
static const char str_as[] = "actsched"; /**< Symbian ActiveScheduler */
static const char str_kqueue[] = "kqueue";
/**
@ -32,6 +33,13 @@ enum poll_method poll_method_best(void)
m = METHOD_EPOLL;
}
#endif
#ifdef HAVE_KQUEUE
if (METHOD_NULL == m) {
m = METHOD_KQUEUE;
}
#endif
#ifdef HAVE_POLL
if (METHOD_NULL == m) {
m = METHOD_POLL;
@ -67,6 +75,7 @@ const char *poll_method_name(enum poll_method method)
case METHOD_SELECT: return str_select;
case METHOD_EPOLL: return str_epoll;
case METHOD_ACTSCHED: return str_as;
case METHOD_KQUEUE: return str_kqueue;
default: return "???";
}
}
@ -93,6 +102,8 @@ int poll_method_type(enum poll_method *method, const struct pl *name)
*method = METHOD_EPOLL;
else if (0 == pl_strcasecmp(name, str_as))
*method = METHOD_ACTSCHED;
else if (0 == pl_strcasecmp(name, str_kqueue))
*method = METHOD_KQUEUE;
else
return ENOENT;

View file

@ -25,7 +25,8 @@ static pthread_mutex_t *lockv;
static inline unsigned long threadid(void)
{
#if defined (DARWIN) || defined (FREEBSD) || defined (OPENBSD)
#if defined (DARWIN) || defined (FREEBSD) || defined (OPENBSD) || \
defined (NETBSD)
return (unsigned long)(void *)pthread_self();
#else
return (unsigned long)pthread_self();