diff --git a/CMakeLists.txt b/CMakeLists.txt index 44609414..f1911176 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -85,10 +85,16 @@ set(LWS_WITHOUT_CLIENT ON) set(LWS_WITHOUT_TESTAPPS ON) set(LWS_WITHOUT_EXTENSIONS ON) set(LWS_MBED3 ON) +# this implies no pthreads in the lib set(LWS_MAX_SMP 1) endif() +if (WIN32) +# this implies no pthreads in the lib +set(LWS_MAX_SMP 1) +endif() + # Allow the user to override installation directories. set(LWS_INSTALL_LIB_DIR lib CACHE PATH "Installation directory for libraries") diff --git a/lib/context.c b/lib/context.c index 26aac353..3e5fc024 100644 --- a/lib/context.c +++ b/lib/context.c @@ -80,7 +80,7 @@ lws_create_context(struct lws_context_creation_info *info) int pid_daemon = get_daemonize_pid(); #endif char *p; - int n; + int n, m; lwsl_notice("Initial logging level %d\n", log_level); @@ -96,7 +96,7 @@ lws_create_context(struct lws_context_creation_info *info) #endif lws_feature_status_libev(info); #endif - lwsl_info(" LWS_MAX_HEADER_LEN : %u\n", LWS_MAX_HEADER_LEN); + lwsl_info(" LWS_DEF_HEADER_LEN : %u\n", LWS_DEF_HEADER_LEN); lwsl_info(" LWS_MAX_PROTOCOLS : %u\n", LWS_MAX_PROTOCOLS); lwsl_info(" LWS_MAX_SMP : %u\n", LWS_MAX_SMP); lwsl_info(" SPEC_LATEST_SUPPORTED : %u\n", SPEC_LATEST_SUPPORTED); @@ -129,7 +129,6 @@ lws_create_context(struct lws_context_creation_info *info) if (context->count_threads > LWS_MAX_SMP) context->count_threads = LWS_MAX_SMP; - context->lserv_seen = 0; context->protocols = info->protocols; context->token_limits = info->token_limits; context->listen_port = info->port; @@ -141,8 +140,18 @@ lws_create_context(struct lws_context_creation_info *info) context->ka_interval = info->ka_interval; context->ka_probes = info->ka_probes; - /* we zalloc only the used ones, so the memory is not wasted - * allocating for unused threads + if (info->max_http_header_data) + context->max_http_header_data = info->max_http_header_data; + else + context->max_http_header_data = LWS_DEF_HEADER_LEN; + if (info->max_http_header_pool) + context->max_http_header_pool = info->max_http_header_pool; + else + context->max_http_header_pool = LWS_DEF_HEADER_POOL; + + /* + * Allocate the per-thread storage for scratchpad buffers, + * and header data pool */ for (n = 0; n < context->count_threads; n++) { context->pt[n].serv_buf = lws_zalloc(LWS_MAX_SOCKET_IO_BUF); @@ -150,6 +159,22 @@ lws_create_context(struct lws_context_creation_info *info) lwsl_err("OOM\n"); return NULL; } + + context->pt[n].http_header_data = lws_malloc(context->max_http_header_data * + context->max_http_header_pool); + if (!context->pt[n].http_header_data) + goto bail; + + context->pt[n].ah_pool = lws_zalloc(sizeof(struct allocated_headers) * + context->max_http_header_pool); + for (m = 0; m < context->max_http_header_pool; m++) + context->pt[n].ah_pool[m].data = + (char *)context->pt[n].http_header_data + + (m * context->max_http_header_data); + if (!context->pt[n].ah_pool) + goto bail; + + lws_pt_mutex_init(&context->pt[n]); } if (info->fd_limit_per_thread) @@ -180,44 +205,18 @@ lws_create_context(struct lws_context_creation_info *info) context->lws_ev_sigint_cb = &lws_sigint_cb; #endif /* LWS_USE_LIBEV */ - lwsl_info(" mem: context: %5u bytes (%d + (%d x %d))\n", + lwsl_info(" mem: context: %5u bytes (%d ctx + (%d thr x %d))\n", sizeof(struct lws_context) + (context->count_threads * LWS_MAX_SOCKET_IO_BUF), sizeof(struct lws_context), context->count_threads, LWS_MAX_SOCKET_IO_BUF); - /* - * allocate and initialize the pool of - * allocated_header structs + data - */ - if (info->max_http_header_data) - context->max_http_header_data = info->max_http_header_data; - else - context->max_http_header_data = LWS_MAX_HEADER_LEN; - if (info->max_http_header_pool) - context->max_http_header_pool = info->max_http_header_pool; - else - context->max_http_header_pool = LWS_MAX_HEADER_POOL; - - context->http_header_data = lws_malloc(context->max_http_header_data * - context->max_http_header_pool); - if (!context->http_header_data) - goto bail; - context->ah_pool = lws_zalloc(sizeof(struct allocated_headers) * - context->max_http_header_pool); - if (!context->ah_pool) - goto bail; - - for (n = 0; n < context->max_http_header_pool; n++) - context->ah_pool[n].data = (char *)context->http_header_data + - (n * context->max_http_header_data); - - /* this is per context */ - lwsl_info(" mem: http hdr rsvd: %5u bytes ((%u + %u) x %u)\n", + lwsl_info(" mem: http hdr rsvd: %5u bytes (%u thr x (%u + %u) x %u))\n", (context->max_http_header_data + sizeof(struct allocated_headers)) * - context->max_http_header_pool, + context->max_http_header_pool * context->count_threads, + context->count_threads, context->max_http_header_data, sizeof(struct allocated_headers), context->max_http_header_pool); @@ -229,10 +228,13 @@ lws_create_context(struct lws_context_creation_info *info) goto bail; } lwsl_info(" mem: pollfd map: %5u\n", n); + +#if LWS_MAX_SMP > 1 /* each thread serves his own chunk of fds */ for (n = 1; n < (int)info->count_threads; n++) - context->pt[n].fds = context->pt[0].fds + - (n * context->fd_limit_per_thread); + context->pt[n].fds = context->pt[n - 1].fds + + context->fd_limit_per_thread; +#endif if (lws_plat_init(context, info)) goto bail; @@ -345,7 +347,7 @@ lws_context_destroy(struct lws_context *context) #endif while (m--) - for (n = 0; n < context->pt[m].fds_count; n++) { + for (n = 0; (unsigned int)n < context->pt[m].fds_count; n++) { struct lws *wsi = wsi_from_fd(context, context->pt[m].fds[n].fd); if (!wsi) continue; @@ -384,17 +386,18 @@ lws_context_destroy(struct lws_context *context) ev_signal_stop(context->io_loop, &context->w_sigint.watcher); #endif /* LWS_USE_LIBEV */ - for (n = 0; n < context->count_threads; n++) + for (n = 0; n < context->count_threads; n++) { lws_free_set_NULL(context->pt[n].serv_buf); + if (context->pt[n].ah_pool) + lws_free(context->pt[n].ah_pool); + if (context->pt[n].http_header_data) + lws_free(context->pt[n].http_header_data); + } lws_plat_context_early_destroy(context); lws_ssl_context_destroy(context); if (context->pt[0].fds) lws_free(context->pt[0].fds); - if (context->ah_pool) - lws_free(context->ah_pool); - if (context->http_header_data) - lws_free(context->http_header_data); lws_plat_context_late_destroy(context); diff --git a/lib/handshake.c b/lib/handshake.c index e5130563..310fd3c5 100644 --- a/lib/handshake.c +++ b/lib/handshake.c @@ -100,6 +100,7 @@ http_new: wsi->u.hdr.lextable_pos = 0; /* fallthru */ case LWSS_HTTP_HEADERS: + assert(wsi->u.hdr.ah); lwsl_parser("issuing %d bytes to parser\n", (int)len); if (lws_handshake_client(wsi, &buf, len)) @@ -182,6 +183,7 @@ postbody_completion: case LWSS_ESTABLISHED: case LWSS_AWAITING_CLOSE_ACK: + case LWSS_SHUTDOWN: if (lws_handshake_client(wsi, &buf, len)) goto bail; switch (wsi->mode) { diff --git a/lib/libev.c b/lib/libev.c index e52dedbd..167c0457 100644 --- a/lib/libev.c +++ b/lib/libev.c @@ -81,6 +81,7 @@ lws_initloop(struct lws_context *context, struct ev_loop *loop) const char * backend_name; int status = 0; int backend; + int m = 0; /* !!! TODO add pt support */ if (!loop) loop = ev_default_loop(0); @@ -91,7 +92,7 @@ lws_initloop(struct lws_context *context, struct ev_loop *loop) * Initialize the accept w_accept with the listening socket * and register a callback for read operations */ - ev_io_init(w_accept, lws_accept_cb, context->lserv_fd, EV_READ); + ev_io_init(w_accept, lws_accept_cb, context->pt[m].lserv_fd, EV_READ); ev_io_start(context->io_loop,w_accept); /* Register the signal watcher unless the user says not to */ diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index fad9ab31..26979e98 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -1,7 +1,7 @@ /* * libwebsockets - small server side websockets and web server implementation * - * Copyright (C) 2010-2014 Andy Green + * Copyright (C) 2010-2016 Andy Green * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -63,33 +63,72 @@ lws_free_wsi(struct lws *wsi) */ if (wsi->mode != LWSCM_WS_CLIENT && wsi->mode != LWSCM_WS_SERVING) - if (wsi->u.hdr.ah) - lws_free_header_table(wsi); + lws_free_header_table(wsi); + lws_free(wsi); } - static void lws_remove_from_timeout_list(struct lws *wsi) { + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + if (!wsi->timeout_list_prev) return; + lws_pt_lock(pt); if (wsi->timeout_list) wsi->timeout_list->timeout_list_prev = wsi->timeout_list_prev; *wsi->timeout_list_prev = wsi->timeout_list; wsi->timeout_list_prev = NULL; wsi->timeout_list = NULL; + lws_pt_unlock(pt); } +/** + * lws_set_timeout() - marks the wsi as subject to a timeout + * + * You will not need this unless you are doing something special + * + * @wsi: Websocket connection instance + * @reason: timeout reason + * @secs: how many seconds + */ + +LWS_VISIBLE void +lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs) +{ + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + time_t now; + + lws_pt_lock(pt); + + time(&now); + + if (!wsi->pending_timeout && reason) { + wsi->timeout_list = pt->timeout_list; + if (wsi->timeout_list) + wsi->timeout_list->timeout_list_prev = &wsi->timeout_list; + wsi->timeout_list_prev = &pt->timeout_list; + *wsi->timeout_list_prev = wsi; + } + + wsi->pending_timeout_limit = now + secs; + wsi->pending_timeout = reason; + + lws_pt_unlock(pt); + + if (!reason) + lws_remove_from_timeout_list(wsi); +} void lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason) { struct lws_context *context; struct lws_context_per_thread *pt; - int n, m, ret, old_state; + int n, m, ret; struct lws_tokens eff_buf; if (!wsi) @@ -97,7 +136,6 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason) context = wsi->context; pt = &context->pt[(int)wsi->tsi]; - old_state = wsi->state; if (wsi->mode == LWSCM_HTTP_SERVING_ACCEPTED && wsi->u.http.fd != LWS_INVALID_FILE) { @@ -108,10 +146,13 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason) wsi->user_space, NULL, 0); } if (wsi->socket_is_permanently_unusable || - reason == LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY) + reason == LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY || + wsi->state == LWSS_SHUTDOWN) goto just_kill_connection; - switch (old_state) { + wsi->state_pre_close = wsi->state; + + switch (wsi->state_pre_close) { case LWSS_DEAD_SOCKET: return; @@ -203,7 +244,7 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason) * LWSS_AWAITING_CLOSE_ACK and will skip doing this a second time. */ - if (old_state == LWSS_ESTABLISHED && + if (wsi->state_pre_close == LWSS_ESTABLISHED && (wsi->u.ws.close_in_ping_buffer_len || /* already a reason */ (reason != LWS_CLOSE_STATUS_NOSTATUS && (reason != LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY)))) { @@ -218,8 +259,7 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason) reason & 0xff; } - n = lws_write(wsi, &wsi->u.ws.ping_payload_buf[ - LWS_PRE], + n = lws_write(wsi, &wsi->u.ws.ping_payload_buf[LWS_PRE], wsi->u.ws.close_in_ping_buffer_len, LWS_WRITE_CLOSE); if (n >= 0) { @@ -246,7 +286,28 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason) just_kill_connection: - lwsl_debug("close: just_kill_connection: %p\n", wsi); +#if LWS_POSIX + /* + * Testing with ab shows that we have to stage the socket close when + * the system is under stress... shutdown any further TX, change the + * state to one that won't emit anything more, and wait with a timeout + * for the POLLIN to show a zero-size rx before coming back and doing + * the actual close. + */ + if (wsi->state != LWSS_SHUTDOWN) { + lwsl_info("%s: shutting down connection: %p\n", __func__, wsi); + n = shutdown(wsi->sock, SHUT_WR); + if (n) + lwsl_debug("closing: shutdown ret %d\n", LWS_ERRNO); + wsi->state = LWSS_SHUTDOWN; + lws_change_pollfd(wsi, LWS_POLLOUT, LWS_POLLIN); + lws_set_timeout(wsi, PENDING_TIMEOUT_SHUTDOWN_FLUSH, + AWAITING_TIMEOUT); + return; + } +#endif + + lwsl_info("%s: real just_kill_connection: %p\n", __func__, wsi); /* * we won't be servicing or receiving anything further from this guy @@ -262,7 +323,7 @@ just_kill_connection: lws_free_set_NULL(wsi->rxflow_buffer); - if (old_state == LWSS_ESTABLISHED || + if (wsi->state_pre_close == LWSS_ESTABLISHED || wsi->mode == LWSCM_WS_SERVING || wsi->mode == LWSCM_WS_CLIENT) { @@ -308,10 +369,10 @@ just_kill_connection: /* tell the user it's all over for this guy */ if (wsi->protocol && wsi->protocol->callback && - ((old_state == LWSS_ESTABLISHED) || - (old_state == LWSS_RETURNED_CLOSE_ALREADY) || - (old_state == LWSS_AWAITING_CLOSE_ACK) || - (old_state == LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE))) { + ((wsi->state_pre_close == LWSS_ESTABLISHED) || + (wsi->state_pre_close == LWSS_RETURNED_CLOSE_ALREADY) || + (wsi->state_pre_close == LWSS_AWAITING_CLOSE_ACK) || + (wsi->state_pre_close == LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE))) { lwsl_debug("calling back CLOSED\n"); wsi->protocol->callback(wsi, LWS_CALLBACK_CLOSED, wsi->user_space, NULL, 0); @@ -327,7 +388,7 @@ just_kill_connection: wsi->user_space, NULL, 0); } else lwsl_debug("not calling back closed mode=%d state=%d\n", - wsi->mode, old_state); + wsi->mode, wsi->state_pre_close); /* deallocate any active extension contexts */ @@ -341,12 +402,10 @@ just_kill_connection: LWS_EXT_CB_DESTROY_ANY_WSI_CLOSING, NULL, 0) < 0) lwsl_warn("ext destroy wsi failed\n"); + wsi->socket_is_permanently_unusable = 1; + if (!lws_ssl_close(wsi) && lws_socket_is_valid(wsi->sock)) { #if LWS_POSIX - n = shutdown(wsi->sock, SHUT_RDWR); - if (n) - lwsl_debug("closing: shutdown ret %d\n", LWS_ERRNO); - n = compatible_close(wsi->sock); if (n) lwsl_debug("closing: close ret %d\n", LWS_ERRNO); @@ -555,7 +614,7 @@ lws_callback_all_protocol(struct lws_context *context, { struct lws_context_per_thread *pt = &context->pt[0]; struct lws *wsi; - int n, m = context->count_threads; + unsigned int n, m = context->count_threads; while (m--) { for (n = 0; n < pt->fds_count; n++) { @@ -571,39 +630,6 @@ lws_callback_all_protocol(struct lws_context *context, return 0; } -/** - * lws_set_timeout() - marks the wsi as subject to a timeout - * - * You will not need this unless you are doing something special - * - * @wsi: Websocket connection instance - * @reason: timeout reason - * @secs: how many seconds - */ - -LWS_VISIBLE void -lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs) -{ - time_t now; - - time(&now); - - if (!wsi->pending_timeout) { - wsi->timeout_list = wsi->context->timeout_list; - if (wsi->timeout_list) - wsi->timeout_list->timeout_list_prev = &wsi->timeout_list; - wsi->timeout_list_prev = &wsi->context->timeout_list; - *wsi->timeout_list_prev = wsi; - } - - wsi->pending_timeout_limit = now + secs; - wsi->pending_timeout = reason; - - if (!reason) - lws_remove_from_timeout_list(wsi); -} - - #if LWS_POSIX /** @@ -704,7 +730,7 @@ lws_rx_flow_allow_all_protocol(const struct lws_context *context, { const struct lws_context_per_thread *pt = &context->pt[0]; struct lws *wsi; - int n, m = context->count_threads; + unsigned int n, m = context->count_threads; while (m--) { for (n = 0; n < pt->fds_count; n++) { diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index 405d8974..982516f0 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -1497,6 +1497,7 @@ enum pending_timeout { PENDING_TIMEOUT_HTTP_CONTENT = 10, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND = 11, PENDING_FLUSH_STORED_SEND_BEFORE_CLOSE = 12, + PENDING_TIMEOUT_SHUTDOWN_FLUSH = 13, /****** add new things just above ---^ ******/ }; diff --git a/lib/lws-plat-unix.c b/lib/lws-plat-unix.c index 98276dbd..9953ac7a 100644 --- a/lib/lws-plat-unix.c +++ b/lib/lws-plat-unix.c @@ -122,7 +122,7 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) { struct lws_context_per_thread *pt = &context->pt[tsi]; struct lws *wsi; - int n, m; + int n, m, c; char buf; #ifdef LWS_OPENSSL_SUPPORT struct lws *wsi_next; @@ -215,10 +215,13 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) /* any socket with events to service? */ - for (n = 0; n < pt->fds_count; n++) { + c = n; + for (n = 0; n < pt->fds_count && c; n++) { if (!pt->fds[n].revents) continue; + c--; + if (pt->fds[n].fd == pt->dummy_pipe_fds[0]) { if (read(pt->fds[n].fd, &buf, 1) != 1) lwsl_err("Cannot read from dummy pipe."); @@ -457,6 +460,8 @@ LWS_VISIBLE void lws_plat_delete_socket_from_fds(struct lws_context *context, struct lws *wsi, int m) { + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + pt->fds_count--; } LWS_VISIBLE void diff --git a/lib/lws-plat-win.c b/lib/lws-plat-win.c index 36df16d6..a79bcffa 100644 --- a/lib/lws-plat-win.c +++ b/lib/lws-plat-win.c @@ -152,7 +152,7 @@ LWS_VISIBLE int lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) { int n; - int i; + unsigned int i; DWORD ev; WSANETWORKEVENTS networkevents; struct lws_pollfd *pfd; @@ -177,7 +177,7 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) for (i = 0; i < pt->fds_count; ++i) { pfd = &pt->fds[i]; - if (pfd->fd == context->lserv_fd) + if (pfd->fd == pt->lserv_fd) continue; if (pfd->events & LWS_POLLOUT) { @@ -372,7 +372,7 @@ lws_plat_delete_socket_from_fds(struct lws_context *context, struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; WSACloseEvent(pt->events[m + 1]); - pt->events[m + 1] = pt->events[pt->fds_count + 1]; + pt->events[m + 1] = pt->events[pt->fds_count--]; } LWS_VISIBLE void diff --git a/lib/output.c b/lib/output.c index 86d63d4c..5f54a4fa 100644 --- a/lib/output.c +++ b/lib/output.c @@ -30,7 +30,7 @@ lws_0405_frame_mask_generate(struct lws *wsi) wsi->u.ws.mask[2] = 0; wsi->u.ws.mask[3] = 0; #else - int n; + int n; /* fetch the per-frame nonce */ n = lws_get_random(lws_get_context(wsi), wsi->u.ws.mask, 4); @@ -643,6 +643,7 @@ lws_ssl_capable_write_no_ssl(struct lws *wsi, unsigned char *buf, int len) #if LWS_POSIX n = send(wsi->sock, (char *)buf, len, MSG_NOSIGNAL); + lwsl_info("%s: sent len %d result %d", __func__, len, n); if (n >= 0) return n; @@ -662,7 +663,7 @@ lws_ssl_capable_write_no_ssl(struct lws *wsi, unsigned char *buf, int len) // !!! #endif - lwsl_debug("ERROR writing len %d to skt %d\n", len, n); + lwsl_debug("ERROR writing len %d to skt fd %d err %d / errno %d\n", len, wsi->sock, n, LWS_ERRNO); return LWS_SSL_CAPABLE_ERROR; } #endif diff --git a/lib/parsers.c b/lib/parsers.c index 5b7aecdc..bc9bf4c0 100644 --- a/lib/parsers.c +++ b/lib/parsers.c @@ -27,8 +27,8 @@ unsigned char lextable[] = { #define FAIL_CHAR 0x08 -int -LWS_WARN_UNUSED_RESULT lextable_decode(int pos, char c) +int LWS_WARN_UNUSED_RESULT +lextable_decode(int pos, char c) { if (c >= 'A' && c <= 'Z') c += 'a' - 'A'; @@ -60,82 +60,173 @@ LWS_WARN_UNUSED_RESULT lextable_decode(int pos, char c) } } +static void +lws_reset_header_table(struct lws *wsi) +{ + /* init the ah to reflect no headers or data have appeared yet */ + memset(wsi->u.hdr.ah->frag_index, 0, sizeof(wsi->u.hdr.ah->frag_index)); + wsi->u.hdr.ah->nfrag = 0; + wsi->u.hdr.ah->pos = 0; +} + int LWS_WARN_UNUSED_RESULT lws_allocate_header_table(struct lws *wsi) { struct lws_context *context = wsi->context; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + struct lws_pollargs pa; + struct lws **pwsi; int n; - lwsl_debug("%s: wsi %p: ah %p\n", __func__, (void *)wsi, - (void *)wsi->u.hdr.ah); + lwsl_info("%s: wsi %p: ah %p (tsi %d)\n", __func__, (void *)wsi, + (void *)wsi->u.hdr.ah, wsi->tsi); /* if we are already bound to one, just clear it down */ - if (wsi->u.hdr.ah) + if (wsi->u.hdr.ah) { + lwsl_err("cleardown\n"); goto reset; + } + + lws_pt_lock(pt); + pwsi = &pt->ah_wait_list; + while (*pwsi) { + if (*pwsi == wsi) { + /* if already waiting on list, if no new ah just ret */ + if (pt->ah_count_in_use == + context->max_http_header_pool) { + lwsl_err("ah wl denied\n"); + goto bail; + } + /* new ah.... remove ourselves from waiting list */ + *pwsi = wsi->u.hdr.ah_wait_list; + wsi->u.hdr.ah_wait_list = NULL; + pt->ah_wait_list_length--; + break; + } + pwsi = &(*pwsi)->u.hdr.ah_wait_list; + } /* - * server should have suppressed the accept of a new wsi before this - * became the case. If initiating multiple client connects, make sure - * the ah pool is big enough to cope, or be prepared to retry + * pool is all busy... add us to waiting list and return that we + * weren't able to deliver it right now */ - if (context->ah_count_in_use == context->max_http_header_pool) { - lwsl_err("No free ah\n"); - return -1; + if (pt->ah_count_in_use == context->max_http_header_pool) { + lwsl_err("%s: adding %p to ah waiting list\n", __func__, wsi); + wsi->u.hdr.ah_wait_list = pt->ah_wait_list; + pt->ah_wait_list = wsi; + pt->ah_wait_list_length++; + + /* we cannot accept input then */ + + _lws_change_pollfd(wsi, LWS_POLLIN, 0, &pa); + goto bail; } for (n = 0; n < context->max_http_header_pool; n++) - if (!context->ah_pool[n].in_use) + if (!pt->ah_pool[n].in_use) break; /* if the count of in use said something free... */ assert(n != context->max_http_header_pool); - wsi->u.hdr.ah = &context->ah_pool[n]; + wsi->u.hdr.ah = &pt->ah_pool[n]; wsi->u.hdr.ah->in_use = 1; + pt->ah_count_in_use++; - context->ah_count_in_use++; - /* if we used up all the ah, defeat accepting new server connections */ - if (context->ah_count_in_use == context->max_http_header_pool) - if (_lws_server_listen_accept_flow_control(context, 0)) - return 1; + _lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa); - lwsl_debug("%s: wsi %p: ah %p: count %d (on exit)\n", - __func__, (void *)wsi, (void *)wsi->u.hdr.ah, - context->ah_count_in_use); + lwsl_info("%s: wsi %p: ah %p: count %d (on exit)\n", __func__, + (void *)wsi, (void *)wsi->u.hdr.ah, pt->ah_count_in_use); + + lws_pt_unlock(pt); reset: - /* init the ah to reflect no headers or data have appeared yet */ - memset(wsi->u.hdr.ah->frag_index, 0, sizeof(wsi->u.hdr.ah->frag_index)); - wsi->u.hdr.ah->nfrag = 0; - wsi->u.hdr.ah->pos = 0; + lws_reset_header_table(wsi); + time(&wsi->u.hdr.ah->assigned); return 0; + +bail: + lws_pt_unlock(pt); + + return 1; } int lws_free_header_table(struct lws *wsi) { struct lws_context *context = wsi->context; + struct allocated_headers *ah = wsi->u.hdr.ah; + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + struct lws_pollargs pa; + struct lws **pwsi; + time_t now; - lwsl_debug("%s: wsi %p: ah %p (count = %d)\n", __func__, (void *)wsi, - (void *)wsi->u.hdr.ah, context->ah_count_in_use); + lwsl_info("%s: wsi %p: ah %p (tsi=%d, count = %d)\n", __func__, (void *)wsi, + (void *)wsi->u.hdr.ah, wsi->tsi, pt->ah_count_in_use); - assert(wsi->u.hdr.ah); - if (!wsi->u.hdr.ah) - return 0; + lws_pt_lock(pt); + pwsi = &pt->ah_wait_list; + if (!wsi->u.hdr.ah) { /* remove from wait list if that's all */ + if (wsi->socket_is_permanently_unusable) + while (*pwsi) { + if (*pwsi == wsi) { + lwsl_info("%s: wsi %p, removing from wait list\n", + __func__, wsi); + *pwsi = wsi->u.hdr.ah_wait_list; + wsi->u.hdr.ah_wait_list = NULL; + pt->ah_wait_list_length--; + goto bail; + } + pwsi = &(*pwsi)->u.hdr.ah_wait_list; + } + + goto bail; + } + time(&now); + if (now - wsi->u.hdr.ah->assigned > 3) + lwsl_err("header assign - free time %d\n", + (int)(now - wsi->u.hdr.ah->assigned)); /* if we think we're freeing one, there should be one to free */ - assert(context->ah_count_in_use > 0); - + assert(pt->ah_count_in_use > 0); + /* and he should have been in use */ assert(wsi->u.hdr.ah->in_use); - wsi->u.hdr.ah->in_use = 0; - - /* if we just freed up one ah, allow new server connection */ - if (context->ah_count_in_use == context->max_http_header_pool) - if (_lws_server_listen_accept_flow_control(context, 1)) - return 1; - - context->ah_count_in_use--; wsi->u.hdr.ah = NULL; + if (!*pwsi) { + ah->in_use = 0; + pt->ah_count_in_use--; + + goto bail; + } + + /* somebody else on same tsi is waiting, give it to him */ + + lwsl_info("pt wait list %p\n", *pwsi); + while ((*pwsi)->u.hdr.ah_wait_list) + pwsi = &(*pwsi)->u.hdr.ah_wait_list; + + wsi = *pwsi; + lwsl_info("last wsi in wait list %p\n", wsi); + + wsi->u.hdr.ah = ah; + lws_reset_header_table(wsi); + time(&wsi->u.hdr.ah->assigned); + + assert(wsi->position_in_fds_table != -1); + + lwsl_info("%s: Enabling %p POLLIN\n", __func__, wsi); + /* his wait is over, let him progress */ + _lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa); + + /* point prev guy to next guy in list instead */ + *pwsi = wsi->u.hdr.ah_wait_list; + wsi->u.hdr.ah_wait_list = NULL; + pt->ah_wait_list_length--; + + assert(!!pt->ah_wait_list_length == !!(int)(long)pt->ah_wait_list); +bail: + lws_pt_unlock(pt); + return 0; } @@ -386,6 +477,8 @@ lws_parse(struct lws *wsi, unsigned char c) struct lws_context *context = wsi->context; unsigned int n, m, enc = 0; + assert(wsi->u.hdr.ah); + switch (wsi->u.hdr.parser_state) { default: @@ -651,8 +744,7 @@ swallow: lwsl_parser("known hdr %d\n", n); for (m = 0; m < ARRAY_SIZE(methods); m++) if (n == methods[m] && - ah->frag_index[ - methods[m]]) { + ah->frag_index[methods[m]]) { lwsl_warn("Duplicated method\n"); return -1; } diff --git a/lib/pollfd.c b/lib/pollfd.c index 8ab37201..c4609328 100644 --- a/lib/pollfd.c +++ b/lib/pollfd.c @@ -21,11 +21,80 @@ #include "private-libwebsockets.h" +int +_lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa) +{ + struct lws_context *context; + struct lws_context_per_thread *pt; + int ret = 0, pa_events = 1; + struct lws_pollfd *pfd; + int sampled_tid, tid; + + if (!wsi || wsi->position_in_fds_table < 0) + return 0; + + context = wsi->context; + pt = &context->pt[(int)wsi->tsi]; + assert(wsi->position_in_fds_table >= 0 && + wsi->position_in_fds_table < pt->fds_count); + + pfd = &pt->fds[wsi->position_in_fds_table]; + pa->fd = wsi->sock; + pa->prev_events = pfd->events; + pa->events = pfd->events = (pfd->events & ~_and) | _or; + + if (context->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD, + wsi->user_space, (void *)pa, 0)) { + ret = -1; + goto bail; + } + + /* + * if we changed something in this pollfd... + * ... and we're running in a different thread context + * than the service thread... + * ... and the service thread is waiting ... + * then cancel it to force a restart with our changed events + */ +#if LWS_POSIX + pa_events = pa->prev_events != pa->events; +#endif + if (pa_events) { + + if (lws_plat_change_pollfd(context, wsi, pfd)) { + lwsl_info("%s failed\n", __func__); + ret = -1; + goto bail; + } + + sampled_tid = context->service_tid; + if (sampled_tid) { + tid = context->protocols[0].callback(wsi, + LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0); + if (tid == -1) { + ret = -1; + goto bail; + } + if (tid != sampled_tid) + lws_cancel_service_pt(wsi); + } + } +bail: + return ret; +} + int insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi) { struct lws_pollargs pa = { wsi->sock, LWS_POLLIN, 0 }; struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + int ret = 0; +#ifndef LWS_NO_SERVER + struct lws_pollargs pa1; +#endif + + lwsl_info("%s: %p: tsi=%d, sock=%d, pos-in-fds=%d\n", + __func__, wsi, wsi->tsi, wsi->sock, pt->fds_count); if ((unsigned int)pt->fds_count >= context->fd_limit_per_thread) { lwsl_err("Too many fds (%d)\n", context->max_fds); @@ -47,83 +116,105 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi) wsi->user_space, (void *) &pa, 1)) return -1; + lws_pt_lock(pt); + pt->count_conns++; insert_wsi(context, wsi); wsi->position_in_fds_table = pt->fds_count; pt->fds[pt->fds_count].fd = wsi->sock; pt->fds[pt->fds_count].events = LWS_POLLIN; + /* don't apply this logic to the listening socket... */ +// if (wsi->mode != LWSCM_SERVER_LISTENER && !wsi->u.hdr.ah) +// pt->fds[pt->fds_count].events = 0; + + pa.events = pt->fds[pt->fds_count].events; + lws_plat_insert_socket_into_fds(context, wsi); /* external POLL support via protocol 0 */ if (context->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD, wsi->user_space, (void *) &pa, 0)) - return -1; + ret = -1; +#ifndef LWS_NO_SERVER + /* if no more room, defeat accepts on this thread */ + if ((unsigned int)pt->fds_count == context->fd_limit_per_thread - 1) + _lws_change_pollfd(pt->wsi_listening, LWS_POLLIN, 0, &pa1); +#endif + lws_pt_unlock(pt); if (context->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, wsi->user_space, (void *)&pa, 1)) - return -1; + ret = -1; - return 0; + return ret; } int remove_wsi_socket_from_fds(struct lws *wsi) { - int m; + int m, ret = 0; struct lws *end_wsi; struct lws_pollargs pa = { wsi->sock, 0, 0 }; +#ifndef LWS_NO_SERVER + struct lws_pollargs pa1; +#endif struct lws_context *context = wsi->context; struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; - lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE); - - --pt->fds_count; - #if !defined(_WIN32) && !defined(MBED_OPERATORS) if (wsi->sock > context->max_fds) { - lwsl_err("Socket fd %d too high (%d)\n", - wsi->sock, context->max_fds); + lwsl_err("fd %d too high (%d)\n", wsi->sock, context->max_fds); return 1; } #endif - lwsl_info("%s: wsi=%p, sock=%d, fds pos=%d\n", __func__, - wsi, wsi->sock, wsi->position_in_fds_table); - if (context->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, wsi->user_space, (void *)&pa, 1)) return -1; - m = wsi->position_in_fds_table; /* replace the contents for this */ + lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE); - /* have the last guy take up the vacant slot */ - pt->fds[m] = pt->fds[pt->fds_count]; + lws_pt_lock(pt); + + lwsl_info("%s: wsi=%p, sock=%d, fds pos=%d, end guy pos=%d, endfd=%d\n", + __func__, wsi, wsi->sock, wsi->position_in_fds_table, + pt->fds_count, pt->fds[pt->fds_count].fd); + + /* the guy who is to be deleted's slot index in pt->fds */ + m = wsi->position_in_fds_table; + + /* have the last guy take up the now vacant slot */ + pt->fds[m] = pt->fds[pt->fds_count - 1]; lws_plat_delete_socket_from_fds(context, wsi, m); - /* - * end guy's fds_lookup entry remains unchanged - * (still same fd pointing to same wsi) - */ - /* end guy's "position in fds table" changed */ + /* end guy's "position in fds table" is now the deletion guy's old one */ end_wsi = wsi_from_fd(context, pt->fds[pt->fds_count].fd); + assert(end_wsi); end_wsi->position_in_fds_table = m; + /* deletion guy's lws_lookup entry needs nuking */ delete_from_fd(context, wsi->sock); /* removed wsi has no position any more */ wsi->position_in_fds_table = -1; /* remove also from external POLL support via protocol 0 */ - if (lws_socket_is_valid(wsi->sock)) { + if (lws_socket_is_valid(wsi->sock)) if (context->protocols[0].callback(wsi, LWS_CALLBACK_DEL_POLL_FD, - wsi->user_space, (void *) &pa, 0)) - return -1; - } + wsi->user_space, (void *) &pa, 0)) + ret = -1; +#ifndef LWS_NO_SERVER + /* if this made some room, accept connects on this thread */ + if ((unsigned int)pt->fds_count < context->fd_limit_per_thread - 1) + _lws_change_pollfd(pt->wsi_listening, 0, LWS_POLLIN, &pa1); +#endif + lws_pt_unlock(pt); + if (context->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, wsi->user_space, (void *) &pa, 1)) - return -1; + ret = -1; - return 0; + return ret; } int @@ -131,11 +222,8 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or) { struct lws_context *context; struct lws_context_per_thread *pt; - int tid; - int sampled_tid; - struct lws_pollfd *pfd; + int ret = 0; struct lws_pollargs pa; - int pa_events = 1; if (!wsi || !wsi->protocol || wsi->position_in_fds_table < 0) return 1; @@ -144,55 +232,20 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or) if (!context) return 1; - pt = &context->pt[(int)wsi->tsi]; - - pfd = &pt->fds[wsi->position_in_fds_table]; - pa.fd = wsi->sock; - if (context->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, wsi->user_space, (void *) &pa, 0)) return -1; - pa.prev_events = pfd->events; - pa.events = pfd->events = (pfd->events & ~_and) | _or; - - if (context->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD, - wsi->user_space, (void *) &pa, 0)) - return -1; - - /* - * if we changed something in this pollfd... - * ... and we're running in a different thread context - * than the service thread... - * ... and the service thread is waiting ... - * then cancel it to force a restart with our changed events - */ -#if LWS_POSIX - pa_events = pa.prev_events != pa.events; -#endif - if (pa_events) { - - if (lws_plat_change_pollfd(context, wsi, pfd)) { - lwsl_info("%s failed\n", __func__); - return 1; - } - - sampled_tid = context->service_tid; - if (sampled_tid) { - tid = context->protocols[0].callback(wsi, - LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0); - if (tid == -1) - return -1; - if (tid != sampled_tid) - lws_cancel_service_pt(wsi); - } - } + pt = &context->pt[(int)wsi->tsi]; + lws_pt_lock(pt); + ret = _lws_change_pollfd(wsi, _and, _or, &pa); + lws_pt_unlock(pt); if (context->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, wsi->user_space, (void *) &pa, 0)) - return -1; + ret = -1; - return 0; + return ret; } @@ -287,7 +340,7 @@ lws_callback_on_writable_all_protocol(const struct lws_context *context, { const struct lws_context_per_thread *pt = &context->pt[0]; struct lws *wsi; - int n, m = context->count_threads; + unsigned int n, m = context->count_threads; while (m--) { for (n = 0; n < pt->fds_count; n++) { diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index b1eb8bcf..cfc0a713 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -1,7 +1,7 @@ /* * libwebsockets - small server side websockets and web server implementation * - * Copyright (C) 2010 - 2015 Andy Green + * Copyright (C) 2010 - 2016 Andy Green * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -34,6 +34,9 @@ #include #include #include +#if LWS_MAX_SMP > 1 +#include +#endif #ifdef LWS_HAVE_SYS_STAT_H #include @@ -60,6 +63,7 @@ #define MSG_NOSIGNAL 0 #define SHUT_RDWR SD_BOTH #define SOL_TCP IPPROTO_TCP +#define SHUT_WR SD_SEND #define compatible_close(fd) closesocket(fd) #define lws_set_blocking_send(wsi) wsi->sock_send_blocking = 1 @@ -280,11 +284,11 @@ extern "C" { #endif #endif -#ifndef LWS_MAX_HEADER_LEN -#define LWS_MAX_HEADER_LEN 1024 +#ifndef LWS_DEF_HEADER_LEN +#define LWS_DEF_HEADER_LEN 1024 #endif -#ifndef LWS_MAX_HEADER_POOL -#define LWS_MAX_HEADER_POOL 16 +#ifndef LWS_DEF_HEADER_POOL +#define LWS_DEF_HEADER_POOL 16 #endif #ifndef LWS_MAX_PROTOCOLS #define LWS_MAX_PROTOCOLS 5 @@ -299,7 +303,7 @@ extern "C" { #define SPEC_LATEST_SUPPORTED 13 #endif #ifndef AWAITING_TIMEOUT -#define AWAITING_TIMEOUT 5 +#define AWAITING_TIMEOUT 20 #endif #ifndef CIPHERS_LIST_STRING #define CIPHERS_LIST_STRING "DEFAULT" @@ -315,12 +319,6 @@ extern "C" { #define SYSTEM_RANDOM_FILEPATH "/dev/urandom" #endif -/* - * if not in a connection storm, check for incoming - * connections this many normal connection services - */ -#define LWS_lserv_mod 10 - enum lws_websocket_opcodes_07 { LWSWSOPC_CONTINUATION = 0, LWSWSOPC_TEXT_FRAME = 1, @@ -347,6 +345,7 @@ enum lws_connection_states { LWSS_RETURNED_CLOSE_ALREADY, LWSS_AWAITING_CLOSE_ACK, LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE, + LWSS_SHUTDOWN, LWSS_HTTP2_AWAIT_CLIENT_PREFACE, LWSS_HTTP2_ESTABLISHED_PRE_SETTINGS, @@ -411,6 +410,7 @@ enum connection_mode { /* transient, ssl delay hiding */ LWSCM_SSL_ACK_PENDING, + LWSCM_SSL_INIT, /* transient modes */ LWSCM_WSCL_WAITING_CONNECT, @@ -481,6 +481,7 @@ struct allocated_headers { * lws_fragments->nfrag for continuation. */ struct lws_fragments frags[WSI_TOKEN_COUNT * 2]; + time_t assigned; /* * for each recognized token, frag_index says which frag[] his data * starts in (0 means the token did not appear) @@ -503,12 +504,26 @@ struct allocated_headers { */ struct lws_context_per_thread { +#if LWS_MAX_SMP > 1 + pthread_mutex_t lock; +#endif struct lws_pollfd *fds; struct lws *rx_draining_ext_list; struct lws *tx_draining_ext_list; + struct lws *timeout_list; + void *http_header_data; + struct allocated_headers *ah_pool; + struct lws *ah_wait_list; + int ah_wait_list_length; #ifdef LWS_OPENSSL_SUPPORT struct lws *pending_read_list; /* linked list */ #endif +#ifndef LWS_NO_SERVER + struct lws *wsi_listening; +#endif + lws_sockfd_type lserv_fd; + + unsigned long count_conns; /* * usable by anything in the service code, but only if the scope * does not last longer than the service action (since next service @@ -520,7 +535,9 @@ struct lws_context_per_thread { #else int dummy_pipe_fds[2]; #endif - int fds_count; + unsigned int fds_count; + + short ah_count_in_use; }; /* @@ -551,14 +568,8 @@ struct lws_context { const char *iface; const struct lws_token_limits *token_limits; void *user_space; - struct lws *timeout_list; -#ifndef LWS_NO_SERVER - struct lws *wsi_listening; -#endif const struct lws_protocols *protocols; - void *http_header_data; - struct allocated_headers *ah_pool; #ifdef LWS_OPENSSL_SUPPORT SSL_CTX *ssl_ctx; @@ -576,8 +587,6 @@ struct lws_context { char worst_latency_info[256]; #endif - lws_sockfd_type lserv_fd; - int max_fds; int listen_port; #ifdef LWS_USE_LIBEV @@ -587,8 +596,6 @@ struct lws_context { int fd_random; int lserv_mod; - int lserv_count; - int lserv_seen; unsigned int http_proxy_port; unsigned int options; unsigned int fd_limit_per_thread; @@ -624,7 +631,6 @@ struct lws_context { short max_http_header_data; short max_http_header_pool; - short ah_count_in_use; short count_threads; unsigned int being_destroyed:1; @@ -697,6 +703,8 @@ enum uri_esc_states { struct _lws_http_mode_related { /* MUST be first in struct */ struct allocated_headers *ah; /* mirroring _lws_header_related */ + struct lws *ah_wait_list; + struct lws *new_wsi_list; unsigned long filepos; unsigned long filelen; lws_filefd_type fd; @@ -857,6 +865,7 @@ struct _lws_http2_related { struct _lws_header_related { /* MUST be first in struct */ struct allocated_headers *ah; + struct lws *ah_wait_list; enum uri_path_states ups; enum uri_esc_states ues; short lextable_pos; @@ -986,6 +995,7 @@ struct lws { unsigned char ietf_spec_revision; char mode; /* enum connection_mode */ char state; /* enum lws_connection_states */ + char state_pre_close; char lws_rx_parse_state; /* enum lws_rx_parse_state */ char rx_frame_type; /* enum lws_write_protocol */ char pending_timeout; /* enum pending_timeout */ @@ -1049,7 +1059,7 @@ LWS_EXTERN int delete_from_fd(struct lws_context *context, lws_sockfd_type fd); #else #define wsi_from_fd(A,B) A->lws_lookup[B] -#define insert_wsi(A,B) A->lws_lookup[B->sock]=B +#define insert_wsi(A,B) assert(A->lws_lookup[B->sock] == 0); A->lws_lookup[B->sock]=B #define delete_from_fd(A,B) A->lws_lookup[B]=0 #endif @@ -1222,7 +1232,7 @@ enum lws_ssl_capable_status { #define lws_ssl_capable_read lws_ssl_capable_read_no_ssl #define lws_ssl_capable_write lws_ssl_capable_write_no_ssl #define lws_ssl_pending lws_ssl_pending_no_ssl -#define lws_server_socket_service_ssl(_a, _b, _c, _d) (0) +#define lws_server_socket_service_ssl(_b, _c) (0) #define lws_ssl_close(_a) (0) #define lws_ssl_context_destroy(_a) #define lws_ssl_remove_wsi_from_buffered_list(_a) @@ -1236,9 +1246,7 @@ lws_ssl_capable_write(struct lws *wsi, unsigned char *buf, int len); LWS_EXTERN int LWS_WARN_UNUSED_RESULT lws_ssl_pending(struct lws *wsi); LWS_EXTERN int LWS_WARN_UNUSED_RESULT -lws_server_socket_service_ssl(struct lws **wsi, struct lws *new_wsi, - lws_sockfd_type accept_fd, - struct lws_pollfd *pollfd); +lws_server_socket_service_ssl(struct lws *new_wsi, lws_sockfd_type accept_fd); LWS_EXTERN int lws_ssl_close(struct lws *wsi); LWS_EXTERN void @@ -1265,6 +1273,29 @@ lws_context_init_http2_ssl(struct lws_context *context); #endif #endif +#if LWS_MAX_SMP > 1 +static LWS_INLINE void +lws_pt_mutex_init(struct lws_context_per_thread *pt) +{ + pthread_mutex_init(&pt->lock, NULL); +} +static LWS_INLINE void +lws_pt_lock(struct lws_context_per_thread *pt) +{ + pthread_mutex_lock(&pt->lock); +} + +static LWS_INLINE void +lws_pt_unlock(struct lws_context_per_thread *pt) +{ + pthread_mutex_unlock(&pt->lock); +} +#else +#define lws_pt_mutex_init(_a) (void)(_a) +#define lws_pt_lock(_a) (void)(_a) +#define lws_pt_unlock(_a) (void)(_a) +#endif + LWS_EXTERN int LWS_WARN_UNUSED_RESULT lws_ssl_capable_read_no_ssl(struct lws *wsi, unsigned char *buf, int len); @@ -1297,6 +1328,9 @@ lws_decode_ssl_error(void); LWS_EXTERN int _lws_rx_flow_control(struct lws *wsi); +LWS_EXTERN int +_lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa); + #ifndef LWS_NO_SERVER LWS_EXTERN int lws_server_socket_service(struct lws_context *context, struct lws *wsi, @@ -1304,7 +1338,7 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi, LWS_EXTERN int lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len); LWS_EXTERN int -_lws_server_listen_accept_flow_control(struct lws_context *context, int on); +_lws_server_listen_accept_flow_control(struct lws *twsi, int on); #else #define lws_server_socket_service(_a, _b, _c) (0) #define lws_handshake_server(_a, _b, _c) (0) diff --git a/lib/server-handshake.c b/lib/server-handshake.c index 93cba5a4..bbbcfcc0 100644 --- a/lib/server-handshake.c +++ b/lib/server-handshake.c @@ -166,14 +166,13 @@ handshake_0405(struct lws_context *context, struct lws *wsi) int accept_len; if (!lws_hdr_total_length(wsi, WSI_TOKEN_HOST) || - !lws_hdr_total_length(wsi, WSI_TOKEN_KEY)) { + !lws_hdr_total_length(wsi, WSI_TOKEN_KEY)) { lwsl_parser("handshake_04 missing pieces\n"); /* completed header processing, but missing some bits */ goto bail; } - if (lws_hdr_total_length(wsi, WSI_TOKEN_KEY) >= - MAX_WEBSOCKET_04_KEY_LEN) { + if (lws_hdr_total_length(wsi, WSI_TOKEN_KEY) >= MAX_WEBSOCKET_04_KEY_LEN) { lwsl_warn("Client key too long %d\n", MAX_WEBSOCKET_04_KEY_LEN); goto bail; } @@ -183,8 +182,8 @@ handshake_0405(struct lws_context *context, struct lws *wsi) * overflow */ n = sprintf((char *)pt->serv_buf, - "%s258EAFA5-E914-47DA-95CA-C5AB0DC85B11", - lws_hdr_simple_ptr(wsi, WSI_TOKEN_KEY)); + "%s258EAFA5-E914-47DA-95CA-C5AB0DC85B11", + lws_hdr_simple_ptr(wsi, WSI_TOKEN_KEY)); lws_SHA1(pt->serv_buf, n, hash); diff --git a/lib/server.c b/lib/server.c index aeb5f8ed..319e637f 100644 --- a/lib/server.c +++ b/lib/server.c @@ -33,8 +33,9 @@ int lws_context_init_server(struct lws_context_creation_info *info, socklen_t len = sizeof(struct sockaddr); struct sockaddr_in sin; struct sockaddr *v; - int n, opt = 1; + int n, opt = 1, limit = 1; #endif + int m = 0; lws_sockfd_type sockfd; struct lws *wsi; @@ -44,6 +45,11 @@ int lws_context_init_server(struct lws_context_creation_info *info, return 0; #if LWS_POSIX +#if defined(__linux__) + limit = context->count_threads; +#endif + + for (m = 0; m < limit; m++) { #ifdef LWS_USE_IPV6 if (LWS_IPV6_ENABLED(context)) sockfd = socket(AF_INET6, SOCK_STREAM, 0); @@ -69,6 +75,13 @@ int lws_context_init_server(struct lws_context_creation_info *info, compatible_close(sockfd); return 1; } +#if defined(__linux__) && defined(SO_REUSEPORT) + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, + (const void *)&opt, sizeof(opt)) < 0) { + compatible_close(sockfd); + return 1; + } +#endif #endif lws_plat_set_socket_options(context, sockfd); @@ -122,19 +135,19 @@ int lws_context_init_server(struct lws_context_creation_info *info, wsi->sock = sockfd; wsi->mode = LWSCM_SERVER_LISTENER; wsi->protocol = context->protocols; + wsi->tsi = m; - context->wsi_listening = wsi; + context->pt[m].wsi_listening = wsi; if (insert_wsi_socket_into_fds(context, wsi)) goto bail; - context->lserv_mod = LWS_lserv_mod; - context->lserv_count = 0; - context->lserv_fd = sockfd; + context->pt[m].lserv_fd = sockfd; #if LWS_POSIX - listen(sockfd, LWS_SOMAXCONN); + listen(wsi->sock, LWS_SOMAXCONN); + } /* for each thread able to independently lister */ #else - mbed3_tcp_stream_bind(sockfd, info->port, wsi); + mbed3_tcp_stream_bind(wsi->sock, info->port, wsi); #endif lwsl_notice(" Listening on port %d\n", info->port); @@ -147,15 +160,17 @@ bail: } int -_lws_server_listen_accept_flow_control(struct lws_context *context, int on) +_lws_server_listen_accept_flow_control(struct lws *twsi, int on) { - struct lws *wsi = context->wsi_listening; + struct lws_context_per_thread *pt = &twsi->context->pt[(int)twsi->tsi]; + struct lws *wsi = pt->wsi_listening; int n; - if (!wsi || context->being_destroyed) + if (!wsi || twsi->context->being_destroyed) return 0; - lwsl_debug("%s: wsi %p: state %d\n", __func__, (void *)wsi, on); + lwsl_debug("%s: Thr %d: LISTEN wsi %p: state %d\n", + __func__, twsi->tsi, (void *)wsi, on); if (on) n = lws_change_pollfd(wsi, 0, LWS_POLLIN); @@ -324,7 +339,7 @@ int lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len) char protocol_name[32]; char *p; - /* LWSCM_WS_SERVING */ + assert(wsi->u.hdr.ah); while (len--) { @@ -576,10 +591,33 @@ bail_nuke_ah: return 1; } +static int +lws_get_idlest_tsi(struct lws_context *context) +{ + unsigned int lowest = ~0; + int n = 0, hit = -1; + + for (; n < context->count_threads; n++) { + if ((unsigned int)context->pt[n].fds_count != context->fd_limit_per_thread - 1 && + (unsigned int)context->pt[n].fds_count < lowest) { + lowest = context->pt[n].fds_count; + hit = n; + } + } + + return hit; +} + struct lws * lws_create_new_server_wsi(struct lws_context *context) { struct lws *new_wsi; + int n = lws_get_idlest_tsi(context); + + if (n < 0) { + lwsl_err("no space for new conn\n"); + return NULL; + } new_wsi = lws_zalloc(sizeof(struct lws)); if (new_wsi == NULL) { @@ -587,6 +625,9 @@ lws_create_new_server_wsi(struct lws_context *context) return NULL; } + new_wsi->tsi = n; + lwsl_info("Accepted %p to tsi %d\n", new_wsi, new_wsi->tsi); + new_wsi->context = context; new_wsi->pending_timeout = NO_PENDING_TIMEOUT; new_wsi->rxflow_change_to = LWS_RXFLOW_ALLOW; @@ -601,11 +642,6 @@ lws_create_new_server_wsi(struct lws_context *context) new_wsi->use_ssl = LWS_SSL_ENABLED(context); #endif - if (lws_allocate_header_table(new_wsi)) { - lws_free(new_wsi); - return NULL; - } - /* * these can only be set once the protocol is known * we set an unestablished connection's protocol pointer @@ -617,12 +653,13 @@ lws_create_new_server_wsi(struct lws_context *context) new_wsi->ietf_spec_revision = 0; new_wsi->sock = LWS_SOCK_INVALID; + /* * outermost create notification for wsi * no user_space because no protocol selection */ - context->protocols[0].callback(new_wsi, LWS_CALLBACK_WSI_CREATE, NULL, - NULL, 0); + context->protocols[0].callback(new_wsi, LWS_CALLBACK_WSI_CREATE, + NULL, NULL, 0); return new_wsi; } @@ -642,7 +679,7 @@ lws_http_transaction_completed(struct lws *wsi) lwsl_debug("%s: wsi %p\n", __func__, wsi); /* if we can't go back to accept new headers, drop the connection */ if (wsi->u.http.connection_type != HTTP_CONNECTION_KEEP_ALIVE) { - lwsl_info("%s: close connection\n", __func__); + lwsl_info("%s: %p: close connection\n", __func__, wsi); return 1; } @@ -655,34 +692,75 @@ lws_http_transaction_completed(struct lws *wsi) lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0); if (lws_allocate_header_table(wsi)) - return 1; + lwsl_info("On waiting list for header table"); /* If we're (re)starting on headers, need other implied init */ wsi->u.hdr.ues = URIES_IDLE; - lwsl_info("%s: keep-alive await new transaction\n", __func__); + lwsl_info("%s: %p: keep-alive await new transaction\n", __func__, wsi); return 0; } -static int -lws_get_idlest_tsi(struct lws_context *context) +/* + * either returns new wsi bound to accept_fd, or closes accept_fd and + * returns NULL, having cleaned up any new wsi pieces + */ + +LWS_VISIBLE struct lws * +lws_adopt_socket(struct lws_context *context, lws_sockfd_type accept_fd) { - unsigned int lowest = ~0; - int n, hit = 0; + struct lws *new_wsi = lws_create_new_server_wsi(context); + if (!new_wsi) { + compatible_close(accept_fd); + return NULL; + } - for (n = 0; n < context->count_threads; n++) - if ((unsigned int)context->pt[n].fds_count < lowest) { - lowest = context->pt[n].fds_count; - hit = n; - } + new_wsi->sock = accept_fd; - return hit; + /* the transport is accepted... give him time to negotiate */ + lws_set_timeout(new_wsi, PENDING_TIMEOUT_ESTABLISH_WITH_SERVER, + AWAITING_TIMEOUT); + +#if LWS_POSIX == 0 + mbed3_tcp_stream_accept(accept_fd, new_wsi); +#endif + + /* + * A new connection was accepted. Give the user a chance to + * set properties of the newly created wsi. There's no protocol + * selected yet so we issue this to protocols[0] + */ + if ((context->protocols[0].callback)(new_wsi, + LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED, NULL, NULL, 0)) { + compatible_close(new_wsi->sock); + lws_free(new_wsi); + return NULL; + } + + lws_libev_accept(new_wsi, new_wsi->sock); + + if (!LWS_SSL_ENABLED(context)) { + if (insert_wsi_socket_into_fds(context, new_wsi)) + goto fail; + } else { + new_wsi->mode = LWSCM_SSL_INIT; + if (lws_server_socket_service_ssl(new_wsi, accept_fd)) + goto fail; + } + + return new_wsi; + +fail: + lwsl_err("%s: fail\n", __func__); + lws_close_free_wsi(new_wsi, LWS_CLOSE_STATUS_NOSTATUS); + + return NULL; } -LWS_VISIBLE -int lws_server_socket_service(struct lws_context *context, - struct lws *wsi, struct lws_pollfd *pollfd) +LWS_VISIBLE int +lws_server_socket_service(struct lws_context *context, struct lws *wsi, + struct lws_pollfd *pollfd) { lws_sockfd_type accept_fd = LWS_SOCK_INVALID; struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; @@ -690,7 +768,7 @@ int lws_server_socket_service(struct lws_context *context, struct sockaddr_in cli_addr; socklen_t clilen; #endif - struct lws *new_wsi = NULL; + int n, len; switch (wsi->mode) { @@ -698,6 +776,7 @@ int lws_server_socket_service(struct lws_context *context, case LWSCM_HTTP_SERVING: case LWSCM_HTTP_SERVING_ACCEPTED: case LWSCM_HTTP2_SERVING: + case LWSS_SHUTDOWN: /* handle http headers coming in */ @@ -720,15 +799,19 @@ int lws_server_socket_service(struct lws_context *context, /* any incoming data ready? */ - if (!(pollfd->revents & pollfd->events && LWS_POLLIN)) + if (!(pollfd->revents & pollfd->events & LWS_POLLIN)) goto try_pollout; + if (wsi->state == LWSS_HTTP && !wsi->u.hdr.ah) + if (lws_allocate_header_table(wsi)) + goto try_pollout; + len = lws_ssl_capable_read(wsi, pt->serv_buf, LWS_MAX_SOCKET_IO_BUF); - lwsl_debug("%s: read %d\r\n", __func__, len); + lwsl_debug("%s: wsi %p read %d\r\n", __func__, wsi, len); switch (len) { case 0: - lwsl_info("lws_server_skt_srv: read 0 len\n"); + lwsl_info("%s: read 0 len\n", __func__); /* lwsl_info(" state=%d\n", wsi->state); */ if (!wsi->hdr_parsing_completed) lws_free_header_table(wsi); @@ -748,7 +831,6 @@ int lws_server_socket_service(struct lws_context *context, n = lws_read(wsi, pt->serv_buf, len); if (n < 0) /* we closed wsi */ return 1; - /* hum he may have used up the * writability above */ break; @@ -761,8 +843,10 @@ try_pollout: break; /* one shot */ - if (lws_change_pollfd(wsi, LWS_POLLOUT, 0)) + if (lws_change_pollfd(wsi, LWS_POLLOUT, 0)) { + lwsl_notice("%s a\n", __func__); goto fail; + } lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_WRITE); @@ -771,15 +855,19 @@ try_pollout: wsi->protocol->callback, wsi, LWS_CALLBACK_HTTP_WRITEABLE, wsi->user_space, NULL, 0); - if (n < 0) + if (n < 0) { + lwsl_info("writeable_fail\n"); goto fail; + } break; } /* >0 == completion, <0 == error */ n = lws_serve_http_file_fragment(wsi); - if (n < 0 || (n > 0 && lws_http_transaction_completed(wsi))) + if (n < 0 || (n > 0 && lws_http_transaction_completed(wsi))) { + lwsl_info("completed\n"); goto fail; + } break; case LWSCM_SERVER_LISTENER: @@ -787,91 +875,65 @@ try_pollout: #if LWS_POSIX /* pollin means a client has connected to us then */ - if (!(pollfd->revents & LWS_POLLIN)) - break; + do { + if (!(pollfd->revents & LWS_POLLIN) || !(pollfd->events & LWS_POLLIN)) + break; - /* listen socket got an unencrypted connection... */ + /* listen socket got an unencrypted connection... */ - clilen = sizeof(cli_addr); - lws_latency_pre(context, wsi); - accept_fd = accept(pollfd->fd, (struct sockaddr *)&cli_addr, - &clilen); - lws_latency(context, wsi, - "unencrypted accept LWSCM_SERVER_LISTENER", - accept_fd, accept_fd >= 0); - if (accept_fd < 0) { - if (LWS_ERRNO == LWS_EAGAIN || - LWS_ERRNO == LWS_EWOULDBLOCK) { - lwsl_debug("accept asks to try again\n"); + clilen = sizeof(cli_addr); + lws_latency_pre(context, wsi); + accept_fd = accept(pollfd->fd, (struct sockaddr *)&cli_addr, + &clilen); + lws_latency(context, wsi, "listener accept", accept_fd, + accept_fd >= 0); + if (accept_fd < 0) { + if (LWS_ERRNO == LWS_EAGAIN || + LWS_ERRNO == LWS_EWOULDBLOCK) { + lwsl_err("accept asks to try again\n"); + break; + } + lwsl_err("ERROR on accept: %s\n", strerror(LWS_ERRNO)); break; } - lwsl_warn("ERROR on accept: %s\n", strerror(LWS_ERRNO)); - break; - } - lws_plat_set_socket_options(context, accept_fd); -#else - /* not very beautiful... */ - accept_fd = (lws_sockfd_type)pollfd; -#endif - /* - * look at who we connected to and give user code a chance - * to reject based on client IP. There's no protocol selected - * yet so we issue this to protocols[0] - */ + lws_plat_set_socket_options(context, accept_fd); - if ((context->protocols[0].callback)(wsi, - LWS_CALLBACK_FILTER_NETWORK_CONNECTION, - NULL, (void *)(long)accept_fd, 0)) { - lwsl_debug("Callback denied network connection\n"); - compatible_close(accept_fd); - break; - } - - new_wsi = lws_create_new_server_wsi(context); - if (new_wsi == NULL) { - compatible_close(accept_fd); - break; - } - - new_wsi->sock = accept_fd; - new_wsi->tsi = lws_get_idlest_tsi(context); - lwsl_info("Accepted to tsi %d\n", new_wsi->tsi); - - /* the transport is accepted... give him time to negotiate */ - lws_set_timeout(new_wsi, PENDING_TIMEOUT_ESTABLISH_WITH_SERVER, - AWAITING_TIMEOUT); - -#if LWS_POSIX == 0 - mbed3_tcp_stream_accept(accept_fd, new_wsi); -#endif - - /* - * A new connection was accepted. Give the user a chance to - * set properties of the newly created wsi. There's no protocol - * selected yet so we issue this to protocols[0] - */ - (context->protocols[0].callback)(new_wsi, - LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED, - NULL, NULL, 0); - - lws_libev_accept(new_wsi, accept_fd); - - if (!LWS_SSL_ENABLED(context)) { -#if LWS_POSIX lwsl_debug("accepted new conn port %u on fd=%d\n", ntohs(cli_addr.sin_port), accept_fd); + +#else + /* not very beautiful... */ + accept_fd = (lws_sockfd_type)pollfd; #endif - if (insert_wsi_socket_into_fds(context, new_wsi)) - goto fail; - } - break; + /* + * look at who we connected to and give user code a chance + * to reject based on client IP. There's no protocol selected + * yet so we issue this to protocols[0] + */ + if ((context->protocols[0].callback)(wsi, + LWS_CALLBACK_FILTER_NETWORK_CONNECTION, + NULL, (void *)(long)accept_fd, 0)) { + lwsl_debug("Callback denied network connection\n"); + compatible_close(accept_fd); + break; + } + + if (!lws_adopt_socket(context, accept_fd)) + /* already closed cleanly as necessary */ + return 1; + +#if LWS_POSIX + } while (pt->fds_count < context->fd_limit_per_thread - 1 && + lws_poll_listen_fd(&pt->fds[wsi->position_in_fds_table]) > 0); +#endif + return 0; default: break; } - if (!lws_server_socket_service_ssl(&wsi, new_wsi, accept_fd, pollfd)) + if (!lws_server_socket_service_ssl(wsi, accept_fd)) return 0; fail: diff --git a/lib/service.c b/lib/service.c index 098170ba..ef930551 100644 --- a/lib/service.c +++ b/lib/service.c @@ -294,6 +294,9 @@ notify: int lws_service_timeout_check(struct lws *wsi, unsigned int sec) { + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + struct lws **pwsi; + /* * if extensions want in on it (eg, we are a mux parent) * give them a chance to service child timeouts @@ -309,8 +312,25 @@ lws_service_timeout_check(struct lws *wsi, unsigned int sec) * connection */ if ((time_t)sec > wsi->pending_timeout_limit) { - lwsl_info("wsi %p: TIMEDOUT WAITING on %d\n", - (void *)wsi, wsi->pending_timeout); +#if LWS_POSIX + lwsl_notice("wsi %p: TIMEDOUT WAITING on %d (did hdr %d, ah %p, wl %d, pfd events %d)\n", + (void *)wsi, wsi->pending_timeout, + wsi->hdr_parsing_completed, wsi->u.hdr.ah, + pt->ah_wait_list_length, + pt->fds[wsi->sock].events); +#endif + lws_pt_lock(pt); + + pwsi = &pt->ah_wait_list; + while (*pwsi) { + if (*pwsi == wsi) + break; + pwsi = &(*pwsi)->u.hdr.ah_wait_list; + } + lws_pt_unlock(pt); + + if (!*pwsi) + lwsl_err("*** not on ah wait list ***\n"); /* * Since he failed a timeout, he already had a chance to do * something and was unable to... that includes situations like @@ -372,9 +392,6 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len) LWS_VISIBLE int lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int tsi) { -#if LWS_POSIX - int idx = 0; -#endif struct lws_context_per_thread *pt = &context->pt[tsi]; lws_sockfd_type our_fd = 0, tmp_fd; struct lws_tokens eff_buf; @@ -386,10 +403,6 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t int n, m; int more; -#if LWS_POSIX - if (context->lserv_fd) - idx = wsi_from_fd(context, context->lserv_fd)->position_in_fds_table; -#endif /* * you can call us with pollfd = NULL to just allow the once-per-second * global timeout checks; if less than a second since the last check @@ -409,7 +422,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t if (pollfd) our_fd = pollfd->fd; - wsi = context->timeout_list; + wsi = context->pt[tsi].timeout_list; while (wsi) { /* we have to take copies, because he may be deleted */ wsi1 = wsi->timeout_list; @@ -423,6 +436,18 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t } wsi = wsi1; } +#if 1 + { + char s[300], *p = s; + + for (n = 0; n < context->count_threads; n++) + p += sprintf(p, " %7lu (%5d), ", + context->pt[n].count_conns, + context->pt[n].fds_count); + + lwsl_notice("load: %s\n", s); + } +#endif } /* the socket we came to service timed out, nothing to do */ @@ -445,44 +470,10 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t */ #if LWS_POSIX - /* - * deal with listen service piggybacking - * every lserv_mod services of other fds, we - * sneak one in to service the listen socket if there's anything waiting - * - * To handle connection storms, as found in ab, if we previously saw a - * pending connection here, it causes us to check again next time. - */ - - if (context->lserv_fd && pollfd != &pt->fds[idx]) { - context->lserv_count++; - if (context->lserv_seen || - context->lserv_count == context->lserv_mod) { - context->lserv_count = 0; - m = 1; - if (context->lserv_seen > 5) - m = 2; - while (m--) { - /* - * even with extpoll, we prepared this - * internal fds for listen - */ - n = lws_poll_listen_fd(&pt->fds[idx]); - if (n <= 0) { - if (context->lserv_seen) - context->lserv_seen--; - break; - } - /* there's a conn waiting for us */ - lws_service_fd(context, &pt->fds[idx]); - context->lserv_seen++; - } - } - } /* handle session socket closed */ - if ((!(pollfd->revents & LWS_POLLIN)) && + if ((!(pollfd->revents & pollfd->events & LWS_POLLIN)) && (pollfd->revents & LWS_POLLHUP)) { lwsl_debug("Session Socket %p (fd=%d) dead\n", diff --git a/lib/ssl.c b/lib/ssl.c index 3b66ccfb..bb9613a9 100644 --- a/lib/ssl.c +++ b/lib/ssl.c @@ -592,11 +592,8 @@ lws_ssl_close(struct lws *wsi) /* leave all wsi close processing to the caller */ LWS_VISIBLE int -lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi, - lws_sockfd_type accept_fd, - struct lws_pollfd *pollfd) +lws_server_socket_service_ssl(struct lws *wsi, lws_sockfd_type accept_fd) { - struct lws *wsi = *pwsi; struct lws_context *context = wsi->context; struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; int n, m; @@ -606,43 +603,41 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi, if (!LWS_SSL_ENABLED(context)) return 0; - +lwsl_err("%s: mode %d, state %d\n", __func__, wsi->mode, wsi->state); switch (wsi->mode) { - case LWSCM_SERVER_LISTENER: + case LWSCM_SSL_INIT: - if (!new_wsi) { - lwsl_err("no new_wsi\n"); + if (!wsi) return 0; - } - new_wsi->ssl = SSL_new(context->ssl_ctx); - if (new_wsi->ssl == NULL) { + wsi->ssl = SSL_new(context->ssl_ctx); + if (wsi->ssl == NULL) { lwsl_err("SSL_new failed: %s\n", - ERR_error_string(SSL_get_error(new_wsi->ssl, 0), NULL)); + ERR_error_string(SSL_get_error(wsi->ssl, 0), NULL)); lws_decode_ssl_error(); compatible_close(accept_fd); goto fail; } - SSL_set_ex_data(new_wsi->ssl, + SSL_set_ex_data(wsi->ssl, openssl_websocket_private_data_index, context); - SSL_set_fd(new_wsi->ssl, accept_fd); + SSL_set_fd(wsi->ssl, accept_fd); #ifdef USE_WOLFSSL #ifdef USE_OLD_CYASSL - CyaSSL_set_using_nonblock(new_wsi->ssl, 1); + CyaSSL_set_using_nonblock(wsi->ssl, 1); #else - wolfSSL_set_using_nonblock(new_wsi->ssl, 1); + wolfSSL_set_using_nonblock(wsi->ssl, 1); #endif #else - SSL_set_mode(new_wsi->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); - bio = SSL_get_rbio(new_wsi->ssl); + SSL_set_mode(wsi->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); + bio = SSL_get_rbio(wsi->ssl); if (bio) BIO_set_nbio(bio, 1); /* nonblocking */ else lwsl_notice("NULL rbio\n"); - bio = SSL_get_wbio(new_wsi->ssl); + bio = SSL_get_wbio(wsi->ssl); if (bio) BIO_set_nbio(bio, 1); /* nonblocking */ else @@ -655,8 +650,6 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi, * pieces come if we're not sorted yet */ - *pwsi = new_wsi; - wsi = *pwsi; wsi->mode = LWSCM_SSL_ACK_PENDING; if (insert_wsi_socket_into_fds(context, wsi)) goto fail; @@ -754,7 +747,7 @@ go_again: break; } lwsl_debug("SSL_accept failed skt %u: %s\n", - pollfd->fd, ERR_error_string(m, NULL)); + wsi->sock, ERR_error_string(m, NULL)); goto fail; accepted: diff --git a/test-server/test-server-http.c b/test-server/test-server-http.c index 4c93496f..7fd421e4 100644 --- a/test-server/test-server-http.c +++ b/test-server/test-server-http.c @@ -35,6 +35,8 @@ * using this protocol, including the sender */ +extern int debug_level; + enum demo_protocols { /* always first */ PROTOCOL_HTTP = 0, @@ -117,8 +119,8 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, { struct per_session_data__http *pss = (struct per_session_data__http *)user; - static unsigned char buffer[4096]; - unsigned long amount, file_len; + unsigned char buffer[4096 + LWS_PRE]; + unsigned long amount, file_len, sent; char leaf_path[1024]; const char *mimetype; char *other_headers; @@ -136,15 +138,16 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, switch (reason) { case LWS_CALLBACK_HTTP: - dump_handshake_info(wsi); + if (debug_level & LLL_INFO) { + dump_handshake_info(wsi); - /* dump the individual URI Arg parameters */ - n = 0; - while (lws_hdr_copy_fragment(wsi, buf, sizeof(buf), - WSI_TOKEN_HTTP_URI_ARGS, n) > 0) { - lwsl_info("URI Arg %d: %s\n", ++n, buf); + /* dump the individual URI Arg parameters */ + n = 0; + while (lws_hdr_copy_fragment(wsi, buf, sizeof(buf), + WSI_TOKEN_HTTP_URI_ARGS, n) > 0) { + lwsl_info("URI Arg %d: %s\n", ++n, buf); + } } - if (len < 1) { lws_return_http_status(wsi, HTTP_STATUS_BAD_REQUEST, NULL); @@ -153,8 +156,7 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, /* this example server has no concept of directories */ if (strchr((const char *)in + 1, '/')) { - lws_return_http_status(wsi, - HTTP_STATUS_FORBIDDEN, NULL); + lws_return_http_status(wsi, HTTP_STATUS_FORBIDDEN, NULL); goto try_to_reuse; } @@ -177,8 +179,10 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, pss->fd = lws_plat_file_open(wsi, leaf_path, &file_len, LWS_O_RDONLY); - if (pss->fd == LWS_INVALID_FILE) + if (pss->fd == LWS_INVALID_FILE) { + lwsl_err("faild to open file %s\n", leaf_path); return -1; + } /* * we will send a big jpeg file, but it could be @@ -219,9 +223,11 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, * this is mandated by changes in HTTP2 */ + *p = '\0'; + lwsl_info("%s\n", buffer + LWS_PRE); + n = lws_write(wsi, buffer + LWS_PRE, p - (buffer + LWS_PRE), LWS_WRITE_HTTP_HEADERS); - if (n < 0) { lws_plat_file_close(wsi, pss->fd); return -1; @@ -284,7 +290,6 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, * we'll get a LWS_CALLBACK_HTTP_FILE_COMPLETION callback when * it's done */ - break; case LWS_CALLBACK_HTTP_BODY: @@ -308,9 +313,15 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, goto try_to_reuse; case LWS_CALLBACK_HTTP_WRITEABLE: + lwsl_info("LWS_CALLBACK_HTTP_WRITEABLE\n"); + + if (pss->fd == LWS_INVALID_FILE) + goto try_to_reuse; + /* * we can send more of whatever it is we were sending */ + sent = 0; do { /* we'd like the send this much */ n = sizeof(buffer) - LWS_PRE; @@ -328,15 +339,16 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, n = m; n = lws_plat_file_read(wsi, pss->fd, - &amount, buffer + - LWS_PRE, n); + &amount, buffer + LWS_PRE, n); /* problem reading, close conn */ - if (n < 0) + if (n < 0) { + lwsl_err("problem reading file\n"); goto bail; + } n = (int)amount; /* sent it all, close conn */ if (n == 0) - goto flush_bail; + goto penultimate; /* * To support HTTP2, must take care about preamble space * @@ -344,46 +356,28 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, * is handled by the library itself if you sent a * content-length header */ - m = lws_write(wsi, buffer + LWS_PRE, - n, LWS_WRITE_HTTP); - if (m < 0) + m = lws_write(wsi, buffer + LWS_PRE, n, LWS_WRITE_HTTP); + if (m < 0) { + lwsl_err("write failed\n"); /* write failed, close conn */ goto bail; - - /* - * http2 won't do this - */ - if (m != n) - /* partial write, adjust */ - if (lws_plat_file_seek_cur(wsi, pss->fd, m - n) == - (unsigned long)-1) - goto bail; - + } if (m) /* while still active, extend timeout */ - lws_set_timeout(wsi, - PENDING_TIMEOUT_HTTP_CONTENT, 5); - - /* if we have indigestion, let him clear it - * before eating more */ - if (lws_partial_buffered(wsi)) - break; - - } while (!lws_send_pipe_choked(wsi)); + lws_set_timeout(wsi, PENDING_TIMEOUT_HTTP_CONTENT, 5); + sent += m; + } while (!lws_send_pipe_choked(wsi) && (sent < 1024 * 1024)); later: lws_callback_on_writable(wsi); break; -flush_bail: - /* true if still partial pending */ - if (lws_partial_buffered(wsi)) { - lws_callback_on_writable(wsi); - break; - } +penultimate: lws_plat_file_close(wsi, pss->fd); + pss->fd = LWS_INVALID_FILE; goto try_to_reuse; bail: lws_plat_file_close(wsi, pss->fd); + return -1; /* diff --git a/test-server/test-server-libev.c b/test-server/test-server-libev.c index de9e12ef..8cb13928 100644 --- a/test-server/test-server-libev.c +++ b/test-server/test-server-libev.c @@ -23,6 +23,7 @@ int close_testing; int max_poll_elements; +int debug_level = 7; volatile int force_exit = 0; struct lws_context *context; struct lws_plat_file_ops fops_plat; @@ -192,7 +193,6 @@ int main(int argc, char **argv) ev_timer timeout_watcher; char cert_path[1024]; char key_path[1024]; - int debug_level = 7; int use_ssl = 0; int opts = 0; int n = 0; diff --git a/test-server/test-server-mirror.c b/test-server/test-server-mirror.c index 365333d4..274c53b6 100644 --- a/test-server/test-server-mirror.c +++ b/test-server/test-server-mirror.c @@ -22,7 +22,7 @@ /* lws-mirror_protocol */ -#define MAX_MESSAGE_QUEUE 32 +#define MAX_MESSAGE_QUEUE 512 struct a_message { void *payload; @@ -38,7 +38,7 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, { struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user; - int n; + int n, m; switch (reason) { @@ -59,19 +59,16 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, if (close_testing) break; while (pss->ringbuffer_tail != ringbuffer_head) { - + m = ringbuffer[pss->ringbuffer_tail].len; n = lws_write(wsi, (unsigned char *) ringbuffer[pss->ringbuffer_tail].payload + - LWS_PRE, - ringbuffer[pss->ringbuffer_tail].len, - LWS_WRITE_TEXT); + LWS_PRE, m, LWS_WRITE_TEXT); if (n < 0) { lwsl_err("ERROR %d writing to mirror socket\n", n); return -1; } - if (n < (int)ringbuffer[pss->ringbuffer_tail].len) - lwsl_err("mirror partial write %d vs %d\n", - n, ringbuffer[pss->ringbuffer_tail].len); + if (n < m) + lwsl_err("mirror partial write %d vs %d\n", n, m); if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) pss->ringbuffer_tail = 0; @@ -83,8 +80,7 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi)); - if (lws_partial_buffered(wsi) || - lws_send_pipe_choked(wsi)) { + if (lws_send_pipe_choked(wsi)) { lws_callback_on_writable(wsi); break; } @@ -101,11 +97,10 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, if (ringbuffer[ringbuffer_head].payload) free(ringbuffer[ringbuffer_head].payload); - ringbuffer[ringbuffer_head].payload = - malloc(LWS_PRE + len); + ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len); ringbuffer[ringbuffer_head].len = len; memcpy((char *)ringbuffer[ringbuffer_head].payload + - LWS_PRE, in, len); + LWS_PRE, in, len); if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) ringbuffer_head = 0; else diff --git a/test-server/test-server-pthreads.c b/test-server/test-server-pthreads.c index 317e4326..af5bfc17 100644 --- a/test-server/test-server-pthreads.c +++ b/test-server/test-server-pthreads.c @@ -24,6 +24,7 @@ int close_testing; int max_poll_elements; +int debug_level = 7; #ifdef EXTERNAL_POLL struct lws_pollfd *pollfds; @@ -185,7 +186,6 @@ int main(int argc, char **argv) pthread_t pthread_dumb, pthread_service[32]; char cert_path[1024]; char key_path[1024]; - int debug_level = 7; int threads = 1; int use_ssl = 0; void *retval; @@ -335,6 +335,7 @@ int main(int argc, char **argv) info.options = opts; info.count_threads = threads; info.extensions = exts; + info.max_http_header_pool = 4; context = lws_create_context(&info); if (context == NULL) { diff --git a/test-server/test-server.c b/test-server/test-server.c index cd31082f..56794595 100644 --- a/test-server/test-server.c +++ b/test-server/test-server.c @@ -23,6 +23,7 @@ int close_testing; int max_poll_elements; +int debug_level = 7; #ifdef EXTERNAL_POLL struct lws_pollfd *pollfds; @@ -171,7 +172,6 @@ int main(int argc, char **argv) const char *iface = NULL; char cert_path[1024]; char key_path[1024]; - int debug_level = 7; int use_ssl = 0; int opts = 0; int n = 0;