1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

lws_smd: system message distribution

- Add low level system message distibution framework
 - Add support for local Secure Streams to participate using _lws_smd streamtype
 - Add apit test and minimal example
 - Add SS proxy support for _lws_smd

See minimal-secure-streams-smd README.md
This commit is contained in:
Andy Green 2020-06-24 20:15:46 +01:00
parent 30f3606b0e
commit 8eca7e17f2
38 changed files with 2180 additions and 77 deletions

View file

@ -141,6 +141,7 @@ if (NOT LWS_WITH_NETWORK)
set(LWS_ROLE_DBUS 0)
set(LWS_WITH_LWS_DSH 0)
set(LWS_WITH_THREADPOOL 0)
set(LWS_WITH_SYS_SMD 0)
endif()
if (LWS_WITH_CGI)
@ -376,5 +377,5 @@ if (LWS_WITH_SSL OR LWS_WITH_MBEDTLS)
endif()
if (NOT LWS_WITH_SSL)
set(LWS_WITHOUT_BUILTIN_SHA1 OFF PARENT_SCOPE)
set(LWS_WITHOUT_BUILTIN_SHA1 OFF)
endif()

View file

@ -115,6 +115,7 @@ option(LWS_WITH_SYS_DHCP_CLIENT "Build in tiny DHCP client" OFF)
option(LWS_WITH_HTTP_BASIC_AUTH "Support Basic Auth" ON)
option(LWS_WITH_HTTP_UNCOMMON_HEADERS "Include less common http header support" ON)
option(LWS_WITH_SYS_STATE "lws_system state support" ON)
option(LWS_WITH_SYS_SMD "Lws System Message Distribution" ON)
#
# Secure Streams

View file

@ -170,6 +170,7 @@
#cmakedefine LWS_WITH_SELFTESTS
#cmakedefine LWS_WITH_SEQUENCER
#cmakedefine LWS_WITH_SERVER_STATUS
#cmakedefine LWS_WITH_SYS_SMD
#cmakedefine LWS_WITH_SMTP
#cmakedefine LWS_WITH_SOCKS5
#cmakedefine LWS_WITH_STATEFUL_URLDECODE

View file

@ -348,10 +348,6 @@ struct lws_pollfd {
#define LWS_POLLIN (FD_READ | FD_ACCEPT)
#define LWS_POLLOUT (FD_WRITE)
#if !defined(pid_t)
#define pid_t int
#endif
#else
@ -535,6 +531,9 @@ struct lws;
#include <libwebsockets/lws-dll2.h>
#include <libwebsockets/lws-timeout-timer.h>
#if defined(LWS_WITH_SYS_SMD)
#include <libwebsockets/lws-smd.h>
#endif
#include <libwebsockets/lws-state.h>
#include <libwebsockets/lws-retry.h>
#include <libwebsockets/lws-adopt.h>
@ -547,7 +546,9 @@ struct lws;
#include <libwebsockets/lws-ws-ext.h>
#include <libwebsockets/lws-protocols-plugins.h>
#include <libwebsockets/lws-plugin-generic-sessions.h>
#include <libwebsockets/lws-context-vhost.h>
#if defined(LWS_ROLE_MQTT)
#include <libwebsockets/lws-mqtt.h>
#endif

View file

@ -799,6 +799,18 @@ struct lws_context_creation_info {
* to make disappear, in order to simulate and test udp retry flow */
#endif
#if defined(LWS_WITH_SYS_SMD)
lws_smd_notification_cb_t early_smd_cb;
/**< CONTEXT: NULL, or an smd notification callback that will be registered
* immediately after the smd in the context is initialized. This ensures
* you can get all notifications without having to intercept the event loop
* creation, eg, when using an event library. Other callbacks can be
* registered later manually without problems.
*/
void *early_smd_opaque;
lws_smd_class_t early_smd_class_filter;
#endif
/* Add new things just above here ---^
* This is part of the ABI, don't needlessly break compatibility
*

View file

@ -252,6 +252,20 @@ enum lws_ss_state_return_t {
* to create the requested stream.
*/
enum {
LWSSSINFLAGS_REGISTER_SINK = (1 << 0),
/**< If set, we're not creating a specific stream, but registering
* ourselves as the "sink" for .streamtype. It's analogous to saying
* we want to be the many-to-one "server" for .streamtype; when other
* streams are created with that streamtype, they should be forwarded
* to this stream owner, where they join and part from the sink via
* (*state) LWSSSCS_SINK_JOIN / _PART events, the new client handle
* being provided in the h_src parameter.
*/
LWSSSINFLAGS_PROXIED = (1 << 1),
/**< Set if the stream is being created as a stand-in at the proxy */
};
typedef struct lws_ss_info {
const char *streamtype; /**< type of stream we want to create */
size_t user_alloc; /**< size of user allocation */
@ -276,16 +290,13 @@ typedef struct lws_ss_info {
int manual_initial_tx_credit;
/**< 0 = manage any tx credit automatically, nonzero explicitly sets the
* peer stream to have the given amount of tx credit, if the protocol
* can support it. */
char register_sink;
/**< If set, we're not creating a specific stream, but registering
* ourselves as the "sink" for .streamtype. It's analogous to saying
* we want to be the many-to-one "server" for .streamtype; when other
* streams are created with that streamtype, they should be forwarded
* to this stream owner, where they join and part from the sink via
* (*state) LWSSSCS_SINK_JOIN / _PART events, the new client handle
* being provided in the h_src parameter.
*/
* can support it.
*
* In the special case of _lws_smd streamtype, this is used to indicate
* the connection's rx class mask.
* */
uint8_t flags;
} lws_ss_info_t;
/**

View file

@ -0,0 +1,175 @@
/*
* lws System Message Distribution
*
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#define LWS_SMD_MAX_PAYLOAD 384
#define LWS_SMD_CLASS_BITFIELD_BYTES 4
#define LWS_SMD_STREAMTYPENAME "_lws_smd"
#define LWS_SMD_SS_RX_HEADER_LEN 16
typedef uint32_t lws_smd_class_t;
struct lws_smd_msg; /* opaque */
struct lws_smd_peer; /* opaque */
/*
* Well-known device classes
*/
enum {
LWSSMDCL_INTERACTION = (1 << 0),
/**<
* Any kind of event indicating a user was interacting with the device,
* eg, press a button, touched the screen, lifted the device etc
*/
LWSSMDCL_SYSTEM_STATE = (1 << 1),
/**<
* The lws_system state changed, eg, to OPERATIONAL
*/
LWSSMDCL_NETWORK = (1 << 2),
/**<
* Something happened on the network, eg, link-up or DHCP, or captive
* portal state update
*/
};
/**
* lws_smd_msg_alloc() - allocate a message of length len
*
* \param ctx: the lws_context
* \param _class: the smd message class, recipients filter on this
* \param len: the required payload length
*
* This helper returns an opaque lws_smd_msg pointer and sets *buf to a buffer
* associated with it of length \p len.
*
* In this way the lws_msg_smd type remains completely opaque and the allocated
* area can be prepared by the caller directly, without copying.
*
* On failure, it returns NULL... it may fail for OOM but it may also fail if
* you request to allocate for a message class that the system has no
* participant who is listening for that class of event currently... the event
* generation action at the caller should be bypassed without error then.
*
* This is useful if you have a message you know the length of. For text-based
* messages like JSON, lws_smd_msg_printf() is more convenient.
*/
LWS_VISIBLE LWS_EXTERN void * /* payload */
lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len);
/**
* lws_smd_msg_free() - abandon a previously allocated message before sending
*
* \param payload: pointer the previously-allocated message payload
*
* Destroys a previously-allocated opaque message object and the requested
* buffer space, in the case that between allocating it and sending it, some
* condition was met that means it can no longer be sent, eg, an error
* generating the content. Otherwise there is no need to destroy allocated
* message objects with this, lws will take care of it.
*/
LWS_VISIBLE LWS_EXTERN void
lws_smd_msg_free(void **payload);
/**
* lws_smd_msg_send() - queue a previously allocated message
*
* \param ctx: the lws_context
* \param msg: the prepared message
*
* Queues an allocated, prepared message for delivery to smd clients
*
* This is threadsafe to call from a non-service thread.
*/
LWS_VISIBLE LWS_EXTERN int
lws_smd_msg_send(struct lws_context *ctx, void *payload);
/**
* lws_smd_msg_printf() - queue a previously allocated message
*
* \param ctx: the lws_context
* \param _class: the message class
* \param format: the format string to prepare the payload with
* \param ...: arguments for the format string, if any
*
* For string-based messages, eg, JSON, allows formatted creating of the payload
* size discovery, allocation and message send all in one step.
*
* Unlike lws_smd_msg_alloc() you do not need to know the length beforehand as
* this computes it and calls lws_smd_msg_alloc() with the correct length.
*
* To be clear this also calls through to lws_smd_msg_send(), it really does
* everything in one step. If there are no registered participants that want
* messages of \p _class, this function returns immediately without doing any
* allocation or anything else.
*
* This is threadsafe to call from a non-service thread.
*/
LWS_VISIBLE LWS_EXTERN int
lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
const char *format, ...) LWS_FORMAT(3);
typedef int (*lws_smd_notification_cb_t)(void *opaque, lws_smd_class_t _class,
lws_usec_t timestamp, void *buf,
size_t len);
#define LWSSMDREG_FLAG_PROXIED_SS (1 << 0)
/**< It's actually a proxied SS connection registering, opaque is the ss h */
/*
* lws_smd_register() - register to receive smd messages
*
* \param ctx: the lws_context
* \param opaque: an opaque pointer handed to the callback
* \param flags: typically 0
* \param _class_filter: bitmap of message classes we care about
* \param cb: the callback to receive messages
*
* Queues an allocated, prepared message for delivery to smd clients.
*
* Returns NULL on failure, or an opaque handle which may be given to
* lws_smd_unregister() to stop participating in the shared message queue.
*
* This is threadsafe to call from a non-service thread.
*/
LWS_VISIBLE LWS_EXTERN struct lws_smd_peer *
lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb);
/*
* lws_smd_unregister() - unregister receiving smd messages
*
* \param pr: the handle returned from the registration
*
* Destroys the registration of the callback for messages and ability to send
* messages.
*
* It's not necessary to call this if the registration wants to survive for as
* long as the lws_context... lws_context_destroy will also clean up any
* registrations still active by then.
*/
LWS_VISIBLE LWS_EXTERN void
lws_smd_unregister(struct lws_smd_peer *pr);

