diff --git a/doc-assets/smd-message.png b/doc-assets/smd-message.png new file mode 100644 index 000000000..cd528a754 Binary files /dev/null and b/doc-assets/smd-message.png differ diff --git a/doc-assets/smd-proxy.png b/doc-assets/smd-proxy.png new file mode 100644 index 000000000..ae522cae4 Binary files /dev/null and b/doc-assets/smd-proxy.png differ diff --git a/doc-assets/smd-single-process.png b/doc-assets/smd-single-process.png new file mode 100644 index 000000000..4ae9e9c00 Binary files /dev/null and b/doc-assets/smd-single-process.png differ diff --git a/include/libwebsockets/lws-secure-streams-client.h b/include/libwebsockets/lws-secure-streams-client.h index d0dd95929..fa12014f3 100644 --- a/include/libwebsockets/lws-secure-streams-client.h +++ b/include/libwebsockets/lws-secure-streams-client.h @@ -59,6 +59,7 @@ #define lws_ss_cancel_timeout lws_sspc_cancel_timeout #define lws_ss_to_user_object lws_sspc_to_user_object #define lws_ss_change_handlers lws_sspc_change_handlers +#define lws_smd_ss_rx_forward lws_smd_sspc_rx_forward #define lws_ss_tag lws_sspc_tag #endif diff --git a/include/libwebsockets/lws-smd.h b/include/libwebsockets/lws-smd.h index 8a54fe8b1..f5188b5af 100644 --- a/include/libwebsockets/lws-smd.h +++ b/include/libwebsockets/lws-smd.h @@ -154,6 +154,29 @@ lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len, lws_smd_class_t _class, const char *format, ...) LWS_FORMAT(5); +/** + * lws_smd_ss_rx_forward() - helper to forward smd messages that came in by SS + * + * \param ss_user: ss user pointer, as delivered to rx callback + * \param buf: the ss rx buffer + * \param len: the length of the ss rx buffer + * + * Proxied Secure Streams with the streamtype LWS_SMD_STREAMTYPENAME receive + * serialized SMD messages from the proxy, this helper allows them to be + * translated into deserialized SMD messages and forwarded to registered SMD + * participants in the local context in one step. + * + * Just pass through what arrived in the LWS_SMD_STREAMTYPENAME rx() callback + * to this api. + * + * Returns 0 if OK else nonzero if unable to queue the SMD message. + */ +LWS_VISIBLE LWS_EXTERN int +lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len); + +LWS_VISIBLE LWS_EXTERN int +lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len); + typedef int (*lws_smd_notification_cb_t)(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp, void *buf, size_t len); diff --git a/lib/secure-streams/secure-streams-client.c b/lib/secure-streams/secure-streams-client.c index a72a57392..f5655b10c 100644 --- a/lib/secure-streams/secure-streams-client.c +++ b/lib/secure-streams/secure-streams-client.c @@ -471,7 +471,7 @@ lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi, else h->txc.peer_tx_cr_est = ssi->manual_initial_tx_credit; - if (!strcmp(ssi->streamtype, "_lws_smd")) + if (!strcmp(ssi->streamtype, LWS_SMD_STREAMTYPENAME)) h->ignore_txc = 1; lws_dll2_add_head(&h->client_list, &context->pt[tsi].ss_client_owner); diff --git a/lib/system/smd/README.md b/lib/system/smd/README.md index 78280d9d6..570e21c70 100644 --- a/lib/system/smd/README.md +++ b/lib/system/smd/README.md @@ -14,17 +14,31 @@ Message payloads are short, less than 384 bytes, below system limits for atomic pipe or UDS datagrams and consistent with heap usage on smaller systems, but large enough to carry JSON usefully. Messages are typically low duty cycle. +![SMD message](/doc-assets/smd-message.png) + Messages may be sent by any registered participant, they are allocated on heap -in a linked-list, and delivered no sooner than next time around the event loop. -This retains the ability to handle multiple event queuing in one event loop trip -while guaranteeing message handling is nonrecursive. Messages are passed to all -other registered participants before being destroyed. +in a linked-list, and delivered to all other registered participants for that +message class no sooner than next time around the event loop. This retains the +ability to handle multiple event queuing in one event loop trip while +guaranteeing message handling is nonrecursive and so with modest stack usage. +Messages are passed to all other registered participants before being destroyed. + +Messages are delivered to all particpants on the same lws_context by default. + +![SMD message](/doc-assets/smd-single-process.png) `lws_smd` apis allow publication and subscription of message objects between participants that are in a single process and are informed by callback from lws -service thread context, and, via Secure Streams proxying as the IPC method, also -between those in different processes. Registering as a participant and sending -messages are threadsafe APIs. +service thread context. + +SMD messages can also broadcast between particpants in different lws_contexts in +different processes, using existing Secure Streams proxying. In this way +different application processes can intercommunicate and all observe any system +smd messages they are interested in. + +![SMD message](/doc-assets/smd-proxy.png) + +Registering as a participant and sending messages are threadsafe APIs. ## Message Class diff --git a/lib/system/smd/private-lib-system-smd.h b/lib/system/smd/private-lib-system-smd.h index fdd71d61c..c7de32585 100644 --- a/lib/system/smd/private-lib-system-smd.h +++ b/lib/system/smd/private-lib-system-smd.h @@ -39,9 +39,13 @@ #define LWS_SMD_SS_RX_HEADER_LEN_EFF (0) #endif +struct lws_smd_peer; + typedef struct lws_smd_msg { lws_dll2_t list; + struct lws_smd_peer *exc; + lws_usec_t timestamp; lws_smd_class_t _class; @@ -112,3 +116,6 @@ lws_smd_msg_distribute(struct lws_context *ctx); int _lws_smd_destroy(struct lws_context *ctx); + +int +_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc); diff --git a/lib/system/smd/smd.c b/lib/system/smd/smd.c index e328619ab..ed754bab4 100644 --- a/lib/system/smd/smd.c +++ b/lib/system/smd/smd.c @@ -33,8 +33,8 @@ lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len) /* only allow it if someone wants to consume this class of event */ if (!(ctx->smd._class_filter & _class)) { - lwsl_info("%s: rejecting class 0x%x as no participant wants it\n", __func__, - (unsigned int)_class); + lwsl_info("%s: rejecting class 0x%x as no participant wants it\n", + __func__, (unsigned int)_class); return NULL; } @@ -74,7 +74,8 @@ lws_smd_msg_free(void **ppay) */ static int -_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg) +_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg, + struct lws_smd_peer *exc) { struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd); int interested = 0; @@ -90,7 +91,8 @@ _lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg) * refcount of) */ - if (pr->timestamp_joined <= msg->timestamp && + if (pr != exc && + pr->timestamp_joined <= msg->timestamp && (!pr->timestamp_left || /* if zombie, only contribute to * refcount if msg from before we * left */ @@ -126,7 +128,7 @@ _lws_smd_class_mask_union(lws_smd_t *smd) } int -lws_smd_msg_send(struct lws_context *ctx, void *pay) +_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc) { lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) - LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg)); @@ -141,7 +143,17 @@ lws_smd_msg_send(struct lws_context *ctx, void *pay) if (!ctx->smd.delivering) lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */ - msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(&ctx->smd, msg); + msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(&ctx->smd, msg, exc); + if (!msg->refcount) { + /* possible, condsidering exc and no other participants */ + lws_free(msg); + if (!ctx->smd.delivering) + lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ + + return 0; + } + + msg->exc = exc; lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */ lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages); @@ -155,7 +167,8 @@ lws_smd_msg_send(struct lws_context *ctx, void *pay) lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); - if (!pr->tail && (pr->_class_filter & msg->_class)) + if (pr != exc && + !pr->tail && (pr->_class_filter & msg->_class)) pr->tail = msg; } lws_end_foreach_dll(p); @@ -169,6 +182,12 @@ lws_smd_msg_send(struct lws_context *ctx, void *pay) return 0; } +int +lws_smd_msg_send(struct lws_context *ctx, void *pay) +{ + return _lws_smd_msg_send(ctx, pay, NULL); +} + int lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class, const char *format, ...) @@ -245,6 +264,93 @@ lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len, return 0; } + +/* + * This is a helper that user rx handler for LWS_SMD_STREAMTYPENAME SS can + * call through to with the payload it received from the proxy. It will then + * forward the recieved SMD message to all local (same-context) participants + * that are interested in that class (except ones with callback skip_cb, so + * we don't loop). + */ + +static int +_lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag, + struct lws_smd_peer *pr, const uint8_t *buf, size_t len) +{ + lws_smd_class_t _class; + lws_smd_msg_t *msg; + void *p; + + if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF) + return 1; + + if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF) + return 1; + + _class = (lws_smd_class_t)lws_ser_ru64be(buf); + + /* only locally forward messages that we care about in this process */ + + if (!(ctx->smd._class_filter & _class)) + /* + * There's nobody interested in messages of this class atm. + * Don't bother generating it, and act like all is well. + */ + return 0; + + p = lws_smd_msg_alloc(ctx, _class, len); + if (!p) + return 1; + + msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF - + sizeof(*msg)); + msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF); + /* adopt the original source timestamp, not time we forwarded it */ + msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8); + + /* copy the message payload in */ + memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length); + + /* + * locks taken and released in here + */ + + if (_lws_smd_msg_send(ctx, p, pr)) { + /* we couldn't send it after all that... */ + lws_smd_msg_free(&p); + + return 1; + } + + lwsl_notice("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__, + tag, _class, msg->length, + (unsigned long long)msg->timestamp); + + return 0; +} + +int +lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len) +{ + struct lws_ss_handle *h = (struct lws_ss_handle *) + (((char *)ss_user) - sizeof(*h)); + struct lws_context *ctx = lws_ss_get_context(h); + + return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len); +} + +#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API) +int +lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len) +{ + struct lws_sspc_handle *h = (struct lws_sspc_handle *) + (((char *)ss_user) - sizeof(*h)); + struct lws_context *ctx = lws_sspc_get_context(h); + + return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len); +} +#endif + #endif static void @@ -274,17 +380,20 @@ _lws_smd_peer_zombify(lws_smd_peer_t *pr) } static lws_smd_msg_t * -_lws_smd_msg_next_matching_filter(lws_dll2_t *tail, lws_smd_class_t filter) +_lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr) { lws_smd_msg_t *msg; + lws_dll2_t *tail = &pr->tail->list; + lws_smd_class_t filter = pr->_class_filter; + do { tail = tail->next; if (!tail) return NULL; msg = lws_container_of(tail, lws_smd_msg_t, list); - if (msg->_class & filter) + if (msg->exc != pr && (msg->_class & filter)) return msg; } while (1); @@ -375,8 +484,7 @@ _lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr) * If there is one, move forward to the next queued * message that meets our filters */ - pr->tail = _lws_smd_msg_next_matching_filter( - &pr->tail->list, pr->_class_filter); + pr->tail = _lws_smd_msg_next_matching_filter(pr); lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */ if (!--msg->refcount) { diff --git a/minimal-examples/secure-streams/minimal-secure-streams-smd/minimal-secure-streams-smd.c b/minimal-examples/secure-streams/minimal-secure-streams-smd/minimal-secure-streams-smd.c index 88a6ffaef..368738b2d 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams-smd/minimal-secure-streams-smd.c +++ b/minimal-examples/secure-streams/minimal-secure-streams-smd/minimal-secure-streams-smd.c @@ -73,10 +73,14 @@ typedef struct myss { static lws_ss_state_return_t myss_rx(void *userobj, const uint8_t *buf, size_t len, int flags) { -// myss_t *m = (myss_t *)userobj; + /* + * Call the helper to translate into a real smd message and forward to + * this context / process smd participants... except us, since we + * definitely already received it + */ - lwsl_notice("%s: len %d, flags: %d\n", __func__, (int)len, flags); - lwsl_hexdump_notice(buf, len); + if (lws_smd_ss_rx_forward(userobj, buf, len)) + lwsl_warn("%s: forward failed\n", __func__); count_p1++; @@ -125,8 +129,9 @@ myss_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len, (m->alternate & 1) ? LWSSMDCL_NETWORK : LWSSMDCL_INTERACTION, (m->alternate & 1) ? - "{\"class\":\"NETWORK\"}" : - "{\"class\":\"INTERACTION\"}")) + "{\"class\":\"NETWORK\",\"x\":%d}" : + "{\"class\":\"INTERACTION\",\"x\":%d}", + count_tx)) return LWSSSSRET_TX_DONT_SEND; *flags = LWSSS_FLAG_SOM | LWSSS_FLAG_EOM; @@ -183,9 +188,9 @@ direct_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp, { struct lws_context **pctx = (struct lws_context **)opaque; - lwsl_notice("%s: class: 0x%x, ts: %llu\n", __func__, _class, - (unsigned long long)timestamp); - lwsl_hexdump_notice(buf, len); +// lwsl_notice("%s: class: 0x%x, ts: %llu\n", __func__, _class, +// (unsigned long long)timestamp); +// lwsl_hexdump_notice(buf, len); count_p2++;