From 706961dbb5b962d24a3e6e26d8053976b14bb026 Mon Sep 17 00:00:00 2001 From: Andy Green Date: Thu, 17 Jan 2013 16:50:35 +0800 Subject: [PATCH] solve flowcontrol problems Problems with rx flow control implementation were the underlying cause of the connection stalling issue that was covered up with the udelay() patch that was removed recently. This get rx flow control working properly and corrects problems with fifo management in the test server mirror protocol code too. The rxfow control api has been changed to just set a flag, so it's very cheap to call from user code. After the callbacks that might use the rxflow control api the flag is checked and any pending actions done. rx flow control now stops any rx packet coming immediately, with compessed connections "just what was left in the pipe" might be hundreds of KBytes. To implement that the current packet being decoded is copied into a malloc'd buffer by the rx processing code now. When rxflow is allows to come again, the buffer is drained and freed before any new packet content is accepted. Signed-off-by: Andy Green --- lib/Makefile.am | 2 +- lib/extension-deflate-frame.c | 2 +- lib/libwebsockets.c | 105 ++++++++++++++++++++++++---------- lib/parsers.c | 51 +++++++++++++++-- lib/private-libwebsockets.h | 13 +++++ test-server/test-server.c | 36 +++++++++--- 6 files changed, 165 insertions(+), 44 deletions(-) diff --git a/lib/Makefile.am b/lib/Makefile.am index 78ac33f17..441f8199a 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -36,7 +36,7 @@ else dist_libwebsockets_la_SOURCES += md5.c sha-1.c endif -libwebsockets_la_CFLAGS=-Wall -std=gnu99 -pedantic +libwebsockets_la_CFLAGS=-Wall -std=gnu99 -pedantic -g libwebsockets_la_LDFLAGS= if MINGW diff --git a/lib/extension-deflate-frame.c b/lib/extension-deflate-frame.c index 541131b00..7a8faccc1 100644 --- a/lib/extension-deflate-frame.c +++ b/lib/extension-deflate-frame.c @@ -163,7 +163,7 @@ bail: * screwed.. close the connection... we will get a * destroy callback to take care of closing nicely */ - fprintf(stderr, "zlib error inflate %d: %s", + lwsl_err("zlib error inflate %d: %s\n", n, conn->zs_in.msg); return -1; } diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index add3bd796..99ce9e84a 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -371,6 +371,9 @@ just_kill_connection: if (wsi->c_address) free(wsi->c_address); + if (wsi->rxflow_buffer) + free(wsi->rxflow_buffer); + /* lwsl_info("closing fd=%d\n", wsi->sock); */ #ifdef LWS_OPENSSL_SUPPORT @@ -660,7 +663,8 @@ notify_action: else n = LWS_CALLBACK_SERVER_WRITEABLE; - wsi->protocol->callback(context, wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0); + user_callback_handle_rxflow(wsi->protocol->callback, context, + wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0); return 0; } @@ -902,7 +906,7 @@ libwebsocket_service_fd(struct libwebsocket_context *context, LWS_CLOSE_STATUS_NOSTATUS); else if (wsi->state == WSI_STATE_HTTP && wsi->protocol->callback) - if (wsi->protocol->callback(context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space, + if (user_callback_handle_rxflow(wsi->protocol->callback, context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space, wsi->filepath, wsi->filepos)) libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS); break; @@ -1117,7 +1121,7 @@ bail_prox_listener: /* broadcast it to this connection */ - new_wsi->protocol->callback(context, new_wsi, + user_callback_handle_rxflow(new_wsi->protocol->callback, context, new_wsi, LWS_CALLBACK_BROADCAST, new_wsi->user_space, buf + LWS_SEND_BUFFER_PRE_PADDING, len); @@ -1531,6 +1535,51 @@ libwebsocket_get_socket_fd(struct libwebsocket *wsi) return wsi->sock; } + +int +_libwebsocket_rx_flow_control(struct libwebsocket *wsi) +{ + struct libwebsocket_context *context = wsi->protocol->owning_server; + int n; + + if (!(wsi->rxflow_change_to & 2)) + return 0; + + wsi->rxflow_change_to &= ~2; + + lwsl_info("rxflow: wsi %p change_to %d\n", wsi, wsi->rxflow_change_to); + + /* if we're letting it come again, did we interrupt anything? */ + if ((wsi->rxflow_change_to & 1) && wsi->rxflow_buffer) { + n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0); + if (n < 0) { + libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS); + return -1; + } + if (n) + /* oh he stuck again, do nothing */ + return 0; + } + + if (wsi->rxflow_change_to & 1) + context->fds[wsi->position_in_fds_table].events |= POLLIN; + else + context->fds[wsi->position_in_fds_table].events &= ~POLLIN; + + if (wsi->rxflow_change_to & 1) + /* external POLL support via protocol 0 */ + context->protocols[0].callback(context, wsi, + LWS_CALLBACK_SET_MODE_POLL_FD, + (void *)(long)wsi->sock, NULL, POLLIN); + else + /* external POLL support via protocol 0 */ + context->protocols[0].callback(context, wsi, + LWS_CALLBACK_CLEAR_MODE_POLL_FD, + (void *)(long)wsi->sock, NULL, POLLIN); + + return 1; +} + /** * libwebsocket_rx_flow_control() - Enable and disable socket servicing for * receieved packets. @@ -1545,36 +1594,12 @@ libwebsocket_get_socket_fd(struct libwebsocket *wsi) int libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable) { - struct libwebsocket_context *context = wsi->protocol->owning_server; - int n; + wsi->rxflow_change_to = 2 | !!enable; - for (n = 0; n < context->fds_count; n++) - if (context->fds[n].fd == wsi->sock) { - if (enable) - context->fds[n].events |= POLLIN; - else - context->fds[n].events &= ~POLLIN; - - return 0; - } - - if (enable) - /* external POLL support via protocol 0 */ - context->protocols[0].callback(context, wsi, - LWS_CALLBACK_SET_MODE_POLL_FD, - (void *)(long)wsi->sock, NULL, POLLIN); - else - /* external POLL support via protocol 0 */ - context->protocols[0].callback(context, wsi, - LWS_CALLBACK_CLEAR_MODE_POLL_FD, - (void *)(long)wsi->sock, NULL, POLLIN); - -#if 0 - lwsl_err("libwebsocket_rx_flow_control unable to find socket\n"); -#endif - return 1; + return 0; } + /** * libwebsocket_canonical_hostname() - returns this host's hostname * @@ -1630,6 +1655,23 @@ OpenSSL_verify_callback(int preverify_ok, X509_STORE_CTX *x509_ctx) } #endif +int user_callback_handle_rxflow(callback_function callback_function, + struct libwebsocket_context * context, + struct libwebsocket *wsi, + enum libwebsocket_callback_reasons reason, void *user, + void *in, size_t len) +{ + int n; + + n = callback_function(context, wsi, reason, user, in, len); + if (n < 0) + return n; + + _libwebsocket_rx_flow_control(wsi); + + return 0; +} + /** * libwebsocket_create_context() - Create the websocket handler @@ -2366,7 +2408,8 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol, if (wsi->protocol != protocol) continue; - wsi->protocol->callback(context, wsi, + user_callback_handle_rxflow(wsi->protocol->callback, + context, wsi, LWS_CALLBACK_BROADCAST, wsi->user_space, buf, len); diff --git a/lib/parsers.c b/lib/parsers.c index 6d87907e3..853b1ec7a 100644 --- a/lib/parsers.c +++ b/lib/parsers.c @@ -664,6 +664,7 @@ handle_first: break; case LWS_RXPS_EAT_UNTIL_76_FF: + if (c == 0xff) { wsi->lws_rx_parse_state = LWS_RXPS_NEW; goto issue; @@ -675,7 +676,8 @@ handle_first: break; issue: if (wsi->protocol->callback) - wsi->protocol->callback(wsi->protocol->owning_server, + user_callback_handle_rxflow(wsi->protocol->callback, + wsi->protocol->owning_server, wsi, LWS_CALLBACK_RECEIVE, wsi->user_space, &wsi->rx_user_buffer[LWS_SEND_BUFFER_PRE_PADDING], @@ -865,7 +867,8 @@ spill: eff_buf.token[eff_buf.token_len] = '\0'; if (wsi->protocol->callback) - wsi->protocol->callback(wsi->protocol->owning_server, + user_callback_handle_rxflow(wsi->protocol->callback, + wsi->protocol->owning_server, wsi, LWS_CALLBACK_RECEIVE, wsi->user_space, eff_buf.token, @@ -895,18 +898,58 @@ int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi, unsigned char *buf, size_t len) { size_t n; + int m; + int clear_rxflow = !!wsi->rxflow_buffer; + struct libwebsocket_context *context = wsi->protocol->owning_server; #ifdef DEBUG lwsl_parser("received %d byte packet\n", (int)len); lwsl_hexdump(buf, len); #endif + if (buf && wsi->rxflow_buffer) + lwsl_err("!!!! libwebsocket_interpret_incoming_packet: was pending rxflow, data loss\n"); + /* let the rx protocol state machine have as much as it needs */ n = 0; - while (n < len) - if (libwebsocket_rx_sm(wsi, buf[n++]) < 0) + if (!buf) { + lwsl_info("dumping stored rxflow buffer len %d pos=%d\n", wsi->rxflow_len, wsi->rxflow_pos); + buf = wsi->rxflow_buffer; + n = wsi->rxflow_pos; + len = wsi->rxflow_len; + /* let's pretend he's already allowing input */ + context->fds[wsi->position_in_fds_table].events |= POLLIN; + } + + while (n < len) { + if (!(context->fds[wsi->position_in_fds_table].events & POLLIN)) { + /* his RX is flowcontrolled */ + if (!wsi->rxflow_buffer) { /* a new rxflow in effect, buffer it and warn caller */ + lwsl_info("new rxflow input buffer len %d\n", len - n); + wsi->rxflow_buffer = (unsigned char *)malloc(len - n); + wsi->rxflow_len = len - n; + wsi->rxflow_pos = 0; + memcpy(wsi->rxflow_buffer, buf + n, len - n); + } else { + lwsl_info("re-using rxflow input buffer\n"); + /* rxflow while we were spilling previous rxflow buffer */ + wsi->rxflow_pos = n; + } + return 1; + } + m = libwebsocket_rx_sm(wsi, buf[n]); + if (m < 0) return -1; + n++; + } + + if (clear_rxflow) { + lwsl_info("flow: clearing it\n"); + free(wsi->rxflow_buffer); + wsi->rxflow_buffer = NULL; + context->fds[wsi->position_in_fds_table].events &= ~POLLIN; + } return 0; } diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 00270065a..ce5d2f401 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -345,6 +345,10 @@ struct libwebsocket { int sock; int position_in_fds_table; + unsigned char *rxflow_buffer; + int rxflow_len; + int rxflow_pos; + int rxflow_change_to; enum lws_rx_parse_state lws_rx_parse_state; char extension_data_pending; @@ -480,6 +484,15 @@ extern int lws_issue_raw_ext_access(struct libwebsocket *wsi, unsigned char *buf, size_t len); +extern int +_libwebsocket_rx_flow_control(struct libwebsocket *wsi); + +extern int +user_callback_handle_rxflow(callback_function, struct libwebsocket_context * context, + struct libwebsocket *wsi, + enum libwebsocket_callback_reasons reason, void *user, + void *in, size_t len); + #ifndef LWS_OPENSSL_SUPPORT unsigned char * diff --git a/test-server/test-server.c b/test-server/test-server.c index 857cae2d4..9487d80e1 100644 --- a/test-server/test-server.c +++ b/test-server/test-server.c @@ -334,6 +334,8 @@ struct a_message { static struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; static int ringbuffer_head; +static struct libwebsocket *wsi_choked[20]; +static int num_wsi_choked; static int callback_lws_mirror(struct libwebsocket_context *context, @@ -365,7 +367,7 @@ callback_lws_mirror(struct libwebsocket_context *context, LWS_WRITE_TEXT); if (n < 0) { fprintf(stderr, "ERROR %d writing to socket\n", n); - exit(1); + return 1; } if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) @@ -373,9 +375,14 @@ callback_lws_mirror(struct libwebsocket_context *context, else pss->ringbuffer_tail++; - if (((ringbuffer_head - pss->ringbuffer_tail) % - MAX_MESSAGE_QUEUE) < (MAX_MESSAGE_QUEUE - 15)) - libwebsocket_rx_flow_control(wsi, 1); + if (((ringbuffer_head - pss->ringbuffer_tail) & + (MAX_MESSAGE_QUEUE - 1)) < (MAX_MESSAGE_QUEUE - 15)) { + for (n = 0; n < num_wsi_choked; n++) + libwebsocket_rx_flow_control(wsi_choked[n], 1); + num_wsi_choked = 0; + } + +// fprintf(stderr, "tx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)); libwebsocket_callback_on_writable(context, wsi); @@ -390,6 +397,12 @@ callback_lws_mirror(struct libwebsocket_context *context, case LWS_CALLBACK_RECEIVE: + if (((ringbuffer_head - pss->ringbuffer_tail) & + (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) { + fprintf(stderr, "dropping!\n"); + goto choke; + } + if (ringbuffer[ringbuffer_head].payload) free(ringbuffer[ringbuffer_head].payload); @@ -404,13 +417,22 @@ callback_lws_mirror(struct libwebsocket_context *context, else ringbuffer_head++; - if (((ringbuffer_head - pss->ringbuffer_tail) % - MAX_MESSAGE_QUEUE) > (MAX_MESSAGE_QUEUE - 10)) - libwebsocket_rx_flow_control(wsi, 0); + if (((ringbuffer_head - pss->ringbuffer_tail) & + (MAX_MESSAGE_QUEUE - 1)) < (MAX_MESSAGE_QUEUE - 10)) + goto done; +choke: + if (num_wsi_choked < sizeof wsi_choked / sizeof wsi_choked[0]) { + libwebsocket_rx_flow_control(wsi, 0); + wsi_choked[num_wsi_choked++] = wsi; + } + +// fprintf(stderr, "rx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)); +done: libwebsocket_callback_on_writable_all_protocol( libwebsockets_get_protocol(wsi)); break; + /* * this just demonstrates how to use the protocol filter. If you won't * study and reject connections based on header content, you don't need