
windows: WSA loop fixes

Andy Green 2020-10-08 08:02:31 +01:00
parent 2ad378095d
commit 04d947b6f7
6 changed files with 128 additions and 60 deletions

View file

@@ -373,6 +373,7 @@ struct lws_pollfd {
 	lws_sockfd_type fd; /**< file descriptor */
 	SHORT events; /**< which events to respond to */
 	SHORT revents; /**< which events happened */
+	uint8_t write_blocked;
 };
 #define LWS_POLLHUP (FD_CLOSE)
 #define LWS_POLLIN (FD_READ | FD_ACCEPT)
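The new write_blocked member latches the "currently choked for writing" state of a socket. A minimal sketch (not the commit's code; names are hypothetical) of why such a latch is needed with WSAEventSelect(): FD_WRITE is only re-armed after a send() has been refused, so the loop has to remember the choked state itself and clear it again when FD_WRITE arrives.

#include <winsock2.h>
#include <stdint.h>

/* hypothetical helper: attempt a send and record the choke if it is refused */
static int try_send(SOCKET s, const char *buf, int len, uint8_t *write_blocked)
{
	int n = send(s, buf, len, 0);

	if (n == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK) {
		*write_blocked = 1; /* wait for FD_WRITE before trying again */
		return 0;
	}

	return n; /* bytes sent, or SOCKET_ERROR for a real failure */
}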

View file

@@ -409,7 +409,7 @@ struct lws_context_per_thread {
 	struct lws_pollfd *fds;
 	volatile struct lws_foreign_thread_pollfd * volatile foreign_pfd_list;
 #ifdef _WIN32
-	WSAEVENT events;
+	WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS];
 	CRITICAL_SECTION interrupt_lock;
 #endif
 	lws_sockfd_type dummy_pipe_fds[2];
@@ -461,9 +461,6 @@ struct lws_context_per_thread {
 	unsigned char event_loop_destroy_processing_done:1;
 	unsigned char destroy_self:1;
 	unsigned char is_destroyed:1;
-#ifdef _WIN32
-	unsigned char interrupt_requested:1;
-#endif
 };
 #if defined(LWS_WITH_SERVER_STATUS)
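The per-thread struct now carries an array of WSAEVENT handles rather than a single one, and the separate interrupt_requested flag goes away because the cancel event itself holds that state. As the later hunks use it, slot 0 is the cancel event and slot n + 1 is the event paired with pt->fds[n]. A sketch of that layout, with hypothetical names standing in for the library's structures:

#include <winsock2.h>

/* hypothetical, trimmed-down version of the per-thread state */
struct sketch_pt {
	WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS]; /* [0] = cancel, [n + 1] <-> fds[n] */
	SOCKET   fds[WSA_MAXIMUM_WAIT_EVENTS - 1];
	DWORD    fds_count;
};

/* bind socket slot n to its private event so the wait can name the culprit */
static int bind_slot(struct sketch_pt *pt, DWORD n, long interest)
{
	return WSAEventSelect(pt->fds[n], pt->events[n + 1], interest);
}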

View file

@@ -87,8 +87,10 @@ lws_plat_init(struct lws_context *context,
 	}
 	while (n--) {
+		int m;
 		pt->fds_count = 0;
-		pt->events = WSACreateEvent(); /* the cancel event */
+		for (m = 0; m < WSA_MAXIMUM_WAIT_EVENTS; m++)
+			pt->events[m] = WSACreateEvent();
 		InitializeCriticalSection(&pt->interrupt_lock);
 		pt++;
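Each service thread now creates the full table of 64 events up front. A sketch of the create/teardown pair, assuming the matching WSACloseEvent() loop lives in the context-destroy path (not shown in this commit excerpt); names are hypothetical:

#include <winsock2.h>

static int create_wait_events(WSAEVENT ev[WSA_MAXIMUM_WAIT_EVENTS])
{
	int m;

	for (m = 0; m < WSA_MAXIMUM_WAIT_EVENTS; m++) {
		ev[m] = WSACreateEvent(); /* manual-reset, initially unsignalled */
		if (ev[m] == WSA_INVALID_EVENT)
			return -1;
	}

	return 0;
}

static void destroy_wait_events(WSAEVENT ev[WSA_MAXIMUM_WAIT_EVENTS])
{
	int m;

	for (m = 0; m < WSA_MAXIMUM_WAIT_EVENTS; m++)
		WSACloseEvent(ev[m]);
}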

View file

@@ -38,10 +38,15 @@ lws_plat_pipe_signal(struct lws *wsi)
 {
 	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
+	/*
+	 * We need the critical section so that we are either setting it or
+	 * clearing it, no matter how many threads competing there is a clear
+	 * atomic state for the event
+	 */
 	EnterCriticalSection(&pt->interrupt_lock);
-	pt->interrupt_requested = 1;
+	WSASetEvent(pt->events[0]); /* trigger the cancel event */
 	LeaveCriticalSection(&pt->interrupt_lock);
-	WSASetEvent(pt->events); /* trigger the cancel event */
 	return 0;
 }
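The interrupt_requested flag is gone: signalling the wakeup and remembering that it happened are now the same thing, the signalled state of events[0]. Setting and resetting the event both happen under pt->interrupt_lock (the reset is in the service hunk below), so a cancel cannot be lost between the two. A self-contained sketch of that handshake, with hypothetical names:

#include <winsock2.h>

struct cancel_ev {
	CRITICAL_SECTION lock;
	WSAEVENT         ev;   /* the "cancel the service wait" event, slot 0 */
};

/* any thread: ask the service loop to wake up and rescan */
static void cancel_signal(struct cancel_ev *c)
{
	EnterCriticalSection(&c->lock);
	WSASetEvent(c->ev);
	LeaveCriticalSection(&c->lock);
}

/* service thread: called when WSAWaitForMultipleEvents() reported slot 0 */
static void cancel_acknowledge(struct cancel_ev *c)
{
	EnterCriticalSection(&c->lock);
	WSAResetEvent(c->ev);
	LeaveCriticalSection(&c->lock);
}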

View file

@@ -67,8 +67,6 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 	unsigned int i;
 	DWORD ev;
 	int n;
-	unsigned int eIdx;
-	int interrupt_requested;
 	/* stay dead once we are dead */
 	if (context == NULL)
@@ -148,60 +146,123 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 	/*
 	 * is there anybody with pending stuff that needs service forcing?
 	 */
 	if (!lws_service_adjust_timeout(context, 1, tsi))
 		timeout_us = 0;
-	for (n = 0; n < (int)pt->fds_count; n++)
-		WSAEventSelect(pt->fds[n].fd, pt->events,
-			FD_READ | (!!(pt->fds[n].events & LWS_POLLOUT) * FD_WRITE) |
-			FD_OOB | FD_ACCEPT |
-			FD_CONNECT | FD_CLOSE | FD_QOS |
-			FD_ROUTING_INTERFACE_CHANGE |
-			FD_ADDRESS_LIST_CHANGE);
+	/*
+	 * WSA cannot actually tell us this from the wait... if anyone wants
+	 * POLLOUT and is not blocked for it, no need to wait since we will want
+	 * to service at least those. Still enter the wait so we can pick up
+	 * other pending things...
+	 */
-	ev = WSAWaitForMultipleEvents(1, &pt->events, FALSE,
-			(DWORD)(timeout_us / LWS_US_PER_MS), FALSE);
-	if (ev != WSA_WAIT_EVENT_0)
+	for (n = 0; n < (int)pt->fds_count; n++)
+		if (pt->fds[n].fd != LWS_SOCK_INVALID &&
+		    pt->fds[n].events & LWS_POLLOUT &&
+		    !pt->fds[n].write_blocked) {
+			timeout_us = 0;
+			break;
+		}
+	// lwsl_notice("%s: to %dms\n", __func__, (int)(timeout_us / 1000));
+	ev = WSAWaitForMultipleEvents(pt->fds_count + 1, pt->events, FALSE,
+				      (DWORD)(timeout_us / LWS_US_PER_MS),
+				      FALSE);
+	//lwsl_notice("%s: ev 0x%x\n", __func__, ev);
+	/*
+	 * The wait returns indicating the one event that had something, or
+	 * that we timed out, or something broken.
+	 *
+	 * Amazingly WSA can only handle 64 events, because the errors start
+	 * at ordinal 64.
+	 */
+	if (ev >= WSA_MAXIMUM_WAIT_EVENTS &&
+	    ev != WSA_WAIT_TIMEOUT)
 		/* some kind of error */
 		return 0;
-	EnterCriticalSection(&pt->interrupt_lock);
-	interrupt_requested = pt->interrupt_requested;
-	pt->interrupt_requested = 0;
-	LeaveCriticalSection(&pt->interrupt_lock);
-	if (interrupt_requested) {
-		lws_broadcast(pt, LWS_CALLBACK_EVENT_WAIT_CANCELLED,
-			      NULL, 0);
+	if (!ev) {
+		/*
+		 * The zero'th event is the cancel event specifically. Lock
+		 * the event reset so we are definitely clearing it while we
+		 * try to clear it.
+		 */
+		EnterCriticalSection(&pt->interrupt_lock);
+		WSAResetEvent(pt->events[0]);
+		LeaveCriticalSection(&pt->interrupt_lock);
+		lws_broadcast(pt, LWS_CALLBACK_EVENT_WAIT_CANCELLED, NULL, 0);
 		return 0;
 	}
+	/*
+	 * Otherwise at least fds[ev - 1] has something to do...
+	 */
 #if defined(LWS_WITH_TLS)
 	if (pt->context->tls_ops &&
 	    pt->context->tls_ops->fake_POLLIN_for_buffered)
 		pt->context->tls_ops->fake_POLLIN_for_buffered(pt);
 #endif
-	for (eIdx = 0; eIdx < pt->fds_count; ++eIdx) {
+	/*
+	 * POLLOUT for any fds that can
+	 */
+	for (n = 0; n < (int)pt->fds_count; n++)
+		if (pt->fds[n].fd != LWS_SOCK_INVALID &&
+		    pt->fds[n].events & LWS_POLLOUT &&
+		    !pt->fds[n].write_blocked) {
+			struct timeval tv;
+			fd_set se;
+			/*
+			 * We have to check if it is blocked...
+			 * if not, do the POLLOUT handling
+			 */
+			FD_ZERO(&se);
+			FD_SET(pt->fds[n].fd, &se);
+			tv.tv_sec = tv.tv_usec = 0;
+			if (select(1, NULL, &se, NULL, &tv) != 1)
+				pt->fds[n].write_blocked = 1;
+			else {
+				pt->fds[n].revents |= LWS_POLLOUT;
+				lws_service_fd_tsi(context, &pt->fds[n], tsi);
+			}
+		}
+	if (ev && ev < WSA_MAXIMUM_WAIT_EVENTS) {
 		unsigned int err;
-		if (WSAEnumNetworkEvents(pt->fds[eIdx].fd, pt->events,
+		/* handle fds[ev - 1] */
+		if (WSAEnumNetworkEvents(pt->fds[ev - 1].fd, pt->events[ev],
 					 &networkevents) == SOCKET_ERROR) {
 			lwsl_err("WSAEnumNetworkEvents() failed "
				 "with error %d\n", LWS_ERRNO);
 			return -1;
 		}
 		if (!networkevents.lNetworkEvents)
 			networkevents.lNetworkEvents = LWS_POLLOUT;
-		pfd = &pt->fds[eIdx];
+		pfd = &pt->fds[ev - 1];
 		pfd->revents = (short)networkevents.lNetworkEvents;
+		if (!pfd->write_blocked && pfd->revents & FD_WRITE)
+			pfd->write_blocked = 0;
 		err = networkevents.iErrorCode[FD_CONNECT_BIT];
-		if ((networkevents.lNetworkEvents & FD_CONNECT) && wsi_from_fd(context, pfd->fd) && !wsi_from_fd(context, pfd->fd)->udp) {
-			lwsl_debug("%s: FD_CONNECT: %p\n", __func__, wsi_from_fd(context, pfd->fd));
+		if ((networkevents.lNetworkEvents & FD_CONNECT) &&
+		    wsi_from_fd(context, pfd->fd) &&
+		    !wsi_from_fd(context, pfd->fd)->udp) {
+			lwsl_debug("%s: FD_CONNECT: %p\n", __func__,
+				   wsi_from_fd(context, pfd->fd));
 			pfd->revents &= ~LWS_POLLOUT;
 			if (err && err != LWS_EALREADY &&
-			    err != LWS_EINPROGRESS && err != LWS_EWOULDBLOCK &&
+			    err != LWS_EINPROGRESS &&
+			    err != LWS_EWOULDBLOCK &&
 			    err != WSAEINVAL) {
 				lwsl_debug("Unable to connect errno=%d\n", err);
@@ -210,10 +271,11 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 				 * do we have more DNS entries to try?
 				 */
 				if (wsi_from_fd(context, pfd->fd)->dns_results_next) {
-					lws_sul_schedule(context, 0, &wsi_from_fd(context, pfd->fd)->
-							 sul_connect_timeout,
-							 lws_client_conn_wait_timeout, 1);
-					continue;
+					lws_sul_schedule(context, 0,
+							 &wsi_from_fd(context, pfd->fd)->
								 sul_connect_timeout,
+							 lws_client_conn_wait_timeout, 1);
+					return 0;
 				} else
 					pfd->revents |= LWS_POLLHUP;
 			} else
@@ -221,9 +283,11 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 				if (wsi_from_fd(context, pfd->fd)->udp)
 					pfd->revents |= LWS_POLLHUP;
 				else
-					lws_client_connect_3_connect(wsi_from_fd(context, pfd->fd),
-						NULL, NULL, LWS_CONNECT_COMPLETION_GOOD,
-						NULL);
+					lws_client_connect_3_connect(
+						wsi_from_fd(context, pfd->fd),
+						NULL, NULL,
+						LWS_CONNECT_COMPLETION_GOOD,
+						NULL);
 		}
 	}
@@ -232,9 +296,6 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 			if (wsi)
 				wsi->sock_send_blocking = 0;
 		}
-		/* if something closed, retry this slot */
-		if (pfd->revents & LWS_POLLHUP)
-			--eIdx;
 		if (pfd->revents) {
 			/*
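Taken together, the service loop now waits on the cancel event plus one event per socket, maps the wait's return ordinal back to the socket that fired, and probes writability separately with a zero-timeout select() because the wait cannot report level-triggered POLLOUT. A compressed, self-contained sketch of that control flow follows; the struct, field and function names are hypothetical stand-ins for the library's own, and the actual servicing is reduced to comments.

#include <winsock2.h>
#include <stdint.h>

/* hypothetical per-thread state mirroring the shape used by the commit */
struct sketch_pt {
	WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS]; /* [0] = cancel, [n + 1] <-> fds[n] */
	SOCKET   fds[WSA_MAXIMUM_WAIT_EVENTS - 1];
	uint8_t  write_blocked[WSA_MAXIMUM_WAIT_EVENTS - 1];
	DWORD    fds_count;
};

/* zero-timeout probe: is this socket writable right now? */
static int probe_writable(SOCKET s)
{
	struct timeval tv = { 0, 0 };
	fd_set se;

	FD_ZERO(&se);
	FD_SET(s, &se);

	/* first argument is ignored by Winsock select() */
	return select(1, NULL, &se, NULL, &tv) == 1;
}

static int service_once(struct sketch_pt *pt, DWORD timeout_ms)
{
	WSANETWORKEVENTS ne;
	DWORD ev, n;

	/* wait on the cancel event plus one event per socket */
	ev = WSAWaitForMultipleEvents(pt->fds_count + 1, pt->events, FALSE,
				      timeout_ms, FALSE);

	/* ordinals >= 64 that are not the timeout code mean failure */
	if (ev >= WSA_MAXIMUM_WAIT_EVENTS && ev != WSA_WAIT_TIMEOUT)
		return -1;

	if (ev == WSA_WAIT_EVENT_0)
		return 0;		/* slot 0: cancel / "please rescan" */

	/* POLLOUT cannot be learned from the wait: probe each candidate */
	for (n = 0; n < pt->fds_count; n++)
		if (!pt->write_blocked[n]) {
			if (probe_writable(pt->fds[n]))
				; /* ... service POLLOUT for fds[n] here ... */
			else
				pt->write_blocked[n] = 1; /* choked: wait for FD_WRITE */
		}

	if (ev != WSA_WAIT_TIMEOUT) {
		DWORD slot = ev - WSA_WAIT_EVENT_0;	/* 1..fds_count */

		/* report what happened on fds[slot - 1] and re-arm its event */
		if (WSAEnumNetworkEvents(pt->fds[slot - 1], pt->events[slot], &ne))
			return -1;

		if (ne.lNetworkEvents & FD_WRITE)
			pt->write_blocked[slot - 1] = 0; /* choke is over */

		/* ... dispatch FD_READ / FD_CLOSE / FD_CONNECT handling here ... */
	}

	return 0;
}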

View file

@@ -180,15 +180,15 @@ void
 lws_plat_insert_socket_into_fds(struct lws_context *context, struct lws *wsi)
 {
 	struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
-	int n = LWS_POLLIN | LWS_POLLHUP | FD_CONNECT;
 	if (wsi->udp) {
 		lwsl_info("%s: UDP\n", __func__);
-		n = LWS_POLLIN;
+		pt->fds[pt->fds_count].events |= LWS_POLLIN;
 	}
 	pt->fds[pt->fds_count++].revents = 0;
-	WSAEventSelect(wsi->desc.sockfd, pt->events, n);
+	lws_plat_change_pollfd(context, wsi, &pt->fds[pt->fds_count - 1]);
 }
 void
@@ -219,24 +219,26 @@ lws_plat_check_connection_error(struct lws *wsi)
 }
 int
-lws_plat_change_pollfd(struct lws_context *context,
-		       struct lws *wsi, struct lws_pollfd *pfd)
+lws_plat_change_pollfd(struct lws_context *context, struct lws *wsi,
+		       struct lws_pollfd *pfd)
 {
 	struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
-	long e = LWS_POLLHUP | FD_CONNECT;
+	long e = LWS_POLLHUP | FD_CONNECT | FD_ACCEPT | FD_CLOSE | FD_WRITE;
-	if ((pfd->events & LWS_POLLIN))
-		e |= LWS_POLLIN;
+	/*
+	 * On windows, FD_WRITE is only coming to indicate that we are writable
+	 * again after being choked. So we must always listen for it.
+	 */
-	if ((pfd->events & LWS_POLLOUT))
-		e |= LWS_POLLOUT;
+	if (pfd->events & LWS_POLLIN)
+		e |= FD_READ;
-	if (WSAEventSelect(wsi->desc.sockfd, pt->events, e) != SOCKET_ERROR)
-		return 0;
+	if (WSAEventSelect(wsi->desc.sockfd, pt->events[(pfd - pt->fds) + 1], e)) {
+		lwsl_err("WSAEventSelect() failed with error %d\n", LWS_ERRNO);
+		return 1;
+	}
-	lwsl_err("WSAEventSelect() failed with error %d\n", LWS_ERRNO);
-	return 1;
+	return 0;
 }
 const char *
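The pollfd-to-event pairing is the other half of the loop change: instead of one shared event for the whole thread, each socket is registered against pt->events[(pfd - pt->fds) + 1], the slot just after its position in the fds array, and the interest mask always includes FD_WRITE (plus FD_CLOSE, FD_ACCEPT and FD_CONNECT) since FD_WRITE only ever means "no longer choked". A sketch of that registration with hypothetical names:

#include <winsock2.h>

/*
 * Register one socket against its private event. "slot_event" is assumed to
 * be the per-fd event (index of the fd in the fds array, plus one); only
 * FD_READ is optional because everything else must always be watched.
 */
static int register_socket_events(SOCKET s, WSAEVENT slot_event, int want_read)
{
	long e = FD_CONNECT | FD_ACCEPT | FD_CLOSE | FD_WRITE;

	if (want_read)
		e |= FD_READ;

	if (WSAEventSelect(s, slot_event, e) == SOCKET_ERROR)
		return -1;	/* WSAGetLastError() holds the detail */

	return 0;
}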