1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ss: port binance example

This commit is contained in:
Andy Green 2021-08-13 14:13:08 +01:00
parent c34e24392e
commit 81a3ca3e84
9 changed files with 409 additions and 9 deletions

View file

@ -162,6 +162,8 @@ enum {
/**< metadata as direct protocol string, e.g. http header */
LWSSSPOLF_HTTP_CACHE_COOKIES = (1 << 24),
/**< Record http cookies and pass them back on future requests */
LWSSSPOLF_PRIORITIZE_READS = (1 << 25),
/**< prioritize clearing reads at expense of writes */
};

View file

@ -781,7 +781,7 @@ lws_create_context(const struct lws_context_creation_info *info)
#endif /* network */
lwsl_cx_notice(context, "LWS: %s, %s%s", library_version, opts_str, s);
lwsl_notice("LWS: %s, %s%s\n", library_version, opts_str, s);
#if defined(LWS_WITH_NETWORK)
lwsl_cx_info(context, "Event loop: %s", plev->ops->name);
#endif

View file

@ -185,7 +185,7 @@ An array of ms delays for each retry in turn
The number of retries to conceal from higher layers before giving errors. If
this is larger than the number of times in the backoff array, then the last time
is used for the extra delays
is used for the extra delays. 65535 means never stop trying.
### `jitterpc`
@ -642,6 +642,11 @@ protocol. Eg, a single multipart mime transaction carries content from two or m
Use if the ws messages are binary
### `ws_prioritize_reads`
Set `true` if the event loop should prioritize keeping up with input at the
potential expense of output latency.
## MQTT transport
### `mqtt_topic`

View file