View file

@ -39,10 +39,14 @@ typedef struct lws_state_notify_link {
typedef struct lws_state_manager {
lws_dll2_owner_t notify_list;
struct lws_context *context;
void *parent;
#if defined(LWS_WITH_SYS_SMD)
lws_smd_class_t smd_class;
#endif
/**< optional opaque pointer to owning object... useful to make such
* a pointer available to a notification callback. Ignored by lws */
const char **state_names; /* may be NULL */
const char **state_names;
const char *name;
int state;
} lws_state_manager_t;

View file

@ -174,7 +174,7 @@ lws_client_connect_via_info(const struct lws_client_connect_info *i)
}
if (local) {
lwsl_info("%s: protocol binding to %s\n", __func__, local);
lwsl_info("%s: vh %s protocol binding to %s\n", __func__, wsi->vhost->name, local);
p = lws_vhost_name_to_protocol(wsi->vhost, local);
if (p)
lws_bind_protocol(wsi, p, __func__);

View file

@ -299,6 +299,14 @@ lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi)
if (!context)
return 1;
#if defined(LWS_WITH_SYS_SMD)
if (!tsi && lws_smd_message_pending(context)) {
lws_smd_msg_distribute(context);
if (lws_smd_message_pending(context))
return 0;
}
#endif
pt = &context->pt[tsi];
/*

View file

@ -109,6 +109,13 @@ _lws_state_transition(lws_state_manager_t *mgr, int target)
/* Indicate success by calling the notifers again with both args same */
_report(mgr, target, target);
#if defined(LWS_WITH_SYS_SMD)
if (mgr->smd_class)
(void)lws_smd_msg_printf(mgr->context,
mgr->smd_class, "{\"state\":\"%s\"}",
mgr->state_names[target]);
#endif
return 0;
}

View file

