1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-30 00:00:16 +01:00

ss: introduce sinks

This commit is contained in:
Andy Green 2021-10-09 10:58:21 +01:00
parent c24148826b
commit c11e31547f
13 changed files with 722 additions and 22 deletions

View file

@ -234,6 +234,10 @@ typedef struct lws_ss_policy {
const lws_metric_policy_t *metrics; /* linked-list of metric policies */ const lws_metric_policy_t *metrics; /* linked-list of metric policies */
const lws_ss_auth_t *auth; /* NULL or auth object we bind to */ const lws_ss_auth_t *auth; /* NULL or auth object we bind to */
#if defined(LWS_WITH_SERVER)
const struct lws_protocol_vhost_options *pvo;
#endif
/* protocol-specific connection policy details */ /* protocol-specific connection policy details */
union { union {
@ -331,6 +335,9 @@ typedef struct lws_ss_policy {
const lws_retry_bo_t *retry_bo; /**< retry policy to use */ const lws_retry_bo_t *retry_bo; /**< retry policy to use */
int32_t txc;
int32_t txc_peer;
uint32_t proxy_buflen; /**< max dsh alloc for proxy */ uint32_t proxy_buflen; /**< max dsh alloc for proxy */
uint32_t proxy_buflen_rxflow_on_above; uint32_t proxy_buflen_rxflow_on_above;
uint32_t proxy_buflen_rxflow_off_below; uint32_t proxy_buflen_rxflow_off_below;
@ -339,7 +346,6 @@ typedef struct lws_ss_policy {
uint32_t client_buflen_rxflow_on_above; uint32_t client_buflen_rxflow_on_above;
uint32_t client_buflen_rxflow_off_below; uint32_t client_buflen_rxflow_off_below;
uint32_t timeout_ms; /**< default message response uint32_t timeout_ms; /**< default message response
* timeout in ms */ * timeout in ms */
uint32_t flags; /**< stream attribute flags */ uint32_t flags; /**< stream attribute flags */

View file

@ -172,6 +172,9 @@ enum {
LWSSSINFLAGS_ACCEPTED = (1 << 3), LWSSSINFLAGS_ACCEPTED = (1 << 3),
/**< Set on the accepted object copy of the ssi / info to indicate that /**< Set on the accepted object copy of the ssi / info to indicate that
* we are an accepted connection from a server's listening socket */ * we are an accepted connection from a server's listening socket */
LWSSSINFLAGS_ACCEPTED_SINK = (1 << 4),
/**< Set on the accepted object copy of the ssi / info to indicate that
* we are an accepted connection from a local sink */
}; };
typedef lws_ss_state_return_t (*lws_sscb_rx)(void *userobj, const uint8_t *buf, typedef lws_ss_state_return_t (*lws_sscb_rx)(void *userobj, const uint8_t *buf,

View file

@ -807,6 +807,11 @@ lws_create_context(const struct lws_context_creation_info *info)
#endif #endif
#endif #endif
#if defined(LWS_WITH_SERVER)
context->lcg[LWSLCG_WSI_SSP_SINK].tag_prefix = "SSsink";
context->lcg[LWSLCG_WSI_SSP_SOURCE].tag_prefix = "SSsrc";
#endif
#if defined(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY) #if defined(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY)
/* directly use the user-provided policy object list */ /* directly use the user-provided policy object list */
context->pss_policies = info->pss_policies; context->pss_policies = info->pss_policies;

View file

@ -411,10 +411,23 @@ enum {
#endif #endif
#endif #endif
#if defined(LWS_WITH_SERVER)
LWSLCG_WSI_SSP_SINK, /* accepted sink conn */
LWSLCG_WSI_SSP_SOURCE, /* accepted source conn */
#endif
/* always last */ /* always last */
LWSLCG_COUNT LWSLCG_COUNT
}; };
#if defined(LWS_WITH_SECURE_STREAMS) && defined(LWS_WITH_SERVER)
typedef struct lws_ss_sinks {
lws_dll2_t list;
lws_ss_info_t info;
lws_dll2_owner_t accepts;
} lws_ss_sinks_t;
#endif
/* /*
* the rest is managed per-context, that includes * the rest is managed per-context, that includes
* *
@ -668,6 +681,9 @@ struct lws_context {
#endif #endif
const lws_ss_policy_t *pss_policies; const lws_ss_policy_t *pss_policies;
const lws_ss_auth_t *pss_auths; const lws_ss_auth_t *pss_auths;
#if defined(LWS_WITH_SERVER)
lws_dll2_owner_t sinks;
#endif
#if defined(LWS_WITH_SSPLUGINS) #if defined(LWS_WITH_SSPLUGINS)
const lws_ss_plugin_t **pss_plugins; const lws_ss_plugin_t **pss_plugins;
#endif #endif

View file

@ -105,6 +105,7 @@ static const char * const lejp_tokens_policy[] = {
"s[].*.ws_subprotocol", "s[].*.ws_subprotocol",
"s[].*.ws_binary", "s[].*.ws_binary",
"s[].*.local_sink", "s[].*.local_sink",
"s[].*.options[].*",
"s[].*.server", "s[].*.server",
"s[].*.server_cert", "s[].*.server_cert",
"s[].*.server_key", "s[].*.server_key",
@ -213,6 +214,7 @@ typedef enum {
LSSPPT_WS_SUBPROTOCOL, LSSPPT_WS_SUBPROTOCOL,
LSSPPT_WS_BINARY, LSSPPT_WS_BINARY,
LSSPPT_LOCAL_SINK, LSSPPT_LOCAL_SINK,
LSSPPT_OPTIONS,
LSSPPT_SERVER, LSSPPT_SERVER,
LSSPPT_SERVER_CERT, LSSPPT_SERVER_CERT,
LSSPPT_SERVER_KEY, LSSPPT_SERVER_KEY,
@ -311,6 +313,10 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
lws_ss_trust_store_t *ts; lws_ss_trust_store_t *ts;
lws_ss_metadata_t *pmd; lws_ss_metadata_t *pmd;
lws_ss_x509_t *x, **py; lws_ss_x509_t *x, **py;
#if defined(LWS_WITH_SERVER)
struct lws_protocol_vhost_options *pvo;
const char *pvo_name;
#endif
lws_ss_policy_t *p2; lws_ss_policy_t *p2;
lws_retry_bo_t *b; lws_retry_bo_t *b;
size_t inl, outl; size_t inl, outl;
@ -318,8 +324,8 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
backoff_t *bot; backoff_t *bot;
int n = -1; int n = -1;
// lwsl_debug("%s: %d %d %s\n", __func__, reason, ctx->path_match - 1, // lwsl_notice("%s: %d %d %s %s\n", __func__, reason, ctx->path_match - 1,
// ctx->path); // ctx->path, ctx->buf);
switch (ctx->path_match - 1) { switch (ctx->path_match - 1) {
case LSSPPT_RETRY: case LSSPPT_RETRY:
@ -385,6 +391,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
return 0; return 0;
} }
if (reason == LEJPCB_ARRAY_END &&
ctx->path_match - 1 == LSSPPT_OPTIONS &&
a->pvosp)
a->pvosp--;
if (reason == LEJPCB_OBJECT_END && a->p) { if (reason == LEJPCB_OBJECT_END && a->p) {
/* /*
* Allocate a just-the-right-size buf for the cert DER now * Allocate a just-the-right-size buf for the cert DER now
@ -596,6 +607,36 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
goto string2; goto string2;
#endif #endif
case LSSPPT_OPTIONS:
#if defined(LWS_WITH_SERVER)
pvo_name = ctx->path + ctx->st[ctx->sp - 2].p + 1;
pvo = lwsac_use(&a->ac, sizeof(*pvo) + strlen(pvo_name) + 1 +
ctx->npos + 1, POL_AC_GRAIN);
if (!pvo)
goto oom;
pvo->name = (const char *)&pvo[1];
pvo->value = pvo->name + strlen(pvo_name) + 1;
memcpy((char *)pvo->name, pvo_name, strlen(pvo_name) + 1);
memcpy((char *)pvo->value, ctx->buf, ctx->npos);
*((char *)&pvo->value[ctx->npos]) = '\0';
pvo->next = NULL;
pvo->options = NULL;
if (!a->curr[LTY_POLICY].p->pvo)
a->curr[LTY_POLICY].p->pvo = pvo;
/* for now we just support one level of options */
// lwsl_notice("%s: lv %d, %s=%s\n", __func__, a->pvosp,
// pvo->name, pvo->value);
if (a->pvostack[a->pvosp])
a->pvostack[a->pvosp]->next = pvo;
a->pvostack[a->pvosp] = pvo;
#endif
break;
case LSSPPT_SERVER_CERT: case LSSPPT_SERVER_CERT:
case LSSPPT_SERVER_KEY: case LSSPPT_SERVER_KEY:

View file

@ -71,7 +71,7 @@ typedef struct lws_ss_handle {
lws_fi_ctx_t fic; /**< Fault Injection context */ lws_fi_ctx_t fic; /**< Fault Injection context */
#endif #endif
struct lws_dll2_owner src_list; /**< sink's list of bound sources */ struct lws_dll2_owner src_list; /**< server's list of bound sources */
struct lws_context *context; /**< lws context we are created on */ struct lws_context *context; /**< lws context we are created on */
const lws_ss_policy_t *policy; /**< system policy for stream */ const lws_ss_policy_t *policy; /**< system policy for stream */
@ -91,9 +91,11 @@ typedef struct lws_ss_handle {
#if defined(LWS_WITH_CONMON) #if defined(LWS_WITH_CONMON)
char *conmon_json; char *conmon_json;
#endif #endif
#if defined(LWS_WITH_SERVER)
//struct lws_ss_handle *h_sink; /**< sink we are bound to, or NULL */ lws_dll2_t sink_bind; /* if bound to / owned by a sink */
//void *sink_obj;/**< sink's private object representing us */ lws_sorted_usec_list_t sul_txreq; /* pending tx req to peer */
struct lws_ss_handle *sink_local_bind; /* nonproxy sink peer */
#endif
lws_sorted_usec_list_t sul_timeout; lws_sorted_usec_list_t sul_timeout;
lws_sorted_usec_list_t sul; lws_sorted_usec_list_t sul;
@ -468,12 +470,15 @@ struct policy_cb_args {
lws_ss_http_respmap_t respmap[16]; lws_ss_http_respmap_t respmap[16];
struct lws_protocol_vhost_options *pvostack[4];
union u heads[_LTY_COUNT]; union u heads[_LTY_COUNT];
union u curr[_LTY_COUNT]; union u curr[_LTY_COUNT];
uint8_t *p; uint8_t *p;
int count; int count;
int pvosp;
char pending_respmap; char pending_respmap;
uint8_t parse_data:1; uint8_t parse_data:1;

View file

@ -711,12 +711,14 @@ _lws_ss_client_connect(lws_ss_handle_t *h, int is_retry, void *conn_if_sspc_onw)
return LWSSSSRET_OK; return LWSSSSRET_OK;
} }
#if defined(LWS_WITH_SERVER)
/* /*
* We are already bound to a sink? * We are already bound to a sink?
*/ */
// if (h->h_sink) if (h->sink_local_bind)
// return 0; return 0;
#endif
if (!is_retry) if (!is_retry)
h->retry = 0; h->retry = 0;
@ -1001,6 +1003,9 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
const lws_ss_policy_t *pol; const lws_ss_policy_t *pol;
lws_ss_state_return_t r; lws_ss_state_return_t r;
lws_ss_metadata_t *smd; lws_ss_metadata_t *smd;
#if defined(LWS_WITH_SERVER)
lws_ss_sinks_t *sn;
#endif
lws_ss_handle_t *h; lws_ss_handle_t *h;
size_t size; size_t size;
void **v; void **v;
@ -1045,8 +1050,9 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
} }
#endif #endif
#if 0 #if defined(LWS_WITH_SERVER)
if (ssi->flags & LWSSSINFLAGS_REGISTER_SINK) { if (ssi->flags & LWSSSINFLAGS_REGISTER_SINK) {
/* /*
* This can register a secure streams sink as well as normal * This can register a secure streams sink as well as normal
* secure streams connections. If that's what's happening, * secure streams connections. If that's what's happening,
@ -1064,12 +1070,20 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
return 1; return 1;
} }
} else {
if (!(pol->flags & LWSSSPOLF_LOCAL_SINK)) { sn = lws_zalloc(sizeof(*sn), __func__);
if (!sn)
return 1;
} sn->info = *ssi;
// lws_dll2_foreach_safe(&pt->ss_owner, NULL, lws_ss_destroy_dll); sn->info.flags = (uint8_t)((sn->info.flags &
~(LWSSSINFLAGS_REGISTER_SINK)) |
LWSSSINFLAGS_ACCEPTED_SINK);
lws_dll2_add_tail(&sn->list, &context->sinks);
lwsl_cx_notice(context, "registered sink %s", ssi->streamtype);
return 0;
} }
#endif #endif
@ -1093,15 +1107,23 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
h->lc.log_cx = context->log_cx; h->lc.log_cx = context->log_cx;
n = LWSLCG_WSI_SS_CLIENT;
#if defined(LWS_WITH_SERVER)
if (pol->flags & LWSSSPOLF_LOCAL_SINK) {
if (ssi->flags & LWSSSINFLAGS_ACCEPTED_SINK)
n = LWSLCG_WSI_SSP_SINK;
else
n = LWSLCG_WSI_SSP_SOURCE;
}
#endif
if (ssi->sss_protocol_version) if (ssi->sss_protocol_version)
__lws_lc_tag(context, &context->lcg[LWSLCG_WSI_SS_CLIENT], __lws_lc_tag(context, &context->lcg[n], &h->lc, "%s|v%u|%u",
&h->lc, "%s|v%u|%u",
ssi->streamtype ? ssi->streamtype : "nostreamtype", ssi->streamtype ? ssi->streamtype : "nostreamtype",
(unsigned int)ssi->sss_protocol_version, (unsigned int)ssi->sss_protocol_version,
(unsigned int)ssi->client_pid); (unsigned int)ssi->client_pid);
else else
__lws_lc_tag(context, &context->lcg[LWSLCG_WSI_SS_CLIENT], __lws_lc_tag(context, &context->lcg[n], &h->lc, "%s",
&h->lc, "%s",
ssi->streamtype ? ssi->streamtype : "nostreamtype"); ssi->streamtype ? ssi->streamtype : "nostreamtype");
#if defined(LWS_WITH_SYS_FAULT_INJECTION) #if defined(LWS_WITH_SYS_FAULT_INJECTION)
@ -1113,6 +1135,67 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
lws_fi_inherit_copy(&h->fic, &context->fic, "ss", ssi->streamtype); lws_fi_inherit_copy(&h->fic, &context->fic, "ss", ssi->streamtype);
#endif #endif
#if defined(LWS_WITH_SERVER)
if (pol->flags & LWSSSPOLF_LOCAL_SINK) {
if (ssi->flags & LWSSSINFLAGS_ACCEPTED_SINK) {
/*
* We are recursing to create the accepted sink, do
* the binding while still in create so any downstream
* actions understand our situation from the start
*/
h->sink_local_bind = (struct lws_ss_handle *)
opaque_user_data;
h->sink_local_bind->sink_local_bind = h;
} else {
/* we are creating an ss connected to a sink... find the sink */
lws_start_foreach_dll(struct lws_dll2 *, d,
lws_dll2_get_head(&context->sinks)) {
sn = lws_container_of(d, lws_ss_sinks_t, list);
if (!strcmp(sn->info.streamtype, ssi->streamtype)) {
lws_ss_handle_t *has;
/*
* How does the sink feel about us joining?
*/
if (sn->info.state(h + 1, h, LWSSSCS_SINK_JOIN,
sn->accepts.count)) {
lwsl_ss_notice(h, "sink rejected");
goto fail_creation;
}
/*
* Recurse to instantiate an accepted sink SS
* for us to bind to... pass bind source handle
* in as opaque data
*/
if (lws_ss_create(context, tsi, &sn->info,
h, &has, NULL, NULL)) {
lwsl_ss_err(h, "sink accept failed");
goto fail_creation;
}
lws_dll2_add_tail(&has->sink_bind, &sn->accepts);
lwsl_ss_notice(h, "bound to sink");
break;
}
} lws_end_foreach_dll(d);
if (!h->sink_local_bind) {
lwsl_cx_err(context, "no sink %s", ssi->streamtype);
goto fail_creation;
}
}
}
#endif
h->info = *ssi; h->info = *ssi;
h->policy = pol; h->policy = pol;
h->context = context; h->context = context;
@ -1338,9 +1421,18 @@ extant:
lws_fi(&h->fic, "ss_create_destroy_me")) lws_fi(&h->fic, "ss_create_destroy_me"))
goto fail_creation; goto fail_creation;
n = 0;
#if defined(LWS_WITH_SYS_SMD) #if defined(LWS_WITH_SYS_SMD)
if (!(ssi->flags & LWSSSINFLAGS_PROXIED) && if (!(ssi->flags & LWSSSINFLAGS_PROXIED) &&
pol == &pol_smd) { pol == &pol_smd)
n = 1;
#endif
#if defined(LWS_WITH_SERVER)
if (h->sink_local_bind)
n = 1;
#endif
if (n) {
r = lws_ss_event_helper(h, LWSSSCS_CONNECTING); r = lws_ss_event_helper(h, LWSSSCS_CONNECTING);
if (r || lws_fi(&h->fic, "ss_create_smd_1")) if (r || lws_fi(&h->fic, "ss_create_smd_1"))
goto fail_creation; goto fail_creation;
@ -1348,9 +1440,11 @@ extant:
if (r || lws_fi(&h->fic, "ss_create_smd_2")) if (r || lws_fi(&h->fic, "ss_create_smd_2"))
goto fail_creation; goto fail_creation;
} }
#endif
if (!(ssi->flags & LWSSSINFLAGS_REGISTER_SINK) && if (
#if defined(LWS_WITH_SERVER)
!h->sink_local_bind &&
#endif
((h->policy->flags & LWSSSPOLF_NAILED_UP) ((h->policy->flags & LWSSSPOLF_NAILED_UP)
#if defined(LWS_WITH_SYS_SMD) #if defined(LWS_WITH_SYS_SMD)
|| ((h->policy == &pol_smd) //&& || ((h->policy == &pol_smd) //&&
@ -1381,6 +1475,9 @@ fail_creation:
if (ppss) if (ppss)
*ppss = NULL; *ppss = NULL;
#if defined(LWS_WITH_SERVER)
lws_dll2_remove(&h->sink_bind);
#endif
lws_ss_destroy(&h); lws_ss_destroy(&h);
return 1; return 1;
@ -1398,6 +1495,7 @@ lws_ss_destroy(lws_ss_handle_t **ppss)
struct lws_context_per_thread *pt; struct lws_context_per_thread *pt;
#if defined(LWS_WITH_SERVER) #if defined(LWS_WITH_SERVER)
struct lws_vhost *v = NULL; struct lws_vhost *v = NULL;
lws_ss_handle_t *hlb;
#endif #endif
lws_ss_handle_t *h = *ppss; lws_ss_handle_t *h = *ppss;
lws_ss_metadata_t *pmd; lws_ss_metadata_t *pmd;
@ -1445,6 +1543,10 @@ lws_ss_destroy(lws_ss_handle_t **ppss)
lws_set_timeout(h->wsi, 1, LWS_TO_KILL_SYNC); lws_set_timeout(h->wsi, 1, LWS_TO_KILL_SYNC);
} }
#if defined(LWS_WITH_SERVER)
lws_dll2_remove(&h->sink_bind);
#endif
/* /*
* if we bound an smd registration to the SS, unregister it * if we bound an smd registration to the SS, unregister it
*/ */
@ -1471,9 +1573,17 @@ lws_ss_destroy(lws_ss_handle_t **ppss)
lws_vfs_file_close(&h->fop_fd); lws_vfs_file_close(&h->fop_fd);
#endif #endif
#if defined(LWS_WITH_SERVER) #if defined(LWS_WITH_SERVER)
lws_dll2_remove(&h->cli_list); lws_dll2_remove(&h->cli_list);
lws_dll2_remove(&h->sink_bind);
lws_sul_cancel(&h->sul_txreq);
hlb = h->sink_local_bind;
if (hlb) {
h->sink_local_bind = NULL;
lws_ss_destroy(&hlb);
}
#endif #endif
lws_dll2_remove(&h->to_list); lws_dll2_remove(&h->to_list);
lws_sul_cancel(&h->sul_timeout); lws_sul_cancel(&h->sul_timeout);
/* /*
@ -1611,6 +1721,46 @@ lws_ss_server_foreach_client(struct lws_ss_handle *h, lws_sssfec_cb cb,
} lws_end_foreach_dll_safe(d, d1); } lws_end_foreach_dll_safe(d, d1);
} }
/*
* Deal with tx requests between source and accepted sink... h is the guy who
* requested the write
*/
static void
lws_ss_sink_txreq_cb(lws_sorted_usec_list_t *sul)
{
struct lws_ss_handle *h = lws_container_of(sul, struct lws_ss_handle,
sul_txreq);
uint8_t buf[1380 + LWS_PRE];
size_t size = sizeof(buf) - LWS_PRE;
lws_ss_state_return_t r;
int flags = 0;
/* !!! just let writes happen for now */
assert(h->sink_local_bind);
/* collect the source tx */
r = h->info.tx(h + 1, 0, buf + LWS_PRE, &size, &flags);
switch (r) {
case LWSSSSRET_OK:
if (!h->sink_local_bind->info.rx) {
lwsl_ss_warn(h->sink_local_bind, "No RX cb");
break;
}
r = h->sink_local_bind->info.rx(&h->sink_local_bind[1],
buf + LWS_PRE, size, flags);
_lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, NULL,
&h->sink_local_bind);
break;
case LWSSSSRET_TX_DONT_SEND:
break;
default:
_lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, NULL, &h);
break;
}
}
#endif #endif
lws_ss_state_return_t lws_ss_state_return_t
@ -1647,6 +1797,21 @@ _lws_ss_request_tx(lws_ss_handle_t *h)
if (h->policy->flags & LWSSSPOLF_SERVER) if (h->policy->flags & LWSSSPOLF_SERVER)
return LWSSSSRET_OK; return LWSSSSRET_OK;
#if defined(LWS_WITH_SERVER)
if (h->sink_local_bind) {
/*
* We are bound to a local sink / source
*/
lwsl_ss_notice(h->sink_local_bind, "Req tx");
lws_sul_schedule(h->context, 0, &h->sink_local_bind->sul_txreq,
lws_ss_sink_txreq_cb, 1);
return LWSSSSRET_OK;
}
#endif
/* /*
* there's currently no wsi / connection associated with the ss handle * there's currently no wsi / connection associated with the ss handle
*/ */

View file

@ -0,0 +1,52 @@
project(lws-minimal-ss-sink-hello_world C)
cmake_minimum_required(VERSION 2.8.12)
find_package(libwebsockets CONFIG REQUIRED)
list(APPEND CMAKE_MODULE_PATH ${LWS_CMAKE_DIR})
include(CheckCSourceCompiles)
include(LwsCheckRequirements)
set(SRCS main.c ss-sink.c ss-source.c)
set(requirements 1)
require_lws_config(LWS_ROLE_H1 1 requirements)
require_lws_config(LWS_WITH_SERVER 1 requirements)
require_lws_config(LWS_WITH_SYS_SMD 1 requirements)
require_lws_config(LWS_WITH_SECURE_STREAMS 1 requirements)
require_lws_config(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY 0 requirements)
require_lws_config(LWS_WITH_SECURE_STREAMS_PROXY_API 1 has_ss_proxy)
if (requirements)
add_executable(${PROJECT_NAME} ${SRCS})
if (websockets_shared)
target_link_libraries(${PROJECT_NAME}
websockets_shared
${LIBWEBSOCKETS_DEP_LIBS})
add_dependencies(${PROJECT_NAME}
websockets_shared)
else()
target_link_libraries(${PROJECT_NAME}
websockets
${LIBWEBSOCKETS_DEP_LIBS})
endif()
if (HAS_LWS_WITH_SECURE_STREAMS_PROXY_API OR has_ss_proxy OR
LWS_WITH_SECURE_STREAMS_PROXY_API)
add_compile_options(-DLWS_SS_USE_SSPC)
add_executable(${PROJECT_NAME}-client ${SRCS})
if (websockets_shared)
target_link_libraries(${PROJECT_NAME}-client
websockets_shared
${LIBWEBSOCKETS_DEP_LIBS})
add_dependencies(${PROJECT_NAME}-client
websockets_shared)
else()
target_link_libraries(${PROJECT_NAME}-client
websockets
${LIBWEBSOCKETS_DEP_LIBS})
endif()
endif()
endif()

View file

@ -0,0 +1,87 @@
# lws minimal secure streams sink hello_world
This example shows how to register your own SS as a "sink", that is a "server"
that handles a given streamtype locally.
User code just creates its own SS of that streamtype as usual, if it is marked
in the policy as a local_sink
```
"local_sink": true,
```
and the sink was registered, then the SS is fulfilled by the sink rather than
directly making onward connections. This lets you, eg handle streams completely
locally, or intercept or store-and-forward their content to the cloud using
another SS when convenient.
Like any server, when you connect to it, at the server-side it creates an
"accepted" sink SS specific to the connection, which is closed when the incoming
connection closes.
The example shows how to register the sink, and create a normal SS of the same
streamtype name. The source then sends a "hello_world" message to the sink
instance, and the sink instance responds with a message back to the source.
In this example, the source getting the ack message makes us exit with a
success return.
Sinks can be registered where the policy is in your system, either directly if
there is no proxying, or at the proxy process when there is.
## build
```
$ cmake . && make
```
## usage
Commandline option|Meaning
---|---
-d <loglevel>|Debug verbosity in decimal, eg, -d15
```
[2021/10/11 06:25:54:8413] U: LWS Secure Streams Sink hello_world
[2021/10/11 06:25:54:8757] N: LWS: 4.3.99-v4.3.0-19-g9da508e91b, NET CLI SRV H1 H2 WS MQTT SS-JSON-POL SSPROX ConMon IPv6-absent
[2021/10/11 06:25:54:8876] N: ++ [1819937|wsi|0|pipe] (1)
[2021/10/11 06:25:54:8920] N: ++ [1819937|vh|0|netlink] (1)
[2021/10/11 06:25:55:0233] N: ++ [1819937|vh|1|_ss_default||-1] (2)
[2021/10/11 06:25:55:4867] N: lws_ss_create: registered sink sink_hello_world
[2021/10/11 06:25:55:4881] N: ++ [1819937|SSsrc|0|sink_hello_world] (1)
[2021/10/11 06:25:55:4890] N: ++ [1819937|SSsink|0|sink_hello_world] (1)
[2021/10/11 06:25:55:4907] N: [1819937|SSsink|0|sink_hello_world]: lws_ss_check_next_state_ss: (unset) -> LWSSSCS_CREATING
[2021/10/11 06:25:55:4929] N: [1819937|SSsink|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CREATING -> LWSSSCS_CONNECTING
[2021/10/11 06:25:55:4930] N: [1819937|SSsink|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CONNECTING -> LWSSSCS_CONNECTED
[2021/10/11 06:25:55:4935] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_create: bound to sink
[2021/10/11 06:25:55:4940] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_check_next_state_ss: (unset) -> LWSSSCS_CREATING
[2021/10/11 06:25:55:4942] N: myss_src_state: CREATING
[2021/10/11 06:25:55:4945] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CREATING -> LWSSSCS_CONNECTING
[2021/10/11 06:25:55:4946] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CONNECTING -> LWSSSCS_CONNECTED
[2021/10/11 06:25:55:4950] N: [1819937|SSsink|0|sink_hello_world]: _lws_ss_request_tx: Req tx
[2021/10/11 06:25:55:4962] U: [1819937|SSsrc|0|sink_hello_world]: myss_src_tx: TX 39, flags 0x3, r 0
[2021/10/11 06:25:55:4969] N: [1819937|SSsink|0|sink_hello_world]: myss_sink_rx: len 39, flags 0x3
[2021/10/11 06:25:55:4974] N:
[2021/10/11 06:25:55:4987] N: 0000: 46 72 6F 6D 20 53 6F 75 72 63 65 3A 20 48 65 6C From Source: Hel
[2021/10/11 06:25:55:4989] N: 0010: 6C 6F 20 57 6F 72 6C 64 3A 20 31 38 35 35 37 30 lo World: 185570
[2021/10/11 06:25:55:4993] N: 0020: 37 36 35 36 33 31 38 7656318
[2021/10/11 06:25:55:4995] N:
[2021/10/11 06:25:55:4999] N: [1819937|SSsrc|0|sink_hello_world]: _lws_ss_request_tx: Req tx
[2021/10/11 06:25:55:5009] U: [1819937|SSsink|0|sink_hello_world]: myss_sink_tx: TX 37, flags 0x3, r 0
[2021/10/11 06:25:55:5012] N: [1819937|SSsrc|0|sink_hello_world]: myss_src_rx: len 37, flags 0x3
[2021/10/11 06:25:55:5014] N:
[2021/10/11 06:25:55:5015] N: 0000: 46 72 6F 6D 20 53 69 6E 6B 3A 20 48 65 6C 6C 6F From Sink: Hello
[2021/10/11 06:25:55:5015] N: 0010: 20 57 6F 72 6C 64 3A 20 31 38 35 35 37 30 37 36 World: 18557076
[2021/10/11 06:25:55:5016] N: 0020: 36 31 33 32 34 61324
[2021/10/11 06:25:55:5016] N:
[2021/10/11 06:25:55:5166] N: -- [1819937|wsi|0|pipe] (0) 628.987ms
[2021/10/11 06:25:55:5177] N: -- [1819937|vh|0|netlink] (1) 625.669ms
[2021/10/11 06:25:55:5205] N: [1819937|SSsink|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CONNECTED -> LWSSSCS_DISCONNECTED
[2021/10/11 06:25:55:5207] N: [1819937|SSsink|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_DISCONNECTED -> LWSSSCS_DESTROYING
[2021/10/11 06:25:55:5211] N: -- [1819937|SSsink|0|sink_hello_world] (0) 32.090ms
[2021/10/11 06:25:55:5215] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_CONNECTED -> LWSSSCS_DISCONNECTED
[2021/10/11 06:25:55:5215] N: [1819937|SSsrc|0|sink_hello_world]: lws_ss_check_next_state_ss: LWSSSCS_DISCONNECTED -> LWSSSCS_DESTROYING
[2021/10/11 06:25:55:5216] N: -- [1819937|SSsrc|0|sink_hello_world] (0) 33.434ms
[2021/10/11 06:25:55:5431] N: -- [1819937|vh|1|_ss_default||-1] (0) 519.819ms
[2021/10/11 06:25:55:5487] U: Completed: OK (seen expected 0)
```

View file

@ -0,0 +1,21 @@
{
"release":"01234567",
"product":"myproduct",
"schema-version":1,
"retry": [{"default": {"backoff": [1000,2000,3000,5000,10000],"conceal":5,"jitterpc":20,"svalidping":300,"svalidhup":310}}],
"certs": [{
"isrg_root_x1": "MIIFazCCA1OgAwIBAgIRAIIQz7DSQONZRGPgu2OCiwAwDQYJKoZIhvcNAQELBQAwTzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2VhcmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMTUwNjA0MTEwNDM4WhcNMzUwNjA0MTEwNDM4WjBPMQswCQYDVQQGEwJVUzEpMCcGA1UEChMgSW50ZXJuZXQgU2VjdXJpdHkgUmVzZWFyY2ggR3JvdXAxFTATBgNVBAMTDElTUkcgUm9vdCBYMTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAK3oJHP0FDfzm54rVygch77ct984kIxuPOZXoHj3dcKi/vVqbvYATyjb3miGbESTtrFj/RQSa78f0uoxmyF+0TM8ukj13Xnfs7j/EvEhmkvBioZxaUpmZmyPfjxwv60pIgbz5MDmgK7iS4+3mX6UA5/TR5d8mUgjU+g4rk8Kb4Mu0UlXjIB0ttov0DiNewNwIRt18jA8+o+u3dpjq+sWT8KOEUt+zwvo/7V3LvSye0rgTBIlDHCNAymg4VMk7BPZ7hm/ELNKjD+Jo2FR3qyHB5T0Y3HsLuJvW5iB4YlcNHlsdu87kGJ55tukmi8mxdAQ4Q7e2RCOFvu396j3x+UCB5iPNgiV5+I3lg02dZ77DnKxHZu8A/lJBdiB3QW0KtZB6awBdpUKD9jf1b0SHzUvKBds0pjBqAlkd25HN7rOrFleaJ1/ctaJxQZBKT5ZPt0m9STJEadao0xAH0ahmbWnOlFuhjuefXKnEgV4We0+UXgVCwOPjdAvBbI+e0ocS3MFEvzG6uBQE3xDk3SzynTnjh8BCNAw1FtxNrQHusEwMFxIt4I7mKZ9YIqioymCzLq9gwQbooMDQaHWBfEbwrbwqHyGO0aoSCqI3Haadr8faqU9GY/rOPNk3sgrDQoo//fb4hVC1CLQJ13hef4Y53CIrU7m2Ys6xt0nUW7/vGT1M0NPAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBR5tFnme7bl5AFzgAiIyBpY9umbbjANBgkqhkiG9w0BAQsFAAOCAgEAVR9YqbyyqFDQDLHYGmkgJykIrGF1XIpu+ILlaS/V9lZLubhzEFnTIZd+50xx+7LSYK05qAvqFyFWhfFQDlnrzuBZ6brJFe+GnY+EgPbk6ZGQ3BebYhtF8GaV0nxvwuo77x/Py9auJ/GpsMiu/X1+mvoiBOv/2X/qkSsisRcOj/KKNFtY2PwByVS5uCbMiogziUwthDyC3+6WVwW6LLv3xLfHTjuCvjHIInNzktHCgKQ5ORAzI4JMPJ+GslWYHb4phowim57iaztXOoJwTdwJx4nLCgdNbOhdjsnvzqvHu7UrTkXWStAmzOVyyghqpZXjFaH3pO3JLF+l+/+sKAIuvtd7u+Nxe5AW0wdeRlN8NwdCjNPElpzVmbUq4JUagEiuTDkHzsxHpFKVK7q4+63SM1N95R1NbdWhscdCb+ZAJzVcoyi3B43njTOQ5yOf+1CceWxG1bQVs5ZufpsMljq4Ui0/1lvh+wjChP4kqKOJ2qxq4RgqsahDYVvTH9w7jXbyLeiNdd8XM2w9U/t7y0Ff/9yi0GE44Za4rF2LN9d11TPAmRGunUHBcnWEvgJBQl9nJEiU0Zsnvgc/ubhPgXRR4Xq37Z0j4r7g1SgEEzwxA57demyPxgcYxn/eR44/KJ4EBs+lVDR3veyJm+kXQ99b21/+jh5Xos1AnX5iItreGCc="
}
],
"trust_stores": [{
"name": "le_via_isrg",
"stack": [ "isrg_root_x1" ]
}],
"s": [
{ "sink_hello_world": {
"local_sink": true
}
}
]
}

View file

@ -0,0 +1,99 @@
/*
* lws-minimal-ss-sink-hello_world
*
* Written in 2010-2021 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* Simple example registers an SS "sink", it's a streamtype with server-type
* semantics you can subsequently bind to by creating SS with the same
* streamtype name.
*
* The user code doesn't know if it is being fulfilled by a sink locally, via a
* SS proxy, or talking to a remote peer, the policy decides it.
*
* In the example, we register the sink, then create a source SS. This also
* instantiates an accepted sink SS bound to the source.
*
* The source sends a message to the accepted sink instance, and that returns
* a message acknowledging it.
*/
#include <libwebsockets.h>
#include <signal.h>
extern const lws_ss_info_t ssi_myss_sink_t, ssi_myss_src_t;
static struct lws_context *cx;
int test_result = 1, multipart;
static int
smd_cb(void *opaque, lws_smd_class_t c, lws_usec_t ts, void *buf, size_t len)
{
if (!(c & LWSSMDCL_SYSTEM_STATE))
return 0;
if (lws_json_simple_strcmp(buf, len, "\"state\":", "OPERATIONAL"))
return 0;
/*
* Register our example sink
*/
if (lws_ss_create(cx, 0, &ssi_myss_sink_t, NULL, NULL, NULL, NULL)) {
lwsl_err("%s: unable to register sink\n", __func__);
return -1;
}
/*
* Create our example source (which also instantiates an accepted
* sink)
*/
if (lws_ss_create(cx, 0, &ssi_myss_src_t, NULL, NULL, NULL, NULL)) {
lwsl_err("%s: unable to register src\n", __func__);
return -1;
}
return 0;
}
static void
sigint_handler(int sig)
{
lws_default_loop_exit(cx);
}
int
main(int argc, const char **argv)
{
struct lws_context_creation_info *info = malloc(sizeof(*info));
if (!info)
return -1;
lws_context_info_defaults(info, "example-policy.json");
lws_cmdline_option_handle_builtin(argc, argv, info);
signal(SIGINT, sigint_handler);
lwsl_user("LWS Secure Streams Sink hello_world\n");
info->early_smd_cb = smd_cb;
info->early_smd_class_filter = LWSSMDCL_SYSTEM_STATE;
cx = lws_create_context(info);
free(info);
if (!cx) {
lwsl_err("lws init failed\n");
return 1;
}
lws_context_default_loop_run_destroy(cx);
/* process ret 0 if actual is as expected (0, or--expected-exit 123) */
return lws_cmdline_passfail(argc, argv, test_result);
}

View file

@ -0,0 +1,97 @@
/*
* lws-minimal-ss-sink-hello_world
*
* Written in 2010-2021 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* Simple SS sink
*/
#include <libwebsockets.h>
LWS_SS_USER_TYPEDEF
char payload[200];
size_t size;
size_t pos;
} myss_sink_t;
static lws_ss_state_return_t
myss_sink_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len,
int *flags)
{
myss_sink_t *g = (myss_sink_t *)userobj;
lws_ss_state_return_t r = LWSSSSRET_OK;
if (g->size == g->pos)
return LWSSSSRET_TX_DONT_SEND;
if (*len > g->size - g->pos)
*len = g->size - g->pos;
if (!g->pos)
*flags |= LWSSS_FLAG_SOM;
memcpy(buf, g->payload + g->pos, *len);
g->pos += *len;
if (g->pos != g->size) /* more to do */
r = lws_ss_request_tx(lws_ss_from_user(g));
else
*flags |= LWSSS_FLAG_EOM;
lwsl_ss_user(lws_ss_from_user(g), "TX %zu, flags 0x%x, r %d", *len,
(unsigned int)*flags, (int)r);
return r;
}
static lws_ss_state_return_t
myss_sink_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
{
myss_sink_t *g = (myss_sink_t *)userobj;
lwsl_ss_notice(lws_ss_from_user(g), "len %u, flags 0x%x",
(unsigned int)len,
(unsigned int)flags);
lwsl_hexdump_notice(buf, len);
if (flags & LWSSS_FLAG_EOM) {
/* we're going to respond to it */
g->size = (size_t)lws_snprintf(g->payload, sizeof(g->payload),
"From Sink: Hello World: %lu",
(unsigned long)lws_now_usecs());
g->pos = 0;
return lws_ss_request_tx_len(lws_ss_from_user(g),
(unsigned long)g->size);
}
return LWSSSSRET_OK;
}
static lws_ss_state_return_t
myss_sink_state(void *userobj, void *sh, lws_ss_constate_t state,
lws_ss_tx_ordinal_t ack)
{
myss_sink_t *g = (myss_sink_t *)userobj;
switch ((int)state) {
case LWSSSCS_CREATING:
return lws_ss_request_tx(lws_ss_from_user(g));
case LWSSSCS_SERVER_TXN:
break;
}
return LWSSSSRET_OK;
}
LWS_SS_INFO("sink_hello_world", myss_sink_t)
.tx = myss_sink_tx,
.rx = myss_sink_rx,
.state = myss_sink_state,
.flags = LWSSSINFLAGS_REGISTER_SINK
};