@ -64,6 +64,7 @@ static const char * const lejp_tokens_policy[] = {
"s[].*.attr_high_reliability",
"s[].*.attr_low_cost",
"s[].*.long_poll",
"s[].*.ws_prioritize_reads",
"s[].*.retry",
"s[].*.timeout_ms",
"s[].*.perf",
@ -167,6 +168,7 @@ typedef enum {
LSSPPT_ATTR_HIGH_RELIABILITY,
LSSPPT_ATTR_LOW_COST,
LSSPPT_LONG_POLL,
LSSPPT_PRIORITIZE_READS,
LSSPPT_RETRYPTR,
LSSPPT_DEFAULT_TIMEOUT_MS,
LSSPPT_PERF,
@ -758,6 +760,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
if (reason == LEJPCB_VAL_TRUE)
a->curr[LTY_POLICY].p->flags |= LWSSSPOLF_LONG_POLL;
break;
case LSSPPT_PRIORITIZE_READS:
if (reason == LEJPCB_VAL_TRUE)
a->curr[LTY_POLICY].p->flags |= LWSSSPOLF_PRIORITIZE_READS;
break;
case LSSPPT_HTTP_WWW_FORM_URLENCODED:
if (reason == LEJPCB_VAL_TRUE)
a->curr[LTY_POLICY].p->flags |=
@ -1169,7 +1176,7 @@ lws_ss_policy_parse_file(struct lws_context *cx, const char *filepath)
int n, m, fd = lws_open(filepath, LWS_O_RDONLY);
if (fd < 0)
return -1;
return LEJP_REJECT_UNKNOWN;
do {
n = (int)read(fd, buf, sizeof(buf));
@ -1208,10 +1215,8 @@ lws_ss_policy_parse(struct lws_context *context, const uint8_t *buf, size_t len)
int m;
#if !defined(LWS_PLAT_FREERTOS) && !defined(LWS_PLAT_OPTEE)
if (!args->jctx.line && buf[0] != '{') {
puts((const char *)buf);
if (args->jctx.line < 2 && buf[0] != '{')
return lws_ss_policy_parse_file(context, (const char *)buf);
}
#endif
m = lejp_parse(&args->jctx, buf, (int)len);

View file

@ -212,8 +212,6 @@ secstream_connect_munge_ws(lws_ss_handle_t *h, char *buf, size_t len,
size_t used_in, used_out;
lws_strexp_t exp;
lwsl_notice("%s\n", __func__);
/* i.path on entry is used to override the policy urlpath if not "" */
if (i->path[0])
@ -225,6 +223,9 @@ secstream_connect_munge_ws(lws_ss_handle_t *h, char *buf, size_t len,
if (h->policy->flags & LWSSSPOLF_HTTP_CACHE_COOKIES)
i->ssl_connection |= LCCSCF_CACHE_COOKIES;
if (h->policy->flags & LWSSSPOLF_PRIORITIZE_READS)
i->ssl_connection |= LCCSCF_PRIORITIZE_READS;
/* protocol aux is the path part ; ws subprotocol name */
i->path = buf;
@ -238,7 +239,7 @@ secstream_connect_munge_ws(lws_ss_handle_t *h, char *buf, size_t len,
i->protocol = h->policy->u.http.u.ws.subprotocol;
lwsl_notice("%s: url %s, ws subprotocol %s\n", __func__, buf, i->protocol);
lwsl_ss_info(h, "url %s, ws subprotocol %s", buf, i->protocol);
return 0;
}

View file

@ -0,0 +1,27 @@
project(lws-minimal-secure-streams-binance C)
cmake_minimum_required(VERSION 2.8.12)
find_package(libwebsockets CONFIG REQUIRED)
list(APPEND CMAKE_MODULE_PATH ${LWS_CMAKE_DIR})
include(CheckIncludeFile)
include(CheckCSourceCompiles)
include(LwsCheckRequirements)
set(SAMP lws-minimal-secure-streams-binance)
set(SRCS main.c)
set(requirements 1)
require_lws_config(LWS_ROLE_WS 1 requirements)
require_lws_config(LWS_WITH_CLIENT 1 requirements)
require_lws_config(LWS_WITHOUT_EXTENSIONS 0 requirements)
require_lws_config(LWS_WITH_SECURE_STREAMS 1 requirements)
if (requirements)
add_executable(${SAMP} ${SRCS})
if (websockets_shared)
target_link_libraries(${SAMP} websockets_shared ${LIBWEBSOCKETS_DEP_LIBS})
add_dependencies(${SAMP} websockets_shared)
else()
target_link_libraries(${SAMP} websockets ${LIBWEBSOCKETS_DEP_LIBS})
endif()
endif()

View file

@ -0,0 +1,56 @@
# lws minimal secure streams binance
This is a Secure Streams version of minimal-ws-client-binance.
"policy.json" contains all the information about endpoints, protocols and
connection validation, tagged by streamtype name.
The example tries to load it from the cwd, it lives in
./minimal-examples/secure-streams/minimal-secure-streams-binance dir, so
either run it from there, or copy the policy.json to your cwd. It's also
possible to put the policy json in the code as a string and pass that at
context creation time.
The secure stream object represents a nailed-up connection that outlives any
single socket connection, and can manage reconnections / retries according to
the policy to keep the connection nailed up automatically.
Secure Streams provides the same simplified communication api without any
protocol dependencies.
## build
Lws must have been built with `LWS_ROLE_WS=1`, `LWS_WITH_SECURE_STREAMS=1`, and
`LWS_WITHOUT_EXTENSIONS=0`
```
$ cmake . && make
```
## Commandline Options
Option|Meaning
---|---
-d|Set logging verbosity
## usage
```
$ ./bin/lws-minimal-ws-client-binance
[2021/08/15 06:42:40:8409] U: LWS minimal Secure Streams binance client
[2021/08/15 06:42:40:8410] N: LWS: 4.2.99-v4.2.0-156-g8f352f65e8, NET CLI SRV H1 H2 WS SS-JSON-POL SSPROX ConMon FLTINJ IPV6-on
[2021/08/15 06:42:40:8410] N: ++ [495958|wsi|0|pipe] (1)
[2021/08/15 06:42:40:8411] N: ++ [495958|vh|0|netlink] (1)
[2021/08/15 06:42:40:8433] N: ++ [495958|vh|1|digicert||-1] (2)
[2021/08/15 06:42:40:8471] N: ++ [495958|wsiSScli|0|binance] (1)
[2021/08/15 06:42:40:8471] N: [495958|wsiSScli|0|binance]: lws_ss_check_next_state_ss: (unset) -> LWSSSCS_CREATING
[2021/08/15 06:42:40:8472] N: [495958|wsiSScli|0|binance]: lws_ss_check_next_state_ss: LWSSSCS_CREATING -> LWSSSCS_CONNECTING
[2021/08/15 06:42:40:8472] N: ++ [495958|wsicli|0|WS/h1/fstream.binance.com/([495958|wsiSScli|0|binance])] (1)
[2021/08/15 06:42:41:8802] N: [495958|wsiSScli|0|binance]: lws_ss_check_next_state_ss: LWSSSCS_CONNECTING -> LWSSSCS_CONNECTED
[2021/08/15 06:42:42:8803] N: sul_hz_cb: price: min: 4669185¢, max: 4672159¢, avg: 4670061¢, (53 prices/s)
[2021/08/15 06:42:42:8803] N: sul_hz_cb: elatency: min: 131ms, max: 292ms, avg: 154ms, (53 msg/s)
[2021/08/15 06:42:43:8803] N: sul_hz_cb: price: min: 4669646¢, max: 4672159¢, avg: 4669953¢, (34 prices/s)
[2021/08/15 06:42:43:8803] N: sul_hz_cb: elatency: min: 130ms, max: 149ms, avg: 133ms, (34 msg/s)
[2021/08/15 06:42:44:8804] N: sul_hz_cb: price: min: 4669455¢, max: 4672159¢, avg: 4669904¢, (26 prices/s)
...
```

View file

@ -0,0 +1,266 @@
/*
* lws-minimal-secure-streams-binance
*
* Written in 2010-2021 by Andy Green <andy@warmcat.com>
* Kutoga <kutoga@user.github.invalid>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* This demonstrates a Secure Streams implementation of a client that connects
* to binance ws server efficiently.
*
* Build lws with -DLWS_WITH_SECURE_STREAMS=1 -DLWS_WITHOUT_EXTENSIONS=0
*
* "policy.json" contains all the information about endpoints, protocols and
* connection validation, tagged by streamtype name.
*
* The example tries to load it from the cwd, it lives
* in ./minimal-examples/secure-streams/minimal-secure-streams-binance dir, so
* either run it from there, or copy the policy.json to your cwd. It's also
* possible to put the policy json in the code as a string and pass that at
* context creation time.
*/
#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#include <ctype.h>
static int interrupted;
typedef struct range {
uint64_t sum;
uint64_t lowest;
uint64_t highest;
unsigned int samples;
} range_t;
typedef struct binance {
struct lws_ss_handle *ss;
void *opaque_data;
lws_sorted_usec_list_t sul_hz; /* 1hz summary dump */
range_t e_lat_range;
range_t price_range;
} binance_t;
/****** Part 1 / 3: application data processing */
static void
range_reset(range_t *r)
{
r->sum = r->highest = 0;
r->lowest = 999999999999ull;
r->samples = 0;
}
static uint64_t
get_us_timeofday(void)
{
struct timeval tv;
gettimeofday(&tv, NULL);
return (uint64_t)((lws_usec_t)tv.tv_sec * LWS_US_PER_SEC) +
(uint64_t)tv.tv_usec;
}
static uint64_t
pennies(const char *s)
{
uint64_t price = (uint64_t)atoll(s) * 100;
s = strchr(s, '.');
if (s && isdigit(s[1]) && isdigit(s[2]))
price = price + (uint64_t)((10 * (s[1] - '0')) + (s[2] - '0'));
return price;
}
static void
sul_hz_cb(lws_sorted_usec_list_t *sul)
{
binance_t *bin = lws_container_of(sul, binance_t, sul_hz);
/*
* We are called once a second to dump statistics on the connection
*/
lws_sul_schedule(lws_ss_get_context(bin->ss), 0, &bin->sul_hz,
sul_hz_cb, LWS_US_PER_SEC);
if (bin->price_range.samples)
lwsl_notice("%s: price: min: %llu¢, max: %llu¢, avg: %llu¢, "
"(%d prices/s)\n", __func__,
(unsigned long long)bin->price_range.lowest,
(unsigned long long)bin->price_range.highest,
(unsigned long long)(bin->price_range.sum /
bin->price_range.samples),
bin->price_range.samples);
if (bin->e_lat_range.samples)
lwsl_notice("%s: elatency: min: %llums, max: %llums, "
"avg: %llums, (%d msg/s)\n", __func__,
(unsigned long long)bin->e_lat_range.lowest / 1000,
(unsigned long long)bin->e_lat_range.highest / 1000,
(unsigned long long)(bin->e_lat_range.sum /
bin->e_lat_range.samples) / 1000,
bin->e_lat_range.samples);
range_reset(&bin->e_lat_range);
range_reset(&bin->price_range);
}
/****** Part 2 / 3: communication */
static lws_ss_state_return_t
binance_rx(void *userobj, const uint8_t *in, size_t len, int flags)
{
binance_t *bin = (binance_t *)userobj;
uint64_t latency_us, now_us;
char numbuf[16];
uint64_t price;
const char *p;
size_t alen;
now_us = (uint64_t)get_us_timeofday();
p = lws_json_simple_find((const char *)in, len, "\"depthUpdate\"",
&alen);
if (!p)
return LWSSSSRET_OK;
p = lws_json_simple_find((const char *)in, len, "\"E\":", &alen);
if (!p) {
lwsl_err("%s: no E JSON\n", __func__);
return LWSSSSRET_OK;
}
lws_strnncpy(numbuf, p, alen, sizeof(numbuf));
latency_us = now_us - ((uint64_t)atoll(numbuf) * LWS_US_PER_MS);
if (latency_us < bin->e_lat_range.lowest)
bin->e_lat_range.lowest = latency_us;
if (latency_us > bin->e_lat_range.highest)
bin->e_lat_range.highest = latency_us;
bin->e_lat_range.sum += latency_us;
bin->e_lat_range.samples++;
p = lws_json_simple_find((const char *)in, len, "\"a\":[[\"", &alen);
if (!p)
return LWSSSSRET_OK;
lws_strnncpy(numbuf, p, alen, sizeof(numbuf));
price = pennies(numbuf);
if (price < bin->price_range.lowest)
bin->price_range.lowest = price;
if (price > bin->price_range.highest)
bin->price_range.highest = price;
bin->price_range.sum += price;
bin->price_range.samples++;
return LWSSSSRET_OK;
}
static lws_ss_state_return_t
binance_state(void *userobj, void *h_src, lws_ss_constate_t state,
lws_ss_tx_ordinal_t ack)
{
binance_t *bin = (binance_t *)userobj;
lwsl_ss_info(bin->ss, "%s (%d), ord 0x%x",
lws_ss_state_name((int)state), state, (unsigned int)ack);
switch (state) {
case LWSSSCS_CONNECTED:
lws_sul_schedule(lws_ss_get_context(bin->ss), 0, &bin->sul_hz,
sul_hz_cb, LWS_US_PER_SEC);
range_reset(&bin->e_lat_range);
range_reset(&bin->price_range);
return LWSSSSRET_OK;
case LWSSSCS_DISCONNECTED:
lws_sul_cancel(&bin->sul_hz);
break;
default:
break;
}
return LWSSSSRET_OK;
}
static const lws_ss_info_t ssi_binance = {
.handle_offset = offsetof(binance_t, ss),
.opaque_user_data_offset = offsetof(binance_t, opaque_data),
.rx = binance_rx,
.state = binance_state,
.user_alloc = sizeof(binance_t),
.streamtype = "binance", /* bind to corresponding policy */
};
/****** Part 3 / 3: init and event loop */
static const struct lws_extension extensions[] = {
{
"permessage-deflate", lws_extension_callback_pm_deflate,
"permessage-deflate" "; client_no_context_takeover"
"; client_max_window_bits"
},
{ NULL, NULL, NULL /* terminator */ }
};
static void
sigint_handler(int sig)
{
interrupted = 1;
}
int main(int argc, const char **argv)
{
struct lws_context_creation_info info;
struct lws_context *cx;
int n = 0;
signal(SIGINT, sigint_handler);
memset(&info, 0, sizeof info);
lws_cmdline_option_handle_builtin(argc, argv, &info);
lwsl_user("LWS minimal Secure Streams binance client\n");
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
LWS_SERVER_OPTION_EXPLICIT_VHOSTS;
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
info.fd_limit_per_thread = 1 + 1 + 1;
info.extensions = extensions;
info.pss_policies_json = "policy.json"; /* literal JSON, or path */
cx = lws_create_context(&info);
if (!cx) {
lwsl_err("lws init failed\n");
return 1;
}
if (lws_ss_create(cx, 0, &ssi_binance, NULL, NULL, NULL, NULL)) {
lwsl_cx_err(cx, "failed to create secure stream");
interrupted = 1;
}
while (n >= 0 && !interrupted)
n = lws_service(cx, 0);
lws_context_destroy(cx);
lwsl_user("Completed\n");
return 0;
}

View file

@ -0,0 +1,38 @@
{
"release": "01234567",
"product": "myproduct",
"schema-version": 1,
"retry": [{
"default": {
"backoff": [1000, 2000, 3000, 4000, 5000],
"conceal": 65535,
"jitterpc": 20,
"svalidping": 30,
"svalidhup": 35
}
}],
"certs": [{
"digicert_global_root": "MIIDrzCCApegAwIBAgIQCDvgVpBCRrGhdWrJWZHHSjANBgkqhkiG9w0BAQUFADBhMQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBDQTAeFw0wNjExMTAwMDAwMDBaFw0zMTExMTAwMDAwMDBaMGExCzAJBgNVBAYTAlVTMRUwEwYDVQQKEwxEaWdpQ2VydCBJbmMxGTAXBgNVBAsTEHd3dy5kaWdpY2VydC5jb20xIDAeBgNVBAMTF0RpZ2lDZXJ0IEdsb2JhbCBSb290IENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4jvhEXLeqKTTo1eqUKKPC3eQyaKl7hLOllsBCSDMAZOnTjC3U/dDxGkAV53ijSLdhwZAAIEJzs4bg7/fzTtxRuLWZscFs3YnFo97nh6Vfe63SKMI2tavegw5BmV/Sl0fvBf4q77uKNd0f3p4mVmFaG5cIzJLv07A6Fpt43C/dxC//AH2hdmoRBBYMql1GNXRor5H4idq9Joz+EkIYIvUX7Q6hL+hqkpMfT7PT19sdl6gSzeRntwi5m3OFBqOasv+zbMUZBfHWymeMr/y7vrTC0LUq7dBMtoM1O/4gdW7jVg/tRvoSSiicNoxBN33shbyTApOB6jtSj1etX+jkMOvJwIDAQABo2MwYTAOBgNVHQ8BAf8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUA95QNVbRTLtm8KPiGxvDl7I90VUwHwYDVR0jBBgwFoAUA95QNVbRTLtm8KPiGxvDl7I90VUwDQYJKoZIhvcNAQEFBQADggEBAMucN6pIExIK+t1EnE9SsPTfrgT1eXkIoyQY/EsrhMAtudXH/vTBH1jLuG2cenTnmCmrEbXjcKChzUyImZOMkXDiqw8cvpOp/2PV5Adg06O/nVsJ8dWO41P0jmP6P6fbtGbfYmbW0W5BjfIttep3Sp+dWOIrWcBAI+0tKIJFPnlUkiaY4IBIqDfv8NZ5YBberOgOzW6sRBc4L0na4UU+Krk2U886UAb3LujEV0lsYSEY1QSteDwsOoBrp+uvFRTp2InBuThs4pFsiv9kuXclVzDAGySj4dzp30d8tbQkCAUw7C29C79Fv1C5qfPrmAESrciIxpg0X40KPMbp1ZWVbd4="
}
],
"trust_stores": [{
"name": "digicert",
"stack": ["digicert_global_root"]
}
],
"s": [
{ "binance": {
"endpoint": "fstream.binance.com",
"port": 443,
"protocol": "ws",
"http_url": "/stream?streams=btcusdt@depth@0ms/btcusdt@bookTicker/btcusdt@aggTrade",
"nailed_up": true,
"ws_prioritize_reads": true,
"tls": true,
"tls_trust_store": "digicert",
"retry": "default"
}
}
]
}