diff --git a/include/libwebsockets/lws-secure-streams-policy.h b/include/libwebsockets/lws-secure-streams-policy.h index 26766b9d6..8255927c1 100644 --- a/include/libwebsockets/lws-secure-streams-policy.h +++ b/include/libwebsockets/lws-secure-streams-policy.h @@ -316,7 +316,14 @@ typedef struct lws_ss_policy { const lws_retry_bo_t *retry_bo; /**< retry policy to use */ uint32_t proxy_buflen; /**< max dsh alloc for proxy */ + uint32_t proxy_buflen_rxflow_on_above; + uint32_t proxy_buflen_rxflow_off_below; + uint32_t client_buflen; /**< max dsh alloc for client */ + uint32_t client_buflen_rxflow_on_above; + uint32_t client_buflen_rxflow_off_below; + + uint32_t timeout_ms; /**< default message response * timeout in ms */ uint32_t flags; /**< stream attribute flags */ diff --git a/lib/secure-streams/README.md b/lib/secure-streams/README.md index 6e1c8b1d7..f0182601f 100644 --- a/lib/secure-streams/README.md +++ b/lib/secure-streams/README.md @@ -378,6 +378,15 @@ payload buffering (in bytes) the proxy will hold for this type of stream. If the endpoint dumps a lot of data without any flow control, this may need to be correspondingly large. Default is 32KB. +### `proxy_buflen_rxflow_on_above`, `proxy_buflen_rxflow_off_below` + +When `proxy_buflen` is set, you can also wire up the amount of buffered +data intended for the client held at the proxy, to the onward ss wsi +rx flow control state. If more than `proxy_buflen_rxflow_on_above` +bytes are buffered, rx flow control is set stopping further rx. Once +the dsh is drained below `proxy_buflen_rxflow_off_below`, the rx flow +control is released and RX resumes. + ### `client_buflen` Only used when the streamtype is proxied... sets the maximum size of the diff --git a/lib/secure-streams/policy-json.c b/lib/secure-streams/policy-json.c index 9bc93a329..45f7c5281 100644 --- a/lib/secure-streams/policy-json.c +++ b/lib/secure-streams/policy-json.c @@ -58,7 +58,11 @@ static const char * const lejp_tokens_policy[] = { "s[].*.timeout_ms", "s[].*.tls_trust_store", "s[].*.proxy_buflen", + "s[].*.proxy_buflen_rxflow_on_above", + "s[].*.proxy_buflen_rxflow_off_below", "s[].*.client_buflen", + "s[].*.client_buflen_rxflow_on_above", + "s[].*.client_buflen_rxflow_off_below", "s[].*.metadata", "s[].*.metadata[].*", "s[].*.http_resp_map", @@ -143,7 +147,11 @@ typedef enum { LSSPPT_DEFAULT_TIMEOUT_MS, LSSPPT_TRUST, LSSPPT_PROXY_BUFLEN, + LSSPPT_PROXY_BUFLEN_RXFLOW_ON_ABOVE, + LSSPPT_PROXY_BUFLEN_RXFLOW_OFF_BELOW, LSSPPT_CLIENT_BUFLEN, + LSSPPT_CLIENT_BUFLEN_RXFLOW_ON_ABOVE, + LSSPPT_CLIENT_BUFLEN_RXFLOW_OFF_BELOW, LSSPPT_METADATA, LSSPPT_METADATA_ITEM, LSSPPT_HTTPRESPMAP, @@ -201,7 +209,7 @@ typedef enum { #define POL_AC_GRAIN 800 #define MAX_CERT_TEMP 3072 /* used to discover actual cert size for realloc */ -static uint8_t sizes[] = { +static uint16_t sizes[] = { sizeof(backoff_t), sizeof(lws_ss_x509_t), sizeof(lws_ss_trust_store_t), @@ -577,10 +585,28 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason) a->curr[LTY_POLICY].p->proxy_buflen = (uint32_t)atol(ctx->buf); break; + case LSSPPT_PROXY_BUFLEN_RXFLOW_ON_ABOVE: + a->curr[LTY_POLICY].p->proxy_buflen_rxflow_on_above = + (uint32_t)atol(ctx->buf); + break; + case LSSPPT_PROXY_BUFLEN_RXFLOW_OFF_BELOW: + a->curr[LTY_POLICY].p->proxy_buflen_rxflow_off_below = + (uint32_t)atol(ctx->buf); + break; + case LSSPPT_CLIENT_BUFLEN: a->curr[LTY_POLICY].p->client_buflen = (uint32_t)atol(ctx->buf); break; + case LSSPPT_CLIENT_BUFLEN_RXFLOW_ON_ABOVE: + a->curr[LTY_POLICY].p->client_buflen_rxflow_on_above = + (uint32_t)atol(ctx->buf); + break; + case LSSPPT_CLIENT_BUFLEN_RXFLOW_OFF_BELOW: + a->curr[LTY_POLICY].p->client_buflen_rxflow_off_below = + (uint32_t)atol(ctx->buf); + break; + case LSSPPT_HTTP_METHOD: pp = (char **)&a->curr[LTY_POLICY].p->u.http.method; goto string2; diff --git a/lib/secure-streams/secure-streams-process.c b/lib/secure-streams/secure-streams-process.c index 5e5e80137..2f6813ae6 100644 --- a/lib/secure-streams/secure-streams-process.c +++ b/lib/secure-streams/secure-streams-process.c @@ -144,6 +144,20 @@ ss_proxy_onward_rx(void *userobj, const uint8_t *buf, size_t len, int flags) if (n) return n; + /* + * Manage rx flow on the SS (onward) side according to our situation + * in the dsh holding proxy->client serialized forwarding rx + */ + + if (m->ss->policy->proxy_buflen_rxflow_on_above && m->ss->wsi && + m->conn->dsh->oha[KIND_SS_TO_P].total_size > + m->ss->policy->proxy_buflen_rxflow_on_above) { + lwsl_notice("%s: %s: rxflow disabling rx\n", __func__, + lws_wsi_tag(m->ss->wsi)); + /* stop receiving taking in rx once above the threshold */ + lws_rx_flow_control(m->ss->wsi, 0); + } + if (m->conn->wsi) /* if possible, request client conn write */ lws_callback_on_writable(m->conn->wsi); @@ -166,23 +180,26 @@ ss_proxy_onward_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, lwsl_notice("%s: ss not ready\n", __func__); *len = 0; - return 1; + return LWSSSSRET_TX_DONT_SEND; } /* * The onward secure stream says that we could send something to it - * (by putting it in buf, and setting *len and *flags) + * (by putting it in buf, and setting *len and *flags)... dredge the + * next thing out of the dsh */ if (lws_ss_deserialize_tx_payload(m->conn->dsh, m->ss->wsi, ord, buf, len, flags)) - return 1; + return LWSSSSRET_TX_DONT_SEND; + /* ... there's more we want to send? */ if (!lws_dsh_get_head(m->conn->dsh, KIND_C_TO_P, (void **)&p, &si)) lws_ss_request_tx(m->conn->ss); if (!*len && !*flags) - return 1; /* we don't actually want to send anything */ + /* we don't actually want to send anything */ + return LWSSSSRET_TX_DONT_SEND; lwsl_info("%s: onward tx %d fl 0x%x\n", __func__, (int)*len, *flags); @@ -196,7 +213,7 @@ ss_proxy_onward_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, } #endif - return 0; + return LWSSSSRET_OK; } static lws_ss_state_return_t @@ -562,6 +579,7 @@ callback_ss_proxy(struct lws *wsi, enum lws_callback_reasons reason, if (lws_dsh_get_head(conn->dsh, KIND_SS_TO_P, (void **)&p, &si)) break; + cp = p; #if defined(LWS_WITH_DETAILED_LATENCY) @@ -615,8 +633,28 @@ again: case LPCSPROX_REPORTING_FAIL: goto hangup; case LPCSPROX_OPERATIONAL: - if (pay) + if (pay) { lws_dsh_free((void **)&p); + + /* + * Did we go below the rx flow threshold for + * this dsh? + */ + + if (conn->ss->policy->proxy_buflen_rxflow_on_above && + conn->ss->wsi && + conn->dsh->oha[KIND_SS_TO_P].total_size < + conn->ss->policy->proxy_buflen_rxflow_off_below) { + lwsl_notice("%s: %s: rxflow re-enabling rx\n", + __func__, + lws_wsi_tag(conn->ss->wsi)); + /* + * Resume receiving taking in rx once + * below the low threshold + */ + lws_rx_flow_control(conn->ss->wsi, 1); + } + } if (!lws_dsh_get_head(conn->dsh, KIND_SS_TO_P, (void **)&p, &si)) { if (!lws_send_pipe_choked(wsi)) {