View file

@ -0,0 +1,103 @@
/*
* lws-minimal-ss-sink-hello_world
*
* Written in 2010-2021 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* Simple SS source... it's just a normal SS user implementation, it does not
* have any dependency on the policy routing it to a sink instead of, eg,
* wss to a cloud endpoint.
*/
#include <libwebsockets.h>
extern int test_result;
LWS_SS_USER_TYPEDEF
char payload[200];
size_t size;
size_t pos;
} myss_src_t;
static lws_ss_state_return_t
myss_src_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len,
int *flags)
{
myss_src_t *g = (myss_src_t *)userobj;
lws_ss_state_return_t r = LWSSSSRET_OK;
if (g->size == g->pos)
return LWSSSSRET_TX_DONT_SEND;
if (*len > g->size - g->pos)
*len = g->size - g->pos;
if (!g->pos)
*flags |= LWSSS_FLAG_SOM;
memcpy(buf, g->payload + g->pos, *len);
g->pos += *len;
if (g->pos != g->size) /* more to do */
r = lws_ss_request_tx(lws_ss_from_user(g));
else
*flags |= LWSSS_FLAG_EOM;
lwsl_ss_user(lws_ss_from_user(g), "TX %zu, flags 0x%x, r %d", *len,
(unsigned int)*flags, (int)r);
return r;
}
static lws_ss_state_return_t
myss_src_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
{
myss_src_t *g = (myss_src_t *)userobj;
lwsl_ss_notice(lws_ss_from_user(g), "len %u, flags 0x%x",
(unsigned int)len,
(unsigned int)flags);
lwsl_hexdump_notice(buf, len);
/*
* In this example, we take getting the sink's ack of our message
* as meaning "success".
*/
test_result = 0;
lws_default_loop_exit(lws_ss_cx_from_user(g));
return LWSSSSRET_OK;
}
static lws_ss_state_return_t
myss_src_state(void *userobj, void *sh, lws_ss_constate_t state,
lws_ss_tx_ordinal_t ack)
{
myss_src_t *g = (myss_src_t *)userobj;
switch ((int)state) {
case LWSSSCS_CREATING:
lwsl_notice("%s: CREATING\n", __func__);
return lws_ss_request_tx(lws_ss_from_user(g));
case LWSSSCS_CONNECTED:
g->size = (size_t)lws_snprintf(g->payload, sizeof(g->payload),
"From Source: Hello World: %lu",
(unsigned long)lws_now_usecs());
g->pos = 0;
return lws_ss_request_tx_len(lws_ss_from_user(g),
(unsigned long)g->size);
}
return LWSSSSRET_OK;
}
LWS_SS_INFO("sink_hello_world", myss_src_t)
.tx = myss_src_tx,
.rx = myss_src_rx,
.state = myss_src_state,
};