mirror of
https://github.com/warmcat/libwebsockets.git
synced 2025-03-09 00:00:04 +01:00
add explicit error for partial send
This patch adds code to handle the situation that a prepared user buffer could not all be sent on the socket at once. There are two kinds of situation to handle 1) User code handles it: The connection only has extensions active that do not rewrite the buffer. In this case, the patch caused libwebsocket_write() to simply return the amount of user buffer that was consumed (this is specifically the amount of user buffer used in sending what was accepted, nothing else). So user code can just advance its buffer that much and resume sending when the socket is writable again. This continues the frame rather than starting a new one or new fragment. 2) The connections has extensions active which actually send something quite different than what the user buffer contains, for example a compression extension. In this case, libwebsockets will dynamically malloc a buffer to contain a copy of the remaining unsent data, request notifiction when writeable again, and automatically spill and free this buffer with the highest priority before passing on the writable notification to anything else. For this situation, the call to write will return that it used the whole user buffer, even though part is still rebuffered. This patch should enable libwebsockets to detect the two cases and take the appropriate action. There are also two choices for user code to deal with partial sends. 1) Leave the no_buffer_all_partial_tx member in the protocol struct at zero. The library will dyamically buffer anything you send that did not get completely written to the socket, and automatically spill it next time the socket is writable. You can use this method if your sent frames are relatvely small and unlikely to get truncated anyway. 2) Set the no_buffer_all_partial_tx member in the protocol struct. User code now needs to take care of the return value from libwebsocket_write() and deal with resending the remainder if not all of the requested amount got sent. You should use this method if you are sending large messages and want to maximize throughput and efficiency. Since the new member no_buffer_all_partial_tx will be zero by default, this patch will auto-rebuffer any partial sends by default. That's good for most cases but if you attempt to send large blocks, make sure you follow option 2) above. Signed-off-by: Andy Green <andy.green@linaro.org>
This commit is contained in:
parent
5dc62ead9d
commit
1f4267bda8
4 changed files with 154 additions and 25 deletions
|
@ -370,6 +370,11 @@ just_kill_connection:
|
|||
free(wsi->u.ws.rxflow_buffer);
|
||||
wsi->u.ws.rxflow_buffer = NULL;
|
||||
}
|
||||
if (wsi->u.ws.truncated_send_malloc) {
|
||||
/* not going to be completed... nuke it */
|
||||
free(wsi->u.ws.truncated_send_malloc);
|
||||
wsi->u.ws.truncated_send_malloc = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* tell the user it's all over for this guy */
|
||||
|
@ -658,6 +663,16 @@ lws_handle_POLLOUT_event(struct libwebsocket_context *context,
|
|||
int m;
|
||||
int handled = 0;
|
||||
|
||||
/* pending truncated sends have uber priority */
|
||||
|
||||
if (wsi->u.ws.truncated_send_malloc) {
|
||||
lws_issue_raw(wsi, wsi->u.ws.truncated_send_malloc +
|
||||
wsi->u.ws.truncated_send_offset,
|
||||
wsi->u.ws.truncated_send_len);
|
||||
/* leave POLLOUT active either way */
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (n = 0; n < wsi->count_active_extensions; n++) {
|
||||
if (!wsi->active_extensions[n]->callback)
|
||||
continue;
|
||||
|
|
|
@ -709,6 +709,14 @@ typedef int (extension_callback_function)(struct libwebsocket_context *context,
|
|||
* libwebsockets_remaining_packet_payload(). Notice that you
|
||||
* just talk about frame size here, the LWS_SEND_BUFFER_PRE_PADDING
|
||||
* and post-padding are automatically also allocated on top.
|
||||
* @no_buffer_all_partial_tx: Leave at zero if you want the library to take
|
||||
* care of all partial tx for you. It's useful if you only have
|
||||
* small tx packets and the chance of any truncated send is small
|
||||
* enough any additional malloc / buffering overhead is less
|
||||
* painful than writing the code to deal with partial sends. For
|
||||
* protocols where you stream big blocks, set to nonzero and use
|
||||
* the return value from libwebsocket_write() to manage how much
|
||||
* got send yourself.
|
||||
* @owning_server: the server init call fills in this opaque pointer when
|
||||
* registering this protocol with the server.
|
||||
* @protocol_index: which protocol we are starting from zero
|
||||
|
@ -723,6 +731,7 @@ struct libwebsocket_protocols {
|
|||
callback_function *callback;
|
||||
size_t per_session_data_size;
|
||||
size_t rx_buffer_size;
|
||||
int no_buffer_all_partial_tx;
|
||||
|
||||
/*
|
||||
* below are filled in on server init and can be left uninitialized,
|
||||
|
|
149
lib/output.c
149
lib/output.c
|
@ -91,7 +91,7 @@ LWS_VISIBLE void lwsl_hexdump(void *vbuf, size_t len)
|
|||
#endif
|
||||
|
||||
/*
|
||||
* notice this returns number of bytes sent, or -1
|
||||
* notice this returns number of bytes consumed, or -1
|
||||
*/
|
||||
|
||||
int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
|
||||
|
@ -121,12 +121,13 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
|
|||
}
|
||||
if (m) /* handled */ {
|
||||
/* lwsl_ext("ext sent it\n"); */
|
||||
return m;
|
||||
n = m;
|
||||
goto handle_truncated_send;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (!wsi->sock)
|
||||
lwsl_warn("** error 0 sock but expected to send\n");
|
||||
if (wsi->sock < 0)
|
||||
lwsl_warn("** error invalid sock but expected to send\n");
|
||||
|
||||
/*
|
||||
* nope, send it on the socket directly
|
||||
|
@ -157,6 +158,63 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
|
|||
#ifdef LWS_OPENSSL_SUPPORT
|
||||
}
|
||||
#endif
|
||||
|
||||
handle_truncated_send:
|
||||
|
||||
/*
|
||||
* already handling a truncated send?
|
||||
*/
|
||||
if (wsi->u.ws.truncated_send_malloc) {
|
||||
lwsl_info("***** partial send moved on by %d (vs %d)\n", n, len);
|
||||
wsi->u.ws.truncated_send_offset += n;
|
||||
wsi->u.ws.truncated_send_len -= n;
|
||||
|
||||
if (!wsi->u.ws.truncated_send_len) {
|
||||
lwsl_info("***** partial send completed\n");
|
||||
/* done with it */
|
||||
free(wsi->u.ws.truncated_send_malloc);
|
||||
wsi->u.ws.truncated_send_malloc = NULL;
|
||||
} else
|
||||
libwebsocket_callback_on_writable(
|
||||
wsi->protocol->owning_server, wsi);
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
if (n < len) {
|
||||
if (wsi->u.ws.clean_buffer) {
|
||||
/*
|
||||
* This buffer unaffected by extension rewriting.
|
||||
* It means the user code is expected to deal with
|
||||
* partial sends. (lws knows the header was already
|
||||
* sent, so on next send will just resume sending
|
||||
* payload)
|
||||
*/
|
||||
}
|
||||
|
||||
/*
|
||||
* Newly truncated send. Buffer the remainder (it will get
|
||||
* first priority next time the socket is writable)
|
||||
*/
|
||||
lwsl_info("***** new partial send %d sent %d accepted\n", len, n);
|
||||
|
||||
wsi->u.ws.truncated_send_malloc = malloc(len - n);
|
||||
if (!wsi->u.ws.truncated_send_malloc) {
|
||||
lwsl_err("truncated send: unable to malloc %d\n",
|
||||
len - n);
|
||||
return -1;
|
||||
}
|
||||
|
||||
wsi->u.ws.truncated_send_offset = 0;
|
||||
wsi->u.ws.truncated_send_len = len - n;
|
||||
memcpy(wsi->u.ws.truncated_send_malloc, buf + n, len - n);
|
||||
|
||||
libwebsocket_callback_on_writable(
|
||||
wsi->protocol->owning_server, wsi);
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
|
@ -212,6 +270,9 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi,
|
|||
ret = 1;
|
||||
}
|
||||
|
||||
if ((char *)buf != eff_buf.token)
|
||||
wsi->u.ws.clean_buffer = 0; /* extension recreated it: we need to buffer this if not all sent */
|
||||
|
||||
/* assuming they left us something to send, send it */
|
||||
|
||||
if (eff_buf.token_len) {
|
||||
|
@ -219,20 +280,13 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi,
|
|||
eff_buf.token_len);
|
||||
if (n < 0)
|
||||
return -1;
|
||||
/*
|
||||
* Keep amount spilled small to minimize chance of this
|
||||
*/
|
||||
if (n != eff_buf.token_len) {
|
||||
lwsl_err("Unable to spill ext %d vs %d\n",
|
||||
eff_buf.token_len, n);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* always either sent it all or privately buffered */
|
||||
}
|
||||
|
||||
lwsl_parser("written %d bytes to client\n", eff_buf.token_len);
|
||||
|
||||
/* no extension has more to spill */
|
||||
/* no extension has more to spill? Then we can go */
|
||||
|
||||
if (!ret)
|
||||
break;
|
||||
|
@ -244,10 +298,12 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi,
|
|||
|
||||
/*
|
||||
* Did that leave the pipe choked?
|
||||
* Or we had to hold on to some of it?
|
||||
*/
|
||||
|
||||
if (!lws_send_pipe_choked(wsi))
|
||||
/* no we could add more */
|
||||
if (!lws_send_pipe_choked(wsi) &&
|
||||
!wsi->u.ws.truncated_send_malloc)
|
||||
/* no we could add more, lets's do that */
|
||||
continue;
|
||||
|
||||
lwsl_debug("choked\n");
|
||||
|
@ -311,7 +367,8 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
|
|||
int m;
|
||||
#endif
|
||||
|
||||
if (len == 0 && protocol != LWS_WRITE_CLOSE && protocol != LWS_WRITE_PING && protocol != LWS_WRITE_PONG) {
|
||||
if (len == 0 && protocol != LWS_WRITE_CLOSE &&
|
||||
protocol != LWS_WRITE_PING && protocol != LWS_WRITE_PONG) {
|
||||
lwsl_warn("zero length libwebsocket_write attempt\n");
|
||||
return 0;
|
||||
}
|
||||
|
@ -324,8 +381,19 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
|
|||
if (wsi->state != WSI_STATE_ESTABLISHED)
|
||||
return -1;
|
||||
|
||||
/* if we are continuing a frame that already had its header done */
|
||||
|
||||
if (wsi->u.ws.inside_frame)
|
||||
goto do_more_inside_frame;
|
||||
|
||||
/* if he wants all partials buffered, never have a clean_buffer */
|
||||
wsi->u.ws.clean_buffer = !wsi->protocol->no_buffer_all_partial_tx;
|
||||
|
||||
#ifndef LWS_NO_EXTENSIONS
|
||||
/* give a change to the extensions to modify payload */
|
||||
/*
|
||||
* give a chance to the extensions to modify payload
|
||||
* pre-TX mangling is not allowed to truncate
|
||||
*/
|
||||
eff_buf.token = (char *)buf;
|
||||
eff_buf.token_len = len;
|
||||
|
||||
|
@ -347,12 +415,21 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* an extension did something we need to keep... for example, if
|
||||
* compression extension, it has already updated its state according
|
||||
* to this being issued
|
||||
*/
|
||||
if ((char *)buf != eff_buf.token)
|
||||
wsi->u.ws.clean_buffer = 0; /* we need to buffer this if not all sent */
|
||||
|
||||
buf = (unsigned char *)eff_buf.token;
|
||||
len = eff_buf.token_len;
|
||||
#endif
|
||||
|
||||
switch (wsi->ietf_spec_revision) {
|
||||
case 13:
|
||||
|
||||
if (masked7) {
|
||||
pre += 4;
|
||||
dropmask = &buf[0 - pre];
|
||||
|
@ -435,6 +512,8 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
|
|||
break;
|
||||
}
|
||||
|
||||
do_more_inside_frame:
|
||||
|
||||
/*
|
||||
* Deal with masking if we are in client -> server direction and
|
||||
* the protocol demands it
|
||||
|
@ -442,10 +521,11 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
|
|||
|
||||
if (wsi->mode == LWS_CONNMODE_WS_CLIENT) {
|
||||
|
||||
if (libwebsocket_0405_frame_mask_generate(wsi)) {
|
||||
lwsl_err("lws_write: frame mask generation failed\n");
|
||||
return -1;
|
||||
}
|
||||
if (!wsi->u.ws.inside_frame)
|
||||
if (libwebsocket_0405_frame_mask_generate(wsi)) {
|
||||
lwsl_err("lws_write: frame mask generation failed\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* in v7, just mask the payload
|
||||
|
@ -455,10 +535,9 @@ LWS_VISIBLE int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
|
|||
wsi->u.ws.frame_masking_nonce_04[
|
||||
(wsi->u.ws.frame_mask_index++) & 3];
|
||||
|
||||
if (dropmask)
|
||||
if (dropmask) /* never set if already inside frame */
|
||||
/* copy the frame nonce into place */
|
||||
memcpy(dropmask,
|
||||
wsi->u.ws.frame_masking_nonce_04, 4);
|
||||
memcpy(dropmask, wsi->u.ws.frame_masking_nonce_04, 4);
|
||||
}
|
||||
|
||||
send_raw:
|
||||
|
@ -480,6 +559,8 @@ send_raw:
|
|||
break;
|
||||
}
|
||||
|
||||
wsi->u.ws.inside_frame = 1;
|
||||
|
||||
/*
|
||||
* give any active extensions a chance to munge the buffer
|
||||
* before send. We pass in a pointer to an lws_tokens struct
|
||||
|
@ -492,13 +573,31 @@ send_raw:
|
|||
* used then so it is efficient.
|
||||
*
|
||||
* callback returns 1 in case it wants to spill more buffers
|
||||
*
|
||||
* This takes care of holding the buffer if send is incomplete, ie,
|
||||
* if wsi->u.ws.clean_buffer is 0 (meaning an extension meddled with
|
||||
* the buffer). If wsi->u.ws.clean_buffer is 1, it will instead
|
||||
* return to the user code how much OF THE USER BUFFER was consumed.
|
||||
*/
|
||||
|
||||
n = lws_issue_raw_ext_access(wsi, buf - pre, len + pre + post);
|
||||
if (n < 0)
|
||||
return n;
|
||||
|
||||
return orig_len - ((len - pre + post) -n );
|
||||
if (n == len + pre + post) {
|
||||
/* everything in the buffer was handled (or rebuffered...) */
|
||||
wsi->u.ws.inside_frame = 0;
|
||||
return orig_len;
|
||||
}
|
||||
|
||||
/*
|
||||
* it is how many bytes of user buffer got sent... may be < orig_len
|
||||
* in which case callback when writable has already been arranged
|
||||
* and user code can call libwebsocket_write() again with the rest
|
||||
* later.
|
||||
*/
|
||||
|
||||
return n - (pre + post);
|
||||
}
|
||||
|
||||
LWS_VISIBLE int libwebsockets_serve_http_file_fragment(
|
||||
|
|
|
@ -343,6 +343,12 @@ struct _lws_websocket_related {
|
|||
int rxflow_pos;
|
||||
unsigned int rxflow_change_to:2;
|
||||
unsigned int this_frame_masked:1;
|
||||
unsigned int inside_frame:1; /* next write will be more of frame */
|
||||
unsigned int clean_buffer:1; /* buffer not rewritten by extension */
|
||||
/* truncated send handling */
|
||||
unsigned char *truncated_send_malloc; /* non-NULL means buffering in progress */
|
||||
unsigned int truncated_send_offset; /* where we are in terms of spilling */
|
||||
unsigned int truncated_send_len; /* how much is buffered */
|
||||
};
|
||||
|
||||
struct libwebsocket {
|
||||
|
|
Loading…
Add table
Reference in a new issue