
smd-add-ss-rx-forwarder-helper

Add a helper to simplify passing smd ss rx traffic into the local
smd participants, excluding the rx that received it externally to
avoid looping.

Make the smd readme clearer with three diagrams and more explanation
of how the ss proxying works.
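
In practice the new helper is called straight from the rx() handler of the
LWS_SMD_STREAMTYPENAME stream. A minimal sketch, mirroring the minimal example
updated further down in this diff (handler name and return value are
illustrative):

static lws_ss_state_return_t
myss_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
{
	/*
	 * buf / len hold a serialized SMD message from the proxy; the helper
	 * deserializes it and forwards it to every interested local
	 * participant except the one it arrived on, so it cannot loop
	 */
	if (lws_smd_ss_rx_forward(userobj, buf, len))
		lwsl_warn("%s: forward failed\n", __func__);

	return LWSSSSRET_OK;
}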
This commit is contained in:
Andy Green 2021-01-04 09:06:32 +00:00
parent 962993fd24
commit 8ff35b819a
10 changed files with 185 additions and 27 deletions

BIN  doc-assets/smd-message.png  (new binary file, not shown; 80 KiB)

BIN  doc-assets/smd-proxy.png  (new binary file, not shown; 186 KiB)

BIN  (new binary file, not shown; 113 KiB)

View file

@@ -59,6 +59,7 @@
#define lws_ss_cancel_timeout lws_sspc_cancel_timeout
#define lws_ss_to_user_object lws_sspc_to_user_object
#define lws_ss_change_handlers lws_sspc_change_handlers
#define lws_smd_ss_rx_forward lws_smd_sspc_rx_forward
#define lws_ss_tag lws_sspc_tag
#endif

View file

@@ -154,6 +154,29 @@ lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
lws_smd_class_t _class, const char *format, ...)
LWS_FORMAT(5);
/**
* lws_smd_ss_rx_forward() - helper to forward smd messages that came in by SS
*
* \param ss_user: ss user pointer, as delivered to rx callback
* \param buf: the ss rx buffer
* \param len: the length of the ss rx buffer
*
* Proxied Secure Streams with the streamtype LWS_SMD_STREAMTYPENAME receive
* serialized SMD messages from the proxy; this helper translates them into
* deserialized SMD messages and forwards them to registered SMD participants
* in the local context in one step.
*
* Just pass through what arrived in the LWS_SMD_STREAMTYPENAME rx() callback
* to this api.
*
* Returns 0 if OK else nonzero if unable to queue the SMD message.
*/
LWS_VISIBLE LWS_EXTERN int
lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len);
LWS_VISIBLE LWS_EXTERN int
lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len);
typedef int (*lws_smd_notification_cb_t)(void *opaque, lws_smd_class_t _class,
lws_usec_t timestamp, void *buf,
size_t len);
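
As background for the declarations above, the serialized form delivered to the
rx() callback, and parsed by the forwarder in smd.c further down this diff, is
a header carrying the class and then the original timestamp, both as 64-bit
big-endian words, followed by the payload. A hedged sketch of inspecting such
a buffer (the helper name is illustrative and the header length is assumed
here to be 16 bytes):

#include <libwebsockets.h>

static int
peek_serialized_smd(const uint8_t *buf, size_t len)
{
	if (len < 16)
		return 1;

	lwsl_notice("class 0x%x, ts %llu, payload %u bytes\n",
		    (unsigned int)lws_ser_ru64be(buf),
		    (unsigned long long)lws_ser_ru64be(buf + 8),
		    (unsigned int)(len - 16));

	return 0;
}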

View file

@@ -471,7 +471,7 @@ lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
else
h->txc.peer_tx_cr_est = ssi->manual_initial_tx_credit;
if (!strcmp(ssi->streamtype, "_lws_smd"))
if (!strcmp(ssi->streamtype, LWS_SMD_STREAMTYPENAME))
h->ignore_txc = 1;
lws_dll2_add_head(&h->client_list, &context->pt[tsi].ss_client_owner);

View file

@@ -14,17 +14,31 @@ Message payloads are short, less than 384 bytes, below system limits for atomic
pipe or UDS datagrams and consistent with heap usage on smaller systems, but
large enough to carry JSON usefully. Messages are typically low duty cycle.
![SMD message](/doc-assets/smd-message.png)
Messages may be sent by any registered participant, they are allocated on heap
in a linked-list, and delivered no sooner than next time around the event loop.
This retains the ability to handle multiple event queuing in one event loop trip
while guaranteeing message handling is nonrecursive. Messages are passed to all
other registered participants before being destroyed.
in a linked-list, and delivered to all other registered participants for that
message class no sooner than next time around the event loop. This retains the
ability to handle multiple event queuing in one event loop trip while
guaranteeing message handling is nonrecursive and so with modest stack usage.
Messages are passed to all other registered participants before being destroyed.
Messages are delivered to all participants in the same lws_context by default.
![SMD message](/doc-assets/smd-single-process.png)
`lws_smd` apis allow publication and subscription of message objects between
participants that are in a single process and are informed by callback from lws
service thread context, and, via Secure Streams proxying as the IPC method, also
between those in different processes. Registering as a participant and sending
messages are threadsafe APIs.
service thread context.
SMD messages can also be broadcast between participants in different lws_contexts
in different processes, using existing Secure Streams proxying. In this way
different application processes can intercommunicate and each observe any system
smd messages they are interested in.
![SMD message](/doc-assets/smd-proxy.png)
Registering as a participant and sending messages are threadsafe APIs.
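
For comparison with the proxied path, direct same-process participation looks
roughly like the sketch below. The `lws_smd_register()` signature is assumed
from lws-smd.h (it is not part of this diff) and the JSON payload is purely
illustrative:

#include <libwebsockets.h>

/* callback signature matches lws_smd_notification_cb_t */
static int
my_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
	  void *buf, size_t len)
{
	/* invoked from lws service thread context, next loop after send */
	lwsl_notice("smd class 0x%x, %u bytes\n", (unsigned int)_class,
		    (unsigned int)len);

	return 0;
}

static int
my_smd_setup(struct lws_context *context)
{
	/* subscribe to NETWORK-class messages on this context */
	if (!lws_smd_register(context, NULL, 0, LWSSMDCL_NETWORK, my_smd_cb))
		return 1;

	/* any participant may publish; delivery is next time around the loop */
	return lws_smd_msg_printf(context, LWSSMDCL_NETWORK,
				  "{\"class\":\"NETWORK\",\"up\":true}");
}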
## Message Class

View file

@@ -39,9 +39,13 @@
#define LWS_SMD_SS_RX_HEADER_LEN_EFF (0)
#endif
struct lws_smd_peer;
typedef struct lws_smd_msg {
lws_dll2_t list;
struct lws_smd_peer *exc;
lws_usec_t timestamp;
lws_smd_class_t _class;
@@ -112,3 +116,6 @@ lws_smd_msg_distribute(struct lws_context *ctx);
int
_lws_smd_destroy(struct lws_context *ctx);
int
_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc);

View file

@@ -33,8 +33,8 @@ lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
/* only allow it if someone wants to consume this class of event */
if (!(ctx->smd._class_filter & _class)) {
lwsl_info("%s: rejecting class 0x%x as no participant wants it\n", __func__,
(unsigned int)_class);
lwsl_info("%s: rejecting class 0x%x as no participant wants it\n",
__func__, (unsigned int)_class);
return NULL;
}
@@ -74,7 +74,8 @@ lws_smd_msg_free(void **ppay)
*/
static int
_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg)
_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg,
struct lws_smd_peer *exc)
{
struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
int interested = 0;
@@ -90,7 +91,8 @@ _lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg)
* refcount of)
*/
if (pr->timestamp_joined <= msg->timestamp &&
if (pr != exc &&
pr->timestamp_joined <= msg->timestamp &&
(!pr->timestamp_left || /* if zombie, only contribute to
* refcount if msg from before we
* left */
@@ -126,7 +128,7 @@ _lws_smd_class_mask_union(lws_smd_t *smd)
}
int
lws_smd_msg_send(struct lws_context *ctx, void *pay)
_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
{
lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
@@ -141,7 +143,17 @@ lws_smd_msg_send(struct lws_context *ctx, void *pay)
if (!ctx->smd.delivering)
lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */
msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(&ctx->smd, msg);
msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(&ctx->smd, msg, exc);
if (!msg->refcount) {
/* possible, considering exc and no other participants */
lws_free(msg);
if (!ctx->smd.delivering)
lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
return 0;
}
msg->exc = exc;
lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */
lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);
@@ -155,7 +167,8 @@ lws_smd_msg_send(struct lws_context *ctx, void *pay)
lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
if (!pr->tail && (pr->_class_filter & msg->_class))
if (pr != exc &&
!pr->tail && (pr->_class_filter & msg->_class))
pr->tail = msg;
} lws_end_foreach_dll(p);
@@ -169,6 +182,12 @@ lws_smd_msg_send(struct lws_context *ctx, void *pay)
return 0;
}
int
lws_smd_msg_send(struct lws_context *ctx, void *pay)
{
return _lws_smd_msg_send(ctx, pay, NULL);
}
int
lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
const char *format, ...)
@@ -245,6 +264,93 @@ lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
return 0;
}
/*
* This is a helper that the user rx handler for an LWS_SMD_STREAMTYPENAME SS
* can call through to with the payload it received from the proxy. It will
* then forward the received SMD message to all local (same-context)
* participants that are interested in that class (except the excluded peer
* pr, ie, the one the message arrived on, so we don't loop).
*/
static int
_lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag,
struct lws_smd_peer *pr, const uint8_t *buf, size_t len)
{
lws_smd_class_t _class;
lws_smd_msg_t *msg;
void *p;
if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF)
return 1;
if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF)
return 1;
_class = (lws_smd_class_t)lws_ser_ru64be(buf);
/* only locally forward messages that we care about in this process */
if (!(ctx->smd._class_filter & _class))
/*
* There's nobody interested in messages of this class atm.
* Don't bother generating it, and act like all is well.
*/
return 0;
p = lws_smd_msg_alloc(ctx, _class, len);
if (!p)
return 1;
msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
sizeof(*msg));
msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF);
/* adopt the original source timestamp, not time we forwarded it */
msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8);
/* copy the message payload in */
memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length);
/*
* locks taken and released in here
*/
if (_lws_smd_msg_send(ctx, p, pr)) {
/* we couldn't send it after all that... */
lws_smd_msg_free(&p);
return 1;
}
lwsl_notice("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
tag, _class, msg->length,
(unsigned long long)msg->timestamp);
return 0;
}
int
lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
{
struct lws_ss_handle *h = (struct lws_ss_handle *)
(((char *)ss_user) - sizeof(*h));
struct lws_context *ctx = lws_ss_get_context(h);
return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len);
}
#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API)
int
lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
{
struct lws_sspc_handle *h = (struct lws_sspc_handle *)
(((char *)ss_user) - sizeof(*h));
struct lws_context *ctx = lws_sspc_get_context(h);
return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len);
}
#endif
#endif
static void
@@ -274,17 +380,20 @@ _lws_smd_peer_zombify(lws_smd_peer_t *pr)
}
static lws_smd_msg_t *
_lws_smd_msg_next_matching_filter(lws_dll2_t *tail, lws_smd_class_t filter)
_lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
{
lws_smd_msg_t *msg;
lws_dll2_t *tail = &pr->tail->list;
lws_smd_class_t filter = pr->_class_filter;
do {
tail = tail->next;
if (!tail)
return NULL;
msg = lws_container_of(tail, lws_smd_msg_t, list);
if (msg->_class & filter)
if (msg->exc != pr && (msg->_class & filter))
return msg;
} while (1);
@@ -375,8 +484,7 @@ _lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
* If there is one, move forward to the next queued
* message that meets our filters
*/
pr->tail = _lws_smd_msg_next_matching_filter(
&pr->tail->list, pr->_class_filter);
pr->tail = _lws_smd_msg_next_matching_filter(pr);
lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */
if (!--msg->refcount) {

View file

@@ -73,10 +73,14 @@ typedef struct myss {
static lws_ss_state_return_t
myss_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
{
// myss_t *m = (myss_t *)userobj;
/*
* Call the helper to translate into a real smd message and forward to
* this context / process smd participants... except us, since we
* definitely already received it
*/
lwsl_notice("%s: len %d, flags: %d\n", __func__, (int)len, flags);
lwsl_hexdump_notice(buf, len);
if (lws_smd_ss_rx_forward(userobj, buf, len))
lwsl_warn("%s: forward failed\n", __func__);
count_p1++;
@@ -125,8 +129,9 @@ myss_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len,
(m->alternate & 1) ? LWSSMDCL_NETWORK :
LWSSMDCL_INTERACTION,
(m->alternate & 1) ?
"{\"class\":\"NETWORK\"}" :
"{\"class\":\"INTERACTION\"}"))
"{\"class\":\"NETWORK\",\"x\":%d}" :
"{\"class\":\"INTERACTION\",\"x\":%d}",
count_tx))
return LWSSSSRET_TX_DONT_SEND;
*flags = LWSSS_FLAG_SOM | LWSSS_FLAG_EOM;
@@ -183,9 +188,9 @@ direct_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
{
struct lws_context **pctx = (struct lws_context **)opaque;
lwsl_notice("%s: class: 0x%x, ts: %llu\n", __func__, _class,
(unsigned long long)timestamp);
lwsl_hexdump_notice(buf, len);
// lwsl_notice("%s: class: 0x%x, ts: %llu\n", __func__, _class,
// (unsigned long long)timestamp);
// lwsl_hexdump_notice(buf, len);
count_p2++;