@ -84,7 +84,6 @@ lws_sul_peer_limits_cb(lws_sorted_usec_list_t *sul)
#if defined(LWS_WITH_SYS_STATE)
#if defined(_DEBUG)
static const char * system_state_names[] = {
"undef",
"CONTEXT_CREATED",
@ -101,7 +100,6 @@ static const char * system_state_names[] = {
"OPERATIONAL",
"POLICY_INVALID"
};
#endif
/*
@ -217,8 +215,31 @@ lws_context_creation_completion_cb(lws_sorted_usec_list_t *sul)
LWS_SYSTATE_OPERATIONAL);
}
#endif /* WITH_SYS_STATE */
#if defined(LWS_WITH_SYS_SMD)
static int
lws_system_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
void *buf, size_t len)
{
struct lws_context *cx = (struct lws_context *)opaque;
if (_class != LWSSMDCL_NETWORK)
return 0;
if (!lws_json_simple_strcmp(buf, len, "\"trigger\":", "cpdcheck")) {
lwsl_notice("%s: SMD -> Captive Portal Detect request\n",
__func__);
lws_system_cpd_start(cx);
}
return 0;
}
#endif
#endif /* NETWORK */
struct lws_context *
lws_create_context(const struct lws_context_creation_info *info)
{
@ -591,6 +612,27 @@ lws_create_context(const struct lws_context_creation_info *info)
context->fd_limit_per_thread = context->max_fds /
context->count_threads;
#if defined(LWS_WITH_SYS_SMD)
lws_mutex_init(context->smd.lock_messages);
lws_mutex_init(context->smd.lock_peers);
/* lws_system smd participant */
if (!lws_smd_register(context, context, 0, LWSSMDCL_NETWORK,
lws_system_smd_cb)) {
lwsl_err("%s: early smd register failed\n", __func__);
}
/* user smd participant */
if (info->early_smd_cb &&
!lws_smd_register(context, info->early_smd_opaque, 0,
info->early_smd_class_filter,
info->early_smd_cb)) {
lwsl_err("%s: early smd register failed\n", __func__);
}
#endif
#if defined(LWS_WITH_NETWORK)
context->default_retry.retry_ms_table = default_backoff_table;
@ -832,15 +874,18 @@ lws_create_context(const struct lws_context_creation_info *info)
/*
* init the lws_state mgr for the system state
*/
#if defined(_DEBUG)
context->mgr_system.state_names = system_state_names;
#endif
context->mgr_system.name = "system";
context->mgr_system.state = LWS_SYSTATE_CONTEXT_CREATED;
context->mgr_system.parent = context;
context->protocols_notify.name = "prot_init";
context->protocols_notify.notify_cb = lws_state_notify_protocol_init;
context->mgr_system.state_names = system_state_names;
context->mgr_system.name = "system";
context->mgr_system.state = LWS_SYSTATE_CONTEXT_CREATED;
context->mgr_system.parent = context;
context->mgr_system.context = context;
#if defined(LWS_WITH_SYS_SMD)
context->mgr_system.smd_class = LWSSMDCL_SYSTEM_STATE;
#endif
context->protocols_notify.name = "prot_init";
context->protocols_notify.notify_cb = lws_state_notify_protocol_init;
lws_state_reg_notifier(&context->mgr_system, &context->protocols_notify);
@ -997,9 +1042,7 @@ lws_system_cpd_start(struct lws_context *cx)
#endif
}
#if (_LWS_ENABLED_LOGS & LLL_NOTICE)
static const char *cname[] = { "?", "OK", "Captive", "No internet" };
#endif
static const char *cname[] = { "Unknown", "OK", "Captive", "No internet" };
void
lws_system_cpd_set(struct lws_context *cx, lws_cpd_result_t result)
@ -1010,7 +1053,14 @@ lws_system_cpd_set(struct lws_context *cx, lws_cpd_result_t result)
lwsl_notice("%s: setting CPD result %s\n", __func__, cname[result]);
cx->captive_portal_detect = (uint8_t)result;
#if defined(LWS_WITH_SYS_STATE)
#if defined(LWS_WITH_SYS_SMD)
lws_smd_msg_printf(cx, LWSSMDCL_NETWORK,
"{\"type\":\"cpd\",\"result\":\"%s\"}",
cname[cx->captive_portal_detect]);
#endif
/* if nothing is there to intercept anything, go all the way */
if (cx->mgr_system.state != LWS_SYSTATE_POLICY_INVALID)
lws_state_transition_steps(&cx->mgr_system,
@ -1096,6 +1146,10 @@ lws_context_destroy3(struct lws_context *context)
#endif
}
#if defined(LWS_WITH_SYS_SMD)
_lws_smd_destroy(context);
#endif
#if defined(LWS_WITH_SYS_ASYNC_DNS)
lws_async_dns_deinit(&context->async_dns);
#endif
@ -1122,7 +1176,7 @@ lws_context_destroy3(struct lws_context *context)
#endif
lws_free(context);
lwsl_info("%s: ctx %p freed\n", __func__, context);
lwsl_debug("%s: ctx %p freed\n", __func__, context);
if (pcontext_finalize)
*pcontext_finalize = NULL;
@ -1450,7 +1504,7 @@ struct lws_context *
lws_system_context_from_system_mgr(lws_state_manager_t *mgr)
{
#if defined(LWS_WITH_NETWORK)
return lws_container_of(mgr, struct lws_context, mgr_system);
return mgr->context;
#else
return NULL;
#endif

View file

@ -62,8 +62,14 @@
#include <sys/stat.h>
#endif
#if LWS_MAX_SMP > 1
#if LWS_MAX_SMP > 1 || defined(LWS_WITH_SYS_SMD)
/* https://stackoverflow.com/questions/33557506/timespec-redefinition-error */
#define HAVE_STRUCT_TIMESPEC
#include <pthread.h>
#else
#if !defined(pid_t) && defined(WIN32)
#define pid_t int
#endif
#endif
#ifndef LWS_DEF_HEADER_LEN
@ -212,6 +218,11 @@ struct lws;
#include "private-lib-secure-streams.h"
#endif
#if defined(LWS_WITH_SYS_SMD)
#include "private-lib-system-smd.h"
#endif
struct lws_io_watcher {
#ifdef LWS_WITH_LIBEV
struct lws_io_watcher_libev ev;
@ -326,6 +337,10 @@ struct lws_context {
lws_system_blob_t system_blobs[LWS_SYSBLOB_TYPE_COUNT];
#if defined(LWS_WITH_SYS_SMD)
lws_smd_t smd;
#endif
#if defined(LWS_WITH_NETWORK)
struct lws_context_per_thread pt[LWS_MAX_SMP];
lws_retry_bo_t default_retry;

View file

@ -46,7 +46,7 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
/* stay dead once we are dead */
if (!context || !context->vhost_list)
if (!context)
return 1;
pt = &context->pt[tsi];
@ -86,7 +86,7 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
timeout_ms = 2000000000;
timeout_us = ((lws_usec_t)timeout_ms) * LWS_US_PER_MS;
if (!pt->service_tid_detected) {
if (!pt->service_tid_detected && context->vhost_list) {
struct lws *_lws = pt->fake_wsi;
if (!_lws)

View file

@ -96,7 +96,7 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
/* stay dead once we are dead */
if (!context || !context->vhost_list)
if (!context)
return 1;
pt = &context->pt[tsi];
@ -106,7 +106,7 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
else
timeout_ms = 2000000000;
if (!pt->service_tid_detected) {
if (!pt->service_tid_detected && context->vhost_list) {
struct lws _lws;
memset(&_lws, 0, sizeof(_lws));

View file

@ -78,7 +78,7 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
/* stay dead once we are dead */
if (!context || !context->vhost_list)
if (!context)
return 1;
pt = &context->pt[tsi];
@ -96,7 +96,7 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
if (context->event_loop_ops->run_pt)
context->event_loop_ops->run_pt(context, tsi);
if (!pt->service_tid_detected) {
if (!pt->service_tid_detected && context->vhost_list) {
struct lws _lws;
memset(&_lws, 0, sizeof(_lws));

View file

@ -69,8 +69,7 @@
#endif
#if defined(LWS_HAVE_PTHREAD_H)
#include <pthread.h>
typedef pthread_mutex_t lws_mutex_t;
#define lws_mutex_t pthread_mutex_t
#define lws_mutex_init(x) pthread_mutex_init(&(x), NULL)
#define lws_mutex_destroy(x) pthread_mutex_destroy(&(x))
#define lws_mutex_lock(x) pthread_mutex_lock(&(x))

View file

@ -71,12 +71,12 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
int interrupt_requested;
/* stay dead once we are dead */
if (context == NULL || !context->vhost_list)
if (context == NULL)
return 1;
pt = &context->pt[tsi];
if (!pt->service_tid_detected) {
if (!pt->service_tid_detected && context->vhost_list) {
struct lws _lws;
memset(&_lws, 0, sizeof(_lws));

View file

@ -27,6 +27,12 @@
#include <private-lib-core.h>
#if defined(LWS_WITH_SYS_SMD)
const lws_ss_policy_t pol_smd = {
.flags = 0, /* have to set something for windows */
};
#endif
const lws_ss_policy_t *
lws_ss_policy_lookup(const struct lws_context *context, const char *streamtype)
{
@ -35,6 +41,11 @@ lws_ss_policy_lookup(const struct lws_context *context, const char *streamtype)
if (!streamtype)
return NULL;
#if defined(LWS_WITH_SYS_SMD)
if (!strcmp(streamtype, LWS_SMD_STREAMTYPENAME))
return &pol_smd;
#endif
while (p) {
if (!strcmp(p->streamtype, streamtype))
return p;
@ -212,7 +223,7 @@ lws_ss_policy_set(struct lws_context *context, const char *name)
if (!pol->trust_store) {
pol = pol->next;
if (!pol && !context->vhost_list) {
if (!pol) {
/* corner case... there's no trust store used */
i.options = context->options;
i.vhost_name = "_ss_default";
@ -254,6 +265,7 @@ lws_ss_policy_set(struct lws_context *context, const char *name)
for (n = 1; v && n < pol->trust_store->count; n++) {
lwsl_info("%s: add '%s' to trust store\n", __func__,
pol->trust_store->ssx509[n]->vhost_name);
#if defined(LWS_WITH_TLS)
if (lws_tls_client_vhost_extra_cert_mem(v,
pol->trust_store->ssx509[n]->ca_der,
pol->trust_store->ssx509[n]->ca_der_len)) {
@ -261,6 +273,7 @@ lws_ss_policy_set(struct lws_context *context, const char *name)
__func__);
ret = 1;
}
#endif
}
pol = pol->next;

View file

@ -120,6 +120,12 @@ typedef struct lws_ss_handle {
const char *subscribe_to;
size_t subscribe_to_len;
} mqtt;
#endif
#if defined(LWS_WITH_SYS_SMD)
struct {
struct lws_smd_peer *smd_peer;
lws_sorted_usec_list_t sul_write;
} smd;
#endif
} u;
@ -257,6 +263,8 @@ typedef struct lws_sspc_handle {
uint8_t rsidx;
uint8_t destroying:1;
uint8_t non_wsi:1;
uint8_t ignore_txc:1;
} lws_sspc_handle_t;
typedef struct backoffs {
@ -299,6 +307,10 @@ struct policy_cb_args {
int count;
};
#if defined(LWS_WITH_SYS_SMD)
extern const lws_ss_policy_t pol_smd;
#endif
int
lws_ss_deserialize_parse(struct lws_ss_serialization_parser *par,
struct lws_context *context,

View file

@ -57,6 +57,8 @@ lws_sspc_sul_retry_cb(lws_sorted_usec_list_t *sul)
return;
}
lwsl_notice("%s: sspc ss wsi %p\n", __func__, h->cwsi);
}
static int
@ -172,7 +174,7 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
if (!h)
break;
lwsl_info("%s: WRITEABLE %p: (%s) state %d\n", __func__, wsi,
lwsl_notice("%s: WRITEABLE %p: (%s) state %d\n", __func__, wsi,
h->ssi.streamtype, h->state);
n = 0;
@ -250,7 +252,7 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
/* we can't write anything if we don't have credit */
if (h->txc.tx_cr <= 0) {
if (!h->ignore_txc && h->txc.tx_cr <= 0) {
lwsl_notice("%s: WRITEABLE / OPERATIONAL:"
" lack credit (%d)\n", __func__,
h->txc.tx_cr);
@ -289,8 +291,6 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
if (!n)
break;
// lwsl_hexdump_notice(cp, n);
n = lws_write(wsi, (uint8_t *)cp, n, LWS_WRITE_RAW);
if (n < 0) {
lwsl_notice("%s: WRITEABLE: %d\n", __func__, n);
@ -347,11 +347,15 @@ lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1);
h->ssi.streamtype = (const char *)p;
h->context = context;
if (!ssi->manual_initial_tx_credit)
h->txc.peer_tx_cr_est = 500000000;
else
h->txc.peer_tx_cr_est = ssi->manual_initial_tx_credit;
if (!strcmp(ssi->streamtype, "_lws_smd"))
h->ignore_txc = 1;
lws_dll2_add_head(&h->client_list, &context->pt[tsi].ss_client_owner);
/* fill in the things the real api does for the caller */
@ -409,7 +413,8 @@ lws_sspc_destroy(lws_sspc_handle_t **ph)
if (h->cwsi) {
struct lws *wsi = h->cwsi;
h->cwsi = NULL;
lws_set_timeout(wsi, 1, LWS_TO_KILL_SYNC);
if (h->cwsi)
lws_set_timeout(wsi, 1, LWS_TO_KILL_SYNC);
}
/* clean out any pending metadata changes that didn't make it */

View file

@ -91,6 +91,8 @@ ss_proxy_onward_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
const char *rsp = NULL;
int n;
// lwsl_notice("%s: len %d\n", __func__, (int)len);
/*
* The onward secure stream connection has received something.
*/

View file

@ -774,10 +774,18 @@ payload_ff:
__func__, par->streamtype, par->txcr_out);
ssi->streamtype = par->streamtype;
if (par->txcr_out)
if (par->txcr_out) // !!!
ssi->manual_initial_tx_credit = par->txcr_out;
if (lws_ss_create(context, 0, ssi, parconn, pss, NULL, NULL)) {
/*
* Even for a synthetic SS proxing action like _lws_smd,
* we create an actual SS in the proxy representing the
* connection
*/
ssi->flags |= LWSSSINFLAGS_PROXIED;
if (lws_ss_create(context, 0, ssi, parconn, pss,
NULL, NULL)) {
/*
* We're unable to create the onward secure
* stream he asked for... schedule a chance to
@ -794,9 +802,19 @@ payload_ff:
if (*pss) {
(*pss)->being_serialized = 1;
lwsl_notice("%s: Created SS initial credit %d\n",
__func__, par->txcr_out);
(*pss)->info.manual_initial_tx_credit = par->txcr_out;
#if defined(LWS_WITH_SYS_SMD)
if ((*pss)->policy != &pol_smd)
/*
* In SMD case we overloaded the
* initial credit to be the class mask
*/
#endif
{
lwsl_info("%s: Created SS initial credit %d\n",
__func__, par->txcr_out);
(*pss)->info.manual_initial_tx_credit = par->txcr_out;
}
}
/* parent needs to schedule write on client conn */

View file

@ -220,6 +220,84 @@ lws_ss_backoff(lws_ss_handle_t *h)
return 0;
}
#if defined(LWS_WITH_SYS_SMD)
/*
* Local SMD <-> SS
*
* We pass received messages through to the SS handler synchronously, using the
* lws service thread context.
*
* After the SS is created and registered, still nothing is going to come here
* until the peer sends us his rx_class_mask and we update his registration with
* it, because from SS creation his rx_class_mask defaults to 0.
*/
static int
lws_smd_ss_cb(void *opaque, lws_smd_class_t _class,
lws_usec_t timestamp, void *buf, size_t len)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)opaque;
uint8_t *p = (uint8_t *)buf - LWS_SMD_SS_RX_HEADER_LEN;
/*
* When configured with SS enabled, lws over-allocates
* LWS_SMD_SS_RX_HEADER_LEN bytes behind the payload of the queued
* message, for prepending serialized class and timestamp data in-band
* with the payload.
*/
lws_ser_wu64be(p, _class);
lws_ser_wu64be(p + 8, timestamp);
if (h->info.rx)
h->info.rx((void *)&h[1], p, len + LWS_SMD_SS_RX_HEADER_LEN,
LWSSS_FLAG_SOM | LWSSS_FLAG_EOM);
return 0;
}
static void
lws_ss_smd_tx_cb(lws_sorted_usec_list_t *sul)
{
lws_ss_handle_t *h = lws_container_of(sul, lws_ss_handle_t, u.smd.sul_write);
uint8_t buf[LWS_SMD_SS_RX_HEADER_LEN + LWS_SMD_MAX_PAYLOAD], *p;
size_t len = sizeof(buf);
lws_smd_class_t _class;
int flags = 0, n;
if (!h->info.tx)
return;
n = h->info.tx(&h[1], h->txord++, buf, &len, &flags);
if (n)
/* nonzero return means don't want to send anything */
return;
// lwsl_notice("%s: (SS %p bound to _lws_smd creates message) tx len %d\n", __func__, h, (int)len);
// lwsl_hexdump_notice(buf, len);
assert(len >= LWS_SMD_SS_RX_HEADER_LEN);
_class = (lws_smd_class_t)lws_ser_ru64be(buf);
p = lws_smd_msg_alloc(h->context, _class, len - LWS_SMD_SS_RX_HEADER_LEN);
if (!p) {
lwsl_notice("%s: failed to alloc\n", __func__);
return;
}
memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN, len - LWS_SMD_SS_RX_HEADER_LEN);
if (lws_smd_msg_send(h->context, p)) {
lwsl_notice("%s: failed to queue\n", __func__);
return;
}
}
#endif
/*
* This is a local SS binding to a local SMD server
*/
int
lws_ss_client_connect(lws_ss_handle_t *h)
{
@ -245,6 +323,34 @@ lws_ss_client_connect(lws_ss_handle_t *h)
if (h->h_sink)
return 0;
#if defined(LWS_WITH_SYS_SMD)
if (h->policy == &pol_smd) {
if (h->u.smd.smd_peer) {
// lwsl_notice("%s: peer already set\n", __func__);
return 0;
}
// lwsl_notice("%s: received connect for _lws_smd, registering for class mask 0x%x\n",
// __func__, h->info.manual_initial_tx_credit);
h->u.smd.smd_peer = lws_smd_register(h->context, h,
(h->info.flags & LWSSSINFLAGS_PROXIED) ?
LWSSMDREG_FLAG_PROXIED_SS : 0,
h->info.manual_initial_tx_credit,
lws_smd_ss_cb);
if (!h->u.smd.smd_peer)
return -1;
if (lws_ss_event_helper(h, LWSSSCS_CONNECTING))
return -1;
// lwsl_err("%s: registered SS SMD\n", __func__);
if (lws_ss_event_helper(h, LWSSSCS_CONNECTED))
return -1;
return 0;
}
#endif
/*
* We're going to substitute ${metadata} in the endpoint at connection-
* time, so this can be set dynamically...
@ -367,7 +473,6 @@ lws_ss_client_connect(lws_ss_handle_t *h)
return 0;
}
/*
* Public API
*/
@ -397,7 +502,7 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
return 1;
}
if (ssi->register_sink) {
if (ssi->flags & LWSSSINFLAGS_REGISTER_SINK) {
/*
* This can register a secure streams sink as well as normal
* secure streams connections. If that's what's happening,
@ -505,13 +610,41 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
if (ppayload_fmt)
*ppayload_fmt = pol->payload_fmt;
if (ssi->register_sink) {
#if defined(LWS_WITH_SYS_SMD)
/*
* For a local Secure Streams connection
*/
if (!(ssi->flags & LWSSSINFLAGS_PROXIED) &&
pol == &pol_smd) {
/*
* So he has asked to be wired up to SMD over a SS link.
* Register him as an smd participant in his own right.
*
* Just for this case, ssi->manual_initial_tx_credit is used
* to set the rx class mask (this is part of the SS serialization
* format as well)
*/
h->u.smd.smd_peer = lws_smd_register(context, h, 0,
ssi->manual_initial_tx_credit,
lws_smd_ss_cb);
if (!h->u.smd.smd_peer)
goto late_bail;
lwsl_info("%s: registered SS SMD\n", __func__);
if (lws_ss_event_helper(h, LWSSSCS_CONNECTING))
return -1;
if (lws_ss_event_helper(h, LWSSSCS_CONNECTED))
return -1;
}
#endif
if (ssi->flags & LWSSSINFLAGS_REGISTER_SINK) {
/*
*
*/
}
if (lws_ss_event_helper(h, LWSSSCS_CREATING)) {
late_bail:
lws_pt_lock(pt, __func__);
lws_dll2_remove(&h->list);
lws_pt_unlock(pt);
@ -520,7 +653,14 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
return 1;
}
if (!ssi->register_sink && (h->policy->flags & LWSSSPOLF_NAILED_UP))
if (!(ssi->flags & LWSSSINFLAGS_REGISTER_SINK) &&
((h->policy->flags & LWSSSPOLF_NAILED_UP)
#if defined(LWS_WITH_SYS_SMD)
|| ((h->policy == &pol_smd) //&&
//(ssi->flags & LWSSSINFLAGS_PROXIED))
)
#endif
))
if (lws_ss_client_connect(h))
lws_ss_backoff(h);
@ -552,6 +692,15 @@ lws_ss_destroy(lws_ss_handle_t **ppss)
lws_set_timeout(h->wsi, 1, LWS_TO_KILL_SYNC);
}
/*
* if we bound an smd registration to the SS, unregister it
*/
if (h->policy == &pol_smd && h->u.smd.smd_peer) {
lws_smd_unregister(h->u.smd.smd_peer);
h->u.smd.smd_peer = NULL;
}
pt = &h->context->pt[h->tsi];
lws_pt_lock(pt, __func__);
@ -588,6 +737,22 @@ lws_ss_request_tx(lws_ss_handle_t *h)
return;
}
#if defined(LWS_WITH_SYS_SMD)
if (h->policy == &pol_smd) {
/*
* He's an _lws_smd... and no wsi... since we're just going
* to queue it, we could call his tx() right here, but rather
* than surprise him let's set a sul to do it next time around
* the event loop
*/
lws_sul_schedule(h->context, 0, &h->u.smd.sul_write,
lws_ss_smd_tx_cb, 1);
return;
}
#endif
if (h->seqstate != SSSEQ_IDLE &&
h->seqstate != SSSEQ_DO_RETRY)
return;

View file

@ -32,8 +32,6 @@ typedef struct ss_cpd {
/* ... application specific state ... */
lws_sorted_usec_list_t sul;
uint8_t partway;
} ss_cpd_t;
static int
@ -70,21 +68,18 @@ ss_cpd_state(void *userobj, void *sh, lws_ss_constate_t state,
return LWSSSSRET_OK;
}
static const lws_ss_info_t ssi_cpd = {
.handle_offset = offsetof(ss_cpd_t, ss),
.opaque_user_data_offset = offsetof(ss_cpd_t, opaque_data),
.state = ss_cpd_state,
.user_alloc = sizeof(ss_cpd_t),
.streamtype = "captive_portal_detect",
};
int
lws_ss_sys_cpd(struct lws_context *cx)
{
lws_ss_info_t ssi;
/* We're making an outgoing secure stream ourselves */
memset(&ssi, 0, sizeof(ssi));
ssi.handle_offset = offsetof(ss_cpd_t, ss);
ssi.opaque_user_data_offset = offsetof(ss_cpd_t, opaque_data);
ssi.state = ss_cpd_state;
ssi.user_alloc = sizeof(ss_cpd_t);
ssi.streamtype = "captive_portal_detect";
if (lws_ss_create(cx, 0, &ssi, cx, NULL, NULL, NULL)) {
if (lws_ss_create(cx, 0, &ssi_cpd, cx, NULL, NULL, NULL)) {
lwsl_info("%s: Create stream failed (policy?)\n", __func__);
return 1;

View file

@ -51,7 +51,11 @@ if (LWS_WITH_NETWORK)
list(APPEND SOURCES
system/dhcpclient/dhcpclient.c)
endif()
if (LWS_WITH_SYS_SMD)
add_subdir_include_dirs(smd)
endif()
endif()
#

View file

@ -0,0 +1,8 @@
include_directories(.)
list(APPEND SOURCES
system/smd/smd.c
)
exports_to_parent_scope()

223
lib/system/smd/README.md Normal file
View file

@ -0,0 +1,223 @@
# LWS System Message Distribution
## Overview
Independent pieces of a system may need to become aware of events and state
changes in the other pieces quickly, along with the new state if it is small.
These messages are local to inside a system, although they may be triggered by
events outside of it. Examples include keypresses, or networking state changes.
Individual OSes and frameworks typically have their own fragmented apis for
message-passing, but the lws apis operate the same across any platforms
including, eg, Windows and RTOS and allow crossplatform code to be written once.
Message payloads are short, less than 384 bytes, below system limits for atomic
pipe or UDS datagrams and consistent with heap usage on smaller systems, but
large enough to carry JSON usefully. Messages are typically low duty cycle.
Messages may be sent by any registered participant, they are allocated on heap
in a linked-list, and delivered no sooner than next time around the event loop.
This retains the ability to handle multiple event queuing in one event loop trip
while guaranteeing message handling is nonrecursive. Messages are passed to all
other registered participants before being destroyed.
`lws_smd` apis allow publication and subscription of message objects between
participants that are in a single process and are informed by callback from lws
service thread context, and, via Secure Streams proxying as the IPC method, also
between those in different processes. Registering as a participant and sending
messages are threadsafe APIs.
## Message Class
Message class is a bitfield messages use to indicate their general type, eg,
network status, or UI event like a keypress. Participants set a bitmask to
filter what kind of messages they care about, classes that are 0 in the peer's
filter are never delivered to the peer. A message usually indicates it is a
single class, but it's possible to set multiple class bits and match on any. If
so, care must be taken the payload can be parsed by readers expecting any of the
indicated classes, eg, by using JSON.
`lws_smd` tracks a global union mask for all participants' class mask. Requests
to allocate a message of a class that no participant listens for are rejected,
not at distribution-time but at message allocation-time, so no heap or cpu is
wasted on things that are not currently interesting; but such messages start to
appear as soon as a participant appears that wants them. The message generation
action should be bypassed without error in the case lws_smd_msg_alloc()
returns NULL.
## Messaging guarantees
Sent messages are delivered to all registered participants whose class mask
indicates they want it, including the sender. The send apis are threadsafe.
Locally-delivered message delivery callbacks occur from lws event loop thread
context 0 (the only one in the default case LWS_MAX_SMP = 1). Clients in
different processes receive callbacks from the thread context of their UDS
networking thread.
The message payload may be destroyed immediately when you return from the
callback, you can't store references to it or expect it to be there later.
Messages are timestamped with a systemwide monotonic timestamp. When
participants are on the lws event loop, messages are delivered in-order. When
participants are on different threads, delivery order depends on platform lock
acquisition. External process participants are connected by the Unix Domain
Socket capability of Secure Streams, and may be delivered out-of-order;
receivers that care must consult the message creation timestamps.
## Message Refcounting
To avoid keeping a list of the length of the number of participants for each
message, a refcount is used in the message, computed at the time the message
arrived considering the number of active participants that indicated a desire to
receive messages of that class.
Since peers may detach / close their link asynchronously, the logical peer
objects at the distributor defer destroying themselves until there is no more
possibility of messages arriving timestamped with the period they were active.
A grace period (default 2s) is used to ensure departing peers correctly account
for message refcounts before being destroyed.
## Message creation
Messages may contain arbitrary text or binary data depending on the class. JSON
is recommended since lws_smd messages are small and low duty cycle but have
open-ended content: JSON is maintainable, extensible, debuggable and self-
documenting and avoids, eg, fragile dependencies on header versions shared
between teams. To simplify issuing JSON, a threadsafe api to create and send
messages in one step using format strings is provided:
```
int
lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
const char *format, ...);
```
## Secure Streams `lws_smd` streamtype
When built with LWS_WITH_SECURE_STREAMS, lws_smd exposes a built-in streamtype
`_lws_smd` which user Secure Streams may use to interoperate with lws_smd using
SS payload semantics.
When using `_lws_smd`, the SS info struct member `manual_initial_tx_credit`
provided by the user when creating the Secure Stream is overloaded to be used as
the RX class mask for the SMD connection associated with the Secure Stream.
Both RX and TX payloads have a 16-byte binary header before the actual payload.
For TX, although the header is 16-bytes, only the first 64-bit class bitfield
needs setting, the timestamp is fetched and added by lws.
- MSB-first 64-bit class bitfield (currently only 32 least-sig in use)
- MSB-First Order 64-bit us-resolution timestamp
## Well-known message schema
Class|Schema
---|---
LWSSMDCL_INTERACTION|lws_button events
LWSSMDCL_NETWORK|captive portal detection requests and results
LWSSMDCL_SYSTEM_STATE|lws_system state progression
### User interaction Button events
Class: `LWSSMDCL_INTERACTION`
Produced by lws_button when a user interacts with a defined button.
Click-related events are produced alongside up and down related events, the
participant can choose which to attend to according to the meaning of the
interaction.
Both kinds of event go through sophisticated filtering before being issued, see
`./lib/drivers/button/README.md` for details.
#### SMD Button interaction event
Schema:
```
{
"type": "button",
"src": "<controller-name>/<button-name>",
"event": "<event-name>"
}
```
For example, `{"type":"button","src":"bc/user","event":"doubleclick"}`
Event name|Meaning
---|---
down|The button passes a filter for being down, useful for duration-based response
up|The button has come up, useful for duration-based response
click|The button activity resulted in a classification as a single-click
longclick|The button activity resulted in a classification as a long-click
doubleclick|The button activity resulted in a classification as a double-click
### Captive Portal Detection
Class: `LWSSMDCL_NETWORK`
Actively detects if the network can reach the internet or if it is
intercepted by a captive portal. The detection steps are programmable
via the Secure Streams Policy for a streamtype `captive_portal_detect`, eg
```
"captive_portal_detect": {
"endpoint": "connectivitycheck.android.com",
"http_url": "generate_204",
"port": 80,
"protocol": "h1",
"http_method": "GET",
"opportunistic": true,
"http_expect": 204,
"http_fail_redirect": true
}
```
#### SMD Report Result
Schema: `{"type": "cpd", "result":"<result>"}`
result|meaning
---|---
OK|Internet is reachable
Captive|Internet is behind a captive portal
No internet|There is no connectivity
#### SMD Request re-detection
Schema: `{"trigger": "cpdcheck"}`
### lws_system state progression
Class: `LWSSMDCL_SYSTEM_STATE`
Lws system state changes are forwarded to lws_smd messages so participants not
on the lws event loop directly can be aware of progress. Code registering a
lws_system notifier callback, on the main lws loop, can synchronously veto state
changes and hook proposed state changes, lws_smd events are asynchronous
notifications of state changes after they were decided only... however they are
available over the whole system.
It's not possible to make validated TLS connections until the system has
acquired the date as well as acquired an IP on a non-captive portal connection,
for that reason user code will usually be dependent on the system reaching
"OPERATIONAL" state if lws is responsible for managing the boot process.
#### System state event
Schema: `{"state":"<state>"}"`
State|Meaning
---|---
CONTEXT_CREATED|We're creating the lws_context
INITIALIZED|Initial vhosts and protocols initialized
IFACE_COLDPLUG|Network interfaces discovered
DHCP|DHCP acquired
CPD_PRE_TIME|Captive portal detect hook before we have system time
TIME_VALID|Ntpclient has run
CPD_POST_TIME|Captive portal detect hook after system time (tls-based check)
POLICY_VALID|The system policy has been acquired and parsed
REGISTERED|This device is registered with an authority
AUTH1|We acquired auth1 from the authority using our registration info
AUTH2|We acquired auth2 from the authority using our registration info
OPERATIONAL|We are active and able to make authenticated tls connections
POLICY_INVALID|The policy is being changed

View file

@ -0,0 +1,114 @@
/*
* lws System Message Distribution
*
* Copyright (C) 2019 - 2020 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
/*
* Number of seconds registered peers must remain as zombies to handle in-flight
* older messages correctly
*/
#if !defined(LWS_SMD_INFLIGHT_GRACE_SECS)
#define LWS_SMD_INFLIGHT_GRACE_SECS (2)
#endif
#if !defined(LWS_SMD_MAX_QUEUE_DEPTH)
#define LWS_SMD_MAX_QUEUE_DEPTH (12)
#endif
#if defined(LWS_WITH_SECURE_STREAMS)
#define LWS_SMD_SS_RX_HEADER_LEN_EFF (LWS_SMD_SS_RX_HEADER_LEN)
#else
#define LWS_SMD_SS_RX_HEADER_LEN_EFF (0)
#endif
typedef struct lws_smd_msg {
lws_dll2_t list;
lws_usec_t timestamp;
lws_smd_class_t _class;
uint16_t length;
uint16_t refcount;
/* message itself is over-allocated after this */
} lws_smd_msg_t;
/*
* The peer's relationship to the lws instance doing the distribution
*/
typedef enum {
LSMDT_SAME_PROCESS, /* we call him back ourselves */
LSMDT_SECURE_STREAMS_LOCAL, /* we call him back ourselves */
LSMDT_SECURE_STREAMS_PROXIED, /* we ask to write and wait */
} lws_smd_type_t;
typedef struct lws_smd_peer {
lws_dll2_t list;
lws_usec_t timestamp_joined;
lws_usec_t timestamp_left;
#if defined(LWS_WITH_SECURE_STREAMS)
lws_ss_handle_t *ss_handle; /* LSMDT_SECURE_STREAMS */
#endif
lws_smd_notification_cb_t cb; /* LSMDT_<other> */
void *opaque;
/* NULL, or next message we will handle */
lws_smd_msg_t *tail;
lws_smd_class_t _class_filter;
lws_smd_type_t type;
} lws_smd_peer_t;
/*
* Manages message distribution
*
* There is one of these in the lws_context, but the distribution action also
* gets involved in delivering to pt event loops individually for SMP case
*/
typedef struct lws_smd {
lws_dll2_owner_t owner_messages; /* lws_smd_msg_t */
lws_mutex_t lock_messages;
lws_dll2_owner_t owner_peers; /* lws_smd_peer_t */
lws_mutex_t lock_peers;
/* union of peer class filters, suppress creation of msg classes not set */
lws_smd_class_t _class_filter;
lws_sorted_usec_list_t sul_tail_stale;
char delivering;
} lws_smd_t;
/* check if this tsi has pending messages to deliver */
int
lws_smd_message_pending(struct lws_context *ctx);
int
lws_smd_msg_distribute(struct lws_context *ctx);
int
_lws_smd_destroy(struct lws_context *ctx);

524
lib/system/smd/smd.c Normal file
View file

@ -0,0 +1,524 @@
/*
* lws System Message Distribution
*
* Copyright (C) 2019 - 2020 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "private-lib-core.h"
#include <assert.h>
void *
lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
{
lws_smd_msg_t *msg;
/* only allow it if someone wants to consume this class of event */
if (!(ctx->smd._class_filter & _class)) {
lwsl_info("%s: rejecting class 0x%x as no participant wants it\n", __func__,
(unsigned int)_class);
return NULL;
}
assert(len <= LWS_SMD_MAX_PAYLOAD);
/*
* If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind
* payload, ie, msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload
*/
msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len,
__func__);
if (!msg)
return NULL;
memset(msg, 0, sizeof(*msg));
msg->timestamp = lws_now_usecs();
msg->length = (uint16_t)len;
msg->_class = _class;
return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF;
}
void
lws_smd_msg_free(void **ppay)
{
lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) -
LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
/* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */
lws_free(msg);
*ppay = NULL;
}
/*
* Figure out what to set the initial refcount for the message to
*/
static int
_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg)
{
struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
int interested = 0;
lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
/*
* In order to optimize the tail managment into a refcount,
* we have to account exactly for when peers arrived and
* departed (including deferring the logical peer destruction
* until no message pending he may have contributed to the
* refcount of)
*/
if (pr->timestamp_joined <= msg->timestamp &&
(!pr->timestamp_left || /* if zombie, only contribute to
* refcount if msg from before we
* left */
pr->timestamp_left < msg->timestamp) &&
(msg->_class & pr->_class_filter))
/*
* This peer wants to consume it
*/
interested++;
} lws_end_foreach_dll(p);
return interested;
}
static int
_lws_smd_class_mask_union(lws_smd_t *smd)
{
uint32_t mask = 0;
lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
smd->owner_peers.head) {
lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
/* may destroy pr if zombie */
mask |= pr->_class_filter;
} lws_end_foreach_dll_safe(p, p1);
smd->_class_filter = mask;
return 0;
}
int
lws_smd_msg_send(struct lws_context *ctx, void *pay)
{
lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
if (ctx->smd.owner_messages.count >= LWS_SMD_MAX_QUEUE_DEPTH)
/* reject the message due to max queue depth reached */
return 1;
if (!ctx->smd.delivering)
lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */
msg->refcount = _lws_smd_msg_assess_peers_interested(&ctx->smd, msg);
lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */
lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);
lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
/*
* Any peer with no active tail needs to check our class to see if we
* should become his tail
*/
lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
if (!pr->tail && (pr->_class_filter & msg->_class))
pr->tail = msg;
} lws_end_foreach_dll(p);
if (!ctx->smd.delivering)
lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
/* we may be happening from another thread context */
lws_cancel_service(ctx);
return 0;
}
int
lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
const char *format, ...)
{
lws_smd_msg_t *msg;
va_list ap;
char *p;
int n;
if (!(ctx->smd._class_filter & _class))
/*
* There's nobody interested in messages of this class atm.
* Don't bother generating it, and act like all is well.
*/
return 0;
va_start(ap, format);
n = vsnprintf(NULL, 0, format, ap);
va_end(ap);
if (n > LWS_SMD_MAX_PAYLOAD)
/* too large to send */
return 1;
p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2);
if (!p)
return 1;
msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
sizeof(*msg));
msg->length = (uint16_t)n;
va_start(ap, format);
vsnprintf(p, n + 2, format, ap);
va_end(ap);
/*
* locks taken and released in here
*/
if (lws_smd_msg_send(ctx, p)) {
lws_smd_msg_free((void **)&p);
return 1;
}
return 0;
}
static void
_lws_smd_peer_finalize_destroy(lws_smd_peer_t *pr)
{
lws_dll2_remove(&pr->list);
lws_free(pr);
}
/*
* Peers that deregister may need to hang around as zombies, so they account
* for refcounts on messages they already contributed to. Because older
* messages may be in flight over UDS links, we have to stick around and make
* sure all cases have their refcount handled correctly.
*/
static void
_lws_smd_peer_zombify(lws_smd_peer_t *pr)
{
lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
owner_peers);
/* update the class mask union to reflect this peer no longer active */
_lws_smd_class_mask_union(smd);
pr->timestamp_left = lws_now_usecs();
}
static lws_smd_msg_t *
_lws_smd_msg_next_matching_filter(lws_dll2_t *tail, lws_smd_class_t filter)
{
lws_smd_msg_t *msg;
do {
tail = tail->next;
if (!tail)
return NULL;
msg = lws_container_of(tail, lws_smd_msg_t, list);
if (msg->_class & filter)
return msg;
} while (1);
return NULL;
}
/*
* Note: May destroy zombie peers when it sees grace period has expired.
*
* Delivers only one message to the peer and advances the tail, or sets to NULL
* if no more filtered queued messages. Returns nonzero if tail non-NULL.
*
* For Proxied SS, only asks for writeable and does not advance or change the
* tail.
*
* This is done so if multiple messages queued, we don't get a situation where
* one participant gets them all spammed, then the next etc. Instead they are
* delivered round-robin.
*/
static int
_lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
{
lws_smd_msg_t *msg;
if (!pr->tail)
return 0;
msg = lws_container_of(pr->tail, lws_smd_msg_t, list);
/*
* Check if zombie peer and the message predates our leaving
*/
if (pr->timestamp_left &&
msg->timestamp > pr->timestamp_left) {
/*
* We do not need to modify message refcount, if it was
* generated after we became a zombie, and so we
* definitely did not contribute to its refcount...
*
* ...have we waited out the grace period?
*/
if (lws_now_usecs() - pr->timestamp_left >
LWS_SMD_INFLIGHT_GRACE_SECS * LWS_US_PER_SEC)
/*
* ... ok, it's time for the zombie to abandon
* its attachment to the Earth and rejoin the
* cosmic mandela
*/
_lws_smd_peer_finalize_destroy(pr);
/* ... either way, nothing further to do for this guy */
return 0;
}
if (!pr->timestamp_left) {
/*
* Peer is not a zombie... deliver the tail
*/
#if 0
if (pr->type == LSMDT_SECURE_STREAMS_PROXIED) {
#if defined(LWS_WITH_SECURE_STREAMS)
if (pr->ss_handle)
lws_ss_request_tx(pr->ss_handle);
#endif
return 0;
}
#endif
pr->cb(pr->opaque, msg->_class, msg->timestamp,
((uint8_t *)&msg[1]) +
LWS_SMD_SS_RX_HEADER_LEN_EFF,
(size_t)msg->length);
}
assert(msg->refcount);
/*
* If there is one, move forward to the next queued
* message that meets our filters
*/
pr->tail = _lws_smd_msg_next_matching_filter(
&pr->tail->list, pr->_class_filter);
if (!--msg->refcount) {
/*
* We have fully delivered the message now, it
* can be unlinked and destroyed
*/
lws_dll2_remove(&msg->list);
lws_free(msg);
}
/*
* Wait out the grace period even if no live messages
* for a zombie peer... there may be some in flight
*/
return !!pr->tail;
}
/*
* Called when the event loop could deliver messages synchronously, eg, on
* entry to idle
*/
int
lws_smd_msg_distribute(struct lws_context *ctx)
{
char more;
/* commonly, no messages and nothing to do... */
if (!ctx->smd.owner_messages.count)
return 0;
ctx->smd.delivering = 1;
do {
more = 0;
lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */
lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
ctx->smd.owner_peers.head) {
lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
/* may destroy pr if zombie, hence _safe iterator */
more |= _lws_smd_msg_deliver_peer(ctx, pr);
} lws_end_foreach_dll_safe(p, p1);
lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
} while (more);
ctx->smd.delivering = 0;
return 0;
}
struct lws_smd_peer *
lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb)
{
lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__);
if (!pr)
return NULL;
pr->cb = cb;
pr->opaque = opaque;
pr->_class_filter = _class_filter;
pr->timestamp_joined = lws_now_usecs();
/*
* Figure out the type of peer from the situation...
*/
#if 0
#if defined(LWS_WITH_SECURE_STREAMS)
if (!ctx->smd.listen_vh) {
/*
* The guy who is regsitering is actually a SS proxy link
* between a client and SMD
*/
} else
#endif
#endif
pr->type = LSMDT_SAME_PROCESS;
if (!ctx->smd.delivering)
lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */
lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);
/* update the global class mask union to account for new peer mask */
_lws_smd_class_mask_union(&ctx->smd);
if (!ctx->smd.delivering)
lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
lwsl_debug("%s: registered\n", __func__);
return pr;
}
void
lws_smd_unregister(struct lws_smd_peer *pr)
{
lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers);
lws_mutex_lock(smd->lock_peers); /* +++++++++++++++++++++++++++ peers */
_lws_smd_peer_zombify(pr);
lws_mutex_unlock(smd->lock_peers); /* ------------------------- peers */
}
int
lws_smd_message_pending(struct lws_context *ctx)
{
int ret = 1;
/*
* First cheaply check the common case no messages pending, so there's
* definitely nothing for this tsi or anything else
*/
if (!ctx->smd.owner_messages.count)
return 0;
/*
* Walk the peer list
*/
lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++++++++++ peers */
lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
if (pr->tail && pr->type == LSMDT_SAME_PROCESS)
goto bail;
} lws_end_foreach_dll(p);
/*
* There's no message pending that we need to handle
*/
ret = 0;
bail:
lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */
return ret;
}
int
_lws_smd_destroy(struct lws_context *ctx)
{
/* stop any message creation */
ctx->smd._class_filter = 0;
/*
* Walk the message list, destroying them
*/
lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
ctx->smd.owner_messages.head) {
lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
lws_free(msg);
} lws_end_foreach_dll_safe(p, p1);
lws_mutex_destroy(ctx->smd.lock_messages);
/*
* Walk the peer list, destroying them
*/
lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
ctx->smd.owner_peers.head) {
lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
_lws_smd_peer_finalize_destroy(pr);
} lws_end_foreach_dll_safe(p, p1);
lws_mutex_destroy(ctx->smd.lock_peers);
return 0;
}

