diff --git a/include/libwebsockets/lws-context-vhost.h b/include/libwebsockets/lws-context-vhost.h index cc7aad342..dbde666dc 100644 --- a/include/libwebsockets/lws-context-vhost.h +++ b/include/libwebsockets/lws-context-vhost.h @@ -820,6 +820,14 @@ struct lws_context_creation_info { */ void *early_smd_opaque; lws_smd_class_t early_smd_class_filter; + lws_usec_t smd_ttl_us; + /**< CONTEXT: SMD messages older than this many us are removed from the + * queue and destroyed even if not fully delivered yet. If zero, + * defaults to 2 seconds (5 second for FREERTOS). + */ + uint16_t smd_queue_depth; + /**< CONTEXT: Maximum queue depth, If zero defaults to 40 + * (20 for FREERTOS) */ #endif /* Add new things just above here ---^ diff --git a/lib/core/context.c b/lib/core/context.c index 3517b91c2..3576fc3c1 100644 --- a/lib/core/context.c +++ b/lib/core/context.c @@ -613,6 +613,22 @@ lws_create_context(const struct lws_context_creation_info *info) context->system_ops = info->system_ops; context->pt_serv_buf_size = (unsigned int)s1; +#if defined(LWS_WITH_SYS_SMD) + context->smd_ttl_us = info->smd_ttl_us ? info->smd_ttl_us : +#if defined(LWS_PLAT_FREERTOS) + 5000000; +#else + 2000000; +#endif + context->smd_queue_depth = info->smd_queue_depth ? + info->smd_queue_depth : +#if defined(LWS_PLAT_FREERTOS) + 20; +#else + 40; +#endif +#endif + #if defined(LWS_WITH_UDP) context->udp_loss_sim_tx_pc = info->udp_loss_sim_tx_pc; context->udp_loss_sim_rx_pc = info->udp_loss_sim_rx_pc; diff --git a/lib/core/private-lib-core.h b/lib/core/private-lib-core.h index 2ed83d536..8bc0c9ffc 100644 --- a/lib/core/private-lib-core.h +++ b/lib/core/private-lib-core.h @@ -570,7 +570,9 @@ struct lws_context { #endif lws_usec_t time_up; /* monotonic */ - +#if defined(LWS_WITH_SYS_SMD) + lws_usec_t smd_ttl_us; +#endif uint64_t options; time_t last_ws_ping_pong_check_s; @@ -608,6 +610,11 @@ struct lws_context { unsigned short ip_limit_ah; unsigned short ip_limit_wsi; #endif + +#if defined(LWS_WITH_SYS_SMD) + uint16_t smd_queue_depth; +#endif + unsigned int deprecated:1; unsigned int inside_context_destroy:1; unsigned int being_destroyed:1; diff --git a/lib/system/smd/private-lib-system-smd.h b/lib/system/smd/private-lib-system-smd.h index c7de32585..e79165740 100644 --- a/lib/system/smd/private-lib-system-smd.h +++ b/lib/system/smd/private-lib-system-smd.h @@ -22,16 +22,6 @@ * IN THE SOFTWARE. */ -/* - * Number of seconds registered peers must remain as zombies to handle in-flight - * older messages correctly - */ -#if !defined(LWS_SMD_INFLIGHT_GRACE_SECS) -#define LWS_SMD_INFLIGHT_GRACE_SECS (2) -#endif -#if !defined(LWS_SMD_MAX_QUEUE_DEPTH) -#define LWS_SMD_MAX_QUEUE_DEPTH (20) -#endif #if defined(LWS_WITH_SECURE_STREAMS) #define LWS_SMD_SS_RX_HEADER_LEN_EFF (LWS_SMD_SS_RX_HEADER_LEN) @@ -55,22 +45,9 @@ typedef struct lws_smd_msg { /* message itself is over-allocated after this */ } lws_smd_msg_t; -/* - * The peer's relationship to the lws instance doing the distribution - */ - -typedef enum { - LSMDT_SAME_PROCESS, /* we call him back ourselves */ - LSMDT_SECURE_STREAMS_LOCAL, /* we call him back ourselves */ - LSMDT_SECURE_STREAMS_PROXIED, /* we ask to write and wait */ -} lws_smd_type_t; - typedef struct lws_smd_peer { lws_dll2_t list; - lws_usec_t timestamp_joined; - lws_usec_t timestamp_left; - #if defined(LWS_WITH_SECURE_STREAMS) lws_ss_handle_t *ss_handle; /* LSMDT_SECURE_STREAMS */ #endif @@ -82,7 +59,6 @@ typedef struct lws_smd_peer { lws_smd_msg_t *tail; lws_smd_class_t _class_filter; - lws_smd_type_t type; } lws_smd_peer_t; /* @@ -101,8 +77,6 @@ typedef struct lws_smd { /* union of peer class filters, suppress creation of msg classes not set */ lws_smd_class_t _class_filter; - lws_sorted_usec_list_t sul_tail_stale; - char delivering; } lws_smd_t; diff --git a/lib/system/smd/smd.c b/lib/system/smd/smd.c index c1f763c8e..6ba05f986 100644 --- a/lib/system/smd/smd.c +++ b/lib/system/smd/smd.c @@ -1,7 +1,7 @@ /* * lws System Message Distribution * - * Copyright (C) 2019 - 2020 Andy Green + * Copyright (C) 2019 - 2021 Andy Green * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to @@ -25,6 +25,16 @@ #include "private-lib-core.h" #include +/* comment me to remove extra debug and sanity checks */ +// #define LWS_SMD_DEBUG + + +#if defined(LWS_SMD_DEBUG) +#define lwsl_smd lwsl_notice +#else +#define lwsl_smd(_s, ...) +#endif + void * lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len) { @@ -69,6 +79,40 @@ lws_smd_msg_free(void **ppay) *ppay = NULL; } +#if defined(LWS_SMD_DEBUG) +static void +lws_smd_dump(lws_smd_t *smd) +{ + int n = 1; + + lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, + smd->owner_messages.head) { + lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); + + lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n", + n++, msg, msg->refcount, + (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000), + msg->length, msg->_class, + (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF); + + } lws_end_foreach_dll_safe(p, p1); + + n = 1; + lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) { + lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); + + lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n", + n++, pr, pr->tail, pr->_class_filter); + } lws_end_foreach_dll(p); +} +#endif + +static int +_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg) +{ + return !!(msg->_class & pr->_class_filter); +} + /* * Figure out what to set the initial refcount for the message to */ @@ -83,21 +127,7 @@ _lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg, lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); - /* - * In order to optimize the tail managment into a refcount, - * we have to account exactly for when peers arrived and - * departed (including deferring the logical peer destruction - * until no message pending he may have contributed to the - * refcount of) - */ - - if (pr != exc && - pr->timestamp_joined <= msg->timestamp && - (!pr->timestamp_left || /* if zombie, only contribute to - * refcount if msg from before we - * left */ - pr->timestamp_left >= msg->timestamp) && - (msg->_class & pr->_class_filter)) + if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg)) /* * This peer wants to consume it */ @@ -117,7 +147,6 @@ _lws_smd_class_mask_union(lws_smd_t *smd) smd->owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); - /* may destroy pr if zombie */ mask |= pr->_class_filter; } lws_end_foreach_dll_safe(p, p1); @@ -127,13 +156,51 @@ _lws_smd_class_mask_union(lws_smd_t *smd) return 0; } +/* Call with message lock held */ + +static void +_lws_smd_msg_destroy(lws_smd_t *smd, lws_smd_msg_t *msg) +{ + /* + * We think we gave the message to everyone and can destroy it. + * Sanity check that no peer holds a pointer to this guy + */ + + lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, + smd->owner_peers.head) { + lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list); + + if (xpr->tail == msg) { + lwsl_err("%s: peer %p has msg %p " + "we are about to destroy as tail\n", + __func__, xpr, msg); +#if !defined(LWS_PLAT_FREERTOS) + assert(0); +#endif + } + + } lws_end_foreach_dll_safe(p, p1); + + /* + * We have fully delivered the message now, it + * can be unlinked and destroyed + */ + lwsl_info("%s: destroy msg %p\n", __func__, msg); + lws_dll2_remove(&msg->list); + lws_free(msg); +} + +/* + * This is wanting to be threadsafe, limiting the apis we can call + */ + int _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc) { lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) - LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg)); - if (ctx->smd.owner_messages.count >= LWS_SMD_MAX_QUEUE_DEPTH) { + if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) { lwsl_warn("%s: rejecting message on queue depth %d\n", __func__, (int)ctx->smd.owner_messages.count); /* reject the message due to max queue depth reached */ @@ -143,7 +210,8 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc) if (!ctx->smd.delivering) lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */ - msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(&ctx->smd, msg, exc); + msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested( + &ctx->smd, msg, exc); if (!msg->refcount) { /* possible, condsidering exc and no other participants */ lws_free(msg); @@ -155,9 +223,10 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc) msg->exc = exc; + /* let's add him on the queue... */ + lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */ lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages); - lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ /* * Any peer with no active tail needs to check our class to see if we @@ -168,11 +237,22 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc) lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); if (pr != exc && - !pr->tail && (pr->_class_filter & msg->_class)) + !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) { pr->tail = msg; + /* tail message has to actually be of interest to the peer */ + assert(!pr->tail || (pr->tail->_class & pr->_class_filter)); + } } lws_end_foreach_dll(p); +#if defined(LWS_SMD_DEBUG) + lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__, + msg, msg->refcount, ctx->smd.owner_messages.count); + lws_smd_dump(&ctx->smd); +#endif + + lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ + if (!ctx->smd.delivering) lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ @@ -182,12 +262,20 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc) return 0; } +/* + * This is wanting to be threadsafe, limiting the apis we can call + */ + int lws_smd_msg_send(struct lws_context *ctx, void *pay) { return _lws_smd_msg_send(ctx, pay, NULL); } +/* + * This is wanting to be threadsafe, limiting the apis we can call + */ + int lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class, const char *format, ...) @@ -259,7 +347,7 @@ lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len, *len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n; - lwsl_notice("%s: %s send cl 0x%x, len %u\n", __func__, tag, _class, + lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, _class, (unsigned int)n); return 0; @@ -322,7 +410,7 @@ _lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag, return 1; } - lwsl_notice("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__, + lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__, tag, _class, msg->length, (unsigned long long)msg->timestamp); @@ -353,39 +441,49 @@ lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len) #endif -static void -_lws_smd_peer_finalize_destroy(lws_smd_peer_t *pr) -{ - lws_dll2_remove(&pr->list); - lws_free(pr); -} - /* - * Peers that deregister may need to hang around as zombies, so they account - * for refcounts on messages they already contributed to. Because older - * messages may be in flight over UDS links, we have to stick around and make - * sure all cases have their refcount handled correctly. + * Peers that deregister need to adjust the refcount of messages they would + * have been interested in, but didn't take delivery of yet */ static void -_lws_smd_peer_zombify(lws_smd_peer_t *pr) +_lws_smd_peer_destroy(lws_smd_peer_t *pr) { lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers); - /* update the class mask union to reflect this peer no longer active */ - _lws_smd_class_mask_union(smd); + lws_mutex_lock(smd->lock_messages); /* +++++++++ messages */ - pr->timestamp_left = lws_now_usecs(); + lws_dll2_remove(&pr->list); + + /* + * We take the approach to adjust the refcount of every would-have-been + * delivered message we were interested in + */ + + while (pr->tail) { + + lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next, + lws_smd_msg_t, list); + + if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) { + if (!--pr->tail->refcount) + _lws_smd_msg_destroy(smd, pr->tail); + } + + pr->tail = m1; + } + + lws_free(pr); + + lws_mutex_unlock(smd->lock_messages); /* messages ------- */ } static lws_smd_msg_t * _lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr) { - lws_smd_msg_t *msg; - lws_dll2_t *tail = &pr->tail->list; - lws_smd_class_t filter = pr->_class_filter; + lws_smd_msg_t *msg; do { tail = tail->next; @@ -393,7 +491,8 @@ _lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr) return NULL; msg = lws_container_of(tail, lws_smd_msg_t, list); - if (msg->exc != pr && (msg->_class & filter)) + if (msg->exc != pr && + _lws_smd_msg_peer_interested_in_msg(pr, msg)) return msg; } while (1); @@ -401,8 +500,6 @@ _lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr) } /* - * Note: May destroy zombie peers when it sees grace period has expired. - * * Delivers only one message to the peer and advances the tail, or sets to NULL * if no more filtered queued messages. Returns nonzero if tail non-NULL. * @@ -426,82 +523,30 @@ _lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr) msg = lws_container_of(pr->tail, lws_smd_msg_t, list); - /* - * Check if zombie peer and the message predates our leaving - */ - if (pr->timestamp_left && - msg->timestamp > pr->timestamp_left) { - /* - * We do not need to modify message refcount, if it was - * generated after we became a zombie, and so we - * definitely did not contribute to its refcount... - * - * ...have we waited out the grace period? - */ + lwsl_smd("%s: deliver cl 0x%x, len %d, refc %d, to peer %p\n", + __func__, (unsigned int)msg->_class, (int)msg->length, + (int)msg->refcount, pr); - if (lws_now_usecs() - pr->timestamp_left > - LWS_SMD_INFLIGHT_GRACE_SECS * LWS_US_PER_SEC) - /* - * ... ok, it's time for the zombie to abandon - * its attachment to the Earth and rejoin the - * cosmic mandela - */ - _lws_smd_peer_finalize_destroy(pr); - - /* ... either way, nothing further to do for this guy */ - - return 0; - } - - if (!pr->timestamp_left) { - - /* - * Peer is not a zombie... deliver the tail - */ -#if 0 - if (pr->type == LSMDT_SECURE_STREAMS_PROXIED) { -#if defined(LWS_WITH_SECURE_STREAMS) - if (pr->ss_handle) - lws_ss_request_tx(pr->ss_handle); -#endif - return 0; - } -#endif - - lwsl_info("%s: deliver cl 0x%x, len %d, refc %d, to peer %p\n", - __func__, (unsigned int)msg->_class, (int)msg->length, - (int)msg->refcount, pr); - - pr->cb(pr->opaque, msg->_class, msg->timestamp, - ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF, - (size_t)msg->length); - } + pr->cb(pr->opaque, msg->_class, msg->timestamp, + ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF, + (size_t)msg->length); assert(msg->refcount); /* * If there is one, move forward to the next queued - * message that meets our filters + * message that meets the filters of this peer */ pr->tail = _lws_smd_msg_next_matching_filter(pr); - lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */ - if (!--msg->refcount) { - /* - * We have fully delivered the message now, it - * can be unlinked and destroyed - */ - lwsl_info("%s: destroy msg %p\n", __func__, msg); - lws_dll2_remove(&msg->list); - lws_free(msg); - } - lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */ + /* tail message has to actually be of interest to the peer */ + assert(!pr->tail || (pr->tail->_class & pr->_class_filter)); - /* - * Wait out the grace period even if no live messages - * for a zombie peer... there may be some in flight - */ + lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */ + if (!--msg->refcount) + _lws_smd_msg_destroy(&ctx->smd, msg); + lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */ return !!pr->tail; } @@ -531,7 +576,6 @@ lws_smd_msg_distribute(struct lws_context *ctx) ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); - /* may destroy pr if zombie, hence _safe iterator */ more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr)); } lws_end_foreach_dll_safe(p, p1); @@ -556,35 +600,47 @@ lws_smd_register(struct lws_context *ctx, void *opaque, int flags, pr->cb = cb; pr->opaque = opaque; pr->_class_filter = _class_filter; - pr->timestamp_joined = lws_now_usecs(); - - /* - * Figure out the type of peer from the situation... - */ - -#if 0 -#if defined(LWS_WITH_SECURE_STREAMS) - if (!ctx->smd.listen_vh) { - /* - * The guy who is regsitering is actually a SS proxy link - * between a client and SMD - */ - } else -#endif -#endif - pr->type = LSMDT_SAME_PROCESS; if (!ctx->smd.delivering) lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */ + + /* + * Let's lock the message list before adding this peer... because... + */ + + lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */ + lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers); /* update the global class mask union to account for new peer mask */ _lws_smd_class_mask_union(&ctx->smd); + + /* + * Now there's a new peer added, any messages we have stashed will try + * to deliver to this guy too, if he's interested in that class. So we + * have to update the message refcounts for queued messages-he's- + * interested-in accordingly. + */ + + lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, + ctx->smd.owner_messages.head) { + lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); + + if (_lws_smd_msg_peer_interested_in_msg(pr, msg)) + msg->refcount++; + + } lws_end_foreach_dll_safe(p, p1); + + /* ... ok we are done adding the peer */ + + lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */ + + lwsl_info("%s: peer %p (count %u) registered\n", __func__, pr, + (unsigned int)ctx->smd.owner_peers.count); + if (!ctx->smd.delivering) lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ - lwsl_debug("%s: registered\n", __func__); - return pr; } @@ -594,7 +650,8 @@ lws_smd_unregister(struct lws_smd_peer *pr) lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers); lws_mutex_lock(smd->lock_peers); /* +++++++++++++++++++++++++++ peers */ - _lws_smd_peer_zombify(pr); + lwsl_notice("%s: destroying peer %p\n", __func__, pr); + _lws_smd_peer_destroy(pr); lws_mutex_unlock(smd->lock_peers); /* ------------------------- peers */ } @@ -612,14 +669,58 @@ lws_smd_message_pending(struct lws_context *ctx) return 0; /* - * Walk the peer list + * If there are any messages, check their age and expire ones that + * have been hanging around too long */ lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++++++++++ peers */ + lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */ + + lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, + ctx->smd.owner_messages.head) { + lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); + + if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) { + lwsl_warn("%s: timing out queued message %p\n", + __func__, msg); + + /* + * We're forcibly yanking this guy, we can expect that + * there might be peers that point to it as their tail. + * + * In that case, move their tails on to the next guy + * they are interested in, if any. + */ + + lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1, + ctx->smd.owner_peers.head) { + lws_smd_peer_t *pr = lws_container_of(pp, + lws_smd_peer_t, list); + + if (pr->tail == msg) + pr->tail = _lws_smd_msg_next_matching_filter(pr); + + } lws_end_foreach_dll_safe(pp, pp1); + + /* + * No peer should fall foul of the peer tail checks + * when destroying the message now. + */ + + _lws_smd_msg_destroy(&ctx->smd, msg); + } + } lws_end_foreach_dll_safe(p, p1); + + lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ + + /* + * Walk the peer list + */ + lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); - if (pr->tail && pr->type == LSMDT_SAME_PROCESS) + if (pr->tail) goto bail; } lws_end_foreach_dll(p); @@ -651,12 +752,11 @@ _lws_smd_destroy(struct lws_context *ctx) ctx->smd.owner_messages.head) { lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); + lws_dll2_remove(&msg->list); lws_free(msg); } lws_end_foreach_dll_safe(p, p1); - lws_mutex_destroy(ctx->smd.lock_messages); - /* * Walk the peer list, destroying them */ @@ -665,10 +765,12 @@ _lws_smd_destroy(struct lws_context *ctx) ctx->smd.owner_peers.head) { lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); - _lws_smd_peer_finalize_destroy(pr); + pr->tail = NULL; /* we just nuked all the messages, ignore */ + _lws_smd_peer_destroy(pr); } lws_end_foreach_dll_safe(p, p1); + lws_mutex_destroy(ctx->smd.lock_messages); lws_mutex_destroy(ctx->smd.lock_peers); return 0; diff --git a/minimal-examples/api-tests/api-test-lws_smd/main.c b/minimal-examples/api-tests/api-test-lws_smd/main.c index c1f600833..9e2510fa6 100644 --- a/minimal-examples/api-tests/api-test-lws_smd/main.c +++ b/minimal-examples/api-tests/api-test-lws_smd/main.c @@ -15,6 +15,7 @@ #include static int interrupted, ok, fail, _exp = 111; +static unsigned int how_many_msg = 100, usec_interval = 1000; static lws_sorted_usec_list_t sul, sul_initial_drain; struct lws_context *context; static pthread_t thread_spam; @@ -23,6 +24,7 @@ static void timeout_cb(lws_sorted_usec_list_t *sul) { /* We should have completed the test before this fires */ + lwsl_notice("%s: test period finished\n", __func__); interrupted = 1; lws_cancel_service(context); } @@ -76,15 +78,21 @@ smd_cb3int(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp, static void * _thread_spam(void *d) { - int n, atm = 0; +#if defined(WIN32) + unsigned int mypid = 0; +#else + unsigned int mypid = (unsigned int)getpid(); +#endif + unsigned int n = 0, atm = 0; - n = 0; - while (n++ < 100) { + while (n++ < how_many_msg) { atm++; if (lws_smd_msg_printf(context, LWSSMDCL_SYSTEM_STATE, - "{\"s\":\"state\",\"msg\":%d}", - (unsigned int)n)) { + "{\"s\":\"state\"," + "\"pid\":%u," + "\"msg\":%d}", + mypid, (unsigned int)n)) { lwsl_err("%s: send attempt %d failed\n", __func__, atm); n--; fail++; @@ -97,7 +105,7 @@ _thread_spam(void *d) #if defined(WIN32) Sleep(3); #else - usleep(1000); + usleep(usec_interval); #endif } #if !defined(WIN32) @@ -188,8 +196,15 @@ main(int argc, const char **argv) if ((p = lws_cmdline_option(argc, argv, "-d"))) logs = atoi(p); + if ((p = lws_cmdline_option(argc, argv, "--count"))) + how_many_msg = (unsigned int)atol(p); + + if ((p = lws_cmdline_option(argc, argv, "--interval"))) + usec_interval = (unsigned int)atol(p); + lws_set_log_level(logs, NULL); - lwsl_user("LWS API selftest: lws_smd\n"); + lwsl_user("LWS API selftest: lws_smd: %u msgs at %uus interval\n", + how_many_msg, usec_interval); memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ info.port = CONTEXT_PORT_NO_LISTEN; @@ -202,7 +217,10 @@ main(int argc, const char **argv) return 1; } - lws_sul_schedule(context, 0, &sul, timeout_cb, 5 * LWS_US_PER_SEC); + /* game over after this long */ + + lws_sul_schedule(context, 0, &sul, timeout_cb, + (how_many_msg * (usec_interval + 1000)) + (4 * LWS_US_PER_SEC)); /* register a messaging participant to hear INTERACTION class */ diff --git a/minimal-examples/secure-streams/minimal-secure-streams-smd/minimal-secure-streams-smd.c b/minimal-examples/secure-streams/minimal-secure-streams-smd/minimal-secure-streams-smd.c index 4212ad0e3..a0d028635 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams-smd/minimal-secure-streams-smd.c +++ b/minimal-examples/secure-streams/minimal-secure-streams-smd/minimal-secure-streams-smd.c @@ -16,6 +16,7 @@ #include static int interrupted, bad = 1, count_p1, count_p2, count_tx; +static unsigned int how_many_msg = 100, usec_interval = 1000; static lws_sorted_usec_list_t sul_timeout; /* @@ -139,7 +140,7 @@ myss_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len, count_tx++; lws_sul_schedule(lws_ss_get_context(m->ss), 0, &m->sul, - sul_tx_periodic_cb, 200 * LWS_US_PER_MS); + sul_tx_periodic_cb, usec_interval); return LWSSSSRET_OK; } @@ -234,6 +235,7 @@ direct_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp, static void sul_timeout_cb(lws_sorted_usec_list_t *sul) { + lwsl_notice("%s: test finishing\n", __func__); interrupted = 1; } @@ -250,6 +252,7 @@ int main(int argc, const char **argv) { struct lws_context_creation_info info; struct lws_context *context; + const char *p; signal(SIGINT, sigint_handler); @@ -262,7 +265,14 @@ int main(int argc, const char **argv) lws_cmdline_option_handle_builtin(argc, argv, &info); - lwsl_user("LWS Secure Streams SMD test client [-d]\n"); + if ((p = lws_cmdline_option(argc, argv, "--count"))) + how_many_msg = (unsigned int)atol(p); + + if ((p = lws_cmdline_option(argc, argv, "--interval"))) + usec_interval = (unsigned int)atol(p); + + lwsl_user("LWS Secure Streams SMD test client [-d]: " + "%u msgs at %uus interval\n", how_many_msg, usec_interval); info.fd_limit_per_thread = 1 + 6 + 1; info.port = CONTEXT_PORT_NO_LISTEN; @@ -271,8 +281,6 @@ int main(int argc, const char **argv) #else info.protocols = lws_sspc_protocols; { - const char *p; - /* connect to ssproxy via UDS by default, else via * tcp connection to this port */ if ((p = lws_cmdline_option(argc, argv, "-p"))) @@ -314,7 +322,7 @@ int main(int argc, const char **argv) /* set up the test timeout */ lws_sul_schedule(context, 0, &sul_timeout, sul_timeout_cb, - 4 * LWS_US_PER_SEC); + (how_many_msg * (usec_interval + 1000)) + LWS_US_PER_SEC); /* the event loop */