diff --git a/lib/context.c b/lib/context.c index 83b356937..1ed9e15d3 100644 --- a/lib/context.c +++ b/lib/context.c @@ -504,11 +504,13 @@ static const struct lws_protocols protocols_dummy[] = { /* first protocol must always be HTTP handler */ { - "http-only", /* name */ - lws_callback_http_dummy, /* callback */ - 0, /* per_session_data_size */ - 0, /* max frame size / rx buffer */ - 0, NULL, 0 + "http-only", /* name */ + lws_callback_http_dummy, /* callback */ + 0, /* per_session_data_size */ + 0, /* rx_buffer_size */ + 0, /* id */ + NULL, /* user */ + 0 /* tx_packet_size */ }, /* * the other protocols are provided by lws plugins diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index af8ad1722..e6ec7db89 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -93,7 +93,7 @@ __lws_free_wsi(struct lws *wsi) wsi->user_space && !wsi->user_space_externally_allocated) lws_free(wsi->user_space); - lws_free_set_NULL(wsi->rxflow_buffer); + lws_buflist_destroy_all_segments(&wsi->buflist_rxflow); lws_free_set_NULL(wsi->trunc_alloc); lws_free_set_NULL(wsi->ws); lws_free_set_NULL(wsi->udp); @@ -842,6 +842,10 @@ just_kill_connection: __lws_remove_from_timeout_list(wsi); lws_dll_lws_remove(&wsi->dll_hrtimer); + /* don't repeat event loop stuff */ + if (wsi->told_event_loop_closed) + return; + /* checking return redundant since we anyway close */ if (wsi->desc.sockfd != LWS_SOCK_INVALID) __remove_wsi_socket_from_fds(wsi); @@ -849,7 +853,8 @@ just_kill_connection: lws_same_vh_protocol_remove(wsi); lwsi_set_state(wsi, LRS_DEAD_SOCKET); - lws_free_set_NULL(wsi->rxflow_buffer); + lws_buflist_destroy_all_segments(&wsi->buflist_rxflow); + lws_dll_lws_remove(&wsi->dll_rxflow); if (wsi->role_ops->close_role) wsi->role_ops->close_role(pt, wsi); @@ -952,6 +957,114 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason, const char *ca lws_pt_unlock(pt); } +/* lws_buflist */ + +int +lws_buflist_append_segment(struct lws_buflist **head, uint8_t *buf, size_t len) +{ + int first = !*head; + void *p; + + assert(buf); + assert(len); + + /* append at the tail */ + while (*head) + head = &((*head)->next); + + lwsl_info("%s: len %u\n", __func__, (uint32_t)len); + + *head = (struct lws_buflist *) + lws_malloc(sizeof(**head) + len, __func__); + if (!*head) { + lwsl_err("%s: OOM\n", __func__); + return -1; + } + + (*head)->len = len; + (*head)->pos = 0; + (*head)->next = NULL; + + p = (void *)(*head)->buf; + memcpy(p, buf, len); + + return first; /* returns 1 if first segment just created */ +} + +static int +lws_buflist_destroy_segment(struct lws_buflist **head) +{ + struct lws_buflist *old = *head; + + assert(*head); + *head = (*head)->next; + lws_free(old); + + return !*head; /* returns 1 if last segment just destroyed */ +} + +void +lws_buflist_destroy_all_segments(struct lws_buflist **head) +{ + struct lws_buflist *p = *head, *p1; + + while (p) { + p1 = p->next; + lws_free(p); + p = p1; + } + + *head = NULL; +} + +size_t +lws_buflist_next_segment_len(struct lws_buflist **head, uint8_t **buf) +{ + if (!*head) { + if (buf) + *buf = NULL; + + return 0; + } + + if (!(*head)->len && (*head)->next) + lws_buflist_destroy_segment(head); + + if (!*head) { + if (buf) + *buf = NULL; + + return 0; + } + + assert((*head)->pos < (*head)->len); + + if (buf) + *buf = (*head)->buf + (*head)->pos; + + return (*head)->len - (*head)->pos; +} + +int +lws_buflist_use_segment(struct lws_buflist **head, size_t len) +{ + assert(*head); + assert(len); + + assert((*head)->pos + len <= (*head)->len); + + (*head)->pos += len; + if ((*head)->pos == (*head)->len) + lws_buflist_destroy_segment(head); + + if (!*head) + return 0; + + return (*head)->len; +} + +/* ... */ + LWS_VISIBLE LWS_EXTERN const char * lws_get_urlarg_by_name(struct lws *wsi, const char *name, char *buf, int len) { @@ -1486,6 +1599,11 @@ lws_rx_flow_control(struct lws *wsi, int _enable) struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; int en = _enable; + // h2 ignores rx flow control atm + if (lwsi_role_h2(wsi) || wsi->http2_substream || + lwsi_role_h2_ENCAPSULATION(wsi)) + return 0; // !!! + lwsl_info("%s: %p 0x%x\n", __func__, wsi, _enable); if (!(_enable & LWS_RXFLOW_REASON_APPLIES)) { @@ -2103,6 +2221,11 @@ __lws_rx_flow_control(struct lws *wsi) { struct lws *wsic = wsi->child_list; + // h2 ignores rx flow control atm + if (lwsi_role_h2(wsi) || wsi->http2_substream || + lwsi_role_h2_ENCAPSULATION(wsi)) + return 0; // !!! + /* if he has children, do those if they were changed */ while (wsic) { if (wsic->rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE) @@ -2116,13 +2239,13 @@ __lws_rx_flow_control(struct lws *wsi) return 0; /* stuff is still buffered, not ready to really accept new input */ - if (wsi->rxflow_buffer) { + if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) { /* get ourselves called back to deal with stashed buffer */ lws_callback_on_writable(wsi); return 0; } - /* pending is cleared, we can change rxflow state */ + /* now the pending is cleared, we can change rxflow state */ wsi->rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE; diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index 13b0984fd..db614c069 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -2380,8 +2380,8 @@ struct lws_protocols { * be able to consume it all without having to return to the event * loop. That is supported in lws. * - * If .tx_packet_size is 0, this also controls how much may be sent at once - * for backwards compatibility. + * If .tx_packet_size is 0, this also controls how much may be sent at + * once for backwards compatibility. */ unsigned int id; /**< ignored by lws, but useful to contain user information bound @@ -5724,6 +5724,60 @@ lws_dll_lws_remove(struct lws_dll_lws *_a) } \ } +struct lws_buflist; + +/** + * lws_buflist_append_segment(): add buffer to buflist at head + * + * \param head: list head + * \param buf: buffer to stash + * \param len: length of buffer to stash + * + * Returns -1 on OOM, 1 if this was the first segment on the list, and 0 if + * it was a subsequent segment. + */ +LWS_VISIBLE LWS_EXTERN int +lws_buflist_append_segment(struct lws_buflist **head, uint8_t *buf, size_t len); +/** + * lws_buflist_next_segment_len(): number of bytes left in current segment + * + * \param head: list head + * \param buf: if non-NULL, *buf is written with the address of the start of + * the remaining data in the segment + * + * Returns the number of bytes left in the current segment. 0 indicates + * that the buflist is empty (there are no segments on the buflist). + */ +LWS_VISIBLE LWS_EXTERN size_t +lws_buflist_next_segment_len(struct lws_buflist **head, uint8_t **buf); +/** + * lws_buflist_use_segment(): remove len bytes from the current segment + * + * \param head: list head + * \param len: number of bytes to mark as used + * + * If len is less than the remaining length of the current segment, the position + * in the current segment is simply advanced and it returns. + * + * If len uses up the remaining length of the current segment, then the segment + * is deleted and the list head moves to the next segment if any. + * + * Returns the number of bytes left in the current segment. 0 indicates + * that the buflist is empty (there are no segments on the buflist). + */ +LWS_VISIBLE LWS_EXTERN int +lws_buflist_use_segment(struct lws_buflist **head, size_t len); +/** + * lws_buflist_destroy_all_segments(): free all segments on the list + * + * \param head: list head + * + * This frees everything on the list unconditionally. *head is always + * NULL after this. + */ +LWS_VISIBLE LWS_EXTERN void +lws_buflist_destroy_all_segments(struct lws_buflist **head); + /** * lws_ptr_diff(): helper to report distance between pointers as an int * diff --git a/lib/pollfd.c b/lib/pollfd.c index d9029c446..18474b7ed 100644 --- a/lib/pollfd.c +++ b/lib/pollfd.c @@ -377,7 +377,7 @@ __lws_change_pollfd(struct lws *wsi, int _and, int _or) if (!wsi || (!wsi->protocol && !wsi->event_pipe) || wsi->position_in_fds_table < 0) - return 1; + return 0; context = lws_get_context(wsi); if (!context) diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index c1a2c3025..8f5787335 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -524,7 +524,7 @@ enum lwsi_role { #define lwsi_role(wsi) (wsi->wsistate & LWSI_ROLE_MASK) #if !defined (_DEBUG) #define lwsi_set_role(wsi, role) wsi->wsistate = \ - (wsi->wsistate & (~LWSI_ROLE_MASK)) | role + (wsi->wsistate & (~LWSI_ROLE_MASK)) | role #else void lwsi_set_role(struct lws *wsi, lws_wsi_state_t role); #endif @@ -959,6 +959,7 @@ struct lws_context_per_thread { struct lws *tx_draining_ext_list; struct lws_dll_lws dll_head_timeout; struct lws_dll_lws dll_head_hrtimer; + struct lws_dll_lws dll_head_rxflow; #if defined(LWS_WITH_LIBUV) || defined(LWS_WITH_LIBEVENT) struct lws_context *context; #endif @@ -2018,6 +2019,15 @@ struct lws_access_log { }; #endif +struct lws_buflist { + struct lws_buflist *next; + + size_t len; + size_t pos; + + uint8_t buf[1]; /* true length of this is set by the oversize malloc */ +}; + #define lws_wsi_is_udp(___wsi) (!!___wsi->udp) struct lws { @@ -2056,6 +2066,7 @@ struct lws { struct lws_dll_lws dll_timeout; struct lws_dll_lws dll_hrtimer; + struct lws_dll_lws dll_rxflow; #if defined(LWS_WITH_PEER_LIMITS) struct lws_peer *peer; #endif @@ -2072,8 +2083,9 @@ struct lws { #endif void *user_space; void *opaque_parent_data; - /* rxflow handling */ - unsigned char *rxflow_buffer; + + struct lws_buflist *buflist_rxflow; + /* truncated send handling */ unsigned char *trunc_alloc; /* non-NULL means buffering in progress */ @@ -2112,8 +2124,6 @@ struct lws { /* ints */ int position_in_fds_table; - uint32_t rxflow_len; - uint32_t rxflow_pos; uint32_t preamble_rx_len; unsigned int trunc_alloc_len; /* size of malloc */ unsigned int trunc_offset; /* where we are in terms of spilling */ diff --git a/lib/roles/h1/client-h1.c b/lib/roles/h1/client-h1.c index 1229aabb3..e3b8607f0 100644 --- a/lib/roles/h1/client-h1.c +++ b/lib/roles/h1/client-h1.c @@ -53,8 +53,9 @@ lws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len) continue; } /* account for what we're using in rxflow buffer */ - if (wsi->rxflow_buffer) - wsi->rxflow_pos++; + if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL) && + !lws_buflist_use_segment(&wsi->buflist_rxflow, 1)) + lws_dll_lws_remove(&wsi->dll_rxflow); if (lws_client_rx_sm(wsi, *(*buf)++)) { lwsl_debug("client_rx_sm exited\n"); diff --git a/lib/roles/h2/http2.c b/lib/roles/h2/http2.c index 8563204a5..b68152c01 100644 --- a/lib/roles/h2/http2.c +++ b/lib/roles/h2/http2.c @@ -2186,10 +2186,9 @@ lws_read_h2(struct lws *wsi, unsigned char *buf, lws_filepos_t len) } /* account for what we're using in rxflow buffer */ - if (wsi->rxflow_buffer) { - wsi->rxflow_pos += (int)body_chunk_len; - assert(wsi->rxflow_pos <= wsi->rxflow_len); - } + if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL) && + !lws_buflist_use_segment(&wsi->buflist_rxflow, body_chunk_len)) + lws_dll_lws_remove(&wsi->dll_rxflow); buf += body_chunk_len; len -= body_chunk_len; diff --git a/lib/roles/h2/ops-h2.c b/lib/roles/h2/ops-h2.c index 148c6cb37..5c9a475b5 100644 --- a/lib/roles/h2/ops-h2.c +++ b/lib/roles/h2/ops-h2.c @@ -175,14 +175,10 @@ rops_handle_POLLIN_h2(struct lws_context_per_thread *pt, struct lws *wsi, /* 3: RX Flowcontrol buffer / h2 rx scratch needs to be drained */ - if (wsi->rxflow_buffer) { - lwsl_info("draining rxflow (len %d)\n", - wsi->rxflow_len - wsi->rxflow_pos); - assert(wsi->rxflow_pos < wsi->rxflow_len); - /* well, drain it */ - eff_buf.token = (char *)wsi->rxflow_buffer + - wsi->rxflow_pos; - eff_buf.token_len = wsi->rxflow_len - wsi->rxflow_pos; + eff_buf.token_len = lws_buflist_next_segment_len(&wsi->buflist_rxflow, + (uint8_t **)&eff_buf.token); + if (eff_buf.token_len) { + lwsl_info("draining rxflow (len %d)\n", eff_buf.token_len); draining_flow = 1; goto drain; } @@ -359,10 +355,9 @@ drain: goto read; } - if (draining_flow && wsi->rxflow_buffer && - wsi->rxflow_pos == wsi->rxflow_len) { + if (draining_flow && /* were draining, now nothing left */ + !lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) { lwsl_info("%s: %p flow buf: drained\n", __func__, wsi); - lws_free_set_NULL(wsi->rxflow_buffer); /* having drained the rxflow buffer, can rearm POLLIN */ #ifdef LWS_NO_SERVER n = @@ -766,21 +761,41 @@ rops_callback_on_writable_h2(struct lws *wsi) return 0; } +static void +lws_h2_dump_waiting_children(struct lws *wsi) +{ +#if defined(_DEBUG) + lwsl_info("%s: %p: children waiting for POLLOUT service:\n", + __func__, wsi); + + wsi = wsi->h2.child_list; + while (wsi) { + if (wsi->h2.requested_POLLOUT) + lwsl_info(" * %p %s\n", wsi, wsi->protocol->name); + else + lwsl_info(" %p %s\n", wsi, wsi->protocol->name); + + wsi = wsi->h2.sibling_list; + } +#endif +} + +/* + * we are the 'network wsi' for potentially many muxed child wsi with + * no network connection of their own, who have to use us for all their + * network actions. So we use a round-robin scheme to share out the + * POLLOUT notifications to our children. + * + * But because any child could exhaust the socket's ability to take + * writes, we can only let one child get notified each time. + * + * In addition children may be closed / deleted / added between POLLOUT + * notifications, so we can't hold pointers + */ + static int rops_perform_user_POLLOUT_h2(struct lws *wsi) { - /* - * we are the 'network wsi' for potentially many muxed child wsi with - * no network connection of their own, who have to use us for all their - * network actions. So we use a round-robin scheme to share out the - * POLLOUT notifications to our children. - * - * But because any child could exhaust the socket's ability to take - * writes, we can only let one child get notified each time. - * - * In addition children may be closed / deleted / added between POLLOUT - * notifications, so we can't hold pointers - */ struct lws **wsi2, *wsi2a; int write_type = LWS_WRITE_PONG, n; @@ -792,16 +807,7 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi) return 0; } - lwsl_info("%s: %p: children waiting for POLLOUT service:\n", __func__, wsi); - wsi2a = wsi->h2.child_list; - while (wsi2a) { - if (wsi2a->h2.requested_POLLOUT) - lwsl_info(" * %p %s\n", wsi2a, wsi2a->protocol->name); - else - lwsl_info(" %p %s\n", wsi2a, wsi2a->protocol->name); - - wsi2a = wsi2a->h2.sibling_list; - } + lws_h2_dump_waiting_children(wsi); wsi2 = &wsi->h2.child_list; if (!*wsi2) @@ -842,7 +848,8 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi) } w->h2.requested_POLLOUT = 0; - lwsl_info("%s: child %p (state %d)\n", __func__, w, lwsi_state(w)); + lwsl_info("%s: child %p (wsistate 0x%x)\n", __func__, w, + w->wsistate); /* if we arrived here, even by looping, we checked choked */ w->could_have_pending = 0; @@ -855,7 +862,8 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi) strlen(w->h2.pending_status_body + LWS_PRE), LWS_WRITE_HTTP_FINAL); lws_free_set_NULL(w->h2.pending_status_body); - lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, "h2 end stream 1"); + lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, + "h2 end stream 1"); wa = &wsi->h2.child_list; goto next_child; } @@ -892,7 +900,8 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi) */ if (n || w->h2.send_END_STREAM) { lwsl_info("closing stream after h2 action\n"); - lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, "h2 end stream"); + lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, + "h2 end stream"); wa = &wsi->h2.child_list; } @@ -918,7 +927,8 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi) */ if (n < 0 || w->h2.send_END_STREAM) { lwsl_debug("Closing POLLOUT child %p\n", w); - lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, "h2 end stream file"); + lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, + "h2 end stream file"); wa = &wsi->h2.child_list; goto next_child; } @@ -944,14 +954,16 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi) if (n >= 0) { lwsi_set_state(w, LRS_AWAITING_CLOSE_ACK); lws_set_timeout(w, PENDING_TIMEOUT_CLOSE_ACK, 5); - lwsl_debug("sent close indication, awaiting ack\n"); + lwsl_debug("sent close frame, awaiting ack\n"); } goto next_child; } - /* Acknowledge receipt of peer's notification he closed, - * then logically close ourself */ + /* + * Acknowledge receipt of peer's notification he closed, + * then logically close ourself + */ if ((lwsi_role_ws(w) && w->ws->ping_pending_flag) || (lwsi_state(w) == LRS_RETURNED_CLOSE && @@ -969,11 +981,12 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi) /* well he is sent, mark him done */ w->ws->ping_pending_flag = 0; if (w->ws->payload_is_close) { - /* oh... a close frame was it... then we are done */ + /* oh... a close frame... then we are done */ lwsl_debug("Acknowledged peer's close packet\n"); w->ws->payload_is_close = 0; lwsi_set_state(w, LRS_RETURNED_CLOSE); - lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, "returned close packet"); + lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, + "returned close packet"); wa = &wsi->h2.child_list; goto next_child; } @@ -986,8 +999,10 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi) } if (lws_callback_as_writeable(w)) { - lwsl_info("Closing POLLOUT child (end stream %d)\n", w->h2.send_END_STREAM); - lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, "h2 pollout handle"); + lwsl_info("Closing POLLOUT child (end stream %d)\n", + w->h2.send_END_STREAM); + lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, + "h2 pollout handle"); wa = &wsi->h2.child_list; } else if (w->h2.send_END_STREAM) @@ -997,18 +1012,7 @@ next_child: wsi2 = wa; } while (wsi2 && *wsi2 && !lws_send_pipe_choked(wsi)); - lwsl_info("%s: %p: children waiting for POLLOUT service: %p\n", - __func__, wsi, wsi->h2.child_list); - wsi2a = wsi->h2.child_list; - while (wsi2a) { - if (wsi2a->h2.requested_POLLOUT) - lwsl_debug(" * %p\n", wsi2a); - else - lwsl_debug(" %p\n", wsi2a); - - wsi2a = wsi2a->h2.sibling_list; - } - + // lws_h2_dump_waiting_children(wsi); wsi2a = wsi->h2.child_list; while (wsi2a) { diff --git a/lib/roles/ws/ops-ws.c b/lib/roles/ws/ops-ws.c index 7e997d775..9e1436072 100644 --- a/lib/roles/ws/ops-ws.c +++ b/lib/roles/ws/ops-ws.c @@ -941,14 +941,10 @@ rops_handle_POLLIN_ws(struct lws_context_per_thread *pt, struct lws *wsi, /* 3: RX Flowcontrol buffer / h2 rx scratch needs to be drained */ - if (wsi->rxflow_buffer) { - lwsl_info("draining rxflow (len %d)\n", - wsi->rxflow_len - wsi->rxflow_pos); - assert(wsi->rxflow_pos < wsi->rxflow_len); - /* well, drain it */ - eff_buf.token = (char *)wsi->rxflow_buffer + - wsi->rxflow_pos; - eff_buf.token_len = wsi->rxflow_len - wsi->rxflow_pos; + eff_buf.token_len = lws_buflist_next_segment_len(&wsi->buflist_rxflow, + (uint8_t **)&eff_buf.token); + if (eff_buf.token_len) { + lwsl_info("draining rxflow (len %d)\n", eff_buf.token_len); draining_flow = 1; goto drain; } @@ -1145,10 +1141,9 @@ drain: goto read; } - if (draining_flow && wsi->rxflow_buffer && - wsi->rxflow_pos == wsi->rxflow_len) { + if (draining_flow && /* were draining, now nothing left */ + !lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) { lwsl_info("%s: %p flow buf: drained\n", __func__, wsi); - lws_free_set_NULL(wsi->rxflow_buffer); /* having drained the rxflow buffer, can rearm POLLIN */ #ifdef LWS_NO_SERVER n = diff --git a/lib/roles/ws/server-ws.c b/lib/roles/ws/server-ws.c index d2db68254..fa7ddfe8f 100644 --- a/lib/roles/ws/server-ws.c +++ b/lib/roles/ws/server-ws.c @@ -559,7 +559,7 @@ bail: int lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len) { - int m; + int m, draining_flow = 0; lwsl_parser("%s: received %d byte packet\n", __func__, (int)len); @@ -572,6 +572,7 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len) if (wsi->rxflow_bitmap) { lws_rxflow_cache(wsi, *buf, 0, (int)len); lwsl_parser("%s: cached %ld\n", __func__, (long)len); + buf += len; /* stashing it is taking care of it */ return 1; } @@ -583,18 +584,21 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len) } /* account for what we're using in rxflow buffer */ - if (wsi->rxflow_buffer) { - wsi->rxflow_pos++; - if (wsi->rxflow_pos > wsi->rxflow_len) - assert(0); + if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) { + draining_flow = 1; + if (!lws_buflist_use_segment(&wsi->buflist_rxflow, 1)) + lws_dll_lws_remove(&wsi->dll_rxflow); } /* consume payload bytes efficiently */ if (wsi->lws_rx_parse_state == LWS_RXPS_PAYLOAD_UNTIL_LENGTH_EXHAUSTED) { m = lws_payload_until_length_exhausted(wsi, buf, &len); - if (wsi->rxflow_buffer) - wsi->rxflow_pos += m; + if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) { + draining_flow = 1; + if (!lws_buflist_use_segment(&wsi->buflist_rxflow, m)) + lws_dll_lws_remove(&wsi->dll_rxflow); + } } /* process the byte */ @@ -603,9 +607,10 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len) return -1; len--; - if (wsi->rxflow_buffer && wsi->rxflow_pos == wsi->rxflow_len) { + if (draining_flow && /* were draining, now nothing left */ + !lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) { lwsl_debug("%s: %p flow buf: drained\n", __func__, wsi); - lws_free_set_NULL(wsi->rxflow_buffer); + /* having drained the rxflow buffer, can rearm POLLIN */ #ifdef LWS_NO_SERVER m = diff --git a/lib/service.c b/lib/service.c index 74d858626..f9b709f4c 100644 --- a/lib/service.c +++ b/lib/service.c @@ -277,37 +277,36 @@ __lws_service_timeout_check(struct lws *wsi, time_t sec) int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len) { + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + uint8_t *buffered; + size_t blen; + int ret = 0, m; + if (wsi->role_ops->rxflow_cache) if (wsi->role_ops->rxflow_cache(wsi, buf, n, len)) return 0; /* his RX is flowcontrolled, don't send remaining now */ - if (wsi->rxflow_buffer) { - if (buf >= wsi->rxflow_buffer && - &buf[len - 1] < &wsi->rxflow_buffer[wsi->rxflow_len]) { + blen = lws_buflist_next_segment_len(&wsi->buflist_rxflow, &buffered); + if (blen) { + if (buf >= buffered && buf + len <= buffered + blen) { /* rxflow while we were spilling prev rxflow */ lwsl_info("%s: staying in rxflow buf\n", __func__); - return 1; - } else { - lwsl_err("%s: conflicting rxflow buf, " - "current %p len %d, new %p len %d\n", __func__, - wsi->rxflow_buffer, wsi->rxflow_len, buf, len); - assert(0); + return 1; } + ret = 1; } /* a new rxflow, buffer it and warn caller */ - lwsl_info("%s: new rxflow input buffer len %d\n", __func__, len - n); - wsi->rxflow_buffer = lws_malloc(len - n, "rxflow buf"); - if (!wsi->rxflow_buffer) + + m = lws_buflist_append_segment(&wsi->buflist_rxflow, buf + n, len - n); + if (m < 0) return -1; + if (m) + lws_dll_lws_add_front(&wsi->dll_rxflow, &pt->dll_head_rxflow); - wsi->rxflow_len = len - n; - wsi->rxflow_pos = 0; - memcpy(wsi->rxflow_buffer, buf + n, len - n); - - return 0; + return ret; } /* this is used by the platform service code to stop us waiting for network diff --git a/lwsws/main.c b/lwsws/main.c index 6efe5c87b..dcc7b7fa0 100644 --- a/lwsws/main.c +++ b/lwsws/main.c @@ -121,6 +121,7 @@ context_creation(void) info.external_baggage_free_on_destroy = config_strings; info.max_http_header_pool = 1024; + info.pt_serv_buf_size = 8192; info.options = opts | LWS_SERVER_OPTION_VALIDATE_UTF8 | LWS_SERVER_OPTION_EXPLICIT_VHOSTS | LWS_SERVER_OPTION_LIBUV; diff --git a/test-apps/test.html b/test-apps/test.html index 786ced01f..91c6dc2e3 100644 --- a/test-apps/test.html +++ b/test-apps/test.html @@ -845,7 +845,7 @@ function ev_mousemove (ev) { clearTimeout(lm_timer); pending = ""; } else - lm_timer = setTimeout(lm_timer_handler, 30); + lm_timer = setTimeout(lm_timer_handler, 1); last_x = x; last_y = y;