View file

@ -128,7 +128,7 @@ lws_system_blob_destroy(lws_system_blob_t *b)
{
if (!b)
return;
lwsl_info("%s: blob %p\n", __func__, b);
// lwsl_info("%s: blob %p\n", __func__, b);
if (!b->is_direct)
lws_buflist_destroy_all_segments(&b->u.bl);
}

View file

@ -0,0 +1,27 @@
project(lws-api-test-lws_smd)
cmake_minimum_required(VERSION 2.8)
find_package(libwebsockets CONFIG REQUIRED)
list(APPEND CMAKE_MODULE_PATH ${LWS_CMAKE_DIR})
include(CheckCSourceCompiles)
include(LwsCheckRequirements)
set(requirements 1)
require_pthreads(requirements)
require_lws_config(LWS_WITH_SYS_SMD 1 requirements)
if (requirements)
add_executable(${PROJECT_NAME} main.c)
add_test(NAME api-test-lws_smd COMMAND lws-api-test-lws_smd -d1039)
set_tests_properties(api-test-lws_smd
PROPERTIES
RUN_SERIAL TRUE
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/api-tests/api-test-lws_smd
TIMEOUT 60)
if (websockets_shared)
target_link_libraries(${PROJECT_NAME} websockets_shared ${PTHREAD_LIB} ${LIBWEBSOCKETS_DEP_LIBS})
add_dependencies(${PROJECT_NAME} websockets_shared)
else()
target_link_libraries(${PROJECT_NAME} websockets ${PTHREAD_LIB} ${LIBWEBSOCKETS_DEP_LIBS})
endif()
endif()

