diff --git a/CMakeLists.txt b/CMakeLists.txt index 61468b042..adb563bf0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -76,19 +76,6 @@ option(LWS_WITH_HTTP2 "Compile with support for http2" OFF) option(LWS_MBED3 "Platform is MBED3" OFF) option(LWS_SSL_SERVER_WITH_ECDH_CERT "Include SSL server use ECDH certificate" OFF) - -if (DEFINED YOTTA_WEBSOCKETS_VERSION_STRING) - -set(LWS_WITH_SHARED OFF) -set(LWS_WITH_SSL OFF) -set(LWS_WITH_ZLIB OFF) -set(LWS_WITHOUT_CLIENT ON) -set(LWS_WITHOUT_TESTAPPS ON) -set(LWS_WITHOUT_EXTENSIONS ON) -set(LWS_MBED3 ON) - -endif() - if (DEFINED YOTTA_WEBSOCKETS_VERSION_STRING) set(LWS_WITH_SHARED OFF) @@ -98,6 +85,7 @@ set(LWS_WITHOUT_CLIENT ON) set(LWS_WITHOUT_TESTAPPS ON) set(LWS_WITHOUT_EXTENSIONS ON) set(LWS_MBED3 ON) +set(LWS_MAX_SMP 1) endif() @@ -247,6 +235,10 @@ if (LWS_WITH_HTTP2) set(LWS_USE_HTTP2 1) endif() +if ("${LWS_MAX_SMP}" STREQUAL "") + set(LWS_MAX_SMP 32) +endif() + #if (LWS_MBED3) # set(CMAKE_C_FLAGS "-D_DEBUG ${CMAKE_C_FLAGS}") #endif() @@ -1140,6 +1132,7 @@ message(" LWS_IPV6 = ${LWS_IPV6}") message(" LWS_WITH_HTTP2 = ${LWS_WITH_HTTP2}") message(" LWS_MBED3 = ${LWS_MBED3}") message(" LWS_SSL_SERVER_WITH_ECDH_CERT = ${LWS_SSL_SERVER_WITH_ECDH_CERT}") +message(" LWS_MAX_SMP = ${LWS_MAX_SMP}") message("---------------------------------------------------------------------") # These will be available to parent projects including libwebsockets using add_subdirectory() diff --git a/README.build.md b/README.build.md index b9bec0133..95a28e0ff 100644 --- a/README.build.md +++ b/README.build.md @@ -86,6 +86,14 @@ Building on Unix: $ LD_LIBRARY_PATH=/usr/local/ssl/lib libwebsockets-test-server --ssl ``` + **NOTE5**: + To build with debug info and _DEBUG for lower priority debug messages + compiled in, use + + ```bash + $ cmake .. -DCMAKE_BUILD_TYPE=DEBUG + ```` + 4. Finally you can build using the generated Makefile: ```bash diff --git a/README.coding.md b/README.coding.md index 4775cd524..d425bacc2 100644 --- a/README.coding.md +++ b/README.coding.md @@ -330,5 +330,51 @@ LWS_SERVER_OPTION_SSL_ECD to build in support and select it at runtime. +SMP / Multithreaded service +--------------------------- +SMP support is integrated into LWS without any internal threading. It's +very simple to use, libwebsockets-test-server-pthread shows how to do it, +use -j argument there to control the number of service threads up to 32. + +Two new members are added to the info struct + + unsigned int count_threads; + unsigned int fd_limit_per_thread; + +leave them at the default 0 to get the normal singlethreaded service loop. + +Set count_threads to n to tell lws you will have n simultaneous service threads +operating on the context. + +There is still a single listen socket on one port, no matter how many +service threads. + +When a connection is made, it is accepted by the service thread with the least +connections active to perform load balancing. + +The user code is responsible for spawning n threads running the service loop +associated to a specific tsi (Thread Service Index, 0 .. n - 1). See +the libwebsockets-test-server-pthread for how to do. + +If you leave fd_limit_per_thread at 0, then the process limit of fds is shared +between the service threads; if you process was allowed 1024 fds overall then +each thread is limited to 1024 / n. + +You can set fd_limit_per_thread to a nonzero number to control this manually, eg +the overall supported fd limit is less than the process allowance. + +You can control the context basic data allocation for multithreading from Cmake +using -DLWS_MAX_SMP=, if not given it's set to 32. The serv_buf allocation +for the threads (currently 4096) is made at runtime only for active threads. + +Because lws will limit the requested number of actual threads supported +according to LWS_MAX_SMP, there is an api lws_get_count_threads(context) to +discover how many threads were actually allowed when the context was created. + +It's required to implement locking in the user code in the same way that +libwebsockets-test-server-pthread does it, for the FD locking callbacks. + +There is no knowledge or dependency in lws itself about pthreads. How the +locking is implemented is entirely up to the user code. diff --git a/changelog b/changelog index 9a34696c5..87a6942cd 100644 --- a/changelog +++ b/changelog @@ -164,7 +164,52 @@ to build in support and select it at runtime. 6) There's a new api lws_parse_uri() that simplies chopping up https://xxx:yyy/zzz uris into parts nicely. The test client now uses this -to allow proper uris. +to allow proper uris as well as the old address style. + +7) SMP support is integrated into LWS without any internal threading. It's +very simple to use, libwebsockets-test-server-pthread shows how to do it, +use -j argument there to control the number of service threads up to 32. + +Two new members are added to the info struct + + unsigned int count_threads; + unsigned int fd_limit_per_thread; + +leave them at the default 0 to get the normal singlethreaded service loop. + +Set count_threads to n to tell lws you will have n simultaneous service threads +operating on the context. + +There is still a single listen socket on one port, no matter how many +service threads. + +When a connection is made, it is accepted by the service thread with the least +connections active to perform load balancing. + +The user code is responsible for spawning n threads running the service loop +associated to a specific tsi (Thread Service Index, 0 .. n - 1). See +the libwebsockets-test-server-pthread for how to do. + +If you leave fd_limit_per_thread at 0, then the process limit of fds is shared +between the service threads; if you process was allowed 1024 fds overall then +each thread is limited to 1024 / n. + +You can set fd_limit_per_thread to a nonzero number to control this manually, eg +the overall supported fd limit is less than the process allowance. + +You can control the context basic data allocation for multithreading from Cmake +using -DLWS_MAX_SMP=, if not given it's set to 32. The serv_buf allocation +for the threads (currently 4096) is made at runtime only for active threads. + +Because lws will limit the requested number of actual threads supported +according to LWS_MAX_SMP, there is an api lws_get_count_threads(context) to +discover how many threads were actually allowed when the context was created. + +It's required to implement locking in the user code in the same way that +libwebsockets-test-server-pthread does it, for the FD locking callbacks. + +There is no knowledge or dependency in lws itself about pthreads. How the +locking is implemented is entirely up to the user code. User api changes diff --git a/lib/client-handshake.c b/lib/client-handshake.c index f8e4c8135..6e0350171 100644 --- a/lib/client-handshake.c +++ b/lib/client-handshake.c @@ -9,6 +9,7 @@ lws_client_connect_2(struct lws *wsi) struct addrinfo hints, *result; #endif struct lws_context *context = wsi->context; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; struct sockaddr_in server_addr4; struct sockaddr_in client_addr4; struct lws_pollfd pfd; @@ -21,18 +22,18 @@ lws_client_connect_2(struct lws *wsi) /* proxy? */ if (context->http_proxy_port) { - plen = sprintf((char *)context->serv_buf, + plen = sprintf((char *)pt->serv_buf, "CONNECT %s:%u HTTP/1.0\x0d\x0a" "User-agent: libwebsockets\x0d\x0a", lws_hdr_simple_ptr(wsi, _WSI_TOKEN_CLIENT_PEER_ADDRESS), wsi->u.hdr.ah->c_port); if (context->proxy_basic_auth_token[0]) - plen += sprintf((char *)context->serv_buf + plen, + plen += sprintf((char *)pt->serv_buf + plen, "Proxy-authorization: basic %s\x0d\x0a", context->proxy_basic_auth_token); - plen += sprintf((char *)context->serv_buf + plen, + plen += sprintf((char *)pt->serv_buf + plen, "\x0d\x0a"); ads = context->http_proxy_address; @@ -262,7 +263,7 @@ lws_client_connect_2(struct lws *wsi) goto failed; wsi->u.hdr.ah->c_port = context->http_proxy_port; - n = send(wsi->sock, (char *)context->serv_buf, plen, + n = send(wsi->sock, (char *)pt->serv_buf, plen, MSG_NOSIGNAL); if (n < 0) { lwsl_debug("ERROR writing to proxy socket\n"); diff --git a/lib/client-parser.c b/lib/client-parser.c index ad8d58d61..1ae473f8e 100644 --- a/lib/client-parser.c +++ b/lib/client-parser.c @@ -23,6 +23,7 @@ int lws_client_rx_sm(struct lws *wsi, unsigned char c) { + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; int callback_action = LWS_CALLBACK_CLIENT_RECEIVE; unsigned short close_code; unsigned char *pp; @@ -31,7 +32,7 @@ int lws_client_rx_sm(struct lws *wsi, unsigned char c) if (wsi->u.ws.rx_draining_ext) { - struct lws **w = &wsi->context->rx_draining_ext_list; + struct lws **w = &pt->rx_draining_ext_list; lwsl_ext("%s: RX EXT DRAINING: Removing from list\n", __func__, c); assert(!c); eff_buf.token = NULL; @@ -536,8 +537,8 @@ utf8_fail: lwsl_info("utf8 error\n"); * last chunk */ wsi->u.ws.rx_draining_ext = 1; - wsi->u.ws.rx_draining_ext_list = wsi->context->rx_draining_ext_list; - wsi->context->rx_draining_ext_list = wsi; + wsi->u.ws.rx_draining_ext_list = pt->rx_draining_ext_list; + pt->rx_draining_ext_list = wsi; lwsl_ext("%s: RX EXT DRAINING: Adding to list\n", __func__); } if (wsi->state == LWSS_RETURNED_CLOSE_ALREADY || diff --git a/lib/client.c b/lib/client.c index 9424ed331..a308487c7 100644 --- a/lib/client.c +++ b/lib/client.c @@ -68,7 +68,9 @@ int lws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len) int lws_client_socket_service(struct lws_context *context, struct lws *wsi, struct lws_pollfd *pollfd) { - char *p = (char *)&context->serv_buf[0]; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + char *p = (char *)&pt->serv_buf[0]; + char *sb = p; unsigned char c; int n, len; @@ -103,8 +105,7 @@ int lws_client_socket_service(struct lws_context *context, return 0; } - n = recv(wsi->sock, (char *)context->serv_buf, - sizeof(context->serv_buf), 0); + n = recv(wsi->sock, sb, LWS_MAX_SOCKET_IO_BUF, 0); if (n < 0) { if (LWS_ERRNO == LWS_EAGAIN) { @@ -117,12 +118,12 @@ int lws_client_socket_service(struct lws_context *context, return 0; } - context->serv_buf[13] = '\0'; - if (strcmp((char *)context->serv_buf, "HTTP/1.0 200 ") && - strcmp((char *)context->serv_buf, "HTTP/1.1 200 ") + pt->serv_buf[13] = '\0'; + if (strcmp(sb, "HTTP/1.0 200 ") && + strcmp(sb, "HTTP/1.1 200 ") ) { lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS); - lwsl_err("ERROR proxy: %s\n", context->serv_buf); + lwsl_err("ERROR proxy: %s\n", sb); return 0; } @@ -269,9 +270,7 @@ some_wait: n = ERR_get_error(); if (n != SSL_ERROR_NONE) { lwsl_err("SSL connect error %lu: %s\n", - n, - ERR_error_string(n, - (char *)context->serv_buf)); + n, ERR_error_string(n, sb)); return 0; } } @@ -327,8 +326,7 @@ some_wait: n = ERR_get_error(); if (n != SSL_ERROR_NONE) { lwsl_err("SSL connect error %lu: %s\n", - n, ERR_error_string(n, - (char *)context->serv_buf)); + n, ERR_error_string(n, sb)); return 0; } } @@ -351,7 +349,7 @@ some_wait: lwsl_notice("accepting self-signed certificate\n"); } else { lwsl_err("server's cert didn't look good, X509_V_ERR = %d: %s\n", - n, ERR_error_string(n, (char *)context->serv_buf)); + n, ERR_error_string(n, sb)); lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS); return 0; @@ -380,10 +378,9 @@ some_wait: lws_latency_pre(context, wsi); - n = lws_ssl_capable_write(wsi, context->serv_buf, - p - (char *)context->serv_buf); + n = lws_ssl_capable_write(wsi, (unsigned char *)sb, p - sb); lws_latency(context, wsi, "send lws_issue_raw", n, - n == p - (char *)context->serv_buf); + n == p - sb); switch (n) { case LWS_SSL_CAPABLE_ERROR: lwsl_debug("ERROR writing to client socket\n"); @@ -508,14 +505,16 @@ strtolower(char *s) int lws_client_interpret_server_handshake(struct lws *wsi) { + int n, len, okay = 0, isErrorCodeReceived = 0, port = 0, ssl = 0; struct lws_context *context = wsi->context; int close_reason = LWS_CLOSE_STATUS_PROTOCOL_ERR; - int n, len, okay = 0, isErrorCodeReceived = 0, port = 0, ssl = 0; const char *pc, *prot, *ads = NULL, *path; char *p; #ifndef LWS_NO_EXTENSIONS - const struct lws_extension *ext; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + char *sb = (char *)&pt->serv_buf[0]; const struct lws_ext_options *opts; + const struct lws_extension *ext; char ext_name[128]; const char *c, *a; char ignore; @@ -662,13 +661,12 @@ check_extensions: * and go through matching them or identifying bogons */ - if (lws_hdr_copy(wsi, (char *)context->serv_buf, - sizeof(context->serv_buf), WSI_TOKEN_EXTENSIONS) < 0) { + if (lws_hdr_copy(wsi, sb, LWS_MAX_SOCKET_IO_BUF, WSI_TOKEN_EXTENSIONS) < 0) { lwsl_warn("ext list from server failed to copy\n"); goto bail2; } - c = (char *)context->serv_buf; + c = sb; n = 0; ignore = 0; a = NULL; @@ -1024,7 +1022,7 @@ lws_generate_client_handshake(struct lws *wsi, char *pkt) context->protocols[0].callback(wsi, LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER, NULL, &p, - (pkt + sizeof(context->serv_buf)) - p - 12); + (pkt + LWS_MAX_SOCKET_IO_BUF) - p - 12); p += sprintf(p, "\x0d\x0a"); diff --git a/lib/context.c b/lib/context.c index b66e09885..02b3f1fa3 100644 --- a/lib/context.c +++ b/lib/context.c @@ -96,15 +96,14 @@ lws_create_context(struct lws_context_creation_info *info) #endif lws_feature_status_libev(info); #endif - lwsl_info(" LWS_MAX_HEADER_LEN: %u\n", LWS_MAX_HEADER_LEN); - lwsl_info(" LWS_MAX_PROTOCOLS: %u\n", LWS_MAX_PROTOCOLS); - - lwsl_info(" SPEC_LATEST_SUPPORTED: %u\n", SPEC_LATEST_SUPPORTED); - lwsl_info(" AWAITING_TIMEOUT: %u\n", AWAITING_TIMEOUT); - lwsl_info(" sizeof (*info): %u\n", sizeof(*info)); + lwsl_info(" LWS_MAX_HEADER_LEN : %u\n", LWS_MAX_HEADER_LEN); + lwsl_info(" LWS_MAX_PROTOCOLS : %u\n", LWS_MAX_PROTOCOLS); + lwsl_info(" LWS_MAX_SMP : %u\n", LWS_MAX_SMP); + lwsl_info(" SPEC_LATEST_SUPPORTED : %u\n", SPEC_LATEST_SUPPORTED); + lwsl_info(" AWAITING_TIMEOUT : %u\n", AWAITING_TIMEOUT); + lwsl_info(" sizeof (*info) : %u\n", sizeof(*info)); #if LWS_POSIX lwsl_info(" SYSTEM_RANDOM_FILEPATH: '%s'\n", SYSTEM_RANDOM_FILEPATH); - lwsl_info(" LWS_MAX_ZLIB_CONN_BUFFER: %u\n", LWS_MAX_ZLIB_CONN_BUFFER); #endif if (lws_plat_context_early_init()) return NULL; @@ -120,6 +119,16 @@ lws_create_context(struct lws_context_creation_info *info) lwsl_notice(" Started with daemon pid %d\n", pid_daemon); } #endif + context->max_fds = getdtablesize(); + + if (info->count_threads) + context->count_threads = info->count_threads; + else + context->count_threads = 1; + + if (context->count_threads > LWS_MAX_SMP) + context->count_threads = LWS_MAX_SMP; + context->lserv_seen = 0; context->protocols = info->protocols; context->token_limits = info->token_limits; @@ -132,6 +141,26 @@ lws_create_context(struct lws_context_creation_info *info) context->ka_interval = info->ka_interval; context->ka_probes = info->ka_probes; + /* we zalloc only the used ones, so the memory is not wasted + * allocating for unused threads + */ + for (n = 0; n < context->count_threads; n++) { + context->pt[n].serv_buf = lws_zalloc(LWS_MAX_SOCKET_IO_BUF); + if (!context->pt[n].serv_buf) { + lwsl_err("OOM\n"); + return NULL; + } + } + + if (info->fd_limit_per_thread) + context->fd_limit_per_thread = info->fd_limit_per_thread; + else + context->fd_limit_per_thread = context->max_fds / + context->count_threads; + + lwsl_notice(" Threads: %d each %d fds\n", context->count_threads, + context->fd_limit_per_thread); + memset(&wsi, 0, sizeof(wsi)); wsi.context = context; @@ -151,7 +180,12 @@ lws_create_context(struct lws_context_creation_info *info) context->lws_ev_sigint_cb = &lws_sigint_cb; #endif /* LWS_USE_LIBEV */ - lwsl_info(" mem: context: %5u bytes\n", sizeof(struct lws_context)); + lwsl_info(" mem: context: %5u bytes (%d + (%d x %d))\n", + sizeof(struct lws_context) + + (context->count_threads * LWS_MAX_SOCKET_IO_BUF), + sizeof(struct lws_context), + context->count_threads, + LWS_MAX_SOCKET_IO_BUF); /* * allocate and initialize the pool of @@ -181,21 +215,24 @@ lws_create_context(struct lws_context_creation_info *info) /* this is per context */ lwsl_info(" mem: http hdr rsvd: %5u bytes ((%u + %u) x %u)\n", - (context->max_http_header_data + sizeof(struct allocated_headers)) * + (context->max_http_header_data + + sizeof(struct allocated_headers)) * context->max_http_header_pool, - context->max_http_header_data, sizeof(struct allocated_headers), + context->max_http_header_data, + sizeof(struct allocated_headers), context->max_http_header_pool); - - context->max_fds = getdtablesize(); - - context->fds = lws_zalloc(sizeof(struct lws_pollfd) * context->max_fds); - if (context->fds == NULL) { + n = sizeof(struct lws_pollfd) * context->count_threads * + context->fd_limit_per_thread; + context->pt[0].fds = lws_zalloc(n); + if (context->pt[0].fds == NULL) { lwsl_err("OOM allocating %d fds\n", context->max_fds); goto bail; } - - lwsl_info(" mem: pollfd map: %5u\n", - sizeof(struct lws_pollfd) * context->max_fds); + lwsl_info(" mem: pollfd map: %5u\n", n); + /* each thread serves his own chunk of fds */ + for (n = 1; n < (int)info->count_threads; n++) + context->pt[n].fds = context->pt[0].fds + + (n * context->fd_limit_per_thread); if (lws_plat_init(context, info)) goto bail; @@ -289,7 +326,7 @@ lws_context_destroy(struct lws_context *context) { const struct lws_protocols *protocol = NULL; struct lws wsi; - int n; + int n, m = context->count_threads; lwsl_notice("%s\n", __func__); @@ -306,14 +343,15 @@ lws_context_destroy(struct lws_context *context) lwsl_notice("Worst latency: %s\n", context->worst_latency_info); #endif - for (n = 0; n < context->fds_count; n++) { - struct lws *wsi = wsi_from_fd(context, context->fds[n].fd); - if (!wsi) - continue; - lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY + while (m--) + for (n = 0; n < context->pt[m].fds_count; n++) { + struct lws *wsi = wsi_from_fd(context, context->pt[m].fds[n].fd); + if (!wsi) + continue; + lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY /* no protocol close */); - n--; - } + n--; + } /* * give all extensions a chance to clean up any per-context @@ -345,10 +383,13 @@ lws_context_destroy(struct lws_context *context) ev_signal_stop(context->io_loop, &context->w_sigint.watcher); #endif /* LWS_USE_LIBEV */ + for (n = 0; n < context->count_threads; n++) + lws_free_set_NULL(context->pt[n].serv_buf); + lws_plat_context_early_destroy(context); lws_ssl_context_destroy(context); - if (context->fds) - lws_free(context->fds); + if (context->pt[0].fds) + lws_free(context->pt[0].fds); if (context->ah_pool) lws_free(context->ah_pool); if (context->http_header_data) diff --git a/lib/header.c b/lib/header.c index 8f71a2894..8c6b96e2a 100644 --- a/lib/header.c +++ b/lib/header.c @@ -177,11 +177,10 @@ lws_return_http_status(struct lws *wsi, unsigned int code, const char *html_body { int n, m; struct lws_context *context = lws_get_context(wsi); - unsigned char *p = context->serv_buf + - LWS_PRE; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + unsigned char *p = pt->serv_buf + LWS_PRE; unsigned char *start = p; - unsigned char *end = p + sizeof(context->serv_buf) - - LWS_PRE; + unsigned char *end = p + LWS_MAX_SOCKET_IO_BUF - LWS_PRE; if (!html_body) html_body = ""; diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index f8c60a387..ae58daa54 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -85,6 +85,7 @@ void lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason) { struct lws_context *context = wsi->context; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; int n, m, ret, old_state; struct lws_tokens eff_buf; @@ -261,7 +262,7 @@ just_kill_connection: wsi->mode == LWSCM_WS_CLIENT) { if (wsi->u.ws.rx_draining_ext) { - struct lws **w = &wsi->context->rx_draining_ext_list; + struct lws **w = &pt->rx_draining_ext_list; wsi->u.ws.rx_draining_ext = 0; /* remove us from context draining ext list */ @@ -276,7 +277,7 @@ just_kill_connection: } if (wsi->u.ws.tx_draining_ext) { - struct lws **w = &wsi->context->tx_draining_ext_list; + struct lws **w = &pt->tx_draining_ext_list; wsi->u.ws.tx_draining_ext = 0; /* remove us from context draining ext list */ @@ -546,15 +547,19 @@ LWS_VISIBLE int lws_callback_all_protocol(struct lws_context *context, const struct lws_protocols *protocol, int reason) { + struct lws_context_per_thread *pt = &context->pt[0]; struct lws *wsi; - int n; + int n, m = context->count_threads; - for (n = 0; n < context->fds_count; n++) { - wsi = wsi_from_fd(context, context->fds[n].fd); - if (!wsi) - continue; - if (wsi->protocol == protocol) - protocol->callback(wsi, reason, wsi->user_space, NULL, 0); + while (m--) { + for (n = 0; n < pt->fds_count; n++) { + wsi = wsi_from_fd(context, pt->fds[n].fd); + if (!wsi) + continue; + if (wsi->protocol == protocol) + protocol->callback(wsi, reason, wsi->user_space, NULL, 0); + } + pt++; } return 0; @@ -691,15 +696,19 @@ LWS_VISIBLE void lws_rx_flow_allow_all_protocol(const struct lws_context *context, const struct lws_protocols *protocol) { - int n; + const struct lws_context_per_thread *pt = &context->pt[0]; struct lws *wsi; + int n, m = context->count_threads; - for (n = 0; n < context->fds_count; n++) { - wsi = wsi_from_fd(context, context->fds[n].fd); - if (!wsi) - continue; - if (wsi->protocol == protocol) - lws_rx_flow_control(wsi, LWS_RXFLOW_ALLOW); + while (m--) { + for (n = 0; n < pt->fds_count; n++) { + wsi = wsi_from_fd(context, pt->fds[n].fd); + if (!wsi) + continue; + if (wsi->protocol == protocol) + lws_rx_flow_control(wsi, LWS_RXFLOW_ALLOW); + } + pt++; } } @@ -1013,6 +1022,12 @@ lws_get_context(const struct lws *wsi) return wsi->context; } +LWS_VISIBLE LWS_EXTERN int +lws_get_count_threads(struct lws_context *context) +{ + return context->count_threads; +} + LWS_VISIBLE LWS_EXTERN void * lws_wsi_user(struct lws *wsi) { @@ -1024,8 +1039,7 @@ lws_close_reason(struct lws *wsi, enum lws_close_status status, unsigned char *buf, size_t len) { unsigned char *p, *start; - int budget = sizeof(wsi->u.ws.ping_payload_buf) - - LWS_PRE; + int budget = sizeof(wsi->u.ws.ping_payload_buf) - LWS_PRE; assert(wsi->mode == LWSCM_WS_SERVING || wsi->mode == LWSCM_WS_CLIENT); @@ -1046,7 +1060,7 @@ _lws_rx_flow_control(struct lws *wsi) { /* there is no pending change */ if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE)) { - lwsl_info("%s: no pending change\n", __func__); + lwsl_debug("%s: no pending change\n", __func__); return 0; } @@ -1147,7 +1161,8 @@ lws_check_utf8(unsigned char *state, unsigned char *buf, size_t len) */ LWS_VISIBLE LWS_EXTERN int -lws_parse_uri(char *p, const char **prot, const char **ads, int *port, const char **path) +lws_parse_uri(char *p, const char **prot, const char **ads, int *port, + const char **path) { const char *end; static const char *slash = "/"; diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index 5c7db72b9..09a3164cc 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -1298,6 +1298,11 @@ extern int lws_extension_callback_pm_deflate( * allocated for the lifetime of the context). If the pool is * busy new incoming connections must wait for accept until one * becomes free. + * @count_threads: how many contexts to create in an array, 0 = 1 + * @fd_limit_per_thread: nonzero means restrict each service thread to this + * many fds, 0 means the default which is divide the process fd + * limit by the number of threads. + * */ struct lws_context_creation_info { @@ -1329,6 +1334,9 @@ struct lws_context_creation_info { short max_http_header_data; short max_http_header_pool; + unsigned int count_threads; + unsigned int fd_limit_per_thread; + /* Add new things just above here ---^ * This is part of the ABI, don't needlessly break compatibility * @@ -1400,6 +1408,12 @@ lws_context_destroy(struct lws_context *context); LWS_VISIBLE LWS_EXTERN int lws_service(struct lws_context *context, int timeout_ms); +LWS_VISIBLE LWS_EXTERN int +lws_service_tsi(struct lws_context *context, int timeout_ms, int tsi); + +LWS_VISIBLE LWS_EXTERN void +lws_cancel_service_pt(struct lws *wsi); + LWS_VISIBLE LWS_EXTERN void lws_cancel_service(struct lws_context *context); @@ -1429,7 +1443,7 @@ lws_add_http_header_status(struct lws *wsi, unsigned int code, unsigned char **p, unsigned char *end); -LWS_EXTERN int +LWS_VISIBLE LWS_EXTERN int lws_http_transaction_completed(struct lws *wsi); #ifdef LWS_USE_LIBEV @@ -1449,6 +1463,10 @@ lws_sigint_cb(struct ev_loop *loop, struct ev_signal *watcher, int revents); LWS_VISIBLE LWS_EXTERN int lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd); +LWS_VISIBLE LWS_EXTERN int +lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, + int tsi); + LWS_VISIBLE LWS_EXTERN void * lws_context_user(struct lws_context *context); @@ -1742,6 +1760,9 @@ lws_get_fops(struct lws_context *context); LWS_VISIBLE LWS_EXTERN struct lws_context * lws_get_context(const struct lws *wsi); +LWS_VISIBLE LWS_EXTERN int +lws_get_count_threads(struct lws_context *context); + /* * Wsi-associated File Operations access helpers * diff --git a/lib/lws-plat-unix.c b/lib/lws-plat-unix.c index a0e4f8683..98276dbd2 100644 --- a/lib/lws-plat-unix.c +++ b/lib/lws-plat-unix.c @@ -59,7 +59,25 @@ static void lws_sigusr2(int sig) } /** - * lws_cancel_service() - Cancel servicing of pending websocket activity + * lws_cancel_service_pt() - Cancel servicing of pending socket activity + * on one thread + * @wsi: Cancel service on the thread this wsi is serviced by + * + * This function let a call to lws_service() waiting for a timeout + * immediately return. + */ +LWS_VISIBLE void +lws_cancel_service_pt(struct lws *wsi) +{ + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + char buf = 0; + + if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1) + lwsl_err("Cannot write to dummy pipe"); +} + +/** + * lws_cancel_service() - Cancel ALL servicing of pending socket activity * @context: Websocket context * * This function let a call to lws_service() waiting for a timeout @@ -68,10 +86,14 @@ static void lws_sigusr2(int sig) LWS_VISIBLE void lws_cancel_service(struct lws_context *context) { - char buf = 0; + struct lws_context_per_thread *pt = &context->pt[0]; + char buf = 0, m = context->count_threads; - if (write(context->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1) - lwsl_err("Cannot write to dummy pipe"); + while (m--) { + if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1) + lwsl_err("Cannot write to dummy pipe"); + pt++; + } } LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line) @@ -96,12 +118,12 @@ LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line) } LWS_VISIBLE int -lws_plat_service(struct lws_context *context, int timeout_ms) +lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) { - int n; - int m; - char buf; + struct lws_context_per_thread *pt = &context->pt[tsi]; struct lws *wsi; + int n, m; + char buf; #ifdef LWS_OPENSSL_SUPPORT struct lws *wsi_next; #endif @@ -125,26 +147,26 @@ lws_plat_service(struct lws_context *context, int timeout_ms) context->service_tid = context->service_tid_detected; /* if we know we are draining rx ext, do not wait in poll */ - if (context->rx_draining_ext_list) + if (pt->rx_draining_ext_list) timeout_ms = 0; #ifdef LWS_OPENSSL_SUPPORT /* if we know we have non-network pending data, do not wait in poll */ - if (lws_ssl_anybody_has_buffered_read(context)) { + if (lws_ssl_anybody_has_buffered_read_tsi(context, tsi)) { timeout_ms = 0; lwsl_err("ssl buffered read\n"); } #endif - n = poll(context->fds, context->fds_count, timeout_ms); + n = poll(pt->fds, pt->fds_count, timeout_ms); #ifdef LWS_OPENSSL_SUPPORT - if (!context->rx_draining_ext_list && - !lws_ssl_anybody_has_buffered_read(context) && n == 0) { + if (!pt->rx_draining_ext_list && + !lws_ssl_anybody_has_buffered_read_tsi(context, tsi) && !n) { #else - if (!context->rx_draining_ext_list && n == 0) /* poll timeout */ { + if (!pt->rx_draining_ext_list && !n) /* poll timeout */ { #endif - lws_service_fd(context, NULL); + lws_service_fd_tsi(context, NULL, tsi); return 0; } @@ -158,10 +180,10 @@ lws_plat_service(struct lws_context *context, int timeout_ms) * For all guys with already-available ext data to drain, if they are * not flowcontrolled, fake their POLLIN status */ - wsi = context->rx_draining_ext_list; + wsi = pt->rx_draining_ext_list; while (wsi) { - context->fds[wsi->position_in_fds_table].revents |= - context->fds[wsi->position_in_fds_table].events & POLLIN; + pt->fds[wsi->position_in_fds_table].revents |= + pt->fds[wsi->position_in_fds_table].events & POLLIN; wsi = wsi->u.ws.rx_draining_ext_list; } @@ -173,12 +195,12 @@ lws_plat_service(struct lws_context *context, int timeout_ms) * network socket may have nothing */ - wsi = context->pending_read_list; + wsi = pt->pending_read_list; while (wsi) { wsi_next = wsi->pending_read_list_next; - context->fds[wsi->position_in_fds_table].revents |= - context->fds[wsi->position_in_fds_table].events & POLLIN; - if (context->fds[wsi->position_in_fds_table].revents & POLLIN) + pt->fds[wsi->position_in_fds_table].revents |= + pt->fds[wsi->position_in_fds_table].events & POLLIN; + if (pt->fds[wsi->position_in_fds_table].revents & POLLIN) /* * he's going to get serviced now, take him off the * list of guys with buffered SSL. If he still has some @@ -193,17 +215,17 @@ lws_plat_service(struct lws_context *context, int timeout_ms) /* any socket with events to service? */ - for (n = 0; n < context->fds_count; n++) { - if (!context->fds[n].revents) + for (n = 0; n < pt->fds_count; n++) { + if (!pt->fds[n].revents) continue; - if (context->fds[n].fd == context->dummy_pipe_fds[0]) { - if (read(context->fds[n].fd, &buf, 1) != 1) + if (pt->fds[n].fd == pt->dummy_pipe_fds[0]) { + if (read(pt->fds[n].fd, &buf, 1) != 1) lwsl_err("Cannot read from dummy pipe."); continue; } - m = lws_service_fd(context, &context->fds[n]); + m = lws_service_fd_tsi(context, &pt->fds[n], tsi); if (m < 0) return -1; /* if something closed, retry this slot */ @@ -214,6 +236,12 @@ lws_plat_service(struct lws_context *context, int timeout_ms) return 0; } +LWS_VISIBLE int +lws_plat_service(struct lws_context *context, int timeout_ms) +{ + return lws_plat_service_tsi(context, timeout_ms, 0); +} + LWS_VISIBLE int lws_plat_set_socket_options(struct lws_context *context, int fd) { @@ -332,11 +360,17 @@ lws_plat_context_early_destroy(struct lws_context *context) LWS_VISIBLE void lws_plat_context_late_destroy(struct lws_context *context) { + struct lws_context_per_thread *pt = &context->pt[0]; + int m = context->count_threads; + if (context->lws_lookup) lws_free(context->lws_lookup); - close(context->dummy_pipe_fds[0]); - close(context->dummy_pipe_fds[1]); + while (m--) { + close(pt->dummy_pipe_fds[0]); + close(pt->dummy_pipe_fds[1]); + pt++; + } close(context->fd_random); } @@ -411,11 +445,12 @@ lws_interface_to_sa(int ipv6, const char *ifname, struct sockaddr_in *addr, size } LWS_VISIBLE void -lws_plat_insert_socket_into_fds(struct lws_context *context, - struct lws *wsi) +lws_plat_insert_socket_into_fds(struct lws_context *context, struct lws *wsi) { + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + lws_libev_io(wsi, LWS_EV_START | LWS_EV_READ); - context->fds[context->fds_count++].revents = 0; + pt->fds[pt->fds_count++].revents = 0; } LWS_VISIBLE void @@ -514,39 +549,47 @@ LWS_VISIBLE int lws_plat_init(struct lws_context *context, struct lws_context_creation_info *info) { - context->lws_lookup = lws_zalloc(sizeof(struct lws *) * context->max_fds); + struct lws_context_per_thread *pt = &context->pt[0]; + int n = context->count_threads, fd; + + /* master context has the global fd lookup array */ + context->lws_lookup = lws_zalloc(sizeof(struct lws *) * + context->max_fds); if (context->lws_lookup == NULL) { - lwsl_err( - "Unable to allocate lws_lookup array for %d connections\n", - context->max_fds); + lwsl_err("OOM on lws_lookup array for %d connections\n", + context->max_fds); return 1; } lwsl_notice(" mem: platform fd map: %5u bytes\n", sizeof(struct lws *) * context->max_fds); + fd = open(SYSTEM_RANDOM_FILEPATH, O_RDONLY); - context->fd_random = open(SYSTEM_RANDOM_FILEPATH, O_RDONLY); + context->fd_random = fd; if (context->fd_random < 0) { lwsl_err("Unable to open random device %s %d\n", - SYSTEM_RANDOM_FILEPATH, context->fd_random); + SYSTEM_RANDOM_FILEPATH, context->fd_random); return 1; } if (!lws_libev_init_fd_table(context)) { /* otherwise libev handled it instead */ - if (pipe(context->dummy_pipe_fds)) { - lwsl_err("Unable to create pipe\n"); - return 1; + while (n--) { + if (pipe(pt->dummy_pipe_fds)) { + lwsl_err("Unable to create pipe\n"); + return 1; + } + + /* use the read end of pipe as first item */ + pt->fds[0].fd = pt->dummy_pipe_fds[0]; + pt->fds[0].events = LWS_POLLIN; + pt->fds[0].revents = 0; + pt->fds_count = 1; + pt++; } } - /* use the read end of pipe as first item */ - context->fds[0].fd = context->dummy_pipe_fds[0]; - context->fds[0].events = LWS_POLLIN; - context->fds[0].revents = 0; - context->fds_count = 1; - context->fops.open = _lws_plat_file_open; context->fops.close = _lws_plat_file_close; context->fops.seek_cur = _lws_plat_file_seek_cur; diff --git a/lib/lws-plat-win.c b/lib/lws-plat-win.c index 7e7931042..36df16d64 100644 --- a/lib/lws-plat-win.c +++ b/lib/lws-plat-win.c @@ -1,3 +1,4 @@ +#define _WINSOCK_DEPRECATED_NO_WARNINGS #include "private-libwebsockets.h" unsigned long long @@ -126,7 +127,20 @@ LWS_VISIBLE int lws_poll_listen_fd(struct lws_pollfd *fd) LWS_VISIBLE void lws_cancel_service(struct lws_context *context) { - WSASetEvent(context->events[0]); + struct lws_context_per_thread *pt = &context->pt[0]; + int n = context->count_threads; + + while (n--) { + WSASetEvent(pt->events[0]); + pt++; + } +} + +LWS_VISIBLE void +lws_cancel_service_pt(struct lws *wsi) +{ + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + WSASetEvent(pt->events[0]); } LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line) @@ -135,7 +149,7 @@ LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line) } LWS_VISIBLE int -lws_plat_service(struct lws_context *context, int timeout_ms) +lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) { int n; int i; @@ -143,6 +157,7 @@ lws_plat_service(struct lws_context *context, int timeout_ms) WSANETWORKEVENTS networkevents; struct lws_pollfd *pfd; struct lws *wsi; + struct lws_context_per_thread *pt = &context->pt[tsi]; /* stay dead once we are dead */ @@ -160,8 +175,8 @@ lws_plat_service(struct lws_context *context, int timeout_ms) } context->service_tid = context->service_tid_detected; - for (i = 0; i < context->fds_count; ++i) { - pfd = &context->fds[i]; + for (i = 0; i < pt->fds_count; ++i) { + pfd = &pt->fds[i]; if (pfd->fd == context->lserv_fd) continue; @@ -179,7 +194,7 @@ lws_plat_service(struct lws_context *context, int timeout_ms) } } - ev = WSAWaitForMultipleEvents(context->fds_count + 1, context->events, + ev = WSAWaitForMultipleEvents(pt->fds_count + 1, pt->events, FALSE, timeout_ms, FALSE); context->service_tid = 0; @@ -189,17 +204,17 @@ lws_plat_service(struct lws_context *context, int timeout_ms) } if (ev == WSA_WAIT_EVENT_0) { - WSAResetEvent(context->events[0]); + WSAResetEvent(pt->events[0]); return 0; } - if (ev < WSA_WAIT_EVENT_0 || ev > WSA_WAIT_EVENT_0 + context->fds_count) + if (ev < WSA_WAIT_EVENT_0 || ev > WSA_WAIT_EVENT_0 + pt->fds_count) return -1; - pfd = &context->fds[ev - WSA_WAIT_EVENT_0 - 1]; + pfd = &pt->fds[ev - WSA_WAIT_EVENT_0 - 1]; if (WSAEnumNetworkEvents(pfd->fd, - context->events[ev - WSA_WAIT_EVENT_0], + pt->events[ev - WSA_WAIT_EVENT_0], &networkevents) == SOCKET_ERROR) { lwsl_err("WSAEnumNetworkEvents() failed with error %d\n", LWS_ERRNO); @@ -217,6 +232,12 @@ lws_plat_service(struct lws_context *context, int timeout_ms) return lws_service_fd(context, pfd); } +LWS_VISIBLE int +lws_plat_service(struct lws_context *context, int timeout_ms) +{ + return lws_plat_service_tsi(context, timeout_ms, 0); +} + LWS_VISIBLE int lws_plat_set_socket_options(struct lws_context *context, lws_sockfd_type fd) { @@ -289,9 +310,15 @@ lws_plat_context_early_init(void) LWS_VISIBLE void lws_plat_context_early_destroy(struct lws_context *context) { - if (context->events) { - WSACloseEvent(context->events[0]); - lws_free(context->events); + struct lws_context_per_thread *pt = &context->pt[0]; + int n = context->count_threads; + + while (n--) { + if (pt->events) { + WSACloseEvent(pt->events[0]); + lws_free(pt->events); + } + pt++; } } @@ -329,20 +356,23 @@ lws_interface_to_sa(int ipv6, } LWS_VISIBLE void -lws_plat_insert_socket_into_fds(struct lws_context *context, - struct lws *wsi) +lws_plat_insert_socket_into_fds(struct lws_context *context, struct lws *wsi) { - context->fds[context->fds_count++].revents = 0; - context->events[context->fds_count] = WSACreateEvent(); - WSAEventSelect(wsi->sock, context->events[context->fds_count], LWS_POLLIN); + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + + pt->fds[pt->fds_count++].revents = 0; + pt->events[pt->fds_count] = WSACreateEvent(); + WSAEventSelect(wsi->sock, pt->events[pt->fds_count], LWS_POLLIN); } LWS_VISIBLE void lws_plat_delete_socket_from_fds(struct lws_context *context, struct lws *wsi, int m) { - WSACloseEvent(context->events[m + 1]); - context->events[m + 1] = context->events[context->fds_count + 1]; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + + WSACloseEvent(pt->events[m + 1]); + pt->events[m + 1] = pt->events[pt->fds_count + 1]; } LWS_VISIBLE void @@ -354,6 +384,7 @@ LWS_VISIBLE int lws_plat_change_pollfd(struct lws_context *context, struct lws *wsi, struct lws_pollfd *pfd) { + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; long networkevents = LWS_POLLHUP; if ((pfd->events & LWS_POLLIN)) @@ -363,7 +394,7 @@ lws_plat_change_pollfd(struct lws_context *context, networkevents |= LWS_POLLOUT; if (WSAEventSelect(wsi->sock, - context->events[wsi->position_in_fds_table + 1], + pt->events[wsi->position_in_fds_table + 1], networkevents) != SOCKET_ERROR) return 0; @@ -497,7 +528,8 @@ LWS_VISIBLE int lws_plat_init(struct lws_context *context, struct lws_context_creation_info *info) { - int i; + struct lws_context_per_thread *pt = &context->pt[0]; + int i, n = context->count_threads; for (i = 0; i < FD_HASHTABLE_MODULUS; i++) { context->fd_hashtable[i].wsi = @@ -507,15 +539,20 @@ lws_plat_init(struct lws_context *context, return -1; } - context->events = lws_malloc(sizeof(WSAEVENT) * (context->max_fds + 1)); - if (context->events == NULL) { - lwsl_err("Unable to allocate events array for %d connections\n", - context->max_fds); - return 1; - } + while (n--) { + pt->events = lws_malloc(sizeof(WSAEVENT) * + (context->fd_limit_per_thread + 1)); + if (pt->events == NULL) { + lwsl_err("Unable to allocate events array for %d connections\n", + context->fd_limit_per_thread + 1); + return 1; + } - context->fds_count = 0; - context->events[0] = WSACreateEvent(); + pt->fds_count = 0; + pt->events[0] = WSACreateEvent(); + + pt++; + } context->fd_random = 0; diff --git a/lib/output.c b/lib/output.c index 9ba6d53dd..74ff8568c 100644 --- a/lib/output.c +++ b/lib/output.c @@ -233,6 +233,7 @@ handle_truncated_send: LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, size_t len, enum lws_write_protocol wp) { + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; int masked7 = (wsi->mode == LWSCM_WS_CLIENT); unsigned char is_masked_bit = 0; unsigned char *dropmask = NULL; @@ -242,7 +243,7 @@ LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, size_t len, if (wsi->state == LWSS_ESTABLISHED && wsi->u.ws.tx_draining_ext) { /* remove us from the list */ - struct lws **w = &wsi->context->tx_draining_ext_list; + struct lws **w = &pt->tx_draining_ext_list; lwsl_debug("%s: TX EXT DRAINING: Remove from list\n", __func__); wsi->u.ws.tx_draining_ext = 0; /* remove us from context draining ext list */ @@ -311,9 +312,8 @@ LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, size_t len, if (n && eff_buf.token_len) { /* extension requires further draining */ wsi->u.ws.tx_draining_ext = 1; - wsi->u.ws.tx_draining_ext_list = - wsi->context->tx_draining_ext_list; - wsi->context->tx_draining_ext_list = wsi; + wsi->u.ws.tx_draining_ext_list = pt->tx_draining_ext_list; + pt->tx_draining_ext_list = wsi; /* we must come back to do more */ lws_callback_on_writable(wsi); /* @@ -551,6 +551,7 @@ send_raw: LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi) { struct lws_context *context = wsi->context; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; unsigned long amount; int n, m; @@ -569,8 +570,8 @@ LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi) goto all_sent; if (lws_plat_file_read(wsi, wsi->u.http.fd, &amount, - context->serv_buf, - sizeof(context->serv_buf)) < 0) + pt->serv_buf, + LWS_MAX_SOCKET_IO_BUF) < 0) return -1; /* caller will close */ n = (int)amount; @@ -578,7 +579,7 @@ LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi) lws_set_timeout(wsi, PENDING_TIMEOUT_HTTP_CONTENT, AWAITING_TIMEOUT); wsi->u.http.filepos += n; - m = lws_write(wsi, context->serv_buf, n, + m = lws_write(wsi, pt->serv_buf, n, wsi->u.http.filepos == wsi->u.http.filelen ? LWS_WRITE_HTTP_FINAL : LWS_WRITE_HTTP); if (m < 0) diff --git a/lib/parsers.c b/lib/parsers.c index 93a121075..17059af85 100644 --- a/lib/parsers.c +++ b/lib/parsers.c @@ -737,6 +737,7 @@ LWS_VISIBLE int lws_frame_is_binary(struct lws *wsi) int lws_rx_sm(struct lws *wsi, unsigned char c) { + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; struct lws_tokens eff_buf; int ret = 0, n, rx_draining_ext = 0; int callback_action = LWS_CALLBACK_RECEIVE; @@ -746,7 +747,7 @@ lws_rx_sm(struct lws *wsi, unsigned char c) switch (wsi->lws_rx_parse_state) { case LWS_RXPS_NEW: if (wsi->u.ws.rx_draining_ext) { - struct lws **w = &wsi->context->rx_draining_ext_list; + struct lws **w = &pt->rx_draining_ext_list; eff_buf.token = NULL; eff_buf.token_len = 0; @@ -1016,13 +1017,14 @@ handle_first: assert(wsi->u.ws.rx_ubuf); - if (wsi->u.ws.rx_ubuf_head + LWS_PRE + 4 >= wsi->u.ws.rx_ubuf_alloc) { - lwsl_err("Attempted overflow\n"); - return -1; - } + if (wsi->u.ws.rx_ubuf_head + LWS_PRE >= + wsi->u.ws.rx_ubuf_alloc) { + lwsl_err("Attempted overflow \n"); + return -1; + } if (wsi->u.ws.all_zero_nonce) wsi->u.ws.rx_ubuf[LWS_PRE + - (wsi->u.ws.rx_ubuf_head++)] = c; + (wsi->u.ws.rx_ubuf_head++)] = c; else wsi->u.ws.rx_ubuf[LWS_PRE + (wsi->u.ws.rx_ubuf_head++)] = @@ -1079,8 +1081,7 @@ spill: wsi->protocol->callback, wsi, LWS_CALLBACK_WS_PEER_INITIATED_CLOSE, wsi->user_space, - &wsi->u.ws.rx_ubuf[ - LWS_PRE], + &wsi->u.ws.rx_ubuf[LWS_PRE], wsi->u.ws.rx_ubuf_head)) return -1; @@ -1139,7 +1140,7 @@ ping_drop: default: lwsl_parser("passing opc %x up to exts\n", - wsi->u.ws.opcode); + wsi->u.ws.opcode); /* * It's something special we can't understand here. * Pass the payload up to the extension's parsing @@ -1189,8 +1190,8 @@ drain_extension: if (n && eff_buf.token_len) { /* extension had more... main loop will come back */ wsi->u.ws.rx_draining_ext = 1; - wsi->u.ws.rx_draining_ext_list = wsi->context->rx_draining_ext_list; - wsi->context->rx_draining_ext_list = wsi; + wsi->u.ws.rx_draining_ext_list = pt->rx_draining_ext_list; + pt->rx_draining_ext_list = wsi; } if (eff_buf.token_len > 0 || diff --git a/lib/pollfd.c b/lib/pollfd.c index c41a3b810..8ab372013 100644 --- a/lib/pollfd.c +++ b/lib/pollfd.c @@ -25,8 +25,9 @@ int insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi) { struct lws_pollargs pa = { wsi->sock, LWS_POLLIN, 0 }; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; - if (context->fds_count >= context->max_fds) { + if ((unsigned int)pt->fds_count >= context->fd_limit_per_thread) { lwsl_err("Too many fds (%d)\n", context->max_fds); return 1; } @@ -47,9 +48,9 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi) return -1; insert_wsi(context, wsi); - wsi->position_in_fds_table = context->fds_count; - context->fds[context->fds_count].fd = wsi->sock; - context->fds[context->fds_count].events = LWS_POLLIN; + wsi->position_in_fds_table = pt->fds_count; + pt->fds[pt->fds_count].fd = wsi->sock; + pt->fds[pt->fds_count].events = LWS_POLLIN; lws_plat_insert_socket_into_fds(context, wsi); @@ -72,10 +73,11 @@ remove_wsi_socket_from_fds(struct lws *wsi) struct lws *end_wsi; struct lws_pollargs pa = { wsi->sock, 0, 0 }; struct lws_context *context = wsi->context; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE); - --context->fds_count; + --pt->fds_count; #if !defined(_WIN32) && !defined(MBED_OPERATORS) if (wsi->sock > context->max_fds) { @@ -95,7 +97,7 @@ remove_wsi_socket_from_fds(struct lws *wsi) m = wsi->position_in_fds_table; /* replace the contents for this */ /* have the last guy take up the vacant slot */ - context->fds[m] = context->fds[context->fds_count]; + pt->fds[m] = pt->fds[pt->fds_count]; lws_plat_delete_socket_from_fds(context, wsi, m); @@ -104,7 +106,7 @@ remove_wsi_socket_from_fds(struct lws *wsi) * (still same fd pointing to same wsi) */ /* end guy's "position in fds table" changed */ - end_wsi = wsi_from_fd(context, context->fds[context->fds_count].fd); + end_wsi = wsi_from_fd(context, pt->fds[pt->fds_count].fd); end_wsi->position_in_fds_table = m; /* deletion guy's lws_lookup entry needs nuking */ delete_from_fd(context, wsi->sock); @@ -128,6 +130,7 @@ int lws_change_pollfd(struct lws *wsi, int _and, int _or) { struct lws_context *context; + struct lws_context_per_thread *pt; int tid; int sampled_tid; struct lws_pollfd *pfd; @@ -141,7 +144,9 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or) if (!context) return 1; - pfd = &context->fds[wsi->position_in_fds_table]; + pt = &context->pt[(int)wsi->tsi]; + + pfd = &pt->fds[wsi->position_in_fds_table]; pa.fd = wsi->sock; if (context->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, @@ -179,7 +184,7 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or) if (tid == -1) return -1; if (tid != sampled_tid) - lws_cancel_service(context); + lws_cancel_service_pt(wsi); } } @@ -250,8 +255,7 @@ lws_callback_on_writable(struct lws *wsi) network_sock: #endif - if (lws_ext_cb_active(wsi, - LWS_EXT_CB_REQUEST_ON_WRITEABLE, NULL, 0)) + if (lws_ext_cb_active(wsi, LWS_EXT_CB_REQUEST_ON_WRITEABLE, NULL, 0)) return 1; if (wsi->position_in_fds_table < 0) { @@ -281,15 +285,19 @@ LWS_VISIBLE int lws_callback_on_writable_all_protocol(const struct lws_context *context, const struct lws_protocols *protocol) { + const struct lws_context_per_thread *pt = &context->pt[0]; struct lws *wsi; - int n; + int n, m = context->count_threads; - for (n = 0; n < context->fds_count; n++) { - wsi = wsi_from_fd(context,context->fds[n].fd); - if (!wsi) - continue; - if (wsi->protocol == protocol) - lws_callback_on_writable(wsi); + while (m--) { + for (n = 0; n < pt->fds_count; n++) { + wsi = wsi_from_fd(context, pt->fds[n].fd); + if (!wsi) + continue; + if (wsi->protocol == protocol) + lws_callback_on_writable(wsi); + } + pt++; } return 0; diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 7a16cb50f..44295b788 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -314,9 +314,6 @@ extern "C" { #ifndef SYSTEM_RANDOM_FILEPATH #define SYSTEM_RANDOM_FILEPATH "/dev/urandom" #endif -#ifndef LWS_MAX_ZLIB_CONN_BUFFER -#define LWS_MAX_ZLIB_CONN_BUFFER (64 * 1024) -#endif /* * if not in a connection storm, check for incoming @@ -500,13 +497,45 @@ struct allocated_headers { unsigned char nfrag; }; +/* + * so we can have n connections being serviced simultaneously, + * these things need to be isolated per-thread. + */ + +struct lws_context_per_thread { + struct lws_pollfd *fds; + struct lws *rx_draining_ext_list; + struct lws *tx_draining_ext_list; +#ifdef LWS_OPENSSL_SUPPORT + struct lws *pending_read_list; /* linked list */ +#endif + /* + * usable by anything in the service code, but only if the scope + * does not last longer than the service action (since next service + * of any socket can likewise use it and overwrite) + */ + unsigned char *serv_buf; +#ifdef _WIN32 + WSAEVENT *events; +#else + int dummy_pipe_fds[2]; +#endif + int fds_count; +}; + +/* + * the rest is managed per-context, that includes + * + * - processwide single fd -> wsi lookup + * - contextwide headers pool + * - contextwide ssl context + * - contextwide proxy + */ + struct lws_context { time_t last_timeout_check_s; struct lws_plat_file_ops fops; -#ifdef _WIN32 - WSAEVENT *events; -#endif - struct lws_pollfd *fds; + struct lws_context_per_thread pt[LWS_MAX_SMP]; #ifdef _WIN32 /* different implementation between unix and windows */ struct lws_fd_hashtable fd_hashtable[FD_HASHTABLE_MODULUS]; @@ -530,23 +559,15 @@ struct lws_context { const struct lws_protocols *protocols; void *http_header_data; struct allocated_headers *ah_pool; - struct lws *rx_draining_ext_list; - struct lws *tx_draining_ext_list; + #ifdef LWS_OPENSSL_SUPPORT SSL_CTX *ssl_ctx; SSL_CTX *ssl_client_ctx; - struct lws *pending_read_list; /* linked list */ #endif #ifndef LWS_NO_EXTENSIONS const struct lws_extension *extensions; #endif - /* - * usable by anything in the service code, but only if the scope - * does not last longer than the service action (since next service - * of any socket can likewise use it and overwrite) - */ - unsigned char serv_buf[LWS_MAX_SOCKET_IO_BUF]; char http_proxy_address[128]; char proxy_basic_auth_token[128]; char canonical_hostname[128]; @@ -557,7 +578,6 @@ struct lws_context { lws_sockfd_type lserv_fd; - int fds_count; int max_fds; int listen_port; #ifdef LWS_USE_LIBEV @@ -571,6 +591,7 @@ struct lws_context { int lserv_seen; unsigned int http_proxy_port; unsigned int options; + unsigned int fd_limit_per_thread; /* * set to the Thread ID that's doing the service loop just before entry @@ -580,9 +601,6 @@ struct lws_context { */ volatile int service_tid; int service_tid_detected; -#ifndef _WIN32 - int dummy_pipe_fds[2]; -#endif int count_protocols; int ka_time; @@ -593,15 +611,21 @@ struct lws_context { int use_ssl; int allow_non_ssl_on_ssl_port; unsigned int user_supplied_ssl_ctx:1; -#define lws_ssl_anybody_has_buffered_read(ctx) \ - (ctx->use_ssl && ctx->pending_read_list) +#define lws_ssl_anybody_has_buffered_read(w) \ + (w->context->use_ssl && \ + w->context->pt[(int)w->tsi].pending_read_list) +#define lws_ssl_anybody_has_buffered_read_tsi(c, t) \ + (c->use_ssl && \ + c->pt[(int)t].pending_read_list) #else #define lws_ssl_anybody_has_buffered_read(ctx) (0) +#define lws_ssl_anybody_has_buffered_read_tsi(ctx, t) (0) #endif short max_http_header_data; short max_http_header_pool; short ah_count_in_use; + short count_threads; unsigned int being_destroyed:1; }; @@ -966,6 +990,7 @@ struct lws { char rx_frame_type; /* enum lws_write_protocol */ char pending_timeout; /* enum pending_timeout */ char pps; /* enum lws_pending_protocol_send */ + char tsi; /* thread service index we belong to */ }; LWS_EXTERN int log_level; @@ -1329,6 +1354,8 @@ LWS_EXTERN int lws_poll_listen_fd(struct lws_pollfd *fd); LWS_EXTERN int lws_plat_service(struct lws_context *context, int timeout_ms); +LWS_EXTERN LWS_VISIBLE int +lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi); LWS_EXTERN int lws_plat_init(struct lws_context *context, struct lws_context_creation_info *info); diff --git a/lib/server-handshake.c b/lib/server-handshake.c index 7b6fc8c05..93cba5a44 100644 --- a/lib/server-handshake.c +++ b/lib/server-handshake.c @@ -27,6 +27,7 @@ LWS_VISIBLE int lws_extension_server_handshake(struct lws *wsi, char **p) { struct lws_context *context = wsi->context; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; const struct lws_extension *ext; char ext_name[128]; int ext_count = 0; @@ -47,11 +48,11 @@ lws_extension_server_handshake(struct lws *wsi, char **p) * and go through them */ - if (lws_hdr_copy(wsi, (char *)context->serv_buf, - sizeof(context->serv_buf), WSI_TOKEN_EXTENSIONS) < 0) + if (lws_hdr_copy(wsi, (char *)pt->serv_buf, LWS_MAX_SOCKET_IO_BUF, + WSI_TOKEN_EXTENSIONS) < 0) return 1; - c = (char *)context->serv_buf; + c = (char *)pt->serv_buf; lwsl_parser("WSI_TOKEN_EXTENSIONS = '%s'\n", c); wsi->count_act_ext = 0; n = 0; @@ -157,6 +158,7 @@ lws_extension_server_handshake(struct lws *wsi, char **p) int handshake_0405(struct lws_context *context, struct lws *wsi) { + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; unsigned char hash[20]; int n; char *response; @@ -180,15 +182,14 @@ handshake_0405(struct lws_context *context, struct lws *wsi) * since key length is restricted above (currently 128), cannot * overflow */ - n = sprintf((char *)context->serv_buf, + n = sprintf((char *)pt->serv_buf, "%s258EAFA5-E914-47DA-95CA-C5AB0DC85B11", lws_hdr_simple_ptr(wsi, WSI_TOKEN_KEY)); - lws_SHA1(context->serv_buf, n, hash); + lws_SHA1(pt->serv_buf, n, hash); accept_len = lws_b64_encode_string((char *)hash, 20, - (char *)context->serv_buf, - sizeof(context->serv_buf)); + (char *)pt->serv_buf, LWS_MAX_SOCKET_IO_BUF); if (accept_len < 0) { lwsl_warn("Base64 encoded hash too long\n"); goto bail; @@ -202,13 +203,13 @@ handshake_0405(struct lws_context *context, struct lws *wsi) /* make a buffer big enough for everything */ - response = (char *)context->serv_buf + MAX_WEBSOCKET_04_KEY_LEN + LWS_PRE; + response = (char *)pt->serv_buf + MAX_WEBSOCKET_04_KEY_LEN + LWS_PRE; p = response; LWS_CPYAPP(p, "HTTP/1.1 101 Switching Protocols\x0d\x0a" "Upgrade: WebSocket\x0d\x0a" "Connection: Upgrade\x0d\x0a" "Sec-WebSocket-Accept: "); - strcpy(p, (char *)context->serv_buf); + strcpy(p, (char *)pt->serv_buf); p += accept_len; if (lws_hdr_total_length(wsi, WSI_TOKEN_PROTOCOL)) { diff --git a/lib/server.c b/lib/server.c index c92401d2f..3be716d9e 100644 --- a/lib/server.c +++ b/lib/server.c @@ -655,11 +655,27 @@ int lws_http_transaction_completed(struct lws *wsi) return 0; } +static int +lws_get_idlest_tsi(struct lws_context *context) +{ + unsigned int lowest = ~0; + int n, hit = 0; + + for (n = 0; n < context->count_threads; n++) + if ((unsigned int)context->pt[n].fds_count < lowest) { + lowest = context->pt[n].fds_count; + hit = n; + } + + return hit; +} + LWS_VISIBLE int lws_server_socket_service(struct lws_context *context, struct lws *wsi, struct lws_pollfd *pollfd) { lws_sockfd_type accept_fd = LWS_SOCK_INVALID; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; #if LWS_POSIX struct sockaddr_in cli_addr; socklen_t clilen; @@ -697,8 +713,8 @@ int lws_server_socket_service(struct lws_context *context, if (!(pollfd->revents & pollfd->events && LWS_POLLIN)) goto try_pollout; - len = lws_ssl_capable_read(wsi, context->serv_buf, - sizeof(context->serv_buf)); + len = lws_ssl_capable_read(wsi, pt->serv_buf, + LWS_MAX_SOCKET_IO_BUF); lwsl_debug("%s: read %d\r\n", __func__, len); switch (len) { case 0: @@ -719,7 +735,7 @@ int lws_server_socket_service(struct lws_context *context, * hm this may want to send * (via HTTP callback for example) */ - n = lws_read(wsi, context->serv_buf, len); + n = lws_read(wsi, pt->serv_buf, len); if (n < 0) /* we closed wsi */ return 1; @@ -809,6 +825,8 @@ try_pollout: } new_wsi->sock = accept_fd; + new_wsi->tsi = lws_get_idlest_tsi(context); + lwsl_info("Accepted to tsi %d\n", new_wsi->tsi); /* the transport is accepted... give him time to negotiate */ lws_set_timeout(new_wsi, PENDING_TIMEOUT_ESTABLISH_WITH_SERVER, @@ -877,9 +895,10 @@ LWS_VISIBLE int lws_serve_http_file(struct lws *wsi, const char *file, int other_headers_len) { struct lws_context *context = lws_get_context(wsi); - unsigned char *response = context->serv_buf + LWS_PRE; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + unsigned char *response = pt->serv_buf + LWS_PRE; unsigned char *p = response; - unsigned char *end = p + sizeof(context->serv_buf) - LWS_PRE; + unsigned char *end = p + LWS_MAX_SOCKET_IO_BUF - LWS_PRE; int ret = 0; wsi->u.http.fd = lws_plat_file_open(wsi, file, &wsi->u.http.filelen, diff --git a/lib/service.c b/lib/service.c index 9ddbeaea8..af72dc537 100644 --- a/lib/service.c +++ b/lib/service.c @@ -370,11 +370,12 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len) */ LWS_VISIBLE int -lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) +lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int tsi) { #if LWS_POSIX int idx = 0; #endif + struct lws_context_per_thread *pt = &context->pt[tsi]; lws_sockfd_type our_fd = 0; struct lws_tokens eff_buf; unsigned int pending = 0; @@ -449,7 +450,7 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) * pending connection here, it causes us to check again next time. */ - if (context->lserv_fd && pollfd != &context->fds[idx]) { + if (context->lserv_fd && pollfd != &pt->fds[idx]) { context->lserv_count++; if (context->lserv_seen || context->lserv_count == context->lserv_mod) { @@ -462,14 +463,14 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) * even with extpoll, we prepared this * internal fds for listen */ - n = lws_poll_listen_fd(&context->fds[idx]); + n = lws_poll_listen_fd(&pt->fds[idx]); if (n <= 0) { if (context->lserv_seen) context->lserv_seen--; break; } /* there's a conn waiting for us */ - lws_service_fd(context, &context->fds[idx]); + lws_service_fd(context, &pt->fds[idx]); context->lserv_seen++; } } @@ -605,9 +606,8 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) if (!(pollfd->revents & pollfd->events & LWS_POLLIN)) break; read: - eff_buf.token_len = lws_ssl_capable_read(wsi, context->serv_buf, - pending ? pending : - sizeof(context->serv_buf)); + eff_buf.token_len = lws_ssl_capable_read(wsi, pt->serv_buf, + pending ? pending : LWS_MAX_SOCKET_IO_BUF); switch (eff_buf.token_len) { case 0: lwsl_info("service_fd: closing due to 0 length read\n"); @@ -633,7 +633,7 @@ read: * used then so it is efficient. */ - eff_buf.token = (char *)context->serv_buf; + eff_buf.token = (char *)pt->serv_buf; drain: do { @@ -672,8 +672,8 @@ drain: pending = lws_ssl_pending(wsi); if (pending) { handle_pending: - pending = pending > sizeof(context->serv_buf) ? - sizeof(context->serv_buf) : pending; + pending = pending > LWS_MAX_SOCKET_IO_BUF ? + LWS_MAX_SOCKET_IO_BUF : pending; goto read; } @@ -720,6 +720,12 @@ handled: return n; } +LWS_VISIBLE int +lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) +{ + return lws_service_fd_tsi(context, pollfd, 0); +} + /** * lws_service() - Service any pending websocket activity * @context: Websocket context @@ -758,3 +764,9 @@ lws_service(struct lws_context *context, int timeout_ms) return lws_plat_service(context, timeout_ms); } +LWS_VISIBLE int +lws_service_tsi(struct lws_context *context, int timeout_ms, int tsi) +{ + return lws_plat_service_tsi(context, timeout_ms, tsi); +} + diff --git a/lib/ssl.c b/lib/ssl.c index f72b6b5b1..3b66ccfb5 100644 --- a/lib/ssl.c +++ b/lib/ssl.c @@ -153,7 +153,7 @@ lws_context_init_server_ssl(struct lws_context_creation_info *info, error = ERR_get_error(); lwsl_err("problem creating ssl method %lu: %s\n", error, ERR_error_string(error, - (char *)context->serv_buf)); + (char *)context->pt[0].serv_buf)); return 1; } context->ssl_ctx = SSL_CTX_new(method); /* create context */ @@ -161,7 +161,7 @@ lws_context_init_server_ssl(struct lws_context_creation_info *info, error = ERR_get_error(); lwsl_err("problem creating ssl context %lu: %s\n", error, ERR_error_string(error, - (char *)context->serv_buf)); + (char *)context->pt[0].serv_buf)); return 1; } @@ -217,7 +217,7 @@ lws_context_init_server_ssl(struct lws_context_creation_info *info, info->ssl_cert_filepath, error, ERR_error_string(error, - (char *)context->serv_buf)); + (char *)context->pt[0].serv_buf)); return 1; } lws_ssl_bind_passphrase(context->ssl_ctx, info); @@ -231,7 +231,7 @@ lws_context_init_server_ssl(struct lws_context_creation_info *info, lwsl_err("ssl problem getting key '%s' %lu: %s\n", info->ssl_private_key_filepath, error, ERR_error_string(error, - (char *)context->serv_buf)); + (char *)context->pt[0].serv_buf)); return 1; } } else @@ -250,7 +250,7 @@ lws_context_init_server_ssl(struct lws_context_creation_info *info, } #ifdef LWS_SSL_SERVER_WITH_ECDH_CERT - if (context->options & LWS_SERVER_OPTION_SSL_ECDH) { + if (context->options & LWS_SERVER_OPTION_SSL_ECDH) { lwsl_notice(" Using ECDH certificate support\n"); /* Get X509 certificate from ssl context */ @@ -352,7 +352,7 @@ int lws_context_init_client_ssl(struct lws_context_creation_info *info, error = ERR_get_error(); lwsl_err("problem creating ssl method %lu: %s\n", error, ERR_error_string(error, - (char *)context->serv_buf)); + (char *)context->pt[0].serv_buf)); return 1; } /* create context */ @@ -361,7 +361,7 @@ int lws_context_init_client_ssl(struct lws_context_creation_info *info, error = ERR_get_error(); lwsl_err("problem creating ssl context %lu: %s\n", error, ERR_error_string(error, - (char *)context->serv_buf)); + (char *)context->pt[0].serv_buf)); return 1; } @@ -416,7 +416,7 @@ int lws_context_init_client_ssl(struct lws_context_creation_info *info, info->ssl_cert_filepath, ERR_get_error(), ERR_error_string(ERR_get_error(), - (char *)context->serv_buf)); + (char *)context->pt[0].serv_buf)); return 1; } } @@ -429,7 +429,7 @@ int lws_context_init_client_ssl(struct lws_context_creation_info *info, info->ssl_private_key_filepath, ERR_get_error(), ERR_error_string(ERR_get_error(), - (char *)context->serv_buf)); + (char *)context->pt[0].serv_buf)); return 1; } @@ -459,16 +459,17 @@ LWS_VISIBLE void lws_ssl_remove_wsi_from_buffered_list(struct lws *wsi) { struct lws_context *context = wsi->context; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; if (!wsi->pending_read_list_prev && !wsi->pending_read_list_next && - context->pending_read_list != wsi) + pt->pending_read_list != wsi) /* we are not on the list */ return; /* point previous guy's next to our next */ if (!wsi->pending_read_list_prev) - context->pending_read_list = wsi->pending_read_list_next; + pt->pending_read_list = wsi->pending_read_list_next; else wsi->pending_read_list_prev->pending_read_list_next = wsi->pending_read_list_next; @@ -486,6 +487,7 @@ LWS_VISIBLE int lws_ssl_capable_read(struct lws *wsi, unsigned char *buf, int len) { struct lws_context *context = wsi->context; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; int n; if (!wsi->ssl) @@ -521,16 +523,16 @@ lws_ssl_capable_read(struct lws *wsi, unsigned char *buf, int len) return n; if (wsi->pending_read_list_prev) return n; - if (context->pending_read_list == wsi) + if (pt->pending_read_list == wsi) return n; /* add us to the linked list of guys with pending ssl */ - if (context->pending_read_list) - context->pending_read_list->pending_read_list_prev = wsi; + if (pt->pending_read_list) + pt->pending_read_list->pending_read_list_prev = wsi; - wsi->pending_read_list_next = context->pending_read_list; + wsi->pending_read_list_next = pt->pending_read_list; wsi->pending_read_list_prev = NULL; - context->pending_read_list = wsi; + pt->pending_read_list = wsi; return n; bail: @@ -596,6 +598,7 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi, { struct lws *wsi = *pwsi; struct lws_context *context = wsi->context; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; int n, m; #ifndef USE_WOLFSSL BIO *bio; @@ -674,8 +677,8 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi, lws_latency_pre(context, wsi); - n = recv(wsi->sock, (char *)context->serv_buf, - sizeof(context->serv_buf), MSG_PEEK); + n = recv(wsi->sock, (char *)pt->serv_buf, LWS_MAX_SOCKET_IO_BUF, + MSG_PEEK); /* * optionally allow non-SSL connect on SSL listening socket @@ -685,7 +688,7 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi, */ if (context->allow_non_ssl_on_ssl_port) { - if (n >= 1 && context->serv_buf[0] >= ' ') { + if (n >= 1 && pt->serv_buf[0] >= ' ') { /* * TLS content-type for Handshake is 0x16, and * for ChangeCipherSpec Record, it's 0x14 diff --git a/lws_config.h.in b/lws_config.h.in index e03c1d5bc..9d4a0a646 100644 --- a/lws_config.h.in +++ b/lws_config.h.in @@ -71,4 +71,7 @@ /* SSL server using ECDH certificate */ #cmakedefine LWS_SSL_SERVER_WITH_ECDH_CERT +/* Maximum supported service threads */ +#define LWS_MAX_SMP ${LWS_MAX_SMP} + ${LWS_SIZEOFPTR_CODE} diff --git a/test-server/test-server-pthreads.c b/test-server/test-server-pthreads.c index 4b1aa4556..317e4326a 100644 --- a/test-server/test-server-pthreads.c +++ b/test-server/test-server-pthreads.c @@ -1,7 +1,7 @@ /* * libwebsockets-test-server - libwebsockets test implementation * - * Copyright (C) 2010-2015 Andy Green + * Copyright (C) 2010-2016 Andy Green * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -132,12 +132,34 @@ void *thread_dumb_increment(void *threadid) pthread_exit(NULL); } +void *thread_service(void *threadid) +{ + while (lws_service_tsi(context, 50, (int)(long)threadid) >= 0 && !force_exit) + ; + + pthread_exit(NULL); +} + void sighandler(int sig) { force_exit = 1; lws_cancel_service(context); } +static const struct lws_extension exts[] = { + { + "permessage-deflate", + lws_extension_callback_pm_deflate, + "permessage-deflate" + }, + { + "deflate-frame", + lws_extension_callback_pm_deflate, + "deflate_frame" + }, + { NULL, NULL, NULL /* terminator */ } +}; + static struct option options[] = { { "help", no_argument, NULL, 'h' }, { "debug", required_argument, NULL, 'd' }, @@ -147,6 +169,7 @@ static struct option options[] = { { "interface", required_argument, NULL, 'i' }, { "closetest", no_argument, NULL, 'c' }, { "libev", no_argument, NULL, 'e' }, + { "threads", required_argument, NULL, 'j' }, #ifndef LWS_NO_DAEMONIZE { "daemonize", no_argument, NULL, 'D' }, #endif @@ -159,10 +182,11 @@ int main(int argc, char **argv) struct lws_context_creation_info info; char interface_name[128] = ""; const char *iface = NULL; - pthread_t pthread_dumb; + pthread_t pthread_dumb, pthread_service[32]; char cert_path[1024]; char key_path[1024]; int debug_level = 7; + int threads = 1; int use_ssl = 0; void *retval; int opts = 0; @@ -184,10 +208,17 @@ int main(int argc, char **argv) pthread_mutex_init(&lock_established_conns, NULL); while (n >= 0) { - n = getopt_long(argc, argv, "eci:hsap:d:Dr:", options, NULL); + n = getopt_long(argc, argv, "eci:hsap:d:Dr:j:", options, NULL); if (n < 0) continue; switch (n) { + case 'j': + threads = atoi(optarg); + if (threads > ARRAY_SIZE(pthread_service)) { + lwsl_err("Max threads %d\n", ARRAY_SIZE(pthread_service)); + return 1; + } + break; case 'e': opts |= LWS_SERVER_OPTION_LIBEV; break; @@ -259,7 +290,7 @@ int main(int argc, char **argv) lws_set_log_level(debug_level, lwsl_emit_syslog); lwsl_notice("libwebsockets test server - " - "(C) Copyright 2010-2015 Andy Green - " + "(C) Copyright 2010-2016 Andy Green - " "licensed under LGPL2.1\n"); printf("Using resource path \"%s\"\n", resource_path); @@ -302,6 +333,8 @@ int main(int argc, char **argv) info.gid = -1; info.uid = -1; info.options = opts; + info.count_threads = threads; + info.extensions = exts; context = lws_create_context(&info); if (context == NULL) { @@ -317,12 +350,21 @@ int main(int argc, char **argv) goto done; } - /* this is our service thread */ + /* + * notice the actual number of threads may be capped by the library, + * so use lws_get_count_threads() to get the actual amount of threads + * initialized. + */ - n = 0; - while (n >= 0 && !force_exit) { - n = lws_service(context, 50); - } + for (n = 0; n < lws_get_count_threads(context); n++) + if (pthread_create(&pthread_service[n], NULL, thread_service, + (void *)(long)n)) + lwsl_err("Failed to start service thread\n"); + + /* wait for all the service threads to exit */ + + for (n = 0; n < lws_get_count_threads(context); n++) + pthread_join(pthread_service[n], &retval); /* wait for pthread_dumb to exit */ pthread_join(pthread_dumb, &retval);