windows Use WSAWaitForMultipleEvents() instead of poll() on Windows

This commit is contained in:
Patrick Gansterer 2014-03-28 15:44:56 +01:00 committed by Andy Green
parent 8c020f2363
commit 0fc37b64f6
3 changed files with 149 additions and 26 deletions

View file

@ -185,6 +185,10 @@ insert_wsi_socket_into_fds(struct libwebsocket_context *context,
#endif /* LWS_USE_LIBEV */
context->fds[context->fds_count++].revents = 0;
#ifdef _WIN32
context->events[context->fds_count] = WSACreateEvent();
WSAEventSelect(wsi->sock, context->events[context->fds_count], LWS_POLLIN);
#endif
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
@ -229,6 +233,10 @@ remove_wsi_socket_from_fds(struct libwebsocket_context *context,
/* have the last guy take up the vacant slot */
context->fds[m] = context->fds[context->fds_count];
#ifdef _WIN32
WSACloseEvent(context->events[m + 1]);
context->events[m + 1] = context->events[context->fds_count + 1];
#endif
/*
* end guy's fds_lookup entry remains unchanged
* (still same fd pointing to same wsi)
@ -765,6 +773,9 @@ int lws_set_socket_options(struct libwebsocket_context *context, int fd)
LWS_VISIBLE int lws_send_pipe_choked(struct libwebsocket *wsi)
{
#ifdef _WIN32
return wsi->sock_send_blocking;
#else
struct pollfd fds;
/* treat the fact we got a truncated send pending as if we're choked */
@ -784,6 +795,7 @@ LWS_VISIBLE int lws_send_pipe_choked(struct libwebsocket *wsi)
/* okay to send another packet without blocking */
return 0;
#endif
}
int
@ -974,6 +986,23 @@ libwebsocket_service_timeout_check(struct libwebsocket_context *context,
return 0;
}
static int lws_poll_listen_fd(struct pollfd* fd)
{
#ifdef _WIN32
fd_set readfds;
struct timeval tv = { 0, 0 };
assert(fd->events == POLLIN);
FD_ZERO(&readfds);
FD_SET(fd->fd, &readfds);
return select(fd->fd + 1, &readfds, NULL, NULL, &tv);
#else
return poll(fd, 1, 0);
#endif
}
/**
* libwebsocket_service_fd() - Service polled socket with something waiting
* @context: Websocket context
@ -1103,8 +1132,7 @@ libwebsocket_service_fd(struct libwebsocket_context *context,
* even with extpoll, we prepared this
* internal fds for listen
*/
n = poll(&context->fds[listen_socket_fds_index],
1, 0);
n = lws_poll_listen_fd(&context->fds[listen_socket_fds_index]);
if (n > 0) { /* there's a conn waiting for us */
libwebsocket_service_fd(context,
&context->
@ -1389,6 +1417,10 @@ libwebsocket_context_destroy(struct libwebsocket_context *context)
#if defined(WIN32) || defined(_WIN32)
if (context->events) {
WSACloseEvent(context->events[0]);
free(context->events);
}
#else
close(context->dummy_pipe_fds[0]);
close(context->dummy_pipe_fds[1]);
@ -1470,8 +1502,15 @@ LWS_VISIBLE int
libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
{
int n;
#ifdef _WIN32
int i;
DWORD ev;
WSANETWORKEVENTS networkevents;
struct pollfd *pfd;
#else
int m;
char buf;
#endif
/* stay dead once we are dead */
@ -1484,6 +1523,53 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
#endif /* LWS_USE_LIBEV */
context->service_tid = context->protocols[0].callback(context, NULL,
LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0);
#ifdef _WIN32
for (i = 0; i < context->fds_count; ++i) {
pfd = &context->fds[i];
if (pfd->fd == context->listen_service_fd)
continue;
if (pfd->events & POLLOUT) {
if (context->lws_lookup[pfd->fd]->sock_send_blocking)
continue;
pfd->revents = POLLOUT;
n = libwebsocket_service_fd(context, pfd);
if (n < 0)
return n;
}
}
ev = WSAWaitForMultipleEvents(context->fds_count + 1, context->events, FALSE, timeout_ms, FALSE);
if (ev == WSA_WAIT_TIMEOUT) {
libwebsocket_service_fd(context, NULL);
return 0;
}
if (ev == WSA_WAIT_EVENT_0) {
WSAResetEvent(context->events[0]);
return 0;
}
if (ev < WSA_WAIT_EVENT_0 || ev > WSA_WAIT_EVENT_0 + context->fds_count)
return -1;
pfd = &context->fds[ev - WSA_WAIT_EVENT_0 - 1];
if (WSAEnumNetworkEvents(pfd->fd, context->events[ev - WSA_WAIT_EVENT_0],
&networkevents) == SOCKET_ERROR) {
lwsl_err("WSAEnumNetworkEvents() failed with error %d\n", LWS_ERRNO);
return -1;
}
pfd->revents = (networkevents.lNetworkEvents & LWS_POLLIN) ? POLLIN : 0;
if (networkevents.lNetworkEvents & LWS_POLLOUT) {
context->lws_lookup[pfd->fd]->sock_send_blocking = FALSE;
pfd->revents |= POLLOUT;
}
return libwebsocket_service_fd(context, pfd);
#else
n = poll(context->fds, context->fds_count, timeout_ms);
context->service_tid = 0;
@ -1504,13 +1590,13 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
for (n = 0; n < context->fds_count; n++) {
if (!context->fds[n].revents)
continue;
#ifndef _WIN32
if (context->fds[n].fd == context->dummy_pipe_fds[0]) {
if (read(context->fds[n].fd, &buf, 1) != 1)
lwsl_err("Cannot read from dummy pipe.");
continue;
}
#endif
m = libwebsocket_service_fd(context, &context->fds[n]);
if (m < 0)
return -1;
@ -1520,6 +1606,7 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
}
return 0;
#endif
}
/**
@ -1528,13 +1615,13 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
*
* This function let a call to libwebsocket_service() waiting for a timeout
* immediately return.
*
* At the moment this functionality cannot be used on Windows.
*/
LWS_VISIBLE void
libwebsocket_cancel_service(struct libwebsocket_context *context)
{
#ifndef _WIN32
#ifdef _WIN32
WSASetEvent(context->events[0]);
#else
char buf = 0;
if (write(context->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
lwsl_err("Cannot write to dummy pipe.");
@ -1649,20 +1736,24 @@ void
lws_change_pollfd(struct libwebsocket *wsi, int _and, int _or)
{
struct libwebsocket_context *context = wsi->protocol->owning_server;
int events;
int tid;
int sampled_tid;
struct pollfd *pfd;
struct libwebsocket_pollargs pa;
#ifdef _WIN32
long networkevents = FD_WRITE;
#endif
pfd = &context->fds[wsi->position_in_fds_table];
pa.fd = wsi->sock;
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_LOCK_POLL,
wsi->user_space, (void *) &pa, 0);
pa.prev_events = events = context->fds[wsi->position_in_fds_table].events;
pa.prev_events = pfd->events;
pa.events = context->fds[wsi->position_in_fds_table].events = (events & ~_and) | _or;
pa.events = pfd->events = (pfd->events & ~_and) | _or;
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_CHANGE_MODE_POLL_FD,
@ -1675,7 +1766,18 @@ lws_change_pollfd(struct libwebsocket *wsi, int _and, int _or)
* ... and the service thread is waiting ...
* then cancel it to force a restart with our changed events
*/
if (events != context->fds[wsi->position_in_fds_table].events) {
if (pa.prev_events != pa.events) {
#ifdef _WIN32
if ((pfd->events & POLLIN))
networkevents |= LWS_POLLIN;
if (WSAEventSelect(wsi->sock,
context->events[wsi->position_in_fds_table + 1],
networkevents) == SOCKET_ERROR) {
lwsl_err("WSAEventSelect() failed with error %d\n", LWS_ERRNO);
}
#endif
sampled_tid = context->service_tid;
if (sampled_tid) {
tid = context->protocols[0].callback(context, NULL,
@ -2144,20 +2246,6 @@ libwebsocket_create_context(struct lws_context_creation_info *info)
lwsl_err("WSAStartup failed with error: %d\n", err);
return NULL;
}
/* default to a poll() made out of select() */
poll = emulated_poll;
#ifndef _WIN32_WCE
/* if windows socket lib available, use his WSAPoll */
wsdll = GetModuleHandle(_T("Ws2_32.dll"));
if (wsdll)
poll = (PFNWSAPOLL)GetProcAddress(wsdll, "WSAPoll");
/* Finally fall back to emulated poll if all else fails */
if (!poll)
poll = emulated_poll;
#endif
}
#endif
@ -2219,9 +2307,23 @@ libwebsocket_create_context(struct lws_context_creation_info *info)
memset(context->lws_lookup, 0, sizeof(struct libwebsocket *) *
context->max_fds);
#ifdef _WIN32
context->events = (WSAEVENT *)malloc(sizeof(WSAEVENT) *
(context->max_fds + 1));
if (context->events == NULL) {
lwsl_err("Unable to allocate events array for %d connections\n",
context->max_fds);
free(context->lws_lookup);
free(context->fds);
free(context);
return NULL;
}
#endif
if (!LWS_LIBEV_ENABLED(context)) {
#ifdef _WIN32
context->fds_count = 0;
context->events[0] = WSACreateEvent();
#else
if (pipe(context->dummy_pipe_fds)) {
lwsl_err("Unable to create pipe\n");

View file

@ -86,6 +86,13 @@ LWS_VISIBLE void lwsl_hexdump(void *vbuf, size_t len)
#endif
static void lws_set_blocking_send(struct libwebsocket *wsi)
{
#ifdef _WIN32
wsi->sock_send_blocking = TRUE;
#endif
}
/*
* notice this returns number of bytes consumed, or -1
*/
@ -160,6 +167,8 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
n = SSL_get_error(wsi->ssl, n);
if (n == SSL_ERROR_WANT_READ ||
n == SSL_ERROR_WANT_WRITE) {
if (n == SSL_ERROR_WANT_WRITE)
lws_set_blocking_send(wsi);
n = 0;
goto handle_truncated_send;
@ -172,7 +181,10 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
n = send(wsi->sock, buf, len, MSG_NOSIGNAL);
lws_latency(context, wsi, "send lws_issue_raw", n, n == len);
if (n < 0) {
if (LWS_ERRNO == LWS_EAGAIN || LWS_ERRNO == LWS_EINTR) {
if (LWS_ERRNO == LWS_EAGAIN || LWS_ERRNO == LWS_EWOULDBLOCK
|| LWS_ERRNO == LWS_EINTR) {
if (LWS_ERRNO == LWS_EWOULDBLOCK)
lws_set_blocking_send(wsi);
n = 0;
goto handle_truncated_send;
}

View file

@ -68,6 +68,8 @@
#define LWS_EINTR WSAEINTR
#define LWS_EISCONN WSAEISCONN
#define LWS_EWOULDBLOCK WSAEWOULDBLOCK
#define LWS_POLLIN (FD_READ | FD_ACCEPT)
#define LWS_POLLOUT (FD_WRITE)
#define compatible_close(fd) closesocket(fd);
#ifdef __MINGW64__
@ -282,6 +284,9 @@ struct lws_signal_watcher {
#endif /* LWS_USE_LIBEV */
struct libwebsocket_context {
#ifdef _WIN32
WSAEVENT *events;
#endif
struct pollfd *fds;
struct libwebsocket **lws_lookup; /* fd to wsi */
int fds_count;
@ -500,6 +505,10 @@ struct libwebsocket {
BIO *client_bio;
unsigned int use_ssl:2;
#endif
#ifdef _WIN32
BOOL sock_send_blocking;
#endif
};
LWS_EXTERN void