View file

@ -0,0 +1,202 @@
/*
* lws-api-test-lws_smd
*
* Written in 2020 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* This api test confirms lws_smd System Message Distribution
*/
#include <libwebsockets.h>
#define HAVE_STRUCT_TIMESPEC
#include <pthread.h>
#include <signal.h>
static int interrupted, ok, fail, _exp = 111;
static lws_sorted_usec_list_t sul;
struct lws_context *context;
static pthread_t thread_spam;
static void
timeout_cb(lws_sorted_usec_list_t *sul)
{
/* We should have completed the test before this fires */
interrupted = 1;
lws_cancel_service(context);
}
static int
smd_cb1int(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
void *buf, size_t len)
{
#if 0
lwsl_notice("%s: ts %llu, len %d\n", __func__,
(unsigned long long)timestamp, (int)len);
lwsl_hexdump_notice(buf, len);
#endif
ok++;
return 0;
}
static int
smd_cb2int(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
void *buf, size_t len)
{
#if 0
lwsl_notice("%s: ts %llu, len %d\n", __func__,
(unsigned long long)timestamp, (int)len);
lwsl_hexdump_notice(buf, len);
#endif
ok++;
return 0;
}
static void *
_thread_spam(void *d)
{
int n;
n = 0;
while (n++ < 100) {
if (lws_smd_msg_printf(context, LWSSMDCL_SYSTEM_STATE,
"{\"s\":\"state\",\"msg\":%d}",
(unsigned int)n)) {
lwsl_info("%s: send failed\n", __func__);
n--;
}
#if defined(WIN32)
Sleep(3);
#else
usleep(3000);
#endif
}
#if !defined(WIN32)
pthread_exit(NULL);
#endif
return NULL;
}
void sigint_handler(int sig)
{
interrupted = 1;
}
static int
system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
int current, int target)
{
// struct lws_context *context = mgr->parent;
if (current != LWS_SYSTATE_OPERATIONAL || target != LWS_SYSTATE_OPERATIONAL)
return 0;
lwsl_info("%s: operational\n", __func__);
/*
* spawn the test thread, it's going to spam 100 messages at 20ms
* intervals... check we got everything
*/
if (pthread_create(&thread_spam, NULL, _thread_spam, NULL))
lwsl_err("%s: failed to create the spamming thread\n", __func__);
return 0;
}
int
main(int argc, const char **argv)
{
lws_state_notify_link_t notifier = { {0}, system_notify_cb, "app" };
lws_state_notify_link_t *na[] = { &notifier, NULL };
int logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE;
struct lws_context_creation_info info;
const char *p;
void *retval;
/* the normal lws init */
signal(SIGINT, sigint_handler);
if ((p = lws_cmdline_option(argc, argv, "-d")))
logs = atoi(p);
lws_set_log_level(logs, NULL);
lwsl_user("LWS API selftest: lws_smd\n");
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
info.port = CONTEXT_PORT_NO_LISTEN;
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
info.register_notifier_list = na;
context = lws_create_context(&info);
if (!context) {
lwsl_err("lws init failed\n");
return 1;
}
lws_sul_schedule(context, 0, &sul, timeout_cb, 5 * LWS_US_PER_SEC);
/* register a messaging participant to hear INTERACTION class */
if (!lws_smd_register(context, NULL, 0, LWSSMDCL_INTERACTION,
smd_cb1int)) {
lwsl_err("%s: smd register 1 failed\n", __func__);
goto bail;
}
/* register a messaging participant to hear SYSTEM_STATE class */
if (!lws_smd_register(context, NULL, 0, LWSSMDCL_SYSTEM_STATE,
smd_cb2int)) {
lwsl_err("%s: smd register 2 failed\n", __func__);
goto bail;
}
/* generate an INTERACTION class message */
if (lws_smd_msg_printf(context, LWSSMDCL_INTERACTION,
"{\"s\":\"interaction\"}")) {
lwsl_err("%s: problem sending smd\n", __func__);
goto bail;
}
/* generate a SYSTEM_STATE class message */
if (lws_smd_msg_printf(context, LWSSMDCL_SYSTEM_STATE,
"{\"s\":\"state\"}")) {
lwsl_err("%s: problem sending smd\n", __func__);
goto bail;
}
/* no participant listens for this class, so it should be skipped */
if (lws_smd_msg_printf(context, LWSSMDCL_NETWORK, "{\"s\":\"network\"}")) {
lwsl_err("%s: problem sending smd\n", __func__);
goto bail;
}
/* the usual lws event loop */
while (!interrupted && lws_service(context, 0) >= 0)
;
pthread_join(thread_spam, &retval);
bail:
lws_context_destroy(context);
if (fail || ok >= _exp)
lwsl_user("Completed: PASS: %d / %d, FAIL: %d\n", ok, _exp,
fail);
else
lwsl_user("Completed: ALL PASS: %d / %d\n", ok, _exp);
return !(ok >= _exp && !fail);
}

