diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index ea8a8ff3..7e7a7b22 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -370,6 +370,11 @@ just_kill_connection: free(wsi->u.ws.rxflow_buffer); wsi->u.ws.rxflow_buffer = NULL; } + if (wsi->u.ws.truncated_send_malloc) { + /* not going to be completed... nuke it */ + free(wsi->u.ws.truncated_send_malloc); + wsi->u.ws.truncated_send_malloc = NULL; + } } /* tell the user it's all over for this guy */ @@ -658,6 +663,16 @@ lws_handle_POLLOUT_event(struct libwebsocket_context *context, int m; int handled = 0; + /* pending truncated sends have uber priority */ + + if (wsi->u.ws.truncated_send_malloc) { + lws_issue_raw(wsi, wsi->u.ws.truncated_send_malloc + + wsi->u.ws.truncated_send_offset, + wsi->u.ws.truncated_send_len); + /* leave POLLOUT active either way */ + return 0; + } + for (n = 0; n < wsi->count_active_extensions; n++) { if (!wsi->active_extensions[n]->callback) continue; diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index 2d62107f..d8e4a273 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -709,6 +709,14 @@ typedef int (extension_callback_function)(struct libwebsocket_context *context, * libwebsockets_remaining_packet_payload(). Notice that you * just talk about frame size here, the LWS_SEND_BUFFER_PRE_PADDING * and post-padding are automatically also allocated on top. + * @no_buffer_all_partial_tx: Leave at zero if you want the library to take + * care of all partial tx for you. It's useful if you only have + * small tx packets and the chance of any truncated send is small + * enough any additional malloc / buffering overhead is less + * painful than writing the code to deal with partial sends. For + * protocols where you stream big blocks, set to nonzero and use + * the return value from libwebsocket_write() to manage how much + * got send yourself. * @owning_server: the server init call fills in this opaque pointer when * registering this protocol with the server. * @protocol_index: which protocol we are starting from zero @@ -723,6 +731,7 @@ struct libwebsocket_protocols { callback_function *callback; size_t per_session_data_size; size_t rx_buffer_size; + int no_buffer_all_partial_tx; /* * below are filled in on server init and can be left uninitialized, diff --git a/lib/output.c b/lib/output.c index 224a0f27..d1bd9394 100644 --- a/lib/output.c +++ b/lib/output.c @@ -91,7 +91,7 @@ LWS_VISIBLE void lwsl_hexdump(void *vbuf, size_t len) #endif /* - * notice this returns number of bytes sent, or -1 + * notice this returns number of bytes consumed, or -1 */ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len) @@ -121,12 +121,13 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len) } if (m) /* handled */ { /* lwsl_ext("ext sent it\n"); */ - return m; + n = m; + goto handle_truncated_send; } } #endif - if (!wsi->sock) - lwsl_warn("** error 0 sock but expected to send\n"); + if (wsi->sock < 0) + lwsl_warn("** error invalid sock but expected to send\n"); /* * nope, send it on the socket directly @@ -157,6 +158,63 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len) #ifdef LWS_OPENSSL_SUPPORT } #endif + +handle_truncated_send: + + /* + * already handling a truncated send? + */ + if (wsi->u.ws.truncated_send_malloc) { + lwsl_info("***** partial send moved on by %d (vs %d)\n", n, len); + wsi->u.ws.truncated_send_offset += n; + wsi->u.ws.truncated_send_len -= n; + + if (!wsi->u.ws.truncated_send_len) { + lwsl_info("***** partial send completed\n"); + /* done with it */ + free(wsi->u.ws.truncated_send_malloc); + wsi->u.ws.truncated_send_malloc = NULL; + } else + libwebsocket_callback_on_writable( + wsi->protocol->owning_server, wsi); + + return n; + } + + if (n < len) { + if (wsi->u.ws.clean_buffer) { + /* + * This buffer unaffected by extension rewriting. + * It means the user code is expected to deal with + * partial sends. (lws knows the header was already + * sent, so on next send will just resume sending + * payload) + */ + } + + /* + * Newly truncated send. Buffer the remainder (it will get + * first priority next time the socket is writable) + */ + lwsl_info("***** new partial send %d sent %d accepted\n", len, n); + + wsi->u.ws.truncated_send_malloc = malloc(len - n); + if (!wsi->u.ws.truncated_send_malloc) { + lwsl_err("truncated send: unable to malloc %d\n", + len - n); + return -1; + } + + wsi->u.ws.truncated_send_offset = 0; + wsi->u.ws.truncated_send_len = len - n; + memcpy(wsi->u.ws.truncated_send_malloc, buf + n, len - n); + + libwebsocket_callback_on_writable( + wsi->protocol->owning_server, wsi); + + return len; + } + return n; } @@ -212,6 +270,9 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi, ret = 1; } + if ((char *)buf != eff_buf.token) + wsi->u.ws.clean_buffer = 0; /* extension recreated it: we need to buffer this if not all sent */ + /* assuming they left us something to send, send it */ if (eff_buf.token_len) { @@ -219,20 +280,13 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi, eff_buf.token_len); if (n < 0) return -1; - /* - * Keep amount spilled small to minimize chance of this - */ - if (n != eff_buf.token_len) { - lwsl_err("Unable to spill ext %d vs %d\n", - eff_buf.token_len, n); - return -1; - } + /* always either sent it all or privately buffered */ } lwsl_parser("written %d bytes to client\n", eff_buf.token_len); - /* no extension has more to spill */ + /* no extension has more to spill? Then we can go */ if (!ret) break; @@ -244,10 +298,12 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi, /* * Did that leave the pipe choked? + * Or we had to hold on to some of it? */ - if (!lws_send_pipe_choked(wsi)) - /* no we could add more */ + if (!lws_send_pipe_choked(wsi) && + !wsi->u.ws.truncated_send_malloc) + /* no we could add more, lets's do that */ continue; lwsl_debug("choked\n"); @@ -311,7 +367,8 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf, int m; #endif - if (len == 0 && protocol != LWS_WRITE_CLOSE && protocol != LWS_WRITE_PING && protocol != LWS_WRITE_PONG) { + if (len == 0 && protocol != LWS_WRITE_CLOSE && + protocol != LWS_WRITE_PING && protocol != LWS_WRITE_PONG) { lwsl_warn("zero length libwebsocket_write attempt\n"); return 0; } @@ -324,8 +381,19 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf, if (wsi->state != WSI_STATE_ESTABLISHED) return -1; + /* if we are continuing a frame that already had its header done */ + + if (wsi->u.ws.inside_frame) + goto do_more_inside_frame; + + /* if he wants all partials buffered, never have a clean_buffer */ + wsi->u.ws.clean_buffer = !wsi->protocol->no_buffer_all_partial_tx; + #ifndef LWS_NO_EXTENSIONS - /* give a change to the extensions to modify payload */ + /* + * give a chance to the extensions to modify payload + * pre-TX mangling is not allowed to truncate + */ eff_buf.token = (char *)buf; eff_buf.token_len = len; @@ -347,12 +415,21 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf, } } + /* + * an extension did something we need to keep... for example, if + * compression extension, it has already updated its state according + * to this being issued + */ + if ((char *)buf != eff_buf.token) + wsi->u.ws.clean_buffer = 0; /* we need to buffer this if not all sent */ + buf = (unsigned char *)eff_buf.token; len = eff_buf.token_len; #endif switch (wsi->ietf_spec_revision) { case 13: + if (masked7) { pre += 4; dropmask = &buf[0 - pre]; @@ -435,6 +512,8 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf, break; } +do_more_inside_frame: + /* * Deal with masking if we are in client -> server direction and * the protocol demands it @@ -442,10 +521,11 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf, if (wsi->mode == LWS_CONNMODE_WS_CLIENT) { - if (libwebsocket_0405_frame_mask_generate(wsi)) { - lwsl_err("lws_write: frame mask generation failed\n"); - return -1; - } + if (!wsi->u.ws.inside_frame) + if (libwebsocket_0405_frame_mask_generate(wsi)) { + lwsl_err("lws_write: frame mask generation failed\n"); + return -1; + } /* * in v7, just mask the payload @@ -455,10 +535,9 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf, wsi->u.ws.frame_masking_nonce_04[ (wsi->u.ws.frame_mask_index++) & 3]; - if (dropmask) + if (dropmask) /* never set if already inside frame */ /* copy the frame nonce into place */ - memcpy(dropmask, - wsi->u.ws.frame_masking_nonce_04, 4); + memcpy(dropmask, wsi->u.ws.frame_masking_nonce_04, 4); } send_raw: @@ -480,6 +559,8 @@ send_raw: break; } + wsi->u.ws.inside_frame = 1; + /* * give any active extensions a chance to munge the buffer * before send. We pass in a pointer to an lws_tokens struct @@ -492,13 +573,31 @@ send_raw: * used then so it is efficient. * * callback returns 1 in case it wants to spill more buffers + * + * This takes care of holding the buffer if send is incomplete, ie, + * if wsi->u.ws.clean_buffer is 0 (meaning an extension meddled with + * the buffer). If wsi->u.ws.clean_buffer is 1, it will instead + * return to the user code how much OF THE USER BUFFER was consumed. */ n = lws_issue_raw_ext_access(wsi, buf - pre, len + pre + post); if (n < 0) return n; - return orig_len - ((len - pre + post) -n ); + if (n == len + pre + post) { + /* everything in the buffer was handled (or rebuffered...) */ + wsi->u.ws.inside_frame = 0; + return orig_len; + } + + /* + * it is how many bytes of user buffer got sent... may be < orig_len + * in which case callback when writable has already been arranged + * and user code can call libwebsocket_write() again with the rest + * later. + */ + + return n - (pre + post); } LWS_VISIBLE int libwebsockets_serve_http_file_fragment( diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 7c39183a..9efc0db6 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -343,6 +343,12 @@ struct _lws_websocket_related { int rxflow_pos; unsigned int rxflow_change_to:2; unsigned int this_frame_masked:1; + unsigned int inside_frame:1; /* next write will be more of frame */ + unsigned int clean_buffer:1; /* buffer not rewritten by extension */ + /* truncated send handling */ + unsigned char *truncated_send_malloc; /* non-NULL means buffering in progress */ + unsigned int truncated_send_offset; /* where we are in terms of spilling */ + unsigned int truncated_send_len; /* how much is buffered */ }; struct libwebsocket {