diff --git a/lib/Makefile.am b/lib/Makefile.am index 78ac33f17..441f8199a 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -36,7 +36,7 @@ else dist_libwebsockets_la_SOURCES += md5.c sha-1.c endif -libwebsockets_la_CFLAGS=-Wall -std=gnu99 -pedantic +libwebsockets_la_CFLAGS=-Wall -std=gnu99 -pedantic -g libwebsockets_la_LDFLAGS= if MINGW diff --git a/lib/extension-deflate-frame.c b/lib/extension-deflate-frame.c index 541131b00..7a8faccc1 100644 --- a/lib/extension-deflate-frame.c +++ b/lib/extension-deflate-frame.c @@ -163,7 +163,7 @@ bail: * screwed.. close the connection... we will get a * destroy callback to take care of closing nicely */ - fprintf(stderr, "zlib error inflate %d: %s", + lwsl_err("zlib error inflate %d: %s\n", n, conn->zs_in.msg); return -1; } diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index add3bd796..99ce9e84a 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -371,6 +371,9 @@ just_kill_connection: if (wsi->c_address) free(wsi->c_address); + if (wsi->rxflow_buffer) + free(wsi->rxflow_buffer); + /* lwsl_info("closing fd=%d\n", wsi->sock); */ #ifdef LWS_OPENSSL_SUPPORT @@ -660,7 +663,8 @@ notify_action: else n = LWS_CALLBACK_SERVER_WRITEABLE; - wsi->protocol->callback(context, wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0); + user_callback_handle_rxflow(wsi->protocol->callback, context, + wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0); return 0; } @@ -902,7 +906,7 @@ libwebsocket_service_fd(struct libwebsocket_context *context, LWS_CLOSE_STATUS_NOSTATUS); else if (wsi->state == WSI_STATE_HTTP && wsi->protocol->callback) - if (wsi->protocol->callback(context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space, + if (user_callback_handle_rxflow(wsi->protocol->callback, context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space, wsi->filepath, wsi->filepos)) libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS); break; @@ -1117,7 +1121,7 @@ bail_prox_listener: /* broadcast it to this connection */ - new_wsi->protocol->callback(context, new_wsi, + user_callback_handle_rxflow(new_wsi->protocol->callback, context, new_wsi, LWS_CALLBACK_BROADCAST, new_wsi->user_space, buf + LWS_SEND_BUFFER_PRE_PADDING, len); @@ -1531,6 +1535,51 @@ libwebsocket_get_socket_fd(struct libwebsocket *wsi) return wsi->sock; } + +int +_libwebsocket_rx_flow_control(struct libwebsocket *wsi) +{ + struct libwebsocket_context *context = wsi->protocol->owning_server; + int n; + + if (!(wsi->rxflow_change_to & 2)) + return 0; + + wsi->rxflow_change_to &= ~2; + + lwsl_info("rxflow: wsi %p change_to %d\n", wsi, wsi->rxflow_change_to); + + /* if we're letting it come again, did we interrupt anything? */ + if ((wsi->rxflow_change_to & 1) && wsi->rxflow_buffer) { + n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0); + if (n < 0) { + libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS); + return -1; + } + if (n) + /* oh he stuck again, do nothing */ + return 0; + } + + if (wsi->rxflow_change_to & 1) + context->fds[wsi->position_in_fds_table].events |= POLLIN; + else + context->fds[wsi->position_in_fds_table].events &= ~POLLIN; + + if (wsi->rxflow_change_to & 1) + /* external POLL support via protocol 0 */ + context->protocols[0].callback(context, wsi, + LWS_CALLBACK_SET_MODE_POLL_FD, + (void *)(long)wsi->sock, NULL, POLLIN); + else + /* external POLL support via protocol 0 */ + context->protocols[0].callback(context, wsi, + LWS_CALLBACK_CLEAR_MODE_POLL_FD, + (void *)(long)wsi->sock, NULL, POLLIN); + + return 1; +} + /** * libwebsocket_rx_flow_control() - Enable and disable socket servicing for * receieved packets. @@ -1545,36 +1594,12 @@ libwebsocket_get_socket_fd(struct libwebsocket *wsi) int libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable) { - struct libwebsocket_context *context = wsi->protocol->owning_server; - int n; + wsi->rxflow_change_to = 2 | !!enable; - for (n = 0; n < context->fds_count; n++) - if (context->fds[n].fd == wsi->sock) { - if (enable) - context->fds[n].events |= POLLIN; - else - context->fds[n].events &= ~POLLIN; - - return 0; - } - - if (enable) - /* external POLL support via protocol 0 */ - context->protocols[0].callback(context, wsi, - LWS_CALLBACK_SET_MODE_POLL_FD, - (void *)(long)wsi->sock, NULL, POLLIN); - else - /* external POLL support via protocol 0 */ - context->protocols[0].callback(context, wsi, - LWS_CALLBACK_CLEAR_MODE_POLL_FD, - (void *)(long)wsi->sock, NULL, POLLIN); - -#if 0 - lwsl_err("libwebsocket_rx_flow_control unable to find socket\n"); -#endif - return 1; + return 0; } + /** * libwebsocket_canonical_hostname() - returns this host's hostname * @@ -1630,6 +1655,23 @@ OpenSSL_verify_callback(int preverify_ok, X509_STORE_CTX *x509_ctx) } #endif +int user_callback_handle_rxflow(callback_function callback_function, + struct libwebsocket_context * context, + struct libwebsocket *wsi, + enum libwebsocket_callback_reasons reason, void *user, + void *in, size_t len) +{ + int n; + + n = callback_function(context, wsi, reason, user, in, len); + if (n < 0) + return n; + + _libwebsocket_rx_flow_control(wsi); + + return 0; +} + /** * libwebsocket_create_context() - Create the websocket handler @@ -2366,7 +2408,8 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol, if (wsi->protocol != protocol) continue; - wsi->protocol->callback(context, wsi, + user_callback_handle_rxflow(wsi->protocol->callback, + context, wsi, LWS_CALLBACK_BROADCAST, wsi->user_space, buf, len); diff --git a/lib/parsers.c b/lib/parsers.c index 6d87907e3..853b1ec7a 100644 --- a/lib/parsers.c +++ b/lib/parsers.c @@ -664,6 +664,7 @@ handle_first: break; case LWS_RXPS_EAT_UNTIL_76_FF: + if (c == 0xff) { wsi->lws_rx_parse_state = LWS_RXPS_NEW; goto issue; @@ -675,7 +676,8 @@ handle_first: break; issue: if (wsi->protocol->callback) - wsi->protocol->callback(wsi->protocol->owning_server, + user_callback_handle_rxflow(wsi->protocol->callback, + wsi->protocol->owning_server, wsi, LWS_CALLBACK_RECEIVE, wsi->user_space, &wsi->rx_user_buffer[LWS_SEND_BUFFER_PRE_PADDING], @@ -865,7 +867,8 @@ spill: eff_buf.token[eff_buf.token_len] = '\0'; if (wsi->protocol->callback) - wsi->protocol->callback(wsi->protocol->owning_server, + user_callback_handle_rxflow(wsi->protocol->callback, + wsi->protocol->owning_server, wsi, LWS_CALLBACK_RECEIVE, wsi->user_space, eff_buf.token, @@ -895,18 +898,58 @@ int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi, unsigned char *buf, size_t len) { size_t n; + int m; + int clear_rxflow = !!wsi->rxflow_buffer; + struct libwebsocket_context *context = wsi->protocol->owning_server; #ifdef DEBUG lwsl_parser("received %d byte packet\n", (int)len); lwsl_hexdump(buf, len); #endif + if (buf && wsi->rxflow_buffer) + lwsl_err("!!!! libwebsocket_interpret_incoming_packet: was pending rxflow, data loss\n"); + /* let the rx protocol state machine have as much as it needs */ n = 0; - while (n < len) - if (libwebsocket_rx_sm(wsi, buf[n++]) < 0) + if (!buf) { + lwsl_info("dumping stored rxflow buffer len %d pos=%d\n", wsi->rxflow_len, wsi->rxflow_pos); + buf = wsi->rxflow_buffer; + n = wsi->rxflow_pos; + len = wsi->rxflow_len; + /* let's pretend he's already allowing input */ + context->fds[wsi->position_in_fds_table].events |= POLLIN; + } + + while (n < len) { + if (!(context->fds[wsi->position_in_fds_table].events & POLLIN)) { + /* his RX is flowcontrolled */ + if (!wsi->rxflow_buffer) { /* a new rxflow in effect, buffer it and warn caller */ + lwsl_info("new rxflow input buffer len %d\n", len - n); + wsi->rxflow_buffer = (unsigned char *)malloc(len - n); + wsi->rxflow_len = len - n; + wsi->rxflow_pos = 0; + memcpy(wsi->rxflow_buffer, buf + n, len - n); + } else { + lwsl_info("re-using rxflow input buffer\n"); + /* rxflow while we were spilling previous rxflow buffer */ + wsi->rxflow_pos = n; + } + return 1; + } + m = libwebsocket_rx_sm(wsi, buf[n]); + if (m < 0) return -1; + n++; + } + + if (clear_rxflow) { + lwsl_info("flow: clearing it\n"); + free(wsi->rxflow_buffer); + wsi->rxflow_buffer = NULL; + context->fds[wsi->position_in_fds_table].events &= ~POLLIN; + } return 0; } diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 00270065a..ce5d2f401 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -345,6 +345,10 @@ struct libwebsocket { int sock; int position_in_fds_table; + unsigned char *rxflow_buffer; + int rxflow_len; + int rxflow_pos; + int rxflow_change_to; enum lws_rx_parse_state lws_rx_parse_state; char extension_data_pending; @@ -480,6 +484,15 @@ extern int lws_issue_raw_ext_access(struct libwebsocket *wsi, unsigned char *buf, size_t len); +extern int +_libwebsocket_rx_flow_control(struct libwebsocket *wsi); + +extern int +user_callback_handle_rxflow(callback_function, struct libwebsocket_context * context, + struct libwebsocket *wsi, + enum libwebsocket_callback_reasons reason, void *user, + void *in, size_t len); + #ifndef LWS_OPENSSL_SUPPORT unsigned char * diff --git a/test-server/test-server.c b/test-server/test-server.c index 857cae2d4..9487d80e1 100644 --- a/test-server/test-server.c +++ b/test-server/test-server.c @@ -334,6 +334,8 @@ struct a_message { static struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; static int ringbuffer_head; +static struct libwebsocket *wsi_choked[20]; +static int num_wsi_choked; static int callback_lws_mirror(struct libwebsocket_context *context, @@ -365,7 +367,7 @@ callback_lws_mirror(struct libwebsocket_context *context, LWS_WRITE_TEXT); if (n < 0) { fprintf(stderr, "ERROR %d writing to socket\n", n); - exit(1); + return 1; } if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) @@ -373,9 +375,14 @@ callback_lws_mirror(struct libwebsocket_context *context, else pss->ringbuffer_tail++; - if (((ringbuffer_head - pss->ringbuffer_tail) % - MAX_MESSAGE_QUEUE) < (MAX_MESSAGE_QUEUE - 15)) - libwebsocket_rx_flow_control(wsi, 1); + if (((ringbuffer_head - pss->ringbuffer_tail) & + (MAX_MESSAGE_QUEUE - 1)) < (MAX_MESSAGE_QUEUE - 15)) { + for (n = 0; n < num_wsi_choked; n++) + libwebsocket_rx_flow_control(wsi_choked[n], 1); + num_wsi_choked = 0; + } + +// fprintf(stderr, "tx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)); libwebsocket_callback_on_writable(context, wsi); @@ -390,6 +397,12 @@ callback_lws_mirror(struct libwebsocket_context *context, case LWS_CALLBACK_RECEIVE: + if (((ringbuffer_head - pss->ringbuffer_tail) & + (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) { + fprintf(stderr, "dropping!\n"); + goto choke; + } + if (ringbuffer[ringbuffer_head].payload) free(ringbuffer[ringbuffer_head].payload); @@ -404,13 +417,22 @@ callback_lws_mirror(struct libwebsocket_context *context, else ringbuffer_head++; - if (((ringbuffer_head - pss->ringbuffer_tail) % - MAX_MESSAGE_QUEUE) > (MAX_MESSAGE_QUEUE - 10)) - libwebsocket_rx_flow_control(wsi, 0); + if (((ringbuffer_head - pss->ringbuffer_tail) & + (MAX_MESSAGE_QUEUE - 1)) < (MAX_MESSAGE_QUEUE - 10)) + goto done; +choke: + if (num_wsi_choked < sizeof wsi_choked / sizeof wsi_choked[0]) { + libwebsocket_rx_flow_control(wsi, 0); + wsi_choked[num_wsi_choked++] = wsi; + } + +// fprintf(stderr, "rx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)); +done: libwebsocket_callback_on_writable_all_protocol( libwebsockets_get_protocol(wsi)); break; + /* * this just demonstrates how to use the protocol filter. If you won't * study and reject connections based on header content, you don't need