smp: force cancel on pt that was assigned new wsi

This commit is contained in:
Andy Green 2018-03-08 12:04:13 +08:00
parent ad5dbda120
commit 5fc2598eac
4 changed files with 119 additions and 96 deletions

View file

@ -8,22 +8,39 @@ libwebsockets
News
----
## New "minimal examples"
https://github.com/warmcat/libwebsockets/tree/master/minimal-examples
These are like the test apps, but focus on doing one thing, the best way, with the minimum amount of code. For example the minimal-http-server serves the cwd on http/1 or http/2 in 50 LOC.
They also build standalone, so it's easier to copy them directly to start your own project; they
are CC0 licensed (public domain) to facilitate that.
## Windows binary builds
32- and 64-bit Windows binary builds are available via Appveyor. Visit [lws on Appveyor](https://ci.appveyor.com/project/lws-team/libwebsockets),
click on a build, the ARTIFACTS, and unzip the zip file at `C:\Program Files (x86)/libwebsockets`.
- v2.4 is out... HTTP/2 server support and mbedTLS as a TLS backend.
## Latest Stable
- v2.4.2 is out... HTTP/2 server support and mbedTLS as a TLS backend.
see the changelog https://github.com/warmcat/libwebsockets/blob/v2.4-stable/changelog
Please note the additional READMEs have moved to ./READMEs/
- v2.3 is out... see the changelog https://github.com/warmcat/libwebsockets/blob/v2.3-stable/changelog
## ESP32 is supported
ESP32 is now supported in lws! Download the
- factory https://github.com/warmcat/lws-esp32-factory and
- test server app https://github.com/warmcat/lws-esp32-test-server-demos
The ESP32 stuff has my dynamic mbedtls buffer allocation patches applied,
which reduce allocation for small payload TLS links by around 26KiB per connection.
## Support
This is the libwebsockets C library for lightweight websocket clients and
servers. For support, visit

View file

@ -63,7 +63,9 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
assert(wsi->position_in_fds_table >= 0 &&
wsi->position_in_fds_table < (int)pt->fds_count);
#if !defined(LWS_WITH_LIBUV) && !defined(LWS_WITH_LIBEV) && !defined(LWS_WITH_LIBEVENT)
#if !defined(LWS_WITH_LIBUV) && \
!defined(LWS_WITH_LIBEV) && \
!defined(LWS_WITH_LIBEVENT)
/*
* This only applies when we use the default poll() event loop.
*
@ -198,24 +200,28 @@ bail:
}
#ifndef LWS_NO_SERVER
/*
* Enable or disable listen sockets on this pt globally...
* it's modulated according to the pt having space for a new accept.
*/
static void
lws_accept_modulation(struct lws_context_per_thread *pt, int allow)
lws_accept_modulation(struct lws_context *context,
struct lws_context_per_thread *pt, int allow)
{
// multithread listen seems broken
#if 0
struct lws_vhost *vh = context->vhost_list;
struct lws_pollargs pa1;
while (vh) {
if (allow)
_lws_change_pollfd(pt->wsi_listening,
if (vh->lserv_wsi) {
if (allow)
_lws_change_pollfd(vh->lserv_wsi,
0, LWS_POLLIN, &pa1);
else
_lws_change_pollfd(pt->wsi_listening,
else
_lws_change_pollfd(vh->lserv_wsi,
LWS_POLLIN, 0, &pa1);
}
vh = vh->vhost_next;
}
#endif
}
#endif
@ -275,7 +281,7 @@ __insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
#ifndef LWS_NO_SERVER
/* if no more room, defeat accepts on this thread */
if ((unsigned int)pt->fds_count == context->fd_limit_per_thread - 1)
lws_accept_modulation(pt, 0);
lws_accept_modulation(context, pt, 0);
#endif
if (wsi->vhost &&
@ -357,7 +363,7 @@ __remove_wsi_socket_from_fds(struct lws *wsi)
if (!context->being_destroyed &&
/* if this made some room, accept connects on this thread */
(unsigned int)pt->fds_count < context->fd_limit_per_thread - 1)
lws_accept_modulation(pt, 1);
lws_accept_modulation(context, pt, 1);
#endif
if (wsi->vhost &&
@ -584,10 +590,9 @@ lws_same_vh_protocol_remove(struct lws *wsi)
}
/* our next should point back to our prev */
if (wsi->same_vh_protocol_next) {
if (wsi->same_vh_protocol_next)
wsi->same_vh_protocol_next->same_vh_protocol_prev =
wsi->same_vh_protocol_prev;
}
wsi->same_vh_protocol_prev = NULL;
wsi->same_vh_protocol_next = NULL;

View file

@ -72,56 +72,55 @@ lws_context_init_server(struct lws_context_creation_info *info,
for (m = 0; m < limit; m++) {
#ifdef LWS_WITH_UNIX_SOCK
if (LWS_UNIX_SOCK_ENABLED(vhost))
sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
else
if (LWS_UNIX_SOCK_ENABLED(vhost))
sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
else
#endif
#ifdef LWS_WITH_IPV6
if (LWS_IPV6_ENABLED(vhost))
sockfd = socket(AF_INET6, SOCK_STREAM, 0);
else
if (LWS_IPV6_ENABLED(vhost))
sockfd = socket(AF_INET6, SOCK_STREAM, 0);
else
#endif
sockfd = socket(AF_INET, SOCK_STREAM, 0);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == LWS_SOCK_INVALID) {
#endif
lwsl_err("ERROR opening socket\n");
return 1;
}
if (sockfd == LWS_SOCK_INVALID) {
#endif /* LWS_POSIX */
lwsl_err("ERROR opening socket\n");
return 1;
}
#if LWS_POSIX && !defined(LWS_WITH_ESP32)
#if (defined(WIN32) || defined(_WIN32)) && defined(SO_EXCLUSIVEADDRUSE)
/*
* only accept that we are the only listener on the port
* https://msdn.microsoft.com/zh-tw/library/
* windows/desktop/ms740621(v=vs.85).aspx
*
* for lws, to match Linux, we default to exclusive listen
*/
if (!lws_check_opt(vhost->options,
LWS_SERVER_OPTION_ALLOW_LISTEN_SHARE)) {
if (setsockopt(sockfd, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
/*
* only accept that we are the only listener on the port
* https://msdn.microsoft.com/zh-tw/library/
* windows/desktop/ms740621(v=vs.85).aspx
*
* for lws, to match Linux, we default to exclusive listen
*/
if (!lws_check_opt(vhost->options,
LWS_SERVER_OPTION_ALLOW_LISTEN_SHARE)) {
if (setsockopt(sockfd, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
(const void *)&opt, sizeof(opt)) < 0) {
lwsl_err("reuseaddr failed\n");
compatible_close(sockfd);
return 1;
}
} else
#endif
/*
* allow us to restart even if old sockets in TIME_WAIT
*/
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
(const void *)&opt, sizeof(opt)) < 0) {
lwsl_err("reuseaddr failed\n");
compatible_close(sockfd);
return 1;
}
} else
#endif
/*
* allow us to restart even if old sockets in TIME_WAIT
*/
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
(const void *)&opt, sizeof(opt)) < 0) {
lwsl_err("reuseaddr failed\n");
compatible_close(sockfd);
return 1;
}
#if defined(LWS_WITH_IPV6) && defined(IPV6_V6ONLY)
if (LWS_IPV6_ENABLED(vhost)) {
if (vhost->options & LWS_SERVER_OPTION_IPV6_V6ONLY_MODIFY) {
if (LWS_IPV6_ENABLED(vhost) &&
vhost->options & LWS_SERVER_OPTION_IPV6_V6ONLY_MODIFY) {
int value = (vhost->options &
LWS_SERVER_OPTION_IPV6_V6ONLY_VALUE) ? 1 : 0;
if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_V6ONLY,
@ -130,17 +129,15 @@ lws_context_init_server(struct lws_context_creation_info *info,
return 1;
}
}
}
#endif
#if defined(__linux__) && defined(SO_REUSEPORT)
n = lws_check_opt(vhost->options, LWS_SERVER_OPTION_ALLOW_LISTEN_SHARE);
n = lws_check_opt(vhost->options,
LWS_SERVER_OPTION_ALLOW_LISTEN_SHARE);
#if LWS_MAX_SMP > 1
n = 1;
n = 1;
#endif
if (n)
if (vhost->context->count_threads > 1)
if (n && vhost->context->count_threads > 1)
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT,
(const void *)&opt, sizeof(opt)) < 0) {
compatible_close(sockfd);
@ -148,52 +145,51 @@ lws_context_init_server(struct lws_context_creation_info *info,
}
#endif
#endif
lws_plat_set_socket_options(vhost, sockfd);
lws_plat_set_socket_options(vhost, sockfd);
#if LWS_POSIX
n = lws_socket_bind(vhost, sockfd, info->port, info->iface);
if (n < 0)
goto bail;
info->port = n;
n = lws_socket_bind(vhost, sockfd, info->port, info->iface);
if (n < 0)
goto bail;
info->port = n;
#endif
vhost->listen_port = info->port;
vhost->iface = info->iface;
vhost->listen_port = info->port;
vhost->iface = info->iface;
wsi = lws_zalloc(sizeof(struct lws), "listen wsi");
if (wsi == NULL) {
lwsl_err("Out of mem\n");
goto bail;
}
wsi->context = vhost->context;
wsi->desc.sockfd = sockfd;
wsi->mode = LWSCM_SERVER_LISTENER;
wsi->protocol = vhost->protocols;
wsi->tsi = m;
wsi->vhost = vhost;
wsi->listener = 1;
wsi = lws_zalloc(sizeof(struct lws), "listen wsi");
if (wsi == NULL) {
lwsl_err("Out of mem\n");
goto bail;
}
wsi->context = vhost->context;
wsi->desc.sockfd = sockfd;
wsi->mode = LWSCM_SERVER_LISTENER;
wsi->protocol = vhost->protocols;
wsi->tsi = m;
wsi->vhost = vhost;
wsi->listener = 1;
#ifdef LWS_WITH_LIBUV
if (LWS_LIBUV_ENABLED(vhost->context))
lws_uv_initvhost(vhost, wsi);
if (LWS_LIBUV_ENABLED(vhost->context))
lws_uv_initvhost(vhost, wsi);
#endif
if (__insert_wsi_socket_into_fds(vhost->context, wsi))
goto bail;
if (__insert_wsi_socket_into_fds(vhost->context, wsi))
goto bail;
vhost->context->count_wsi_allocated++;
vhost->lserv_wsi = wsi;
vhost->context->count_wsi_allocated++;
vhost->lserv_wsi = wsi;
#if LWS_POSIX
n = listen(wsi->desc.sockfd, LWS_SOMAXCONN);
if (n < 0) {
lwsl_err("listen failed with error %d\n", LWS_ERRNO);
vhost->lserv_wsi = NULL;
vhost->context->count_wsi_allocated--;
__remove_wsi_socket_from_fds(wsi);
goto bail;
}
n = listen(wsi->desc.sockfd, LWS_SOMAXCONN);
if (n < 0) {
lwsl_err("listen failed with error %d\n", LWS_ERRNO);
vhost->lserv_wsi = NULL;
vhost->context->count_wsi_allocated--;
__remove_wsi_socket_from_fds(wsi);
goto bail;
}
} /* for each thread able to independently listen */
#else
#endif
if (!lws_check_opt(info->options, LWS_SERVER_OPTION_EXPLICIT_VHOSTS)) {
#ifdef LWS_WITH_UNIX_SOCK
@ -2131,6 +2127,8 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type,
lwsl_info("%s: waiting for ah\n", __func__);
}
lws_cancel_service_pt(new_wsi);
return new_wsi;
fail:

View file

@ -21,7 +21,7 @@
#include <string.h>
#include <signal.h>
#define COUNT_THREADS 10
#define COUNT_THREADS 8
static struct lws_context *context;
static int interrupted;
@ -48,7 +48,8 @@ static const struct lws_http_mount mount = {
void *thread_service(void *threadid)
{
while (lws_service_tsi(context, 50, (int)(lws_intptr_t)threadid) >= 0 &&
while (lws_service_tsi(context, 10000,
(int)(lws_intptr_t)threadid) >= 0 &&
!interrupted)
;
@ -58,6 +59,7 @@ void *thread_service(void *threadid)
void sigint_handler(int sig)
{
interrupted = 1;
lws_cancel_service(context);
}
int main(int argc, char **argv)
@ -72,12 +74,13 @@ int main(int argc, char **argv)
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
info.port = 7681;
info.mounts = &mount;
// info.max_http_header_pool = 10;
info.count_threads = COUNT_THREADS;
lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_USER
/* | LLL_INFO */ /* | LLL_DEBUG */, NULL);
/* | LLL_INFO */ /* | LLL_DEBUG */, NULL);
lwsl_user("LWS minimal http server SMP | visit http://localhost:7681\n");
lwsl_user("LWS minimal http server SMP | visit http://127.0.0.1:7681\n");
context = lws_create_context(&info);
if (!context) {