autobahn add same serverside rxflow cache to client

Server side has had immediate RX flow control for quite a while.

But client side made do with RX continuing until what had been received was exhausted.

For what Autobahn tests, that's not enough.

This patch gives clientside RX flow control the same immediate effect as the server
side enjoys, re-using the same code.

Signed-off-by: Andy Green <andy.green@linaro.org>
This commit is contained in:
Andy Green 2015-12-28 14:24:49 +08:00
parent f05167dee6
commit 44e0b088fa
7 changed files with 89 additions and 54 deletions

View file

@ -25,21 +25,41 @@ int lws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len)
{
unsigned int n;
lwsl_debug("%s: len %u\n", __func__, len);
switch (wsi->mode) {
case LWSCM_WSCL_WAITING_PROXY_REPLY:
case LWSCM_WSCL_ISSUE_HANDSHAKE:
case LWSCM_WSCL_WAITING_SERVER_REPLY:
case LWSCM_WSCL_WAITING_EXTENSION_CONNECT:
case LWSCM_WS_CLIENT:
for (n = 0; n < len; n++)
for (n = 0; n < len; n++) {
/*
* we were accepting input but now we stopped doing so
*/
if (!(wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) {
lwsl_debug("%s: caching %d\n", __func__, len - n);
lws_rxflow_cache(wsi, *buf, 0, len - n);
return 0;
}
/* account for what we're using in rxflow buffer */
if (wsi->rxflow_buffer)
wsi->rxflow_pos++;
if (lws_client_rx_sm(wsi, *(*buf)++)) {
lwsl_debug("client_rx_sm failed\n");
return 1;
}
}
lwsl_debug("%s: finished with %d\n", __func__, len);
return 0;
default:
break;
}
lwsl_debug("%s: did nothing\n", __func__);
return 0;
}

View file

@ -108,12 +108,15 @@ http_new:
/* Handshake indicates this session is done. */
goto bail;
/* It's possible that we've exhausted our data already, but
* lws_handshake_server doesn't update len for us.
/*
* It's possible that we've exhausted our data already, or
* rx flow control has stopped us dealing with this early,
* but lws_handshake_server doesn't update len for us.
* Figure out how much was read, so that we can proceed
* appropriately:
*/
len -= (buf - last_char);
lwsl_debug("%s: thinks we have used %d\n", __func__, len);
if (!wsi->hdr_parsing_completed)
/* More header content on the way */

View file

@ -974,3 +974,38 @@ lws_close_reason(struct lws *wsi, enum lws_close_status status,
wsi->u.ws.close_in_ping_buffer_len = p - start;
}
LWS_EXTERN int
_lws_rx_flow_control(struct lws *wsi)
{
/* there is no pending change */
if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE))
return 0;
/* stuff is still buffered, not ready to really accept new input */
if (wsi->rxflow_buffer) {
/* get ourselves called back to deal with stashed buffer */
lws_callback_on_writable(wsi);
return 0;
}
/* pending is cleared, we can change rxflow state */
wsi->rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE;
lwsl_info("rxflow: wsi %p change_to %d\n", wsi,
wsi->rxflow_change_to & LWS_RXFLOW_ALLOW);
/* adjust the pollfd for this wsi */
if (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW) {
if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
lwsl_info("%s: fail\n", __func__);
return -1;
}
} else
if (lws_change_pollfd(wsi, LWS_POLLIN, 0))
return -1;
return 0;
}

View file

@ -24,8 +24,13 @@
static int
lws_0405_frame_mask_generate(struct lws *wsi)
{
int n;
#if 0
wsi->u.ws.mask_nonce[0] = 0;
wsi->u.ws.mask_nonce[1] = 0;
wsi->u.ws.mask_nonce[2] = 0;
wsi->u.ws.mask_nonce[3] = 0;
#else
int n;
/* fetch the per-frame nonce */
n = lws_get_random(lws_get_context(wsi), wsi->u.ws.mask_nonce, 4);
@ -34,7 +39,7 @@ lws_0405_frame_mask_generate(struct lws *wsi)
SYSTEM_RANDOM_FILEPATH, n);
return 1;
}
#endif
/* start masking from first byte of masking key buffer */
wsi->u.ws.frame_mask_index = 0;
@ -260,8 +265,10 @@ LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf,
/* if we are continuing a frame that already had its header done */
if (wsi->u.ws.inside_frame)
if (wsi->u.ws.inside_frame) {
lwsl_debug("INSIDE FRAME\n");
goto do_more_inside_frame;
}
wsi->u.ws.clean_buffer = 1;

