diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index a8f2cf1e..ec71d799 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -413,6 +413,111 @@ int lws_send_pipe_choked(struct libwebsocket *wsi) return 0; } +static int +lws_handle_POLLOUT_event(struct libwebsocket_context *context, + struct libwebsocket *wsi, struct pollfd *pollfd) +{ + struct lws_tokens eff_buf; + int n; + int ret; + int m; + + if (!wsi->extension_data_pending) + goto user_service; + + /* + * check in on the active extensions, see if they + * had pending stuff to spill... they need to get the + * first look-in otherwise sequence will be disordered + * + * NULL, zero-length eff_buf means just spill pending + */ + + ret = 1; + while (ret == 1) { + + /* default to nobody has more to spill */ + + ret = 0; + eff_buf.token = NULL; + eff_buf.token_len = 0; + + /* give every extension a chance to spill */ + + for (n = 0; n < wsi->count_active_extensions; n++) { + m = wsi->active_extensions[n]->callback( + wsi->protocol->owning_server, wsi, + LWS_EXT_CALLBACK_PACKET_TX_PRESEND, + wsi->active_extensions_user[n], &eff_buf, 0); + if (m < 0) { + fprintf(stderr, "extension reports fatal error\n"); + return -1; + } + if (m) + /* + * at least one extension told us he has more + * to spill, so we will go around again after + */ + ret = 1; + } + + /* assuming they gave us something to send, send it */ + + if (eff_buf.token_len) { + if (lws_issue_raw(wsi, (unsigned char *)eff_buf.token, + eff_buf.token_len)) + return -1; + } else + continue; + + /* no extension has more to spill */ + + if (!ret) + continue; + + /* + * There's more to spill from an extension, but we just sent + * something... did that leave the pipe choked? + */ + + if (!lws_send_pipe_choked(wsi)) + /* no we could add more */ + continue; + + fprintf(stderr, "choked in POLLOUT service\n"); + + /* + * Yes, he's choked. Leave the POLLOUT masked on so we will + * come back here when he is unchoked. Don't call the user + * callback to enforce ordering of spilling, he'll get called + * when we come back here and there's nothing more to spill. + */ + + return 0; + } + + wsi->extension_data_pending = 0; + +user_service: + /* one shot */ + + pollfd->events &= ~POLLOUT; + + /* external POLL support via protocol 0 */ + context->protocols[0].callback(context, wsi, + LWS_CALLBACK_CLEAR_MODE_POLL_FD, + (void *)(long)wsi->sock, NULL, POLLOUT); + + wsi->protocol->callback(context, wsi, + LWS_CALLBACK_CLIENT_WRITEABLE, + wsi->user_space, + NULL, 0); + + return 0; +} + + + /** * libwebsocket_service_fd() - Service polled socket with something waiting * @context: Websocket context @@ -428,7 +533,7 @@ int libwebsocket_service_fd(struct libwebsocket_context *context, struct pollfd *pollfd) { - unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + MAX_BROADCAST_PAYLOAD + + unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 1 + MAX_BROADCAST_PAYLOAD + LWS_SEND_BUFFER_POST_PADDING]; struct libwebsocket *wsi; struct libwebsocket *new_wsi; @@ -712,24 +817,17 @@ libwebsocket_service_fd(struct libwebsocket_context *context, return 1; } - /* the guy requested a callback when it was OK to write */ + /* + * either extension code with stuff to spill, or the user code, + * requested a callback when it was OK to write + */ - if (pollfd->revents & POLLOUT) { - - /* one shot */ - - pollfd->events &= ~POLLOUT; - - /* external POLL support via protocol 0 */ - context->protocols[0].callback(context, wsi, - LWS_CALLBACK_CLEAR_MODE_POLL_FD, - (void *)(long)wsi->sock, NULL, POLLOUT); - - wsi->protocol->callback(context, wsi, - LWS_CALLBACK_CLIENT_WRITEABLE, - wsi->user_space, - NULL, 0); - } + if (pollfd->revents & POLLOUT) + if (lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) { + libwebsocket_close_and_free_session(context, wsi, + LWS_CLOSE_STATUS_NORMAL); + return 1; + } /* any incoming data ready? */ @@ -1397,20 +1495,13 @@ bail2: /* the guy requested a callback when it was OK to write */ - if (pollfd->revents & POLLOUT) { + if (pollfd->revents & POLLOUT) + if (lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) { + libwebsocket_close_and_free_session(context, wsi, + LWS_CLOSE_STATUS_NORMAL); + return 1; + } - pollfd->events &= ~POLLOUT; - - /* external POLL support via protocol 0 */ - context->protocols[0].callback(context, wsi, - LWS_CALLBACK_CLEAR_MODE_POLL_FD, - (void *)(long)wsi->sock, NULL, POLLOUT); - - wsi->protocol->callback(context, wsi, - LWS_CALLBACK_CLIENT_WRITEABLE, - wsi->user_space, - NULL, 0); - } /* any incoming data ready? */ diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index a9627ad2..b759590b 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -75,6 +75,7 @@ enum libwebsocket_extension_callback_reasons { LWS_EXT_CALLBACK_CONSTRUCT, LWS_EXT_CALLBACK_DESTROY, LWS_EXT_CALLBACK_PACKET_RX_PREPARSE, + LWS_EXT_CALLBACK_PACKET_TX_PRESEND, }; enum libwebsocket_write_protocol { diff --git a/lib/parsers.c b/lib/parsers.c index 37a4ae6e..e4f17a40 100644 --- a/lib/parsers.c +++ b/lib/parsers.c @@ -1109,9 +1109,12 @@ int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf, size_t len, enum libwebsocket_write_protocol protocol) { int n; + int m; int pre = 0; int post = 0; unsigned int shift = 7; + struct lws_tokens eff_buf; + int ret; if (len == 0 && protocol != LWS_WRITE_CLOSE) { fprintf(stderr, "zero length libwebsocket_write attempt\n"); @@ -1326,10 +1329,95 @@ int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf, send_raw: - if (lws_issue_raw(wsi, buf - pre, len + pre + post)) - return -1; + if (protocol == LWS_WRITE_HTTP) { + if (lws_issue_raw(wsi, (unsigned char *)buf - pre, + len + pre + post)) + return -1; - debug("written %d bytes to client\n", (int)len); + return 0; + } + + /* + * give any active extensions a chance to munge the buffer + * before send. We pass in a pointer to an lws_tokens struct + * prepared with the default buffer and content length that's in + * there. Rather than rewrite the default buffer, extensions + * that expect to grow the buffer can adapt .token to + * point to their own per-connection buffer in the extension + * user allocation. By default with no extensions or no + * extension callback handling, just the normal input buffer is + * used then so it is efficient. + * + * callback returns 1 in case it wants to spill more buffers + */ + + eff_buf.token = (char *)buf - pre; + eff_buf.token_len = len + pre + post; + + /* + * while we have original buf to spill ourselves, or extensions report + * more in their pipeline + */ + + ret = 1; + while (ret == 1) { + + /* default to nobody has more to spill */ + + ret = 0; + + /* show every extension the new incoming data */ + + for (n = 0; n < wsi->count_active_extensions; n++) { + m = wsi->active_extensions[n]->callback( + wsi->protocol->owning_server, wsi, + LWS_EXT_CALLBACK_PACKET_TX_PRESEND, + wsi->active_extensions_user[n], &eff_buf, 0); + if (m < 0) { + fprintf(stderr, "Extension reports fatal error\n"); + return -1; + } + if (m) + /* + * at least one extension told us he has more + * to spill, so we will go around again after + */ + ret = 1; + } + + /* assuming they left us something to send, send it */ + + if (eff_buf.token_len) + if (lws_issue_raw(wsi, (unsigned char *)eff_buf.token, + eff_buf.token_len)) + return -1; + + /* we used up what we had */ + + eff_buf.token = NULL; + eff_buf.token_len = 0; + + /* + * Did that leave the pipe choked? + */ + + if (!lws_send_pipe_choked(wsi)) + /* no we could add more */ + continue; + + fprintf(stderr, "choked\n"); + + /* + * Yes, he's choked. Don't spill the rest now get a callback + * when he is ready to send and take care of it there + */ + libwebsocket_callback_on_writable( + wsi->protocol->owning_server, wsi); + wsi->extension_data_pending = 1; + ret = 0; + } + + debug("written %d bytes to client\n", eff_buf.token_len); return 0; } diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 9866871d..fa61f497 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -248,6 +248,7 @@ struct libwebsocket { int sock; enum lws_rx_parse_state lws_rx_parse_state; + char extension_data_pending; /* 04 protocol specific */