1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ss: proxy: get rx flow control working

This fixes the proxy rx flow by adding an lws_dsh helper to hide the
off-by-one in the "kind" array (kind 0 is reserved for tracking the
unallocated dsh blocks).

For testing, it adds a --blob option on minimal-secure-streams[-client]
which uses a streamtype "bulkproxflow" from here

https://warmcat.com/policy/minimal-proxy-v4.2-v2.json

		"bulkproxflow": {
			"endpoint": "warmcat.com",
			"port": 443,
			"protocol": "h1",
			"http_method": "GET",
			"http_url": "blob.bin",
			"proxy_buflen": 32768,
			"proxy_buflen_rxflow_on_above": 24576,
			"proxy_buflen_rxflow_off_below": 8192,
			"tls": true,
			"retry": "default",
			"tls_trust_store": "le_via_dst"
		}

This downloads a 51MB blob of random data with the SHA256sum

ed5720c16830810e5829dfb9b66c96b2e24efc4f93aa5e38c7ff4150d31cfbbf

The minimal-secure-streams --blob example client delays the download by
50ms every 10KiB it sees to force rx flow usage at the proxy.

It downloads the whole thing and checks the SHA256 is as expected.

Logs about rxflow status are available at LLL_INFO log level.
This commit is contained in:
Andy Green 2021-04-07 14:25:07 +01:00
parent fdc1e1e9a5
commit 42dc817d8f
9 changed files with 130 additions and 36 deletions

View file

@ -115,6 +115,9 @@ lws_dsh_alloc_tail(struct lws_dsh *dsh, int kind, const void *src1,
LWS_VISIBLE LWS_EXTERN void
lws_dsh_free(void **obj);
LWS_VISIBLE LWS_EXTERN size_t
lws_dsh_get_size(struct lws_dsh *dsh, int kind);
/**
* lws_dsh_get_head() - get the head allocation inside the dsh
*

View file

@ -246,6 +246,15 @@ lws_dsh_destroy(lws_dsh_t **pdsh)
lws_free_set_NULL(*pdsh);
}
size_t
lws_dsh_get_size(struct lws_dsh *dsh, int kind)
{
kind++;
assert(kind < dsh->count_kinds);
return dsh->oha[kind].total_size;
}
static int
_lws_dsh_alloc_tail(lws_dsh_t *dsh, int kind, const void *src1, size_t size1,
const void *src2, size_t size2, lws_dll2_t *replace)

View file

@ -555,6 +555,8 @@ struct conn {
lws_ss_handle_t *ss; /* the onward, ss side */
lws_ss_conn_states_t state;
char onward_in_flow_control;
};
extern const struct ss_pcols ss_pcol_h1;

View file

