diff --git a/include/libwebsockets/lws-dsh.h b/include/libwebsockets/lws-dsh.h index ee7168bc2..09c950c05 100644 --- a/include/libwebsockets/lws-dsh.h +++ b/include/libwebsockets/lws-dsh.h @@ -115,6 +115,9 @@ lws_dsh_alloc_tail(struct lws_dsh *dsh, int kind, const void *src1, LWS_VISIBLE LWS_EXTERN void lws_dsh_free(void **obj); +LWS_VISIBLE LWS_EXTERN size_t +lws_dsh_get_size(struct lws_dsh *dsh, int kind); + /** * lws_dsh_get_head() - get the head allocation inside the dsh * diff --git a/lib/core-net/lws-dsh.c b/lib/core-net/lws-dsh.c index 324272baa..7a6c505ac 100644 --- a/lib/core-net/lws-dsh.c +++ b/lib/core-net/lws-dsh.c @@ -246,6 +246,15 @@ lws_dsh_destroy(lws_dsh_t **pdsh) lws_free_set_NULL(*pdsh); } +size_t +lws_dsh_get_size(struct lws_dsh *dsh, int kind) +{ + kind++; + assert(kind < dsh->count_kinds); + + return dsh->oha[kind].total_size; +} + static int _lws_dsh_alloc_tail(lws_dsh_t *dsh, int kind, const void *src1, size_t size1, const void *src2, size_t size2, lws_dll2_t *replace) diff --git a/lib/secure-streams/private-lib-secure-streams.h b/lib/secure-streams/private-lib-secure-streams.h index 78720c744..9bf6f310b 100644 --- a/lib/secure-streams/private-lib-secure-streams.h +++ b/lib/secure-streams/private-lib-secure-streams.h @@ -555,6 +555,8 @@ struct conn { lws_ss_handle_t *ss; /* the onward, ss side */ lws_ss_conn_states_t state; + + char onward_in_flow_control; }; extern const struct ss_pcols ss_pcol_h1; diff --git a/lib/secure-streams/secure-streams-process.c b/lib/secure-streams/secure-streams-process.c index 55a129a31..9edd7a631 100644 --- a/lib/secure-streams/secure-streams-process.c +++ b/lib/secure-streams/secure-streams-process.c @@ -154,13 +154,21 @@ ss_proxy_onward_rx(void *userobj, const uint8_t *buf, size_t len, int flags) * in the dsh holding proxy->client serialized forwarding rx */ - if (m->ss->policy->proxy_buflen_rxflow_on_above && m->ss->wsi && - m->conn->dsh->oha[KIND_SS_TO_P].total_size > + if (!m->conn->onward_in_flow_control && m->ss->wsi && + m->ss->policy->proxy_buflen_rxflow_on_above && + lws_dsh_get_size(m->conn->dsh, KIND_SS_TO_P) >= m->ss->policy->proxy_buflen_rxflow_on_above) { - lwsl_notice("%s: %s: rxflow disabling rx\n", __func__, - lws_wsi_tag(m->ss->wsi)); - /* stop receiving taking in rx once above the threshold */ + lwsl_info("%s: %s: rxflow disabling rx (%lu / %lu, hwm %lu)\n", __func__, + lws_wsi_tag(m->ss->wsi), + (unsigned long)lws_dsh_get_size(m->conn->dsh, KIND_SS_TO_P), + (unsigned long)m->ss->policy->proxy_buflen, + (unsigned long)m->ss->policy->proxy_buflen_rxflow_on_above); + /* + * stop taking in rx once the onward wsi rx is above the + * high water mark + */ lws_rx_flow_control(m->ss->wsi, 0); + m->conn->onward_in_flow_control = 1; } if (m->conn->wsi) /* if possible, request client conn write */ @@ -369,8 +377,7 @@ callback_ss_proxy(struct lws *wsi, enum lws_callback_reasons reason, * acceptance up rapidly with an initial tx containing the * streamtype name. We can't create the stream until then. */ - lws_set_timeout(wsi, - PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3); + lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3); break; case LWS_CALLBACK_RAW_CLOSE: @@ -660,7 +667,6 @@ callback_ss_proxy(struct lws *wsi, enum lws_callback_reasons reason, lws_ser_ru32be(&p[7]); } #endif - pay = 1; n = (int)si; break; @@ -695,18 +701,23 @@ again: * this dsh? */ - if (conn->ss->policy->proxy_buflen_rxflow_on_above && + if (conn->onward_in_flow_control && + conn->ss->policy->proxy_buflen_rxflow_on_above && conn->ss->wsi && - conn->dsh->oha[KIND_SS_TO_P].total_size < + lws_dsh_get_size(conn->dsh, KIND_SS_TO_P) < conn->ss->policy->proxy_buflen_rxflow_off_below) { - lwsl_notice("%s: %s: rxflow re-enabling rx\n", - __func__, - lws_wsi_tag(conn->ss->wsi)); + lwsl_info("%s: %s: rxflow enabling rx (%lu / %lu, lwm %lu)\n", __func__, + lws_wsi_tag(conn->ss->wsi), + (unsigned long)lws_dsh_get_size(conn->dsh, KIND_SS_TO_P), + (unsigned long)conn->ss->policy->proxy_buflen, + (unsigned long)conn->ss->policy->proxy_buflen_rxflow_off_below); /* * Resume receiving taking in rx once * below the low threshold */ - lws_rx_flow_control(conn->ss->wsi, 1); + lws_rx_flow_control(conn->ss->wsi, + LWS_RXFLOW_ALLOW); + conn->onward_in_flow_control = 0; } } if (!lws_dsh_get_head(conn->dsh, KIND_SS_TO_P, diff --git a/minimal-examples/secure-streams/minimal-secure-streams-perf/CMakeLists.txt b/minimal-examples/secure-streams/minimal-secure-streams-perf/CMakeLists.txt index ca3cb622d..2be9464be 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams-perf/CMakeLists.txt +++ b/minimal-examples/secure-streams/minimal-secure-streams-perf/CMakeLists.txt @@ -43,9 +43,9 @@ if (requirements) message("testing via valgrind") add_test(NAME ssperf-warmcat COMMAND ${VALGRIND} --tool=memcheck --leak-check=yes --num-callers=20 - $) + $) else() - add_test(NAME ssperf-warmcat COMMAND lws-minimal-secure-streams) + add_test(NAME ssperf-warmcat COMMAND lws-minimal-secure-streams-perf) endif() set_tests_properties(ssperf-warmcat @@ -90,9 +90,9 @@ if (requirements) message("testing via valgrind") add_test(NAME ssperfpc-minimal COMMAND ${VALGRIND} --tool=memcheck --leak-check=yes --num-callers=20 - $ -i +${CTEST_SOCKET_PATH}) + $ -i +${CTEST_SOCKET_PATH}) else() - add_test(NAME ssperfpc-minimal COMMAND lws-minimal-secure-streams-client -i +${CTEST_SOCKET_PATH}) + add_test(NAME ssperfpc-minimal COMMAND lws-minimal-secure-streams-perf-client -i +${CTEST_SOCKET_PATH}) endif() set(fixlist "ssperfproxy") @@ -101,7 +101,7 @@ if (requirements) endif() set_tests_properties(ssperfpc-minimal PROPERTIES - WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams-perf FIXTURES_REQUIRED "${fixlist}" TIMEOUT 40) diff --git a/minimal-examples/secure-streams/minimal-secure-streams-perf/minimal-secure-streams.c b/minimal-examples/secure-streams/minimal-secure-streams-perf/minimal-secure-streams.c index 4bdbf89f3..be41e6b68 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams-perf/minimal-secure-streams.c +++ b/minimal-examples/secure-streams/minimal-secure-streams-perf/minimal-secure-streams.c @@ -424,7 +424,7 @@ int main(int argc, const char **argv) memset(&info, 0, sizeof info); lws_cmdline_option_handle_builtin(argc, argv, &info); - lwsl_user("LWS secure streams test client [-d]\n"); + lwsl_user("LWS secure streams test client PERF [-d]\n"); /* these options are mutually exclusive if given */ diff --git a/minimal-examples/secure-streams/minimal-secure-streams/CMakeLists.txt b/minimal-examples/secure-streams/minimal-secure-streams/CMakeLists.txt index 93bf34fdb..9559799a8 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams/CMakeLists.txt +++ b/minimal-examples/secure-streams/minimal-secure-streams/CMakeLists.txt @@ -13,6 +13,7 @@ require_lws_config(LWS_WITHOUT_CLIENT 0 requirements) require_lws_config(LWS_WITH_SECURE_STREAMS 1 requirements) require_lws_config(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY 0 requirements) require_lws_config(LWS_WITH_SYS_STATE 1 requirements) +require_lws_config(LWS_WITH_GENCRYPTO 1 requirements) if (requirements) add_executable(${SAMP} minimal-secure-streams.c) diff --git a/minimal-examples/secure-streams/minimal-secure-streams/README.md b/minimal-examples/secure-streams/minimal-secure-streams/README.md index cb9b3b360..78f0a1bd0 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams/README.md +++ b/minimal-examples/secure-streams/minimal-secure-streams/README.md @@ -25,6 +25,7 @@ Commandline option|Meaning -p| Run as proxy server for clients to connect to over unix domain socket --force-portal|Force the SS Captive Portal Detection to feel it's behind a portal --force-no-internet|Force the SS Captive Portal Detection to feel it can't reach the internet +--blob|Download a 50MiB blob from warmact.com, using flow control at the proxy ``` [2019/08/12 07:16:11:0045] USR: LWS minimal secure streams [-d] [-f] diff --git a/minimal-examples/secure-streams/minimal-secure-streams/minimal-secure-streams.c b/minimal-examples/secure-streams/minimal-secure-streams/minimal-secure-streams.c index a53503939..202a696af 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams/minimal-secure-streams.c +++ b/minimal-examples/secure-streams/minimal-secure-streams/minimal-secure-streams.c @@ -38,7 +38,7 @@ // #define VIA_LOCALHOST_SOCKS static int interrupted, bad = 1, force_cpd_fail_portal, - force_cpd_fail_no_internet, test_respmap; + force_cpd_fail_no_internet, test_respmap, test_blob; static unsigned int timeout_ms = 3000; static lws_state_notify_link_t nl; @@ -125,7 +125,7 @@ static const char * const default_ss_policy = #if defined(VIA_LOCALHOST_SOCKS) "\"http_url\":" "\"policy/minimal-proxy-socks.json\"," #else - "\"http_url\":" "\"policy/minimal-proxy-v4.2.json\"," + "\"http_url\":" "\"policy/minimal-proxy-v4.2-v2.json\"," #endif "\"tls\":" "true," "\"opportunistic\":" "true," @@ -161,6 +161,9 @@ typedef struct myss { void *opaque_data; /* ... application specific state ... */ lws_sorted_usec_list_t sul; + size_t amt; + + struct lws_genhash_ctx hash_ctx; } myss_t; #if !defined(LWS_SS_USE_SSPC) @@ -183,19 +186,69 @@ static const char *canned_root_token_payload = /* secure streams payload interface */ +static const uint8_t expected_blob_hash[] = { + 0xed, 0x57, 0x20, 0xc1, 0x68, 0x30, 0x81, 0x0e, + 0x58, 0x29, 0xdf, 0xb9, 0xb6, 0x6c, 0x96, 0xb2, + 0xe2, 0x4e, 0xfc, 0x4f, 0x93, 0xaa, 0x5e, 0x38, + 0xc7, 0xff, 0x41, 0x50, 0xd3, 0x1c, 0xfb, 0xbf +}; + static lws_ss_state_return_t myss_rx(void *userobj, const uint8_t *buf, size_t len, int flags) { myss_t *m = (myss_t *)userobj; - const char *md_srv = NULL, *md_test = NULL; - size_t md_len; + const char *md_srv = "not set", *md_test = "not set"; + size_t md_srv_len = 7, md_test_len = 7; - lws_ss_get_metadata(m->ss, "srv", (const void **)&md_srv, &md_len); - lws_ss_get_metadata(m->ss, "test", (const void **)&md_test, &md_len); + if (flags & LWSSS_FLAG_PERF_JSON) + return LWSSSSRET_OK; - lwsl_user("%s: len %d, flags: %d, srv: %s, test: %s\n", __func__, - (int)len, flags, md_srv ? md_srv : "not set", - md_test ? md_test : "not set"); + if (test_blob) { + + if (flags & LWSSS_FLAG_SOM) { + if (lws_genhash_init(&m->hash_ctx, LWS_GENHASH_TYPE_SHA256)) + lwsl_err("%s: hash init failed\n", __func__); + m->amt = 0; + } + + if (lws_genhash_update(&m->hash_ctx, buf, len)) + lwsl_err("%s: hash failed\n", __func__); + + if ((m->amt + len) / 102400 != (m->amt / 102400)) { + + lwsl_user("%s: blob test: rx %uKiB\n", __func__, + (unsigned int)((m->amt + len) / 1024)); + /* + * Let's make it hard for client to keep up with onward + * server, delay 50ms after every 100K received, so we + * are forcing the flow control action at the proxy + */ + usleep(50000); + } + + m->amt += len; + + if (flags & LWSSS_FLAG_EOM) { + uint8_t digest[32]; + lws_genhash_destroy(&m->hash_ctx, digest); + + if (!memcmp(expected_blob_hash, digest, 32)) { + lwsl_user("%s: SHA256 match\n", __func__); + bad = 0; + } + + interrupted = 1; + } + + return LWSSSSRET_OK; + } + + lws_ss_get_metadata(m->ss, "srv", (const void **)&md_srv, &md_srv_len); + lws_ss_get_metadata(m->ss, "test", (const void **)&md_test, &md_test_len); + + lwsl_user("%s: len %d, flags: %d, srv: %.*s, test: %.*s\n", __func__, + (int)len, flags, (int)md_srv_len, md_srv, + (int)md_test_len, md_test); lwsl_hexdump_info(buf, len); /* @@ -236,13 +289,16 @@ myss_state(void *userobj, void *sh, lws_ss_constate_t state, case LWSSSCS_CONNECTING: lws_ss_start_timeout(m->ss, timeout_ms); - if (lws_ss_set_metadata(m->ss, "uptag", "myuptag123", 10)) - /* can fail, eg due to OOM, retry later if so */ - return LWSSSSRET_DISCONNECT_ME; - if (lws_ss_set_metadata(m->ss, "ctype", "myctype", 7)) - /* can fail, eg due to OOM, retry later if so */ - return LWSSSSRET_DISCONNECT_ME; + if (!test_blob) { + if (lws_ss_set_metadata(m->ss, "uptag", "myuptag123", 10)) + /* can fail, eg due to OOM, retry later if so */ + return LWSSSSRET_DISCONNECT_ME; + + if (lws_ss_set_metadata(m->ss, "ctype", "myctype", 7)) + /* can fail, eg due to OOM, retry later if so */ + return LWSSSSRET_DISCONNECT_ME; + } break; case LWSSSCS_ALL_RETRIES_FAILED: @@ -357,7 +413,8 @@ app_system_state_nf(lws_state_manager_t *mgr, lws_state_notify_link_t *link, ssi.tx = myss_tx; ssi.state = myss_state; ssi.user_alloc = sizeof(myss_t); - ssi.streamtype = test_respmap ? "respmap" : "mintest"; + ssi.streamtype = test_blob ? "bulkproxflow" : + (test_respmap ? "respmap" : "mintest"); if (lws_ss_create(context, 0, &ssi, NULL, NULL, NULL, NULL)) { @@ -434,6 +491,16 @@ int main(int argc, const char **argv) if ((p = lws_cmdline_option(argc, argv, "--timeout_ms"))) timeout_ms = (unsigned int)atoi(p); + if (lws_cmdline_option(argc, argv, "--blob")) { + test_blob = 1; + if (timeout_ms == 3000) + /* + * Don't use default 3s, we're going to be a lot + * slower + */ + timeout_ms = 60000; + } + info.fd_limit_per_thread = 1 + 6 + 1; info.port = CONTEXT_PORT_NO_LISTEN; #if defined(LWS_SS_USE_SSPC)