View file

@ -1060,9 +1060,6 @@ lws_rx_sm(struct lws *wsi, unsigned char c);
LWS_EXTERN int
lws_issue_raw_ext_access(struct lws *wsi, unsigned char *buf, size_t len);
LWS_EXTERN int
_lws_rx_flow_control(struct lws *wsi);
LWS_EXTERN void
lws_union_transition(struct lws *wsi, enum connection_mode mode);
@ -1242,19 +1239,20 @@ lws_decode_ssl_error(void);
#define lws_context_init_client_ssl(_a, _b) (0)
#define lws_handshake_client(_a, _b, _c) (0)
#endif
LWS_EXTERN int
_lws_rx_flow_control(struct lws *wsi);
#ifndef LWS_NO_SERVER
LWS_EXTERN int
lws_server_socket_service(struct lws_context *context, struct lws *wsi,
struct lws_pollfd *pollfd);
LWS_EXTERN int
_lws_rx_flow_control(struct lws *wsi);
LWS_EXTERN int
lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len);
LWS_EXTERN int
_lws_server_listen_accept_flow_control(struct lws_context *context, int on);
#else
#define lws_server_socket_service(_a, _b, _c) (0)
#define _lws_rx_flow_control(_a) (0)
#define lws_handshake_server(_a, _b, _c) (0)
#define _lws_server_listen_accept_flow_control(a, b) (0)
#endif

View file

@ -146,41 +146,6 @@ bail:
return 1;
}
int
_lws_rx_flow_control(struct lws *wsi)
{
/* there is no pending change */
if (!(wsi->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE))
return 0;
/* stuff is still buffered, not ready to really accept new input */
if (wsi->rxflow_buffer) {
/* get ourselves called back to deal with stashed buffer */
lws_callback_on_writable(wsi);
return 0;
}
/* pending is cleared, we can change rxflow state */
wsi->rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE;
lwsl_info("rxflow: wsi %p change_to %d\n", wsi,
wsi->rxflow_change_to & LWS_RXFLOW_ALLOW);
/* adjust the pollfd for this wsi */
if (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW) {
if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
lwsl_info("%s: fail\n", __func__);
return -1;
}
} else
if (lws_change_pollfd(wsi, LWS_POLLIN, 0))
return -1;
return 0;
}
int
_lws_server_listen_accept_flow_control(struct lws_context *context, int on)
{
@ -733,7 +698,7 @@ int lws_server_socket_service(struct lws_context *context,
/* any incoming data ready? */
if (!(pollfd->revents & LWS_POLLIN))
if (!(pollfd->revents & pollfd->events && LWS_POLLIN))
goto try_pollout;
len = lws_ssl_capable_read(wsi, context->serv_buf,

View file

@ -461,7 +461,7 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd)
/* handle session socket closed */
if ((!(pollfd->revents & LWS_POLLIN)) &&
(pollfd->revents & LWS_POLLHUP)) {
(pollfd->revents & LWS_POLLHUP)) {
lwsl_debug("Session Socket %p (fd=%d) dead\n",
(void *)wsi, pollfd->fd);
@ -511,7 +511,7 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd)
}
if (wsi->rxflow_buffer &&
(wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) {
(wsi->rxflow_change_to & LWS_RXFLOW_ALLOW)) {
lwsl_info("draining rxflow\n");
/* well, drain it */
eff_buf.token = (char *)wsi->rxflow_buffer +
@ -522,11 +522,11 @@ lws_service_fd(struct lws_context *context, struct lws_pollfd *pollfd)
}
/* any incoming data ready? */
if (!(pollfd->revents & LWS_POLLIN))
/* notice if rx flow going off raced poll(), rx flow wins */
if (wsi->rxflow_buffer ||
!(pollfd->revents & pollfd->events & LWS_POLLIN))
break;
read:
eff_buf.token_len = lws_ssl_capable_read(wsi,
context->serv_buf,
pending ? pending :
@ -572,6 +572,13 @@ drain:
/* service incoming data */
if (eff_buf.token_len) {
/*
* if draining from rxflow buffer, not
* critical to track what was used since at the
* use it bumps wsi->rxflow_pos. If we come
* around again it will pick up from where it
* left off.
*/
n = lws_read(wsi, (unsigned char *)eff_buf.token,
eff_buf.token_len);
if (n < 0) {