mirror of https://github.com/warmcat/libwebsockets.git (synced 2025-03-09 00:00:04 +01:00)

commit 8a087043c6 (parent fc5302589c)

smd: account for new interested peers joining while queue exists

7 changed files with 308 additions and 175 deletions
@@ -820,6 +820,14 @@ struct lws_context_creation_info {
 	 */
 	void *early_smd_opaque;
 	lws_smd_class_t early_smd_class_filter;
+	lws_usec_t smd_ttl_us;
+	/**< CONTEXT: SMD messages older than this many us are removed from the
+	 * queue and destroyed even if not fully delivered yet. If zero,
+	 * defaults to 2 seconds (5 second for FREERTOS).
+	 */
+	uint16_t smd_queue_depth;
+	/**< CONTEXT: Maximum queue depth, If zero defaults to 40
+	 * (20 for FREERTOS) */
 #endif

 	/* Add new things just above here ---^
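The two new knobs above are only present when lws is built with LWS_WITH_SYS_SMD. A minimal sketch of overriding them at context creation time (usual lws context boilerplate assumed, error handling elided):

	#include <libwebsockets.h>
	#include <string.h>

	int main(void)
	{
		struct lws_context_creation_info info;
		struct lws_context *cx;

		memset(&info, 0, sizeof info);
		info.port = CONTEXT_PORT_NO_LISTEN;
		/* expire undelivered SMD messages after 1s (default 2s) */
		info.smd_ttl_us = LWS_US_PER_SEC;
		/* allow up to 64 queued SMD messages (default 40) */
		info.smd_queue_depth = 64;

		cx = lws_create_context(&info);
		if (!cx)
			return 1;

		/* ... run the event loop, then clean up ... */
		lws_context_destroy(cx);

		return 0;
	}

Leaving both fields at zero keeps the defaults selected in lws_create_context() below: 2s TTL and depth 40 on most platforms, 5s and depth 20 on FREERTOS.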
@@ -613,6 +613,22 @@ lws_create_context(const struct lws_context_creation_info *info)
 	context->system_ops = info->system_ops;
 	context->pt_serv_buf_size = (unsigned int)s1;

+#if defined(LWS_WITH_SYS_SMD)
+	context->smd_ttl_us = info->smd_ttl_us ? info->smd_ttl_us :
+#if defined(LWS_PLAT_FREERTOS)
+			5000000;
+#else
+			2000000;
+#endif
+	context->smd_queue_depth = info->smd_queue_depth ?
+				   info->smd_queue_depth :
+#if defined(LWS_PLAT_FREERTOS)
+				   20;
+#else
+				   40;
+#endif
+#endif
+
 #if defined(LWS_WITH_UDP)
 	context->udp_loss_sim_tx_pc = info->udp_loss_sim_tx_pc;
 	context->udp_loss_sim_rx_pc = info->udp_loss_sim_rx_pc;
@@ -570,7 +570,9 @@ struct lws_context {
 #endif

 	lws_usec_t time_up; /* monotonic */
-
+#if defined(LWS_WITH_SYS_SMD)
+	lws_usec_t smd_ttl_us;
+#endif
 	uint64_t options;

 	time_t last_ws_ping_pong_check_s;
@@ -608,6 +610,11 @@ struct lws_context {
 	unsigned short ip_limit_ah;
 	unsigned short ip_limit_wsi;
 #endif

+#if defined(LWS_WITH_SYS_SMD)
+	uint16_t smd_queue_depth;
+#endif
+
 	unsigned int deprecated:1;
 	unsigned int inside_context_destroy:1;
 	unsigned int being_destroyed:1;
@@ -22,16 +22,6 @@
  * IN THE SOFTWARE.
  */

-/*
- * Number of seconds registered peers must remain as zombies to handle in-flight
- * older messages correctly
- */
-#if !defined(LWS_SMD_INFLIGHT_GRACE_SECS)
-#define LWS_SMD_INFLIGHT_GRACE_SECS (2)
-#endif
-#if !defined(LWS_SMD_MAX_QUEUE_DEPTH)
-#define LWS_SMD_MAX_QUEUE_DEPTH (20)
-#endif

 #if defined(LWS_WITH_SECURE_STREAMS)
 #define LWS_SMD_SS_RX_HEADER_LEN_EFF (LWS_SMD_SS_RX_HEADER_LEN)
@@ -55,22 +45,9 @@ typedef struct lws_smd_msg {
 	/* message itself is over-allocated after this */
 } lws_smd_msg_t;

-/*
- * The peer's relationship to the lws instance doing the distribution
- */
-
-typedef enum {
-	LSMDT_SAME_PROCESS,		/* we call him back ourselves */
-	LSMDT_SECURE_STREAMS_LOCAL,	/* we call him back ourselves */
-	LSMDT_SECURE_STREAMS_PROXIED,	/* we ask to write and wait */
-} lws_smd_type_t;
-
 typedef struct lws_smd_peer {
 	lws_dll2_t list;

-	lws_usec_t timestamp_joined;
-	lws_usec_t timestamp_left;
-
 #if defined(LWS_WITH_SECURE_STREAMS)
 	lws_ss_handle_t *ss_handle; /* LSMDT_SECURE_STREAMS */
 #endif
@@ -82,7 +59,6 @@ typedef struct lws_smd_peer {
 	lws_smd_msg_t *tail;

 	lws_smd_class_t _class_filter;
-	lws_smd_type_t type;
 } lws_smd_peer_t;

 /*
@@ -101,8 +77,6 @@ typedef struct lws_smd {
 	/* union of peer class filters, suppress creation of msg classes not set */
 	lws_smd_class_t _class_filter;

-	lws_sorted_usec_list_t sul_tail_stale;
-
 	char delivering;
 } lws_smd_t;

@@ -1,7 +1,7 @@
 /*
  * lws System Message Distribution
  *
- * Copyright (C) 2019 - 2020 Andy Green <andy@warmcat.com>
+ * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
@@ -25,6 +25,16 @@
 #include "private-lib-core.h"
 #include <assert.h>

+/* comment me to remove extra debug and sanity checks */
+// #define LWS_SMD_DEBUG
+
+
+#if defined(LWS_SMD_DEBUG)
+#define lwsl_smd	lwsl_notice
+#else
+#define lwsl_smd(_s, ...)
+#endif
+
 void *
 lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
 {
@@ -69,6 +79,40 @@ lws_smd_msg_free(void **ppay)
 	*ppay = NULL;
 }

+#if defined(LWS_SMD_DEBUG)
+static void
+lws_smd_dump(lws_smd_t *smd)
+{
+	int n = 1;
+
+	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
+				   smd->owner_messages.head) {
+		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
+
+		lwsl_info("  msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n",
+			  n++, msg, msg->refcount,
+			  (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000),
+			  msg->length, msg->_class,
+			  (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF);
+
+	} lws_end_foreach_dll_safe(p, p1);
+
+	n = 1;
+	lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) {
+		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
+
+		lwsl_info("  peer %d: %p: tail: %p, filt 0x%x\n",
+			  n++, pr, pr->tail, pr->_class_filter);
+	} lws_end_foreach_dll(p);
+}
+#endif
+
+static int
+_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg)
+{
+	return !!(msg->_class & pr->_class_filter);
+}
+
 /*
  * Figure out what to set the initial refcount for the message to
 */
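The new _lws_smd_msg_peer_interested_in_msg() helper reduces "interest" to a single bitwise AND between the message class and the peer's class filter. A tiny illustration of the semantics, assuming the usual lws-smd class enums (LWSSMDCL_SYSTEM_STATE and LWSSMDCL_INTERACTION also appear in the test code below; LWSSMDCL_NETWORK is another existing class bit):

	#include <assert.h>
	#include <libwebsockets.h>

	static void
	interest_demo(void)
	{
		lws_smd_class_t filter = LWSSMDCL_SYSTEM_STATE | LWSSMDCL_NETWORK;

		/* shares the SYSTEM_STATE bit with the filter: interested */
		assert(LWSSMDCL_SYSTEM_STATE & filter);
		/* INTERACTION bit is not in the filter: not interested */
		assert(!(LWSSMDCL_INTERACTION & filter));
	}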
@@ -83,21 +127,7 @@ _lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg,
 	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

-		/*
-		 * In order to optimize the tail managment into a refcount,
-		 * we have to account exactly for when peers arrived and
-		 * departed (including deferring the logical peer destruction
-		 * until no message pending he may have contributed to the
-		 * refcount of)
-		 */
-
-		if (pr != exc &&
-		    pr->timestamp_joined <= msg->timestamp &&
-		    (!pr->timestamp_left || /* if zombie, only contribute to
-					     * refcount if msg from before we
-					     * left */
-		     pr->timestamp_left >= msg->timestamp) &&
-		    (msg->_class & pr->_class_filter))
+		if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg))
 			/*
 			 * This peer wants to consume it
 			 */
@@ -117,7 +147,6 @@ _lws_smd_class_mask_union(lws_smd_t *smd)
 				   smd->owner_peers.head) {
 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

-		/* may destroy pr if zombie */
 		mask |= pr->_class_filter;

 	} lws_end_foreach_dll_safe(p, p1);
@@ -127,13 +156,51 @@ _lws_smd_class_mask_union(lws_smd_t *smd)
 	return 0;
 }

+/* Call with message lock held */
+
+static void
+_lws_smd_msg_destroy(lws_smd_t *smd, lws_smd_msg_t *msg)
+{
+	/*
+	 * We think we gave the message to everyone and can destroy it.
+	 * Sanity check that no peer holds a pointer to this guy
+	 */
+
+	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
+				   smd->owner_peers.head) {
+		lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list);
+
+		if (xpr->tail == msg) {
+			lwsl_err("%s: peer %p has msg %p "
+				 "we are about to destroy as tail\n",
+				 __func__, xpr, msg);
+#if !defined(LWS_PLAT_FREERTOS)
+			assert(0);
+#endif
+		}
+
+	} lws_end_foreach_dll_safe(p, p1);
+
+	/*
+	 * We have fully delivered the message now, it
+	 * can be unlinked and destroyed
+	 */
+	lwsl_info("%s: destroy msg %p\n", __func__, msg);
+	lws_dll2_remove(&msg->list);
+	lws_free(msg);
+}
+
 /*
  * This is wanting to be threadsafe, limiting the apis we can call
 */

 int
 _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
 {
 	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
 				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));

-	if (ctx->smd.owner_messages.count >= LWS_SMD_MAX_QUEUE_DEPTH) {
+	if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) {
 		lwsl_warn("%s: rejecting message on queue depth %d\n",
 			  __func__, (int)ctx->smd.owner_messages.count);
 		/* reject the message due to max queue depth reached */
@@ -143,7 +210,8 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
 	if (!ctx->smd.delivering)
 		lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */

-	msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(&ctx->smd, msg, exc);
+	msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(
+							&ctx->smd, msg, exc);
 	if (!msg->refcount) {
 		/* possible, condsidering exc and no other participants */
 		lws_free(msg);
@@ -155,9 +223,10 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
 	msg->exc = exc;

 	/* let's add him on the queue... */

 	lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */
 	lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);
-	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */

 	/*
 	 * Any peer with no active tail needs to check our class to see if we
@@ -168,11 +237,22 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

 		if (pr != exc &&
-		    !pr->tail && (pr->_class_filter & msg->_class))
+		    !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) {
 			pr->tail = msg;
+			/* tail message has to actually be of interest to the peer */
+			assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
+		}

 	} lws_end_foreach_dll(p);

+#if defined(LWS_SMD_DEBUG)
+	lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__,
+		 msg, msg->refcount, ctx->smd.owner_messages.count);
+	lws_smd_dump(&ctx->smd);
+#endif
+
+	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
+
 	if (!ctx->smd.delivering)
 		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

@@ -182,12 +262,20 @@ _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
 	return 0;
 }

+/*
+ * This is wanting to be threadsafe, limiting the apis we can call
+ */
+
+int
+lws_smd_msg_send(struct lws_context *ctx, void *pay)
+{
+	return _lws_smd_msg_send(ctx, pay, NULL);
+}
+
 /*
  * This is wanting to be threadsafe, limiting the apis we can call
 */

 int
 lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
 		   const char *format, ...)
@@ -259,7 +347,7 @@ lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
 	*len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n;

-	lwsl_notice("%s: %s send cl 0x%x, len %u\n", __func__, tag, _class,
+	lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, _class,
 		   (unsigned int)n);

 	return 0;
@@ -322,7 +410,7 @@ _lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag,
 		return 1;
 	}

-	lwsl_notice("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
+	lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
 		  tag, _class, msg->length,
 		  (unsigned long long)msg->timestamp);

@@ -353,39 +441,49 @@ lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len)

 #endif

-static void
-_lws_smd_peer_finalize_destroy(lws_smd_peer_t *pr)
-{
-	lws_dll2_remove(&pr->list);
-	lws_free(pr);
-}
-
 /*
- * Peers that deregister may need to hang around as zombies, so they account
- * for refcounts on messages they already contributed to. Because older
- * messages may be in flight over UDS links, we have to stick around and make
- * sure all cases have their refcount handled correctly.
+ * Peers that deregister need to adjust the refcount of messages they would
+ * have been interested in, but didn't take delivery of yet
 */

 static void
-_lws_smd_peer_zombify(lws_smd_peer_t *pr)
+_lws_smd_peer_destroy(lws_smd_peer_t *pr)
 {
 	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
 					  owner_peers);

-	/* update the class mask union to reflect this peer no longer active */
-	_lws_smd_class_mask_union(smd);
+	lws_mutex_lock(smd->lock_messages); /* +++++++++ messages */

-	pr->timestamp_left = lws_now_usecs();
+	lws_dll2_remove(&pr->list);
+
+	/*
+	 * We take the approach to adjust the refcount of every would-have-been
+	 * delivered message we were interested in
+	 */
+
+	while (pr->tail) {
+
+		lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next,
+						     lws_smd_msg_t, list);
+
+		if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) {
+			if (!--pr->tail->refcount)
+				_lws_smd_msg_destroy(smd, pr->tail);
+		}
+
+		pr->tail = m1;
+	}
+
+	lws_free(pr);
+
+	lws_mutex_unlock(smd->lock_messages); /* messages ------- */
 }

 static lws_smd_msg_t *
 _lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
 {
-	lws_smd_msg_t *msg;
-
 	lws_dll2_t *tail = &pr->tail->list;
 	lws_smd_class_t filter = pr->_class_filter;
+	lws_smd_msg_t *msg;

 	do {
 		tail = tail->next;
@@ -393,7 +491,8 @@ _lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
 			return NULL;

 		msg = lws_container_of(tail, lws_smd_msg_t, list);
-		if (msg->exc != pr && (msg->_class & filter))
+		if (msg->exc != pr &&
+		    _lws_smd_msg_peer_interested_in_msg(pr, msg))
 			return msg;
 	} while (1);

@@ -401,8 +500,6 @@ _lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
 }

 /*
- * Note: May destroy zombie peers when it sees grace period has expired.
- *
 * Delivers only one message to the peer and advances the tail, or sets to NULL
 * if no more filtered queued messages. Returns nonzero if tail non-NULL.
 *
@@ -426,82 +523,30 @@ _lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)

 	msg = lws_container_of(pr->tail, lws_smd_msg_t, list);

-	/*
-	 * Check if zombie peer and the message predates our leaving
-	 */
-
-	if (pr->timestamp_left &&
-	    msg->timestamp > pr->timestamp_left) {
-		/*
-		 * We do not need to modify message refcount, if it was
-		 * generated after we became a zombie, and so we
-		 * definitely did not contribute to its refcount...
-		 *
-		 * ...have we waited out the grace period?
-		 */
+	lwsl_smd("%s: deliver cl 0x%x, len %d, refc %d, to peer %p\n",
+		 __func__, (unsigned int)msg->_class, (int)msg->length,
+		 (int)msg->refcount, pr);

-		if (lws_now_usecs() - pr->timestamp_left >
-			    LWS_SMD_INFLIGHT_GRACE_SECS * LWS_US_PER_SEC)
-			/*
-			 * ... ok, it's time for the zombie to abandon
-			 * its attachment to the Earth and rejoin the
-			 * cosmic mandela
-			 */
-			_lws_smd_peer_finalize_destroy(pr);
-
-		/* ... either way, nothing further to do for this guy */
-
-		return 0;
-	}
-
-	if (!pr->timestamp_left) {
-
-		/*
-		 * Peer is not a zombie... deliver the tail
-		 */
-#if 0
-		if (pr->type == LSMDT_SECURE_STREAMS_PROXIED) {
-#if defined(LWS_WITH_SECURE_STREAMS)
-			if (pr->ss_handle)
-				lws_ss_request_tx(pr->ss_handle);
-#endif
-			return 0;
-		}
-#endif
-
-		lwsl_info("%s: deliver cl 0x%x, len %d, refc %d, to peer %p\n",
-			  __func__, (unsigned int)msg->_class, (int)msg->length,
-			  (int)msg->refcount, pr);
-
-		pr->cb(pr->opaque, msg->_class, msg->timestamp,
-		       ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
-		       (size_t)msg->length);
-	}
+	pr->cb(pr->opaque, msg->_class, msg->timestamp,
+	       ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
+	       (size_t)msg->length);

 	assert(msg->refcount);

 	/*
 	 * If there is one, move forward to the next queued
-	 * message that meets our filters
+	 * message that meets the filters of this peer
 	 */
 	pr->tail = _lws_smd_msg_next_matching_filter(pr);

-	lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */
-	if (!--msg->refcount) {
-		/*
-		 * We have fully delivered the message now, it
-		 * can be unlinked and destroyed
-		 */
-		lwsl_info("%s: destroy msg %p\n", __func__, msg);
-		lws_dll2_remove(&msg->list);
-		lws_free(msg);
-	}
-	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
+	/* tail message has to actually be of interest to the peer */
+	assert(!pr->tail || (pr->tail->_class & pr->_class_filter));

-	/*
-	 * Wait out the grace period even if no live messages
-	 * for a zombie peer... there may be some in flight
-	 */
+	lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */
+	if (!--msg->refcount)
+		_lws_smd_msg_destroy(&ctx->smd, msg);
+	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */

 	return !!pr->tail;
 }
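The simplified delivery path above makes the refcount lifecycle explicit: at send time the refcount is set to the number of currently-registered interested peers, each delivery decrements it, and _lws_smd_msg_destroy() frees the message when it reaches zero. So a message sent while three interested peers are registered starts at refcount 3; if a fourth interested peer registers before delivery completes, the registration path below bumps it to 4 so the late joiner is counted too.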
@@ -531,7 +576,6 @@ lws_smd_msg_distribute(struct lws_context *ctx)
 				   ctx->smd.owner_peers.head) {
 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

-		/* may destroy pr if zombie, hence _safe iterator */
 		more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr));

 	} lws_end_foreach_dll_safe(p, p1);
@@ -556,35 +600,47 @@ lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
 	pr->cb = cb;
 	pr->opaque = opaque;
 	pr->_class_filter = _class_filter;
-	pr->timestamp_joined = lws_now_usecs();
-
-	/*
-	 * Figure out the type of peer from the situation...
-	 */
-
-#if 0
-#if defined(LWS_WITH_SECURE_STREAMS)
-	if (!ctx->smd.listen_vh) {
-		/*
-		 * The guy who is regsitering is actually a SS proxy link
-		 * between a client and SMD
-		 */
-	} else
-#endif
-#endif
-		pr->type = LSMDT_SAME_PROCESS;

 	if (!ctx->smd.delivering)
 		lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */

+	/*
+	 * Let's lock the message list before adding this peer... because...
+	 */
+
+	lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++ messages */
+
 	lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);

 	/* update the global class mask union to account for new peer mask */
 	_lws_smd_class_mask_union(&ctx->smd);

+	/*
+	 * Now there's a new peer added, any messages we have stashed will try
+	 * to deliver to this guy too, if he's interested in that class.  So we
+	 * have to update the message refcounts for queued messages-he's-
+	 * interested-in accordingly.
+	 */
+
+	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
+				   ctx->smd.owner_messages.head) {
+		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
+
+		if (_lws_smd_msg_peer_interested_in_msg(pr, msg))
+			msg->refcount++;
+
+	} lws_end_foreach_dll_safe(p, p1);
+
+	/* ... ok we are done adding the peer */
+
+	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
+
+	lwsl_info("%s: peer %p (count %u) registered\n", __func__, pr,
+		  (unsigned int)ctx->smd.owner_peers.count);
+
 	if (!ctx->smd.delivering)
 		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

-	lwsl_debug("%s: registered\n", __func__);
-
 	return pr;
 }
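This registration-time walk of the queued messages is the behavior the commit subject describes: a peer that joins while a queue exists now takes delivery of the already-queued messages it is interested in, instead of only messages sent after it joined. A sketch of a late joiner via the public API (callback shape per lws_smd_notification_cb_t, as assumed from the lws-smd public header):

	static int
	late_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
		void *buf, size_t len)
	{
		lwsl_notice("late peer: class 0x%x, len %u\n",
			    (unsigned int)_class, (unsigned int)len);

		return 0;
	}

	/* ... some LWSSMDCL_SYSTEM_STATE messages still queued on cx ... */

	struct lws_smd_peer *pr = lws_smd_register(cx, NULL, 0,
						   LWSSMDCL_SYSTEM_STATE,
						   late_cb);
	/* the queued messages' refcounts were bumped for pr, so it also
	 * receives them on the next distribution pass */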
@@ -594,7 +650,8 @@ lws_smd_unregister(struct lws_smd_peer *pr)
 	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers);

 	lws_mutex_lock(smd->lock_peers); /* +++++++++++++++++++++++++++ peers */
-	_lws_smd_peer_zombify(pr);
+	lwsl_notice("%s: destroying peer %p\n", __func__, pr);
+	_lws_smd_peer_destroy(pr);
 	lws_mutex_unlock(smd->lock_peers); /* ------------------------- peers */
 }

|
@ -612,14 +669,58 @@ lws_smd_message_pending(struct lws_context *ctx)
|
|||
return 0;
|
||||
|
||||
/*
|
||||
* Walk the peer list
|
||||
* If there are any messages, check their age and expire ones that
|
||||
* have been hanging around too long
|
||||
*/
|
||||
|
||||
lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++++++++++ peers */
|
||||
lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */
|
||||
|
||||
lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
|
||||
ctx->smd.owner_messages.head) {
|
||||
lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
|
||||
|
||||
if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) {
|
||||
lwsl_warn("%s: timing out queued message %p\n",
|
||||
__func__, msg);
|
||||
|
||||
/*
|
||||
* We're forcibly yanking this guy, we can expect that
|
||||
* there might be peers that point to it as their tail.
|
||||
*
|
||||
* In that case, move their tails on to the next guy
|
||||
* they are interested in, if any.
|
||||
*/
|
||||
|
||||
lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1,
|
||||
ctx->smd.owner_peers.head) {
|
||||
lws_smd_peer_t *pr = lws_container_of(pp,
|
||||
lws_smd_peer_t, list);
|
||||
|
||||
if (pr->tail == msg)
|
||||
pr->tail = _lws_smd_msg_next_matching_filter(pr);
|
||||
|
||||
} lws_end_foreach_dll_safe(pp, pp1);
|
||||
|
||||
/*
|
||||
* No peer should fall foul of the peer tail checks
|
||||
* when destroying the message now.
|
||||
*/
|
||||
|
||||
_lws_smd_msg_destroy(&ctx->smd, msg);
|
||||
}
|
||||
} lws_end_foreach_dll_safe(p, p1);
|
||||
|
||||
lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
|
||||
|
||||
/*
|
||||
* Walk the peer list
|
||||
*/
|
||||
|
||||
lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
|
||||
lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
|
||||
|
||||
if (pr->tail && pr->type == LSMDT_SAME_PROCESS)
|
||||
if (pr->tail)
|
||||
goto bail;
|
||||
|
||||
} lws_end_foreach_dll(p);
|
||||
|
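In effect, any message still queued after smd_ttl_us (2s by default, 5s on FREERTOS) is forcibly destroyed here; peer tails parked on it are first advanced to the next message of interest, so the peer-tail sanity check in _lws_smd_msg_destroy() cannot trip.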
@@ -651,12 +752,11 @@ _lws_smd_destroy(struct lws_context *ctx)
 				   ctx->smd.owner_messages.head) {
 		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

 		lws_dll2_remove(&msg->list);
 		lws_free(msg);

 	} lws_end_foreach_dll_safe(p, p1);

-	lws_mutex_destroy(ctx->smd.lock_messages);
-
 	/*
 	 * Walk the peer list, destroying them
 	 */
@@ -665,10 +765,12 @@ _lws_smd_destroy(struct lws_context *ctx)
 				   ctx->smd.owner_peers.head) {
 		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

-		_lws_smd_peer_finalize_destroy(pr);
+		pr->tail = NULL; /* we just nuked all the messages, ignore */
+		_lws_smd_peer_destroy(pr);

 	} lws_end_foreach_dll_safe(p, p1);

+	lws_mutex_destroy(ctx->smd.lock_messages);
 	lws_mutex_destroy(ctx->smd.lock_peers);

 	return 0;
@@ -15,6 +15,7 @@
 #include <signal.h>

 static int interrupted, ok, fail, _exp = 111;
+static unsigned int how_many_msg = 100, usec_interval = 1000;
 static lws_sorted_usec_list_t sul, sul_initial_drain;
 struct lws_context *context;
 static pthread_t thread_spam;
@@ -23,6 +24,7 @@ static void
 timeout_cb(lws_sorted_usec_list_t *sul)
 {
 	/* We should have completed the test before this fires */
+	lwsl_notice("%s: test period finished\n", __func__);
 	interrupted = 1;
 	lws_cancel_service(context);
 }
|
|||
static void *
|
||||
_thread_spam(void *d)
|
||||
{
|
||||
int n, atm = 0;
|
||||
#if defined(WIN32)
|
||||
unsigned int mypid = 0;
|
||||
#else
|
||||
unsigned int mypid = (unsigned int)getpid();
|
||||
#endif
|
||||
unsigned int n = 0, atm = 0;
|
||||
|
||||
n = 0;
|
||||
while (n++ < 100) {
|
||||
while (n++ < how_many_msg) {
|
||||
|
||||
atm++;
|
||||
if (lws_smd_msg_printf(context, LWSSMDCL_SYSTEM_STATE,
|
||||
"{\"s\":\"state\",\"msg\":%d}",
|
||||
(unsigned int)n)) {
|
||||
"{\"s\":\"state\","
|
||||
"\"pid\":%u,"
|
||||
"\"msg\":%d}",
|
||||
mypid, (unsigned int)n)) {
|
||||
lwsl_err("%s: send attempt %d failed\n", __func__, atm);
|
||||
n--;
|
||||
fail++;
|
||||
|
@ -97,7 +105,7 @@ _thread_spam(void *d)
|
|||
#if defined(WIN32)
|
||||
Sleep(3);
|
||||
#else
|
||||
usleep(1000);
|
||||
usleep(usec_interval);
|
||||
#endif
|
||||
}
|
||||
#if !defined(WIN32)
|
||||
|
@@ -188,8 +196,15 @@ main(int argc, const char **argv)
 	if ((p = lws_cmdline_option(argc, argv, "-d")))
 		logs = atoi(p);

+	if ((p = lws_cmdline_option(argc, argv, "--count")))
+		how_many_msg = (unsigned int)atol(p);
+
+	if ((p = lws_cmdline_option(argc, argv, "--interval")))
+		usec_interval = (unsigned int)atol(p);
+
 	lws_set_log_level(logs, NULL);
-	lwsl_user("LWS API selftest: lws_smd\n");
+	lwsl_user("LWS API selftest: lws_smd: %u msgs at %uus interval\n",
+		  how_many_msg, usec_interval);

 	memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
 	info.port = CONTEXT_PORT_NO_LISTEN;
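With these options the test can be pushed well past the old fixed 100-message run; assuming the usual minimal-example binary name, something like "lws-api-test-lws_smd --count 500 --interval 2000" sends 500 messages at 2ms spacing, and the timeout below scales to match.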
@@ -202,7 +217,10 @@ main(int argc, const char **argv)
 		return 1;
 	}

-	lws_sul_schedule(context, 0, &sul, timeout_cb, 5 * LWS_US_PER_SEC);
+	/* game over after this long */
+
+	lws_sul_schedule(context, 0, &sul, timeout_cb,
+			 (how_many_msg * (usec_interval + 1000)) + (4 * LWS_US_PER_SEC));

 	/* register a messaging participant to hear INTERACTION class */

@@ -16,6 +16,7 @@
 #include <signal.h>

 static int interrupted, bad = 1, count_p1, count_p2, count_tx;
+static unsigned int how_many_msg = 100, usec_interval = 1000;
 static lws_sorted_usec_list_t sul_timeout;

 /*
@@ -139,7 +140,7 @@ myss_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len,
 	count_tx++;

 	lws_sul_schedule(lws_ss_get_context(m->ss), 0, &m->sul,
-			 sul_tx_periodic_cb, 200 * LWS_US_PER_MS);
+			 sul_tx_periodic_cb, usec_interval);

 	return LWSSSSRET_OK;
 }
@@ -234,6 +235,7 @@ direct_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
 static void
 sul_timeout_cb(lws_sorted_usec_list_t *sul)
 {
+	lwsl_notice("%s: test finishing\n", __func__);
 	interrupted = 1;
 }

@@ -250,6 +252,7 @@ int main(int argc, const char **argv)
 {
 	struct lws_context_creation_info info;
 	struct lws_context *context;
+	const char *p;

 	signal(SIGINT, sigint_handler);

@@ -262,7 +265,14 @@ int main(int argc, const char **argv)
 	lws_cmdline_option_handle_builtin(argc, argv, &info);

-	lwsl_user("LWS Secure Streams SMD test client [-d<verb>]\n");
+	if ((p = lws_cmdline_option(argc, argv, "--count")))
+		how_many_msg = (unsigned int)atol(p);
+
+	if ((p = lws_cmdline_option(argc, argv, "--interval")))
+		usec_interval = (unsigned int)atol(p);
+
+	lwsl_user("LWS Secure Streams SMD test client [-d<verb>]: "
+		  "%u msgs at %uus interval\n", how_many_msg, usec_interval);

 	info.fd_limit_per_thread = 1 + 6 + 1;
 	info.port = CONTEXT_PORT_NO_LISTEN;
@@ -271,8 +281,6 @@ int main(int argc, const char **argv)
 #else
 	info.protocols = lws_sspc_protocols;
 	{
-		const char *p;
-
 		/* connect to ssproxy via UDS by default, else via
 		 * tcp connection to this port */
 		if ((p = lws_cmdline_option(argc, argv, "-p")))
|
|||
/* set up the test timeout */
|
||||
|
||||
lws_sul_schedule(context, 0, &sul_timeout, sul_timeout_cb,
|
||||
4 * LWS_US_PER_SEC);
|
||||
(how_many_msg * (usec_interval + 1000)) + LWS_US_PER_SEC);
|
||||
|
||||
/* the event loop */
|
||||
|
||||
|
|