@ -154,13 +154,21 @@ ss_proxy_onward_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
* in the dsh holding proxy->client serialized forwarding rx
*/
if (m->ss->policy->proxy_buflen_rxflow_on_above && m->ss->wsi &&
m->conn->dsh->oha[KIND_SS_TO_P].total_size >
if (!m->conn->onward_in_flow_control && m->ss->wsi &&
m->ss->policy->proxy_buflen_rxflow_on_above &&
lws_dsh_get_size(m->conn->dsh, KIND_SS_TO_P) >=
m->ss->policy->proxy_buflen_rxflow_on_above) {
lwsl_notice("%s: %s: rxflow disabling rx\n", __func__,
lws_wsi_tag(m->ss->wsi));
/* stop receiving taking in rx once above the threshold */
lwsl_info("%s: %s: rxflow disabling rx (%lu / %lu, hwm %lu)\n", __func__,
lws_wsi_tag(m->ss->wsi),
(unsigned long)lws_dsh_get_size(m->conn->dsh, KIND_SS_TO_P),
(unsigned long)m->ss->policy->proxy_buflen,
(unsigned long)m->ss->policy->proxy_buflen_rxflow_on_above);
/*
* stop taking in rx once the onward wsi rx is above the
* high water mark
*/
lws_rx_flow_control(m->ss->wsi, 0);
m->conn->onward_in_flow_control = 1;
}
if (m->conn->wsi) /* if possible, request client conn write */
@ -369,8 +377,7 @@ callback_ss_proxy(struct lws *wsi, enum lws_callback_reasons reason,
* acceptance up rapidly with an initial tx containing the
* streamtype name. We can't create the stream until then.
*/
lws_set_timeout(wsi,
PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3);
lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3);
break;
case LWS_CALLBACK_RAW_CLOSE:
@ -660,7 +667,6 @@ callback_ss_proxy(struct lws *wsi, enum lws_callback_reasons reason,
lws_ser_ru32be(&p[7]);
}
#endif
pay = 1;
n = (int)si;
break;
@ -695,18 +701,23 @@ again:
* this dsh?
*/
if (conn->ss->policy->proxy_buflen_rxflow_on_above &&
if (conn->onward_in_flow_control &&
conn->ss->policy->proxy_buflen_rxflow_on_above &&
conn->ss->wsi &&
conn->dsh->oha[KIND_SS_TO_P].total_size <
lws_dsh_get_size(conn->dsh, KIND_SS_TO_P) <
conn->ss->policy->proxy_buflen_rxflow_off_below) {
lwsl_notice("%s: %s: rxflow re-enabling rx\n",
__func__,
lws_wsi_tag(conn->ss->wsi));
lwsl_info("%s: %s: rxflow enabling rx (%lu / %lu, lwm %lu)\n", __func__,
lws_wsi_tag(conn->ss->wsi),
(unsigned long)lws_dsh_get_size(conn->dsh, KIND_SS_TO_P),
(unsigned long)conn->ss->policy->proxy_buflen,
(unsigned long)conn->ss->policy->proxy_buflen_rxflow_off_below);
/*
* Resume receiving taking in rx once
* below the low threshold
*/
lws_rx_flow_control(conn->ss->wsi, 1);
lws_rx_flow_control(conn->ss->wsi,
LWS_RXFLOW_ALLOW);
conn->onward_in_flow_control = 0;
}
}
if (!lws_dsh_get_head(conn->dsh, KIND_SS_TO_P,

View file

@ -43,9 +43,9 @@ if (requirements)
message("testing via valgrind")
add_test(NAME ssperf-warmcat COMMAND
${VALGRIND} --tool=memcheck --leak-check=yes --num-callers=20
$<TARGET_FILE:lws-minimal-secure-streams>)
$<TARGET_FILE:lws-minimal-secure-streams-perf>)
else()
add_test(NAME ssperf-warmcat COMMAND lws-minimal-secure-streams)
add_test(NAME ssperf-warmcat COMMAND lws-minimal-secure-streams-perf)
endif()
set_tests_properties(ssperf-warmcat
@ -90,9 +90,9 @@ if (requirements)
message("testing via valgrind")
add_test(NAME ssperfpc-minimal COMMAND
${VALGRIND} --tool=memcheck --leak-check=yes --num-callers=20
$<TARGET_FILE:lws-minimal-secure-streams-client> -i +${CTEST_SOCKET_PATH})
$<TARGET_FILE:lws-minimal-secure-streams-perf-client> -i +${CTEST_SOCKET_PATH})
else()
add_test(NAME ssperfpc-minimal COMMAND lws-minimal-secure-streams-client -i +${CTEST_SOCKET_PATH})
add_test(NAME ssperfpc-minimal COMMAND lws-minimal-secure-streams-perf-client -i +${CTEST_SOCKET_PATH})
endif()
set(fixlist "ssperfproxy")
@ -101,7 +101,7 @@ if (requirements)
endif()
set_tests_properties(ssperfpc-minimal PROPERTIES
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams-perf
FIXTURES_REQUIRED "${fixlist}"
TIMEOUT 40)

View file

@ -424,7 +424,7 @@ int main(int argc, const char **argv)
memset(&info, 0, sizeof info);
lws_cmdline_option_handle_builtin(argc, argv, &info);
lwsl_user("LWS secure streams test client [-d<verb>]\n");
lwsl_user("LWS secure streams test client PERF [-d<verb>]\n");
/* these options are mutually exclusive if given */

View file

@ -13,6 +13,7 @@ require_lws_config(LWS_WITHOUT_CLIENT 0 requirements)
require_lws_config(LWS_WITH_SECURE_STREAMS 1 requirements)
require_lws_config(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY 0 requirements)
require_lws_config(LWS_WITH_SYS_STATE 1 requirements)
require_lws_config(LWS_WITH_GENCRYPTO 1 requirements)
if (requirements)
add_executable(${SAMP} minimal-secure-streams.c)

View file

@ -25,6 +25,7 @@ Commandline option|Meaning
-p| Run as proxy server for clients to connect to over unix domain socket
--force-portal|Force the SS Captive Portal Detection to feel it's behind a portal
--force-no-internet|Force the SS Captive Portal Detection to feel it can't reach the internet
--blob|Download a 50MiB blob from warmact.com, using flow control at the proxy
```
[2019/08/12 07:16:11:0045] USR: LWS minimal secure streams [-d<verbosity>] [-f]

View file

@ -38,7 +38,7 @@
// #define VIA_LOCALHOST_SOCKS
static int interrupted, bad = 1, force_cpd_fail_portal,
force_cpd_fail_no_internet, test_respmap;
force_cpd_fail_no_internet, test_respmap, test_blob;
static unsigned int timeout_ms = 3000;
static lws_state_notify_link_t nl;
@ -125,7 +125,7 @@ static const char * const default_ss_policy =
#if defined(VIA_LOCALHOST_SOCKS)
"\"http_url\":" "\"policy/minimal-proxy-socks.json\","
#else
"\"http_url\":" "\"policy/minimal-proxy-v4.2.json\","
"\"http_url\":" "\"policy/minimal-proxy-v4.2-v2.json\","
#endif
"\"tls\":" "true,"
"\"opportunistic\":" "true,"
@ -161,6 +161,9 @@ typedef struct myss {
void *opaque_data;
/* ... application specific state ... */
lws_sorted_usec_list_t sul;
size_t amt;
struct lws_genhash_ctx hash_ctx;
} myss_t;
#if !defined(LWS_SS_USE_SSPC)
@ -183,19 +186,69 @@ static const char *canned_root_token_payload =
/* secure streams payload interface */
static const uint8_t expected_blob_hash[] = {
0xed, 0x57, 0x20, 0xc1, 0x68, 0x30, 0x81, 0x0e,
0x58, 0x29, 0xdf, 0xb9, 0xb6, 0x6c, 0x96, 0xb2,
0xe2, 0x4e, 0xfc, 0x4f, 0x93, 0xaa, 0x5e, 0x38,
0xc7, 0xff, 0x41, 0x50, 0xd3, 0x1c, 0xfb, 0xbf
};
static lws_ss_state_return_t
myss_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
{
myss_t *m = (myss_t *)userobj;
const char *md_srv = NULL, *md_test = NULL;
size_t md_len;
const char *md_srv = "not set", *md_test = "not set";
size_t md_srv_len = 7, md_test_len = 7;
lws_ss_get_metadata(m->ss, "srv", (const void **)&md_srv, &md_len);
lws_ss_get_metadata(m->ss, "test", (const void **)&md_test, &md_len);
if (flags & LWSSS_FLAG_PERF_JSON)
return LWSSSSRET_OK;
lwsl_user("%s: len %d, flags: %d, srv: %s, test: %s\n", __func__,
(int)len, flags, md_srv ? md_srv : "not set",
md_test ? md_test : "not set");
if (test_blob) {
if (flags & LWSSS_FLAG_SOM) {
if (lws_genhash_init(&m->hash_ctx, LWS_GENHASH_TYPE_SHA256))
lwsl_err("%s: hash init failed\n", __func__);
m->amt = 0;
}
if (lws_genhash_update(&m->hash_ctx, buf, len))
lwsl_err("%s: hash failed\n", __func__);
if ((m->amt + len) / 102400 != (m->amt / 102400)) {
lwsl_user("%s: blob test: rx %uKiB\n", __func__,
(unsigned int)((m->amt + len) / 1024));
/*
* Let's make it hard for client to keep up with onward
* server, delay 50ms after every 100K received, so we
* are forcing the flow control action at the proxy
*/
usleep(50000);
}
m->amt += len;
if (flags & LWSSS_FLAG_EOM) {
uint8_t digest[32];
lws_genhash_destroy(&m->hash_ctx, digest);
if (!memcmp(expected_blob_hash, digest, 32)) {
lwsl_user("%s: SHA256 match\n", __func__);
bad = 0;
}
interrupted = 1;
}
return LWSSSSRET_OK;
}
lws_ss_get_metadata(m->ss, "srv", (const void **)&md_srv, &md_srv_len);
lws_ss_get_metadata(m->ss, "test", (const void **)&md_test, &md_test_len);
lwsl_user("%s: len %d, flags: %d, srv: %.*s, test: %.*s\n", __func__,
(int)len, flags, (int)md_srv_len, md_srv,
(int)md_test_len, md_test);
lwsl_hexdump_info(buf, len);
/*
@ -236,13 +289,16 @@ myss_state(void *userobj, void *sh, lws_ss_constate_t state,
case LWSSSCS_CONNECTING:
lws_ss_start_timeout(m->ss, timeout_ms);
if (lws_ss_set_metadata(m->ss, "uptag", "myuptag123", 10))
/* can fail, eg due to OOM, retry later if so */
return LWSSSSRET_DISCONNECT_ME;
if (lws_ss_set_metadata(m->ss, "ctype", "myctype", 7))
/* can fail, eg due to OOM, retry later if so */
return LWSSSSRET_DISCONNECT_ME;
if (!test_blob) {
if (lws_ss_set_metadata(m->ss, "uptag", "myuptag123", 10))
/* can fail, eg due to OOM, retry later if so */
return LWSSSSRET_DISCONNECT_ME;
if (lws_ss_set_metadata(m->ss, "ctype", "myctype", 7))
/* can fail, eg due to OOM, retry later if so */
return LWSSSSRET_DISCONNECT_ME;
}
break;
case LWSSSCS_ALL_RETRIES_FAILED:
@ -357,7 +413,8 @@ app_system_state_nf(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
ssi.tx = myss_tx;
ssi.state = myss_state;
ssi.user_alloc = sizeof(myss_t);
ssi.streamtype = test_respmap ? "respmap" : "mintest";
ssi.streamtype = test_blob ? "bulkproxflow" :
(test_respmap ? "respmap" : "mintest");
if (lws_ss_create(context, 0, &ssi, NULL, NULL,
NULL, NULL)) {
@ -434,6 +491,16 @@ int main(int argc, const char **argv)
if ((p = lws_cmdline_option(argc, argv, "--timeout_ms")))
timeout_ms = (unsigned int)atoi(p);
if (lws_cmdline_option(argc, argv, "--blob")) {
test_blob = 1;
if (timeout_ms == 3000)
/*
* Don't use default 3s, we're going to be a lot
* slower
*/
timeout_ms = 60000;
}
info.fd_limit_per_thread = 1 + 6 + 1;
info.port = CONTEXT_PORT_NO_LISTEN;
#if defined(LWS_SS_USE_SSPC)