diff --git a/lib/client.c b/lib/client.c index cd791198..963c659e 100644 --- a/lib/client.c +++ b/lib/client.c @@ -626,6 +626,8 @@ check_accept: memset(&wsi->u, 0, sizeof(wsi->u)); + wsi->u.ws.rxflow_change_to = LWS_RXFLOW_ALLOW; + /* * create the frame buffer for this connection according to the * size mentioned in the protocol definition. If 0 there, then diff --git a/lib/handshake.c b/lib/handshake.c index bfcad656..4e9bd709 100644 --- a/lib/handshake.c +++ b/lib/handshake.c @@ -239,6 +239,7 @@ libwebsocket_read(struct libwebsocket_context *context, /* union transition */ memset(&wsi->u, 0, sizeof(wsi->u)); + wsi->u.ws.rxflow_change_to = LWS_RXFLOW_ALLOW; /* * create the frame buffer for this connection according to the diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index bcde2acb..c96db244 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -841,6 +841,7 @@ libwebsocket_service_fd(struct libwebsocket_context *context, struct timeval tv; int timed_out = 0; int our_fd = 0; + char draining_flow = 0; #ifndef LWS_NO_EXTENSIONS int more = 1; @@ -984,14 +985,25 @@ libwebsocket_service_fd(struct libwebsocket_context *context, /* the guy requested a callback when it was OK to write */ if ((pollfd->revents & POLLOUT) && - wsi->state == WSI_STATE_ESTABLISHED) - if (lws_handle_POLLOUT_event(context, wsi, - pollfd) < 0) { + wsi->state == WSI_STATE_ESTABLISHED && + lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) { lwsl_info("libwebsocket_service_fd: closing\n"); goto close_and_handled; } + if (wsi->u.ws.rxflow_buffer && + (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)) { + lwsl_info("draining rxflow\n"); + /* well, drain it */ + eff_buf.token = (char *)wsi->u.ws.rxflow_buffer + + wsi->u.ws.rxflow_pos; + eff_buf.token_len = wsi->u.ws.rxflow_len - + wsi->u.ws.rxflow_pos; + draining_flow = 1; + goto drain; + } + /* any incoming data ready? */ if (!(pollfd->revents & POLLIN)) @@ -1041,6 +1053,7 @@ read_pending: */ eff_buf.token = (char *)context->service_buffer; +drain: #ifndef LWS_NO_EXTENSIONS more = 1; while (more) { @@ -1079,6 +1092,14 @@ read_pending: eff_buf.token_len = 0; } #endif + if (draining_flow && wsi->u.ws.rxflow_buffer && + wsi->u.ws.rxflow_pos == wsi->u.ws.rxflow_len) { + lwsl_info("flow buffer: drained\n"); + free(wsi->u.ws.rxflow_buffer); + wsi->u.ws.rxflow_buffer = NULL; + /* having drained the rxflow buffer, can rearm POLLIN */ + _libwebsocket_rx_flow_control(wsi); + } #ifdef LWS_OPENSSL_SUPPORT if (wsi->ssl && SSL_pending(wsi->ssl)) @@ -1099,10 +1120,9 @@ read_pending: goto handled; close_and_handled: - libwebsocket_close_and_free_session( - context, wsi, - LWS_CLOSE_STATUS_NOSTATUS); - n = 0; + libwebsocket_close_and_free_session(context, wsi, + LWS_CLOSE_STATUS_NOSTATUS); + n = 1; handled: pollfd->revents = 0; @@ -1249,6 +1269,7 @@ int libwebsocket_service(struct libwebsocket_context *context, int timeout_ms) { int n; + int m; /* stay dead once we are dead */ @@ -1266,11 +1287,17 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms) /* any socket with events to service? */ - for (n = 0; n < context->fds_count; n++) - if (context->fds[n].revents) - if (libwebsocket_service_fd(context, - &context->fds[n]) < 0) - return -1; + for (n = 0; n < context->fds_count; n++) { + if (!context->fds[n].revents) + continue; + m = libwebsocket_service_fd(context, &context->fds[n]); + if (m < 0) + return -1; + /* if something closed, retry this slot */ + if (m) + n--; + } + return 0; } @@ -1479,7 +1506,7 @@ lws_latency(struct libwebsocket_context *context, struct libwebsocket *wsi, #ifdef LWS_NO_SERVER int -_libwebsocket_rx_flow_control(struct libwebsocket *wsi) +_libwebsocket_rx_flow_control(struct libswebsocket *wsi) { return 0; } @@ -1488,34 +1515,33 @@ int _libwebsocket_rx_flow_control(struct libwebsocket *wsi) { struct libwebsocket_context *context = wsi->protocol->owning_server; - int n; - if (!(wsi->u.ws.rxflow_change_to & 2)) + /* there is no pending change */ + if (!(wsi->u.ws.rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE)) return 0; - wsi->u.ws.rxflow_change_to &= ~2; - - lwsl_info("rxflow: wsi %p change_to %d\n", - wsi, wsi->u.ws.rxflow_change_to); - - /* if we're letting it come again, did we interrupt anything? */ - if ((wsi->u.ws.rxflow_change_to & 1) && wsi->u.ws.rxflow_buffer) { - n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0); - if (n < 0) { - lwsl_info("libwebsocket_rx_flow_control: close req\n"); - return -1; - } - if (n) - /* oh he stuck again, do nothing */ - return 0; + /* stuff is still buffered, not ready to really accept new input */ + if (wsi->u.ws.rxflow_buffer) { + /* get ourselves called back to deal with stashed buffer */ + libwebsocket_callback_on_writable(context, wsi); + return 0; } - if (wsi->u.ws.rxflow_change_to & 1) + /* pending is cleared, we can change rxflow state */ + + wsi->u.ws.rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE; + + lwsl_info("rxflow: wsi %p change_to %d\n", wsi, + wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW); + + /* adjust the pollfd for this wsi */ + + if (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW) context->fds[wsi->position_in_fds_table].events |= POLLIN; else context->fds[wsi->position_in_fds_table].events &= ~POLLIN; - if (wsi->u.ws.rxflow_change_to & 1) + if (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW) /* external POLL support via protocol 0 */ context->protocols[0].callback(context, wsi, LWS_CALLBACK_SET_MODE_POLL_FD, @@ -1544,7 +1570,11 @@ _libwebsocket_rx_flow_control(struct libwebsocket *wsi) int libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable) { - wsi->u.ws.rxflow_change_to = 2 | !!enable; + if (enable == (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)) + return 0; + + lwsl_info("libwebsocket_rx_flow_control(0x%p, %d)\n", wsi, enable); + wsi->u.ws.rxflow_change_to = LWS_RXFLOW_PENDING_CHANGE | !!enable; return 0; } diff --git a/lib/parsers.c b/lib/parsers.c index 7e6fd898..3ff3fbed 100644 --- a/lib/parsers.c +++ b/lib/parsers.c @@ -1047,36 +1047,22 @@ illegal_ctl_length: int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi, unsigned char *buf, size_t len) { - size_t n; + size_t n = 0; int m; - int clear_rxflow = !!wsi->u.ws.rxflow_buffer; - struct libwebsocket_context *context = wsi->protocol->owning_server; #if 0 lwsl_parser("received %d byte packet\n", (int)len); lwsl_hexdump(buf, len); #endif - if (buf && wsi->u.ws.rxflow_buffer) - lwsl_err("!!!! pending rxflow data loss\n"); - /* let the rx protocol state machine have as much as it needs */ - n = 0; - if (!buf) { - lwsl_info("dumping stored rxflow buffer len %d pos=%d\n", - wsi->u.ws.rxflow_len, wsi->u.ws.rxflow_pos); - buf = wsi->u.ws.rxflow_buffer; - n = wsi->u.ws.rxflow_pos; - len = wsi->u.ws.rxflow_len; - /* let's pretend he's already allowing input */ - context->fds[wsi->position_in_fds_table].events |= POLLIN; - } - while (n < len) { - if (!(context->fds[wsi->position_in_fds_table].events & - POLLIN)) { - /* his RX is flowcontrolled */ + /* + * we were accepting input but now we stopped doing so + */ + if (!(wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)) { + /* his RX is flowcontrolled, don't send remaining now */ if (!wsi->u.ws.rxflow_buffer) { /* a new rxflow, buffer it and warn caller */ lwsl_info("new rxflow input buffer len %d\n", @@ -1087,24 +1073,21 @@ int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi, wsi->u.ws.rxflow_pos = 0; memcpy(wsi->u.ws.rxflow_buffer, buf + n, len - n); - } else { - lwsl_info("re-using rxflow input buffer\n"); + } else /* rxflow while we were spilling prev rxflow */ - wsi->u.ws.rxflow_pos = n; - } + lwsl_info("stalling in existing rxflow buffer"); + return 1; } - m = libwebsocket_rx_sm(wsi, buf[n]); + + /* account for what we're using in rxflow buffer */ + if (wsi->u.ws.rxflow_buffer) + wsi->u.ws.rxflow_pos++; + + /* process the byte */ + m = libwebsocket_rx_sm(wsi, buf[n++]); if (m < 0) return -1; - n++; - } - - if (clear_rxflow) { - lwsl_info("flow: clearing it\n"); - free(wsi->u.ws.rxflow_buffer); - wsi->u.ws.rxflow_buffer = NULL; - context->fds[wsi->position_in_fds_table].events &= ~POLLIN; } return 0; diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index eecdd3d1..251a7022 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -227,6 +227,11 @@ enum connection_mode { LWS_CONNMODE_SERVER_LISTENER, }; +enum { + LWS_RXFLOW_ALLOW = (1 << 0), + LWS_RXFLOW_PENDING_CHANGE = (1 << 1), +}; + struct libwebsocket_protocols; struct libwebsocket; diff --git a/test-server/test-client.c b/test-server/test-client.c index b602442d..d25e31f4 100644 --- a/test-server/test-client.c +++ b/test-server/test-client.c @@ -122,7 +122,7 @@ callback_lws_mirror(struct libwebsocket_context *context, switch (reason) { case LWS_CALLBACK_CLOSED: - fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n"); + fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED mirror_lifetime=%d\n", mirror_lifetime); wsi_mirror = NULL; break; diff --git a/test-server/test-server.c b/test-server/test-server.c index ad2e9cd0..cfe34ee3 100644 --- a/test-server/test-server.c +++ b/test-server/test-server.c @@ -431,7 +431,7 @@ callback_dumb_increment(struct libwebsocket_context *context, /* lws-mirror_protocol */ -#define MAX_MESSAGE_QUEUE 128 +#define MAX_MESSAGE_QUEUE 32 struct per_session_data__lws_mirror { struct libwebsocket *wsi; @@ -461,8 +461,7 @@ callback_lws_mirror(struct libwebsocket_context *context, switch (reason) { case LWS_CALLBACK_ESTABLISHED: - lwsl_info("callback_lws_mirror: " - "LWS_CALLBACK_ESTABLISHED\n"); + lwsl_info("callback_lws_mirror: LWS_CALLBACK_ESTABLISHED\n"); pss->ringbuffer_tail = ringbuffer_head; pss->wsi = wsi; break; @@ -488,9 +487,9 @@ callback_lws_mirror(struct libwebsocket_context *context, lwsl_err("ERROR %d writing to mirror socket\n", n); return -1; } - if (n < ringbuffer[pss->ringbuffer_tail].len) { - lwsl_err("mirror partial write %d vs %d\n", n, ringbuffer[pss->ringbuffer_tail].len); - } + if (n < ringbuffer[pss->ringbuffer_tail].len) + lwsl_err("mirror partial write %d vs %d\n", + n, ringbuffer[pss->ringbuffer_tail].len); if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) pss->ringbuffer_tail = 0; @@ -507,8 +506,13 @@ callback_lws_mirror(struct libwebsocket_context *context, if (lws_send_pipe_choked(wsi)) { libwebsocket_callback_on_writable(context, wsi); - return 0; + break; } + /* + * for tests with chrome on same machine as client and + * server, this is needed to stop chrome choking + */ + usleep(1); } break; @@ -540,6 +544,7 @@ callback_lws_mirror(struct libwebsocket_context *context, choke: if (num_wsi_choked < sizeof wsi_choked / sizeof wsi_choked[0]) { + lwsl_debug("LWS_CALLBACK_RECEIVE: throttling %p\n", wsi); libwebsocket_rx_flow_control(wsi, 0); wsi_choked[num_wsi_choked++] = wsi; }