1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

windows: migrate to WSAPoll

Switch out the guts of the default window platform
wait to use WSAPoll(), switch the lws_cancel_service()
mechanism to use a UDP socket pair.
This commit is contained in:
Andy Green 2020-12-05 05:01:55 +00:00
parent cef8ce81f7
commit 2d6b9a06a7
10 changed files with 128 additions and 232 deletions

View file

@ -369,15 +369,11 @@ typedef SOCKET lws_sockfd_type;
typedef HANDLE lws_filefd_type;
#endif
struct lws_pollfd {
lws_sockfd_type fd; /**< file descriptor */
SHORT events; /**< which events to respond to */
SHORT revents; /**< which events happened */
uint8_t write_blocked;
};
#define LWS_POLLHUP (FD_CLOSE)
#define LWS_POLLIN (FD_READ | FD_ACCEPT)
#define LWS_POLLOUT (FD_WRITE)
#define lws_pollfd pollfd
#define LWS_POLLHUP (POLLHUP)
#define LWS_POLLIN (POLLRDNORM | POLLRDBAND)
#define LWS_POLLOUT (POLLWRNORM)
#else

View file

@ -56,41 +56,57 @@ typedef enum {
static lcccr_t
lws_client_connect_check(struct lws *wsi)
{
#if !defined(WIN32)
socklen_t sl = sizeof(int);
int e = 0;
int en = 0;
socklen_t sl = sizeof(e);
/*
* This resets SO_ERROR after reading it. If there's an error
* condition, the connect definitively failed.
*/
if (!getsockopt(wsi->desc.sockfd, SOL_SOCKET, SO_ERROR, &e, &sl)) {
if (!getsockopt(wsi->desc.sockfd, SOL_SOCKET, SO_ERROR,
#if defined(WIN32)
(char *)
#endif
&e, &sl)) {
en = LWS_ERRNO;
if (!e) {
lwsl_debug("%s: getsockopt check: conn OK errno %d\n",
__func__, errno);
__func__, en);
return LCCCR_CONNECTED;
}
lwsl_debug("%s: getsockopt fd %d says err %d\n", __func__,
lwsl_notice("%s: getsockopt fd %d says err %d\n", __func__,
wsi->desc.sockfd, e);
}
#else
#if defined(WIN32)
if (!connect(wsi->desc.sockfd, NULL, 0))
return LCCCR_CONNECTED;
if (!LWS_ERRNO || LWS_ERRNO == WSAEINVAL ||
LWS_ERRNO == WSAEWOULDBLOCK ||
LWS_ERRNO == WSAEALREADY) {
lwsl_info("%s: errno %d\n", __func__, errno);
en = LWS_ERRNO;
if (en == WSAEISCONN) /* already connected */
return LCCCR_CONNECTED;
if (en == WSAEALREADY) {
/* reset the POLLOUT wait */
if (lws_change_pollfd(wsi, 0, LWS_POLLOUT))
lwsl_notice("pollfd failed\n");
}
if (!en || en == WSAEINVAL ||
en == WSAEWOULDBLOCK ||
en == WSAEALREADY) {
lwsl_debug("%s: errno %d\n", __func__, en);
return LCCCR_CONTINUE;
}
#endif
lwsl_info("%s: connect check take as FAILED\n", __func__);
lwsl_notice("%s: connect check take as FAILED: errno %d\n", __func__, en);
return LCCCR_FAILED;
}
@ -137,9 +153,6 @@ lws_client_connect_3_connect(struct lws *wsi, const char *ads,
result = NULL;
}
if (n == LWS_CONNECT_COMPLETION_GOOD)
goto conn_good;
#if defined(LWS_WITH_IPV6) && defined(__ANDROID__)
ipv6only = 0;
#endif
@ -455,14 +468,12 @@ ads_known:
wsi->a.context->timeout_secs *
LWS_USEC_PER_SEC);
#if !defined(WIN32)
/*
* must do specifically a POLLOUT poll to hear
* about the connect completion
*/
if (lws_change_pollfd(wsi, 0, LWS_POLLOUT))
goto try_next_dns_result_fds;
#endif
return wsi;
}

View file

@ -138,8 +138,10 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
lws_memory_barrier();
#endif
#if !defined(__linux__)
/* OSX couldn't see close on stdin pipe side otherwise */
#if !defined(__linux__) && !defined(WIN32)
/* OSX couldn't see close on stdin pipe side otherwise; WSAPOLL
* blows up if we give it POLLHUP
*/
_or |= LWS_POLLHUP;
#endif
@ -190,6 +192,7 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
* then cancel it to force a restart with our changed events
*/
pa_events = pa->prev_events != pa->events;
pfd->events = (short)pa->events;
if (pa_events) {
if (lws_plat_change_pollfd(context, wsi, pfd)) {

View file

@ -392,10 +392,7 @@ struct lws_context_per_thread {
struct lws_pollfd *fds;
volatile struct lws_foreign_thread_pollfd * volatile foreign_pfd_list;
#ifdef _WIN32
WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS];
CRITICAL_SECTION interrupt_lock;
#endif
lws_sockfd_type dummy_pipe_fds[2];
struct lws *pipe_wsi;

View file

@ -1510,7 +1510,7 @@ lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
!(newconn_cannot_use_h1 && w->role_ops != &role_ops_h1) &&
!strcmp(adsin, w->cli_hostname_copy) &&
#if defined(LWS_WITH_TLS)
(!(wsi->tls.use_ssl & LCCSCF_USE_SSL) || !my_alpn || (my_alpn && strstr(my_alpn, "http/1.1"))) &&
(!(wsi->tls.use_ssl & LCCSCF_USE_SSL) || !my_alpn || (my_alpn && strstr(my_alpn, "http/1.1"))) &&
(wsi->tls.use_ssl & LCCSCF_USE_SSL) ==
(w->tls.use_ssl & LCCSCF_USE_SSL) &&
#endif

View file

@ -410,7 +410,7 @@ struct lws_context {
lws_sorted_usec_list_t sul_nl_coldplug;
#endif
#if defined(LWS_PLAT_FREERTOS)
#if defined(LWS_PLAT_FREERTOS) || defined(WIN32)
struct sockaddr_in frt_pipe_si;
#endif

View file

@ -87,11 +87,7 @@ lws_plat_init(struct lws_context *context,
}
while (n--) {
int m;
pt->fds_count = 0;
for (m = 0; m < WSA_MAXIMUM_WAIT_EVENTS; m++)
pt->events[m] = WSACreateEvent();
InitializeCriticalSection(&pt->interrupt_lock);
pt++;
}
@ -111,16 +107,7 @@ lws_plat_init(struct lws_context *context,
void
lws_plat_context_early_destroy(struct lws_context *context)
{
struct lws_context_per_thread *pt = &context->pt[0];
int n = context->count_threads;
while (n--) {
int m;
for (m = 0; m < WSA_MAXIMUM_WAIT_EVENTS; m++)
WSACloseEvent(pt->events[m]);
DeleteCriticalSection(&pt->interrupt_lock);
pt++;
}
}
void

View file

@ -27,9 +27,69 @@
#endif
#include "private-lib-core.h"
#include <stdlib.h>
#include <stdio.h>
#include <io.h>
#include <fcntl.h>
int
lws_plat_pipe_create(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
struct sockaddr_in *si = &wsi->a.context->frt_pipe_si;
lws_sockfd_type *fd = pt->dummy_pipe_fds;
socklen_t sl;
/*
* Non-WSA HANDLEs can't join the WSAPoll() wait... use a UDP socket
* listening on 127.0.0.1:xxxx and send a byte to it from a second UDP
* socket to cancel the wait.
*
* Set the port to 0 at the bind, so lwip will choose a free one in the
* ephemeral range for us.
*/
fd[0] = socket(AF_INET, SOCK_DGRAM, 0);
if (fd[0] == INVALID_SOCKET)
goto bail;
fd[1] = socket(AF_INET, SOCK_DGRAM, 0);
if (fd[1] == INVALID_SOCKET)
goto bail;
/*
* No need for memset since it's in zalloc'd context... it's in the
* context so we can reuse the prepared sockaddr to send tp fd[0] whem
* we want to cancel the wait
*/
si->sin_family = AF_INET;
si->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
si->sin_port = 0;
if (bind(fd[0], (const struct sockaddr *)si, sizeof(*si)) < 0)
goto bail;
/*
* Query the socket to set context->frt_pipe_si to the full sockaddr it
* wants to be addressed by, including the port that lwip chose.
*
* Afterwards, we can use this prepared sockaddr stashed in the context
* to trigger the "pipe" without any other preliminaries.
*/
sl = sizeof(*si);
if (getsockname(fd[0], (struct sockaddr *)si, &sl))
goto bail;
lwsl_info("%s: cancel UDP skt port %d\n", __func__,
ntohs(si->sin_port));
return 0;
bail:
lwsl_err("%s: failed\n", __func__);
return 1;
}
@ -37,21 +97,31 @@ int
lws_plat_pipe_signal(struct lws_context *ctx, int tsi)
{
struct lws_context_per_thread *pt = &ctx->pt[tsi];
struct sockaddr_in *si = &ctx->frt_pipe_si;
lws_sockfd_type *fd = pt->dummy_pipe_fds;
char u = 0;
int n;
/*
* We need the critical section so that we are either setting it or
* clearing it, no matter how many threads competing there is a clear
* atomic state for the event
* Send a single UDP byte payload to the listening socket fd[0], forcing
* the event loop wait to wake. fd[1] and context->frt_pipe_si are
* set at context creation and are static.
*/
EnterCriticalSection(&pt->interrupt_lock);
WSASetEvent(pt->events[0]); /* trigger the cancel event */
LeaveCriticalSection(&pt->interrupt_lock);
n = sendto(fd[1], &u, 1, 0, (struct sockaddr *)si, sizeof(*si));
return 0;
return n != 1;
}
void
lws_plat_pipe_close(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
if (pt->dummy_pipe_fds[0] && pt->dummy_pipe_fds[0] != LWS_SOCK_INVALID)
closesocket(pt->dummy_pipe_fds[0]);
if (pt->dummy_pipe_fds[1] && pt->dummy_pipe_fds[1] != LWS_SOCK_INVALID)
closesocket(pt->dummy_pipe_fds[1]);
pt->dummy_pipe_fds[0] = pt->dummy_pipe_fds[1] = LWS_SOCK_INVALID;
}

View file

@ -60,12 +60,10 @@ int
_lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
struct lws_context_per_thread *pt;
WSANETWORKEVENTS networkevents;
struct lws_pollfd *pfd;
lws_usec_t timeout_us;
struct lws *wsi;
unsigned int i;
DWORD ev;
int n;
/* stay dead once we are dead */
@ -150,69 +148,15 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
if (!lws_service_adjust_timeout(context, 1, tsi))
timeout_us = 0;
/*
* WSA cannot actually tell us this from the wait... if anyone wants
* POLLOUT and is not blocked for it, no need to wait since we will want
* to service at least those. Still enter the wait so we can pick up
* other pending things...
*/
for (n = 0; n < (int)pt->fds_count; n++)
if (pt->fds[n].fd != LWS_SOCK_INVALID &&
pt->fds[n].events & LWS_POLLOUT &&
!pt->fds[n].write_blocked) {
timeout_us = 0;
break;
}
// lwsl_notice("%s: to %dms\n", __func__, (int)(timeout_us / 1000));
ev = WSAWaitForMultipleEvents(pt->fds_count + 1, pt->events, FALSE,
(DWORD)(timeout_us / LWS_US_PER_MS),
FALSE);
//lwsl_notice("%s: ev 0x%x\n", __func__, ev);
/*
* The wait returns indicating the one event that had something, or
* that we timed out, or something broken.
*
* Amazingly WSA can only handle 64 events, because the errors start
* at ordinal 64.
*/
if (ev >= WSA_MAXIMUM_WAIT_EVENTS &&
ev != WSA_WAIT_TIMEOUT)
/* some kind of error */
return 0;
if (!ev) {
/*
* The zero'th event is the cancel event specifically. Lock
* the event reset so we are definitely clearing it while we
* try to clear it.
*/
EnterCriticalSection(&pt->interrupt_lock);
WSAResetEvent(pt->events[0]);
LeaveCriticalSection(&pt->interrupt_lock);
#if defined(LWS_WITH_THREADPOOL)
/*
* threadpools that need to call for on_writable callbacks do it by
* marking the task as needing one for its wsi, then cancelling service.
*
* Each tsi will call this to perform the actual callback_on_writable
* from the correct service thread context
*/
lws_threadpool_tsi_context(pt->context, pt->tid);
#endif
lws_broadcast(pt, LWS_CALLBACK_EVENT_WAIT_CANCELLED, NULL, 0);
// lwsl_notice("%s: in %dms, count %d\n", __func__, (int)(timeout_us / 1000), pt->fds_count);
// for (n = 0; n < (int)pt->fds_count; n++)
// lwsl_notice("%s: fd %d ev 0x%x POLLIN %d, POLLOUT %d\n", __func__, (int)pt->fds[n].fd, (int)pt->fds[n].events, POLLIN, POLLOUT);
int d = WSAPoll((WSAPOLLFD *)&pt->fds[0], pt->fds_count, (int)(timeout_us / LWS_US_PER_MS));
if (d < 0) {
lwsl_err("%s: WSAPoll failed: count %d, err %d: %d\n", __func__, pt->fds_count, d, WSAGetLastError());
return 0;
}
/*
* Otherwise at least fds[ev - 1] has something to do...
*/
// lwsl_notice("%s: out\n", __func__);
#if defined(LWS_WITH_TLS)
if (pt->context->tls_ops &&
@ -220,110 +164,12 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
pt->context->tls_ops->fake_POLLIN_for_buffered(pt);
#endif
/*
* POLLOUT for any fds that can
*/
for (n = 0; n < (int)pt->fds_count; n++)
if (pt->fds[n].fd != LWS_SOCK_INVALID &&
pt->fds[n].events & LWS_POLLOUT &&
!pt->fds[n].write_blocked) {
struct timeval tv;
fd_set se;
/*
* We have to check if it is blocked...
* if not, do the POLLOUT handling
*/
FD_ZERO(&se);
FD_SET(pt->fds[n].fd, &se);
tv.tv_sec = tv.tv_usec = 0;
if (select(1, NULL, &se, NULL, &tv) != 1)
pt->fds[n].write_blocked = 1;
else {
pt->fds[n].revents |= LWS_POLLOUT;
lws_service_fd_tsi(context, &pt->fds[n], tsi);
}
if (pt->fds[n].fd != LWS_SOCK_INVALID && pt->fds[n].revents) {
// lwsl_notice("%s: idx %d, revents 0x%x\n", __func__, n, pt->fds[n].revents);
lws_service_fd_tsi(context, &pt->fds[n], tsi);
}
if (ev && ev < WSA_MAXIMUM_WAIT_EVENTS) {
unsigned int err;
/* handle fds[ev - 1] */
if (WSAEnumNetworkEvents(pt->fds[ev - 1].fd, pt->events[ev],
&networkevents) == SOCKET_ERROR) {
lwsl_err("WSAEnumNetworkEvents() failed "
"with error %d\n", LWS_ERRNO);
return -1;
}
pfd = &pt->fds[ev - 1];
pfd->revents = (short)networkevents.lNetworkEvents;
if (!pfd->write_blocked && pfd->revents & FD_WRITE)
pfd->write_blocked = 0;
err = networkevents.iErrorCode[FD_CONNECT_BIT];
if ((networkevents.lNetworkEvents & FD_CONNECT) &&
wsi_from_fd(context, pfd->fd) &&
!wsi_from_fd(context, pfd->fd)->udp) {
lwsl_debug("%s: FD_CONNECT: %s\n", __func__,
lws_wsi_tag(wsi_from_fd(context, pfd->fd)));
pfd->revents &= ~LWS_POLLOUT;
if (err && err != LWS_EALREADY &&
err != LWS_EINPROGRESS &&
err != LWS_EWOULDBLOCK &&
err != WSAEINVAL) {
lwsl_debug("Unable to connect errno=%d\n", err);
/*
* the connection has definitively failed... but
* do we have more DNS entries to try?
*/
if (wsi_from_fd(context, pfd->fd)->dns_sorted_list.count) {
lws_sul_schedule(context, 0,
&wsi_from_fd(context, pfd->fd)->
sul_connect_timeout,
lws_client_conn_wait_timeout, 1);
return 0;
} else
pfd->revents |= LWS_POLLHUP;
} else
if (wsi_from_fd(context, pfd->fd)) {
if (wsi_from_fd(context, pfd->fd)->udp)
pfd->revents |= LWS_POLLHUP;
else
lws_client_connect_3_connect(
wsi_from_fd(context, pfd->fd),
NULL, NULL,
LWS_CONNECT_COMPLETION_GOOD,
NULL);
}
}
if (pfd->revents & LWS_POLLOUT) {
wsi = wsi_from_fd(context, pfd->fd);
if (wsi)
wsi->sock_send_blocking = 0;
}
if (pfd->revents) {
/*
* On windows is somehow necessary to "acknowledge" the
* POLLIN event, otherwise we never receive another one
* on the TCP connection. But it breaks UDP, so only
* do it on non-UDP.
*/
wsi = wsi_from_fd(context, pfd->fd);
if (wsi && !wsi->udp)
recv(pfd->fd, NULL, 0, 0);
lws_service_fd_tsi(context, pfd, tsi);
}
}
return 0;
}

View file

@ -222,21 +222,7 @@ int
lws_plat_change_pollfd(struct lws_context *context, struct lws *wsi,
struct lws_pollfd *pfd)
{
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
long e = LWS_POLLHUP | FD_CONNECT | FD_ACCEPT | FD_CLOSE | FD_WRITE;
/*
* On windows, FD_WRITE is only coming to indicate that we are writable
* again after being choked. So we must always listen for it.
*/
if (pfd->events & LWS_POLLIN)
e |= FD_READ;
if (WSAEventSelect(wsi->desc.sockfd, pt->events[(pfd - pt->fds) + 1], e)) {
lwsl_err("WSAEventSelect() failed with error %d\n", LWS_ERRNO);
return 1;
}
//struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
return 0;
}