Use one event for all sockets to avoid 64 limit and fix the single dispatch issue

This commit is contained in:
mmessina 2016-07-11 21:17:21 +08:00 committed by Andy Green
parent 5f73048d58
commit fc0e52da9e

View file

@ -79,7 +79,7 @@ delete_from_fd(struct lws_context *context, lws_sockfd_type fd)
if (context->fd_hashtable[h].wsi[n]->sock == fd) {
while (n < context->fd_hashtable[h].length) {
context->fd_hashtable[h].wsi[n] =
context->fd_hashtable[h].wsi[n + 1];
context->fd_hashtable[h].wsi[n + 1];
n++;
}
context->fd_hashtable[h].length--;
@ -93,7 +93,7 @@ delete_from_fd(struct lws_context *context, lws_sockfd_type fd)
}
LWS_VISIBLE int lws_get_random(struct lws_context *context,
void *buf, int len)
void *buf, int len)
{
int n;
char *p = (char *)buf;
@ -146,55 +146,6 @@ LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
lwsl_emit_stderr(level, line);
}
LWS_VISIBLE DWORD
lws_plat_wait_event(struct lws_context_per_thread* pt, int timeout)
{
int event_count = pt->fds_count + 1;
HANDLE *events = pt->events;
DWORD ev;
// the WSAWaitForMultipleEvents can wait for maximum of 64 handles
if (event_count <= WSA_MAXIMUM_WAIT_EVENTS)
return WSAWaitForMultipleEvents(event_count, events, FALSE,
timeout, FALSE);
/* you should use libuv instead of this */
int timeout_left = timeout;
// the smaller the step the closer we get to the valid solution
// and the more CPU we will use
int timeout_step = (timeout > 20) ? 20 : timeout;
do {
int events_left = event_count;
int events_handled = 0;
timeout = 0;
while (events_left > 0) {
// split to groups to size of max 64
int rem = (events_left > WSA_MAXIMUM_WAIT_EVENTS) ?
WSA_MAXIMUM_WAIT_EVENTS : events_left;
// wait only on the last group
if (events_left == rem)
timeout = timeout_step;
ev = WSAWaitForMultipleEvents(rem, &events[events_handled],
FALSE, timeout, FALSE);
if (ev != WSA_WAIT_TIMEOUT)
return ev + events_handled;
events_handled += rem;
events_left -= rem;
}
timeout_left -= timeout_step;
} while (timeout_left > 0);
return WSA_WAIT_TIMEOUT;
}
LWS_VISIBLE int
lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
@ -218,12 +169,28 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
context->service_tid_detected = context->vhost_list->
protocols[0].callback(&_lws, LWS_CALLBACK_GET_THREAD_ID,
NULL, NULL, 0);
NULL, NULL, 0);
}
context->service_tid = context->service_tid_detected;
if (timeout_ms < 0)
goto faked_service;
{
if (lws_service_flag_pending(context, tsi)) {
/* any socket with events to service? */
for (n = 0; n < (int)pt->fds_count; n++) {
if (!pt->fds[n].revents)
continue;
m = lws_service_fd_tsi(context, &pt->fds[n], tsi);
if (m < 0)
return -1;
/* if something closed, retry this slot */
if (m)
n--;
}
}
return 0;
}
for (i = 0; i < pt->fds_count; ++i) {
pfd = &pt->fds[i];
@ -248,78 +215,60 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
/* if we know something needs service already, don't wait in poll */
timeout_ms = lws_service_adjust_timeout(context, timeout_ms, tsi);
ev = lws_plat_wait_event(pt, timeout_ms);
ev = WSAWaitForMultipleEvents( 1, pt->events , FALSE, timeout_ms, FALSE);
if (ev == WSA_WAIT_EVENT_0) {
WSAResetEvent(pt->events[0]);
int servicedFd = 0;
for(unsigned int eIdx = 0; eIdx < pt->fds_count; ++eIdx)
{
if (WSAEnumNetworkEvents(pt->fds[eIdx].fd, 0, &networkevents) == SOCKET_ERROR) {
lwsl_err("WSAEnumNetworkEvents() failed with error %d\n", LWS_ERRNO);
return -1;
}
pfd = &pt->fds[eIdx];
pfd->revents = (short)networkevents.lNetworkEvents;
if ((networkevents.lNetworkEvents & FD_CONNECT) &&
networkevents.iErrorCode[FD_CONNECT_BIT] &&
networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EALREADY &&
networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EINPROGRESS &&
networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EWOULDBLOCK &&
networkevents.iErrorCode[FD_CONNECT_BIT] != WSAEINVAL) {
lwsl_debug("Unable to connect errno=%d\n",
networkevents.iErrorCode[FD_CONNECT_BIT]);
pfd->revents = LWS_POLLHUP;
} else
pfd->revents = (short)networkevents.lNetworkEvents;
if (pfd->revents & LWS_POLLOUT) {
wsi = wsi_from_fd(context, pfd->fd);
if (wsi)
wsi->sock_send_blocking = 0;
}
if( pfd->revents != 0 )
{
lws_service_fd_tsi(context, pfd, tsi);
++servicedFd;
}
}
if(servicedFd) /* if something closed, retry all the slots */
{
WSASetEvent(pt->events[0]);
return 0;
}
}
context->service_tid = 0;
if (ev == WSA_WAIT_TIMEOUT) {
lws_service_fd(context, NULL);
return 0;
}
if (ev == WSA_WAIT_EVENT_0) {
WSAResetEvent(pt->events[0]);
return 0;
}
if (ev < WSA_WAIT_EVENT_0 || ev > WSA_WAIT_EVENT_0 + pt->fds_count)
return -1;
pfd = &pt->fds[ev - WSA_WAIT_EVENT_0 - 1];
/* eh... is one event at a time the best windows can do? */
if (WSAEnumNetworkEvents(pfd->fd, pt->events[ev - WSA_WAIT_EVENT_0],
&networkevents) == SOCKET_ERROR) {
lwsl_err("WSAEnumNetworkEvents() failed with error %d\n",
LWS_ERRNO);
return -1;
}
if ((networkevents.lNetworkEvents & FD_CONNECT) &&
networkevents.iErrorCode[FD_CONNECT_BIT] &&
networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EALREADY &&
networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EINPROGRESS &&
networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EWOULDBLOCK &&
networkevents.iErrorCode[FD_CONNECT_BIT] != WSAEINVAL) {
lwsl_debug("Unable to connect errno=%d\n",
networkevents.iErrorCode[FD_CONNECT_BIT]);
pfd->revents = LWS_POLLHUP;
} else
pfd->revents = (short)networkevents.lNetworkEvents;
if (pfd->revents & LWS_POLLOUT) {
wsi = wsi_from_fd(context, pfd->fd);
if (wsi)
wsi->sock_send_blocking = 0;
}
faked_service:
/* if someone faked their LWS_POLLIN, then go through all active fds */
if (lws_service_flag_pending(context, tsi)) {
/* any socket with events to service? */
for (n = 0; n < (int)pt->fds_count; n++) {
if (!pt->fds[n].revents)
continue;
m = lws_service_fd_tsi(context, &pt->fds[n], tsi);
if (m < 0)
return -1;
/* if something closed, retry this slot */
if (m)
n--;
}
return 0;
}
if (timeout_ms < 0)
return 0;
/* otherwise just do the one... must be a way to improve that... */
return lws_service_fd_tsi(context, pfd, tsi);
return 0;;
}
LWS_VISIBLE int
@ -345,7 +294,7 @@ lws_plat_set_socket_options(struct lws_vhost *vhost, lws_sockfd_type fd)
/* enable keepalive on this socket */
optval = 1;
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE,
(const char *)&optval, optlen) < 0)
(const char *)&optval, optlen) < 0)
return 1;
alive.onoff = TRUE;
@ -353,7 +302,7 @@ lws_plat_set_socket_options(struct lws_vhost *vhost, lws_sockfd_type fd)
alive.keepaliveinterval = vhost->ka_interval;
if (WSAIoctl(fd, SIO_KEEPALIVE_VALS, &alive, sizeof(alive),
NULL, 0, &dwBytesRet, NULL, NULL))
NULL, 0, &dwBytesRet, NULL, NULL))
return 1;
}
@ -459,9 +408,9 @@ lws_plat_insert_socket_into_fds(struct lws_context *context, struct lws *wsi)
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
pt->fds[pt->fds_count++].revents = 0;
pt->events[pt->fds_count] = WSACreateEvent();
WSAEventSelect(wsi->sock, pt->events[pt->fds_count],
LWS_POLLIN | LWS_POLLHUP | FD_CONNECT);
pt->events[pt->fds_count] = pt->events[0];
WSAEventSelect(wsi->sock, pt->events[0],
LWS_POLLIN | LWS_POLLHUP | FD_CONNECT);
}
LWS_VISIBLE void
@ -470,7 +419,6 @@ lws_plat_delete_socket_from_fds(struct lws_context *context,
{
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
WSACloseEvent(pt->events[m + 1]);
pt->events[m + 1] = pt->events[pt->fds_count--];
}
@ -486,11 +434,11 @@ lws_plat_check_connection_error(struct lws *wsi)
int optLen = sizeof(int);
if (getsockopt(wsi->sock, SOL_SOCKET, SO_ERROR,
(char*)&optVal, &optLen) != SOCKET_ERROR && optVal &&
optVal != LWS_EALREADY && optVal != LWS_EINPROGRESS &&
optVal != LWS_EWOULDBLOCK && optVal != WSAEINVAL) {
lwsl_debug("Connect failed SO_ERROR=%d\n", optVal);
return 1;
(char*)&optVal, &optLen) != SOCKET_ERROR && optVal &&
optVal != LWS_EALREADY && optVal != LWS_EINPROGRESS &&
optVal != LWS_EWOULDBLOCK && optVal != WSAEINVAL) {
lwsl_debug("Connect failed SO_ERROR=%d\n", optVal);
return 1;
}
return 0;
@ -498,7 +446,7 @@ lws_plat_check_connection_error(struct lws *wsi)
LWS_VISIBLE int
lws_plat_change_pollfd(struct lws_context *context,
struct lws *wsi, struct lws_pollfd *pfd)
struct lws *wsi, struct lws_pollfd *pfd)
{
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
long networkevents = LWS_POLLHUP | FD_CONNECT;
@ -510,8 +458,8 @@ lws_plat_change_pollfd(struct lws_context *context,
networkevents |= LWS_POLLOUT;
if (WSAEventSelect(wsi->sock,
pt->events[wsi->position_in_fds_table + 1],
networkevents) != SOCKET_ERROR)
pt->events[0],
networkevents) != SOCKET_ERROR)
return 0;
lwsl_err("WSAEventSelect() failed with error %d\n", LWS_ERRNO);
@ -567,7 +515,7 @@ lws_plat_inet_ntop(int af, const void *src, char *dst, int cnt)
static lws_filefd_type
_lws_plat_file_open(struct lws *wsi, const char *filename,
unsigned long *filelen, int flags)
unsigned long *filelen, int flags)
{
HANDLE ret;
WCHAR buf[MAX_PATH];
@ -609,7 +557,7 @@ _lws_plat_file_seek_cur(struct lws *wsi, lws_filefd_type fd, long offset)
static int
_lws_plat_file_read(struct lws *wsi, lws_filefd_type fd, unsigned long *amount,
unsigned char* buf, unsigned long len)
unsigned char* buf, unsigned long len)
{
DWORD _amount;
@ -626,7 +574,7 @@ _lws_plat_file_read(struct lws *wsi, lws_filefd_type fd, unsigned long *amount,
static int
_lws_plat_file_write(struct lws *wsi, lws_filefd_type fd, unsigned long *amount,
unsigned char* buf, unsigned long len)
unsigned char* buf, unsigned long len)
{
(void)wsi;
(void)fd;
@ -641,7 +589,7 @@ _lws_plat_file_write(struct lws *wsi, lws_filefd_type fd, unsigned long *amount,
LWS_VISIBLE int
lws_plat_init(struct lws_context *context,
struct lws_context_creation_info *info)
struct lws_context_creation_info *info)
{
struct lws_context_per_thread *pt = &context->pt[0];
int i, n = context->count_threads;