
lws_buflist

For h1 / ws, a combination of removing POLLIN wait and
stashing any unused rx lets us immediately respond to
rx flow control requests in a simple and effective way,
because the tcp socket is the stream.

But for muxed protocols like h2, that technique cannot
be used: we can't silence the whole bundle of streams
just because one of them can't handle any more rx right now.

Control frames and content for other streams are serialized
in between the flow-controlled stream's content, so we have
no choice but to keep reading in order to see those other
things.  Therefore, for muxed protocols like h2 and spdy,
rx flow control boils down to tx credit manipulation on
individual streams to staunch the flow at the peer.

However, that takes a round trip to have any effect: packets
that were already in flight before the tx credit reduction
arrives at the remote peer are still going to come, and have
to be dealt with by adding them to the stash.

This patch introduces lws_buflist, a scatter-gather style of
buffer management for rxflow handling: buffer segments are
appended to a linked list to hold whatever rx is unavoidably
in flight on a stream that is trying to assert rx flow
control.
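
In code terms, the rx paths end up using the new API in roughly the
pattern below.  This is a simplified sketch rather than the literal
patch code; process_rx() is a hypothetical stand-in for the role's rx
state machine.

/* stash side: keep whatever rx we could not consume right now */
static int
stash_unconsumed_rx(struct lws *wsi, uint8_t *buf, size_t used, size_t len)
{
	if (used >= len)
		return 0;

	/* returns 1 if this created the first segment, -1 on OOM */
	return lws_buflist_append_segment(&wsi->buflist_rxflow,
					  buf + used, len - used);
}

/* drain side: replay stashed segments before touching the socket again */
static void
drain_stashed_rx(struct lws *wsi)
{
	uint8_t *seg;
	size_t left, used;

	while ((left = lws_buflist_next_segment_len(&wsi->buflist_rxflow,
						    &seg))) {
		used = process_rx(wsi, seg, left); /* hypothetical consumer */
		if (!used)
			break; /* still flow-controlled; try again later */

		/* exhausted segments are freed and the head advances */
		lws_buflist_use_segment(&wsi->buflist_rxflow, used);
	}
}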
Andy Green 2018-04-13 16:01:38 +08:00
parent 7812ffabcc
commit 4b7144f763
13 changed files with 310 additions and 117 deletions

View file

@ -504,11 +504,13 @@ static const struct lws_protocols protocols_dummy[] = {
/* first protocol must always be HTTP handler */
{
"http-only", /* name */
lws_callback_http_dummy, /* callback */
0, /* per_session_data_size */
0, /* max frame size / rx buffer */
0, NULL, 0
"http-only", /* name */
lws_callback_http_dummy, /* callback */
0, /* per_session_data_size */
0, /* rx_buffer_size */
0, /* id */
NULL, /* user */
0 /* tx_packet_size */
},
/*
* the other protocols are provided by lws plugins

View file

@ -93,7 +93,7 @@ __lws_free_wsi(struct lws *wsi)
wsi->user_space && !wsi->user_space_externally_allocated)
lws_free(wsi->user_space);
lws_free_set_NULL(wsi->rxflow_buffer);
lws_buflist_destroy_all_segments(&wsi->buflist_rxflow);
lws_free_set_NULL(wsi->trunc_alloc);
lws_free_set_NULL(wsi->ws);
lws_free_set_NULL(wsi->udp);
@ -842,6 +842,10 @@ just_kill_connection:
__lws_remove_from_timeout_list(wsi);
lws_dll_lws_remove(&wsi->dll_hrtimer);
/* don't repeat event loop stuff */
if (wsi->told_event_loop_closed)
return;
/* checking return redundant since we anyway close */
if (wsi->desc.sockfd != LWS_SOCK_INVALID)
__remove_wsi_socket_from_fds(wsi);
@ -849,7 +853,8 @@ just_kill_connection:
lws_same_vh_protocol_remove(wsi);
lwsi_set_state(wsi, LRS_DEAD_SOCKET);
lws_free_set_NULL(wsi->rxflow_buffer);
lws_buflist_destroy_all_segments(&wsi->buflist_rxflow);
lws_dll_lws_remove(&wsi->dll_rxflow);
if (wsi->role_ops->close_role)
wsi->role_ops->close_role(pt, wsi);
@ -952,6 +957,114 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason, const char *ca
lws_pt_unlock(pt);
}
/* lws_buflist */
int
lws_buflist_append_segment(struct lws_buflist **head, uint8_t *buf, size_t len)
{
int first = !*head;
void *p;
assert(buf);
assert(len);
/* append at the tail */
while (*head)
head = &((*head)->next);
lwsl_info("%s: len %u\n", __func__, (uint32_t)len);
*head = (struct lws_buflist *)
lws_malloc(sizeof(**head) + len, __func__);
if (!*head) {
lwsl_err("%s: OOM\n", __func__);
return -1;
}
(*head)->len = len;
(*head)->pos = 0;
(*head)->next = NULL;
p = (void *)(*head)->buf;
memcpy(p, buf, len);
return first; /* returns 1 if first segment just created */
}
static int
lws_buflist_destroy_segment(struct lws_buflist **head)
{
struct lws_buflist *old = *head;
assert(*head);
*head = (*head)->next;
lws_free(old);
return !*head; /* returns 1 if last segment just destroyed */
}
void
lws_buflist_destroy_all_segments(struct lws_buflist **head)
{
struct lws_buflist *p = *head, *p1;
while (p) {
p1 = p->next;
lws_free(p);
p = p1;
}
*head = NULL;
}
size_t
lws_buflist_next_segment_len(struct lws_buflist **head, uint8_t **buf)
{
if (!*head) {
if (buf)
*buf = NULL;
return 0;
}
if (!(*head)->len && (*head)->next)
lws_buflist_destroy_segment(head);
if (!*head) {
if (buf)
*buf = NULL;
return 0;
}
assert((*head)->pos < (*head)->len);
if (buf)
*buf = (*head)->buf + (*head)->pos;
return (*head)->len - (*head)->pos;
}
int
lws_buflist_use_segment(struct lws_buflist **head, size_t len)
{
assert(*head);
assert(len);
assert((*head)->pos + len <= (*head)->len);
(*head)->pos += len;
if ((*head)->pos == (*head)->len)
lws_buflist_destroy_segment(head);
if (!*head)
return 0;
return (*head)->len;
}
/* ... */
LWS_VISIBLE LWS_EXTERN const char *
lws_get_urlarg_by_name(struct lws *wsi, const char *name, char *buf, int len)
{
@ -1486,6 +1599,11 @@ lws_rx_flow_control(struct lws *wsi, int _enable)
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
int en = _enable;
// h2 ignores rx flow control atm
if (lwsi_role_h2(wsi) || wsi->http2_substream ||
lwsi_role_h2_ENCAPSULATION(wsi))
return 0; // !!!
lwsl_info("%s: %p 0x%x\n", __func__, wsi, _enable);
if (!(_enable & LWS_RXFLOW_REASON_APPLIES)) {
@ -2103,6 +2221,11 @@ __lws_rx_flow_control(struct lws *wsi)
{
struct lws *wsic = wsi->child_list;
// h2 ignores rx flow control atm
if (lwsi_role_h2(wsi) || wsi->http2_substream ||
lwsi_role_h2_ENCAPSULATION(wsi))
return 0; // !!!
/* if he has children, do those if they were changed */
while (wsic) {
if (wsic->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE)
@ -2116,13 +2239,13 @@ __lws_rx_flow_control(struct lws *wsi)
return 0;
/* stuff is still buffered, not ready to really accept new input */
if (wsi->rxflow_buffer) {
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
/* get ourselves called back to deal with stashed buffer */
lws_callback_on_writable(wsi);
return 0;
}
/* pending is cleared, we can change rxflow state */
/* now the pending is cleared, we can change rxflow state */
wsi->rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE;
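
For context, the application-facing side of this machinery is unchanged:
a user protocol callback calls lws_rx_flow_control() to pause rx when its
own processing backs up, and re-enables it once it has caught up.  In the
sketch below only lws_rx_flow_control(), lws_callback_on_writable() and
the callback reasons are real lws API; my_pss and MY_RX_HIGH_WATER are
invented for the example.

#include <libwebsockets.h>

/* hypothetical per-session state, just for this sketch */
struct my_pss {
	size_t pending;	/* bytes received but not yet processed */
};

#define MY_RX_HIGH_WATER 4096	/* hypothetical backpressure threshold */

static int
my_callback(struct lws *wsi, enum lws_callback_reasons reason,
	    void *user, void *in, size_t len)
{
	struct my_pss *pss = (struct my_pss *)user;

	switch (reason) {
	case LWS_CALLBACK_RECEIVE:
		(void)in;	/* a real handler would queue the payload */
		pss->pending += len;
		if (pss->pending > MY_RX_HIGH_WATER)
			/* rx already in flight gets stashed on the buflist */
			lws_rx_flow_control(wsi, 0);
		lws_callback_on_writable(wsi);
		break;

	case LWS_CALLBACK_SERVER_WRITEABLE:
		pss->pending = 0;	/* pretend we drained our backlog */
		/* stashed buflist segments are replayed before new POLLIN rx */
		lws_rx_flow_control(wsi, 1);
		break;

	default:
		break;
	}

	return 0;
}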

View file

@ -2380,8 +2380,8 @@ struct lws_protocols {
* be able to consume it all without having to return to the event
* loop. That is supported in lws.
*
* If .tx_packet_size is 0, this also controls how much may be sent at once
* for backwards compatibility.
* If .tx_packet_size is 0, this also controls how much may be sent at
* once for backwards compatibility.
*/
unsigned int id;
/**< ignored by lws, but useful to contain user information bound
@ -5724,6 +5724,60 @@ lws_dll_lws_remove(struct lws_dll_lws *_a)
} \
}
struct lws_buflist;
/**
* lws_buflist_append_segment(): add buffer to buflist at head
*
* \param head: list head
* \param buf: buffer to stash
* \param len: length of buffer to stash
*
* Returns -1 on OOM, 1 if this was the first segment on the list, and 0 if
* it was a subsequent segment.
*/
LWS_VISIBLE LWS_EXTERN int
lws_buflist_append_segment(struct lws_buflist **head, uint8_t *buf, size_t len);
/**
* lws_buflist_next_segment_len(): number of bytes left in current segment
*
* \param head: list head
* \param buf: if non-NULL, *buf is written with the address of the start of
* the remaining data in the segment
*
* Returns the number of bytes left in the current segment. 0 indicates
* that the buflist is empty (there are no segments on the buflist).
*/
LWS_VISIBLE LWS_EXTERN size_t
lws_buflist_next_segment_len(struct lws_buflist **head, uint8_t **buf);
/**
* lws_buflist_use_segment(): remove len bytes from the current segment
*
* \param head: list head
* \param len: number of bytes to mark as used
*
* If len is less than the remaining length of the current segment, the position
* in the current segment is simply advanced and it returns.
*
* If len uses up the remaining length of the current segment, then the segment
* is deleted and the list head moves to the next segment if any.
*
* Returns the number of bytes left in the current segment. 0 indicates
* that the buflist is empty (there are no segments on the buflist).
*/
LWS_VISIBLE LWS_EXTERN int
lws_buflist_use_segment(struct lws_buflist **head, size_t len);
/**
* lws_buflist_destroy_all_segments(): free all segments on the list
*
* \param head: list head
*
* This frees everything on the list unconditionally. *head is always
* NULL after this.
*/
LWS_VISIBLE LWS_EXTERN void
lws_buflist_destroy_all_segments(struct lws_buflist **head);
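
Taken together, the four calls declared above compose like this; a
minimal, self-contained sketch using a plain local list head rather than
the per-wsi member the patch wires up:

#include <libwebsockets.h>

static int
buflist_example(void)
{
	struct lws_buflist *list = NULL;
	uint8_t chunk1[] = { 1, 2, 3 }, chunk2[] = { 4, 5 };
	uint8_t *p;
	size_t left;

	/* append returns 1 for the first segment, 0 for later ones, -1 on OOM */
	if (lws_buflist_append_segment(&list, chunk1, sizeof(chunk1)) < 0 ||
	    lws_buflist_append_segment(&list, chunk2, sizeof(chunk2)) < 0)
		return -1;

	/* walk the stash one segment at a time */
	while ((left = lws_buflist_next_segment_len(&list, &p))) {
		/* ... consume the 'left' bytes starting at p ... */
		lws_buflist_use_segment(&list, left); /* frees exhausted segments */
	}

	/* anything still held can always be dropped unconditionally */
	lws_buflist_destroy_all_segments(&list);

	return 0;
}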
/**
* lws_ptr_diff(): helper to report distance between pointers as an int
*

View file

@ -377,7 +377,7 @@ __lws_change_pollfd(struct lws *wsi, int _and, int _or)
if (!wsi || (!wsi->protocol && !wsi->event_pipe) ||
wsi->position_in_fds_table < 0)
return 1;
return 0;
context = lws_get_context(wsi);
if (!context)

View file

@ -524,7 +524,7 @@ enum lwsi_role {
#define lwsi_role(wsi) (wsi->wsistate & LWSI_ROLE_MASK)
#if !defined (_DEBUG)
#define lwsi_set_role(wsi, role) wsi->wsistate = \
(wsi->wsistate & (~LWSI_ROLE_MASK)) | role
(wsi->wsistate & (~LWSI_ROLE_MASK)) | role
#else
void lwsi_set_role(struct lws *wsi, lws_wsi_state_t role);
#endif
@ -959,6 +959,7 @@ struct lws_context_per_thread {
struct lws *tx_draining_ext_list;
struct lws_dll_lws dll_head_timeout;
struct lws_dll_lws dll_head_hrtimer;
struct lws_dll_lws dll_head_rxflow;
#if defined(LWS_WITH_LIBUV) || defined(LWS_WITH_LIBEVENT)
struct lws_context *context;
#endif
@ -2018,6 +2019,15 @@ struct lws_access_log {
};
#endif
struct lws_buflist {
struct lws_buflist *next;
size_t len;
size_t pos;
uint8_t buf[1]; /* true length of this is set by the oversize malloc */
};
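
The buf[1] member plus the oversize lws_malloc() in
lws_buflist_append_segment() is the usual C "struct hack": header and
payload share one allocation.  Illustratively, each segment is laid out
like this:

/*
 * one lws_malloc(sizeof(*seg) + len) per segment:
 *
 *   +------+-----+-----+------------------------------+
 *   | next | len | pos | payload bytes (len of them)   |
 *   +------+-----+-----+------------------------------+
 *                       ^ (*head)->buf ... read from buf + pos
 */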
#define lws_wsi_is_udp(___wsi) (!!___wsi->udp)
struct lws {
@ -2056,6 +2066,7 @@ struct lws {
struct lws_dll_lws dll_timeout;
struct lws_dll_lws dll_hrtimer;
struct lws_dll_lws dll_rxflow;
#if defined(LWS_WITH_PEER_LIMITS)
struct lws_peer *peer;
#endif
@ -2072,8 +2083,9 @@ struct lws {
#endif
void *user_space;
void *opaque_parent_data;
/* rxflow handling */
unsigned char *rxflow_buffer;
struct lws_buflist *buflist_rxflow;
/* truncated send handling */
unsigned char *trunc_alloc; /* non-NULL means buffering in progress */
@ -2112,8 +2124,6 @@ struct lws {
/* ints */
int position_in_fds_table;
uint32_t rxflow_len;
uint32_t rxflow_pos;
uint32_t preamble_rx_len;
unsigned int trunc_alloc_len; /* size of malloc */
unsigned int trunc_offset; /* where we are in terms of spilling */

View file

@ -53,8 +53,9 @@ lws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len)
continue;
}
/* account for what we're using in rxflow buffer */
if (wsi->rxflow_buffer)
wsi->rxflow_pos++;
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL) &&
!lws_buflist_use_segment(&wsi->buflist_rxflow, 1))
lws_dll_lws_remove(&wsi->dll_rxflow);
if (lws_client_rx_sm(wsi, *(*buf)++)) {
lwsl_debug("client_rx_sm exited\n");

View file

@ -2186,10 +2186,9 @@ lws_read_h2(struct lws *wsi, unsigned char *buf, lws_filepos_t len)
}
/* account for what we're using in rxflow buffer */
if (wsi->rxflow_buffer) {
wsi->rxflow_pos += (int)body_chunk_len;
assert(wsi->rxflow_pos <= wsi->rxflow_len);
}
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL) &&
!lws_buflist_use_segment(&wsi->buflist_rxflow, body_chunk_len))
lws_dll_lws_remove(&wsi->dll_rxflow);
buf += body_chunk_len;
len -= body_chunk_len;

View file

@ -175,14 +175,10 @@ rops_handle_POLLIN_h2(struct lws_context_per_thread *pt, struct lws *wsi,
/* 3: RX Flowcontrol buffer / h2 rx scratch needs to be drained
*/
if (wsi->rxflow_buffer) {
lwsl_info("draining rxflow (len %d)\n",
wsi->rxflow_len - wsi->rxflow_pos);
assert(wsi->rxflow_pos < wsi->rxflow_len);
/* well, drain it */
eff_buf.token = (char *)wsi->rxflow_buffer +
wsi->rxflow_pos;
eff_buf.token_len = wsi->rxflow_len - wsi->rxflow_pos;
eff_buf.token_len = lws_buflist_next_segment_len(&wsi->buflist_rxflow,
(uint8_t **)&eff_buf.token);
if (eff_buf.token_len) {
lwsl_info("draining rxflow (len %d)\n", eff_buf.token_len);
draining_flow = 1;
goto drain;
}
@ -359,10 +355,9 @@ drain:
goto read;
}
if (draining_flow && wsi->rxflow_buffer &&
wsi->rxflow_pos == wsi->rxflow_len) {
if (draining_flow && /* were draining, now nothing left */
!lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
lwsl_info("%s: %p flow buf: drained\n", __func__, wsi);
lws_free_set_NULL(wsi->rxflow_buffer);
/* having drained the rxflow buffer, can rearm POLLIN */
#ifdef LWS_NO_SERVER
n =
@ -766,21 +761,41 @@ rops_callback_on_writable_h2(struct lws *wsi)
return 0;
}
static void
lws_h2_dump_waiting_children(struct lws *wsi)
{
#if defined(_DEBUG)
lwsl_info("%s: %p: children waiting for POLLOUT service:\n",
__func__, wsi);
wsi = wsi->h2.child_list;
while (wsi) {
if (wsi->h2.requested_POLLOUT)
lwsl_info(" * %p %s\n", wsi, wsi->protocol->name);
else
lwsl_info(" %p %s\n", wsi, wsi->protocol->name);
wsi = wsi->h2.sibling_list;
}
#endif
}
/*
* we are the 'network wsi' for potentially many muxed child wsi with
* no network connection of their own, who have to use us for all their
* network actions. So we use a round-robin scheme to share out the
* POLLOUT notifications to our children.
*
* But because any child could exhaust the socket's ability to take
* writes, we can only let one child get notified each time.
*
* In addition children may be closed / deleted / added between POLLOUT
* notifications, so we can't hold pointers
*/
static int
rops_perform_user_POLLOUT_h2(struct lws *wsi)
{
/*
* we are the 'network wsi' for potentially many muxed child wsi with
* no network connection of their own, who have to use us for all their
* network actions. So we use a round-robin scheme to share out the
* POLLOUT notifications to our children.
*
* But because any child could exhaust the socket's ability to take
* writes, we can only let one child get notified each time.
*
* In addition children may be closed / deleted / added between POLLOUT
* notifications, so we can't hold pointers
*/
struct lws **wsi2, *wsi2a;
int write_type = LWS_WRITE_PONG, n;
@ -792,16 +807,7 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
return 0;
}
lwsl_info("%s: %p: children waiting for POLLOUT service:\n", __func__, wsi);
wsi2a = wsi->h2.child_list;
while (wsi2a) {
if (wsi2a->h2.requested_POLLOUT)
lwsl_info(" * %p %s\n", wsi2a, wsi2a->protocol->name);
else
lwsl_info(" %p %s\n", wsi2a, wsi2a->protocol->name);
wsi2a = wsi2a->h2.sibling_list;
}
lws_h2_dump_waiting_children(wsi);
wsi2 = &wsi->h2.child_list;
if (!*wsi2)
@ -842,7 +848,8 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
}
w->h2.requested_POLLOUT = 0;
lwsl_info("%s: child %p (state %d)\n", __func__, w, lwsi_state(w));
lwsl_info("%s: child %p (wsistate 0x%x)\n", __func__, w,
w->wsistate);
/* if we arrived here, even by looping, we checked choked */
w->could_have_pending = 0;
@ -855,7 +862,8 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
strlen(w->h2.pending_status_body +
LWS_PRE), LWS_WRITE_HTTP_FINAL);
lws_free_set_NULL(w->h2.pending_status_body);
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, "h2 end stream 1");
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
"h2 end stream 1");
wa = &wsi->h2.child_list;
goto next_child;
}
@ -892,7 +900,8 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
*/
if (n || w->h2.send_END_STREAM) {
lwsl_info("closing stream after h2 action\n");
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, "h2 end stream");
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
"h2 end stream");
wa = &wsi->h2.child_list;
}
@ -918,7 +927,8 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
*/
if (n < 0 || w->h2.send_END_STREAM) {
lwsl_debug("Closing POLLOUT child %p\n", w);
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, "h2 end stream file");
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
"h2 end stream file");
wa = &wsi->h2.child_list;
goto next_child;
}
@ -944,14 +954,16 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
if (n >= 0) {
lwsi_set_state(w, LRS_AWAITING_CLOSE_ACK);
lws_set_timeout(w, PENDING_TIMEOUT_CLOSE_ACK, 5);
lwsl_debug("sent close indication, awaiting ack\n");
lwsl_debug("sent close frame, awaiting ack\n");
}
goto next_child;
}
/* Acknowledge receipt of peer's notification he closed,
* then logically close ourself */
/*
* Acknowledge receipt of peer's notification he closed,
* then logically close ourself
*/
if ((lwsi_role_ws(w) && w->ws->ping_pending_flag) ||
(lwsi_state(w) == LRS_RETURNED_CLOSE &&
@ -969,11 +981,12 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
/* well he is sent, mark him done */
w->ws->ping_pending_flag = 0;
if (w->ws->payload_is_close) {
/* oh... a close frame was it... then we are done */
/* oh... a close frame... then we are done */
lwsl_debug("Acknowledged peer's close packet\n");
w->ws->payload_is_close = 0;
lwsi_set_state(w, LRS_RETURNED_CLOSE);
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, "returned close packet");
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
"returned close packet");
wa = &wsi->h2.child_list;
goto next_child;
}
@ -986,8 +999,10 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
}
if (lws_callback_as_writeable(w)) {
lwsl_info("Closing POLLOUT child (end stream %d)\n", w->h2.send_END_STREAM);
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, "h2 pollout handle");
lwsl_info("Closing POLLOUT child (end stream %d)\n",
w->h2.send_END_STREAM);
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
"h2 pollout handle");
wa = &wsi->h2.child_list;
} else
if (w->h2.send_END_STREAM)
@ -997,18 +1012,7 @@ next_child:
wsi2 = wa;
} while (wsi2 && *wsi2 && !lws_send_pipe_choked(wsi));
lwsl_info("%s: %p: children waiting for POLLOUT service: %p\n",
__func__, wsi, wsi->h2.child_list);
wsi2a = wsi->h2.child_list;
while (wsi2a) {
if (wsi2a->h2.requested_POLLOUT)
lwsl_debug(" * %p\n", wsi2a);
else
lwsl_debug(" %p\n", wsi2a);
wsi2a = wsi2a->h2.sibling_list;
}
// lws_h2_dump_waiting_children(wsi);
wsi2a = wsi->h2.child_list;
while (wsi2a) {

View file

@ -941,14 +941,10 @@ rops_handle_POLLIN_ws(struct lws_context_per_thread *pt, struct lws *wsi,
/* 3: RX Flowcontrol buffer / h2 rx scratch needs to be drained
*/
if (wsi->rxflow_buffer) {
lwsl_info("draining rxflow (len %d)\n",
wsi->rxflow_len - wsi->rxflow_pos);
assert(wsi->rxflow_pos < wsi->rxflow_len);
/* well, drain it */
eff_buf.token = (char *)wsi->rxflow_buffer +
wsi->rxflow_pos;
eff_buf.token_len = wsi->rxflow_len - wsi->rxflow_pos;
eff_buf.token_len = lws_buflist_next_segment_len(&wsi->buflist_rxflow,
(uint8_t **)&eff_buf.token);
if (eff_buf.token_len) {
lwsl_info("draining rxflow (len %d)\n", eff_buf.token_len);
draining_flow = 1;
goto drain;
}
@ -1145,10 +1141,9 @@ drain:
goto read;
}
if (draining_flow && wsi->rxflow_buffer &&
wsi->rxflow_pos == wsi->rxflow_len) {
if (draining_flow && /* were draining, now nothing left */
!lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
lwsl_info("%s: %p flow buf: drained\n", __func__, wsi);
lws_free_set_NULL(wsi->rxflow_buffer);
/* having drained the rxflow buffer, can rearm POLLIN */
#ifdef LWS_NO_SERVER
n =

View file

@ -559,7 +559,7 @@ bail:
int
lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
{
int m;
int m, draining_flow = 0;
lwsl_parser("%s: received %d byte packet\n", __func__, (int)len);
@ -572,6 +572,7 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
if (wsi->rxflow_bitmap) {
lws_rxflow_cache(wsi, *buf, 0, (int)len);
lwsl_parser("%s: cached %ld\n", __func__, (long)len);
buf += len; /* stashing it is taking care of it */
return 1;
}
@ -583,18 +584,21 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
}
/* account for what we're using in rxflow buffer */
if (wsi->rxflow_buffer) {
wsi->rxflow_pos++;
if (wsi->rxflow_pos > wsi->rxflow_len)
assert(0);
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
draining_flow = 1;
if (!lws_buflist_use_segment(&wsi->buflist_rxflow, 1))
lws_dll_lws_remove(&wsi->dll_rxflow);
}
/* consume payload bytes efficiently */
if (wsi->lws_rx_parse_state ==
LWS_RXPS_PAYLOAD_UNTIL_LENGTH_EXHAUSTED) {
m = lws_payload_until_length_exhausted(wsi, buf, &len);
if (wsi->rxflow_buffer)
wsi->rxflow_pos += m;
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
draining_flow = 1;
if (!lws_buflist_use_segment(&wsi->buflist_rxflow, m))
lws_dll_lws_remove(&wsi->dll_rxflow);
}
}
/* process the byte */
@ -603,9 +607,10 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
return -1;
len--;
if (wsi->rxflow_buffer && wsi->rxflow_pos == wsi->rxflow_len) {
if (draining_flow && /* were draining, now nothing left */
!lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
lwsl_debug("%s: %p flow buf: drained\n", __func__, wsi);
lws_free_set_NULL(wsi->rxflow_buffer);
/* having drained the rxflow buffer, can rearm POLLIN */
#ifdef LWS_NO_SERVER
m =

View file

@ -277,37 +277,36 @@ __lws_service_timeout_check(struct lws *wsi, time_t sec)
int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
uint8_t *buffered;
size_t blen;
int ret = 0, m;
if (wsi->role_ops->rxflow_cache)
if (wsi->role_ops->rxflow_cache(wsi, buf, n, len))
return 0;
/* his RX is flowcontrolled, don't send remaining now */
if (wsi->rxflow_buffer) {
if (buf >= wsi->rxflow_buffer &&
&buf[len - 1] < &wsi->rxflow_buffer[wsi->rxflow_len]) {
blen = lws_buflist_next_segment_len(&wsi->buflist_rxflow, &buffered);
if (blen) {
if (buf >= buffered && buf + len <= buffered + blen) {
/* rxflow while we were spilling prev rxflow */
lwsl_info("%s: staying in rxflow buf\n", __func__);
return 1;
} else {
lwsl_err("%s: conflicting rxflow buf, "
"current %p len %d, new %p len %d\n", __func__,
wsi->rxflow_buffer, wsi->rxflow_len, buf, len);
assert(0);
return 1;
}
ret = 1;
}
/* a new rxflow, buffer it and warn caller */
lwsl_info("%s: new rxflow input buffer len %d\n", __func__, len - n);
wsi->rxflow_buffer = lws_malloc(len - n, "rxflow buf");
if (!wsi->rxflow_buffer)
m = lws_buflist_append_segment(&wsi->buflist_rxflow, buf + n, len - n);
if (m < 0)
return -1;
if (m)
lws_dll_lws_add_front(&wsi->dll_rxflow, &pt->dll_head_rxflow);
wsi->rxflow_len = len - n;
wsi->rxflow_pos = 0;
memcpy(wsi->rxflow_buffer, buf + n, len - n);
return 0;
return ret;
}
/* this is used by the platform service code to stop us waiting for network

View file

@ -121,6 +121,7 @@ context_creation(void)
info.external_baggage_free_on_destroy = config_strings;
info.max_http_header_pool = 1024;
info.pt_serv_buf_size = 8192;
info.options = opts | LWS_SERVER_OPTION_VALIDATE_UTF8 |
LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
LWS_SERVER_OPTION_LIBUV;

View file

@ -845,7 +845,7 @@ function ev_mousemove (ev) {
clearTimeout(lm_timer);
pending = "";
} else
lm_timer = setTimeout(lm_timer_handler, 30);
lm_timer = setTimeout(lm_timer_handler, 1);
last_x = x;
last_y = y;