diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index b7d84b81..54c0c5fb 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -185,6 +185,10 @@ insert_wsi_socket_into_fds(struct libwebsocket_context *context, #endif /* LWS_USE_LIBEV */ context->fds[context->fds_count++].revents = 0; +#ifdef _WIN32 + context->events[context->fds_count] = WSACreateEvent(); + WSAEventSelect(wsi->sock, context->events[context->fds_count], LWS_POLLIN); +#endif /* external POLL support via protocol 0 */ context->protocols[0].callback(context, wsi, @@ -229,6 +233,10 @@ remove_wsi_socket_from_fds(struct libwebsocket_context *context, /* have the last guy take up the vacant slot */ context->fds[m] = context->fds[context->fds_count]; +#ifdef _WIN32 + WSACloseEvent(context->events[m + 1]); + context->events[m + 1] = context->events[context->fds_count + 1]; +#endif /* * end guy's fds_lookup entry remains unchanged * (still same fd pointing to same wsi) @@ -765,6 +773,9 @@ int lws_set_socket_options(struct libwebsocket_context *context, int fd) LWS_VISIBLE int lws_send_pipe_choked(struct libwebsocket *wsi) { +#ifdef _WIN32 + return wsi->sock_send_blocking; +#else struct pollfd fds; /* treat the fact we got a truncated send pending as if we're choked */ @@ -784,6 +795,7 @@ LWS_VISIBLE int lws_send_pipe_choked(struct libwebsocket *wsi) /* okay to send another packet without blocking */ return 0; +#endif } int @@ -974,6 +986,23 @@ libwebsocket_service_timeout_check(struct libwebsocket_context *context, return 0; } +static int lws_poll_listen_fd(struct pollfd* fd) +{ +#ifdef _WIN32 + fd_set readfds; + struct timeval tv = { 0, 0 }; + + assert(fd->events == POLLIN); + + FD_ZERO(&readfds); + FD_SET(fd->fd, &readfds); + + return select(fd->fd + 1, &readfds, NULL, NULL, &tv); +#else + return poll(fd, 1, 0); +#endif +} + /** * libwebsocket_service_fd() - Service polled socket with something waiting * @context: Websocket context @@ -1103,8 +1132,7 @@ libwebsocket_service_fd(struct libwebsocket_context *context, * even with extpoll, we prepared this * internal fds for listen */ - n = poll(&context->fds[listen_socket_fds_index], - 1, 0); + n = lws_poll_listen_fd(&context->fds[listen_socket_fds_index]); if (n > 0) { /* there's a conn waiting for us */ libwebsocket_service_fd(context, &context-> @@ -1389,6 +1417,10 @@ libwebsocket_context_destroy(struct libwebsocket_context *context) #if defined(WIN32) || defined(_WIN32) + if (context->events) { + WSACloseEvent(context->events[0]); + free(context->events); + } #else close(context->dummy_pipe_fds[0]); close(context->dummy_pipe_fds[1]); @@ -1470,8 +1502,15 @@ LWS_VISIBLE int libwebsocket_service(struct libwebsocket_context *context, int timeout_ms) { int n; +#ifdef _WIN32 + int i; + DWORD ev; + WSANETWORKEVENTS networkevents; + struct pollfd *pfd; +#else int m; char buf; +#endif /* stay dead once we are dead */ @@ -1484,6 +1523,53 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms) #endif /* LWS_USE_LIBEV */ context->service_tid = context->protocols[0].callback(context, NULL, LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0); + +#ifdef _WIN32 + for (i = 0; i < context->fds_count; ++i) { + pfd = &context->fds[i]; + if (pfd->fd == context->listen_service_fd) + continue; + if (pfd->events & POLLOUT) { + if (context->lws_lookup[pfd->fd]->sock_send_blocking) + continue; + pfd->revents = POLLOUT; + n = libwebsocket_service_fd(context, pfd); + if (n < 0) + return n; + } + } + + ev = WSAWaitForMultipleEvents(context->fds_count + 1, context->events, FALSE, timeout_ms, FALSE); + if (ev == WSA_WAIT_TIMEOUT) { + libwebsocket_service_fd(context, NULL); + return 0; + } + + if (ev == WSA_WAIT_EVENT_0) { + WSAResetEvent(context->events[0]); + return 0; + } + + if (ev < WSA_WAIT_EVENT_0 || ev > WSA_WAIT_EVENT_0 + context->fds_count) + return -1; + + pfd = &context->fds[ev - WSA_WAIT_EVENT_0 - 1]; + + if (WSAEnumNetworkEvents(pfd->fd, context->events[ev - WSA_WAIT_EVENT_0], + &networkevents) == SOCKET_ERROR) { + lwsl_err("WSAEnumNetworkEvents() failed with error %d\n", LWS_ERRNO); + return -1; + } + + pfd->revents = (networkevents.lNetworkEvents & LWS_POLLIN) ? POLLIN : 0; + + if (networkevents.lNetworkEvents & LWS_POLLOUT) { + context->lws_lookup[pfd->fd]->sock_send_blocking = FALSE; + pfd->revents |= POLLOUT; + } + + return libwebsocket_service_fd(context, pfd); +#else n = poll(context->fds, context->fds_count, timeout_ms); context->service_tid = 0; @@ -1504,13 +1590,13 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms) for (n = 0; n < context->fds_count; n++) { if (!context->fds[n].revents) continue; -#ifndef _WIN32 + if (context->fds[n].fd == context->dummy_pipe_fds[0]) { if (read(context->fds[n].fd, &buf, 1) != 1) lwsl_err("Cannot read from dummy pipe."); continue; } -#endif + m = libwebsocket_service_fd(context, &context->fds[n]); if (m < 0) return -1; @@ -1520,6 +1606,7 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms) } return 0; +#endif } /** @@ -1528,13 +1615,13 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms) * * This function let a call to libwebsocket_service() waiting for a timeout * immediately return. - * - * At the moment this functionality cannot be used on Windows. */ LWS_VISIBLE void libwebsocket_cancel_service(struct libwebsocket_context *context) { -#ifndef _WIN32 +#ifdef _WIN32 + WSASetEvent(context->events[0]); +#else char buf = 0; if (write(context->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1) lwsl_err("Cannot write to dummy pipe."); @@ -1649,20 +1736,24 @@ void lws_change_pollfd(struct libwebsocket *wsi, int _and, int _or) { struct libwebsocket_context *context = wsi->protocol->owning_server; - int events; int tid; int sampled_tid; + struct pollfd *pfd; struct libwebsocket_pollargs pa; +#ifdef _WIN32 + long networkevents = FD_WRITE; +#endif + pfd = &context->fds[wsi->position_in_fds_table]; pa.fd = wsi->sock; context->protocols[0].callback(context, wsi, LWS_CALLBACK_LOCK_POLL, wsi->user_space, (void *) &pa, 0); - pa.prev_events = events = context->fds[wsi->position_in_fds_table].events; + pa.prev_events = pfd->events; - pa.events = context->fds[wsi->position_in_fds_table].events = (events & ~_and) | _or; + pa.events = pfd->events = (pfd->events & ~_and) | _or; context->protocols[0].callback(context, wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD, @@ -1675,7 +1766,18 @@ lws_change_pollfd(struct libwebsocket *wsi, int _and, int _or) * ... and the service thread is waiting ... * then cancel it to force a restart with our changed events */ - if (events != context->fds[wsi->position_in_fds_table].events) { + if (pa.prev_events != pa.events) { +#ifdef _WIN32 + if ((pfd->events & POLLIN)) + networkevents |= LWS_POLLIN; + + if (WSAEventSelect(wsi->sock, + context->events[wsi->position_in_fds_table + 1], + networkevents) == SOCKET_ERROR) { + lwsl_err("WSAEventSelect() failed with error %d\n", LWS_ERRNO); + } +#endif + sampled_tid = context->service_tid; if (sampled_tid) { tid = context->protocols[0].callback(context, NULL, @@ -2144,20 +2246,6 @@ libwebsocket_create_context(struct lws_context_creation_info *info) lwsl_err("WSAStartup failed with error: %d\n", err); return NULL; } - - /* default to a poll() made out of select() */ - poll = emulated_poll; - -#ifndef _WIN32_WCE - /* if windows socket lib available, use his WSAPoll */ - wsdll = GetModuleHandle(_T("Ws2_32.dll")); - if (wsdll) - poll = (PFNWSAPOLL)GetProcAddress(wsdll, "WSAPoll"); - - /* Finally fall back to emulated poll if all else fails */ - if (!poll) - poll = emulated_poll; -#endif } #endif @@ -2219,9 +2307,23 @@ libwebsocket_create_context(struct lws_context_creation_info *info) memset(context->lws_lookup, 0, sizeof(struct libwebsocket *) * context->max_fds); +#ifdef _WIN32 + context->events = (WSAEVENT *)malloc(sizeof(WSAEVENT) * + (context->max_fds + 1)); + if (context->events == NULL) { + lwsl_err("Unable to allocate events array for %d connections\n", + context->max_fds); + free(context->lws_lookup); + free(context->fds); + free(context); + return NULL; + } +#endif + if (!LWS_LIBEV_ENABLED(context)) { #ifdef _WIN32 context->fds_count = 0; + context->events[0] = WSACreateEvent(); #else if (pipe(context->dummy_pipe_fds)) { lwsl_err("Unable to create pipe\n"); diff --git a/lib/output.c b/lib/output.c index cad68976..69245661 100644 --- a/lib/output.c +++ b/lib/output.c @@ -86,6 +86,13 @@ LWS_VISIBLE void lwsl_hexdump(void *vbuf, size_t len) #endif +static void lws_set_blocking_send(struct libwebsocket *wsi) +{ +#ifdef _WIN32 + wsi->sock_send_blocking = TRUE; +#endif +} + /* * notice this returns number of bytes consumed, or -1 */ @@ -160,6 +167,8 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len) n = SSL_get_error(wsi->ssl, n); if (n == SSL_ERROR_WANT_READ || n == SSL_ERROR_WANT_WRITE) { + if (n == SSL_ERROR_WANT_WRITE) + lws_set_blocking_send(wsi); n = 0; goto handle_truncated_send; @@ -172,7 +181,10 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len) n = send(wsi->sock, buf, len, MSG_NOSIGNAL); lws_latency(context, wsi, "send lws_issue_raw", n, n == len); if (n < 0) { - if (LWS_ERRNO == LWS_EAGAIN || LWS_ERRNO == LWS_EINTR) { + if (LWS_ERRNO == LWS_EAGAIN || LWS_ERRNO == LWS_EWOULDBLOCK + || LWS_ERRNO == LWS_EINTR) { + if (LWS_ERRNO == LWS_EWOULDBLOCK) + lws_set_blocking_send(wsi); n = 0; goto handle_truncated_send; } diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 25b45ed9..e8dce85f 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -68,6 +68,8 @@ #define LWS_EINTR WSAEINTR #define LWS_EISCONN WSAEISCONN #define LWS_EWOULDBLOCK WSAEWOULDBLOCK +#define LWS_POLLIN (FD_READ | FD_ACCEPT) +#define LWS_POLLOUT (FD_WRITE) #define compatible_close(fd) closesocket(fd); #ifdef __MINGW64__ @@ -282,6 +284,9 @@ struct lws_signal_watcher { #endif /* LWS_USE_LIBEV */ struct libwebsocket_context { +#ifdef _WIN32 + WSAEVENT *events; +#endif struct pollfd *fds; struct libwebsocket **lws_lookup; /* fd to wsi */ int fds_count; @@ -500,6 +505,10 @@ struct libwebsocket { BIO *client_bio; unsigned int use_ssl:2; #endif + +#ifdef _WIN32 + BOOL sock_send_blocking; +#endif }; LWS_EXTERN void