diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index 31e14242..7342e9ba 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -376,10 +376,10 @@ just_kill_connection: free(wsi->u.ws.rxflow_buffer); wsi->u.ws.rxflow_buffer = NULL; } - if (wsi->u.ws.truncated_send_malloc) { + if (wsi->truncated_send_malloc) { /* not going to be completed... nuke it */ - free(wsi->u.ws.truncated_send_malloc); - wsi->u.ws.truncated_send_malloc = NULL; + free(wsi->truncated_send_malloc); + wsi->truncated_send_malloc = NULL; } } @@ -642,6 +642,10 @@ LWS_VISIBLE int lws_send_pipe_choked(struct libwebsocket *wsi) { struct pollfd fds; + /* treat the fact we got a truncated send pending as if we're choked */ + if (wsi->truncated_send_malloc) + return 1; + fds.fd = wsi->sock; fds.events = POLLOUT; fds.revents = 0; @@ -671,10 +675,10 @@ lws_handle_POLLOUT_event(struct libwebsocket_context *context, /* pending truncated sends have uber priority */ - if (wsi->u.ws.truncated_send_malloc) { - lws_issue_raw(wsi, wsi->u.ws.truncated_send_malloc + - wsi->u.ws.truncated_send_offset, - wsi->u.ws.truncated_send_len); + if (wsi->truncated_send_malloc) { + lws_issue_raw(wsi, wsi->truncated_send_malloc + + wsi->truncated_send_offset, + wsi->truncated_send_len); /* leave POLLOUT active either way */ return 0; } diff --git a/lib/output.c b/lib/output.c index a375dea2..c1074011 100644 --- a/lib/output.c +++ b/lib/output.c @@ -100,6 +100,14 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len) int n; #ifndef LWS_NO_EXTENSIONS int m; + size_t real_len = len; + + if (wsi->truncated_send_malloc && + (buf < wsi->truncated_send_malloc || + buf > (wsi->truncated_send_malloc + wsi->truncated_send_len + wsi->truncated_send_offset))) { + lwsl_err("****** %x Sending something else while pending truncated ...\n", wsi); + assert(0); + } /* * one of the extensions is carrying our data itself? Like mux? @@ -138,12 +146,23 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len) lws_hexdump(buf, len); #endif +#if 0 + /* test partial send support by forcing multiple sends on everything */ + len = len / 2; + if (!len) + len = 1; +#endif + lws_latency_pre(context, wsi); #ifdef LWS_OPENSSL_SUPPORT if (wsi->ssl) { n = SSL_write(wsi->ssl, buf, len); lws_latency(context, wsi, "SSL_write lws_issue_raw", n, n >= 0); if (n < 0) { + if (errno == EAGAIN || errno == EINTR) { + n = 0; + goto handle_truncated_send; + } lwsl_debug("ERROR writing to socket\n"); return -1; } @@ -152,6 +171,10 @@ int lws_issue_raw(struct libwebsocket *wsi, unsigned char *buf, size_t len) n = send(wsi->sock, buf, len, MSG_NOSIGNAL); lws_latency(context, wsi, "send lws_issue_raw", n, n == len); if (n < 0) { + if (errno == EAGAIN || errno == EINTR) { + n = 0; + goto handle_truncated_send; + } lwsl_debug("ERROR writing len %d to skt %d\n", len, n); return -1; } @@ -166,16 +189,17 @@ handle_truncated_send: /* * already handling a truncated send? */ - if (wsi->u.ws.truncated_send_malloc) { - lwsl_info("***** partial send moved on by %d (vs %d)\n", n, len); - wsi->u.ws.truncated_send_offset += n; - wsi->u.ws.truncated_send_len -= n; + if (wsi->truncated_send_malloc) { + lwsl_info("***** %x partial send moved on by %d (vs %d)\n", wsi, n, real_len); + wsi->truncated_send_offset += n; + wsi->truncated_send_len -= n; - if (!wsi->u.ws.truncated_send_len) { - lwsl_info("***** partial send completed\n"); + if (!wsi->truncated_send_len) { + lwsl_info("***** %x partial send completed\n", wsi); /* done with it */ - free(wsi->u.ws.truncated_send_malloc); - wsi->u.ws.truncated_send_malloc = NULL; + free(wsi->truncated_send_malloc); + wsi->truncated_send_malloc = NULL; + n = real_len; } else libwebsocket_callback_on_writable( wsi->protocol->owning_server, wsi); @@ -183,7 +207,7 @@ handle_truncated_send: return n; } - if (n < len) { + if (n < real_len) { if (wsi->u.ws.clean_buffer) /* * This buffer unaffected by extension rewriting. @@ -198,23 +222,23 @@ handle_truncated_send: * Newly truncated send. Buffer the remainder (it will get * first priority next time the socket is writable) */ - lwsl_info("***** new partial send %d sent %d accepted\n", len, n); + lwsl_info("***** %x new partial sent %d from %d total\n", wsi, n, real_len); - wsi->u.ws.truncated_send_malloc = malloc(len - n); - if (!wsi->u.ws.truncated_send_malloc) { + wsi->truncated_send_malloc = malloc(real_len - n); + if (!wsi->truncated_send_malloc) { lwsl_err("truncated send: unable to malloc %d\n", - len - n); + real_len - n); return -1; } - wsi->u.ws.truncated_send_offset = 0; - wsi->u.ws.truncated_send_len = len - n; - memcpy(wsi->u.ws.truncated_send_malloc, buf + n, len - n); + wsi->truncated_send_offset = 0; + wsi->truncated_send_len = real_len - n; + memcpy(wsi->truncated_send_malloc, buf + n, real_len - n); libwebsocket_callback_on_writable( wsi->protocol->owning_server, wsi); - return len; + return real_len; } return n; @@ -304,7 +328,7 @@ lws_issue_raw_ext_access(struct libwebsocket *wsi, */ if (!lws_send_pipe_choked(wsi) && - !wsi->u.ws.truncated_send_malloc) + !wsi->truncated_send_malloc) /* no we could add more, lets's do that */ continue; @@ -608,6 +632,17 @@ LWS_VISIBLE int libwebsockets_serve_http_file_fragment( int n, m; while (!lws_send_pipe_choked(wsi)) { + + if (wsi->truncated_send_malloc) { + lws_issue_raw(wsi, wsi->truncated_send_malloc + + wsi->truncated_send_offset, + wsi->truncated_send_len); + continue; + } + + if (wsi->u.http.filepos == wsi->u.http.filelen) + goto all_sent; + n = read(wsi->u.http.fd, context->service_buffer, sizeof(context->service_buffer)); if (n > 0) { @@ -624,8 +659,9 @@ LWS_VISIBLE int libwebsockets_serve_http_file_fragment( if (n < 0) return -1; /* caller will close */ - - if (wsi->u.http.filepos == wsi->u.http.filelen) { +all_sent: + if (!wsi->truncated_send_malloc && + wsi->u.http.filepos == wsi->u.http.filelen) { wsi->state = WSI_STATE_HTTP; if (wsi->protocol->callback) @@ -638,7 +674,7 @@ LWS_VISIBLE int libwebsockets_serve_http_file_fragment( } } - lwsl_notice("choked before able to send whole file (post)\n"); + lwsl_info("choked before able to send whole file (post)\n"); libwebsocket_callback_on_writable(context, wsi); return 0; /* indicates further processing must be done */ diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 147253b9..12b8d5a8 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -378,10 +378,6 @@ struct _lws_websocket_related { unsigned int this_frame_masked:1; unsigned int inside_frame:1; /* next write will be more of frame */ unsigned int clean_buffer:1; /* buffer not rewritten by extension */ - /* truncated send handling */ - unsigned char *truncated_send_malloc; /* non-NULL means buffering in progress */ - unsigned int truncated_send_offset; /* where we are in terms of spilling */ - unsigned int truncated_send_len; /* how much is buffered */ }; struct libwebsocket { @@ -415,6 +411,11 @@ struct libwebsocket { unsigned long latency_start; #endif + /* truncated send handling */ + unsigned char *truncated_send_malloc; /* non-NULL means buffering in progress */ + unsigned int truncated_send_offset; /* where we are in terms of spilling */ + unsigned int truncated_send_len; /* how much is buffered */ + void *user_space; /* members with mutually exclusive lifetimes are unionized */ diff --git a/lib/server.c b/lib/server.c index 7c0421a9..6c35c98e 100644 --- a/lib/server.c +++ b/lib/server.c @@ -145,6 +145,21 @@ int lws_server_socket_service(struct libwebsocket_context *context, /* handle http headers coming in */ + /* pending truncated sends have uber priority */ + + if (wsi->truncated_send_malloc) { + if (pollfd->revents & POLLOUT) + lws_issue_raw(wsi, wsi->truncated_send_malloc + + wsi->truncated_send_offset, + wsi->truncated_send_len); + /* + * we can't afford to allow input processing send + * something new, so spin around he event loop until + * he doesn't have any partials + */ + break; + } + /* any incoming data ready? */ if (pollfd->revents & POLLIN) { @@ -178,11 +193,17 @@ int lws_server_socket_service(struct libwebsocket_context *context, return 0; } + /* hm this may want to send (via HTTP callback for example) */ + n = libwebsocket_read(context, wsi, context->service_buffer, len); if (n < 0) /* we closed wsi */ return 0; + + /* hum he may have used up the writability above */ + + break; } /* this handles POLLOUT for http serving fragments */ diff --git a/test-server/test-server.c b/test-server/test-server.c index 9ca5d218..2cdd67be 100644 --- a/test-server/test-server.c +++ b/test-server/test-server.c @@ -377,7 +377,7 @@ static int callback_http(struct libwebsocket_context *context, goto bail; /* sent it all, close conn */ if (n == 0) - goto bail; + goto flush_bail; /* * because it's HTTP and not websocket, don't need to take * care about pre and postamble @@ -393,6 +393,12 @@ static int callback_http(struct libwebsocket_context *context, } while (!lws_send_pipe_choked(wsi)); libwebsocket_callback_on_writable(context, wsi); break; +flush_bail: + /* true if still partial pending */ + if (lws_send_pipe_choked(wsi)) { + libwebsocket_callback_on_writable(context, wsi); + break; + } bail: close(pss->fd);