1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

reflect send completeness in lws_write return

under load, writing packet sizes to the socket that are normally fine
can do partial writes, eg asking to write 4096 may only take 2800 of
it and return 2800 from the actual send.

Until now lws assumed that if it was safe to send, it could take any
size buffer, that's not the case under load.

This patch changes lws_write to return the amount actually taken...
that and the meaning of it becomes tricky when dealing with
compressed links, the amount taken and the amount sent differ.  Also
there is no way to recover at the moment from a protocol-encoded
frame only being partially accepted... however for http file send
content it can and does recover now.

Small frames don't have to take any care about it but large atomic
sends (> 2K) have been seen to fail under load.

Signed-off-by: Andy Green <andy.green@linaro.org>
This commit is contained in:
Andy Green 2013-02-23 10:50:10 +08:00
parent 2672fb2d68
commit fc7c5e4539
12 changed files with 135 additions and 34 deletions

View file

@ -23,6 +23,11 @@ User api changes
LWS_CALLBACK_FILTER_NETWORK_CONNECTION also has the socket descriptor
delivered by @in now instead of @user.
- libwebsocket_write() now returns -1 for error, or the amount of data
actually accepted for send. Under load, the OS may signal it is
ready to send new data on the socket, but have only a restricted
amount of memory to buffer the packet compared to usual.
User api removal
----------------

View file

