mirror of
https://github.com/warmcat/libwebsockets.git
synced 2025-03-09 00:00:04 +01:00
partial: replace ad-hoc code with a wsi buflist_out
Various kinds of input stashing were replaced with a single buflist before v3.0... this patch replaces the partial send arrangements with its own buflist in the same way. Buflists as the name says are growable lists of allocations in a linked-list that take care of book-keeping what's added and removed (even if what is removed is less than the current buffer on the list). The immediate result is that we no longer have to freak out if we had a partial buffered and new output is coming... we can just pile it on the end of the buflist and keep draining the front of it. Likewise we no longer need to be rabid about reporting multiple attempts to send stuff without going back to the event loop, although not doing that will introduce inefficiencies we don't have to term it "illegal" any more. Since buflists have proven reliable on the input side and the logic for dealing with truncated "non-network events" was already there this internal-only change should be relatively self-contained.
This commit is contained in:
parent
23e433ac1b
commit
90e6e65bff
15 changed files with 80 additions and 93 deletions
|
@ -170,7 +170,7 @@ __lws_free_wsi(struct lws *wsi)
|
|||
lws_free(wsi->user_space);
|
||||
|
||||
lws_buflist_destroy_all_segments(&wsi->buflist);
|
||||
lws_free_set_NULL(wsi->trunc_alloc);
|
||||
lws_buflist_destroy_all_segments(&wsi->buflist_out);
|
||||
lws_free_set_NULL(wsi->udp);
|
||||
|
||||
if (wsi->vhost && wsi->vhost->lserv_wsi == wsi)
|
||||
|
@ -723,14 +723,14 @@ __lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason, const char *
|
|||
goto just_kill_connection;
|
||||
|
||||
case LRS_FLUSHING_BEFORE_CLOSE:
|
||||
if (wsi->trunc_len) {
|
||||
if (lws_has_buffered_out(wsi)) {
|
||||
lws_callback_on_writable(wsi);
|
||||
return;
|
||||
}
|
||||
lwsl_info("%p: end LRS_FLUSHING_BEFORE_CLOSE\n", wsi);
|
||||
goto just_kill_connection;
|
||||
default:
|
||||
if (wsi->trunc_len) {
|
||||
if (lws_has_buffered_out(wsi)) {
|
||||
lwsl_info("%p: LRS_FLUSHING_BEFORE_CLOSE\n", wsi);
|
||||
lwsi_set_state(wsi, LRS_FLUSHING_BEFORE_CLOSE);
|
||||
__lws_set_timeout(wsi,
|
||||
|
@ -2166,7 +2166,7 @@ lws_get_ssl(struct lws *wsi)
|
|||
LWS_VISIBLE int
|
||||
lws_partial_buffered(struct lws *wsi)
|
||||
{
|
||||
return !!wsi->trunc_len;
|
||||
return lws_has_buffered_out(wsi);
|
||||
}
|
||||
|
||||
LWS_VISIBLE lws_fileofs_t
|
||||
|
|
|
@ -31,47 +31,54 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len)
|
|||
size_t real_len = len;
|
||||
unsigned int n;
|
||||
|
||||
// lwsl_hexdump_err(buf, len);
|
||||
|
||||
/*
|
||||
* Detect if we got called twice without going through the
|
||||
* event loop to handle pending. This would be caused by either
|
||||
* back-to-back writes in one WRITABLE (illegal) or calling lws_write()
|
||||
* from outside the WRITABLE callback (illegal).
|
||||
* event loop to handle pending. Since that guarantees extending any
|
||||
* existing buflist_out it's inefficient.
|
||||
*/
|
||||
if (wsi->could_have_pending) {
|
||||
lwsl_hexdump_level(LLL_ERR, buf, len);
|
||||
lwsl_err("** %p: vh: %s, prot: %s, role %s: "
|
||||
"Illegal back-to-back write of %lu detected...\n",
|
||||
if (buf && wsi->could_have_pending) {
|
||||
lwsl_hexdump_level(LLL_INFO, buf, len);
|
||||
lwsl_info("** %p: vh: %s, prot: %s, role %s: "
|
||||
"Inefficient back-to-back write of %lu detected...\n",
|
||||
wsi, wsi->vhost->name, wsi->protocol->name,
|
||||
wsi->role_ops->name,
|
||||
(unsigned long)len);
|
||||
// assert(0);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_C_API_WRITE, 1);
|
||||
|
||||
if (!len)
|
||||
return 0;
|
||||
/* just ignore sends after we cleared the truncation buffer */
|
||||
if (lwsi_state(wsi) == LRS_FLUSHING_BEFORE_CLOSE && !wsi->trunc_len)
|
||||
if (lwsi_state(wsi) == LRS_FLUSHING_BEFORE_CLOSE &&
|
||||
!lws_has_buffered_out(wsi))
|
||||
return (int)len;
|
||||
|
||||
if (wsi->trunc_len && (buf < wsi->trunc_alloc ||
|
||||
buf > (wsi->trunc_alloc + wsi->trunc_len + wsi->trunc_offset))) {
|
||||
lwsl_hexdump_level(LLL_ERR, buf, len);
|
||||
lwsl_err("** %p: vh: %s, prot: %s, Sending new %lu, pending truncated ...\n"
|
||||
" It's illegal to do an lws_write outside of\n"
|
||||
" the writable callback: fix your code\n",
|
||||
if (buf && lws_has_buffered_out(wsi)) {
|
||||
lwsl_info("** %p: vh: %s, prot: %s, incr buflist_out by %lu\n",
|
||||
wsi, wsi->vhost->name, wsi->protocol->name,
|
||||
(unsigned long)len);
|
||||
assert(0);
|
||||
|
||||
return -1;
|
||||
/*
|
||||
* already buflist ahead of this, add it on the tail of the
|
||||
* buflist, then ignore it for now and act like we're flushing
|
||||
* the buflist...
|
||||
*/
|
||||
|
||||
lws_buflist_append_segment(&wsi->buflist_out, buf, len);
|
||||
|
||||
buf = NULL;
|
||||
len = 0;
|
||||
}
|
||||
|
||||
if (wsi->buflist_out) {
|
||||
/* we have to drain the earliest buflist_out stuff first */
|
||||
|
||||
len = lws_buflist_next_segment_len(&wsi->buflist_out, &buf);
|
||||
real_len = len;
|
||||
}
|
||||
|
||||
if (!len)
|
||||
return 0;
|
||||
|
||||
if (!wsi->http2_substream && !lws_socket_is_valid(wsi->desc.sockfd))
|
||||
lwsl_warn("** error invalid sock but expected to send\n");
|
||||
|
||||
|
@ -111,16 +118,19 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len)
|
|||
}
|
||||
|
||||
/*
|
||||
* we were already handling a truncated send?
|
||||
* we were sending this from buflist_out? Then not sending everything
|
||||
* is a small matter of advancing ourselves only by the amount we did
|
||||
* send in the buflist.
|
||||
*/
|
||||
if (wsi->trunc_len) {
|
||||
lwsl_info("%p partial adv %d (vs %ld)\n", wsi, n, (long)real_len);
|
||||
wsi->trunc_offset += n;
|
||||
wsi->trunc_len -= n;
|
||||
if (lws_has_buffered_out(wsi)) {
|
||||
lwsl_info("%p partial adv %d (vs %ld)\n", wsi, n,
|
||||
(long)real_len);
|
||||
lws_buflist_use_segment(&wsi->buflist_out, n);
|
||||
|
||||
if (!lws_has_buffered_out(wsi)) {
|
||||
lwsl_info("%s: wsi %p: buflist_out flushed\n",
|
||||
__func__, wsi);
|
||||
|
||||
if (!wsi->trunc_len) {
|
||||
lwsl_info("** %p partial send completed\n", wsi);
|
||||
/* done with it, but don't free it */
|
||||
n = (int)real_len;
|
||||
if (lwsi_state(wsi) == LRS_FLUSHING_BEFORE_CLOSE) {
|
||||
lwsl_info("** %p signalling to close now\n", wsi);
|
||||
|
@ -150,36 +160,19 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len)
|
|||
return n;
|
||||
|
||||
/*
|
||||
* Newly truncated send. Buffer the remainder (it will get
|
||||
* first priority next time the socket is writable).
|
||||
* We were not able to send everything... and we were not sending from
|
||||
* an existing buflist_out. So we are starting a fresh buflist_out, by
|
||||
* buffering the unsent remainder on it.
|
||||
* (it will get first priority next time the socket is writable).
|
||||
*/
|
||||
lwsl_debug("%p new partial sent %d from %lu total\n", wsi, n,
|
||||
(unsigned long)real_len);
|
||||
|
||||
lws_buflist_append_segment(&wsi->buflist_out, buf + n, real_len - n);
|
||||
|
||||
lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_C_WRITE_PARTIALS, 1);
|
||||
lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_B_PARTIALS_ACCEPTED_PARTS, n);
|
||||
|
||||
/*
|
||||
* - if we still have a suitable malloc lying around, use it
|
||||
* - or, if too small, reallocate it
|
||||
* - or, if no buffer, create it
|
||||
*/
|
||||
if (!wsi->trunc_alloc || real_len - n > wsi->trunc_alloc_len) {
|
||||
lws_free(wsi->trunc_alloc);
|
||||
|
||||
wsi->trunc_alloc_len = (unsigned int)(real_len - n);
|
||||
wsi->trunc_alloc = lws_malloc(real_len - n,
|
||||
"truncated send alloc");
|
||||
if (!wsi->trunc_alloc) {
|
||||
lwsl_err("truncated send: unable to malloc %lu\n",
|
||||
(unsigned long)(real_len - n));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
wsi->trunc_offset = 0;
|
||||
wsi->trunc_len = (unsigned int)(real_len - n);
|
||||
memcpy(wsi->trunc_alloc, buf + n, real_len - n);
|
||||
|
||||
#if !defined(LWS_WITH_ESP32)
|
||||
if (lws_wsi_is_udp(wsi)) {
|
||||
/* stash original destination for fulfilling UDP partials */
|
||||
|
@ -264,7 +257,7 @@ lws_ssl_capable_write_no_ssl(struct lws *wsi, unsigned char *buf, int len)
|
|||
|
||||
if (lws_wsi_is_udp(wsi)) {
|
||||
#if !defined(LWS_WITH_ESP32)
|
||||
if (wsi->trunc_len)
|
||||
if (lws_has_buffered_out(wsi))
|
||||
n = sendto(wsi->desc.sockfd, (const char *)buf,
|
||||
len, 0, &wsi->udp->sa_pending,
|
||||
wsi->udp->salen_pending);
|
||||
|
|
|
@ -888,10 +888,8 @@ struct lws {
|
|||
void *user_space;
|
||||
void *opaque_parent_data;
|
||||
|
||||
struct lws_buflist *buflist;
|
||||
|
||||
/* truncated send handling */
|
||||
unsigned char *trunc_alloc; /* non-NULL means buffering in progress */
|
||||
struct lws_buflist *buflist; /* input-side buflist */
|
||||
struct lws_buflist *buflist_out; /* output-side buflist */
|
||||
|
||||
#if defined(LWS_WITH_TLS)
|
||||
struct lws_lws_tls tls;
|
||||
|
@ -916,9 +914,7 @@ struct lws {
|
|||
/* ints */
|
||||
#define LWS_NO_FDS_POS (-1)
|
||||
int position_in_fds_table;
|
||||
unsigned int trunc_alloc_len; /* size of malloc */
|
||||
unsigned int trunc_offset; /* where we are in terms of spilling */
|
||||
unsigned int trunc_len; /* how much is buffered */
|
||||
|
||||
#ifndef LWS_NO_CLIENT
|
||||
int chunk_remaining;
|
||||
#endif
|
||||
|
@ -1063,6 +1059,9 @@ lws_latency(struct lws_context *context, struct lws *wsi, const char *action,
|
|||
int ret, int completion);
|
||||
#endif
|
||||
|
||||
static LWS_INLINE int
|
||||
lws_has_buffered_out(struct lws *wsi) { return !!wsi->buflist_out; }
|
||||
|
||||
LWS_EXTERN int LWS_WARN_UNUSED_RESULT
|
||||
lws_ws_client_rx_sm(struct lws *wsi, unsigned char c);
|
||||
|
||||
|
|
|
@ -76,10 +76,9 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
|
|||
* corrupted.
|
||||
*/
|
||||
|
||||
if (wsi->trunc_len) {
|
||||
if (lws_has_buffered_out(wsi)) {
|
||||
//lwsl_notice("%s: completing partial\n", __func__);
|
||||
if (lws_issue_raw(wsi, wsi->trunc_alloc + wsi->trunc_offset,
|
||||
wsi->trunc_len) < 0) {
|
||||
if (lws_issue_raw(wsi, NULL, 0) < 0) {
|
||||
lwsl_info("%s signalling to close\n", __func__);
|
||||
goto bail_die;
|
||||
}
|
||||
|
|
|
@ -4309,6 +4309,9 @@ LWS_VISIBLE LWS_EXTERN int LWS_WARN_UNUSED_RESULT
|
|||
lws_add_http_common_headers(struct lws *wsi, unsigned int code,
|
||||
const char *content_type, lws_filepos_t content_len,
|
||||
unsigned char **p, unsigned char *end);
|
||||
|
||||
|
||||
|
||||
///@}
|
||||
|
||||
/** \defgroup form-parsing Form Parsing
|
||||
|
|
|
@ -44,7 +44,7 @@ lws_send_pipe_choked(struct lws *wsi)
|
|||
wsi_eff->could_have_pending = 0;
|
||||
|
||||
/* treat the fact we got a truncated send pending as if we're choked */
|
||||
if (wsi_eff->trunc_len)
|
||||
if (lws_has_buffered_out(wsi))
|
||||
return 1;
|
||||
|
||||
FD_ZERO(&writefds);
|
||||
|
|
|
@ -54,14 +54,14 @@ lws_send_pipe_choked(struct lws *wsi)
|
|||
wsi_eff->could_have_pending = 0;
|
||||
|
||||
/* treat the fact we got a truncated send pending as if we're choked */
|
||||
if (wsi_eff->trunc_len)
|
||||
if (lws_has_buffered_out(wsi_eff))
|
||||
return 1;
|
||||
|
||||
#if 0
|
||||
struct lws_pollfd fds;
|
||||
|
||||
/* treat the fact we got a truncated send pending as if we're choked */
|
||||
if (wsi->trunc_len)
|
||||
if (lws_has_buffered_out(wsi))
|
||||
return 1;
|
||||
|
||||
fds.fd = wsi->desc.sockfd;
|
||||
|
|
|
@ -41,7 +41,7 @@ lws_send_pipe_choked(struct lws *wsi)
|
|||
wsi_eff->could_have_pending = 0;
|
||||
|
||||
/* treat the fact we got a truncated send pending as if we're choked */
|
||||
if (wsi_eff->trunc_len)
|
||||
if (lws_has_buffered_out(wsi_eff))
|
||||
return 1;
|
||||
|
||||
fds.fd = wsi_eff->desc.sockfd;
|
||||
|
|
|
@ -15,7 +15,7 @@ lws_send_pipe_choked(struct lws *wsi)
|
|||
wsi_eff->could_have_pending = 0;
|
||||
|
||||
/* treat the fact we got a truncated send pending as if we're choked */
|
||||
if (wsi_eff->trunc_len)
|
||||
if (lws_has_buffered_out(wsi_eff))
|
||||
return 1;
|
||||
|
||||
return (int)wsi_eff->sock_send_blocking;
|
||||
|
|
|
@ -449,10 +449,9 @@ try_pollout:
|
|||
|
||||
if (lwsi_state(wsi) != LRS_ISSUING_FILE) {
|
||||
|
||||
if (wsi->trunc_len) {
|
||||
if (lws_has_buffered_out(wsi)) {
|
||||
//lwsl_notice("%s: completing partial\n", __func__);
|
||||
if (lws_issue_raw(wsi, wsi->trunc_alloc + wsi->trunc_offset,
|
||||
wsi->trunc_len) < 0) {
|
||||
if (lws_issue_raw(wsi, NULL, 0) < 0) {
|
||||
lwsl_info("%s signalling to close\n", __func__);
|
||||
goto fail;
|
||||
}
|
||||
|
|
|
@ -163,7 +163,7 @@ rops_handle_POLLIN_h2(struct lws_context_per_thread *pt, struct lws *wsi,
|
|||
|
||||
if (wsi->http2_substream || wsi->upgraded_to_http2) {
|
||||
wsi1 = lws_get_network_wsi(wsi);
|
||||
if (wsi1 && wsi1->trunc_len)
|
||||
if (wsi1 && lws_has_buffered_out(wsi1))
|
||||
/*
|
||||
* We cannot deal with any kind of new RX
|
||||
* because we are dealing with a partial send
|
||||
|
|
|
@ -1713,7 +1713,7 @@ lws_http_transaction_completed(struct lws *wsi)
|
|||
{
|
||||
int n = NO_PENDING_TIMEOUT;
|
||||
|
||||
if (wsi->trunc_len) {
|
||||
if (lws_has_buffered_out(wsi)) {
|
||||
/*
|
||||
* ...so he tried to send something large as the http reply,
|
||||
* it went as a partial, but he immediately said the
|
||||
|
@ -2082,10 +2082,8 @@ LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi)
|
|||
|
||||
do {
|
||||
|
||||
if (wsi->trunc_len) {
|
||||
if (lws_issue_raw(wsi, wsi->trunc_alloc +
|
||||
wsi->trunc_offset,
|
||||
wsi->trunc_len) < 0) {
|
||||
if (lws_has_buffered_out(wsi)) {
|
||||
if (lws_issue_raw(wsi, NULL, 0) < 0) {
|
||||
lwsl_info("%s: closing\n", __func__);
|
||||
goto file_had_it;
|
||||
}
|
||||
|
@ -2259,7 +2257,7 @@ LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi)
|
|||
}
|
||||
|
||||
all_sent:
|
||||
if ((!wsi->trunc_len && wsi->http.filepos >= wsi->http.filelen)
|
||||
if ((!lws_has_buffered_out(wsi) && wsi->http.filepos >= wsi->http.filelen)
|
||||
#if defined(LWS_WITH_RANGES)
|
||||
|| finished)
|
||||
#else
|
||||
|
|
|
@ -30,12 +30,12 @@ rops_handle_POLLIN_raw_skt(struct lws_context_per_thread *pt, struct lws *wsi,
|
|||
|
||||
/* pending truncated sends have uber priority */
|
||||
|
||||
if (wsi->trunc_len) {
|
||||
if (lws_has_buffered_out(wsi)) {
|
||||
if (!(pollfd->revents & LWS_POLLOUT))
|
||||
return LWS_HPI_RET_HANDLED;
|
||||
|
||||
if (lws_issue_raw(wsi, wsi->trunc_alloc + wsi->trunc_offset,
|
||||
wsi->trunc_len) < 0)
|
||||
/* drain the output buflist */
|
||||
if (lws_issue_raw(wsi, NULL, 0) < 0)
|
||||
goto fail;
|
||||
/*
|
||||
* we can't afford to allow input processing to send
|
||||
|
|
|
@ -283,7 +283,7 @@ lws_issue_raw_ext_access(struct lws *wsi, unsigned char *buf, size_t len)
|
|||
* Or we had to hold on to some of it?
|
||||
*/
|
||||
|
||||
if (!lws_send_pipe_choked(wsi) && !wsi->trunc_len)
|
||||
if (!lws_send_pipe_choked(wsi) && !lws_has_buffered_out(wsi))
|
||||
/* no we could add more, lets's do that */
|
||||
continue;
|
||||
|
||||
|
|
|
@ -976,7 +976,7 @@ rops_handle_POLLIN_ws(struct lws_context_per_thread *pt, struct lws *wsi,
|
|||
#if defined(LWS_WITH_HTTP2)
|
||||
if (wsi->http2_substream || wsi->upgraded_to_http2) {
|
||||
wsi1 = lws_get_network_wsi(wsi);
|
||||
if (wsi1 && wsi1->trunc_len)
|
||||
if (wsi1 && lws_has_buffered_out(wsi1))
|
||||
/* We cannot deal with any kind of new RX
|
||||
* because we are dealing with a partial send
|
||||
* (new RX may trigger new http_action() that
|
||||
|
@ -1525,10 +1525,6 @@ rops_close_role_ws(struct lws_context_per_thread *pt, struct lws *wsi)
|
|||
#endif
|
||||
lws_free_set_NULL(wsi->ws->rx_ubuf);
|
||||
|
||||
if (wsi->trunc_alloc)
|
||||
/* not going to be completed... nuke it */
|
||||
lws_free_set_NULL(wsi->trunc_alloc);
|
||||
|
||||
wsi->ws->ping_payload_len = 0;
|
||||
wsi->ws->ping_pending_flag = 0;
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue