From 44e0b088fa9b3854a39eb7ee02c09e89b49541ae Mon Sep 17 00:00:00 2001 From: Andy Green Date: Mon, 28 Dec 2015 14:24:49 +0800 Subject: [PATCH] autobahn add same serverside rxflow cache to client Server side has had immediate RX flow control for quite a while. But client side made do with RX continuing until what had been received was exhausted. For what Autobahn tests, that's not enough. This patch gives clientside RX flow control the same immediate effect as the server side enjoys, re-using the same code. Signed-off-by: Andy Green --- lib/client.c | 22 +++++++++++++++++++++- lib/handshake.c | 7 +++++-- lib/libwebsockets.c | 35 +++++++++++++++++++++++++++++++++++ lib/output.c | 15 +++++++++++---- lib/private-libwebsockets.h | 10 ++++------ lib/server.c | 37 +------------------------------------ lib/service.c | 17 ++++++++++++----- 7 files changed, 89 insertions(+), 54 deletions(-) diff --git a/lib/client.c b/lib/client.c index d712e023..51e30f0b 100644 --- a/lib/client.c +++ b/lib/client.c @@ -25,21 +25,41 @@ int lws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len) { unsigned int n; + lwsl_debug("%s: len %u\n", __func__, len); + switch (wsi->mode) { case LWSCM_WSCL_WAITING_PROXY_REPLY: case LWSCM_WSCL_ISSUE_HANDSHAKE: case LWSCM_WSCL_WAITING_SERVER_REPLY: case LWSCM_WSCL_WAITING_EXTENSION_CONNECT: case LWSCM_WS_CLIENT: - for (n = 0; n < len; n++) + for (n = 0; n < len; n++) { + /* + * we were accepting input but now we stopped doing so + */ + if (!(wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) { + lwsl_debug("%s: caching %d\n", __func__, len - n); + lws_rxflow_cache(wsi, *buf, 0, len - n); + return 0; + } + + /* account for what we're using in rxflow buffer */ + if (wsi->rxflow_buffer) + wsi->rxflow_pos++; + if (lws_client_rx_sm(wsi, *(*buf)++)) { lwsl_debug("client_rx_sm failed\n"); return 1; } + } + lwsl_debug("%s: finished with %d\n", __func__, len); return 0; default: break; } + + lwsl_debug("%s: did nothing\n", __func__); + return 0; } diff --git a/lib/handshake.c b/lib/handshake.c index bf9437f1..00b543ae 100644 --- a/lib/handshake.c +++ b/lib/handshake.c @@ -108,12 +108,15 @@ http_new: /* Handshake indicates this session is done. */ goto bail; - /* It's possible that we've exhausted our data already, but - * lws_handshake_server doesn't update len for us. + /* + * It's possible that we've exhausted our data already, or + * rx flow control has stopped us dealing with this early, + * but lws_handshake_server doesn't update len for us. * Figure out how much was read, so that we can proceed * appropriately: */ len -= (buf - last_char); + lwsl_debug("%s: thinks we have used %d\n", __func__, len); if (!wsi->hdr_parsing_completed) /* More header content on the way */ diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index 380f2277..82f317af 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -974,3 +974,38 @@ lws_close_reason(struct lws *wsi, enum lws_close_status status, wsi->u.ws.close_in_ping_buffer_len = p - start; } + +LWS_EXTERN int +_lws_rx_flow_control(struct lws *wsi) +{ + /* there is no pending change */ + if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE)) + return 0; + + /* stuff is still buffered, not ready to really accept new input */ + if (wsi->rxflow_buffer) { + /* get ourselves called back to deal with stashed buffer */ + lws_callback_on_writable(wsi); + return 0; + } + + /* pending is cleared, we can change rxflow state */ + + wsi->rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE; + + lwsl_info("rxflow: wsi %p change_to %d\n", wsi, + wsi->rxflow_change_to & LWS_RXFLOW_ALLOW); + + /* adjust the pollfd for this wsi */ + + if (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW) { + if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { + lwsl_info("%s: fail\n", __func__); + return -1; + } + } else + if (lws_change_pollfd(wsi, LWS_POLLIN, 0)) + return -1; + + return 0; +} diff --git a/lib/output.c b/lib/output.c index 5af0e269..2f913523 100644 --- a/lib/output.c +++ b/lib/output.c @@ -24,8 +24,13 @@ static int lws_0405_frame_mask_generate(struct lws *wsi) { - int n; - +#if 0 + wsi->u.ws.mask_nonce[0] = 0; + wsi->u.ws.mask_nonce[1] = 0; + wsi->u.ws.mask_nonce[2] = 0; + wsi->u.ws.mask_nonce[3] = 0; +#else + int n; /* fetch the per-frame nonce */ n = lws_get_random(lws_get_context(wsi), wsi->u.ws.mask_nonce, 4); @@ -34,7 +39,7 @@ lws_0405_frame_mask_generate(struct lws *wsi) SYSTEM_RANDOM_FILEPATH, n); return 1; } - +#endif /* start masking from first byte of masking key buffer */ wsi->u.ws.frame_mask_index = 0; @@ -260,8 +265,10 @@ LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, /* if we are continuing a frame that already had its header done */ - if (wsi->u.ws.inside_frame) + if (wsi->u.ws.inside_frame) { + lwsl_debug("INSIDE FRAME\n"); goto do_more_inside_frame; + } wsi->u.ws.clean_buffer = 1; diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 07fb2a1f..b91417f9 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -1060,9 +1060,6 @@ lws_rx_sm(struct lws *wsi, unsigned char c); LWS_EXTERN int lws_issue_raw_ext_access(struct lws *wsi, unsigned char *buf, size_t len); -LWS_EXTERN int -_lws_rx_flow_control(struct lws *wsi); - LWS_EXTERN void lws_union_transition(struct lws *wsi, enum connection_mode mode); @@ -1242,19 +1239,20 @@ lws_decode_ssl_error(void); #define lws_context_init_client_ssl(_a, _b) (0) #define lws_handshake_client(_a, _b, _c) (0) #endif + +LWS_EXTERN int +_lws_rx_flow_control(struct lws *wsi); + #ifndef LWS_NO_SERVER LWS_EXTERN int lws_server_socket_service(struct lws_context *context, struct lws *wsi, struct lws_pollfd *pollfd); LWS_EXTERN int -_lws_rx_flow_control(struct lws *wsi); -LWS_EXTERN int lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len); LWS_EXTERN int _lws_server_listen_accept_flow_control(struct lws_context *context, int on); #else #define lws_server_socket_service(_a, _b, _c) (0) -#define _lws_rx_flow_control(_a) (0) #define lws_handshake_server(_a, _b, _c) (0) #define _lws_server_listen_accept_flow_control(a, b) (0) #endif diff --git a/lib/server.c b/lib/server.c index fa99f132..c60405eb 100644 --- a/lib/server.c +++ b/lib/server.c @@ -146,41 +146,6 @@ bail: return 1; } -int -_lws_rx_flow_control(struct lws *wsi) -{ - /* there is no pending change */ - if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE)) - return 0; - - /* stuff is still buffered, not ready to really accept new input */ - if (wsi->rxflow_buffer) { - /* get ourselves called back to deal with stashed buffer */ - lws_callback_on_writable(wsi); - return 0; - } - - /* pending is cleared, we can change rxflow state */ - - wsi->rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE; - - lwsl_info("rxflow: wsi %p change_to %d\n", wsi, - wsi->rxflow_change_to & LWS_RXFLOW_ALLOW); - - /* adjust the pollfd for this wsi */ - - if (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW) { - if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { - lwsl_info("%s: fail\n", __func__); - return -1; - } - } else - if (lws_change_pollfd(wsi, LWS_POLLIN, 0)) - return -1; - - return 0; -} - int _lws_server_listen_accept_flow_control(struct lws_context *context, int on) { @@ -733,7 +698,7 @@ int lws_server_socket_service(struct lws_context *context, /* any incoming data ready? */ - if (!(pollfd->revents & LWS_POLLIN)) + if (!(pollfd->revents & pollfd->events && LWS_POLLIN)) goto try_pollout; len = lws_ssl_capable_read(wsi, context->serv_buf, diff --git a/lib/service.c b/lib/service.c index 46b924c5..8d5c2d5b 100644 --- a/lib/service.c +++ b/lib/service.c @@ -461,7 +461,7 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) /* handle session socket closed */ if ((!(pollfd->revents & LWS_POLLIN)) && - (pollfd->revents & LWS_POLLHUP)) { + (pollfd->revents & LWS_POLLHUP)) { lwsl_debug("Session Socket %p (fd=%d) dead\n", (void *)wsi, pollfd->fd); @@ -511,7 +511,7 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) } if (wsi->rxflow_buffer && - (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) { + (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) { lwsl_info("draining rxflow\n"); /* well, drain it */ eff_buf.token = (char *)wsi->rxflow_buffer + @@ -522,11 +522,11 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd) } /* any incoming data ready? */ - - if (!(pollfd->revents & LWS_POLLIN)) + /* notice if rx flow going off raced poll(), rx flow wins */ + if (wsi->rxflow_buffer || + !(pollfd->revents & pollfd->events & LWS_POLLIN)) break; read: - eff_buf.token_len = lws_ssl_capable_read(wsi, context->serv_buf, pending ? pending : @@ -572,6 +572,13 @@ drain: /* service incoming data */ if (eff_buf.token_len) { + /* + * if draining from rxflow buffer, not + * critical to track what was used since at the + * use it bumps wsi->rxflow_pos. If we come + * around again it will pick up from where it + * left off. + */ n = lws_read(wsi, (unsigned char *)eff_buf.token, eff_buf.token_len); if (n < 0) {