1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

sspc: dsh: add rx flow control to policy

Add .proxy_buflen_rxflow_on_above / .proxy_buflen_rxflow_off_below policy streamtype options
and manage the rx flow control for the onward ss wsi according to how the dsh for the
remote client is doing.

client_buflen_rxflow_... are there but not wired up.
This commit is contained in:
Andy Green 2021-02-01 19:11:25 +00:00
parent 9f1bd0a5c8
commit 15bb455249
4 changed files with 87 additions and 7 deletions

View file

@ -316,7 +316,14 @@ typedef struct lws_ss_policy {
const lws_retry_bo_t *retry_bo; /**< retry policy to use */
uint32_t proxy_buflen; /**< max dsh alloc for proxy */
uint32_t proxy_buflen_rxflow_on_above;
uint32_t proxy_buflen_rxflow_off_below;
uint32_t client_buflen; /**< max dsh alloc for client */
uint32_t client_buflen_rxflow_on_above;
uint32_t client_buflen_rxflow_off_below;
uint32_t timeout_ms; /**< default message response
* timeout in ms */
uint32_t flags; /**< stream attribute flags */

View file

@ -378,6 +378,15 @@ payload buffering (in bytes) the proxy will hold for this type of stream. If
the endpoint dumps a lot of data without any flow control, this may need to
be correspondingly large. Default is 32KB.
### `proxy_buflen_rxflow_on_above`, `proxy_buflen_rxflow_off_below`
When `proxy_buflen` is set, you can also wire up the amount of buffered
data intended for the client held at the proxy, to the onward ss wsi
rx flow control state. If more than `proxy_buflen_rxflow_on_above`
bytes are buffered, rx flow control is set stopping further rx. Once
the dsh is drained below `proxy_buflen_rxflow_off_below`, the rx flow
control is released and RX resumes.
### `client_buflen`
Only used when the streamtype is proxied... sets the maximum size of the

View file

@ -58,7 +58,11 @@ static const char * const lejp_tokens_policy[] = {
"s[].*.timeout_ms",
"s[].*.tls_trust_store",
"s[].*.proxy_buflen",
"s[].*.proxy_buflen_rxflow_on_above",
"s[].*.proxy_buflen_rxflow_off_below",
"s[].*.client_buflen",
"s[].*.client_buflen_rxflow_on_above",
"s[].*.client_buflen_rxflow_off_below",
"s[].*.metadata",
"s[].*.metadata[].*",
"s[].*.http_resp_map",
@ -143,7 +147,11 @@ typedef enum {
LSSPPT_DEFAULT_TIMEOUT_MS,
LSSPPT_TRUST,
LSSPPT_PROXY_BUFLEN,
LSSPPT_PROXY_BUFLEN_RXFLOW_ON_ABOVE,
LSSPPT_PROXY_BUFLEN_RXFLOW_OFF_BELOW,
LSSPPT_CLIENT_BUFLEN,
LSSPPT_CLIENT_BUFLEN_RXFLOW_ON_ABOVE,
LSSPPT_CLIENT_BUFLEN_RXFLOW_OFF_BELOW,
LSSPPT_METADATA,
LSSPPT_METADATA_ITEM,
LSSPPT_HTTPRESPMAP,
@ -201,7 +209,7 @@ typedef enum {
#define POL_AC_GRAIN 800
#define MAX_CERT_TEMP 3072 /* used to discover actual cert size for realloc */
static uint8_t sizes[] = {
static uint16_t sizes[] = {
sizeof(backoff_t),
sizeof(lws_ss_x509_t),
sizeof(lws_ss_trust_store_t),
@ -577,10 +585,28 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
a->curr[LTY_POLICY].p->proxy_buflen = (uint32_t)atol(ctx->buf);
break;
case LSSPPT_PROXY_BUFLEN_RXFLOW_ON_ABOVE:
a->curr[LTY_POLICY].p->proxy_buflen_rxflow_on_above =
(uint32_t)atol(ctx->buf);
break;
case LSSPPT_PROXY_BUFLEN_RXFLOW_OFF_BELOW:
a->curr[LTY_POLICY].p->proxy_buflen_rxflow_off_below =
(uint32_t)atol(ctx->buf);
break;
case LSSPPT_CLIENT_BUFLEN:
a->curr[LTY_POLICY].p->client_buflen = (uint32_t)atol(ctx->buf);
break;
case LSSPPT_CLIENT_BUFLEN_RXFLOW_ON_ABOVE:
a->curr[LTY_POLICY].p->client_buflen_rxflow_on_above =
(uint32_t)atol(ctx->buf);
break;
case LSSPPT_CLIENT_BUFLEN_RXFLOW_OFF_BELOW:
a->curr[LTY_POLICY].p->client_buflen_rxflow_off_below =
(uint32_t)atol(ctx->buf);
break;
case LSSPPT_HTTP_METHOD:
pp = (char **)&a->curr[LTY_POLICY].p->u.http.method;
goto string2;

View file

@ -144,6 +144,20 @@ ss_proxy_onward_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
if (n)
return n;
/*
* Manage rx flow on the SS (onward) side according to our situation
* in the dsh holding proxy->client serialized forwarding rx
*/
if (m->ss->policy->proxy_buflen_rxflow_on_above && m->ss->wsi &&
m->conn->dsh->oha[KIND_SS_TO_P].total_size >
m->ss->policy->proxy_buflen_rxflow_on_above) {
lwsl_notice("%s: %s: rxflow disabling rx\n", __func__,
lws_wsi_tag(m->ss->wsi));
/* stop receiving taking in rx once above the threshold */
lws_rx_flow_control(m->ss->wsi, 0);
}
if (m->conn->wsi) /* if possible, request client conn write */
lws_callback_on_writable(m->conn->wsi);
@ -166,23 +180,26 @@ ss_proxy_onward_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
lwsl_notice("%s: ss not ready\n", __func__);
*len = 0;
return 1;
return LWSSSSRET_TX_DONT_SEND;
}
/*
* The onward secure stream says that we could send something to it
* (by putting it in buf, and setting *len and *flags)
* (by putting it in buf, and setting *len and *flags)... dredge the
* next thing out of the dsh
*/
if (lws_ss_deserialize_tx_payload(m->conn->dsh, m->ss->wsi,
ord, buf, len, flags))
return 1;
return LWSSSSRET_TX_DONT_SEND;
/* ... there's more we want to send? */
if (!lws_dsh_get_head(m->conn->dsh, KIND_C_TO_P, (void **)&p, &si))
lws_ss_request_tx(m->conn->ss);
if (!*len && !*flags)
return 1; /* we don't actually want to send anything */
/* we don't actually want to send anything */
return LWSSSSRET_TX_DONT_SEND;
lwsl_info("%s: onward tx %d fl 0x%x\n", __func__, (int)*len, *flags);
@ -196,7 +213,7 @@ ss_proxy_onward_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
}
#endif
return 0;
return LWSSSSRET_OK;
}
static lws_ss_state_return_t
@ -562,6 +579,7 @@ callback_ss_proxy(struct lws *wsi, enum lws_callback_reasons reason,
if (lws_dsh_get_head(conn->dsh, KIND_SS_TO_P,
(void **)&p, &si))
break;
cp = p;
#if defined(LWS_WITH_DETAILED_LATENCY)
@ -615,8 +633,28 @@ again:
case LPCSPROX_REPORTING_FAIL:
goto hangup;
case LPCSPROX_OPERATIONAL:
if (pay)
if (pay) {
lws_dsh_free((void **)&p);
/*
* Did we go below the rx flow threshold for
* this dsh?
*/
if (conn->ss->policy->proxy_buflen_rxflow_on_above &&
conn->ss->wsi &&
conn->dsh->oha[KIND_SS_TO_P].total_size <
conn->ss->policy->proxy_buflen_rxflow_off_below) {
lwsl_notice("%s: %s: rxflow re-enabling rx\n",
__func__,
lws_wsi_tag(conn->ss->wsi));
/*
* Resume receiving taking in rx once
* below the low threshold
*/
lws_rx_flow_control(conn->ss->wsi, 1);
}
}
if (!lws_dsh_get_head(conn->dsh, KIND_SS_TO_P,
(void **)&p, &si)) {
if (!lws_send_pipe_choked(wsi)) {