diff --git a/include/libwebsockets/lws-secure-streams-policy.h b/include/libwebsockets/lws-secure-streams-policy.h index 887295cd9..f84edec99 100644 --- a/include/libwebsockets/lws-secure-streams-policy.h +++ b/include/libwebsockets/lws-secure-streams-policy.h @@ -162,6 +162,8 @@ enum { /**< metadata as direct protocol string, e.g. http header */ LWSSSPOLF_HTTP_CACHE_COOKIES = (1 << 24), /**< Record http cookies and pass them back on future requests */ + LWSSSPOLF_PRIORITIZE_READS = (1 << 25), + /**< prioritize clearing reads at expense of writes */ }; diff --git a/lib/core/context.c b/lib/core/context.c index b54459b86..3b96ac130 100644 --- a/lib/core/context.c +++ b/lib/core/context.c @@ -781,7 +781,7 @@ lws_create_context(const struct lws_context_creation_info *info) #endif /* network */ - lwsl_cx_notice(context, "LWS: %s, %s%s", library_version, opts_str, s); + lwsl_notice("LWS: %s, %s%s\n", library_version, opts_str, s); #if defined(LWS_WITH_NETWORK) lwsl_cx_info(context, "Event loop: %s", plev->ops->name); #endif diff --git a/lib/secure-streams/README.md b/lib/secure-streams/README.md index 1892b9788..b7a9c3d50 100644 --- a/lib/secure-streams/README.md +++ b/lib/secure-streams/README.md @@ -185,7 +185,7 @@ An array of ms delays for each retry in turn The number of retries to conceal from higher layers before giving errors. If this is larger than the number of times in the backoff array, then the last time -is used for the extra delays +is used for the extra delays. 65535 means never stop trying. ### `jitterpc` @@ -642,6 +642,11 @@ protocol. Eg, a single multipart mime transaction carries content from two or m Use if the ws messages are binary +### `ws_prioritize_reads` + +Set `true` if the event loop should prioritize keeping up with input at the +potential expense of output latency. + ## MQTT transport ### `mqtt_topic` diff --git a/lib/secure-streams/policy-json.c b/lib/secure-streams/policy-json.c index 99d6de464..647707678 100644 --- a/lib/secure-streams/policy-json.c +++ b/lib/secure-streams/policy-json.c @@ -64,6 +64,7 @@ static const char * const lejp_tokens_policy[] = { "s[].*.attr_high_reliability", "s[].*.attr_low_cost", "s[].*.long_poll", + "s[].*.ws_prioritize_reads", "s[].*.retry", "s[].*.timeout_ms", "s[].*.perf", @@ -167,6 +168,7 @@ typedef enum { LSSPPT_ATTR_HIGH_RELIABILITY, LSSPPT_ATTR_LOW_COST, LSSPPT_LONG_POLL, + LSSPPT_PRIORITIZE_READS, LSSPPT_RETRYPTR, LSSPPT_DEFAULT_TIMEOUT_MS, LSSPPT_PERF, @@ -758,6 +760,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason) if (reason == LEJPCB_VAL_TRUE) a->curr[LTY_POLICY].p->flags |= LWSSSPOLF_LONG_POLL; break; + case LSSPPT_PRIORITIZE_READS: + if (reason == LEJPCB_VAL_TRUE) + a->curr[LTY_POLICY].p->flags |= LWSSSPOLF_PRIORITIZE_READS; + break; + case LSSPPT_HTTP_WWW_FORM_URLENCODED: if (reason == LEJPCB_VAL_TRUE) a->curr[LTY_POLICY].p->flags |= @@ -1169,7 +1176,7 @@ lws_ss_policy_parse_file(struct lws_context *cx, const char *filepath) int n, m, fd = lws_open(filepath, LWS_O_RDONLY); if (fd < 0) - return -1; + return LEJP_REJECT_UNKNOWN; do { n = (int)read(fd, buf, sizeof(buf)); @@ -1208,10 +1215,8 @@ lws_ss_policy_parse(struct lws_context *context, const uint8_t *buf, size_t len) int m; #if !defined(LWS_PLAT_FREERTOS) && !defined(LWS_PLAT_OPTEE) - if (!args->jctx.line && buf[0] != '{') { - puts((const char *)buf); + if (args->jctx.line < 2 && buf[0] != '{') return lws_ss_policy_parse_file(context, (const char *)buf); - } #endif m = lejp_parse(&args->jctx, buf, (int)len); diff --git a/lib/secure-streams/protocols/ss-ws.c b/lib/secure-streams/protocols/ss-ws.c index 6757f0ad0..eed62c5ba 100644 --- a/lib/secure-streams/protocols/ss-ws.c +++ b/lib/secure-streams/protocols/ss-ws.c @@ -212,8 +212,6 @@ secstream_connect_munge_ws(lws_ss_handle_t *h, char *buf, size_t len, size_t used_in, used_out; lws_strexp_t exp; - lwsl_notice("%s\n", __func__); - /* i.path on entry is used to override the policy urlpath if not "" */ if (i->path[0]) @@ -225,6 +223,9 @@ secstream_connect_munge_ws(lws_ss_handle_t *h, char *buf, size_t len, if (h->policy->flags & LWSSSPOLF_HTTP_CACHE_COOKIES) i->ssl_connection |= LCCSCF_CACHE_COOKIES; + if (h->policy->flags & LWSSSPOLF_PRIORITIZE_READS) + i->ssl_connection |= LCCSCF_PRIORITIZE_READS; + /* protocol aux is the path part ; ws subprotocol name */ i->path = buf; @@ -238,7 +239,7 @@ secstream_connect_munge_ws(lws_ss_handle_t *h, char *buf, size_t len, i->protocol = h->policy->u.http.u.ws.subprotocol; - lwsl_notice("%s: url %s, ws subprotocol %s\n", __func__, buf, i->protocol); + lwsl_ss_info(h, "url %s, ws subprotocol %s", buf, i->protocol); return 0; } diff --git a/minimal-examples/secure-streams/minimal-secure-streams-binance/CMakeLists.txt b/minimal-examples/secure-streams/minimal-secure-streams-binance/CMakeLists.txt new file mode 100644 index 000000000..e1f36476e --- /dev/null +++ b/minimal-examples/secure-streams/minimal-secure-streams-binance/CMakeLists.txt @@ -0,0 +1,27 @@ +project(lws-minimal-secure-streams-binance C) +cmake_minimum_required(VERSION 2.8.12) +find_package(libwebsockets CONFIG REQUIRED) +list(APPEND CMAKE_MODULE_PATH ${LWS_CMAKE_DIR}) +include(CheckIncludeFile) +include(CheckCSourceCompiles) +include(LwsCheckRequirements) + +set(SAMP lws-minimal-secure-streams-binance) +set(SRCS main.c) + +set(requirements 1) +require_lws_config(LWS_ROLE_WS 1 requirements) +require_lws_config(LWS_WITH_CLIENT 1 requirements) +require_lws_config(LWS_WITHOUT_EXTENSIONS 0 requirements) +require_lws_config(LWS_WITH_SECURE_STREAMS 1 requirements) + +if (requirements) + add_executable(${SAMP} ${SRCS}) + + if (websockets_shared) + target_link_libraries(${SAMP} websockets_shared ${LIBWEBSOCKETS_DEP_LIBS}) + add_dependencies(${SAMP} websockets_shared) + else() + target_link_libraries(${SAMP} websockets ${LIBWEBSOCKETS_DEP_LIBS}) + endif() +endif() diff --git a/minimal-examples/secure-streams/minimal-secure-streams-binance/README.md b/minimal-examples/secure-streams/minimal-secure-streams-binance/README.md new file mode 100644 index 000000000..5155ddd99 --- /dev/null +++ b/minimal-examples/secure-streams/minimal-secure-streams-binance/README.md @@ -0,0 +1,56 @@ +# lws minimal secure streams binance + +This is a Secure Streams version of minimal-ws-client-binance. + +"policy.json" contains all the information about endpoints, protocols and +connection validation, tagged by streamtype name. + +The example tries to load it from the cwd, it lives in +./minimal-examples/secure-streams/minimal-secure-streams-binance dir, so +either run it from there, or copy the policy.json to your cwd. It's also +possible to put the policy json in the code as a string and pass that at +context creation time. + +The secure stream object represents a nailed-up connection that outlives any +single socket connection, and can manage reconnections / retries according to +the policy to keep the connection nailed up automatically. + +Secure Streams provides the same simplified communication api without any +protocol dependencies. + +## build + +Lws must have been built with `LWS_ROLE_WS=1`, `LWS_WITH_SECURE_STREAMS=1`, and +`LWS_WITHOUT_EXTENSIONS=0` + +``` + $ cmake . && make +``` + +## Commandline Options + +Option|Meaning +---|--- +-d|Set logging verbosity + +## usage + +``` +$ ./bin/lws-minimal-ws-client-binance +[2021/08/15 06:42:40:8409] U: LWS minimal Secure Streams binance client +[2021/08/15 06:42:40:8410] N: LWS: 4.2.99-v4.2.0-156-g8f352f65e8, NET CLI SRV H1 H2 WS SS-JSON-POL SSPROX ConMon FLTINJ IPV6-on +[2021/08/15 06:42:40:8410] N: ++ [495958|wsi|0|pipe] (1) +[2021/08/15 06:42:40:8411] N: ++ [495958|vh|0|netlink] (1) +[2021/08/15 06:42:40:8433] N: ++ [495958|vh|1|digicert||-1] (2) +[2021/08/15 06:42:40:8471] N: ++ [495958|wsiSScli|0|binance] (1) +[2021/08/15 06:42:40:8471] N: [495958|wsiSScli|0|binance]: lws_ss_check_next_state_ss: (unset) -> LWSSSCS_CREATING +[2021/08/15 06:42:40:8472] N: [495958|wsiSScli|0|binance]: lws_ss_check_next_state_ss: LWSSSCS_CREATING -> LWSSSCS_CONNECTING +[2021/08/15 06:42:40:8472] N: ++ [495958|wsicli|0|WS/h1/fstream.binance.com/([495958|wsiSScli|0|binance])] (1) +[2021/08/15 06:42:41:8802] N: [495958|wsiSScli|0|binance]: lws_ss_check_next_state_ss: LWSSSCS_CONNECTING -> LWSSSCS_CONNECTED +[2021/08/15 06:42:42:8803] N: sul_hz_cb: price: min: 4669185¢, max: 4672159¢, avg: 4670061¢, (53 prices/s) +[2021/08/15 06:42:42:8803] N: sul_hz_cb: elatency: min: 131ms, max: 292ms, avg: 154ms, (53 msg/s) +[2021/08/15 06:42:43:8803] N: sul_hz_cb: price: min: 4669646¢, max: 4672159¢, avg: 4669953¢, (34 prices/s) +[2021/08/15 06:42:43:8803] N: sul_hz_cb: elatency: min: 130ms, max: 149ms, avg: 133ms, (34 msg/s) +[2021/08/15 06:42:44:8804] N: sul_hz_cb: price: min: 4669455¢, max: 4672159¢, avg: 4669904¢, (26 prices/s) +... +``` diff --git a/minimal-examples/secure-streams/minimal-secure-streams-binance/main.c b/minimal-examples/secure-streams/minimal-secure-streams-binance/main.c new file mode 100644 index 000000000..8327f3e6d --- /dev/null +++ b/minimal-examples/secure-streams/minimal-secure-streams-binance/main.c @@ -0,0 +1,266 @@ +/* + * lws-minimal-secure-streams-binance + * + * Written in 2010-2021 by Andy Green + * Kutoga + * + * This file is made available under the Creative Commons CC0 1.0 + * Universal Public Domain Dedication. + * + * This demonstrates a Secure Streams implementation of a client that connects + * to binance ws server efficiently. + * + * Build lws with -DLWS_WITH_SECURE_STREAMS=1 -DLWS_WITHOUT_EXTENSIONS=0 + * + * "policy.json" contains all the information about endpoints, protocols and + * connection validation, tagged by streamtype name. + * + * The example tries to load it from the cwd, it lives + * in ./minimal-examples/secure-streams/minimal-secure-streams-binance dir, so + * either run it from there, or copy the policy.json to your cwd. It's also + * possible to put the policy json in the code as a string and pass that at + * context creation time. + */ + +#include +#include +#include +#include + +static int interrupted; + +typedef struct range { + uint64_t sum; + uint64_t lowest; + uint64_t highest; + + unsigned int samples; +} range_t; + +typedef struct binance { + struct lws_ss_handle *ss; + void *opaque_data; + + lws_sorted_usec_list_t sul_hz; /* 1hz summary dump */ + + range_t e_lat_range; + range_t price_range; +} binance_t; + +/****** Part 1 / 3: application data processing */ + +static void +range_reset(range_t *r) +{ + r->sum = r->highest = 0; + r->lowest = 999999999999ull; + r->samples = 0; +} + +static uint64_t +get_us_timeofday(void) +{ + struct timeval tv; + + gettimeofday(&tv, NULL); + + return (uint64_t)((lws_usec_t)tv.tv_sec * LWS_US_PER_SEC) + + (uint64_t)tv.tv_usec; +} + +static uint64_t +pennies(const char *s) +{ + uint64_t price = (uint64_t)atoll(s) * 100; + + s = strchr(s, '.'); + + if (s && isdigit(s[1]) && isdigit(s[2])) + price = price + (uint64_t)((10 * (s[1] - '0')) + (s[2] - '0')); + + return price; +} + +static void +sul_hz_cb(lws_sorted_usec_list_t *sul) +{ + binance_t *bin = lws_container_of(sul, binance_t, sul_hz); + + /* + * We are called once a second to dump statistics on the connection + */ + + lws_sul_schedule(lws_ss_get_context(bin->ss), 0, &bin->sul_hz, + sul_hz_cb, LWS_US_PER_SEC); + + if (bin->price_range.samples) + lwsl_notice("%s: price: min: %llu¢, max: %llu¢, avg: %llu¢, " + "(%d prices/s)\n", __func__, + (unsigned long long)bin->price_range.lowest, + (unsigned long long)bin->price_range.highest, + (unsigned long long)(bin->price_range.sum / + bin->price_range.samples), + bin->price_range.samples); + if (bin->e_lat_range.samples) + lwsl_notice("%s: elatency: min: %llums, max: %llums, " + "avg: %llums, (%d msg/s)\n", __func__, + (unsigned long long)bin->e_lat_range.lowest / 1000, + (unsigned long long)bin->e_lat_range.highest / 1000, + (unsigned long long)(bin->e_lat_range.sum / + bin->e_lat_range.samples) / 1000, + bin->e_lat_range.samples); + + range_reset(&bin->e_lat_range); + range_reset(&bin->price_range); +} + +/****** Part 2 / 3: communication */ + +static lws_ss_state_return_t +binance_rx(void *userobj, const uint8_t *in, size_t len, int flags) +{ + binance_t *bin = (binance_t *)userobj; + uint64_t latency_us, now_us; + char numbuf[16]; + uint64_t price; + const char *p; + size_t alen; + + now_us = (uint64_t)get_us_timeofday(); + + p = lws_json_simple_find((const char *)in, len, "\"depthUpdate\"", + &alen); + if (!p) + return LWSSSSRET_OK; + + p = lws_json_simple_find((const char *)in, len, "\"E\":", &alen); + if (!p) { + lwsl_err("%s: no E JSON\n", __func__); + return LWSSSSRET_OK; + } + + lws_strnncpy(numbuf, p, alen, sizeof(numbuf)); + latency_us = now_us - ((uint64_t)atoll(numbuf) * LWS_US_PER_MS); + + if (latency_us < bin->e_lat_range.lowest) + bin->e_lat_range.lowest = latency_us; + if (latency_us > bin->e_lat_range.highest) + bin->e_lat_range.highest = latency_us; + + bin->e_lat_range.sum += latency_us; + bin->e_lat_range.samples++; + + p = lws_json_simple_find((const char *)in, len, "\"a\":[[\"", &alen); + if (!p) + return LWSSSSRET_OK; + + lws_strnncpy(numbuf, p, alen, sizeof(numbuf)); + price = pennies(numbuf); + + if (price < bin->price_range.lowest) + bin->price_range.lowest = price; + if (price > bin->price_range.highest) + bin->price_range.highest = price; + + bin->price_range.sum += price; + bin->price_range.samples++; + + return LWSSSSRET_OK; +} + +static lws_ss_state_return_t +binance_state(void *userobj, void *h_src, lws_ss_constate_t state, + lws_ss_tx_ordinal_t ack) +{ + binance_t *bin = (binance_t *)userobj; + + lwsl_ss_info(bin->ss, "%s (%d), ord 0x%x", + lws_ss_state_name((int)state), state, (unsigned int)ack); + + switch (state) { + + case LWSSSCS_CONNECTED: + lws_sul_schedule(lws_ss_get_context(bin->ss), 0, &bin->sul_hz, + sul_hz_cb, LWS_US_PER_SEC); + range_reset(&bin->e_lat_range); + range_reset(&bin->price_range); + + return LWSSSSRET_OK; + + case LWSSSCS_DISCONNECTED: + lws_sul_cancel(&bin->sul_hz); + break; + + default: + break; + } + + return LWSSSSRET_OK; +} + +static const lws_ss_info_t ssi_binance = { + .handle_offset = offsetof(binance_t, ss), + .opaque_user_data_offset = offsetof(binance_t, opaque_data), + .rx = binance_rx, + .state = binance_state, + .user_alloc = sizeof(binance_t), + .streamtype = "binance", /* bind to corresponding policy */ +}; + +/****** Part 3 / 3: init and event loop */ + +static const struct lws_extension extensions[] = { + { + "permessage-deflate", lws_extension_callback_pm_deflate, + "permessage-deflate" "; client_no_context_takeover" + "; client_max_window_bits" + }, + { NULL, NULL, NULL /* terminator */ } +}; + +static void +sigint_handler(int sig) +{ + interrupted = 1; +} + +int main(int argc, const char **argv) +{ + struct lws_context_creation_info info; + struct lws_context *cx; + int n = 0; + + signal(SIGINT, sigint_handler); + + memset(&info, 0, sizeof info); + lws_cmdline_option_handle_builtin(argc, argv, &info); + + lwsl_user("LWS minimal Secure Streams binance client\n"); + + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT | + LWS_SERVER_OPTION_EXPLICIT_VHOSTS; + info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ + info.fd_limit_per_thread = 1 + 1 + 1; + info.extensions = extensions; + info.pss_policies_json = "policy.json"; /* literal JSON, or path */ + + cx = lws_create_context(&info); + if (!cx) { + lwsl_err("lws init failed\n"); + return 1; + } + + if (lws_ss_create(cx, 0, &ssi_binance, NULL, NULL, NULL, NULL)) { + lwsl_cx_err(cx, "failed to create secure stream"); + interrupted = 1; + } + + while (n >= 0 && !interrupted) + n = lws_service(cx, 0); + + lws_context_destroy(cx); + + lwsl_user("Completed\n"); + + return 0; +} diff --git a/minimal-examples/secure-streams/minimal-secure-streams-binance/policy.json b/minimal-examples/secure-streams/minimal-secure-streams-binance/policy.json new file mode 100644 index 000000000..1ff4e0413 --- /dev/null +++ b/minimal-examples/secure-streams/minimal-secure-streams-binance/policy.json @@ -0,0 +1,38 @@ +{ + "release": "01234567", + "product": "myproduct", + "schema-version": 1, + "retry": [{ + "default": { + "backoff": [1000, 2000, 3000, 4000, 5000], + "conceal": 65535, + "jitterpc": 20, + "svalidping": 30, + "svalidhup": 35 + } + }], + "certs": [{ + "digicert_global_root": "MIIDrzCCApegAwIBAgIQCDvgVpBCRrGhdWrJWZHHSjANBgkqhkiG9w0BAQUFADBhMQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBDQTAeFw0wNjExMTAwMDAwMDBaFw0zMTExMTAwMDAwMDBaMGExCzAJBgNVBAYTAlVTMRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5jb20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4jvhEXLeqKTTo1eqUKKPC3eQyaKl7hLOllsBCSDMAZOnTjC3U/dDxGkAV53ijSLdhwZAAIEJzs4bg7/fzTtxRuLWZscFs3YnFo97nh6Vfe63SKMI2tavegw5BmV/Sl0fvBf4q77uKNd0f3p4mVmFaG5cIzJLv07A6Fpt43C/dxC//AH2hdmoRBBYMql1GNXRor5H4idq9Joz+EkIYIvUX7Q6hL+hqkpMfT7PT19sdl6gSzeRntwi5m3OFBqOasv+zbMUZBfHWymeMr/y7vrTC0LUq7dBMtoM1O/4gdW7jVg/tRvoSSiicNoxBN33shbyTApOB6jtSj1etX+jkMOvJwIDAQABo2MwYTAOBgNVHQ8BAf8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUA95QNVbRTLtm8KPiGxvDl7I90VUwHwYDVR0jBBgwFoAUA95QNVbRTLtm8KPiGxvDl7I90VUwDQYJKoZIhvcNAQEFBQADggEBAMucN6pIExIK+t1EnE9SsPTfrgT1eXkIoyQY/EsrhMAtudXH/vTBH1jLuG2cenTnmCmrEbXjcKChzUyImZOMkXDiqw8cvpOp/2PV5Adg06O/nVsJ8dWO41P0jmP6P6fbtGbfYmbW0W5BjfIttep3Sp+dWOIrWcBAI+0tKIJFPnlUkiaY4IBIqDfv8NZ5YBberOgOzW6sRBc4L0na4UU+Krk2U886UAb3LujEV0lsYSEY1QSteDwsOoBrp+uvFRTp2InBuThs4pFsiv9kuXclVzDAGySj4dzp30d8tbQkCAUw7C29C79Fv1C5qfPrmAESrciIxpg0X40KPMbp1ZWVbd4=" + } + ], + "trust_stores": [{ + "name": "digicert", + "stack": ["digicert_global_root"] + } + ], + "s": [ + { "binance": { + "endpoint": "fstream.binance.com", + "port": 443, + "protocol": "ws", + "http_url": "/stream?streams=btcusdt@depth@0ms/btcusdt@bookTicker/btcusdt@aggTrade", + "nailed_up": true, + "ws_prioritize_reads": true, + "tls": true, + "tls_trust_store": "digicert", + "retry": "default" + } + } + ] +} +