diff --git a/minimal-examples/ws-client/minimal-ws-client-binance/CMakeLists.txt b/minimal-examples/ws-client/minimal-ws-client-binance/CMakeLists.txt new file mode 100644 index 000000000..7c36f0d95 --- /dev/null +++ b/minimal-examples/ws-client/minimal-ws-client-binance/CMakeLists.txt @@ -0,0 +1,26 @@ +project(lws-minimal-ws-client-binance C) +cmake_minimum_required(VERSION 2.8) +find_package(libwebsockets CONFIG REQUIRED) +list(APPEND CMAKE_MODULE_PATH ${LWS_CMAKE_DIR}) +include(CheckIncludeFile) +include(CheckCSourceCompiles) +include(LwsCheckRequirements) + +set(SAMP lws-minimal-ws-client-binance) +set(SRCS main.c) + +set(requirements 1) +require_lws_config(LWS_ROLE_WS 1 requirements) +require_lws_config(LWS_WITH_CLIENT 1 requirements) +require_lws_config(LWS_WITHOUT_EXTENSIONS 0 requirements) + +if (requirements) + add_executable(${SAMP} ${SRCS}) + + if (websockets_shared) + target_link_libraries(${SAMP} websockets_shared ${LIBWEBSOCKETS_DEP_LIBS}) + add_dependencies(${SAMP} websockets_shared) + else() + target_link_libraries(${SAMP} websockets ${LIBWEBSOCKETS_DEP_LIBS}) + endif() +endif() diff --git a/minimal-examples/ws-client/minimal-ws-client-binance/README.md b/minimal-examples/ws-client/minimal-ws-client-binance/README.md new file mode 100644 index 000000000..7809a0be0 --- /dev/null +++ b/minimal-examples/ws-client/minimal-ws-client-binance/README.md @@ -0,0 +1,67 @@ +# lws minimal ws client binance + +This connects to the binance ws server and monitors transactions with +an eye on low latency. + +Latency seems to be associated with server-side coalescing at tls +layer, and the coalescing at server side seems somewhat correlated to number +of transactions per second, which seems to cause increased packet sizes from the +server as a reaction. The relationship is more complex probably according to what +actually happens at the server backend, but it seems to be broadly related +reliably. + +Typically when showing low latency at ~70msg/s, the messages on the wire are +eg, ~70 byte packets containing small tls records + +10:14:40.682293 IP ec2-54-249-113-172.ap-northeast-1.compute.amazonaws.com.https > constance.42952: Flags [P.], seq 50846:50927, ack 1, win 11, options [nop,nop,TS val 366445630 ecr 3893437035], length 81 + +under pressure from increased messages per second, the tls records increase above 2KB + +08:06:02.825160 IP ec2-54-249-113-172.ap-northeast-1.compute.amazonaws.com.https > constance.42688: Flags [.], seq 512319:513643, ack 1, win 11, options [nop,nop,TS val 3990208942 ecr 3885719233], length 1324 +08:06:02.825290 IP constance.42688 > ec2-54-249-113-172.ap-northeast-1.compute.amazonaws.com.https: Flags [.], ack 513643, win 14248, options [nop,nop,TS val 3885719479 ecr 3990208942], length 0 +08:06:02.891646 IP ec2-54-249-113-172.ap-northeast-1.compute.amazonaws.com.https > constance.42688: Flags [.], seq 513643:516291, ack 1, win 11, options [nop,nop,TS val 3990209006 ecr 3885719296], length 2648 + +The larger the packets, the longer the first item in the packet had to +wait before it was sent, and a tls record cannot be authenticated until +all of it has been received. + +The example circumvents this somewhat by using `permessage_deflate`, which reduces +the packet size before tls by applying compression, making even coalesced packets +smaller, and a new option for adjusting how lws manages conflicting requirements to +clear pending rx and allow interleaved tx, `LCCSCF_PRIORITIZE_READS` that causes the +stream to prioritize handling any pending rx, not just pending at ssl layer, in one +event loop trip. + +## build + +Lws must have been built with `LWS_ROLE_WS=1` and `LWS_WITHOUT_EXTENSIONS=0` + +``` + $ cmake . && make +``` + +## Commandline Options + +Option|Meaning +---|--- +-d|Set logging verbosity + +## usage + +``` +$ ./bin/lws-minimal-ws-client-binance +[2020/08/23 10:22:49:3003] U: LWS minimal binance client +[2020/08/23 10:22:49:3005] N: LWS: 4.0.99-v4.1.0-rc2-4-g3cf133aef, loglevel 1031 +[2020/08/23 10:22:49:3005] N: NET CLI SRV H1 H2 WS MQTT SS-JSON-POL SSPROX ASYNC_DNS IPv6-absent +[2020/08/23 10:22:50:8243] N: checking client ext permessage-deflate +[2020/08/23 10:22:50:8244] N: instantiating client ext permessage-deflate +[2020/08/23 10:22:50:8244] U: callback_minimal: established +[2020/08/23 10:22:51:8244] N: sul_hz_cb: price: min: 1160284¢, max: 1163794¢, avg: 1160516¢, (150 prices/s) +[2020/08/23 10:22:51:8245] N: sul_hz_cb: elatency: min: 112ms, max: 547ms, avg: 259ms, (155 msg/s) +[2020/08/23 10:22:52:8244] N: sul_hz_cb: price: min: 1160287¢, max: 1178845¢, avg: 1160897¢, (112 prices/s) +[2020/08/23 10:22:52:8245] N: sul_hz_cb: elatency: min: 111ms, max: 226ms, avg: 152ms, (134 msg/s) +[2020/08/23 10:22:53:8247] N: sul_hz_cb: price: min: 1160287¢, max: 1168005¢, avg: 1160806¢, (86 prices/s) +[2020/08/23 10:22:53:8248] N: sul_hz_cb: elatency: min: 112ms, max: 476ms, avg: 287ms, (101 msg/s) +[2020/08/23 10:22:54:8247] N: sul_hz_cb: price: min: 1160284¢, max: 1162780¢, avg: 1160698¢, (71 prices/s) +... +``` diff --git a/minimal-examples/ws-client/minimal-ws-client-binance/main.c b/minimal-examples/ws-client/minimal-ws-client-binance/main.c new file mode 100644 index 000000000..1639fcf78 --- /dev/null +++ b/minimal-examples/ws-client/minimal-ws-client-binance/main.c @@ -0,0 +1,341 @@ +/* + * lws-minimal-ws-client-binance + * + * Written in 2010-2020 by Andy Green + * Kutoga + * + * This file is made available under the Creative Commons CC0 1.0 + * Universal Public Domain Dedication. + * + * This demonstrates a ws client that connects to binance ws server efficiently + */ + +#include +#include +#include +#include + +typedef struct range { + lws_usec_t sum; + lws_usec_t lowest; + lws_usec_t highest; + + int samples; +} range_t; + +/* + * This represents your object that "contains" the client connection and has + * the client connection bound to it + */ + +static struct my_conn { + lws_sorted_usec_list_t sul; /* schedule connection retry */ + lws_sorted_usec_list_t sul_hz; /* 1hz summary */ + + range_t e_lat_range; + range_t price_range; + + struct lws *wsi; /* related wsi if any */ + uint16_t retry_count; /* count of consequetive retries */ +} mco; + +static struct lws_context *context; +static int interrupted; + +/* + * The retry and backoff policy we want to use for our client connections + */ + +static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 }; + +static const lws_retry_bo_t retry = { + .retry_ms_table = backoff_ms, + .retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms), + .conceal_count = LWS_ARRAY_SIZE(backoff_ms), + + .secs_since_valid_ping = 400, /* force PINGs after secs idle */ + .secs_since_valid_hangup = 400, /* hangup after secs idle */ + + .jitter_percent = 0, +}; + +/* + * If we don't enable permessage-deflate ws extension, during times when there + * are many ws messages per second the server coalesces them inside a smaller + * number of larger ssl records, for >100 mps typically >2048 records. + * + * This is a problem, because the coalesced record cannot be send nor decrypted + * until the last part of the record is received, meaning additional latency + * for the earlier members of the coalesced record that have just been sitting + * there waiting for the last one to go out and be decrypted. + * + * permessage-deflate reduces the data size before the tls layer, for >100mps + * reducing the colesced records to ~1.2KB. + */ + +static const struct lws_extension extensions[] = { + { + "permessage-deflate", + lws_extension_callback_pm_deflate, + "permessage-deflate" + "; client_no_context_takeover" + "; client_max_window_bits" + }, + { NULL, NULL, NULL /* terminator */ } +}; +/* + * Scheduled sul callback that starts the connection attempt + */ + +static void +connect_client(lws_sorted_usec_list_t *sul) +{ + struct my_conn *mco = lws_container_of(sul, struct my_conn, sul); + struct lws_client_connect_info i; + + memset(&i, 0, sizeof(i)); + + i.context = context; + i.port = 443; + i.address = "fstream.binance.com"; + i.path = "/stream?" + "streams=btcusdt@depth@0ms/btcusdt@bookTicker/btcusdt@aggTrade"; + i.host = i.address; + i.origin = i.address; + i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_PRIORITIZE_READS; + i.protocol = NULL; + i.local_protocol_name = "lws-minimal-client"; + i.pwsi = &mco->wsi; + i.retry_and_idle_policy = &retry; + i.userdata = mco; + + if (!lws_client_connect_via_info(&i)) + /* + * Failed... schedule a retry... we can't use the _retry_wsi() + * convenience wrapper api here because no valid wsi at this + * point. + */ + if (lws_retry_sul_schedule(context, 0, sul, &retry, + connect_client, &mco->retry_count)) { + lwsl_err("%s: connection attempts exhausted\n", __func__); + interrupted = 1; + } +} + +static void +range_reset(range_t *r) +{ + r->sum = r->highest = 0; + r->lowest = 999999999999ull; + r->samples = 0; +} + +static uint64_t +get_us_timeofday(void) +{ + struct timeval tv; + + gettimeofday(&tv, NULL); + + return ((lws_usec_t)tv.tv_sec * LWS_US_PER_SEC) + tv.tv_usec; +} + +static void +sul_hz_cb(lws_sorted_usec_list_t *sul) +{ + struct my_conn *mco = lws_container_of(sul, struct my_conn, sul_hz); + + /* + * We are called once a second to dump statistics on the connection + */ + + lws_sul_schedule(lws_get_context(mco->wsi), 0, &mco->sul_hz, + sul_hz_cb, LWS_US_PER_SEC); + + if (mco->price_range.samples) + lwsl_notice("%s: price: min: %llu¢, max: %llu¢, avg: %llu¢, " + "(%d prices/s)\n", + __func__, + (unsigned long long)mco->price_range.lowest, + (unsigned long long)mco->price_range.highest, + (unsigned long long)(mco->price_range.sum / mco->price_range.samples), + mco->price_range.samples); + if (mco->e_lat_range.samples) + lwsl_notice("%s: elatency: min: %llums, max: %llums, " + "avg: %llums, (%d msg/s)\n", __func__, + (unsigned long long)mco->e_lat_range.lowest / 1000, + (unsigned long long)mco->e_lat_range.highest / 1000, + (unsigned long long)(mco->e_lat_range.sum / + mco->e_lat_range.samples) / 1000, + mco->e_lat_range.samples); + + range_reset(&mco->e_lat_range); + range_reset(&mco->price_range); +} + +static uint64_t +pennies(const char *s) +{ + uint64_t price = (uint64_t)atoll(s) * 100; + + s = strchr(s, '.'); + + if (s && isdigit(s[1]) && isdigit(s[2])) + price += (10 * (s[1] - '0')) + (s[2] - '0'); + + return price; +} + +static int +callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len) +{ + struct my_conn *mco = (struct my_conn *)user; + lws_usec_t latency_us, now_us, price; + char numbuf[16]; + const char *p; + size_t alen; + + switch (reason) { + + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", + in ? (char *)in : "(null)"); + goto do_retry; + break; + + case LWS_CALLBACK_CLIENT_RECEIVE: + /* + * The messages are a few 100 bytes of JSON each + */ + + // lwsl_hexdump_notice(in, len); + + now_us = get_us_timeofday(); + + p = lws_json_simple_find((const char *)in, len, + "\"depthUpdate\"", &alen); + /* + * Only the JSON with depthUpdate init has the numbers we care + * about as well + */ + if (!p) + break; + + p = lws_json_simple_find((const char *)in, len, "\"E\":", &alen); + if (!p) { + lwsl_err("%s: no E JSON\n", __func__); + break; + } + lws_strnncpy(numbuf, p, alen, sizeof(numbuf)); + latency_us = now_us - + ((lws_usec_t)atoll(numbuf) * LWS_US_PER_MS); + + if (latency_us < mco->e_lat_range.lowest) + mco->e_lat_range.lowest = latency_us; + if (latency_us > mco->e_lat_range.highest) + mco->e_lat_range.highest = latency_us; + + mco->e_lat_range.sum += latency_us; + mco->e_lat_range.samples++; + + p = lws_json_simple_find((const char *)in, len, + "\"a\":[[\"", &alen); + if (p) { + lws_strnncpy(numbuf, p, alen, sizeof(numbuf)); + price = pennies(numbuf); + + if (price < mco->price_range.lowest) + mco->price_range.lowest = price; + if (price > mco->price_range.highest) + mco->price_range.highest = price; + + mco->price_range.sum += price; + mco->price_range.samples++; + } + break; + + case LWS_CALLBACK_CLIENT_ESTABLISHED: + lwsl_user("%s: established\n", __func__); + lws_sul_schedule(lws_get_context(wsi), 0, &mco->sul_hz, + sul_hz_cb, LWS_US_PER_SEC); + mco->wsi = wsi; + range_reset(&mco->e_lat_range); + range_reset(&mco->price_range); + break; + + case LWS_CALLBACK_CLIENT_CLOSED: + goto do_retry; + + default: + break; + } + + return lws_callback_http_dummy(wsi, reason, user, in, len); + +do_retry: + /* + * retry the connection to keep it nailed up + * + * For this example, we try to conceal any problem for one set of + * backoff retries and then exit the app. + * + * If you set retry.conceal_count to be larger than the number of + * elements in the backoff table, it will never give up and keep + * retrying at the last backoff delay plus the random jitter amount. + */ + if (lws_retry_sul_schedule_retry_wsi(wsi, &mco->sul, connect_client, + &mco->retry_count)) { + lwsl_err("%s: connection attempts exhausted\n", __func__); + interrupted = 1; + } + + return 0; +} + +static const struct lws_protocols protocols[] = { + { "lws-minimal-client", callback_minimal, 0, 0, }, + { NULL, NULL, 0, 0 } +}; + +static void +sigint_handler(int sig) +{ + interrupted = 1; +} + +int main(int argc, const char **argv) +{ + struct lws_context_creation_info info; + int n = 0; + + signal(SIGINT, sigint_handler); + memset(&info, 0, sizeof info); + lws_cmdline_option_handle_builtin(argc, argv, &info); + + lwsl_user("LWS minimal binance client\n"); + + info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; + info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ + info.protocols = protocols; + info.fd_limit_per_thread = 1 + 1 + 1; + info.extensions = extensions; + + context = lws_create_context(&info); + if (!context) { + lwsl_err("lws init failed\n"); + return 1; + } + + /* schedule the first client connection attempt to happen immediately */ + lws_sul_schedule(context, 0, &mco.sul, connect_client, 1); + + while (n >= 0 && !interrupted) + n = lws_service(context, 0); + + lws_context_destroy(context); + lwsl_user("Completed\n"); + + return 0; +} +