diff --git a/lib/client-handshake.c b/lib/client-handshake.c index 75d20322..7ec045cf 100644 --- a/lib/client-handshake.c +++ b/lib/client-handshake.c @@ -301,7 +301,9 @@ lws_client_connect_2(struct lws *wsi) return wsi; oom4: - lws_free_header_table(wsi); + /* we're closing, losing some rx is OK */ + wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen; + lws_header_table_detach(wsi); lws_free(wsi); return NULL; @@ -402,7 +404,7 @@ lws_client_connect_via_info(struct lws_client_connect_info *i) } #endif - if (lws_allocate_header_table(wsi)) + if (lws_header_table_attach(wsi)) goto bail; /* @@ -465,7 +467,9 @@ lws_client_connect_via_info(struct lws_client_connect_info *i) return lws_client_connect_2(wsi); bail1: - lws_free_header_table(wsi); + /* we're closing, losing some rx is OK */ + wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen; + lws_header_table_detach(wsi); bail: lws_free(wsi); diff --git a/lib/client.c b/lib/client.c index fd9fb578..28affab3 100644 --- a/lib/client.c +++ b/lib/client.c @@ -798,7 +798,7 @@ check_accept: lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0); /* free up his parsing allocations */ - lws_free_header_table(wsi); + lws_header_table_detach(wsi); lws_union_transition(wsi, LWSCM_WS_CLIENT); wsi->state = LWSS_ESTABLISHED; diff --git a/lib/handshake.c b/lib/handshake.c index 310fd3c5..faa187e5 100644 --- a/lib/handshake.c +++ b/lib/handshake.c @@ -55,12 +55,14 @@ * into multiple fragments. They may contain unknown headers with arbitrary * argument lengths. So, we parse using a single-character at a time state * machine that is completely independent of packet size. + * + * Returns <0 for error or length of chars consumed from buf (up to len) */ LWS_VISIBLE int lws_read(struct lws *wsi, unsigned char *buf, size_t len) { - unsigned char *last_char; + unsigned char *last_char, *oldbuf = buf; int body_chunk_len; size_t n; @@ -90,7 +92,7 @@ lws_read(struct lws *wsi, unsigned char *buf, size_t len) } break; #endif -http_new: + case LWSS_HTTP: wsi->hdr_parsing_completed = 0; /* fallthru */ @@ -203,9 +205,9 @@ postbody_completion: read_ok: /* Nothing more to do for now */ - lwsl_debug("%s: read_ok\n", __func__); + lwsl_info("%s: read_ok, used %d\n", __func__, buf - oldbuf); - return 0; + return buf - oldbuf; http_complete: lwsl_debug("%s: http_complete\n", __func__); @@ -215,11 +217,11 @@ http_complete: if (lws_http_transaction_completed(wsi)) goto bail; #endif - /* If we have more data, loop back around: */ - if (len) - goto http_new; - - return 0; + /* we may have next header set already, but return to event loop first + * so a heaily-pipelined http/1.1 connection cannot monopolize the + * service thread with GET hugefile.bin GET hugefile.bin etc + */ + goto read_ok; bail: lwsl_debug("closing connection at lws_read bail:\n"); diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index a5cc13ca..e8b84791 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -52,18 +52,12 @@ lws_free_wsi(struct lws *wsi) lws_free_set_NULL(wsi->rxflow_buffer); lws_free_set_NULL(wsi->trunc_alloc); - /* - * These union members have an ah at the start - * - * struct _lws_http_mode_related http; - * struct _lws_http2_related http2; - * struct _lws_header_related hdr; - * - * basically ws-related union member does not - */ - if (wsi->mode != LWSCM_WS_CLIENT && - wsi->mode != LWSCM_WS_SERVING) - lws_free_header_table(wsi); + + if (wsi->u.hdr.ah) { + /* we're closing, losing some rx is OK */ + wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen; + lws_header_table_detach(wsi); + } wsi->context->count_wsi_allocated--; lwsl_debug("%s: %p, remaining wsi %d\n", __func__, wsi, @@ -411,10 +405,12 @@ just_kill_connection: wsi->socket_is_permanently_unusable = 1; #ifdef LWS_USE_LIBUV - /* libuv has to do his own close handle processing asynchronously */ - lws_libuv_closehandle(wsi); + if (LWS_LIBUV_ENABLED(context)) { + /* libuv has to do his own close handle processing asynchronously */ + lws_libuv_closehandle(wsi); - return; + return; + } #endif lws_close_free_wsi_final(wsi); diff --git a/lib/lws-plat-unix.c b/lib/lws-plat-unix.c index eb09ce84..ab8c9b13 100644 --- a/lib/lws-plat-unix.c +++ b/lib/lws-plat-unix.c @@ -122,12 +122,8 @@ LWS_VISIBLE int lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) { struct lws_context_per_thread *pt = &context->pt[tsi]; - struct lws *wsi; int n, m, c; char buf; -#ifdef LWS_OPENSSL_SUPPORT - struct lws *wsi_next; -#endif /* stay dead once we are dead */ @@ -148,17 +144,7 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) } context->service_tid = context->service_tid_detected; - /* if we know we are draining rx ext, do not wait in poll */ - if (pt->rx_draining_ext_list) - timeout_ms = 0; - -#ifdef LWS_OPENSSL_SUPPORT - /* if we know we have non-network pending data, do not wait in poll */ - if (lws_ssl_anybody_has_buffered_read_tsi(context, tsi)) { - timeout_ms = 0; - lwsl_err("ssl buffered read\n"); - } -#endif + timeout_ms = lws_service_adjust_timeout(context, timeout_ms, tsi); n = poll(pt->fds, pt->fds_count, timeout_ms); @@ -178,46 +164,11 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) return 0; } - /* - * For all guys with already-available ext data to drain, if they are - * not flowcontrolled, fake their POLLIN status - */ - wsi = pt->rx_draining_ext_list; - while (wsi) { - pt->fds[wsi->position_in_fds_table].revents |= - pt->fds[wsi->position_in_fds_table].events & POLLIN; - wsi = wsi->u.ws.rx_draining_ext_list; - } + c = n; -#ifdef LWS_OPENSSL_SUPPORT - /* - * For all guys with buffered SSL read data already saved up, if they - * are not flowcontrolled, fake their POLLIN status so they'll get - * service to use up the buffered incoming data, even though their - * network socket may have nothing - */ - - wsi = pt->pending_read_list; - while (wsi) { - wsi_next = wsi->pending_read_list_next; - pt->fds[wsi->position_in_fds_table].revents |= - pt->fds[wsi->position_in_fds_table].events & POLLIN; - if (pt->fds[wsi->position_in_fds_table].revents & POLLIN) - /* - * he's going to get serviced now, take him off the - * list of guys with buffered SSL. If he still has some - * at the end of the service, he'll get put back on the - * list then. - */ - lws_ssl_remove_wsi_from_buffered_list(wsi); - - wsi = wsi_next; - } -#endif + lws_service_flag_pending(context, tsi); /* any socket with events to service? */ - - c = n; for (n = 0; n < pt->fds_count && c; n++) { if (!pt->fds[n].revents) continue; @@ -383,7 +334,8 @@ lws_plat_context_late_destroy(struct lws_context *context) /* cast a struct sockaddr_in6 * into addr for ipv6 */ LWS_VISIBLE int -lws_interface_to_sa(int ipv6, const char *ifname, struct sockaddr_in *addr, size_t addrlen) +lws_interface_to_sa(int ipv6, const char *ifname, struct sockaddr_in *addr, + size_t addrlen) { int rc = -1; diff --git a/lib/lws-plat-win.c b/lib/lws-plat-win.c index 8b481e79..6fcf6dab 100644 --- a/lib/lws-plat-win.c +++ b/lib/lws-plat-win.c @@ -153,16 +153,15 @@ LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line) LWS_VISIBLE int lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) { - int n; - unsigned int i; - DWORD ev; + struct lws_context_per_thread *pt = &context->pt[tsi]; WSANETWORKEVENTS networkevents; struct lws_pollfd *pfd; struct lws *wsi; - struct lws_context_per_thread *pt = &context->pt[tsi]; + unsigned int i; + DWORD ev; + int n, m; /* stay dead once we are dead */ - if (context == NULL) return 1; @@ -196,6 +195,9 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) } } + /* if we know something needs service already, don't wait in poll */ + timeout_ms = lws_service_adjust_timeout(context, timeout_ms, tsi); + ev = WSAWaitForMultipleEvents(pt->fds_count + 1, pt->events, FALSE, timeout_ms, FALSE); context->service_tid = 0; @@ -215,8 +217,9 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) pfd = &pt->fds[ev - WSA_WAIT_EVENT_0 - 1]; - if (WSAEnumNetworkEvents(pfd->fd, - pt->events[ev - WSA_WAIT_EVENT_0], + /* eh... is one event at a time the best windows can do? */ + + if (WSAEnumNetworkEvents(pfd->fd, pt->events[ev - WSA_WAIT_EVENT_0], &networkevents) == SOCKET_ERROR) { lwsl_err("WSAEnumNetworkEvents() failed with error %d\n", LWS_ERRNO); @@ -231,7 +234,27 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) wsi->sock_send_blocking = 0; } - return lws_service_fd(context, pfd); + /* if someone faked their LWS_POLLIN, then go through all active fds */ + + if (lws_service_flag_pending(context, tsi)) { + /* any socket with events to service? */ + for (n = 0; n < (int)pt->fds_count; n++) { + if (!pt->fds[n].revents) + continue; + + m = lws_service_fd_tsi(context, &pt->fds[n], tsi); + if (m < 0) + return -1; + /* if something closed, retry this slot */ + if (m) + n--; + } + return 0; + } + + /* otherwise just do the one... must be a way to improve that... */ + + return lws_service_fd_tsi(context, pfd, tsi); } LWS_VISIBLE int diff --git a/lib/parsers.c b/lib/parsers.c index d556b9fb..55c5dacc 100644 --- a/lib/parsers.c +++ b/lib/parsers.c @@ -61,19 +61,30 @@ lextable_decode(int pos, char c) } void -lws_reset_header_table(struct lws *wsi) +lws_header_table_reset(struct lws *wsi) { - if (!wsi->u.hdr.ah) - return; + struct allocated_headers *ah = wsi->u.hdr.ah; + + /* if we have the idea we're resetting 'our' ah, must be bound to one */ + assert(ah); + /* ah also concurs with ownership */ + assert(ah->wsi == wsi); /* init the ah to reflect no headers or data have appeared yet */ - memset(wsi->u.hdr.ah->frag_index, 0, sizeof(wsi->u.hdr.ah->frag_index)); - wsi->u.hdr.ah->nfrag = 0; - wsi->u.hdr.ah->pos = 0; + memset(ah->frag_index, 0, sizeof(ah->frag_index)); + ah->nfrag = 0; + ah->pos = 0; + + /* and reset the rx state */ + ah->rxpos = 0; + ah->rxlen = 0; + + /* since we will restart the ah, our new headers are not completed */ + wsi->hdr_parsing_completed = 0; } int LWS_WARN_UNUSED_RESULT -lws_allocate_header_table(struct lws *wsi) +lws_header_table_attach(struct lws *wsi) { struct lws_context *context = wsi->context; struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; @@ -133,6 +144,7 @@ lws_allocate_header_table(struct lws *wsi) wsi->u.hdr.ah = &pt->ah_pool[n]; wsi->u.hdr.ah->in_use = 1; + pt->ah_pool[n].wsi = wsi; /* mark our owner */ pt->ah_count_in_use++; _lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa); @@ -143,7 +155,7 @@ lws_allocate_header_table(struct lws *wsi) lws_pt_unlock(pt); reset: - lws_reset_header_table(wsi); + lws_header_table_reset(wsi); time(&wsi->u.hdr.ah->assigned); return 0; @@ -154,7 +166,7 @@ bail: return 1; } -int lws_free_header_table(struct lws *wsi) +int lws_header_table_detach(struct lws *wsi) { struct lws_context *context = wsi->context; struct allocated_headers *ah = wsi->u.hdr.ah; @@ -163,8 +175,18 @@ int lws_free_header_table(struct lws *wsi) struct lws **pwsi; time_t now; - lwsl_info("%s: wsi %p: ah %p (tsi=%d, count = %d)\n", __func__, (void *)wsi, - (void *)wsi->u.hdr.ah, wsi->tsi, pt->ah_count_in_use); + lwsl_info("%s: wsi %p: ah %p (tsi=%d, count = %d)\n", __func__, + (void *)wsi, (void *)wsi->u.hdr.ah, wsi->tsi, + pt->ah_count_in_use); + + assert(ah); + + /* may not be detached while he still has unprocessed rx */ + if (ah->rxpos != ah->rxlen) { + lwsl_err("%s: %p: rxpos:%d, rxlen:%d\n", __func__, wsi, + ah->rxpos, ah->rxlen); + assert(ah->rxpos == ah->rxlen); + } lws_pt_lock(pt); @@ -173,8 +195,8 @@ int lws_free_header_table(struct lws *wsi) if (wsi->socket_is_permanently_unusable) while (*pwsi) { if (*pwsi == wsi) { - lwsl_info("%s: wsi %p, removing from wait list\n", - __func__, wsi); + lwsl_info("%s: wsi %p, remv wait\n", + __func__, wsi); *pwsi = wsi->u.hdr.ah_wait_list; wsi->u.hdr.ah_wait_list = NULL; pt->ah_wait_list_length--; @@ -189,11 +211,13 @@ int lws_free_header_table(struct lws *wsi) if (now - wsi->u.hdr.ah->assigned > 3) lwsl_err("header assign - free time %d\n", (int)(now - wsi->u.hdr.ah->assigned)); - /* if we think we're freeing one, there should be one to free */ + + /* if we think we're detaching one, there should be one in use */ assert(pt->ah_count_in_use > 0); - /* and he should have been in use */ + /* and this specific one should have been in use */ assert(wsi->u.hdr.ah->in_use); wsi->u.hdr.ah = NULL; + ah->wsi = NULL; /* no owner */ if (!*pwsi) { ah->in_use = 0; @@ -212,13 +236,17 @@ int lws_free_header_table(struct lws *wsi) lwsl_info("last wsi in wait list %p\n", wsi); wsi->u.hdr.ah = ah; - lws_reset_header_table(wsi); + ah->wsi = wsi; /* new owner */ + lws_header_table_reset(wsi); time(&wsi->u.hdr.ah->assigned); assert(wsi->position_in_fds_table != -1); lwsl_info("%s: Enabling %p POLLIN\n", __func__, wsi); - /* his wait is over, let him progress */ + + /* he has been stuck waiting for an ah, but now his wait is over, + * let him progress + */ _lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa); /* point prev guy to next guy in list instead */ @@ -459,7 +487,8 @@ issue_char(struct lws *wsi, unsigned char c) return -1; wsi->u.hdr.ah->data[wsi->u.hdr.ah->pos++] = '\0'; lwsl_warn("header %i exceeds limit %d\n", - wsi->u.hdr.parser_state, wsi->u.hdr.current_token_limit); + wsi->u.hdr.parser_state, + wsi->u.hdr.current_token_limit); } return 1; @@ -679,7 +708,7 @@ check_eol: /* bail at EOL */ if (wsi->u.hdr.parser_state != WSI_TOKEN_CHALLENGE && - c == '\x0d') { + c == '\x0d') { c = '\0'; wsi->u.hdr.parser_state = WSI_TOKEN_SKIPPING_SAW_CR; lwsl_parser("*\n"); diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index bdfecdac..09d140f5 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -489,6 +489,7 @@ struct lws_fragments { */ struct allocated_headers { + struct lws *wsi; /* owner */ char *data; /* prepared by context init to point to dedicated storage */ /* * the randomly ordered fragments, indexed by frag_index and @@ -502,6 +503,10 @@ struct allocated_headers { * the actual header data gets dumped as it comes in, into data[] */ unsigned char frag_index[WSI_TOKEN_COUNT]; + unsigned char rx[2048]; + unsigned int rxpos; + unsigned int rxlen; + #ifndef LWS_NO_CLIENT char initial_handshake_hash_base64[30]; unsigned short c_port; @@ -946,6 +951,8 @@ struct _lws_header_related { }; struct _lws_websocket_related { + /* cheapest way to deal with ah overlap with ws union transition */ + struct _lws_header_related *hdr; char *rx_ubuf; unsigned int rx_ubuf_alloc; struct lws *rx_draining_ext_list; @@ -1119,6 +1126,12 @@ lws_http_action(struct lws *wsi); LWS_EXTERN int lws_b64_selftest(void); +LWS_EXTERN int +lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi); + +LWS_EXTERN int +lws_service_flag_pending(struct lws_context *context, int tsi); + #if defined(_WIN32) || defined(MBED_OPERATORS) LWS_EXTERN struct lws * wsi_from_fd(const struct lws_context *context, lws_sockfd_type fd); @@ -1244,13 +1257,13 @@ LWS_EXTERN int lws_plat_set_socket_options(struct lws_context *context, lws_sockfd_type fd); LWS_EXTERN int LWS_WARN_UNUSED_RESULT -lws_allocate_header_table(struct lws *wsi); +lws_header_table_attach(struct lws *wsi); LWS_EXTERN int -lws_free_header_table(struct lws *wsi); +lws_header_table_detach(struct lws *wsi); LWS_EXTERN void -lws_reset_header_table(struct lws *wsi); +lws_header_table_reset(struct lws *wsi); LWS_EXTERN char * LWS_WARN_UNUSED_RESULT lws_hdr_simple_ptr(struct lws *wsi, enum lws_token_indexes h); diff --git a/lib/server.c b/lib/server.c index 9b8d6b55..63c80f53 100644 --- a/lib/server.c +++ b/lib/server.c @@ -292,33 +292,20 @@ lws_http_action(struct lws *wsi) n = wsi->protocol->callback(wsi, LWS_CALLBACK_FILTER_HTTP_CONNECTION, wsi->user_space, uri_ptr, uri_len); - - if (!n) { - /* - * if there is content supposed to be coming, - * put a timeout on it having arrived - */ - lws_set_timeout(wsi, PENDING_TIMEOUT_HTTP_CONTENT, - AWAITING_TIMEOUT); - - n = wsi->protocol->callback(wsi, LWS_CALLBACK_HTTP, - wsi->user_space, uri_ptr, uri_len); - } - - /* - * If we are in keepalive, we may already have the next header set - * pipelined in the lws_read buffer above us... if so, we must hold - * the ah so it's still bound when we want to process the next headers. - * - */ - if (connection_type == HTTP_CONNECTION_CLOSE || - !wsi->u.hdr.more_rx_waiting) - /* now drop the header info we kept a pointer to */ - lws_free_header_table(wsi); - if (n) { lwsl_info("LWS_CALLBACK_HTTP closing\n"); - return 1; /* struct ah ptr already nuked */ } + + return 1; + } + /* + * if there is content supposed to be coming, + * put a timeout on it having arrived + */ + lws_set_timeout(wsi, PENDING_TIMEOUT_HTTP_CONTENT, + AWAITING_TIMEOUT); + + n = wsi->protocol->callback(wsi, LWS_CALLBACK_HTTP, + wsi->user_space, uri_ptr, uri_len); /* * If we're not issuing a file, check for content_length or @@ -336,7 +323,9 @@ lws_http_action(struct lws *wsi) return 0; bail_nuke_ah: - lws_free_header_table(wsi); + /* we're closing, losing some rx is OK */ + wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen; + lws_header_table_detach(wsi); return 1; } @@ -346,12 +335,15 @@ int lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len) { struct lws_context *context = lws_get_context(wsi); + struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + struct _lws_header_related hdr; struct allocated_headers *ah; int protocol_len, n, hit; char protocol_list[128]; char protocol_name[32]; char *p; + assert(len < 10000000); assert(wsi->u.hdr.ah); while (len--) { @@ -563,10 +555,34 @@ upgrade_ws: goto bail_nuke_ah; } - /* drop the header info -- no bail_nuke_ah after this */ - lws_free_header_table(wsi); + /* we are upgrading to ws, so http/1.1 and keepalive + + * pipelined header considerations about keeping the ah around + * no longer apply. However it's common for the first ws + * protocol data to have been coalesced with the browser + * upgrade request and to already be in the ah rx buffer. + */ + + lwsl_err("%s: %p: inheriting ah in ws mode (rxpos: %d, rxlen: %d)\n", + __func__, wsi, wsi->u.hdr.ah->rxpos, wsi->u.hdr.ah->rxlen); + lws_pt_lock(pt); + hdr = wsi->u.hdr; lws_union_transition(wsi, LWSCM_WS_SERVING); + /* + * first service is WS mode will notice this, use the RX and + * then detach the ah (caution: we are not in u.hdr union + * mode any more then... ah_temp member is at start the same + * though) + * + * Beacuse rxpos/rxlen shows something in the ah, we will get + * service guaranteed next time around the event loop + * + * All union members begin with hdr, so we can use it even + * though we transitioned to ws union mode (the ah detach + * code uses it anyway). + */ + wsi->u.hdr = hdr; + lws_pt_unlock(pt); /* * create the frame buffer for this connection according to the @@ -601,7 +617,10 @@ upgrade_ws: bail_nuke_ah: /* drop the header info */ - lws_free_header_table(wsi); + /* we're closing, losing some rx is OK */ + wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen; + lws_header_table_detach(wsi); + return 1; } @@ -718,10 +737,11 @@ lws_http_transaction_completed(struct lws *wsi) * that is already at least the start of another header set, simply * reset the existing header table and keep it. */ - if (!wsi->u.hdr.more_rx_waiting) - lws_free_header_table(wsi); + if (wsi->u.hdr.ah && + wsi->u.hdr.ah->rxpos == wsi->u.hdr.ah->rxlen) + lws_header_table_detach(wsi); else - lws_reset_header_table(wsi); + lws_header_table_reset(wsi); /* If we're (re)starting on headers, need other implied init */ wsi->u.hdr.ues = URIES_IDLE; @@ -804,11 +824,11 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi, { struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; lws_sockfd_type accept_fd = LWS_SOCK_INVALID; + struct allocated_headers *ah; #if LWS_POSIX struct sockaddr_in cli_addr; socklen_t clilen; #endif - int n, len; switch (wsi->mode) { @@ -825,11 +845,12 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi, if (!(pollfd->revents & LWS_POLLOUT)) break; - if (lws_issue_raw(wsi, wsi->trunc_alloc + wsi->trunc_offset, + if (lws_issue_raw(wsi, wsi->trunc_alloc + + wsi->trunc_offset, wsi->trunc_len) < 0) goto fail; /* - * we can't afford to allow input processing send + * we can't afford to allow input processing to send * something new, so spin around he event loop until * he doesn't have any partials */ @@ -841,34 +862,63 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi, if (!(pollfd->revents & pollfd->events & LWS_POLLIN)) goto try_pollout; - if ((wsi->state == LWSS_HTTP || - wsi->state == LWSS_HTTP_ISSUING_FILE || - wsi->state == LWSS_HTTP_HEADERS) && !wsi->u.hdr.ah) - if (lws_allocate_header_table(wsi)) - goto try_pollout; + /* these states imply we MUST have an ah attached */ - /* - * This is a good situation, the ah allocated and we know there - * is header data pending. However, in http1.1 / keepalive - * case, back-to-back header sets pipelined into one packet - * is common. - * - * Ah is defined to be required to stay attached to the wsi - * until the current set of header data completes, which may - * involve network roundtrips if fragmented. Typically the - * header set is not fragmented and gets done atomically. - * - * When we complete processing for the first header set, we - * normally drop the ah in lws_http_transaction_completed() - * since we do not know how long it would be held waiting for - * the start of the next header set to arrive. - * - * However if there is pending data, http1.1 / keepalive mode - * is active, we need to retain (just reset) the ah after - * dealing with each header set instead of dropping it. - * - * Otherwise the remaining header data has nowhere to be stored. - */ + if (wsi->state == LWSS_HTTP || + wsi->state == LWSS_HTTP_ISSUING_FILE || + wsi->state == LWSS_HTTP_HEADERS) { + if (!wsi->u.hdr.ah) + if (lws_header_table_attach(wsi)) + goto try_pollout; + + ah = wsi->u.hdr.ah; + + lwsl_debug("%s: %p: rxpos:%d rxlen:%d\n", __func__, wsi, + ah->rxpos, ah->rxlen); + + /* if nothing in ah rx buffer, get some fresh rx */ + if (ah->rxpos == ah->rxlen) { + ah->rxlen = lws_ssl_capable_read(wsi, ah->rx, + sizeof(ah->rx)); + ah->rxpos = 0; + lwsl_debug("%s: wsi %p, ah->rxlen = %d\r\n", + __func__, wsi, ah->rxlen); + switch (ah->rxlen) { + case 0: + lwsl_info("%s: read 0 len\n", __func__); + /* lwsl_info(" state=%d\n", wsi->state); */ +// if (!wsi->hdr_parsing_completed) +// lws_header_table_detach(wsi); + /* fallthru */ + case LWS_SSL_CAPABLE_ERROR: + goto fail; + case LWS_SSL_CAPABLE_MORE_SERVICE: + ah->rxlen = ah->rxpos = 0; + goto try_pollout; + } + } + assert(ah->rxpos != ah->rxlen && ah->rxlen); + /* just ignore incoming if waiting for close */ + if (wsi->state != LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE) { + n = lws_read(wsi, ah->rx + ah->rxpos, + ah->rxlen - ah->rxpos); + if (n < 0) /* we closed wsi */ + return 1; + if (wsi->u.hdr.ah) { + if ( wsi->u.hdr.ah->rxlen) + wsi->u.hdr.ah->rxpos += n; + + if (wsi->u.hdr.ah->rxpos == wsi->u.hdr.ah->rxlen && + (wsi->mode != LWSCM_HTTP_SERVING && + wsi->mode != LWSCM_HTTP_SERVING_ACCEPTED && + wsi->mode != LWSCM_HTTP2_SERVING)) + lws_header_table_detach(wsi); + } + break; + } + + goto try_pollout; + } len = lws_ssl_capable_read(wsi, pt->serv_buf, LWS_MAX_SOCKET_IO_BUF); @@ -877,8 +927,8 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi, case 0: lwsl_info("%s: read 0 len\n", __func__); /* lwsl_info(" state=%d\n", wsi->state); */ - if (!wsi->hdr_parsing_completed) - lws_free_header_table(wsi); +// if (!wsi->hdr_parsing_completed) +// lws_header_table_detach(wsi); /* fallthru */ case LWS_SSL_CAPABLE_ERROR: goto fail; diff --git a/lib/service.c b/lib/service.c index 2e4570fd..f5dcefc4 100644 --- a/lib/service.c +++ b/lib/service.c @@ -319,10 +319,13 @@ lws_service_timeout_check(struct lws *wsi, unsigned int sec) lws_pt_lock(pt); pwsi = &pt->ah_wait_list; + if (!pwsi) + return 0; while (*pwsi) { if (*pwsi == wsi) break; pwsi = &(*pwsi)->u.hdr.ah_wait_list; + lwsl_err("%s: pwsi=%p\n", __func__, pwsi); } lws_pt_unlock(pt); @@ -363,6 +366,124 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len) return 0; } +/* this is used by the platform service code to stop us waiting for network + * activity in poll() when we have something that already needs service + */ + +int +lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi) +{ + struct lws_context_per_thread *pt = &context->pt[tsi]; + int n; + + /* Figure out if we really want to wait in poll() + * We only need to wait if really nothing already to do and we have + * to wait for something from network + */ + + /* 1) if we know we are draining rx ext, do not wait in poll */ + if (pt->rx_draining_ext_list) + timeout_ms = 0; + +#ifdef LWS_OPENSSL_SUPPORT + /* 2) if we know we have non-network pending data, do not wait in poll */ + if (lws_ssl_anybody_has_buffered_read_tsi(context, tsi)) { + timeout_ms = 0; + lwsl_err("ssl buffered read\n"); + } +#endif + + /* 3) if any ah has pending rx, do not wait in poll */ + for (n = 0; n < context->max_http_header_pool; n++) + if (pt->ah_pool[n].rxpos != pt->ah_pool[n].rxlen) { + /* any ah with pending rx must be attached to someone */ + assert(pt->ah_pool[n].wsi); + timeout_ms = 0; + break; + } + + return timeout_ms; +} + +/* + * guys that need POLLIN service again without waiting for network action + * can force POLLIN here if not flowcontrolled, so they will get service. + * + * Return nonzero if anybody got their POLLIN faked + */ +int +lws_service_flag_pending(struct lws_context *context, int tsi) +{ + struct lws_context_per_thread *pt = &context->pt[tsi]; +#ifdef LWS_OPENSSL_SUPPORT + struct lws *wsi_next; +#endif + struct lws *wsi; + int forced = 0; + int n; + + /* POLLIN faking */ + + /* + * 1) For all guys with already-available ext data to drain, if they are + * not flowcontrolled, fake their POLLIN status + */ + wsi = pt->rx_draining_ext_list; + while (wsi) { + pt->fds[wsi->position_in_fds_table].revents |= + pt->fds[wsi->position_in_fds_table].events & LWS_POLLIN; + if (pt->fds[wsi->position_in_fds_table].revents & + LWS_POLLIN) + forced = 1; + wsi = wsi->u.ws.rx_draining_ext_list; + } + +#ifdef LWS_OPENSSL_SUPPORT + /* + * 2) For all guys with buffered SSL read data already saved up, if they + * are not flowcontrolled, fake their POLLIN status so they'll get + * service to use up the buffered incoming data, even though their + * network socket may have nothing + */ + wsi = pt->pending_read_list; + while (wsi) { + wsi_next = wsi->pending_read_list_next; + pt->fds[wsi->position_in_fds_table].revents |= + pt->fds[wsi->position_in_fds_table].events & LWS_POLLIN; + if (pt->fds[wsi->position_in_fds_table].revents & LWS_POLLIN) { + forced = 1; + /* + * he's going to get serviced now, take him off the + * list of guys with buffered SSL. If he still has some + * at the end of the service, he'll get put back on the + * list then. + */ + lws_ssl_remove_wsi_from_buffered_list(wsi); + } + + wsi = wsi_next; + } +#endif + /* + * 3) For any wsi who have an ah with pending RX who did not + * complete their current headers, and are not flowcontrolled, + * fake their POLLIN status so they will be able to drain the + * rx buffered in the ah + */ + for (n = 0; n < context->max_http_header_pool; n++) + if (pt->ah_pool[n].rxpos != pt->ah_pool[n].rxlen && + !pt->ah_pool[n].wsi->hdr_parsing_completed) { + pt->fds[pt->ah_pool[n].wsi->position_in_fds_table].revents |= + pt->fds[pt->ah_pool[n].wsi->position_in_fds_table].events & + LWS_POLLIN; + if (pt->fds[pt->ah_pool[n].wsi->position_in_fds_table].revents & + LWS_POLLIN) + forced = 1; + } + + return forced; +} + /** * lws_service_fd() - Service polled socket with something waiting * @context: Websocket context @@ -519,7 +640,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t lwsl_info("lws_service_fd: closing\n"); goto close_and_handled; } -#if 1 + if (wsi->state == LWSS_RETURNED_CLOSE_ALREADY || wsi->state == LWSS_AWAITING_CLOSE_ACK) { /* @@ -530,7 +651,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t lws_rx_flow_control(wsi, 1); wsi->u.ws.tx_draining_ext = 0; } -#endif + if (wsi->u.ws.tx_draining_ext) { /* we cannot deal with new RX until the TX ext * path has been drained. It's because new @@ -592,27 +713,42 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t goto drain; } - /* 4: any incoming data ready? + /* 4: any incoming (or ah-stashed incoming rx) data ready? * notice if rx flow going off raced poll(), rx flow wins */ + if (!(pollfd->revents & pollfd->events & LWS_POLLIN)) break; read: - eff_buf.token_len = lws_ssl_capable_read(wsi, pt->serv_buf, - pending ? pending : LWS_MAX_SOCKET_IO_BUF); - switch (eff_buf.token_len) { - case 0: - lwsl_info("service_fd: closing due to 0 length read\n"); - goto close_and_handled; - case LWS_SSL_CAPABLE_MORE_SERVICE: - lwsl_info("SSL Capable more service\n"); - n = 0; - goto handled; - case LWS_SSL_CAPABLE_ERROR: - lwsl_info("Closing when error\n"); - goto close_and_handled; - } + /* all the union members start with hdr, so even in ws mode + * we can deal with the ah via u.hdr + */ + if (wsi->u.hdr.ah) { + lwsl_err("%s: %p: using inherited ah rx\n", __func__, wsi); + eff_buf.token_len = wsi->u.hdr.ah->rxlen - + wsi->u.hdr.ah->rxpos; + eff_buf.token = (char *)wsi->u.hdr.ah->rx + + wsi->u.hdr.ah->rxpos; + } else { + + eff_buf.token_len = lws_ssl_capable_read(wsi, pt->serv_buf, + pending ? pending : LWS_MAX_SOCKET_IO_BUF); + switch (eff_buf.token_len) { + case 0: + lwsl_info("service_fd: closing due to 0 length read\n"); + goto close_and_handled; + case LWS_SSL_CAPABLE_MORE_SERVICE: + lwsl_info("SSL Capable more service\n"); + n = 0; + goto handled; + case LWS_SSL_CAPABLE_ERROR: + lwsl_info("Closing when error\n"); + goto close_and_handled; + } + + eff_buf.token = (char *)pt->serv_buf; + } /* * give any active extensions a chance to munge the buffer * before parse. We pass in a pointer to an lws_tokens struct @@ -624,15 +760,12 @@ read: * extension callback handling, just the normal input buffer is * used then so it is efficient. */ - - eff_buf.token = (char *)pt->serv_buf; - drain: do { more = 0; - m = lws_ext_cb_active(wsi, - LWS_EXT_CB_PACKET_RX_PREPARSE, &eff_buf, 0); + m = lws_ext_cb_active(wsi, LWS_EXT_CB_PACKET_RX_PREPARSE, + &eff_buf, 0); if (m < 0) goto close_and_handled; if (m) @@ -661,6 +794,16 @@ drain: eff_buf.token_len = 0; } while (more); + if (wsi->u.hdr.ah) { + lwsl_err("%s: %p: detaching inherited used ah\n", __func__, wsi); + /* show we used all the pending rx up */ + wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen; + /* we can run the normal ah detach flow despite + * being in ws union mode, since all union members + * start with hdr */ + lws_header_table_detach(wsi); + } + pending = lws_ssl_pending(wsi); if (pending) { handle_pending: @@ -670,7 +813,7 @@ handle_pending: } if (draining_flow && wsi->rxflow_buffer && - wsi->rxflow_pos == wsi->rxflow_len) { + wsi->rxflow_pos == wsi->rxflow_len) { lwsl_info("flow buffer: drained\n"); lws_free_set_NULL(wsi->rxflow_buffer); /* having drained the rxflow buffer, can rearm POLLIN */