mirror of
https://github.com/warmcat/libwebsockets.git
synced 2025-03-09 00:00:04 +01:00
lws_ring: implement lagging client culling
This commit is contained in:
parent
c4dc102a0b
commit
412ff64be9
8 changed files with 132 additions and 12 deletions
|
@ -390,6 +390,9 @@ lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs)
|
|||
return;
|
||||
}
|
||||
|
||||
if (secs == LWS_TO_KILL_ASYNC)
|
||||
secs = 0;
|
||||
|
||||
lws_pt_lock(pt, __func__);
|
||||
__lws_set_timeout(wsi, reason, secs);
|
||||
lws_pt_unlock(pt);
|
||||
|
@ -2932,12 +2935,14 @@ lws_cmdline_option(int argc, const char **argv, const char *val)
|
|||
int n = (int)strlen(val), c = argc;
|
||||
|
||||
while (--c > 0) {
|
||||
/* coverity treats unchecked argv as "tainted" */
|
||||
if (!argv[c] || strlen(argv[c]) > 1024)
|
||||
return NULL;
|
||||
|
||||
if (!strncmp(argv[c], val, n)) {
|
||||
if (!*(argv[c] + n) && c < argc - 1)
|
||||
if (!*(argv[c] + n) && c < argc - 1) {
|
||||
/* coverity treats unchecked argv as "tainted" */
|
||||
if (!argv[c + 1] || strlen(argv[c + 1]) > 1024)
|
||||
return NULL;
|
||||
return argv[c + 1];
|
||||
}
|
||||
|
||||
return argv[c] + n;
|
||||
}
|
||||
|
|
|
@ -4668,6 +4668,7 @@ enum pending_timeout {
|
|||
PENDING_TIMEOUT_HOLDING_AH = 25,
|
||||
PENDING_TIMEOUT_UDP_IDLE = 26,
|
||||
PENDING_TIMEOUT_CLIENT_CONN_IDLE = 27,
|
||||
PENDING_TIMEOUT_LAGGING = 28,
|
||||
|
||||
/****** add new things just above ---^ ******/
|
||||
|
||||
|
@ -5542,6 +5543,13 @@ lws_interface_to_sa(int ipv6, const char *ifname, struct sockaddr_in *addr,
|
|||
type it = &(start); \
|
||||
while (*(it)) {
|
||||
|
||||
#define lws_start_foreach_llp_safe(type, it, start, nxt)\
|
||||
{ \
|
||||
type it = &(start); \
|
||||
type next; \
|
||||
while (*(it)) { \
|
||||
next = &((*(it))->nxt); \
|
||||
|
||||
/**
|
||||
* lws_end_foreach_llp(): linkedlist pointer iterator helper end
|
||||
*
|
||||
|
@ -5557,6 +5565,11 @@ lws_interface_to_sa(int ipv6, const char *ifname, struct sockaddr_in *addr,
|
|||
} \
|
||||
}
|
||||
|
||||
#define lws_end_foreach_llp_safe(it) \
|
||||
it = next; \
|
||||
} \
|
||||
}
|
||||
|
||||
#define lws_ll_fwd_insert(\
|
||||
___new_object, /* pointer to new object */ \
|
||||
___m_list, /* member for next list object ptr */ \
|
||||
|
|
|
@ -111,8 +111,8 @@ lws_tls_server_certs_load(struct lws_vhost *vhost, struct lws *wsi,
|
|||
lws_filepos_t flen;
|
||||
long err;
|
||||
|
||||
if (!cert || !private_key) {
|
||||
lwsl_notice("%s: no paths\n", __func__);
|
||||
if ((!cert || !private_key) && (!mem_cert || !mem_privkey)) {
|
||||
lwsl_notice("%s: no usable input\n", __func__);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -405,6 +405,9 @@ lws_tls_generic_cert_checks(struct lws_vhost *vhost, const char *cert,
|
|||
* parameter.
|
||||
*/
|
||||
|
||||
if (!cert || !private_key)
|
||||
return LWS_TLS_EXTANT_NO;
|
||||
|
||||
n = lws_tls_use_any_upgrade_check_extant(cert);
|
||||
if (n == LWS_TLS_EXTANT_ALTERNATIVE)
|
||||
return LWS_TLS_EXTANT_ALTERNATIVE;
|
||||
|
|
|
@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 2.8)
|
|||
include(CheckCSourceCompiles)
|
||||
|
||||
set(SAMP lws-minimal-ws-server-ring)
|
||||
set(SRCS minimal-ws-server.c)
|
||||
set(SRCS minimal-ws-server-ring.c)
|
||||
|
||||
# If we are being built as part of lws, confirm current build config supports
|
||||
# reqconfig, else skip building ourselves.
|
||||
|
@ -75,4 +75,4 @@ if (requirements)
|
|||
else()
|
||||
target_link_libraries(${SAMP} websockets)
|
||||
endif()
|
||||
endif()
|
||||
endif()
|
||||
|
|
|
@ -19,3 +19,6 @@ Visit http://localhost:7681 on multiple browser windows
|
|||
Text you type in any browser window is sent to all of them.
|
||||
|
||||
A ringbuffer holds up to 8 lines of text.
|
||||
|
||||
This also demonstrates how the ringbuffer can take action against lagging or
|
||||
disconnected clients that cause the ringbuffer to fill.
|
||||
|
|
|
@ -8,6 +8,9 @@
|
|||
*
|
||||
* This version uses an lws_ring ringbuffer to cache up to 8 messages at a time,
|
||||
* so it's not so easy to lose messages.
|
||||
*
|
||||
* This also demonstrates how to "cull", ie, kill, connections that can't
|
||||
* keep up for some reason.
|
||||
*/
|
||||
|
||||
#if !defined (LWS_PLUGIN_STATIC)
|
||||
|
@ -31,6 +34,8 @@ struct per_session_data__minimal {
|
|||
struct per_session_data__minimal *pss_list;
|
||||
struct lws *wsi;
|
||||
uint32_t tail;
|
||||
|
||||
unsigned int culled:1;
|
||||
};
|
||||
|
||||
/* one of these is created for each vhost our protocol is used with */
|
||||
|
@ -45,6 +50,87 @@ struct per_vhost_data__minimal {
|
|||
struct lws_ring *ring; /* ringbuffer holding unsent messages */
|
||||
};
|
||||
|
||||
static void
|
||||
cull_lagging_clients(struct per_vhost_data__minimal *vhd)
|
||||
{
|
||||
uint32_t oldest_tail = lws_ring_get_oldest_tail(vhd->ring);
|
||||
struct per_session_data__minimal *old_pss = NULL;
|
||||
int most = 0, before = lws_ring_get_count_waiting_elements(vhd->ring,
|
||||
&oldest_tail), m;
|
||||
|
||||
/*
|
||||
* At least one guy with the oldest tail has lagged too far, filling
|
||||
* the ringbuffer with stuff waiting for them, while new stuff is
|
||||
* coming in, and they must close, freeing up ringbuffer entries.
|
||||
*/
|
||||
|
||||
lws_start_foreach_llp_safe(struct per_session_data__minimal **,
|
||||
ppss, vhd->pss_list, pss_list) {
|
||||
|
||||
if ((*ppss)->tail == oldest_tail) {
|
||||
old_pss = *ppss;
|
||||
|
||||
lwsl_user("Killing lagging client %p\n", (*ppss)->wsi);
|
||||
|
||||
lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING,
|
||||
/*
|
||||
* we may kill the wsi we came in on,
|
||||
* so the actual close is deferred
|
||||
*/
|
||||
LWS_TO_KILL_ASYNC);
|
||||
|
||||
/*
|
||||
* We might try to write something before we get a
|
||||
* chance to close. But this pss is now detached
|
||||
* from the ring buffer. Mark this pss as culled so we
|
||||
* don't try to do anything more with it.
|
||||
*/
|
||||
|
||||
(*ppss)->culled = 1;
|
||||
|
||||
/*
|
||||
* Because we can't kill it synchronously, but we
|
||||
* know it's closing momentarily and don't want its
|
||||
* participation any more, remove its pss from the
|
||||
* vhd pss list early. (This is safe to repeat
|
||||
* uselessly later in the close flow).
|
||||
*
|
||||
* Notice this changes *ppss!
|
||||
*/
|
||||
|
||||
lws_ll_fwd_remove(struct per_session_data__minimal,
|
||||
pss_list, (*ppss), vhd->pss_list);
|
||||
|
||||
/* use the changed *ppss so we won't skip anything */
|
||||
|
||||
continue;
|
||||
|
||||
} else {
|
||||
/*
|
||||
* so this guy is a survivor of the cull. Let's track
|
||||
* what is the largest number of pending ring elements
|
||||
* for any survivor.
|
||||
*/
|
||||
m = lws_ring_get_count_waiting_elements(vhd->ring,
|
||||
&((*ppss)->tail));
|
||||
if (m > most)
|
||||
most = m;
|
||||
}
|
||||
|
||||
} lws_end_foreach_llp_safe(ppss);
|
||||
|
||||
/*
|
||||
* Let's recover (ie, free up) all the ring slots between the
|
||||
* original oldest's last one and the "worst" survivor.
|
||||
*/
|
||||
|
||||
lws_ring_consume_and_update_oldest_tail(vhd->ring,
|
||||
struct per_session_data__minimal, &old_pss->tail, before - most,
|
||||
vhd->pss_list, tail, pss_list);
|
||||
|
||||
lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most);
|
||||
}
|
||||
|
||||
/* destroys the message when everyone has had a copy of it */
|
||||
|
||||
static void
|
||||
|
@ -92,18 +178,22 @@ callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
|
|||
|
||||
case LWS_CALLBACK_ESTABLISHED:
|
||||
/* add ourselves to the list of live pss held in the vhd */
|
||||
lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi);
|
||||
lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
|
||||
pss->tail = lws_ring_get_oldest_tail(vhd->ring);
|
||||
pss->wsi = wsi;
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi);
|
||||
/* remove our closing pss from the list of live pss */
|
||||
lws_ll_fwd_remove(struct per_session_data__minimal, pss_list,
|
||||
pss, vhd->pss_list);
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE:
|
||||
if (pss->culled)
|
||||
break;
|
||||
pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
|
||||
if (!pmsg)
|
||||
break;
|
||||
|
@ -135,18 +225,24 @@ callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
|
|||
case LWS_CALLBACK_RECEIVE:
|
||||
n = (int)lws_ring_get_count_free_elements(vhd->ring);
|
||||
if (!n) {
|
||||
lwsl_user("dropping!\n");
|
||||
break;
|
||||
/* forcibly make space */
|
||||
cull_lagging_clients(vhd);
|
||||
n = (int)lws_ring_get_count_free_elements(vhd->ring);
|
||||
}
|
||||
if (!n)
|
||||
break;
|
||||
|
||||
lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n);
|
||||
|
||||
amsg.len = len;
|
||||
/* notice we over-allocate by LWS_PRE */
|
||||
/* notice we over-allocate by LWS_PRE... */
|
||||
amsg.payload = malloc(LWS_PRE + len);
|
||||
if (!amsg.payload) {
|
||||
lwsl_user("OOM: dropping\n");
|
||||
break;
|
||||
}
|
||||
|
||||
/* ...and we copy the payload in at +LWS_PRE */
|
||||
memcpy((char *)amsg.payload + LWS_PRE, in, len);
|
||||
if (!lws_ring_insert(vhd->ring, &amsg, 1)) {
|
||||
__minimal_destroy_message(&amsg);
|
||||
|
@ -176,7 +272,7 @@ callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
|
|||
"lws-minimal", \
|
||||
callback_minimal, \
|
||||
sizeof(struct per_session_data__minimal), \
|
||||
128, \
|
||||
0, \
|
||||
0, NULL, 0 \
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue