From c11e31547f423af70ef70f848cf928b4dd36e868 Mon Sep 17 00:00:00 2001 From: Andy Green Date: Sat, 9 Oct 2021 10:58:21 +0100 Subject: [PATCH] ss: introduce sinks --- .../libwebsockets/lws-secure-streams-policy.h | 8 +- include/libwebsockets/lws-secure-streams.h | 3 + lib/core/context.c | 5 + lib/core/private-lib-core.h | 16 ++ lib/secure-streams/policy-json.c | 45 +++- .../private-lib-secure-streams.h | 13 +- lib/secure-streams/secure-streams.c | 195 ++++++++++++++++-- .../sink/hello_world/CMakeLists.txt | 52 +++++ minimal-examples/sink/hello_world/README.md | 87 ++++++++ .../sink/hello_world/example-policy.json | 21 ++ minimal-examples/sink/hello_world/main.c | 99 +++++++++ minimal-examples/sink/hello_world/ss-sink.c | 97 +++++++++ minimal-examples/sink/hello_world/ss-source.c | 103 +++++++++ 13 files changed, 722 insertions(+), 22 deletions(-) create mode 100644 minimal-examples/sink/hello_world/CMakeLists.txt create mode 100644 minimal-examples/sink/hello_world/README.md create mode 100644 minimal-examples/sink/hello_world/example-policy.json create mode 100644 minimal-examples/sink/hello_world/main.c create mode 100644 minimal-examples/sink/hello_world/ss-sink.c create mode 100644 minimal-examples/sink/hello_world/ss-source.c diff --git a/include/libwebsockets/lws-secure-streams-policy.h b/include/libwebsockets/lws-secure-streams-policy.h index 3e1c9f8b1..1e9668f77 100644 --- a/include/libwebsockets/lws-secure-streams-policy.h +++ b/include/libwebsockets/lws-secure-streams-policy.h @@ -234,6 +234,10 @@ typedef struct lws_ss_policy { const lws_metric_policy_t *metrics; /* linked-list of metric policies */ const lws_ss_auth_t *auth; /* NULL or auth object we bind to */ +#if defined(LWS_WITH_SERVER) + const struct lws_protocol_vhost_options *pvo; +#endif + /* protocol-specific connection policy details */ union { @@ -331,6 +335,9 @@ typedef struct lws_ss_policy { const lws_retry_bo_t *retry_bo; /**< retry policy to use */ + int32_t txc; + int32_t txc_peer; + uint32_t proxy_buflen; /**< max dsh alloc for proxy */ uint32_t proxy_buflen_rxflow_on_above; uint32_t proxy_buflen_rxflow_off_below; @@ -339,7 +346,6 @@ typedef struct lws_ss_policy { uint32_t client_buflen_rxflow_on_above; uint32_t client_buflen_rxflow_off_below; - uint32_t timeout_ms; /**< default message response * timeout in ms */ uint32_t flags; /**< stream attribute flags */ diff --git a/include/libwebsockets/lws-secure-streams.h b/include/libwebsockets/lws-secure-streams.h index 771fd4175..ee26e6d49 100644 --- a/include/libwebsockets/lws-secure-streams.h +++ b/include/libwebsockets/lws-secure-streams.h @@ -172,6 +172,9 @@ enum { LWSSSINFLAGS_ACCEPTED = (1 << 3), /**< Set on the accepted object copy of the ssi / info to indicate that * we are an accepted connection from a server's listening socket */ + LWSSSINFLAGS_ACCEPTED_SINK = (1 << 4), + /**< Set on the accepted object copy of the ssi / info to indicate that + * we are an accepted connection from a local sink */ }; typedef lws_ss_state_return_t (*lws_sscb_rx)(void *userobj, const uint8_t *buf, diff --git a/lib/core/context.c b/lib/core/context.c index 84a5e337a..202a57ac6 100644 --- a/lib/core/context.c +++ b/lib/core/context.c @@ -807,6 +807,11 @@ lws_create_context(const struct lws_context_creation_info *info) #endif #endif +#if defined(LWS_WITH_SERVER) + context->lcg[LWSLCG_WSI_SSP_SINK].tag_prefix = "SSsink"; + context->lcg[LWSLCG_WSI_SSP_SOURCE].tag_prefix = "SSsrc"; +#endif + #if defined(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY) /* directly use the user-provided policy object list */ context->pss_policies = info->pss_policies; diff --git a/lib/core/private-lib-core.h b/lib/core/private-lib-core.h index 4c73957a0..45edd4959 100644 --- a/lib/core/private-lib-core.h +++ b/lib/core/private-lib-core.h @@ -411,10 +411,23 @@ enum { #endif #endif +#if defined(LWS_WITH_SERVER) + LWSLCG_WSI_SSP_SINK, /* accepted sink conn */ + LWSLCG_WSI_SSP_SOURCE, /* accepted source conn */ +#endif + /* always last */ LWSLCG_COUNT }; +#if defined(LWS_WITH_SECURE_STREAMS) && defined(LWS_WITH_SERVER) +typedef struct lws_ss_sinks { + lws_dll2_t list; + lws_ss_info_t info; + lws_dll2_owner_t accepts; +} lws_ss_sinks_t; +#endif + /* * the rest is managed per-context, that includes * @@ -668,6 +681,9 @@ struct lws_context { #endif const lws_ss_policy_t *pss_policies; const lws_ss_auth_t *pss_auths; +#if defined(LWS_WITH_SERVER) + lws_dll2_owner_t sinks; +#endif #if defined(LWS_WITH_SSPLUGINS) const lws_ss_plugin_t **pss_plugins; #endif diff --git a/lib/secure-streams/policy-json.c b/lib/secure-streams/policy-json.c index a207b92de..f033b45ad 100644 --- a/lib/secure-streams/policy-json.c +++ b/lib/secure-streams/policy-json.c @@ -105,6 +105,7 @@ static const char * const lejp_tokens_policy[] = { "s[].*.ws_subprotocol", "s[].*.ws_binary", "s[].*.local_sink", + "s[].*.options[].*", "s[].*.server", "s[].*.server_cert", "s[].*.server_key", @@ -213,6 +214,7 @@ typedef enum { LSSPPT_WS_SUBPROTOCOL, LSSPPT_WS_BINARY, LSSPPT_LOCAL_SINK, + LSSPPT_OPTIONS, LSSPPT_SERVER, LSSPPT_SERVER_CERT, LSSPPT_SERVER_KEY, @@ -311,6 +313,10 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason) lws_ss_trust_store_t *ts; lws_ss_metadata_t *pmd; lws_ss_x509_t *x, **py; +#if defined(LWS_WITH_SERVER) + struct lws_protocol_vhost_options *pvo; + const char *pvo_name; +#endif lws_ss_policy_t *p2; lws_retry_bo_t *b; size_t inl, outl; @@ -318,8 +324,8 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason) backoff_t *bot; int n = -1; -// lwsl_debug("%s: %d %d %s\n", __func__, reason, ctx->path_match - 1, -// ctx->path); + // lwsl_notice("%s: %d %d %s %s\n", __func__, reason, ctx->path_match - 1, + // ctx->path, ctx->buf); switch (ctx->path_match - 1) { case LSSPPT_RETRY: @@ -385,6 +391,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason) return 0; } + if (reason == LEJPCB_ARRAY_END && + ctx->path_match - 1 == LSSPPT_OPTIONS && + a->pvosp) + a->pvosp--; + if (reason == LEJPCB_OBJECT_END && a->p) { /* * Allocate a just-the-right-size buf for the cert DER now @@ -596,6 +607,36 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason) goto string2; #endif + case LSSPPT_OPTIONS: +#if defined(LWS_WITH_SERVER) + pvo_name = ctx->path + ctx->st[ctx->sp - 2].p + 1; + pvo = lwsac_use(&a->ac, sizeof(*pvo) + strlen(pvo_name) + 1 + + ctx->npos + 1, POL_AC_GRAIN); + if (!pvo) + goto oom; + + pvo->name = (const char *)&pvo[1]; + pvo->value = pvo->name + strlen(pvo_name) + 1; + memcpy((char *)pvo->name, pvo_name, strlen(pvo_name) + 1); + memcpy((char *)pvo->value, ctx->buf, ctx->npos); + *((char *)&pvo->value[ctx->npos]) = '\0'; + pvo->next = NULL; + pvo->options = NULL; + + if (!a->curr[LTY_POLICY].p->pvo) + a->curr[LTY_POLICY].p->pvo = pvo; + + /* for now we just support one level of options */ + + // lwsl_notice("%s: lv %d, %s=%s\n", __func__, a->pvosp, + // pvo->name, pvo->value); + + if (a->pvostack[a->pvosp]) + a->pvostack[a->pvosp]->next = pvo; + a->pvostack[a->pvosp] = pvo; +#endif + break; + case LSSPPT_SERVER_CERT: case LSSPPT_SERVER_KEY: diff --git a/lib/secure-streams/private-lib-secure-streams.h b/lib/secure-streams/private-lib-secure-streams.h index 2af93c36c..97c69428c 100644 --- a/lib/secure-streams/private-lib-secure-streams.h +++ b/lib/secure-streams/private-lib-secure-streams.h @@ -71,7 +71,7 @@ typedef struct lws_ss_handle { lws_fi_ctx_t fic; /**< Fault Injection context */ #endif - struct lws_dll2_owner src_list; /**< sink's list of bound sources */ + struct lws_dll2_owner src_list; /**< server's list of bound sources */ struct lws_context *context; /**< lws context we are created on */ const lws_ss_policy_t *policy; /**< system policy for stream */ @@ -91,9 +91,11 @@ typedef struct lws_ss_handle { #if defined(LWS_WITH_CONMON) char *conmon_json; #endif - - //struct lws_ss_handle *h_sink; /**< sink we are bound to, or NULL */ - //void *sink_obj;/**< sink's private object representing us */ +#if defined(LWS_WITH_SERVER) + lws_dll2_t sink_bind; /* if bound to / owned by a sink */ + lws_sorted_usec_list_t sul_txreq; /* pending tx req to peer */ + struct lws_ss_handle *sink_local_bind; /* nonproxy sink peer */ +#endif lws_sorted_usec_list_t sul_timeout; lws_sorted_usec_list_t sul; @@ -468,12 +470,15 @@ struct policy_cb_args { lws_ss_http_respmap_t respmap[16]; + struct lws_protocol_vhost_options *pvostack[4]; + union u heads[_LTY_COUNT]; union u curr[_LTY_COUNT]; uint8_t *p; int count; + int pvosp; char pending_respmap; uint8_t parse_data:1; diff --git a/lib/secure-streams/secure-streams.c b/lib/secure-streams/secure-streams.c index 5c3daa37c..ad4931686 100644 --- a/lib/secure-streams/secure-streams.c +++ b/lib/secure-streams/secure-streams.c @@ -711,12 +711,14 @@ _lws_ss_client_connect(lws_ss_handle_t *h, int is_retry, void *conn_if_sspc_onw) return LWSSSSRET_OK; } +#if defined(LWS_WITH_SERVER) /* * We are already bound to a sink? */ -// if (h->h_sink) -// return 0; + if (h->sink_local_bind) + return 0; +#endif if (!is_retry) h->retry = 0; @@ -1001,6 +1003,9 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi, const lws_ss_policy_t *pol; lws_ss_state_return_t r; lws_ss_metadata_t *smd; +#if defined(LWS_WITH_SERVER) + lws_ss_sinks_t *sn; +#endif lws_ss_handle_t *h; size_t size; void **v; @@ -1045,8 +1050,9 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi, } #endif -#if 0 +#if defined(LWS_WITH_SERVER) if (ssi->flags & LWSSSINFLAGS_REGISTER_SINK) { + /* * This can register a secure streams sink as well as normal * secure streams connections. If that's what's happening, @@ -1064,12 +1070,20 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi, return 1; } - } else { - if (!(pol->flags & LWSSSPOLF_LOCAL_SINK)) { + sn = lws_zalloc(sizeof(*sn), __func__); + if (!sn) + return 1; - } -// lws_dll2_foreach_safe(&pt->ss_owner, NULL, lws_ss_destroy_dll); + sn->info = *ssi; + sn->info.flags = (uint8_t)((sn->info.flags & + ~(LWSSSINFLAGS_REGISTER_SINK)) | + LWSSSINFLAGS_ACCEPTED_SINK); + lws_dll2_add_tail(&sn->list, &context->sinks); + + lwsl_cx_notice(context, "registered sink %s", ssi->streamtype); + + return 0; } #endif @@ -1093,15 +1107,23 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi, h->lc.log_cx = context->log_cx; + n = LWSLCG_WSI_SS_CLIENT; +#if defined(LWS_WITH_SERVER) + if (pol->flags & LWSSSPOLF_LOCAL_SINK) { + if (ssi->flags & LWSSSINFLAGS_ACCEPTED_SINK) + n = LWSLCG_WSI_SSP_SINK; + else + n = LWSLCG_WSI_SSP_SOURCE; + } +#endif + if (ssi->sss_protocol_version) - __lws_lc_tag(context, &context->lcg[LWSLCG_WSI_SS_CLIENT], - &h->lc, "%s|v%u|%u", + __lws_lc_tag(context, &context->lcg[n], &h->lc, "%s|v%u|%u", ssi->streamtype ? ssi->streamtype : "nostreamtype", (unsigned int)ssi->sss_protocol_version, (unsigned int)ssi->client_pid); else - __lws_lc_tag(context, &context->lcg[LWSLCG_WSI_SS_CLIENT], - &h->lc, "%s", + __lws_lc_tag(context, &context->lcg[n], &h->lc, "%s", ssi->streamtype ? ssi->streamtype : "nostreamtype"); #if defined(LWS_WITH_SYS_FAULT_INJECTION) @@ -1113,6 +1135,67 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi, lws_fi_inherit_copy(&h->fic, &context->fic, "ss", ssi->streamtype); #endif +#if defined(LWS_WITH_SERVER) + if (pol->flags & LWSSSPOLF_LOCAL_SINK) { + + if (ssi->flags & LWSSSINFLAGS_ACCEPTED_SINK) { + /* + * We are recursing to create the accepted sink, do + * the binding while still in create so any downstream + * actions understand our situation from the start + */ + h->sink_local_bind = (struct lws_ss_handle *) + opaque_user_data; + h->sink_local_bind->sink_local_bind = h; + } else { + + /* we are creating an ss connected to a sink... find the sink */ + + lws_start_foreach_dll(struct lws_dll2 *, d, + lws_dll2_get_head(&context->sinks)) { + sn = lws_container_of(d, lws_ss_sinks_t, list); + + if (!strcmp(sn->info.streamtype, ssi->streamtype)) { + lws_ss_handle_t *has; + + /* + * How does the sink feel about us joining? + */ + + if (sn->info.state(h + 1, h, LWSSSCS_SINK_JOIN, + sn->accepts.count)) { + lwsl_ss_notice(h, "sink rejected"); + goto fail_creation; + } + + /* + * Recurse to instantiate an accepted sink SS + * for us to bind to... pass bind source handle + * in as opaque data + */ + + if (lws_ss_create(context, tsi, &sn->info, + h, &has, NULL, NULL)) { + lwsl_ss_err(h, "sink accept failed"); + goto fail_creation; + } + + lws_dll2_add_tail(&has->sink_bind, &sn->accepts); + + lwsl_ss_notice(h, "bound to sink"); + break; + } + + } lws_end_foreach_dll(d); + + if (!h->sink_local_bind) { + lwsl_cx_err(context, "no sink %s", ssi->streamtype); + goto fail_creation; + } + } + } +#endif + h->info = *ssi; h->policy = pol; h->context = context; @@ -1338,9 +1421,18 @@ extant: lws_fi(&h->fic, "ss_create_destroy_me")) goto fail_creation; + n = 0; #if defined(LWS_WITH_SYS_SMD) if (!(ssi->flags & LWSSSINFLAGS_PROXIED) && - pol == &pol_smd) { + pol == &pol_smd) + n = 1; +#endif +#if defined(LWS_WITH_SERVER) + if (h->sink_local_bind) + n = 1; +#endif + + if (n) { r = lws_ss_event_helper(h, LWSSSCS_CONNECTING); if (r || lws_fi(&h->fic, "ss_create_smd_1")) goto fail_creation; @@ -1348,9 +1440,11 @@ extant: if (r || lws_fi(&h->fic, "ss_create_smd_2")) goto fail_creation; } -#endif - if (!(ssi->flags & LWSSSINFLAGS_REGISTER_SINK) && + if ( +#if defined(LWS_WITH_SERVER) + !h->sink_local_bind && +#endif ((h->policy->flags & LWSSSPOLF_NAILED_UP) #if defined(LWS_WITH_SYS_SMD) || ((h->policy == &pol_smd) //&& @@ -1381,6 +1475,9 @@ fail_creation: if (ppss) *ppss = NULL; +#if defined(LWS_WITH_SERVER) + lws_dll2_remove(&h->sink_bind); +#endif lws_ss_destroy(&h); return 1; @@ -1398,6 +1495,7 @@ lws_ss_destroy(lws_ss_handle_t **ppss) struct lws_context_per_thread *pt; #if defined(LWS_WITH_SERVER) struct lws_vhost *v = NULL; + lws_ss_handle_t *hlb; #endif lws_ss_handle_t *h = *ppss; lws_ss_metadata_t *pmd; @@ -1445,6 +1543,10 @@ lws_ss_destroy(lws_ss_handle_t **ppss) lws_set_timeout(h->wsi, 1, LWS_TO_KILL_SYNC); } +#if defined(LWS_WITH_SERVER) + lws_dll2_remove(&h->sink_bind); +#endif + /* * if we bound an smd registration to the SS, unregister it */ @@ -1471,9 +1573,17 @@ lws_ss_destroy(lws_ss_handle_t **ppss) lws_vfs_file_close(&h->fop_fd); #endif #if defined(LWS_WITH_SERVER) - lws_dll2_remove(&h->cli_list); + lws_dll2_remove(&h->cli_list); + lws_dll2_remove(&h->sink_bind); + lws_sul_cancel(&h->sul_txreq); + hlb = h->sink_local_bind; + if (hlb) { + h->sink_local_bind = NULL; + lws_ss_destroy(&hlb); + } #endif lws_dll2_remove(&h->to_list); + lws_sul_cancel(&h->sul_timeout); /* @@ -1611,6 +1721,46 @@ lws_ss_server_foreach_client(struct lws_ss_handle *h, lws_sssfec_cb cb, } lws_end_foreach_dll_safe(d, d1); } + +/* + * Deal with tx requests between source and accepted sink... h is the guy who + * requested the write + */ + +static void +lws_ss_sink_txreq_cb(lws_sorted_usec_list_t *sul) +{ + struct lws_ss_handle *h = lws_container_of(sul, struct lws_ss_handle, + sul_txreq); + uint8_t buf[1380 + LWS_PRE]; + size_t size = sizeof(buf) - LWS_PRE; + lws_ss_state_return_t r; + int flags = 0; + + /* !!! just let writes happen for now */ + + assert(h->sink_local_bind); + + /* collect the source tx */ + r = h->info.tx(h + 1, 0, buf + LWS_PRE, &size, &flags); + switch (r) { + case LWSSSSRET_OK: + if (!h->sink_local_bind->info.rx) { + lwsl_ss_warn(h->sink_local_bind, "No RX cb"); + break; + } + r = h->sink_local_bind->info.rx(&h->sink_local_bind[1], + buf + LWS_PRE, size, flags); + _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, NULL, + &h->sink_local_bind); + break; + case LWSSSSRET_TX_DONT_SEND: + break; + default: + _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, NULL, &h); + break; + } +} #endif lws_ss_state_return_t @@ -1647,6 +1797,21 @@ _lws_ss_request_tx(lws_ss_handle_t *h) if (h->policy->flags & LWSSSPOLF_SERVER) return LWSSSSRET_OK; +#if defined(LWS_WITH_SERVER) + if (h->sink_local_bind) { + /* + * We are bound to a local sink / source + */ + + lwsl_ss_notice(h->sink_local_bind, "Req tx"); + + lws_sul_schedule(h->context, 0, &h->sink_local_bind->sul_txreq, + lws_ss_sink_txreq_cb, 1); + + return LWSSSSRET_OK; + } +#endif + /* * there's currently no wsi / connection associated with the ss handle */ diff --git a/minimal-examples/sink/hello_world/CMakeLists.txt b/minimal-examples/sink/hello_world/CMakeLists.txt new file mode 100644 index 000000000..f85a1fd82 --- /dev/null +++ b/minimal-examples/sink/hello_world/CMakeLists.txt @@ -0,0 +1,52 @@ +project(lws-minimal-ss-sink-hello_world C) +cmake_minimum_required(VERSION 2.8.12) +find_package(libwebsockets CONFIG REQUIRED) +list(APPEND CMAKE_MODULE_PATH ${LWS_CMAKE_DIR}) +include(CheckCSourceCompiles) +include(LwsCheckRequirements) + +set(SRCS main.c ss-sink.c ss-source.c) + +set(requirements 1) +require_lws_config(LWS_ROLE_H1 1 requirements) +require_lws_config(LWS_WITH_SERVER 1 requirements) +require_lws_config(LWS_WITH_SYS_SMD 1 requirements) +require_lws_config(LWS_WITH_SECURE_STREAMS 1 requirements) +require_lws_config(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY 0 requirements) + +require_lws_config(LWS_WITH_SECURE_STREAMS_PROXY_API 1 has_ss_proxy) + +if (requirements) + add_executable(${PROJECT_NAME} ${SRCS}) + + if (websockets_shared) + target_link_libraries(${PROJECT_NAME} + websockets_shared + ${LIBWEBSOCKETS_DEP_LIBS}) + add_dependencies(${PROJECT_NAME} + websockets_shared) + else() + target_link_libraries(${PROJECT_NAME} + websockets + ${LIBWEBSOCKETS_DEP_LIBS}) + endif() + + if (HAS_LWS_WITH_SECURE_STREAMS_PROXY_API OR has_ss_proxy OR + LWS_WITH_SECURE_STREAMS_PROXY_API) + + add_compile_options(-DLWS_SS_USE_SSPC) + add_executable(${PROJECT_NAME}-client ${SRCS}) + + if (websockets_shared) + target_link_libraries(${PROJECT_NAME}-client + websockets_shared + ${LIBWEBSOCKETS_DEP_LIBS}) + add_dependencies(${PROJECT_NAME}-client + websockets_shared) + else() + target_link_libraries(${PROJECT_NAME}-client + websockets + ${LIBWEBSOCKETS_DEP_LIBS}) + endif() + endif() +endif() \ No newline at end of file diff --git a/minimal-examples/sink/hello_world/README.md b/minimal-examples/sink/hello_world/README.md new file mode 100644 index 000000000..b62f087fc --- /dev/null +++ b/minimal-examples/sink/hello_world/README.md @@ -0,0 +1,87 @@ +# lws minimal secure streams sink hello_world + +This example shows how to register your own SS as a "sink", that is a "server" +that handles a given streamtype locally. + +User code just creates its own SS of that streamtype as usual, if it is marked +in the policy as a local_sink + +``` + "local_sink": true, +``` + +and the sink was registered, then the SS is fulfilled by the sink rather than +directly making onward connections. This lets you, eg handle streams completely +locally, or intercept or store-and-forward their content to the cloud using +another SS when convenient. + +Like any server, when you connect to it, at the server-side it creates an +"accepted" sink SS specific to the connection, which is closed when the incoming +connection closes. + +The example shows how to register the sink, and create a normal SS of the same +streamtype name. The source then sends a "hello_world" message to the sink +instance, and the sink instance responds with a message back to the source. + +In this example, the source getting the ack message makes us exit with a +success return. + +Sinks can be registered where the policy is in your system, either directly if +there is no proxying, or at the proxy process when there is. + +## build + +``` + $ cmake . && make +``` + +## usage + +Commandline option|Meaning +---|--- +-d |Debug verbosity in decimal, eg, -d15 + +``` +[2021/10/11 06:25:54:8413] U: LWS Secure Streams Sink hello_world +[2021/10/11 06:25:54:8757] N: LWS: 4.3.99-v4.3.0-19-g9da508e91b, NET CLI SRV H1 H2 WS MQTT SS-JSON-POL SSPROX ConMon IPv6-absent +[2021/10/11 06:25:54:8876] N: ++ [1819937|wsi|0|pipe] (1) +[2021/10/11 06:25:54:8920] N: ++ [1819937|vh|0|netlink] (1) +[2021/10/11 06:25:55:0233] N: ++ [1819937|vh|1|_ss_default||-1] (2) +[2021/10/11 06:25:55:4867] N: lws_ss_create: registered sink sink_hello_world +[2021/10/11 06:25:55:4881] N: ++ [1819937|SSsrc|0|sink_hello_world] (1) +[2021/10/11 06:25:55:4890] N: ++ [1819937|SSsink|0|sink_hello_world] (1) +[2021/10/11 06:25:55:4907] N: [1819937|SSsink|0|sink_hello_world]: lws_ss_check_next_state_ss: (unset) -> LWSSSCS_CREATING +[2021/10/11 06:25:55:4929] N: [1819937|SSsink|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CREATING -> LWSSSCS_CONNECTING +[2021/10/11 06:25:55:4930] N: [1819937|SSsink|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CONNECTING -> LWSSSCS_CONNECTED +[2021/10/11 06:25:55:4935] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_create: bound to sink +[2021/10/11 06:25:55:4940] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_check_next_state_ss: (unset) -> LWSSSCS_CREATING +[2021/10/11 06:25:55:4942] N: myss_src_state: CREATING +[2021/10/11 06:25:55:4945] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CREATING -> LWSSSCS_CONNECTING +[2021/10/11 06:25:55:4946] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CONNECTING -> LWSSSCS_CONNECTED +[2021/10/11 06:25:55:4950] N: [1819937|SSsink|0|sink_hello_world]: _lws_ss_request_tx: Req tx +[2021/10/11 06:25:55:4962] U: [1819937|SSsrc|0|sink_hello_world]: myss_src_tx: TX 39, flags 0x3, r 0 +[2021/10/11 06:25:55:4969] N: [1819937|SSsink|0|sink_hello_world]: myss_sink_rx: len 39, flags 0x3 +[2021/10/11 06:25:55:4974] N: +[2021/10/11 06:25:55:4987] N: 0000: 46 72 6F 6D 20 53 6F 75 72 63 65 3A 20 48 65 6C From Source: Hel +[2021/10/11 06:25:55:4989] N: 0010: 6C 6F 20 57 6F 72 6C 64 3A 20 31 38 35 35 37 30 lo World: 185570 +[2021/10/11 06:25:55:4993] N: 0020: 37 36 35 36 33 31 38 7656318 +[2021/10/11 06:25:55:4995] N: +[2021/10/11 06:25:55:4999] N: [1819937|SSsrc|0|sink_hello_world]: _lws_ss_request_tx: Req tx +[2021/10/11 06:25:55:5009] U: [1819937|SSsink|0|sink_hello_world]: myss_sink_tx: TX 37, flags 0x3, r 0 +[2021/10/11 06:25:55:5012] N: [1819937|SSsrc|0|sink_hello_world]: myss_src_rx: len 37, flags 0x3 +[2021/10/11 06:25:55:5014] N: +[2021/10/11 06:25:55:5015] N: 0000: 46 72 6F 6D 20 53 69 6E 6B 3A 20 48 65 6C 6C 6F From Sink: Hello +[2021/10/11 06:25:55:5015] N: 0010: 20 57 6F 72 6C 64 3A 20 31 38 35 35 37 30 37 36 World: 18557076 +[2021/10/11 06:25:55:5016] N: 0020: 36 31 33 32 34 61324 +[2021/10/11 06:25:55:5016] N: +[2021/10/11 06:25:55:5166] N: -- [1819937|wsi|0|pipe] (0) 628.987ms +[2021/10/11 06:25:55:5177] N: -- [1819937|vh|0|netlink] (1) 625.669ms +[2021/10/11 06:25:55:5205] N: [1819937|SSsink|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CONNECTED -> LWSSSCS_DISCONNECTED +[2021/10/11 06:25:55:5207] N: [1819937|SSsink|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_DISCONNECTED -> LWSSSCS_DESTROYING +[2021/10/11 06:25:55:5211] N: -- [1819937|SSsink|0|sink_hello_world] (0) 32.090ms +[2021/10/11 06:25:55:5215] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CONNECTED -> LWSSSCS_DISCONNECTED +[2021/10/11 06:25:55:5215] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_DISCONNECTED -> LWSSSCS_DESTROYING +[2021/10/11 06:25:55:5216] N: -- [1819937|SSsrc|0|sink_hello_world] (0) 33.434ms +[2021/10/11 06:25:55:5431] N: -- [1819937|vh|1|_ss_default||-1] (0) 519.819ms +[2021/10/11 06:25:55:5487] U: Completed: OK (seen expected 0) +``` \ No newline at end of file diff --git a/minimal-examples/sink/hello_world/example-policy.json b/minimal-examples/sink/hello_world/example-policy.json new file mode 100644 index 000000000..8e78e61d4 --- /dev/null +++ b/minimal-examples/sink/hello_world/example-policy.json @@ -0,0 +1,21 @@ +{ + "release":"01234567", + "product":"myproduct", + "schema-version":1, + "retry": [{"default": {"backoff": [1000,2000,3000,5000,10000],"conceal":5,"jitterpc":20,"svalidping":300,"svalidhup":310}}], + "certs": [{ + "isrg_root_x1": "MIIFazCCA1OgAwIBAgIRAIIQz7DSQONZRGPgu2OCiwAwDQYJKoZIhvcNAQELBQAwTzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2VhcmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMTUwNjA0MTEwNDM4WhcNMzUwNjA0MTEwNDM4WjBPMQswCQYDVQQGEwJVUzEpMCcGA1UEChMgSW50ZXJuZXQgU2VjdXJpdHkgUmVzZWFyY2ggR3JvdXAxFTATBgNVBAMTDElTUkcgUm9vdCBYMTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAK3oJHP0FDfzm54rVygch77ct984kIxuPOZXoHj3dcKi/vVqbvYATyjb3miGbESTtrFj/RQSa78f0uoxmyF+0TM8ukj13Xnfs7j/EvEhmkvBioZxaUpmZmyPfjxwv60pIgbz5MDmgK7iS4+3mX6UA5/TR5d8mUgjU+g4rk8Kb4Mu0UlXjIB0ttov0DiNewNwIRt18jA8+o+u3dpjq+sWT8KOEUt+zwvo/7V3LvSye0rgTBIlDHCNAymg4VMk7BPZ7hm/ELNKjD+Jo2FR3qyHB5T0Y3HsLuJvW5iB4YlcNHlsdu87kGJ55tukmi8mxdAQ4Q7e2RCOFvu396j3x+UCB5iPNgiV5+I3lg02dZ77DnKxHZu8A/lJBdiB3QW0KtZB6awBdpUKD9jf1b0SHzUvKBds0pjBqAlkd25HN7rOrFleaJ1/ctaJxQZBKT5ZPt0m9STJEadao0xAH0ahmbWnOlFuhjuefXKnEgV4We0+UXgVCwOPjdAvBbI+e0ocS3MFEvzG6uBQE3xDk3SzynTnjh8BCNAw1FtxNrQHusEwMFxIt4I7mKZ9YIqioymCzLq9gwQbooMDQaHWBfEbwrbwqHyGO0aoSCqI3Haadr8faqU9GY/rOPNk3sgrDQoo//fb4hVC1CLQJ13hef4Y53CIrU7m2Ys6xt0nUW7/vGT1M0NPAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBR5tFnme7bl5AFzgAiIyBpY9umbbjANBgkqhkiG9w0BAQsFAAOCAgEAVR9YqbyyqFDQDLHYGmkgJykIrGF1XIpu+ILlaS/V9lZLubhzEFnTIZd+50xx+7LSYK05qAvqFyFWhfFQDlnrzuBZ6brJFe+GnY+EgPbk6ZGQ3BebYhtF8GaV0nxvwuo77x/Py9auJ/GpsMiu/X1+mvoiBOv/2X/qkSsisRcOj/KKNFtY2PwByVS5uCbMiogziUwthDyC3+6WVwW6LLv3xLfHTjuCvjHIInNzktHCgKQ5ORAzI4JMPJ+GslWYHb4phowim57iaztXOoJwTdwJx4nLCgdNbOhdjsnvzqvHu7UrTkXWStAmzOVyyghqpZXjFaH3pO3JLF+l+/+sKAIuvtd7u+Nxe5AW0wdeRlN8NwdCjNPElpzVmbUq4JUagEiuTDkHzsxHpFKVK7q4+63SM1N95R1NbdWhscdCb+ZAJzVcoyi3B43njTOQ5yOf+1CceWxG1bQVs5ZufpsMljq4Ui0/1lvh+wjChP4kqKOJ2qxq4RgqsahDYVvTH9w7jXbyLeiNdd8XM2w9U/t7y0Ff/9yi0GE44Za4rF2LN9d11TPAmRGunUHBcnWEvgJBQl9nJEiU0Zsnvgc/ubhPgXRR4Xq37Z0j4r7g1SgEEzwxA57demyPxgcYxn/eR44/KJ4EBs+lVDR3veyJm+kXQ99b21/+jh5Xos1AnX5iItreGCc=" + } +], + "trust_stores": [{ + "name": "le_via_isrg", + "stack": [ "isrg_root_x1" ] + }], + "s": [ + { "sink_hello_world": { + "local_sink": true + } + } + ] +} + diff --git a/minimal-examples/sink/hello_world/main.c b/minimal-examples/sink/hello_world/main.c new file mode 100644 index 000000000..59594a56f --- /dev/null +++ b/minimal-examples/sink/hello_world/main.c @@ -0,0 +1,99 @@ +/* + * lws-minimal-ss-sink-hello_world + * + * Written in 2010-2021 by Andy Green + * + * This file is made available under the Creative Commons CC0 1.0 + * Universal Public Domain Dedication. + * + * Simple example registers an SS "sink", it's a streamtype with server-type + * semantics you can subsequently bind to by creating SS with the same + * streamtype name. + * + * The user code doesn't know if it is being fulfilled by a sink locally, via a + * SS proxy, or talking to a remote peer, the policy decides it. + * + * In the example, we register the sink, then create a source SS. This also + * instantiates an accepted sink SS bound to the source. + * + * The source sends a message to the accepted sink instance, and that returns + * a message acknowledging it. + */ + +#include +#include + +extern const lws_ss_info_t ssi_myss_sink_t, ssi_myss_src_t; + +static struct lws_context *cx; +int test_result = 1, multipart; + +static int +smd_cb(void *opaque, lws_smd_class_t c, lws_usec_t ts, void *buf, size_t len) +{ + if (!(c & LWSSMDCL_SYSTEM_STATE)) + return 0; + + if (lws_json_simple_strcmp(buf, len, "\"state\":", "OPERATIONAL")) + return 0; + + /* + * Register our example sink + */ + + if (lws_ss_create(cx, 0, &ssi_myss_sink_t, NULL, NULL, NULL, NULL)) { + lwsl_err("%s: unable to register sink\n", __func__); + + return -1; + } + + /* + * Create our example source (which also instantiates an accepted + * sink) + */ + + if (lws_ss_create(cx, 0, &ssi_myss_src_t, NULL, NULL, NULL, NULL)) { + lwsl_err("%s: unable to register src\n", __func__); + + return -1; + } + + return 0; +} + +static void +sigint_handler(int sig) +{ + lws_default_loop_exit(cx); +} + +int +main(int argc, const char **argv) +{ + struct lws_context_creation_info *info = malloc(sizeof(*info)); + + if (!info) + return -1; + + lws_context_info_defaults(info, "example-policy.json"); + lws_cmdline_option_handle_builtin(argc, argv, info); + signal(SIGINT, sigint_handler); + + lwsl_user("LWS Secure Streams Sink hello_world\n"); + + info->early_smd_cb = smd_cb; + info->early_smd_class_filter = LWSSMDCL_SYSTEM_STATE; + + cx = lws_create_context(info); + free(info); + if (!cx) { + lwsl_err("lws init failed\n"); + return 1; + } + + lws_context_default_loop_run_destroy(cx); + + /* process ret 0 if actual is as expected (0, or--expected-exit 123) */ + + return lws_cmdline_passfail(argc, argv, test_result); +} diff --git a/minimal-examples/sink/hello_world/ss-sink.c b/minimal-examples/sink/hello_world/ss-sink.c new file mode 100644 index 000000000..05fb30c7f --- /dev/null +++ b/minimal-examples/sink/hello_world/ss-sink.c @@ -0,0 +1,97 @@ +/* + * lws-minimal-ss-sink-hello_world + * + * Written in 2010-2021 by Andy Green + * + * This file is made available under the Creative Commons CC0 1.0 + * Universal Public Domain Dedication. + * + * Simple SS sink + */ + +#include + +LWS_SS_USER_TYPEDEF + char payload[200]; + size_t size; + size_t pos; +} myss_sink_t; + +static lws_ss_state_return_t +myss_sink_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len, + int *flags) +{ + myss_sink_t *g = (myss_sink_t *)userobj; + lws_ss_state_return_t r = LWSSSSRET_OK; + + if (g->size == g->pos) + return LWSSSSRET_TX_DONT_SEND; + + if (*len > g->size - g->pos) + *len = g->size - g->pos; + + if (!g->pos) + *flags |= LWSSS_FLAG_SOM; + + memcpy(buf, g->payload + g->pos, *len); + g->pos += *len; + + if (g->pos != g->size) /* more to do */ + r = lws_ss_request_tx(lws_ss_from_user(g)); + else + *flags |= LWSSS_FLAG_EOM; + + lwsl_ss_user(lws_ss_from_user(g), "TX %zu, flags 0x%x, r %d", *len, + (unsigned int)*flags, (int)r); + + return r; +} + +static lws_ss_state_return_t +myss_sink_rx(void *userobj, const uint8_t *buf, size_t len, int flags) +{ + myss_sink_t *g = (myss_sink_t *)userobj; + + lwsl_ss_notice(lws_ss_from_user(g), "len %u, flags 0x%x", + (unsigned int)len, + (unsigned int)flags); + lwsl_hexdump_notice(buf, len); + + if (flags & LWSSS_FLAG_EOM) { + /* we're going to respond to it */ + + g->size = (size_t)lws_snprintf(g->payload, sizeof(g->payload), + "From Sink: Hello World: %lu", + (unsigned long)lws_now_usecs()); + g->pos = 0; + + return lws_ss_request_tx_len(lws_ss_from_user(g), + (unsigned long)g->size); + } + + return LWSSSSRET_OK; +} + +static lws_ss_state_return_t +myss_sink_state(void *userobj, void *sh, lws_ss_constate_t state, + lws_ss_tx_ordinal_t ack) +{ + myss_sink_t *g = (myss_sink_t *)userobj; + + switch ((int)state) { + case LWSSSCS_CREATING: + return lws_ss_request_tx(lws_ss_from_user(g)); + + case LWSSSCS_SERVER_TXN: + break; + } + + return LWSSSSRET_OK; +} + +LWS_SS_INFO("sink_hello_world", myss_sink_t) + .tx = myss_sink_tx, + .rx = myss_sink_rx, + .state = myss_sink_state, + .flags = LWSSSINFLAGS_REGISTER_SINK +}; diff --git a/minimal-examples/sink/hello_world/ss-source.c b/minimal-examples/sink/hello_world/ss-source.c new file mode 100644 index 000000000..d86646575 --- /dev/null +++ b/minimal-examples/sink/hello_world/ss-source.c @@ -0,0 +1,103 @@ +/* + * lws-minimal-ss-sink-hello_world + * + * Written in 2010-2021 by Andy Green + * + * This file is made available under the Creative Commons CC0 1.0 + * Universal Public Domain Dedication. + * + * Simple SS source... it's just a normal SS user implementation, it does not + * have any dependency on the policy routing it to a sink instead of, eg, + * wss to a cloud endpoint. + */ + +#include + +extern int test_result; + +LWS_SS_USER_TYPEDEF + char payload[200]; + size_t size; + size_t pos; +} myss_src_t; + +static lws_ss_state_return_t +myss_src_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len, + int *flags) +{ + myss_src_t *g = (myss_src_t *)userobj; + lws_ss_state_return_t r = LWSSSSRET_OK; + + if (g->size == g->pos) + return LWSSSSRET_TX_DONT_SEND; + + if (*len > g->size - g->pos) + *len = g->size - g->pos; + + if (!g->pos) + *flags |= LWSSS_FLAG_SOM; + + memcpy(buf, g->payload + g->pos, *len); + g->pos += *len; + + if (g->pos != g->size) /* more to do */ + r = lws_ss_request_tx(lws_ss_from_user(g)); + else + *flags |= LWSSS_FLAG_EOM; + + lwsl_ss_user(lws_ss_from_user(g), "TX %zu, flags 0x%x, r %d", *len, + (unsigned int)*flags, (int)r); + + return r; +} + +static lws_ss_state_return_t +myss_src_rx(void *userobj, const uint8_t *buf, size_t len, int flags) +{ + myss_src_t *g = (myss_src_t *)userobj; + + lwsl_ss_notice(lws_ss_from_user(g), "len %u, flags 0x%x", + (unsigned int)len, + (unsigned int)flags); + lwsl_hexdump_notice(buf, len); + + /* + * In this example, we take getting the sink's ack of our message + * as meaning "success". + */ + + test_result = 0; + lws_default_loop_exit(lws_ss_cx_from_user(g)); + + return LWSSSSRET_OK; +} + +static lws_ss_state_return_t +myss_src_state(void *userobj, void *sh, lws_ss_constate_t state, + lws_ss_tx_ordinal_t ack) +{ + myss_src_t *g = (myss_src_t *)userobj; + + switch ((int)state) { + case LWSSSCS_CREATING: + lwsl_notice("%s: CREATING\n", __func__); + return lws_ss_request_tx(lws_ss_from_user(g)); + + case LWSSSCS_CONNECTED: + g->size = (size_t)lws_snprintf(g->payload, sizeof(g->payload), + "From Source: Hello World: %lu", + (unsigned long)lws_now_usecs()); + g->pos = 0; + + return lws_ss_request_tx_len(lws_ss_from_user(g), + (unsigned long)g->size); + } + + return LWSSSSRET_OK; +} + +LWS_SS_INFO("sink_hello_world", myss_src_t) + .tx = myss_src_tx, + .rx = myss_src_rx, + .state = myss_src_state, +};