View file

@ -461,7 +461,8 @@ int main(int argc, const char **argv)
mcp = lws_json_simple_find(t1, strlen(t1), "\"myname1\":", &alen);
if (mcp != t1 + 11 || alen != 4) {
lwsl_err("%s: lws_json_simple_find 1 failed: (%d) %s\n", __func__, (int)alen, mcp);
lwsl_err("%s: lws_json_simple_find 1 failed: (%d) %s\n",
__func__, (int)alen, mcp);
return 1;
}

View file

@ -133,8 +133,18 @@ static const char * const default_ss_policy =
"]"
"}"
"],"
"\"s\": ["
"{\"fetch_policy\": {"
"\"s\": [{"
"\"captive_portal_detect\": {"
"\"endpoint\": \"connectivitycheck.android.com\","
"\"http_url\": \"generate_204\","
"\"port\": 80,"
"\"protocol\": \"h1\","
"\"http_method\": \"GET\","
"\"opportunistic\": true,"
"\"http_expect\": 204,"
"\"http_fail_redirect\": true"
"},"
"\"fetch_policy\": {"
"\"endpoint\":" "\"warmcat.com\","
"\"port\":" "443,"
"\"protocol\":" "\"h1\","

View file

@ -0,0 +1,46 @@
project(lws-minimal-secure-streams-smd C)
cmake_minimum_required(VERSION 2.8)
find_package(libwebsockets CONFIG REQUIRED)
list(APPEND CMAKE_MODULE_PATH ${LWS_CMAKE_DIR})
include(CheckCSourceCompiles)
include(LwsCheckRequirements)
set(requirements 1)
require_lws_config(LWS_ROLE_H1 1 requirements)
require_lws_config(LWS_WITH_CLIENT 1 requirements)
require_lws_config(LWS_WITH_SECURE_STREAMS 1 requirements)
require_lws_config(LWS_WITH_SYS_SMD 1 requirements)
require_lws_config(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY 0 requirements)
require_lws_config(LWS_WITH_SYS_STATE 1 requirements)
if (requirements)
add_executable(${PROJECT_NAME} minimal-secure-streams-smd.c)
if (LWS_CTEST_INTERNET_AVAILABLE)
add_test(NAME ss-smd COMMAND lws-minimal-secure-streams-smd)
set_tests_properties(ss-smd
PROPERTIES
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams-smd
TIMEOUT 10)
endif()
if (websockets_shared)
target_link_libraries(${PROJECT_NAME} websockets_shared ${LIBWEBSOCKETS_DEP_LIBS})
add_dependencies(${PROJECT_NAME} websockets_shared)
else()
target_link_libraries(${PROJECT_NAME} websockets ${LIBWEBSOCKETS_DEP_LIBS})
endif()
if (LWS_WITH_SECURE_STREAMS_PROXY_API)
add_compile_options(-DLWS_SS_USE_SSPC)
add_executable(${PROJECT_NAME}-client minimal-secure-streams-smd.c)
if (websockets_shared)
target_link_libraries(${PROJECT_NAME}-client websockets_shared ${LIBWEBSOCKETS_DEP_LIBS})
add_dependencies(${PROJECT_NAME}-client websockets_shared)
else()
target_link_libraries(${PROJECT_NAME}-client websockets ${LIBWEBSOCKETS_DEP_LIBS})
endif()
endif()
endif()