@ -273,7 +273,11 @@ spill:
}
lwsl_parser("client sees server close len = %d\n",
wsi->u.ws.rx_user_buffer_head);
/* parrot the close packet payload back */
/*
* parrot the close packet payload back
* we do not care about how it went, we are closing
* immediately afterwards
*/
libwebsocket_write(wsi, (unsigned char *)
&wsi->u.ws.rx_user_buffer[
LWS_SEND_BUFFER_PRE_PADDING],
@ -284,7 +288,11 @@ spill:
case LWS_WS_OPCODE_07__PING:
lwsl_info("client received ping, doing pong\n");
/* parrot the ping packet payload back as a pong*/
/*
* parrot the ping packet payload back as a pong
* !!! this may block or have partial write or fail
* !!! very unlikely if the ping size is small
*/
libwebsocket_write(wsi, (unsigned char *)
&wsi->u.ws.rx_user_buffer[
LWS_SEND_BUFFER_PRE_PADDING],

View file

@ -276,7 +276,7 @@ libwebsocket_close_and_free_session(struct libwebsocket_context *context,
if (eff_buf.token_len)
if (lws_issue_raw(wsi, (unsigned char *)eff_buf.token,
eff_buf.token_len)) {
eff_buf.token_len) != eff_buf.token_len) {
lwsl_debug("close: ext spill failed\n");
goto just_kill_connection;
}
@ -305,7 +305,7 @@ libwebsocket_close_and_free_session(struct libwebsocket_context *context,
n = libwebsocket_write(wsi,
&buf[LWS_SEND_BUFFER_PRE_PADDING + 2],
0, LWS_WRITE_CLOSE);
if (!n) {
if (n >= 0) {
/*
* we have sent a nice protocol level indication we
* now wish to close, we should not send anything more
@ -698,9 +698,18 @@ lws_handle_POLLOUT_event(struct libwebsocket_context *context,
/* assuming they gave us something to send, send it */
if (eff_buf.token_len) {
if (lws_issue_raw(wsi, (unsigned char *)eff_buf.token,
eff_buf.token_len))
n = lws_issue_raw(wsi, (unsigned char *)eff_buf.token,
eff_buf.token_len);
if (n < 0)
return -1;
/*
* Keep amount spilled small to minimize chance of this
*/
if (n != eff_buf.token_len) {
lwsl_err("Unable to spill ext %d vs %s\n",
eff_buf.token_len, n);
return -1;
}
} else
continue;

View file

@ -90,6 +90,10 @@ void lwsl_hexdump(void *vbuf, size_t len)
#endif
/*
* notice this returns number of bytes sent, or -1
*/
int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
{
struct libwebsocket_context *context = wsi->protocol->owning_server;
@ -117,7 +121,7 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
}
if (m) /* handled */ {
/* lwsl_ext("ext sent it\n"); */
return 0;
return m;
}
}
#endif
@ -146,14 +150,14 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len)
#endif
n = send(wsi->sock, buf, len, MSG_NOSIGNAL);
lws_latency(context, wsi, "send lws_issue_raw", n, n == len);
if (n != len) {
if (n < 0) {
lwsl_debug("ERROR writing len %d to skt %d\n", len, n);
return -1;
}
#ifdef LWS_OPENSSL_SUPPORT
}
#endif
return 0;
return n;
}
#ifdef LWS_NO_EXTENSIONS
@ -210,10 +214,21 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi,
/* assuming they left us something to send, send it */
if (eff_buf.token_len)
if (lws_issue_raw(wsi, (unsigned char *)eff_buf.token,
eff_buf.token_len))
if (eff_buf.token_len) {
n = lws_issue_raw(wsi, (unsigned char *)eff_buf.token,
eff_buf.token_len);
if (n < 0)
return -1;
/*
* Keep amount spilled small to minimize chance of this
*/
if (n != eff_buf.token_len) {
lwsl_err("Unable to spill ext %d vs %s\n",
eff_buf.token_len, n);
return -1;
}
}
lwsl_parser("written %d bytes to client\n", eff_buf.token_len);
@ -247,7 +262,7 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi,
ret = 0;
}
return 0;
return len;
}
#endif
@ -274,6 +289,11 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi,
* valid storage before and after buf as explained above. This scheme
* allows maximum efficiency of sending data and protocol in a single
* packet while not burdening the user code with any protocol knowledge.
*
* Return may be -1 for a fatal error needing connection close, or a
* positive number reflecting the amount of bytes actually sent. This
* can be less than the requested number of bytes due to OS memory
* pressure at any given time.
*/
int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
@ -285,6 +305,7 @@ int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
int masked7 = wsi->mode == LWS_CONNMODE_WS_CLIENT;
unsigned char *dropmask = NULL;
unsigned char is_masked_bit = 0;
size_t orig_len = len;
#ifndef LWS_NO_EXTENSIONS
struct lws_tokens eff_buf;
int m;
@ -423,7 +444,7 @@ int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
if (libwebsocket_0405_frame_mask_generate(wsi)) {
lwsl_err("lws_write: frame mask generation failed\n");
return 1;
return -1;
}
/*
@ -453,11 +474,8 @@ send_raw:
case LWS_WRITE_HTTP:
case LWS_WRITE_PONG:
case LWS_WRITE_PING:
if (lws_issue_raw(wsi, (unsigned char *)buf - pre,
len + pre + post))
return -1;
return 0;
return lws_issue_raw(wsi, (unsigned char *)buf - pre,
len + pre + post);
default:
break;
}
@ -476,26 +494,36 @@ send_raw:
* callback returns 1 in case it wants to spill more buffers
*/
return lws_issue_raw_ext_access(wsi, buf - pre, len + pre + post);
n = lws_issue_raw_ext_access(wsi, buf - pre, len + pre + post);
if (n < 0)
return n;
return orig_len - ((len - pre + post) -n );
}
int libwebsockets_serve_http_file_fragment(
struct libwebsocket_context *context, struct libwebsocket *wsi)
{
int ret = 0;
int n;
int n, m;
while (!lws_send_pipe_choked(wsi)) {
n = read(wsi->u.http.fd, context->service_buffer,
sizeof(context->service_buffer));
if (n > 0) {
libwebsocket_write(wsi, context->service_buffer, n,
m = libwebsocket_write(wsi, context->service_buffer, n,
LWS_WRITE_HTTP);
if (m < 0)
return -1;
wsi->u.http.filepos += n;
if (m != n)
/* adjust for what was not sent */
lseek(wsi->u.http.fd, m - n, SEEK_CUR);
}
if (n < 0)
return 1; /* caller will close */
return -1; /* caller will close */
if (n < sizeof(context->service_buffer) ||
wsi->u.http.filepos == wsi->u.http.filelen) {
@ -552,6 +580,7 @@ int libwebsockets_serve_http_file(struct libwebsocket_context *context,
"HTTP/1.0 400 Bad\x0d\x0aServer: libwebsockets\x0d\x0a\x0d\x0a"
);
wsi->u.http.fd = 0;
/* too small to care about partial, closing anyway */
libwebsocket_write(wsi, context->service_buffer,
p - context->service_buffer, LWS_WRITE_HTTP);
@ -569,8 +598,10 @@ int libwebsockets_serve_http_file(struct libwebsocket_context *context,
ret = libwebsocket_write(wsi, context->service_buffer,
p - context->service_buffer, LWS_WRITE_HTTP);
if (ret)
if (ret != (p - context->service_buffer)) {
lwsl_err("_write returned %d from %d\n", ret, (p - context->service_buffer));
return -1;
}
wsi->u.http.filepos = 0;
wsi->state = WSI_STATE_HTTP_ISSUING_FILE;

View file

@ -935,7 +935,7 @@ spill:
LWS_SEND_BUFFER_PRE_PADDING],
wsi->u.ws.rx_user_buffer_head,
LWS_WRITE_CLOSE);
if (n)
if (n < 0)
lwsl_info("write of close ack failed %d\n", n);
wsi->state = WSI_STATE_RETURNED_CLOSE_ALREADY;
/* close the connection */
@ -951,6 +951,8 @@ spill:
n = libwebsocket_write(wsi, (unsigned char *)
&wsi->u.ws.rx_user_buffer[LWS_SEND_BUFFER_PRE_PADDING],
wsi->u.ws.rx_user_buffer_head, LWS_WRITE_PONG);
if (n < 0)
return -1;
/* ... then just drop it */
wsi->u.ws.rx_user_buffer_head = 0;
return 0;

View file

@ -241,7 +241,7 @@ handshake_0405(struct libwebsocket_context *context, struct libwebsocket *wsi)
#endif
n = libwebsocket_write(wsi, (unsigned char *)response,
p - response, LWS_WRITE_HTTP);
if (n < 0) {
if (n != (p - response)) {
lwsl_debug("handshake_0405: ERROR writing to socket\n");
goto bail;
}

View file

@ -438,6 +438,11 @@ In the case of sending using websocket protocol, be sure to allocate
valid storage before and after buf as explained above. This scheme
allows maximum efficiency of sending data and protocol in a single
packet while not burdening the user code with any protocol knowledge.
<p>
Return may be -1 for a fatal error needing connection close, or a
positive number reflecting the amount of bytes actually sent. This
can be less than the requested number of bytes due to OS memory
pressure at any given time.
</blockquote>
<hr>
<h2>libwebsockets_serve_http_file - Send a file back to the client using http</h2>

View file

@ -150,9 +150,16 @@ callback_lws_mirror(struct libwebsocket_context *context,
(int)random() % 250,
(int)random() % 24);
libwebsocket_write(wsi,
n = libwebsocket_write(wsi,
&buf[LWS_SEND_BUFFER_PRE_PADDING], l, opts | LWS_WRITE_TEXT);
if (n < 0)
return -1;
if (n < l) {
lwsl_err("Partial write LWS_CALLBACK_CLIENT_WRITEABLE\n");
return -1;
}
mirror_lifetime--;
if (!mirror_lifetime) {
fprintf(stderr, "closing mirror session\n");

View file

@ -69,6 +69,10 @@ callback_echo(struct libwebsocket_context *context,
lwsl_err("ERROR %d writing to socket, hanging up\n", n);
return 1;
}
if (n < pss->len) {
lwsl_err("Partial write\n");
return -1;
}
break;
case LWS_CALLBACK_RECEIVE:
@ -101,7 +105,11 @@ callback_echo(struct libwebsocket_context *context,
n = libwebsocket_write(wsi, &pss->buf[LWS_SEND_BUFFER_PRE_PADDING], pss->len, LWS_WRITE_TEXT);
if (n < 0) {
lwsl_err("ERROR %d writing to socket, hanging up\n", n);
return 1;
return -1;
}
if (n < pss->len) {
lwsl_err("Partial write\n");
return -1;
}
break;
#endif

View file

@ -175,6 +175,12 @@ callback_fraggle(struct libwebsocket_context *context,
psf->state = FRAGSTATE_POST_PAYLOAD_SUM;
n = libwebsocket_write(wsi, bp, chunk, write_mode);
if (n < 0)
return -1;
if (n < chunk) {
lwsl_err("Partial write\n");
return -1;
}
libwebsocket_callback_on_writable(context, wsi);
break;
@ -192,6 +198,12 @@ callback_fraggle(struct libwebsocket_context *context,
n = libwebsocket_write(wsi, (unsigned char *)bp,
4, LWS_WRITE_BINARY);
if (n < 0)
return -1;
if (n < 4) {
lwsl_err("Partial write\n");
return -1;
}
psf->state = FRAGSTATE_START_MESSAGE;

View file

@ -259,14 +259,21 @@ callback_lws_mirror(struct libwebsocket_context * this,
global_tx_count++;
if (use_mirror)
libwebsocket_write(wsi,
n = libwebsocket_write(wsi,
&pingbuf[LWS_SEND_BUFFER_PRE_PADDING],
size, write_options | LWS_WRITE_BINARY);
else
libwebsocket_write(wsi,
n = libwebsocket_write(wsi,
&pingbuf[LWS_SEND_BUFFER_PRE_PADDING],
size, write_options | LWS_WRITE_PING);
if (n < 0)
return -1;
if (n < size) {
lwsl_err("Partial write\n");
return -1;
}
if (flood &&
(psd->ping_index - psd->rx_count) < (screen_width - 1))
fprintf(stderr, ".");

View file

@ -122,13 +122,12 @@ static int callback_http(struct libwebsocket_context *context,
char client_ip[128];
#endif
char buf[256];
int n;
int n, m;
unsigned char *p;
static unsigned char buffer[4096];
struct stat stat_buf;
struct per_session_data__http *pss = (struct per_session_data__http *)user;
#ifdef EXTERNAL_POLL
int m;
int fd = (int)(long)in;
#endif
@ -166,6 +165,7 @@ static int callback_http(struct libwebsocket_context *context,
* send the http headers...
* this won't block since it's the first payload sent
* on the connection since it was established
* (too small for partial)
*/
n = libwebsocket_write(wsi, buffer,
@ -223,10 +223,13 @@ static int callback_http(struct libwebsocket_context *context,
* because it's HTTP and not websocket, don't need to take
* care about pre and postamble
*/
n = libwebsocket_write(wsi, buffer, n, LWS_WRITE_HTTP);
if (n < 0)
m = libwebsocket_write(wsi, buffer, n, LWS_WRITE_HTTP);
if (m < 0)
/* write failed, close conn */
goto bail;
if (m != n)
/* partial write, adjust */
lseek(pss->fd, m - n, SEEK_CUR);
} while (!lws_send_pipe_choked(wsi));
libwebsocket_callback_on_writable(context, wsi);
@ -383,6 +386,7 @@ callback_dumb_increment(struct libwebsocket_context *context,
case LWS_CALLBACK_SERVER_WRITEABLE:
n = sprintf((char *)p, "%d", pss->number++);
/* too small for partial */
n = libwebsocket_write(wsi, p, n, LWS_WRITE_TEXT);
if (n < 0) {
lwsl_err("ERROR %d writing to di socket\n", n);
@ -479,6 +483,9 @@ callback_lws_mirror(struct libwebsocket_context *context,
lwsl_err("ERROR %d writing to mirror socket\n", n);
return -1;
}
if (n < ringbuffer[pss->ringbuffer_tail].len) {
lwsl_err("mirror partial write %d vs %d\n", n, ringbuffer[pss->ringbuffer_tail].len);
}
if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
pss->ringbuffer_tail = 0;