diff --git a/lib/client.c b/lib/client.c index f712271d..cb314d08 100755 --- a/lib/client.c +++ b/lib/client.c @@ -939,7 +939,7 @@ check_extensions: LWS_EXT_CB_CLIENT_CONSTRUCT, (void *)&wsi->act_ext_user[wsi->count_act_ext], (void *)&opts, 0)) { - lwsl_notice(" ext %s failed construction\n", ext_name); + lwsl_info(" ext %s failed construction\n", ext_name); ext++; continue; } diff --git a/lib/extension-permessage-deflate.c b/lib/extension-permessage-deflate.c index f027e1f0..c9bb75f1 100644 --- a/lib/extension-permessage-deflate.c +++ b/lib/extension-permessage-deflate.c @@ -59,7 +59,7 @@ lws_extension_pmdeflate_restrict_args(struct lws *wsi, if (extra < priv->args[PMD_RX_BUF_PWR2]) { priv->args[PMD_RX_BUF_PWR2] = extra; - lwsl_err(" Capping pmd rx to %d\n", 1 << extra); + lwsl_info(" Capping pmd rx to %d\n", 1 << extra); } } @@ -123,7 +123,7 @@ lws_extension_callback_pm_deflate(struct lws_context *context, n = wsi->protocol->rx_buffer_size; if (n < 128) { - lwsl_err(" permessage-deflate requires the protocol (%s) to have an RX buffer >= 128\n", + lwsl_info(" permessage-deflate requires the protocol (%s) to have an RX buffer >= 128\n", wsi->protocol->name); return -1; } diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index 00ffcfd8..c2ccdd71 100755 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -3748,3 +3748,243 @@ lws_stats_atomic_max(struct lws_context * context, } #endif + +LWS_VISIBLE LWS_EXTERN struct lws_ring * +lws_ring_create(size_t element_len, size_t count, void (*destroy_element)(void *)) +{ + struct lws_ring *ring = lws_malloc(sizeof(*ring)); + + if (!ring) + return NULL; + + ring->buflen = count * element_len; + ring->element_len = element_len; + ring->head = 0; + ring->oldest_tail = 0; + ring->destroy_element = destroy_element; + + ring->buf = lws_malloc(ring->buflen); + if (!ring->buf) { + lws_free(ring); + + return NULL; + } + + return ring; +} + +LWS_VISIBLE LWS_EXTERN void +lws_ring_destroy(struct lws_ring *ring) +{ + if (ring->destroy_element) + while (ring->oldest_tail != ring->head) { + ring->destroy_element((uint8_t *)ring->buf + ring->oldest_tail); + ring->oldest_tail = (ring->oldest_tail + ring->element_len) % + ring->buflen; + } + if (ring->buf) + lws_free_set_NULL(ring->buf); + + lws_free(ring); +} + +LWS_VISIBLE LWS_EXTERN size_t +lws_ring_get_count_free_elements(struct lws_ring *ring) +{ + int f; + + /* + * possible ringbuf patterns + * + * h == t + * |--------t***h---| + * |**h-----------t*| + * |t**************h| + * |*****ht*********| + */ + if (ring->head == ring->oldest_tail) + f = ring->buflen - ring->element_len; + else + if (ring->head < ring->oldest_tail) + f = (ring->oldest_tail - ring->head) - ring->element_len; + else + f = (ring->buflen - ring->head) + ring->oldest_tail - ring->element_len; + + if (f < 2) + return 0; + + return f / ring->element_len; +} + +LWS_VISIBLE LWS_EXTERN size_t +lws_ring_get_count_waiting_elements(struct lws_ring *ring, uint32_t *tail) +{ int f; + + if (!tail) + tail = &ring->oldest_tail; + /* + * possible ringbuf patterns + * + * h == t + * |--------t***h---| + * |**h-----------t*| + * |t**************h| + * |*****ht*********| + */ + if (ring->head == *tail) + f = 0; + else + if (ring->head > *tail) + f = (ring->head - *tail); + else + f = (ring->buflen - *tail) + ring->head; + + return f / ring->element_len; +} + +LWS_VISIBLE LWS_EXTERN int +lws_ring_next_linear_insert_range(struct lws_ring *ring, void **start, size_t *bytes) +{ + int n; + + /* n is how many bytes the whole fifo can take */ + n = lws_ring_get_count_free_elements(ring) * ring->element_len; + + if (!n) + return 1; + + if (ring->head + n > ring->buflen) { + *start = (void *)(((uint8_t *)ring->buf) + ring->head); + *bytes = ring->buflen - ring->head; + + return 0; + } + + *start = (void *)(((uint8_t *)ring->buf) + ring->head); + *bytes = n; + + return 0; +} + +LWS_VISIBLE LWS_EXTERN void +lws_ring_bump_head(struct lws_ring *ring, size_t bytes) +{ + ring->head = (ring->head + bytes) % ring->buflen; +} + +LWS_VISIBLE LWS_EXTERN size_t +lws_ring_insert(struct lws_ring *ring, const void *src, size_t max_count) +{ + const uint8_t *osrc = src; + int m, n; + + /* n is how many bytes the whole fifo can take */ + n = lws_ring_get_count_free_elements(ring) * ring->element_len; + + /* restrict n to how much we want to insert */ + if ((size_t)n > max_count * ring->element_len) + n = max_count * ring->element_len; + + /* + * n is legal to insert, but as an optimization we can cut the + * insert into one or two memcpys, depending on if it wraps + */ + + if (ring->head + n > ring->buflen) { + + /* + * He does wrap. The first memcpy should take us up to + * the end of the buffer + */ + + m = ring->buflen - ring->head; + memcpy(((uint8_t *)ring->buf) + ring->head, src, m); + /* we know it will wrap exactly back to zero */ + ring->head = 0; + + /* adapt the second memcpy for what we already did */ + + src = ((uint8_t *)src) + m; + n -= m; + } + + memcpy(((uint8_t *)ring->buf) + ring->head, src, n); + ring->head = (ring->head + n) % ring->buflen; + + return (((uint8_t *)src + n) - osrc) / ring->element_len; +} + +LWS_VISIBLE LWS_EXTERN size_t +lws_ring_consume(struct lws_ring *ring, uint32_t *tail, void *dest, size_t max_count) +{ + uint8_t *odest = dest; + void *orig_tail = tail; + uint32_t fake_tail; + int m, n; + + if (!tail) { + fake_tail = ring->oldest_tail; + tail = &fake_tail; + } + + /* n is how many bytes the whole fifo has for us */ + n = lws_ring_get_count_waiting_elements(ring, tail) * ring->element_len; + + /* restrict n to how much we want to insert */ + if ((size_t)n > max_count * ring->element_len) + n = max_count * ring->element_len; + + if (!dest) { + *tail = ((*tail) + n) % ring->buflen; + if (!orig_tail) /* single tail */ + lws_ring_update_oldest_tail(ring, *tail); + + return n / ring->element_len; + } + + if (*tail + n > ring->buflen) { + + /* + * He does wrap. The first memcpy should take us up to + * the end of the buffer + */ + + m = ring->buflen - *tail; + memcpy(dest, ((uint8_t *)ring->buf) + *tail, m); + /* we know it will wrap exactly back to zero */ + *tail = 0; + + /* adapt the second memcpy for what we already did */ + + dest = ((uint8_t *)dest) + m; + n -= m; + } + + memcpy(dest, ((uint8_t *)ring->buf) + *tail, n); + + *tail = ((*tail) + n) % ring->buflen; + if (!orig_tail) /* single tail */ + lws_ring_update_oldest_tail(ring, *tail); + + return (((uint8_t *)dest + n) - odest) / ring->element_len; +} + +LWS_VISIBLE LWS_EXTERN void +lws_ring_update_oldest_tail(struct lws_ring *ring, uint32_t tail) +{ + if (!ring->destroy_element) { + ring->oldest_tail = tail; + return; + } + + while (ring->oldest_tail != tail) { + ring->destroy_element((uint8_t *)ring->buf + ring->oldest_tail); + ring->oldest_tail = (ring->oldest_tail + ring->element_len) % ring->buflen; + } +} + +LWS_VISIBLE LWS_EXTERN uint32_t +lws_ring_get_oldest_tail(struct lws_ring *ring) +{ + return ring->oldest_tail; +} diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index 90c20417..e4e7142d 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -4602,6 +4602,196 @@ lws_interface_to_sa(int ipv6, const char *ifname, struct sockaddr_in *addr, } \ } +/** + * lws_ring: generic ringbuffer struct + * + * all of the members are opaque and manipulated by lws_ring_...() apis. + * + * The lws_ring and its buffer is allocated at runtime on the heap, using + * + * - lws_ring_create() + * - lws_ring_destroy() + * + * It may contain any type, the size of the "element" stored in the ring + * buffer and the number of elements is given at creation time. + * + * Whole elements may be inserted into the ringbuffer and removed from it, using + * + * - lws_ring_insert() + * - lws_ring_consume() + * + * You can find out how many whole elements are free or waiting using + * + * - lws_ring_get_count_free_elements() + * - lws_ring_get_count_waiting_elements() + * + * In addition there are special purpose optional byte-centric apis + * + * - lws_ring_next_linear_insert_range() + * - lws_ring_bump_head() + * + * which let you, eg, read() directly into the ringbuffer without needing + * an intermediate bounce buffer. + * + * The accessors understand that the ring wraps, and optimizes insertion and + * consumption into one or two memcpy()s depending on if the head or tail + * wraps. + * + * lws_ring only supports a single head, but optionally multiple tails with + * an API to inform it when the "oldest" tail has moved on. You can give + * NULL where-ever an api asks for a tail pointer, and it will use an internal + * single tail pointer for convenience. + * + * The "oldest tail", which is the only tail if you give it NULL instead of + * some other tail, is used to track which elements in the ringbuffer are + * still unread by anyone. + * + * - lws_ring_update_oldest_tail() + */ +struct lws_ring; + +/** + * lws_ring_create(): create a new ringbuffer + * + * \param element_len: the size in bytes of one element in the ringbuffer + * \param count: the number of elements the ringbuffer can contain + * \param destroy_element: NULL, or callback to be called for each element + * that is removed from the ringbuffer due to the + * oldest tail moving beyond it + * + * Creates the ringbuffer and allocates the storage. Returns the new + * lws_ring *, or NULL if the allocation failed. + * + * If non-NULL, destroy_element will get called back for every element that is + * retired from the ringbuffer after the oldest tail has gone past it, and for + * any element still left in the ringbuffer when it is destroyed. It replaces + * all other element destruction code in your user code. + */ +LWS_VISIBLE LWS_EXTERN struct lws_ring * +lws_ring_create(size_t element_len, size_t count, + void (*destroy_element)(void *element)); + +/** + * lws_ring_destroy(): destroy a previously created ringbuffer + * + * \param ring: the struct lws_ring to destroy + * + * Destroys the ringbuffer allocation and the struct lws_ring itself. + */ +LWS_VISIBLE LWS_EXTERN void +lws_ring_destroy(struct lws_ring *ring); + +/** + * lws_ring_get_count_free_elements(): return how many elements can fit + * in the free space + * + * \param ring: the struct lws_ring to report on + * + * Returns how much room is left in the ringbuffer for whole element insertion. + */ +LWS_VISIBLE LWS_EXTERN size_t +lws_ring_get_count_free_elements(struct lws_ring *ring); + +/** + * lws_ring_get_count_waiting_elements(): return how many elements can be consumed + * + * \param ring: the struct lws_ring to report on + * \param tail: a pointer to the tail struct to use, or NULL for single tail + * + * Returns how many elements are waiting to be consumed from the perspective + * of the tail pointer given. + */ +LWS_VISIBLE LWS_EXTERN size_t +lws_ring_get_count_waiting_elements(struct lws_ring *ring, uint32_t *tail); + +/** + * lws_ring_insert(): attempt to insert up to max_count elements from src + * + * \param ring: the struct lws_ring to report on + * \param src: the array of elements to be inserted + * \param max_count: the number of available elements at src + * + * Attempts to insert as many of the elements at src as possible, up to the + * maximum max_count. Returns the number of elements actually inserted. + */ +LWS_VISIBLE LWS_EXTERN size_t +lws_ring_insert(struct lws_ring *ring, const void *src, size_t max_count); + +/** + * lws_ring_consume(): attempt to copy out and remove up to max_count elements + * to src + * + * \param ring: the struct lws_ring to report on + * \param tail: a pointer to the tail struct to use, or NULL for single tail + * \param dest: the array of elements to be inserted + * \param max_count: the number of available elements at src + * + * Attempts to copy out as many waiting elements as possible into dest, from + * the perspective of the given tail, up to max_count. + * + * Returns the number of elements copied out. + */ +LWS_VISIBLE LWS_EXTERN size_t +lws_ring_consume(struct lws_ring *ring, uint32_t *tail, void *dest, + size_t max_count); + +/** + * lws_ring_update_oldest_tail(): free up elements older than tail for reuse + * + * \param ring: the struct lws_ring to report on + * \param tail: a pointer to the tail struct to use, or NULL for single tail + * + * If you are using multiple tails, you must use this API to inform the + * lws_ring when none of the tails still need elements in the fifo any more, + * by updating it when the "oldest" tail has moved on. + */ +LWS_VISIBLE LWS_EXTERN void +lws_ring_update_oldest_tail(struct lws_ring *ring, uint32_t tail); + +/** + * lws_ring_get_oldest_tail(): get current oldest available data index + * + * \param ring: the struct lws_ring to report on + * + * If you are initializing a new ringbuffer consumer, you can set its tail to + * this to start it from the oldest ringbuffer entry still available. + */ +LWS_VISIBLE LWS_EXTERN uint32_t +lws_ring_get_oldest_tail(struct lws_ring *ring); + +/** + * lws_ring_next_linear_insert_range(): used to write directly into the ring + * + * \param ring: the struct lws_ring to report on + * \param start: pointer to a void * set to the start of the next ringbuffer area + * \param bytes: pointer to a size_t set to the max length you may use from *start + * + * This provides a low-level, bytewise access directly into the ringbuffer + * allowing direct insertion of data without having to use a bounce buffer. + * + * The api reports the position and length of the next linear range that can + * be written in the ringbuffer, ie, up to the point it would wrap, and sets + * *start and *bytes accordingly. You can then, eg, directly read() into + * *start for up to *bytes, and use lws_ring_bump_head() to update the lws_ring + * with what you have done. + * + * Returns nonzero if no insertion is currently possible. + */ +LWS_VISIBLE LWS_EXTERN int +lws_ring_next_linear_insert_range(struct lws_ring *ring, void **start, + size_t *bytes); + + +/** + * lws_ring_bump_head(): used to write directly into the ring + * + * \param ring: the struct lws_ring to operate on + * \param bytes: the number of bytes you inserted at the current head + */ +LWS_VISIBLE LWS_EXTERN void +lws_ring_bump_head(struct lws_ring *ring, size_t bytes); + + /** * lws_snprintf(): snprintf that truncates the returned length too * diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index aca3416f..0bac91e3 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -668,6 +668,15 @@ enum { LWS_RXFLOW_PENDING_CHANGE = (1 << 1), }; +struct lws_ring { + void *buf; + void (*destroy_element)(void *element); + size_t buflen; + size_t element_len; + uint32_t head; + uint32_t oldest_tail; +}; + /* this is not usable directly by user code any more, lws_close_reason() */ #define LWS_WRITE_CLOSE 4 diff --git a/lib/server-handshake.c b/lib/server-handshake.c index ec9b14e9..2e9563c5 100644 --- a/lib/server-handshake.c +++ b/lib/server-handshake.c @@ -157,7 +157,7 @@ lws_extension_server_handshake(struct lws *wsi, char **p, int budget) (void *)&wsi->act_ext_user[ wsi->count_act_ext], (void *)&opts, 0)) { - lwsl_notice("ext %s failed construction\n", + lwsl_info("ext %s failed construction\n", ext_name); ext_count--; ext++; diff --git a/plugins/protocol_lws_mirror.c b/plugins/protocol_lws_mirror.c index afac1b6d..a9fb0657 100644 --- a/plugins/protocol_lws_mirror.c +++ b/plugins/protocol_lws_mirror.c @@ -1,7 +1,7 @@ /* * libwebsockets-test-server - libwebsockets test implementation * - * Copyright (C) 2010-2016 Andy Green + * Copyright (C) 2010-2017 Andy Green * * This file is made available under the Creative Commons CC0 1.0 * Universal Public Domain Dedication. @@ -27,23 +27,21 @@ #include #include -/* lws-mirror_protocol */ - -#if defined(LWS_WITH_ESP8266) -#define MAX_MESSAGE_QUEUE 64 -#else -#define MAX_MESSAGE_QUEUE 512 -#endif +#define QUEUELEN 64 +/* queue free space below this, rx flow is disabled */ +#define RXFLOW_MIN (4) +/* queue free space above this, rx flow is enabled */ +#define RXFLOW_MAX (QUEUELEN / 3) #define MAX_MIRROR_INSTANCES 10 -struct lws_mirror_instance; +struct mirror_instance; struct per_session_data__lws_mirror { struct lws *wsi; - struct lws_mirror_instance *mi; + struct mirror_instance *mi; struct per_session_data__lws_mirror *same_mi_pss_list; - int ringbuffer_tail; + uint32_t tail; }; struct a_message { @@ -51,18 +49,132 @@ struct a_message { size_t len; }; -struct lws_mirror_instance { - struct lws_mirror_instance *next; +struct mirror_instance { + struct mirror_instance *next; struct per_session_data__lws_mirror *same_mi_pss_list; + struct lws_ring *ring; char name[30]; - struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; - int ringbuffer_head; + char rx_enabled; }; struct per_vhost_data__lws_mirror { - struct lws_mirror_instance *mi_list; + struct mirror_instance *mi_list; }; + +/* enable or disable rx from all connections to this mirror instance */ +static void +mirror_rxflow_instance(struct mirror_instance *mi, int enable) +{ + lws_start_foreach_ll(struct per_session_data__lws_mirror *, + pss, mi->same_mi_pss_list) { + lws_rx_flow_control(pss->wsi, enable); + } lws_end_foreach_ll(pss, same_mi_pss_list); + + mi->rx_enabled = enable; +} + +/* + * Find out which connection to this mirror instance has the longest number + * of still unread elements in the ringbuffer and update the lws_ring "oldest + * tail" with it. Elements behind the "oldest tail" are freed and recycled for + * new head content. Elements after the "oldest tail" are still waiting to be + * read by somebody. + * + * If the oldest tail moved on from before, check if it created enough space + * in the queue to re-enable RX flow control for the mirror instance. + * + * Mark connections that are at the oldest tail as being on a 3s timeout to + * transmit something, otherwise the connection will be closed. Without this, + * a choked or nonresponsive connection can block the FIFO from freeing up any + * new space for new data. + * + * You can skip calling this if on your connection, before processing, the tail + * was not equal to the current worst, ie, if the tail you will work on is != + * lws_ring_get_oldest_tail(ring) then no need to call this when the tail + * has changed; it wasn't the oldest so it won't change the oldest. + * + * Returns 0 if oldest unchanged or 1 if oldest changed from this call. + */ +static int +mirror_update_worst_tail(struct mirror_instance *mi) +{ + uint32_t wai, worst = 0, worst_tail, oldest; + struct per_session_data__lws_mirror *worst_pss = NULL; + + oldest = lws_ring_get_oldest_tail(mi->ring); + + lws_start_foreach_ll(struct per_session_data__lws_mirror *, + pss, mi->same_mi_pss_list) { + wai = lws_ring_get_count_waiting_elements(mi->ring, &pss->tail); + if (wai >= worst) { + worst = wai; + worst_tail = pss->tail; + worst_pss = pss; + } + } lws_end_foreach_ll(pss, same_mi_pss_list); + + if (!worst_pss) + return 0; + + lws_ring_update_oldest_tail(mi->ring, worst_tail); + if (oldest == lws_ring_get_oldest_tail(mi->ring)) + return 0; + + /* + * The oldest tail did move on. Check if we should re-enable rx flow + * for the mirror instance since we made some space now. + */ + if (!mi->rx_enabled && /* rx is disabled */ + lws_ring_get_count_free_elements(mi->ring) > RXFLOW_MAX) + /* there is enough space, let's re-enable rx for our instance */ + mirror_rxflow_instance(mi, 1); + + /* if nothing in queue, no timeout needed */ + if (!worst) + return 1; + + /* + * The guy(s) with the oldest tail block the ringbuffer from recycling + * the FIFO entries he has not read yet. Don't allow those guys to + * block the FIFO operation for very long. + */ + lws_start_foreach_ll(struct per_session_data__lws_mirror *, + pss, mi->same_mi_pss_list) { + if (pss->tail == worst_tail) + /* + * Our policy is if you are the slowest connection, + * you had better transmit something to help with that + * within 3s, or we will hang up on you to stop you + * blocking the FIFO for everyone else. + */ + lws_set_timeout(pss->wsi, + PENDING_TIMEOUT_USER_REASON_BASE, 3); + } lws_end_foreach_ll(pss, same_mi_pss_list); + + return 1; +} + +static void +mirror_callback_all_in_mi_on_writable(struct mirror_instance *mi) +{ + /* ask for WRITABLE callback for every wsi on this mi */ + lws_start_foreach_ll(struct per_session_data__lws_mirror *, + pss, mi->same_mi_pss_list) { + lws_callback_on_writable(pss->wsi); + } lws_end_foreach_ll(pss, same_mi_pss_list); +} + +static void +mirror_destroy_message(void *_msg) +{ + struct a_message *msg = _msg; + + free(msg->payload); + msg->payload = NULL; + msg->len = 0; +} + static int callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) @@ -72,13 +184,15 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, struct per_vhost_data__lws_mirror *v = (struct per_vhost_data__lws_mirror *) lws_protocol_vh_priv_get(lws_get_vhost(wsi), - lws_get_protocol(wsi)); - struct lws_mirror_instance *mi = NULL; - char name[30]; - int n, m, count_mi = 0; + lws_get_protocol(wsi)); + struct mirror_instance *mi = NULL; + const struct a_message *msg; + struct a_message amsg; + char name[300], update_worst, sent_something; + uint32_t oldest_tail; + int n, count_mi = 0; switch (reason) { - case LWS_CALLBACK_ESTABLISHED: lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__); @@ -86,21 +200,21 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, * mirror instance name... defaults to "", but if URL includes * "?mirror=xxx", will be "xxx" */ - name[0] = '\0'; - lws_get_urlarg_by_name(wsi, "mirror", name, sizeof(name) - 1); - - lwsl_notice("mirror %s\n", name); + if (lws_get_urlarg_by_name(wsi, "mirror", name, + sizeof(name) - 1)) + lwsl_debug("get urlarg failed\n"); + lwsl_info("%s: mirror name '%s'\n", __func__, name); /* is there already a mirror instance of this name? */ - lws_start_foreach_ll(struct lws_mirror_instance *, - mi1, v->mi_list) { + lws_start_foreach_ll(struct mirror_instance *, mi1, + v->mi_list) { count_mi++; if (strcmp(name, mi1->name)) continue; /* yes... we will join it */ - lwsl_notice("Joining existing mi %p '%s'\n", mi1, name); + lwsl_info("Joining existing mi %p '%s'\n", mi1, name); mi = mi1; break; } lws_end_foreach_ll(mi1, next); @@ -108,20 +222,28 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, if (!mi) { /* no existing mirror instance for name */ - if (count_mi == MAX_MIRROR_INSTANCES) return -1; /* create one with this name, and join it */ - mi = malloc(sizeof(*mi)); + if (!mi) + return 1; memset(mi, 0, sizeof(*mi)); + mi->ring = lws_ring_create(sizeof(struct a_message), + QUEUELEN, + mirror_destroy_message); + if (!mi->ring) { + free(mi); + return 1; + } + mi->next = v->mi_list; v->mi_list = mi; strcpy(mi->name, name); - mi->ringbuffer_head = 0; + mi->rx_enabled = 1; - lwsl_notice("Created new mi %p '%s'\n", mi, name); + lwsl_info("Created new mi %p '%s'\n", mi, name); } /* add our pss to list of guys bound to this mi */ @@ -132,23 +254,20 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, /* init the pss */ pss->mi = mi; - pss->ringbuffer_tail = mi->ringbuffer_head; + pss->tail = lws_ring_get_oldest_tail(mi->ring); pss->wsi = wsi; - break; case LWS_CALLBACK_CLOSED: - /* detach our pss from the mirror instance */ - mi = pss->mi; if (!mi) break; + /* remove our closing pss from its mirror instance list */ lws_start_foreach_llp(struct per_session_data__lws_mirror **, ppss, mi->same_mi_pss_list) { if (*ppss == pss) { - *ppss = pss->same_mi_pss_list; break; } @@ -156,106 +275,131 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, pss->mi = NULL; - if (mi->same_mi_pss_list) + if (mi->same_mi_pss_list) { + /* + * Still other pss using the mirror instance. The pss + * going away may have had the oldest tail, reconfirm + * using the remaining pss what is the current oldest + * tail. If the oldest tail moves on, this call also + * will re-enable rx flow control when appropriate. + */ + mirror_update_worst_tail(mi); break; + } - /* last pss unbound from mi... delete mi */ + /* No more pss using the mirror instance... delete mi */ - lws_start_foreach_llp(struct lws_mirror_instance **, + lws_start_foreach_llp(struct mirror_instance **, pmi, v->mi_list) { if (*pmi != mi) continue; *pmi = (*pmi)->next; - lwsl_info("%s: mirror cleaniup %p\n", __func__, v); - for (n = 0; n < ARRAY_SIZE(mi->ringbuffer); n++) - if (mi->ringbuffer[n].payload) { - free(mi->ringbuffer[n].payload); - mi->ringbuffer[n].payload = NULL; - } - + lws_ring_destroy(mi->ring); free(mi); break; } lws_end_foreach_llp(pmi, next); break; + case LWS_CALLBACK_CONFIRM_EXTENSION_OKAY: + return 1; /* disallow compression */ + case LWS_CALLBACK_PROTOCOL_INIT: /* per vhost */ lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), lws_get_protocol(wsi), sizeof(struct per_vhost_data__lws_mirror)); break; - case LWS_CALLBACK_PROTOCOL_DESTROY: /* per vhost */ - break; - case LWS_CALLBACK_SERVER_WRITEABLE: - while (pss->ringbuffer_tail != pss->mi->ringbuffer_head) { - m = pss->mi->ringbuffer[pss->ringbuffer_tail].len; - n = lws_write(wsi, (unsigned char *) - pss->mi->ringbuffer[pss->ringbuffer_tail].payload + - LWS_PRE, m, LWS_WRITE_TEXT); - if (n < 0) { - lwsl_err("ERROR %d writing to mirror socket\n", n); - return -1; - } - if (n < m) - lwsl_err("mirror partial write %d vs %d\n", n, m); + oldest_tail = lws_ring_get_oldest_tail(pss->mi->ring); + update_worst = oldest_tail == pss->tail; + sent_something = 0; - if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) - pss->ringbuffer_tail = 0; - else - pss->ringbuffer_tail++; + lwsl_debug(" original oldest %d, free elements %ld\n", + oldest_tail, + lws_ring_get_count_free_elements(pss->mi->ring)); - if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) & - (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15)) - lws_rx_flow_allow_all_protocol(lws_get_context(wsi), - lws_get_protocol(wsi)); + do { + msg = lws_ring_get_element(pss->mi->ring, &pss->tail); + if (!msg) + break; - if (lws_send_pipe_choked(wsi)) { - lws_callback_on_writable(wsi); + if (!msg->payload) { + lwsl_err("%s: NULL payload: worst = %d," + " pss->tail = %d\n", __func__, + oldest_tail, pss->tail); + if (lws_ring_consume(pss->mi->ring, &pss->tail, + NULL, 1)) + continue; break; } - } + + n = lws_write(wsi, (unsigned char *)msg->payload + + LWS_PRE, msg->len, LWS_WRITE_TEXT); + if (n < 0) { + lwsl_info("%s: WRITEABLE: %d\n", __func__, n); + + return -1; + } + sent_something = 1; + lws_ring_consume(pss->mi->ring, &pss->tail, NULL, 1); + + } while (!lws_send_pipe_choked(wsi)); + + /* if any left for us to send, ask for writeable again */ + if (lws_ring_get_count_waiting_elements(pss->mi->ring, + &pss->tail)) + lws_callback_on_writable(wsi); + + if (!sent_something || !update_worst) + break; + + /* + * We are no longer holding the oldest tail (since we sent + * something. So free us of the timeout related to hogging the + * oldest tail. + */ + lws_set_timeout(pss->wsi, NO_PENDING_TIMEOUT, 0); + /* + * If we were originally at the oldest fifo position of + * all the tails, now we used some up we may have + * changed the oldest fifo position and made some space. + */ + mirror_update_worst_tail(pss->mi); break; case LWS_CALLBACK_RECEIVE: - if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) & - (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) { - lwsl_err("dropping!\n"); - goto choke; + n = lws_ring_get_count_free_elements(pss->mi->ring); + if (!n) { + lwsl_notice("dropping!\n"); + if (pss->mi->rx_enabled) + mirror_rxflow_instance(pss->mi, 0); + goto req_writable; } - if (pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload) - free(pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload); + amsg.payload = malloc(LWS_PRE + len); + amsg.len = len; + if (!amsg.payload) { + lwsl_notice("OOM: dropping\n"); + break; + } + memcpy((char *)amsg.payload + LWS_PRE, in, len); + if (!lws_ring_insert(pss->mi->ring, &amsg, 1)) { + mirror_destroy_message(&amsg); + lwsl_notice("dropping!\n"); + if (pss->mi->rx_enabled) + mirror_rxflow_instance(pss->mi, 0); + goto req_writable; + } - pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload = malloc(LWS_PRE + len); - pss->mi->ringbuffer[pss->mi->ringbuffer_head].len = len; - memcpy((char *)pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload + - LWS_PRE, in, len); - if (pss->mi->ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) - pss->mi->ringbuffer_head = 0; - else - pss->mi->ringbuffer_head++; + if (pss->mi->rx_enabled && + lws_ring_get_count_free_elements(pss->mi->ring) < RXFLOW_MIN) + mirror_rxflow_instance(pss->mi, 0); - if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) & - (MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2)) - goto done; - -choke: - lwsl_debug("LWS_CALLBACK_RECEIVE: throttling %p\n", wsi); - lws_rx_flow_control(wsi, 0); - -done: - /* - * ask for WRITABLE callback for every wsi bound to this - * mirror instance - */ - lws_start_foreach_ll(struct per_session_data__lws_mirror *, - pss1, pss->mi->same_mi_pss_list) { - lws_callback_on_writable(pss1->wsi); - } lws_end_foreach_ll(pss1, same_mi_pss_list); +req_writable: + mirror_callback_all_in_mi_on_writable(pss->mi); break; default: diff --git a/test-server/test.html b/test-server/test.html index 46426180..ec09d3db 100644 --- a/test-server/test.html +++ b/test-server/test.html @@ -694,6 +694,8 @@ function ot_req_close() { var ctx; var socket_lm; var color = "#000000"; + var pending = ""; + var lm_timer; socket_lm = lws_meta.new_ws(get_appropriate_ws_url("?mirror=" + mirror_name), "lws-mirror-protocol"); @@ -772,6 +774,11 @@ function ev_mouseup(ev) { no_last = 1; } +function lm_timer_handler(ev) { + socket_lm.send(pending); + pending=""; +} + function ev_mousemove (ev) { var x, y; @@ -792,7 +799,15 @@ function ev_mousemove (ev) { last_y = y; return; } - socket_lm.send("d " + color + " " + last_x + " " + last_y + " " + x + ' ' + y + ';'); + pending = pending + "d " + color + " " + last_x + " " + last_y + + " " + x + ' ' + y + ';'; + + if (pending.length > 400) { + socket_lm.send(pending); + clearTimeout(lm_timer); + pending = ""; + } else + lm_timer = setTimeout(lm_timer_handler, 30); last_x = x; last_y = y;