diff --git a/lib/client.c b/lib/client.c index d712e023..51e30f0b 100644 --- a/lib/client.c +++ b/lib/client.c @@ -25,21 +25,41 @@ int lws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len) { unsigned int n; + lwsl_debug("%s: len %u\n", __func__, len); + switch (wsi->mode) { case LWSCM_WSCL_WAITING_PROXY_REPLY: case LWSCM_WSCL_ISSUE_HANDSHAKE: case LWSCM_WSCL_WAITING_SERVER_REPLY: case LWSCM_WSCL_WAITING_EXTENSION_CONNECT: case LWSCM_WS_CLIENT: - for (n = 0; n < len; n++) + for (n = 0; n < len; n++) { + /* + * we were accepting input but now we stopped doing so + */ + if (!(wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) { + lwsl_debug("%s: caching %d\n", __func__, len - n); + lws_rxflow_cache(wsi, *buf, 0, len - n); + return 0; + } + + /* account for what we're using in rxflow buffer */ + if (wsi->rxflow_buffer) + wsi->rxflow_pos++; + if (lws_client_rx_sm(wsi, *(*buf)++)) { lwsl_debug("client_rx_sm failed\n"); return 1; } + } + lwsl_debug("%s: finished with %d\n", __func__, len); return 0; default: break; } + + lwsl_debug("%s: did nothing\n", __func__); + return 0; } diff --git a/lib/handshake.c b/lib/handshake.c index bf9437f1..00b543ae 100644 --- a/lib/handshake.c +++ b/lib/handshake.c @@ -108,12 +108,15 @@ http_new: /* Handshake indicates this session is done. */ goto bail; - /* It's possible that we've exhausted our data already, but - * lws_handshake_server doesn't update len for us. + /* + * It's possible that we've exhausted our data already, or + * rx flow control has stopped us dealing with this early, + * but lws_handshake_server doesn't update len for us. * Figure out how much was read, so that we can proceed * appropriately: */ len -= (buf - last_char); + lwsl_debug("%s: thinks we have used %d\n", __func__, len); if (!wsi->hdr_parsing_completed) /* More header content on the way */ diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index 380f2277..82f317af 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -974,3 +974,38 @@ lws_close_reason(struct lws *wsi, enum lws_close_status status, wsi->u.ws.close_in_ping_buffer_len = p - start; } + +LWS_EXTERN int +_lws_rx_flow_control(struct lws *wsi) +{ + /* there is no pending change */ + if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE)) + return 0; + + /* stuff is still buffered, not ready to really accept new input */ + if (wsi->rxflow_buffer) { + /* get ourselves called back to deal with stashed buffer */ + lws_callback_on_writable(wsi); + return 0; + } + + /* pending is cleared, we can change rxflow state */ + + wsi->rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE; + + lwsl_info("rxflow: wsi %p change_to %d\n", wsi, + wsi->rxflow_change_to & LWS_RXFLOW_ALLOW); + + /* adjust the pollfd for this wsi */ + + if (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW) { + if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { + lwsl_info("%s: fail\n", __func__); + return -1; + } + } else + if (lws_change_pollfd(wsi, LWS_POLLIN, 0)) + return -1; + + return 0; +} diff --git a/lib/output.c b/lib/output.c index 5af0e269..2f913523 100644 --- a/lib/output.c +++ b/lib/output.c @@ -24,8 +24,13 @@ static int lws_0405_frame_mask_generate(struct lws *wsi) { - int n; - +#if 0 + wsi->u.ws.mask_nonce[0] = 0; + wsi->u.ws.mask_nonce[1] = 0; + wsi->u.ws.mask_nonce[2] = 0; + wsi->u.ws.mask_nonce[3] = 0; +#else + int n; /* fetch the per-frame nonce */ n = lws_get_random(lws_get_context(wsi), wsi->u.ws.mask_nonce, 4); @@ -34,7 +39,7 @@ lws_0405_frame_mask_generate(struct lws *wsi) SYSTEM_RANDOM_FILEPATH, n); return 1; } - +#endif /* start masking from first byte of masking key buffer */ wsi->u.ws.frame_mask_index = 0; @@ -260,8 +265,10 @@ LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, /* if we are continuing a frame that already had its header done */ - if (wsi->u.ws.inside_frame) + if (wsi->u.ws.inside_frame) { + lwsl_debug("INSIDE FRAME\n"); goto do_more_inside_frame; + } wsi->u.ws.clean_buffer = 1; diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 07fb2a1f..b91417f9 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -1060,9 +1060,6 @@ lws_rx_sm(struct lws *wsi, unsigned char c); LWS_EXTERN int lws_issue_raw_ext_access(struct lws *wsi, unsigned char *buf, size_t len); -LWS_EXTERN int -_lws_rx_flow_control(struct lws *wsi); - LWS_EXTERN void lws_union_transition(struct lws *wsi, enum connection_mode mode); @@ -1242,19 +1239,20 @@ lws_decode_ssl_error(void); #define lws_context_init_client_ssl(_a, _b) (0) #define lws_handshake_client(_a, _b, _c) (0) #endif + +LWS_EXTERN int +_lws_rx_flow_control(struct lws *wsi); + #ifndef LWS_NO_SERVER LWS_EXTERN int lws_server_socket_service(struct lws_context *context, struct lws *wsi, struct lws_pollfd *pollfd); LWS_EXTERN int -_lws_rx_flow_control(struct lws *wsi); -LWS_EXTERN int lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len); LWS_EXTERN int _lws_server_listen_accept_flow_control(struct lws_context *context, int on); #else #define lws_server_socket_service(_a, _b, _c) (0) -#define _lws_rx_flow_control(_a) (0) #define lws_handshake_server(_a, _b, _c) (0) #define _lws_server_listen_accept_flow_control(a, b) (0) #endif diff --git a/lib/server.c b/lib/server.c index fa99f132..c60405eb 100644 --- a/lib/server.c +++ b/lib/server.c @@ -146,41 +146,6 @@ bail: return 1; } -int -_lws_rx_flow_control(struct lws *wsi) -{ - /* there is no pending change */ - if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE)) - return 0; - - /* stuff is still buffered, not ready to really accept new input */ - if (wsi->rxflow_buffer) { - /* get ourselves called back to deal with stashed buffer */ - lws_callback_on_writable(wsi); - return 0; - } - - /* pending is cleared, we can change rxflow state */ - - wsi->rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE; - - lwsl_info("rxflow: wsi %p change_to %d\n", wsi, - wsi->rxflow_change_to & LWS_RXFLOW_ALLOW); - - /* adjust the pollfd for this wsi */ - - if (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW) { - if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { - lwsl_info("%s: fail\n", __func__); - return -1; - } - } else - if (lws_change_pollfd(wsi, LWS_POLLIN, 0)) - return -1; - - return 0; -} - int _lws_server_listen_accept_flow_control(struct lws_context *context, int on) { @@ -733,7 +698,7 @@ int lws_server_socket_service(struct lws_context *context, /* any incoming data ready? */ - if (!(pollfd->revents & LWS_POLLIN)) + if (!(pollfd->revents & pollfd->events && LWS_POLLIN)) goto try_pollout; len = lws_ssl_capable_read(wsi, context->serv_buf, diff --git a/lib/service.c b/lib/service.c index 46b924c5..8d5c2d5b 100644 --- a/lib/service.c +++ b/lib/service.c @@ -461,7 +461,7 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) /* handle session socket closed */ if ((!(pollfd->revents & LWS_POLLIN)) && - (pollfd->revents & LWS_POLLHUP)) { + (pollfd->revents & LWS_POLLHUP)) { lwsl_debug("Session Socket %p (fd=%d) dead\n", (void *)wsi, pollfd->fd); @@ -511,7 +511,7 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) } if (wsi->rxflow_buffer && - (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) { + (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) { lwsl_info("draining rxflow\n"); /* well, drain it */ eff_buf.token = (char *)wsi->rxflow_buffer + @@ -522,11 +522,11 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) } /* any incoming data ready? */ - - if (!(pollfd->revents & LWS_POLLIN)) + /* notice if rx flow going off raced poll(), rx flow wins */ + if (wsi->rxflow_buffer || + !(pollfd->revents & pollfd->events & LWS_POLLIN)) break; read: - eff_buf.token_len = lws_ssl_capable_read(wsi, context->serv_buf, pending ? pending : @@ -572,6 +572,13 @@ drain: /* service incoming data */ if (eff_buf.token_len) { + /* + * if draining from rxflow buffer, not + * critical to track what was used since at the + * use it bumps wsi->rxflow_pos. If we come + * around again it will pick up from where it + * left off. + */ n = lws_read(wsi, (unsigned char *)eff_buf.token, eff_buf.token_len); if (n < 0) {