View file

@ -0,0 +1,126 @@
# lws minimal secure streams SMD
This application creates a Secure Stream link to LWS SMD, System
Message Distribution.
The SS is able to receive system messages matching a specified
class filter, and issue system messages also using SS payload
semantics.
Both a direct api lws_smd participant and an SS based one are instantiated.
They both filter on system messages.
When the Secure Stream is created, it asks to send using normal the SS api.
In the SS tx callback, it prepares a header and then send a NETWORK class
message.
Numbers of messages received each way and sent is compared after 2s and the
test exits with a success or a fail.
### Building and testing
Build with
-DLWS_WITH_SECURE_STREAMS=1
-DLWS_WITH_SECURE_STREAMS_PROXY_API=1
-DLWS_WITH_MINIMAL_EXAMPLES=1
The run ./bin/lws-minimal-secure-streams-smd alone (local SS and direct SMD tests)
and after run ./bin/lws-minimal-secure-streams-proxy in one console and
./bin-lws-minimal-secure-streams-smd-client in the other (SS proxy tests)
### What's going on in the -client test
The -client build version contains the test logic as usual, but outsources the
policy and smd_ server part to the Secure Streams Proxy.
- start lws-minimal-secure-streams-proxy first
- start lws-minimal-secure-streams-smd-client
1) When the client starts, we waits to hear the client state is OPERATIONAL in
a direct smd participant callback. When it is, he creates a Secure Stream of
streamtype "_lws_smd", creating a local SS handle.
2) The SS creation request is proxied to the SS proxy process over Unix Domain
Sockets. There it creates a Secure Stream object proxyside, and registers as
an SMD participant... this smd-related behaviour is tied to the special
streamtype name "_lws_smd". The SMD registration uses a class mask passed to
the proxy in the tx credit field of the serialization.
3) SMD messages that pass the class mask filter are proxied back to the client
over the connection.
4) SMD messages created at the client are passed to the proxy and added to the
proxy's SMD queue, if the same connection's class mask accepts the message then
it will be proxied back to the client same as other messages.
The minimal example produces a variety of messages on the SS link, including
CPD detect trigger. The SS link is set up to only accept messages of classes
LWSSMDCL_SYSTEM_STATE and LWSSMDCL_NETWORK, INTERACTION type messages are
not accepted.
## build
```
$ cmake . && make
```
## usage
Commandline option|Meaning
---|---
-d <loglevel>|Debug verbosity in decimal, eg, -d15
```
$ ./bin/lws-minimal-secure-streams-smd -d 1151
[2020/06/18 21:44:54:5148] U: LWS Secure Streams SMD test client [-d<verb>]
[2020/06/18 21:44:54:5601] I: Initial logging level 1151
[2020/06/18 21:44:54:5605] I: Libwebsockets version: 4.0.99-v4.0.0-174-ga8a2eb954 v4.0.0-174-ga8a2eb954
[2020/06/18 21:44:54:5607] I: IPV6 not compiled in
...
[2020/06/18 21:44:54:7906] D: _lws_state_transition: system: changed 11 'AUTH2' -> 12 'OPERATIONAL'
[2020/06/18 21:44:54:7906] D: _realloc: size 81: lws_smd_msg_alloc
[2020/06/18 21:44:54:7907] I: lws_cancel_service
[2020/06/18 21:44:54:7912] I: lws_state_transition_steps: CONTEXT_CREATED -> OPERATIONAL
[2020/06/18 21:44:54:7919] N: myss_tx: sending SS smd
[2020/06/18 21:44:54:7940] D: _realloc: size 84: lws_smd_msg_alloc
[2020/06/18 21:44:54:7944] I: lws_cancel_service
[2020/06/18 21:44:54:7966] D: direct_smd_cb: class: 0x2, ts: 3139600721554
[2020/06/18 21:44:54:7972] D:
[2020/06/18 21:44:54:7990] D: 0000: 7B 22 73 74 61 74 65 22 3A 22 49 4E 49 54 49 41 {"state":"INITIA
[2020/06/18 21:44:54:7998] D: 0010: 4C 49 5A 45 44 22 7D LIZED"}
[2020/06/18 21:44:54:8001] D:
[2020/06/18 21:44:54:8016] I: myss_rx: len 39, flags: 3
[2020/06/18 21:44:54:8018] I:
[2020/06/18 21:44:54:8021] I: 0000: 00 00 00 00 00 00 00 02 00 00 02 DA FE C9 26 92 ..............&.
[2020/06/18 21:44:54:8022] I: 0010: 7B 22 73 74 61 74 65 22 3A 22 49 4E 49 54 49 41 {"state":"INITIA
[2020/06/18 21:44:54:8023] I: 0020: 4C 49 5A 45 44 22 7D LIZED"}
[2020/06/18 21:44:54:8023] I:
[2020/06/18 21:44:54:8029] D: direct_smd_cb: class: 0x2, ts: 3139600724243
[2020/06/18 21:44:54:8029] D:
[2020/06/18 21:44:54:8030] D: 0000: 7B 22 73 74 61 74 65 22 3A 22 49 46 41 43 45 5F {"state":"IFACE_
[2020/06/18 21:44:54:8031] D: 0010: 43 4F 4C 44 50 4C 55 47 22 7D COLDPLUG"}
[2020/06/18 21:44:54:8032] D:
...
[2020/06/18 21:44:54:8112] D: direct_smd_cb: class: 0x4, ts: 3139600732952
[2020/06/18 21:44:54:8112] D:
[2020/06/18 21:44:54:8114] D: 0000: 7B 22 73 6F 6D 74 68 69 6E 67 22 3A 22 6E 6F 74 {"somthing":"not
[2020/06/18 21:44:54:8115] D: 0010: 73 65 65 6E 62 79 73 73 72 78 22 7D seenbyssrx"}
[2020/06/18 21:44:54:8115] D:
[2020/06/18 21:44:57:5823] I: 11 12 1
[2020/06/18 21:44:57:5838] I: lws_context_destroy: ctx 0x4f61db0
[2020/06/18 21:44:57:5849] D: _lws_state_transition: system: changed 12 'OPERATIONAL' -> 13 'POLICY_INVALID'
[2020/06/18 21:44:57:5851] D: _realloc: size 84: lws_smd_msg_alloc
[2020/06/18 21:44:57:5853] I: lws_cancel_service
[2020/06/18 21:44:57:5871] I: lws_destroy_event_pipe
[2020/06/18 21:44:57:5906] I: lws_pt_destroy: pt destroyed
[2020/06/18 21:44:57:5913] I: lws_context_destroy2: ctx 0x4f61db0
[2020/06/18 21:44:57:5936] D: lwsac_free: head (nil)
[2020/06/18 21:44:57:5947] D: 0x455970: post vh listl
[2020/06/18 21:44:57:5950] D: 0x455970: post pdl
[2020/06/18 21:44:57:5961] D: 0x455970: baggage
[2020/06/18 21:44:57:5968] D: 0x455970: post dc2
[2020/06/18 21:44:57:6010] D: lws_context_destroy3: ctx 0x4f61db0 freed
[2020/06/18 21:44:57:6014] U: Completed: OK
```

