1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

lws_ring: introduce generic flexible ringbuffer abstraction

This commit is contained in:
Andy Green 2017-09-20 10:37:59 +08:00
parent f85a1d42d8
commit 12adb39542
8 changed files with 706 additions and 108 deletions

View file

@ -939,7 +939,7 @@ check_extensions:
LWS_EXT_CB_CLIENT_CONSTRUCT,
(void *)&wsi->act_ext_user[wsi->count_act_ext],
(void *)&opts, 0)) {
lwsl_notice(" ext %s failed construction\n", ext_name);
lwsl_info(" ext %s failed construction\n", ext_name);
ext++;
continue;
}

View file

@ -59,7 +59,7 @@ lws_extension_pmdeflate_restrict_args(struct lws *wsi,
if (extra < priv->args[PMD_RX_BUF_PWR2]) {
priv->args[PMD_RX_BUF_PWR2] = extra;
lwsl_err(" Capping pmd rx to %d\n", 1 << extra);
lwsl_info(" Capping pmd rx to %d\n", 1 << extra);
}
}
@ -123,7 +123,7 @@ lws_extension_callback_pm_deflate(struct lws_context *context,
n = wsi->protocol->rx_buffer_size;
if (n < 128) {
lwsl_err(" permessage-deflate requires the protocol (%s) to have an RX buffer >= 128\n",
lwsl_info(" permessage-deflate requires the protocol (%s) to have an RX buffer >= 128\n",
wsi->protocol->name);
return -1;
}

View file

@ -3748,3 +3748,243 @@ lws_stats_atomic_max(struct lws_context * context,
}
#endif
/*
 * lws_ring_create() - allocate a new ringbuffer and its element storage
 *
 * \param element_len: size in bytes of one element held in the ring
 * \param count: number of elements the ring can hold
 * \param destroy_element: NULL, or callback invoked on each element retired
 *			   from the ring (and any left at destroy time)
 *
 * Returns the new struct lws_ring *, or NULL on allocation failure or
 * degenerate / overflowing size arguments.
 */
LWS_VISIBLE LWS_EXTERN struct lws_ring *
lws_ring_create(size_t element_len, size_t count, void (*destroy_element)(void *))
{
	struct lws_ring *ring;

	/*
	 * Reject zero sizes and guard the count * element_len multiplication
	 * against wrapping, which would silently allocate a too-small buffer.
	 */
	if (!element_len || !count || count > (size_t)-1 / element_len)
		return NULL;

	ring = lws_malloc(sizeof(*ring));
	if (!ring)
		return NULL;

	ring->buflen = count * element_len;
	ring->element_len = element_len;
	ring->head = 0;
	ring->oldest_tail = 0;
	ring->destroy_element = destroy_element;

	ring->buf = lws_malloc(ring->buflen);
	if (!ring->buf) {
		lws_free(ring);

		return NULL;
	}

	return ring;
}
/*
 * lws_ring_destroy() - free the ring storage and the ring itself
 *
 * \param ring: the struct lws_ring to destroy
 *
 * If a destroy_element callback was given at creation, it is run for every
 * element still unconsumed (between oldest_tail and head) before freeing.
 */
LWS_VISIBLE LWS_EXTERN void
lws_ring_destroy(struct lws_ring *ring)
{
	uint32_t t = ring->oldest_tail;

	if (ring->destroy_element) {
		/* walk the still-waiting elements, destroying each in turn */
		while (t != ring->head) {
			ring->destroy_element((uint8_t *)ring->buf + t);
			t = (t + ring->element_len) % ring->buflen;
		}
		ring->oldest_tail = t;
	}

	if (ring->buf)
		lws_free_set_NULL(ring->buf);

	lws_free(ring);
}
/*
 * lws_ring_get_count_free_elements() - whole elements that can be inserted
 *
 * \param ring: the struct lws_ring to report on
 *
 * Returns how many complete elements fit in the remaining free space.
 * One element's worth of space is always kept unused so that head == tail
 * unambiguously means "empty" rather than "full".
 */
LWS_VISIBLE LWS_EXTERN size_t
lws_ring_get_count_free_elements(struct lws_ring *ring)
{
	int f;

	/*
	 * possible ringbuf patterns
	 *
	 * h == t
	 * |--------t***h---|
	 * |**h-----------t*|
	 * |t**************h|
	 * |*****ht*********|
	 */
	if (ring->head == ring->oldest_tail)
		/* empty: everything free except the reserved element */
		f = ring->buflen - ring->element_len;
	else
		if (ring->head < ring->oldest_tail)
			/* free span is the gap between head and tail */
			f = (ring->oldest_tail - ring->head) - ring->element_len;
		else
			/* free span wraps: end-of-buffer piece plus the start */
			f = (ring->buflen - ring->head) + ring->oldest_tail - ring->element_len;

	/*
	 * NOTE(review): f < 2 also reports 0 when exactly 1 free byte remains
	 * with element_len == 1 — presumably an intentional conservative
	 * guard; confirm before changing.
	 */
	if (f < 2)
		return 0;

	return f / ring->element_len;
}
/*
 * lws_ring_get_count_waiting_elements() - whole elements ready to consume
 *
 * \param ring: the struct lws_ring to report on
 * \param tail: pointer to the tail to measure from, or NULL to use the
 *		ring's single built-in tail
 *
 * Returns how many elements are waiting between the given tail and head.
 */
LWS_VISIBLE LWS_EXTERN size_t
lws_ring_get_count_waiting_elements(struct lws_ring *ring, uint32_t *tail)
{
	/*
	 * possible ringbuf patterns
	 *
	 * h == t
	 * |--------t***h---|
	 * |**h-----------t*|
	 * |t**************h|
	 * |*****ht*********|
	 */
	uint32_t t = tail ? *tail : ring->oldest_tail;
	size_t waiting;

	if (ring->head >= t)
		/* covers the empty (head == t) and linear cases */
		waiting = ring->head - t;
	else
		/* waiting data wraps past the end of the buffer */
		waiting = (ring->buflen - t) + ring->head;

	return waiting / ring->element_len;
}
/*
 * lws_ring_next_linear_insert_range() - report the next writable linear span
 *
 * \param ring: the struct lws_ring to report on
 * \param start: set to the address inside the ring where writing may begin
 * \param bytes: set to how many bytes may be written linearly from *start
 *
 * Lets the caller, eg, read() directly into the ringbuffer; follow up with
 * lws_ring_bump_head() for however much was actually written.
 *
 * Returns nonzero if the ring currently has no free space at all.
 */
LWS_VISIBLE LWS_EXTERN int
lws_ring_next_linear_insert_range(struct lws_ring *ring, void **start, size_t *bytes)
{
	/* total free capacity of the fifo, in bytes */
	size_t space = lws_ring_get_count_free_elements(ring) *
		       ring->element_len;

	if (!space)
		return 1;

	*start = (void *)((uint8_t *)ring->buf + ring->head);

	/* clip the linear run at the physical end of the buffer if it wraps */
	if (ring->head + space > ring->buflen)
		*bytes = ring->buflen - ring->head;
	else
		*bytes = space;

	return 0;
}
/*
 * lws_ring_bump_head() - account for bytes written directly at the head
 *
 * \param ring: the struct lws_ring to operate on
 * \param bytes: how many bytes were inserted at the current head
 */
LWS_VISIBLE LWS_EXTERN void
lws_ring_bump_head(struct lws_ring *ring, size_t bytes)
{
	/* advance head, wrapping modulo the buffer length */
	ring->head = (ring->head + bytes) % ring->buflen;
}
/*
 * lws_ring_insert() - copy up to max_count whole elements from src into ring
 *
 * \param ring: the struct lws_ring to insert into
 * \param src: array of elements to copy from
 * \param max_count: how many elements are available at src
 *
 * Returns how many whole elements were actually inserted (limited by the
 * free space in the ring).
 */
LWS_VISIBLE LWS_EXTERN size_t
lws_ring_insert(struct lws_ring *ring, const void *src, size_t max_count)
{
	/* remember the original src so the bytes copied can be totalled */
	const uint8_t *osrc = src;
	int m, n;

	/* n is how many bytes the whole fifo can take */
	n = lws_ring_get_count_free_elements(ring) * ring->element_len;

	/* restrict n to how much we want to insert */
	if ((size_t)n > max_count * ring->element_len)
		n = max_count * ring->element_len;

	/*
	 * n is legal to insert, but as an optimization we can cut the
	 * insert into one or two memcpys, depending on if it wraps
	 */
	if (ring->head + n > ring->buflen) {

		/*
		 * He does wrap.  The first memcpy should take us up to
		 * the end of the buffer
		 */

		m = ring->buflen - ring->head;
		memcpy(((uint8_t *)ring->buf) + ring->head, src, m);
		/* we know it will wrap exactly back to zero */
		ring->head = 0;

		/* adapt the second memcpy for what we already did */

		src = ((uint8_t *)src) + m;
		n -= m;
	}

	memcpy(((uint8_t *)ring->buf) + ring->head, src, n);
	ring->head = (ring->head + n) % ring->buflen;

	/*
	 * src was advanced by m in the wrap case, so (src + n) - osrc is the
	 * total byte count copied across both memcpys
	 */
	return (((uint8_t *)src + n) - osrc) / ring->element_len;
}
/*
 * lws_ring_consume() - copy out and/or skip up to max_count waiting elements
 *
 * \param ring: the struct lws_ring to consume from
 * \param tail: pointer to the consuming tail, or NULL for the single
 *		built-in tail
 * \param dest: destination array, or NULL to discard the elements without
 *		copying
 * \param max_count: maximum number of elements to consume
 *
 * Returns the number of whole elements consumed from the given tail's
 * perspective.  When tail is NULL, the built-in oldest tail is also updated
 * (running destroy_element on retired entries if one was registered).
 */
LWS_VISIBLE LWS_EXTERN size_t
lws_ring_consume(struct lws_ring *ring, uint32_t *tail, void *dest, size_t max_count)
{
	/* remember dest's start so the bytes copied can be totalled */
	uint8_t *odest = dest;
	void *orig_tail = tail;
	uint32_t fake_tail;
	int m, n;

	/* NULL tail: work on a copy of the built-in single tail */
	if (!tail) {
		fake_tail = ring->oldest_tail;
		tail = &fake_tail;
	}

	/* n is how many bytes the whole fifo has for us */
	n = lws_ring_get_count_waiting_elements(ring, tail) * ring->element_len;

	/* restrict n to how much we want to consume */
	if ((size_t)n > max_count * ring->element_len)
		n = max_count * ring->element_len;

	/* NULL dest: just advance the tail, discarding the elements */
	if (!dest) {
		*tail = ((*tail) + n) % ring->buflen;
		if (!orig_tail) /* single tail */
			lws_ring_update_oldest_tail(ring, *tail);

		return n / ring->element_len;
	}
	if (*tail + n > ring->buflen) {

		/*
		 * He does wrap.  The first memcpy should take us up to
		 * the end of the buffer
		 */

		m = ring->buflen - *tail;
		memcpy(dest, ((uint8_t *)ring->buf) + *tail, m);
		/* we know it will wrap exactly back to zero */
		*tail = 0;

		/* adapt the second memcpy for what we already did */

		dest = ((uint8_t *)dest) + m;
		n -= m;
	}

	memcpy(dest, ((uint8_t *)ring->buf) + *tail, n);
	*tail = ((*tail) + n) % ring->buflen;
	if (!orig_tail) /* single tail */
		lws_ring_update_oldest_tail(ring, *tail);

	/* dest was advanced by m in the wrap case; total both memcpys */
	return (((uint8_t *)dest + n) - odest) / ring->element_len;
}
/*
 * lws_ring_update_oldest_tail() - retire elements older than the given tail
 *
 * \param ring: the struct lws_ring to operate on
 * \param tail: the new "oldest" tail position
 *
 * Elements between the current oldest tail and the new one are freed for
 * reuse; if a destroy_element callback was registered, it runs on each.
 */
LWS_VISIBLE LWS_EXTERN void
lws_ring_update_oldest_tail(struct lws_ring *ring, uint32_t tail)
{
	uint32_t t = ring->oldest_tail;

	if (ring->destroy_element)
		/* destroy each element we are retiring, oldest first */
		while (t != tail) {
			ring->destroy_element((uint8_t *)ring->buf + t);
			t = (t + ring->element_len) % ring->buflen;
		}

	ring->oldest_tail = tail;
}
/*
 * lws_ring_get_oldest_tail() - index of the oldest still-available element
 *
 * \param ring: the struct lws_ring to report on
 *
 * Useful for initializing a new consumer's tail to the oldest data.
 */
LWS_VISIBLE LWS_EXTERN uint32_t
lws_ring_get_oldest_tail(struct lws_ring *ring)
{
	return ring->oldest_tail;
}

View file

@ -4602,6 +4602,196 @@ lws_interface_to_sa(int ipv6, const char *ifname, struct sockaddr_in *addr,
} \
}
/**
* lws_ring: generic ringbuffer struct
*
* all of the members are opaque and manipulated by lws_ring_...() apis.
*
* The lws_ring and its buffer is allocated at runtime on the heap, using
*
* - lws_ring_create()
* - lws_ring_destroy()
*
* It may contain any type, the size of the "element" stored in the ring
* buffer and the number of elements is given at creation time.
*
* Whole elements may be inserted into the ringbuffer and removed from it, using
*
* - lws_ring_insert()
* - lws_ring_consume()
*
* You can find out how many whole elements are free or waiting using
*
* - lws_ring_get_count_free_elements()
* - lws_ring_get_count_waiting_elements()
*
* In addition there are special purpose optional byte-centric apis
*
* - lws_ring_next_linear_insert_range()
* - lws_ring_bump_head()
*
* which let you, eg, read() directly into the ringbuffer without needing
* an intermediate bounce buffer.
*
* The accessors understand that the ring wraps, and optimizes insertion and
* consumption into one or two memcpy()s depending on if the head or tail
* wraps.
*
* lws_ring only supports a single head, but optionally multiple tails with
* an API to inform it when the "oldest" tail has moved on. You can give
* NULL where-ever an api asks for a tail pointer, and it will use an internal
* single tail pointer for convenience.
*
* The "oldest tail", which is the only tail if you give it NULL instead of
* some other tail, is used to track which elements in the ringbuffer are
* still unread by anyone.
*
* - lws_ring_update_oldest_tail()
*/
struct lws_ring;
/**
* lws_ring_create(): create a new ringbuffer
*
* \param element_len: the size in bytes of one element in the ringbuffer
* \param count: the number of elements the ringbuffer can contain
* \param destroy_element: NULL, or callback to be called for each element
* that is removed from the ringbuffer due to the
* oldest tail moving beyond it
*
* Creates the ringbuffer and allocates the storage. Returns the new
* lws_ring *, or NULL if the allocation failed.
*
* If non-NULL, destroy_element will get called back for every element that is
* retired from the ringbuffer after the oldest tail has gone past it, and for
* any element still left in the ringbuffer when it is destroyed. It replaces
* all other element destruction code in your user code.
*/
LWS_VISIBLE LWS_EXTERN struct lws_ring *
lws_ring_create(size_t element_len, size_t count,
void (*destroy_element)(void *element));
/**
* lws_ring_destroy(): destroy a previously created ringbuffer
*
* \param ring: the struct lws_ring to destroy
*
* Destroys the ringbuffer allocation and the struct lws_ring itself.
*/
LWS_VISIBLE LWS_EXTERN void
lws_ring_destroy(struct lws_ring *ring);
/**
* lws_ring_get_count_free_elements(): return how many elements can fit
* in the free space
*
* \param ring: the struct lws_ring to report on
*
* Returns how much room is left in the ringbuffer for whole element insertion.
*/
LWS_VISIBLE LWS_EXTERN size_t
lws_ring_get_count_free_elements(struct lws_ring *ring);
/**
* lws_ring_get_count_waiting_elements(): return how many elements can be consumed
*
* \param ring: the struct lws_ring to report on
* \param tail: a pointer to the tail struct to use, or NULL for single tail
*
* Returns how many elements are waiting to be consumed from the perspective
* of the tail pointer given.
*/
LWS_VISIBLE LWS_EXTERN size_t
lws_ring_get_count_waiting_elements(struct lws_ring *ring, uint32_t *tail);
/**
* lws_ring_insert(): attempt to insert up to max_count elements from src
*
* \param ring: the struct lws_ring to report on
* \param src: the array of elements to be inserted
* \param max_count: the number of available elements at src
*
* Attempts to insert as many of the elements at src as possible, up to the
* maximum max_count. Returns the number of elements actually inserted.
*/
LWS_VISIBLE LWS_EXTERN size_t
lws_ring_insert(struct lws_ring *ring, const void *src, size_t max_count);
/**
* lws_ring_consume(): attempt to copy out and remove up to max_count elements
* to src
*
* \param ring: the struct lws_ring to report on
* \param tail: a pointer to the tail struct to use, or NULL for single tail
* \param dest: the destination array the consumed elements are copied into
* \param max_count: the maximum number of elements to consume
*
* Attempts to copy out as many waiting elements as possible into dest, from
* the perspective of the given tail, up to max_count.
*
* Returns the number of elements copied out.
*/
LWS_VISIBLE LWS_EXTERN size_t
lws_ring_consume(struct lws_ring *ring, uint32_t *tail, void *dest,
size_t max_count);
/**
* lws_ring_update_oldest_tail(): free up elements older than tail for reuse
*
* \param ring: the struct lws_ring to report on
* \param tail: a pointer to the tail struct to use, or NULL for single tail
*
* If you are using multiple tails, you must use this API to inform the
* lws_ring when none of the tails still need elements in the fifo any more,
* by updating it when the "oldest" tail has moved on.
*/
LWS_VISIBLE LWS_EXTERN void
lws_ring_update_oldest_tail(struct lws_ring *ring, uint32_t tail);
/**
* lws_ring_get_oldest_tail(): get current oldest available data index
*
* \param ring: the struct lws_ring to report on
*
* If you are initializing a new ringbuffer consumer, you can set its tail to
* this to start it from the oldest ringbuffer entry still available.
*/
LWS_VISIBLE LWS_EXTERN uint32_t
lws_ring_get_oldest_tail(struct lws_ring *ring);
/**
* lws_ring_next_linear_insert_range(): used to write directly into the ring
*
* \param ring: the struct lws_ring to report on
* \param start: pointer to a void * set to the start of the next ringbuffer area
* \param bytes: pointer to a size_t set to the max length you may use from *start
*
* This provides a low-level, bytewise access directly into the ringbuffer
* allowing direct insertion of data without having to use a bounce buffer.
*
* The api reports the position and length of the next linear range that can
* be written in the ringbuffer, ie, up to the point it would wrap, and sets
* *start and *bytes accordingly. You can then, eg, directly read() into
* *start for up to *bytes, and use lws_ring_bump_head() to update the lws_ring
* with what you have done.
*
* Returns nonzero if no insertion is currently possible.
*/
LWS_VISIBLE LWS_EXTERN int
lws_ring_next_linear_insert_range(struct lws_ring *ring, void **start,
size_t *bytes);
/**
* lws_ring_bump_head(): used to write directly into the ring
*
* \param ring: the struct lws_ring to operate on
* \param bytes: the number of bytes you inserted at the current head
*/
LWS_VISIBLE LWS_EXTERN void
lws_ring_bump_head(struct lws_ring *ring, size_t bytes);
/**
* lws_snprintf(): snprintf that truncates the returned length too
*

View file

@ -668,6 +668,15 @@ enum {
LWS_RXFLOW_PENDING_CHANGE = (1 << 1),
};
/* opaque ringbuffer state, manipulated only via the lws_ring_...() apis */
struct lws_ring {
	void *buf;		/* heap storage for buflen bytes of elements */
	void (*destroy_element)(void *element); /* optional, run when an
						 * element is retired */
	size_t buflen;		/* total bytes in buf: count * element_len */
	size_t element_len;	/* size of one stored element, in bytes */
	uint32_t head;		/* byte offset where the next insert lands */
	uint32_t oldest_tail;	/* byte offset of oldest unconsumed element */
};
/* this is not usable directly by user code any more, lws_close_reason() */
#define LWS_WRITE_CLOSE 4

View file

@ -157,7 +157,7 @@ lws_extension_server_handshake(struct lws *wsi, char **p, int budget)
(void *)&wsi->act_ext_user[
wsi->count_act_ext],
(void *)&opts, 0)) {
lwsl_notice("ext %s failed construction\n",
lwsl_info("ext %s failed construction\n",
ext_name);
ext_count--;
ext++;

View file

@ -1,7 +1,7 @@
/*
* libwebsockets-test-server - libwebsockets test implementation
*
* Copyright (C) 2010-2016 Andy Green <andy@warmcat.com>
* Copyright (C) 2010-2017 Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
@ -27,23 +27,21 @@
#include <string.h>
#include <stdlib.h>
/* lws-mirror_protocol */
#if defined(LWS_WITH_ESP8266)
#define MAX_MESSAGE_QUEUE 64
#else
#define MAX_MESSAGE_QUEUE 512
#endif
#define QUEUELEN 64
/* queue free space below this, rx flow is disabled */
#define RXFLOW_MIN (4)
/* queue free space above this, rx flow is enabled */
#define RXFLOW_MAX (QUEUELEN / 3)
#define MAX_MIRROR_INSTANCES 10
struct lws_mirror_instance;
struct mirror_instance;
struct per_session_data__lws_mirror {
struct lws *wsi;
struct lws_mirror_instance *mi;
struct mirror_instance *mi;
struct per_session_data__lws_mirror *same_mi_pss_list;
int ringbuffer_tail;
uint32_t tail;
};
struct a_message {
@ -51,18 +49,132 @@ struct a_message {
size_t len;
};
struct lws_mirror_instance {
struct lws_mirror_instance *next;
struct mirror_instance {
struct mirror_instance *next;
struct per_session_data__lws_mirror *same_mi_pss_list;
struct lws_ring *ring;
char name[30];
struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
int ringbuffer_head;
char rx_enabled;
};
struct per_vhost_data__lws_mirror {
struct lws_mirror_instance *mi_list;
struct mirror_instance *mi_list;
};
/* enable or disable rx from all connections to this mirror instance */
static void
mirror_rxflow_instance(struct mirror_instance *mi, int enable)
{
	/* apply the rx flow control change to every wsi bound to this mi */
	lws_start_foreach_ll(struct per_session_data__lws_mirror *,
			     pss, mi->same_mi_pss_list) {
		lws_rx_flow_control(pss->wsi, enable);
	} lws_end_foreach_ll(pss, same_mi_pss_list);

	/* record instance-wide state so callers can test mi->rx_enabled */
	mi->rx_enabled = enable;
}
/*
* Find out which connection to this mirror instance has the longest number
* of still unread elements in the ringbuffer and update the lws_ring "oldest
* tail" with it. Elements behind the "oldest tail" are freed and recycled for
* new head content. Elements after the "oldest tail" are still waiting to be
* read by somebody.
*
* If the oldest tail moved on from before, check if it created enough space
* in the queue to re-enable RX flow control for the mirror instance.
*
* Mark connections that are at the oldest tail as being on a 3s timeout to
* transmit something, otherwise the connection will be closed. Without this,
* a choked or nonresponsive connection can block the FIFO from freeing up any
* new space for new data.
*
* You can skip calling this if on your connection, before processing, the tail
* was not equal to the current worst, ie, if the tail you will work on is !=
* lws_ring_get_oldest_tail(ring) then no need to call this when the tail
* has changed; it wasn't the oldest so it won't change the oldest.
*
* Returns 0 if oldest unchanged or 1 if oldest changed from this call.
*/
static int
mirror_update_worst_tail(struct mirror_instance *mi)
{
	uint32_t wai, worst = 0, worst_tail, oldest;
	struct per_session_data__lws_mirror *worst_pss = NULL;

	oldest = lws_ring_get_oldest_tail(mi->ring);

	/*
	 * find the pss with the most unread elements; his tail is the
	 * "oldest" one the ring must keep data for
	 */
	lws_start_foreach_ll(struct per_session_data__lws_mirror *,
			     pss, mi->same_mi_pss_list) {
		wai = lws_ring_get_count_waiting_elements(mi->ring, &pss->tail);
		if (wai >= worst) {
			worst = wai;
			worst_tail = pss->tail;
			worst_pss = pss;
		}
	} lws_end_foreach_ll(pss, same_mi_pss_list);

	/* no pss at all on this mi: nothing to update */
	if (!worst_pss)
		return 0;

	lws_ring_update_oldest_tail(mi->ring, worst_tail);
	/* oldest tail did not move: report "unchanged" */
	if (oldest == lws_ring_get_oldest_tail(mi->ring))
		return 0;

	/*
	 * The oldest tail did move on.  Check if we should re-enable rx flow
	 * for the mirror instance since we made some space now.
	 */
	if (!mi->rx_enabled && /* rx is disabled */
	    lws_ring_get_count_free_elements(mi->ring) > RXFLOW_MAX)
		/* there is enough space, let's re-enable rx for our instance */
		mirror_rxflow_instance(mi, 1);

	/* if nothing in queue, no timeout needed */
	if (!worst)
		return 1;

	/*
	 * The guy(s) with the oldest tail block the ringbuffer from recycling
	 * the FIFO entries he has not read yet.  Don't allow those guys to
	 * block the FIFO operation for very long.
	 */
	lws_start_foreach_ll(struct per_session_data__lws_mirror *,
			     pss, mi->same_mi_pss_list) {
		if (pss->tail == worst_tail)
			/*
			 * Our policy is if you are the slowest connection,
			 * you had better transmit something to help with that
			 * within 3s, or we will hang up on you to stop you
			 * blocking the FIFO for everyone else.
			 */
			lws_set_timeout(pss->wsi,
					PENDING_TIMEOUT_USER_REASON_BASE, 3);
	} lws_end_foreach_ll(pss, same_mi_pss_list);

	return 1;
}
static void
mirror_callback_all_in_mi_on_writable(struct mirror_instance *mi)
{
	/* ask for WRITABLE callback for every wsi on this mi */
	lws_start_foreach_ll(struct per_session_data__lws_mirror *,
			     pss, mi->same_mi_pss_list) {
		lws_callback_on_writable(pss->wsi);
	} lws_end_foreach_ll(pss, same_mi_pss_list);
}
/*
 * lws_ring element destructor: release a retired message's payload and
 * zero the slot so it is safe to reuse.
 */
static void
mirror_destroy_message(void *_msg)
{
	struct a_message *msg = (struct a_message *)_msg;

	free(msg->payload);
	msg->payload = NULL;
	msg->len = 0;
}
static int
callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
@ -72,13 +184,15 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
struct per_vhost_data__lws_mirror *v =
(struct per_vhost_data__lws_mirror *)
lws_protocol_vh_priv_get(lws_get_vhost(wsi),
lws_get_protocol(wsi));
struct lws_mirror_instance *mi = NULL;
char name[30];
int n, m, count_mi = 0;
lws_get_protocol(wsi));
struct mirror_instance *mi = NULL;
const struct a_message *msg;
struct a_message amsg;
char name[300], update_worst, sent_something;
uint32_t oldest_tail;
int n, count_mi = 0;
switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__);
@ -86,21 +200,21 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
* mirror instance name... defaults to "", but if URL includes
* "?mirror=xxx", will be "xxx"
*/
name[0] = '\0';
lws_get_urlarg_by_name(wsi, "mirror", name, sizeof(name) - 1);
lwsl_notice("mirror %s\n", name);
if (lws_get_urlarg_by_name(wsi, "mirror", name,
sizeof(name) - 1))
lwsl_debug("get urlarg failed\n");
lwsl_info("%s: mirror name '%s'\n", __func__, name);
/* is there already a mirror instance of this name? */
lws_start_foreach_ll(struct lws_mirror_instance *,
mi1, v->mi_list) {
lws_start_foreach_ll(struct mirror_instance *, mi1,
v->mi_list) {
count_mi++;
if (strcmp(name, mi1->name))
continue;
/* yes... we will join it */
lwsl_notice("Joining existing mi %p '%s'\n", mi1, name);
lwsl_info("Joining existing mi %p '%s'\n", mi1, name);
mi = mi1;
break;
} lws_end_foreach_ll(mi1, next);
@ -108,20 +222,28 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
if (!mi) {
/* no existing mirror instance for name */
if (count_mi == MAX_MIRROR_INSTANCES)
return -1;
/* create one with this name, and join it */
mi = malloc(sizeof(*mi));
if (!mi)
return 1;
memset(mi, 0, sizeof(*mi));
mi->ring = lws_ring_create(sizeof(struct a_message),
QUEUELEN,
mirror_destroy_message);
if (!mi->ring) {
free(mi);
return 1;
}
mi->next = v->mi_list;
v->mi_list = mi;
strcpy(mi->name, name);
mi->ringbuffer_head = 0;
mi->rx_enabled = 1;
lwsl_notice("Created new mi %p '%s'\n", mi, name);
lwsl_info("Created new mi %p '%s'\n", mi, name);
}
/* add our pss to list of guys bound to this mi */
@ -132,23 +254,20 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
/* init the pss */
pss->mi = mi;
pss->ringbuffer_tail = mi->ringbuffer_head;
pss->tail = lws_ring_get_oldest_tail(mi->ring);
pss->wsi = wsi;
break;
case LWS_CALLBACK_CLOSED:
/* detach our pss from the mirror instance */
mi = pss->mi;
if (!mi)
break;
/* remove our closing pss from its mirror instance list */
lws_start_foreach_llp(struct per_session_data__lws_mirror **,
ppss, mi->same_mi_pss_list) {
if (*ppss == pss) {
*ppss = pss->same_mi_pss_list;
break;
}
@ -156,106 +275,131 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
pss->mi = NULL;
if (mi->same_mi_pss_list)
if (mi->same_mi_pss_list) {
/*
* Still other pss using the mirror instance. The pss
* going away may have had the oldest tail, reconfirm
* using the remaining pss what is the current oldest
* tail. If the oldest tail moves on, this call also
* will re-enable rx flow control when appropriate.
*/
mirror_update_worst_tail(mi);
break;
}
/* last pss unbound from mi... delete mi */
/* No more pss using the mirror instance... delete mi */
lws_start_foreach_llp(struct lws_mirror_instance **,
lws_start_foreach_llp(struct mirror_instance **,
pmi, v->mi_list) {
if (*pmi != mi)
continue;
*pmi = (*pmi)->next;
lwsl_info("%s: mirror cleaniup %p\n", __func__, v);
for (n = 0; n < ARRAY_SIZE(mi->ringbuffer); n++)
if (mi->ringbuffer[n].payload) {
free(mi->ringbuffer[n].payload);
mi->ringbuffer[n].payload = NULL;
}
lws_ring_destroy(mi->ring);
free(mi);
break;
} lws_end_foreach_llp(pmi, next);
break;
case LWS_CALLBACK_CONFIRM_EXTENSION_OKAY:
return 1; /* disallow compression */
case LWS_CALLBACK_PROTOCOL_INIT: /* per vhost */
lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(struct per_vhost_data__lws_mirror));
break;
case LWS_CALLBACK_PROTOCOL_DESTROY: /* per vhost */
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
while (pss->ringbuffer_tail != pss->mi->ringbuffer_head) {
m = pss->mi->ringbuffer[pss->ringbuffer_tail].len;
n = lws_write(wsi, (unsigned char *)
pss->mi->ringbuffer[pss->ringbuffer_tail].payload +
LWS_PRE, m, LWS_WRITE_TEXT);
if (n < 0) {
lwsl_err("ERROR %d writing to mirror socket\n", n);
return -1;
}
if (n < m)
lwsl_err("mirror partial write %d vs %d\n", n, m);
oldest_tail = lws_ring_get_oldest_tail(pss->mi->ring);
update_worst = oldest_tail == pss->tail;
sent_something = 0;
if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
pss->ringbuffer_tail = 0;
else
pss->ringbuffer_tail++;
lwsl_debug(" original oldest %d, free elements %ld\n",
oldest_tail,
lws_ring_get_count_free_elements(pss->mi->ring));
if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
(MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15))
lws_rx_flow_allow_all_protocol(lws_get_context(wsi),
lws_get_protocol(wsi));
do {
msg = lws_ring_get_element(pss->mi->ring, &pss->tail);
if (!msg)
break;
if (lws_send_pipe_choked(wsi)) {
lws_callback_on_writable(wsi);
if (!msg->payload) {
lwsl_err("%s: NULL payload: worst = %d,"
" pss->tail = %d\n", __func__,
oldest_tail, pss->tail);
if (lws_ring_consume(pss->mi->ring, &pss->tail,
NULL, 1))
continue;
break;
}
}
n = lws_write(wsi, (unsigned char *)msg->payload +
LWS_PRE, msg->len, LWS_WRITE_TEXT);
if (n < 0) {
lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
return -1;
}
sent_something = 1;
lws_ring_consume(pss->mi->ring, &pss->tail, NULL, 1);
} while (!lws_send_pipe_choked(wsi));
/* if any left for us to send, ask for writeable again */
if (lws_ring_get_count_waiting_elements(pss->mi->ring,
&pss->tail))
lws_callback_on_writable(wsi);
if (!sent_something || !update_worst)
break;
/*
* We are no longer holding the oldest tail (since we sent
* something. So free us of the timeout related to hogging the
* oldest tail.
*/
lws_set_timeout(pss->wsi, NO_PENDING_TIMEOUT, 0);
/*
* If we were originally at the oldest fifo position of
* all the tails, now we used some up we may have
* changed the oldest fifo position and made some space.
*/
mirror_update_worst_tail(pss->mi);
break;
case LWS_CALLBACK_RECEIVE:
if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
(MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) {
lwsl_err("dropping!\n");
goto choke;
n = lws_ring_get_count_free_elements(pss->mi->ring);
if (!n) {
lwsl_notice("dropping!\n");
if (pss->mi->rx_enabled)
mirror_rxflow_instance(pss->mi, 0);
goto req_writable;
}
if (pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload)
free(pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload);
amsg.payload = malloc(LWS_PRE + len);
amsg.len = len;
if (!amsg.payload) {
lwsl_notice("OOM: dropping\n");
break;
}
memcpy((char *)amsg.payload + LWS_PRE, in, len);
if (!lws_ring_insert(pss->mi->ring, &amsg, 1)) {
mirror_destroy_message(&amsg);
lwsl_notice("dropping!\n");
if (pss->mi->rx_enabled)
mirror_rxflow_instance(pss->mi, 0);
goto req_writable;
}
pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload = malloc(LWS_PRE + len);
pss->mi->ringbuffer[pss->mi->ringbuffer_head].len = len;
memcpy((char *)pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload +
LWS_PRE, in, len);
if (pss->mi->ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
pss->mi->ringbuffer_head = 0;
else
pss->mi->ringbuffer_head++;
if (pss->mi->rx_enabled &&
lws_ring_get_count_free_elements(pss->mi->ring) < RXFLOW_MIN)
mirror_rxflow_instance(pss->mi, 0);
if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) &
(MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2))
goto done;
choke:
lwsl_debug("LWS_CALLBACK_RECEIVE: throttling %p\n", wsi);
lws_rx_flow_control(wsi, 0);
done:
/*
* ask for WRITABLE callback for every wsi bound to this
* mirror instance
*/
lws_start_foreach_ll(struct per_session_data__lws_mirror *,
pss1, pss->mi->same_mi_pss_list) {
lws_callback_on_writable(pss1->wsi);
} lws_end_foreach_ll(pss1, same_mi_pss_list);
req_writable:
mirror_callback_all_in_mi_on_writable(pss->mi);
break;
default:

View file

@ -694,6 +694,8 @@ function ot_req_close() {
var ctx;
var socket_lm;
var color = "#000000";
var pending = "";
var lm_timer;
socket_lm = lws_meta.new_ws(get_appropriate_ws_url("?mirror=" + mirror_name),
"lws-mirror-protocol");
@ -772,6 +774,11 @@ function ev_mouseup(ev) {
no_last = 1;
}
/* timer callback: flush any batched mirror draw commands to the server */
function lm_timer_handler(ev) {
	socket_lm.send(pending);
	pending="";
}
function ev_mousemove (ev) {
var x, y;
@ -792,7 +799,15 @@ function ev_mousemove (ev) {
last_y = y;
return;
}
socket_lm.send("d " + color + " " + last_x + " " + last_y + " " + x + ' ' + y + ';');
pending = pending + "d " + color + " " + last_x + " " + last_y +
" " + x + ' ' + y + ';';
if (pending.length > 400) {
socket_lm.send(pending);
clearTimeout(lm_timer);
pending = "";
} else
lm_timer = setTimeout(lm_timer_handler, 30);
last_x = x;
last_y = y;