From 42dc817d8fab49a625c2b47a2ca02c4939b2cf4e Mon Sep 17 00:00:00 2001 From: Andy Green Date: Wed, 7 Apr 2021 14:25:07 +0100 Subject: [PATCH] ss: proxy: get rx flow control working This fixes the proxy rx flow by adding an lws_dsh helper to hide the off-by-one in the "kind" array (kind 0 is reserved for tracking the unallocated dsh blocks). For testing, it adds a --blob option on minimal-secure-streams[-client] which uses a streamtype "bulkproxflow" from here https://warmcat.com/policy/minimal-proxy-v4.2-v2.json "bulkproxflow": { "endpoint": "warmcat.com", "port": 443, "protocol": "h1", "http_method": "GET", "http_url": "blob.bin", "proxy_buflen": 32768, "proxy_buflen_rxflow_on_above": 24576, "proxy_buflen_rxflow_off_below": 8192, "tls": true, "retry": "default", "tls_trust_store": "le_via_dst" } This downloads a 51MB blob of random data with the SHA256sum ed5720c16830810e5829dfb9b66c96b2e24efc4f93aa5e38c7ff4150d31cfbbf The minimal-secure-streams --blob example client delays the download by 50ms every 10KiB it sees to force rx flow usage at the proxy. It downloads the whole thing and checks the SHA256 is as expected. Logs about rxflow status are available at LLL_INFO log level. --- include/libwebsockets/lws-dsh.h | 3 + lib/core-net/lws-dsh.c | 9 ++ .../private-lib-secure-streams.h | 2 + lib/secure-streams/secure-streams-process.c | 39 +++++--- .../CMakeLists.txt | 10 +- .../minimal-secure-streams.c | 2 +- .../minimal-secure-streams/CMakeLists.txt | 1 + .../minimal-secure-streams/README.md | 1 + .../minimal-secure-streams.c | 99 ++++++++++++++++--- 9 files changed, 130 insertions(+), 36 deletions(-) diff --git a/include/libwebsockets/lws-dsh.h b/include/libwebsockets/lws-dsh.h index ee7168bc2..09c950c05 100644 --- a/include/libwebsockets/lws-dsh.h +++ b/include/libwebsockets/lws-dsh.h @@ -115,6 +115,9 @@ lws_dsh_alloc_tail(struct lws_dsh *dsh, int kind, const void *src1, LWS_VISIBLE LWS_EXTERN void lws_dsh_free(void **obj); +LWS_VISIBLE LWS_EXTERN size_t +lws_dsh_get_size(struct lws_dsh *dsh, int kind); + /** * lws_dsh_get_head() - get the head allocation inside the dsh * diff --git a/lib/core-net/lws-dsh.c b/lib/core-net/lws-dsh.c index 324272baa..7a6c505ac 100644 --- a/lib/core-net/lws-dsh.c +++ b/lib/core-net/lws-dsh.c @@ -246,6 +246,15 @@ lws_dsh_destroy(lws_dsh_t **pdsh) lws_free_set_NULL(*pdsh); } +size_t +lws_dsh_get_size(struct lws_dsh *dsh, int kind) +{ + kind++; + assert(kind < dsh->count_kinds); + + return dsh->oha[kind].total_size; +} + static int _lws_dsh_alloc_tail(lws_dsh_t *dsh, int kind, const void *src1, size_t size1, const void *src2, size_t size2, lws_dll2_t *replace) diff --git a/lib/secure-streams/private-lib-secure-streams.h b/lib/secure-streams/private-lib-secure-streams.h index 78720c744..9bf6f310b 100644 --- a/lib/secure-streams/private-lib-secure-streams.h +++ b/lib/secure-streams/private-lib-secure-streams.h @@ -555,6 +555,8 @@ struct conn { lws_ss_handle_t *ss; /* the onward, ss side */ lws_ss_conn_states_t state; + + char onward_in_flow_control; }; extern const struct ss_pcols ss_pcol_h1; diff --git a/lib/secure-streams/secure-streams-process.c b/lib/secure-streams/secure-streams-process.c index 55a129a31..9edd7a631 100644 --- a/lib/secure-streams/secure-streams-process.c +++ b/lib/secure-streams/secure-streams-process.c @@ -154,13 +154,21 @@ ss_proxy_onward_rx(void *userobj, const uint8_t *buf, size_t len, int flags) * in the dsh holding proxy->client serialized forwarding rx */ - if (m->ss->policy->proxy_buflen_rxflow_on_above && m->ss->wsi && - m->conn->dsh->oha[KIND_SS_TO_P].total_size > + if (!m->conn->onward_in_flow_control && m->ss->wsi && + m->ss->policy->proxy_buflen_rxflow_on_above && + lws_dsh_get_size(m->conn->dsh, KIND_SS_TO_P) >= m->ss->policy->proxy_buflen_rxflow_on_above) { - lwsl_notice("%s: %s: rxflow disabling rx\n", __func__, - lws_wsi_tag(m->ss->wsi)); - /* stop receiving taking in rx once above the threshold */ + lwsl_info("%s: %s: rxflow disabling rx (%lu / %lu, hwm %lu)\n", __func__, + lws_wsi_tag(m->ss->wsi), + (unsigned long)lws_dsh_get_size(m->conn->dsh, KIND_SS_TO_P), + (unsigned long)m->ss->policy->proxy_buflen, + (unsigned long)m->ss->policy->proxy_buflen_rxflow_on_above); + /* + * stop taking in rx once the onward wsi rx is above the + * high water mark + */ lws_rx_flow_control(m->ss->wsi, 0); + m->conn->onward_in_flow_control = 1; } if (m->conn->wsi) /* if possible, request client conn write */ @@ -369,8 +377,7 @@ callback_ss_proxy(struct lws *wsi, enum lws_callback_reasons reason, * acceptance up rapidly with an initial tx containing the * streamtype name. We can't create the stream until then. */ - lws_set_timeout(wsi, - PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3); + lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3); break; case LWS_CALLBACK_RAW_CLOSE: @@ -660,7 +667,6 @@ callback_ss_proxy(struct lws *wsi, enum lws_callback_reasons reason, lws_ser_ru32be(&p[7]); } #endif - pay = 1; n = (int)si; break; @@ -695,18 +701,23 @@ again: * this dsh? */ - if (conn->ss->policy->proxy_buflen_rxflow_on_above && + if (conn->onward_in_flow_control && + conn->ss->policy->proxy_buflen_rxflow_on_above && conn->ss->wsi && - conn->dsh->oha[KIND_SS_TO_P].total_size < + lws_dsh_get_size(conn->dsh, KIND_SS_TO_P) < conn->ss->policy->proxy_buflen_rxflow_off_below) { - lwsl_notice("%s: %s: rxflow re-enabling rx\n", - __func__, - lws_wsi_tag(conn->ss->wsi)); + lwsl_info("%s: %s: rxflow enabling rx (%lu / %lu, lwm %lu)\n", __func__, + lws_wsi_tag(conn->ss->wsi), + (unsigned long)lws_dsh_get_size(conn->dsh, KIND_SS_TO_P), + (unsigned long)conn->ss->policy->proxy_buflen, + (unsigned long)conn->ss->policy->proxy_buflen_rxflow_off_below); /* * Resume receiving taking in rx once * below the low threshold */ - lws_rx_flow_control(conn->ss->wsi, 1); + lws_rx_flow_control(conn->ss->wsi, + LWS_RXFLOW_ALLOW); + conn->onward_in_flow_control = 0; } } if (!lws_dsh_get_head(conn->dsh, KIND_SS_TO_P, diff --git a/minimal-examples/secure-streams/minimal-secure-streams-perf/CMakeLists.txt b/minimal-examples/secure-streams/minimal-secure-streams-perf/CMakeLists.txt index ca3cb622d..2be9464be 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams-perf/CMakeLists.txt +++ b/minimal-examples/secure-streams/minimal-secure-streams-perf/CMakeLists.txt @@ -43,9 +43,9 @@ if (requirements) message("testing via valgrind") add_test(NAME ssperf-warmcat COMMAND ${VALGRIND} --tool=memcheck --leak-check=yes --num-callers=20 - $) + $) else() - add_test(NAME ssperf-warmcat COMMAND lws-minimal-secure-streams) + add_test(NAME ssperf-warmcat COMMAND lws-minimal-secure-streams-perf) endif() set_tests_properties(ssperf-warmcat @@ -90,9 +90,9 @@ if (requirements) message("testing via valgrind") add_test(NAME ssperfpc-minimal COMMAND ${VALGRIND} --tool=memcheck --leak-check=yes --num-callers=20 - $ -i +${CTEST_SOCKET_PATH}) + $ -i +${CTEST_SOCKET_PATH}) else() - add_test(NAME ssperfpc-minimal COMMAND lws-minimal-secure-streams-client -i +${CTEST_SOCKET_PATH}) + add_test(NAME ssperfpc-minimal COMMAND lws-minimal-secure-streams-perf-client -i +${CTEST_SOCKET_PATH}) endif() set(fixlist "ssperfproxy") @@ -101,7 +101,7 @@ if (requirements) endif() set_tests_properties(ssperfpc-minimal PROPERTIES - WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams-perf FIXTURES_REQUIRED "${fixlist}" TIMEOUT 40) diff --git a/minimal-examples/secure-streams/minimal-secure-streams-perf/minimal-secure-streams.c b/minimal-examples/secure-streams/minimal-secure-streams-perf/minimal-secure-streams.c index 4bdbf89f3..be41e6b68 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams-perf/minimal-secure-streams.c +++ b/minimal-examples/secure-streams/minimal-secure-streams-perf/minimal-secure-streams.c @@ -424,7 +424,7 @@ int main(int argc, const char **argv) memset(&info, 0, sizeof info); lws_cmdline_option_handle_builtin(argc, argv, &info); - lwsl_user("LWS secure streams test client [-d]\n"); + lwsl_user("LWS secure streams test client PERF [-d]\n"); /* these options are mutually exclusive if given */ diff --git a/minimal-examples/secure-streams/minimal-secure-streams/CMakeLists.txt b/minimal-examples/secure-streams/minimal-secure-streams/CMakeLists.txt index 93bf34fdb..9559799a8 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams/CMakeLists.txt +++ b/minimal-examples/secure-streams/minimal-secure-streams/CMakeLists.txt @@ -13,6 +13,7 @@ require_lws_config(LWS_WITHOUT_CLIENT 0 requirements) require_lws_config(LWS_WITH_SECURE_STREAMS 1 requirements) require_lws_config(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY 0 requirements) require_lws_config(LWS_WITH_SYS_STATE 1 requirements) +require_lws_config(LWS_WITH_GENCRYPTO 1 requirements) if (requirements) add_executable(${SAMP} minimal-secure-streams.c) diff --git a/minimal-examples/secure-streams/minimal-secure-streams/README.md b/minimal-examples/secure-streams/minimal-secure-streams/README.md index cb9b3b360..78f0a1bd0 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams/README.md +++ b/minimal-examples/secure-streams/minimal-secure-streams/README.md @@ -25,6 +25,7 @@ Commandline option|Meaning -p| Run as proxy server for clients to connect to over unix domain socket --force-portal|Force the SS Captive Portal Detection to feel it's behind a portal --force-no-internet|Force the SS Captive Portal Detection to feel it can't reach the internet +--blob|Download a 50MiB blob from warmact.com, using flow control at the proxy ``` [2019/08/12 07:16:11:0045] USR: LWS minimal secure streams [-d] [-f] diff --git a/minimal-examples/secure-streams/minimal-secure-streams/minimal-secure-streams.c b/minimal-examples/secure-streams/minimal-secure-streams/minimal-secure-streams.c index a53503939..202a696af 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams/minimal-secure-streams.c +++ b/minimal-examples/secure-streams/minimal-secure-streams/minimal-secure-streams.c @@ -38,7 +38,7 @@ // #define VIA_LOCALHOST_SOCKS static int interrupted, bad = 1, force_cpd_fail_portal, - force_cpd_fail_no_internet, test_respmap; + force_cpd_fail_no_internet, test_respmap, test_blob; static unsigned int timeout_ms = 3000; static lws_state_notify_link_t nl; @@ -125,7 +125,7 @@ static const char * const default_ss_policy = #if defined(VIA_LOCALHOST_SOCKS) "\"http_url\":" "\"policy/minimal-proxy-socks.json\"," #else - "\"http_url\":" "\"policy/minimal-proxy-v4.2.json\"," + "\"http_url\":" "\"policy/minimal-proxy-v4.2-v2.json\"," #endif "\"tls\":" "true," "\"opportunistic\":" "true," @@ -161,6 +161,9 @@ typedef struct myss { void *opaque_data; /* ... application specific state ... */ lws_sorted_usec_list_t sul; + size_t amt; + + struct lws_genhash_ctx hash_ctx; } myss_t; #if !defined(LWS_SS_USE_SSPC) @@ -183,19 +186,69 @@ static const char *canned_root_token_payload = /* secure streams payload interface */ +static const uint8_t expected_blob_hash[] = { + 0xed, 0x57, 0x20, 0xc1, 0x68, 0x30, 0x81, 0x0e, + 0x58, 0x29, 0xdf, 0xb9, 0xb6, 0x6c, 0x96, 0xb2, + 0xe2, 0x4e, 0xfc, 0x4f, 0x93, 0xaa, 0x5e, 0x38, + 0xc7, 0xff, 0x41, 0x50, 0xd3, 0x1c, 0xfb, 0xbf +}; + static lws_ss_state_return_t myss_rx(void *userobj, const uint8_t *buf, size_t len, int flags) { myss_t *m = (myss_t *)userobj; - const char *md_srv = NULL, *md_test = NULL; - size_t md_len; + const char *md_srv = "not set", *md_test = "not set"; + size_t md_srv_len = 7, md_test_len = 7; - lws_ss_get_metadata(m->ss, "srv", (const void **)&md_srv, &md_len); - lws_ss_get_metadata(m->ss, "test", (const void **)&md_test, &md_len); + if (flags & LWSSS_FLAG_PERF_JSON) + return LWSSSSRET_OK; - lwsl_user("%s: len %d, flags: %d, srv: %s, test: %s\n", __func__, - (int)len, flags, md_srv ? md_srv : "not set", - md_test ? md_test : "not set"); + if (test_blob) { + + if (flags & LWSSS_FLAG_SOM) { + if (lws_genhash_init(&m->hash_ctx, LWS_GENHASH_TYPE_SHA256)) + lwsl_err("%s: hash init failed\n", __func__); + m->amt = 0; + } + + if (lws_genhash_update(&m->hash_ctx, buf, len)) + lwsl_err("%s: hash failed\n", __func__); + + if ((m->amt + len) / 102400 != (m->amt / 102400)) { + + lwsl_user("%s: blob test: rx %uKiB\n", __func__, + (unsigned int)((m->amt + len) / 1024)); + /* + * Let's make it hard for client to keep up with onward + * server, delay 50ms after every 100K received, so we + * are forcing the flow control action at the proxy + */ + usleep(50000); + } + + m->amt += len; + + if (flags & LWSSS_FLAG_EOM) { + uint8_t digest[32]; + lws_genhash_destroy(&m->hash_ctx, digest); + + if (!memcmp(expected_blob_hash, digest, 32)) { + lwsl_user("%s: SHA256 match\n", __func__); + bad = 0; + } + + interrupted = 1; + } + + return LWSSSSRET_OK; + } + + lws_ss_get_metadata(m->ss, "srv", (const void **)&md_srv, &md_srv_len); + lws_ss_get_metadata(m->ss, "test", (const void **)&md_test, &md_test_len); + + lwsl_user("%s: len %d, flags: %d, srv: %.*s, test: %.*s\n", __func__, + (int)len, flags, (int)md_srv_len, md_srv, + (int)md_test_len, md_test); lwsl_hexdump_info(buf, len); /* @@ -236,13 +289,16 @@ myss_state(void *userobj, void *sh, lws_ss_constate_t state, case LWSSSCS_CONNECTING: lws_ss_start_timeout(m->ss, timeout_ms); - if (lws_ss_set_metadata(m->ss, "uptag", "myuptag123", 10)) - /* can fail, eg due to OOM, retry later if so */ - return LWSSSSRET_DISCONNECT_ME; - if (lws_ss_set_metadata(m->ss, "ctype", "myctype", 7)) - /* can fail, eg due to OOM, retry later if so */ - return LWSSSSRET_DISCONNECT_ME; + if (!test_blob) { + if (lws_ss_set_metadata(m->ss, "uptag", "myuptag123", 10)) + /* can fail, eg due to OOM, retry later if so */ + return LWSSSSRET_DISCONNECT_ME; + + if (lws_ss_set_metadata(m->ss, "ctype", "myctype", 7)) + /* can fail, eg due to OOM, retry later if so */ + return LWSSSSRET_DISCONNECT_ME; + } break; case LWSSSCS_ALL_RETRIES_FAILED: @@ -357,7 +413,8 @@ app_system_state_nf(lws_state_manager_t *mgr, lws_state_notify_link_t *link, ssi.tx = myss_tx; ssi.state = myss_state; ssi.user_alloc = sizeof(myss_t); - ssi.streamtype = test_respmap ? "respmap" : "mintest"; + ssi.streamtype = test_blob ? "bulkproxflow" : + (test_respmap ? "respmap" : "mintest"); if (lws_ss_create(context, 0, &ssi, NULL, NULL, NULL, NULL)) { @@ -434,6 +491,16 @@ int main(int argc, const char **argv) if ((p = lws_cmdline_option(argc, argv, "--timeout_ms"))) timeout_ms = (unsigned int)atoi(p); + if (lws_cmdline_option(argc, argv, "--blob")) { + test_blob = 1; + if (timeout_ms == 3000) + /* + * Don't use default 3s, we're going to be a lot + * slower + */ + timeout_ms = 60000; + } + info.fd_limit_per_thread = 1 + 6 + 1; info.port = CONTEXT_PORT_NO_LISTEN; #if defined(LWS_SS_USE_SSPC)