diff --git a/lib/core-net/private.h b/lib/core-net/private.h
index d2168d817..fab2f0685 100644
--- a/lib/core-net/private.h
+++ b/lib/core-net/private.h
@@ -786,6 +786,14 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or);
 
 LWS_EXTERN int
 __remove_wsi_socket_from_fds(struct lws *wsi);
+
+enum {
+	LWSRXFC_ERROR = -1,
+	LWSRXFC_CACHED = 0,
+	LWSRXFC_ADDITIONAL = 1,
+	LWSRXFC_TRIMMED = 2,
+};
+
 LWS_EXTERN int
 lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len);
 
diff --git a/lib/core-net/service.c b/lib/core-net/service.c
index d9f0202a1..ab0812806 100644
--- a/lib/core-net/service.c
+++ b/lib/core-net/service.c
@@ -295,23 +295,32 @@ __lws_service_timeout_check(struct lws *wsi, time_t sec)
 	return 0;
 }
 
-int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
+int
+lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
 {
 	struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
 	uint8_t *buffered;
 	size_t blen;
-	int ret = 0, m;
+	int ret = LWSRXFC_CACHED, m;
 
 	/* his RX is flowcontrolled, don't send remaining now */
 	blen = lws_buflist_next_segment_len(&wsi->buflist, &buffered);
 	if (blen) {
 		if (buf >= buffered && buf + len <= buffered + blen) {
-			/* rxflow while we were spilling prev rxflow */
-			lwsl_info("%s: staying in rxflow buf\n", __func__);
+			/*
+			 * rxflow while we were spilling prev rxflow
+			 *
+			 * len indicates how much was unused, then... so trim
+			 * the head buflist to match that situation
+			 */
 
-			return 1;
+			lws_buflist_use_segment(&wsi->buflist, blen - len);
+			lwsl_debug("%s: trim existing rxflow %d -> %d\n",
+					__func__, (int)blen, (int)len);
+
+			return LWSRXFC_TRIMMED;
 		}
-		ret = 1;
+		ret = LWSRXFC_ADDITIONAL;
 	}
 
 	/* a new rxflow, buffer it and warn caller */
@@ -319,7 +328,7 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
 
 	m = lws_buflist_append_segment(&wsi->buflist, buf + n, len - n);
 	if (m < 0)
-		return -1;
+		return LWSRXFC_ERROR;
 	if (m) {
 		lwsl_debug("%s: added %p to rxflow list\n", __func__, wsi);
 		lws_dll_add_head(&wsi->dll_buflist, &pt->dll_head_buflist);
diff --git a/lib/core-net/wsi.c b/lib/core-net/wsi.c
index e98addf78..d64dfd9a3 100644
--- a/lib/core-net/wsi.c
+++ b/lib/core-net/wsi.c
@@ -337,6 +337,8 @@ __lws_rx_flow_control(struct lws *wsi)
 	/* adjust the pollfd for this wsi */
 
 	if (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW) {
+		lwsl_info("%s: reenable POLLIN\n", __func__);
+		// lws_buflist_describe(&wsi->buflist, NULL);
 		if (__lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
 			lwsl_info("%s: fail\n", __func__);
 			return -1;
diff --git a/lib/roles/ws/client-ws.c b/lib/roles/ws/client-ws.c
index 18b225595..ab3709654 100644
--- a/lib/roles/ws/client-ws.c
+++ b/lib/roles/ws/client-ws.c
@@ -66,22 +66,46 @@ lws_create_client_ws_object(const struct lws_client_connect_info *i,
 int
 lws_ws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len)
 {
+	unsigned char *bufin = *buf;
+
 	if ((lwsi_state(wsi) != LRS_WAITING_PROXY_REPLY) &&
 	    (lwsi_state(wsi) != LRS_H1C_ISSUE_HANDSHAKE) &&
 	    (lwsi_state(wsi) != LRS_WAITING_SERVER_REPLY) &&
 	    !lwsi_role_client(wsi))
 		return 0;
 
-	// lwsl_notice("%s: hs client gets %d in\n", __func__, (int)len);
+	lwsl_debug("%s: hs client feels it has %d in\n", __func__, (int)len);
 
 	while (len) {
 		/*
 		 * we were accepting input but now we stopped doing so
 		 */
 		if (lws_is_flowcontrolled(wsi)) {
-			//lwsl_notice("%s: caching %ld\n", __func__, (long)len);
-			lws_rxflow_cache(wsi, *buf, 0, (int)len);
-			*buf += len;
+			lwsl_debug("%s: caching %ld\n", __func__, (long)len);
+			/*
+			 * Since we cached the remaining available input, we
+			 * can say we "consumed" it.
+			 *
+			 * But what about the case where the available input
+			 * came out of the rxflow cache already?  If we are
+			 * effectively "putting it back in the cache", we have
+			 * to place it at the cache head, not the tail as usual.
+			 */
+			if (lws_rxflow_cache(wsi, *buf, 0, (int)len) ==
+							LWSRXFC_TRIMMED) {
+				/*
+				 * we dealt with it by trimming the existing
+				 * rxflow cache HEAD to account for what we used.
+				 *
+				 * indicate we didn't use anything to the caller
+				 * so he doesn't do any consumed processing
+				 */
+				lwsl_info("%s: trimming inside rxflow cache\n",
+					  __func__);
+				*buf = bufin;
+			} else
+				*buf += len;
+
 			return 0;
 		}
 #if !defined(LWS_WITHOUT_EXTENSIONS)
diff --git a/lib/roles/ws/ops-ws.c b/lib/roles/ws/ops-ws.c
index 7f1d5e13e..b6e975738 100644
--- a/lib/roles/ws/ops-ws.c
+++ b/lib/roles/ws/ops-ws.c
@@ -937,10 +937,6 @@ rops_handle_POLLIN_ws(struct lws_context_per_thread *pt, struct lws *wsi,
 		return LWS_HPI_RET_HANDLED;
 	}
 
-	//lwsl_notice("%s: wsi->ws->tx_draining_ext %d revents 0x%x 0x%x %d\n",
-	//__func__, wsi->ws->tx_draining_ext, pollfd->revents, wsi->wsistate,
-	//lwsi_state_can_handle_POLLOUT(wsi));
-
 	/* 1: something requested a callback when it was OK to write */
 
 	if ((pollfd->revents & LWS_POLLOUT) &&
@@ -977,14 +973,21 @@ rops_handle_POLLIN_ws(struct lws_context_per_thread *pt, struct lws *wsi,
 		 */
 		return LWS_HPI_RET_HANDLED;
 #endif
-	if (lws_is_flowcontrolled(wsi)) {
+	if ((pollfd->revents & LWS_POLLIN) && lws_is_flowcontrolled(wsi)) {
 		/* We cannot deal with any kind of new RX because we are
 		 * RX-flowcontrolled.
 		 */
-		lwsl_info("flowcontrolled\n");
+		lwsl_info("%s: flowcontrolled, ignoring rx\n", __func__);
+
+		if (__lws_change_pollfd(wsi, LWS_POLLIN, 0))
+			return -1;
+
 		return LWS_HPI_RET_HANDLED;
 	}
 
+	if (lws_is_flowcontrolled(wsi))
+		return LWS_HPI_RET_HANDLED;
+
 #if defined(LWS_WITH_HTTP2)
 	if (wsi->http2_substream || wsi->upgraded_to_http2) {
 		wsi1 = lws_get_network_wsi(wsi);
diff --git a/lib/roles/ws/server-ws.c b/lib/roles/ws/server-ws.c
index 80e3e7794..4cfbae54f 100644
--- a/lib/roles/ws/server-ws.c
+++ b/lib/roles/ws/server-ws.c
@@ -899,10 +899,11 @@ lws_parse_ws(struct lws *wsi, unsigned char **buf, size_t len)
 		 * we were accepting input but now we stopped doing so
 		 */
 		if (wsi->rxflow_bitmap) {
-			lwsl_info("%s: doing rxflow\n", __func__);
-			lws_rxflow_cache(wsi, *buf, 0, (int)len);
-			lwsl_parser("%s: cached %ld\n", __func__, (long)len);
-			*buf += len; /* stashing it is taking care of it */
+			lwsl_info("%s: doing rxflow, caching %d\n", __func__,
+				  (int)len);
+			if (lws_rxflow_cache(wsi, *buf, 0, (int)len) !=
+							LWSRXFC_TRIMMED)
+				*buf += len; /* stashing it is taking care of it */
 			return 1;
 		}
 #if !defined(LWS_WITHOUT_EXTENSIONS)
diff --git a/minimal-examples/ws-server/minimal-ws-server-pmd-bulk/protocol_lws_minimal_pmd_bulk.c b/minimal-examples/ws-server/minimal-ws-server-pmd-bulk/protocol_lws_minimal_pmd_bulk.c
index 679ea07a7..831aff695 100644
--- a/minimal-examples/ws-server/minimal-ws-server-pmd-bulk/protocol_lws_minimal_pmd_bulk.c
+++ b/minimal-examples/ws-server/minimal-ws-server-pmd-bulk/protocol_lws_minimal_pmd_bulk.c
@@ -161,7 +161,7 @@ callback_minimal_pmd_bulk(struct lws *wsi, enum lws_callback_reasons reason,
 		m = lws_write(wsi, start, n, flags);
 		lwsl_user("LWS_CALLBACK_SERVER_WRITEABLE: wrote %d\n", n);
 		if (m < n) {
-			lwsl_err("ERROR %d writing ws\n", n);
+			lwsl_err("ERROR %d / %d writing ws\n", m, n);
 			return -1;
 		}
 		if (pss->position_tx != MESSAGE_SIZE) /* if more to do... */
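Note on the caller contract (not part of the patch): the sketch below shows how an internal caller is expected to react to the lws_rxflow_cache() return codes introduced above, following the same pattern the patch applies in lws_parse_ws() and lws_ws_handshake_client(). handle_flowcontrolled_rx() is a hypothetical helper name used only for illustration; lws_rxflow_cache() and the LWSRXFC_* codes live in the internal lib/core-net/private.h, so this would only build inside the library tree.

/*
 * Hypothetical caller sketch for the LWSRXFC_* return codes added above.
 * Returns 0 on success, -1 on fatal error.  *buf is only advanced when the
 * input was genuinely stashed on the buflist, not when the existing cache
 * head was trimmed in place.
 */
static int
handle_flowcontrolled_rx(struct lws *wsi, unsigned char **buf, size_t len)
{
	switch (lws_rxflow_cache(wsi, *buf, 0, (int)len)) {
	case LWSRXFC_TRIMMED:
		/*
		 * the input already lived at the head of the rxflow buflist
		 * and the head was trimmed to cover it: report nothing
		 * consumed so the caller does no consumed processing
		 */
		break;
	case LWSRXFC_ERROR:
		return -1;
	default: /* LWSRXFC_CACHED or LWSRXFC_ADDITIONAL */
		*buf += len; /* stashing it on the buflist counts as consumed */
		break;
	}

	return 0;
}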