View file

@ -0,0 +1,319 @@
/*
* lws-minimal-secure-streams-smd
*
* Written in 2010-2020 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
*
* This demonstrates a minimal http client using secure streams to access the
* SMD api.
*/
#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
static int interrupted, bad = 1, count_p1, count_p2, count_tx;
static lws_sorted_usec_list_t sul_timeout;
/*
* If the -proxy app is fulfilling our connection, then we don't need to have
* the policy in the client.
*
* When we build with LWS_SS_USE_SSPC, the apis hook up to a proxy process over
* a Unix Domain Socket. To test that, you need to separately run the
* ./lws-minimal-secure-streams-proxy test app on the same machine.
*/
#if !defined(LWS_SS_USE_SSPC)
static const char * const default_ss_policy =
"{"
"\"schema-version\":1,"
"\"s\": ["
"{"
/*
* "captive_portal_detect" describes
* what to do in order to check if the path to
* the Internet is being interrupted by a
* captive portal. If there's a larger policy
* fetched from elsewhere, it should also include
* this since it needs to be done at least after
* every DHCP acquisition
*/
"\"captive_portal_detect\": {"
"\"endpoint\": \"connectivitycheck.android.com\","
"\"http_url\": \"generate_204\","
"\"port\": 80,"
"\"protocol\": \"h1\","
"\"http_method\": \"GET\","
"\"opportunistic\": true,"
"\"http_expect\": 204,"
"\"http_fail_redirect\": true"
"}"
"}"
"]"
"}"
;
#endif
typedef struct myss {
struct lws_ss_handle *ss;
void *opaque_data;
/* ... application specific state ... */
lws_sorted_usec_list_t sul;
char alternate;
} myss_t;
/* secure streams payload interface */
static int
myss_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
{
// myss_t *m = (myss_t *)userobj;
lwsl_notice("%s: len %d, flags: %d\n", __func__, (int)len, flags);
lwsl_hexdump_notice(buf, len);
count_p1++;
return 0;
}
static void
sul_tx_periodic_cb(lws_sorted_usec_list_t *sul)
{
myss_t *m = lws_container_of(sul, myss_t, sul);
lwsl_notice("%s: requesting TX\n", __func__);
lws_ss_request_tx(m->ss);
}
static int
myss_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len,
int *flags)
{
myss_t *m = (myss_t *)userobj;
lwsl_notice("%s: sending SS smd\n", __func__);
/*
* The SS RX isn't going to see INTERACTION messages, because its class
* filter doesn't accept INTERACTION class messages. The direct
* participant we also set up for the test will see them though.
*
* Let's alternate between sending NETWORK class smd messages and
* INTERACTION so we can test both rx paths
*/
m->alternate++;
lws_ser_wu64be(buf, (m->alternate & 1) ? LWSSMDCL_NETWORK : LWSSMDCL_INTERACTION);
lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */
if (m->alternate == 4) {
/*
* after a few, let's request a CPD check
*/
*len = LWS_SMD_SS_RX_HEADER_LEN +
lws_snprintf((char *)buf + LWS_SMD_SS_RX_HEADER_LEN, *len,
"{\"trigger\": \"cpdcheck\", \"src\":\"SS-test\"}");
} else
*len = LWS_SMD_SS_RX_HEADER_LEN +
lws_snprintf((char *)buf + LWS_SMD_SS_RX_HEADER_LEN, *len,
(m->alternate & 1) ? "{\"class\":\"NETWORK\"}" :
"{\"class\":\"INTERACTION\"}");
*flags = LWSSS_FLAG_SOM | LWSSS_FLAG_EOM;
count_tx++;
lws_sul_schedule(lws_ss_get_context(m->ss), 0, &m->sul,
sul_tx_periodic_cb, 250 * LWS_US_PER_MS);
return 0;
}
static int
myss_state(void *userobj, void *h_src, lws_ss_constate_t state,
lws_ss_tx_ordinal_t ack)
{
myss_t *m = (myss_t *)userobj;
if (state == LWSSSCS_DESTROYING) {
lws_sul_cancel(&m->sul);
return 0;
}
if (state == LWSSSCS_CONNECTED) {
lwsl_notice("%s: CONNECTED\n", __func__);
lws_sul_schedule(lws_ss_get_context(m->ss), 0, &m->sul,
sul_tx_periodic_cb, 1);
return 0;
}
return 0;
}
static const lws_ss_info_t ssi_lws_smd = {
.handle_offset = offsetof(myss_t, ss),
.opaque_user_data_offset = offsetof(myss_t, opaque_data),
.rx = myss_rx,
.tx = myss_tx,
.state = myss_state,
.user_alloc = sizeof(myss_t),
.streamtype = LWS_SMD_STREAMTYPENAME,
.manual_initial_tx_credit = LWSSMDCL_SYSTEM_STATE |
LWSSMDCL_NETWORK,
};
/* for comparison, this is a non-SS lws_smd participant */
static int
direct_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
void *buf, size_t len)
{
struct lws_context **pctx = (struct lws_context **)opaque;
lwsl_notice("%s: class: 0x%x, ts: %llu\n", __func__, _class,
(unsigned long long)timestamp);
lwsl_hexdump_notice(buf, len);
count_p2++;
if (_class != LWSSMDCL_SYSTEM_STATE)
return 0;
if (!lws_json_simple_strcmp(buf, len, "\"state\":", "OPERATIONAL")) {
#if !defined(LWS_SS_USE_SSPC)
/*
* Let's trigger a CPD check, just as a test. SS can't see it
* anyway since it doesn't listen for NETWORK but the direct /
* local participant will see it and the result
*
* This process doesn't run the smd / captive portal action
* when it's a client of the SS proxy. SMD has to be passed
* via the SS _lws_smd proxied connection in that case.
*/
(void)lws_smd_msg_printf(*pctx, LWSSMDCL_NETWORK,
"{\"trigger\": \"cpdcheck\", \"src\":\"direct-test\"}");
#endif
/*
* Create the SS link to lws_smd... notice in ssi_lws_smd
* above, we tell this link to use a class filter that excludes
* NETWORK messages.
*/
if (lws_ss_create(*pctx, 0, &ssi_lws_smd, NULL, NULL, NULL, NULL)) {
lwsl_err("%s: failed to create secure stream\n",
__func__);
return -1;
}
}
return 0;
}
static void
sul_timeout_cb(lws_sorted_usec_list_t *sul)
{
interrupted = 1;
}
static void
sigint_handler(int sig)
{
interrupted = 1;
}
int main(int argc, const char **argv)
{
struct lws_context_creation_info info;
struct lws_context *context;
signal(SIGINT, sigint_handler);
memset(&info, 0, sizeof info);
lws_cmdline_option_handle_builtin(argc, argv, &info);
lwsl_user("LWS Secure Streams SMD test client [-d<verb>]\n");
info.fd_limit_per_thread = 1 + 6 + 1;
info.port = CONTEXT_PORT_NO_LISTEN;
#if !defined(LWS_SS_USE_SSPC)
info.pss_policies_json = default_ss_policy;
#else
info.protocols = lws_sspc_protocols;
#endif
info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
info.early_smd_cb = direct_smd_cb;
info.early_smd_class_filter = 0xffffffff;
info.early_smd_opaque = &context;
/* create the context */
context = lws_create_context(&info);
if (!context) {
lwsl_err("lws init failed\n");
return 1;
}
#if defined(LWS_SS_USE_SSPC)
if (!lws_create_vhost(context, &info)) {
lwsl_err("%s: failed to create default vhost\n", __func__);
goto bail;
}
#endif
/* set up the test timeout */
lws_sul_schedule(context, 0, &sul_timeout, sul_timeout_cb,
4 * LWS_US_PER_SEC);
/* the event loop */
while (lws_service(context, 0) >= 0 && !interrupted)
;
/* compare what happened with what we expect */
#if defined(LWS_SS_USE_SSPC)
/* if SSPC
*
* - the SS _lws_smd link does not enable INTERACTION class, so doesn't
* see these messages (count_p1 is half count_tx)
*
* - the direct smd participant sees local state, but it doesn't send
* any local CPD request, since as a client it doesn't do CPD
* directly (count_p2 -= 1 compared to non-SSPC)
*
* - one CPD trigger is sent on the proxied SS link (countp1 += 1)
*/
if (count_p1 >= 6 && count_p2 >= 11 && count_tx >= 12)
#else
/* if not SSPC, then we can see direct smd activity */
if (count_p1 >= 2 && count_p2 >= 15 && count_tx >= 5)
#endif
bad = 0;
lwsl_notice("%d %d %d\n", count_p1, count_p2, count_tx);
#if defined(LWS_SS_USE_SSPC)
bail:
#endif
lws_context_destroy(context);
lwsl_user("Completed: %s\n", bad ? "failed" : "OK");
return bad;
}