diff --git a/plugins/protocol_lws_mirror.c b/plugins/protocol_lws_mirror.c index 80b11d7f..11a6a2d1 100644 --- a/plugins/protocol_lws_mirror.c +++ b/plugins/protocol_lws_mirror.c @@ -16,6 +16,11 @@ * The test apps are intended to be adapted for use in your code, which * may be proprietary. So unlike the library itself, they are licensed * Public Domain. + * + * Notice that the lws_pthread... locking apis are all zero-footprint + * NOPs in the case LWS_MAX_SMP == 1, which is the default. When lws + * is built for multiple service threads though, they resolve to their + * pthreads equivalents. */ #if !defined (LWS_PLUGIN_STATIC) @@ -52,7 +57,10 @@ struct a_message { struct mirror_instance { struct mirror_instance *next; + lws_pthread_mutex(lock); /* protects all mirror instance data */ struct per_session_data__lws_mirror *same_mi_pss_list; + /**< must hold the the per_vhost_data__lws_mirror.lock as well + * to change mi list membership */ struct lws_ring *ring; int messages_allocated; char name[30]; @@ -60,13 +68,14 @@ struct mirror_instance { }; struct per_vhost_data__lws_mirror { + lws_pthread_mutex(lock); /* protects mi_list membership changes */ struct mirror_instance *mi_list; }; /* enable or disable rx from all connections to this mirror instance */ static void -mirror_rxflow_instance(struct mirror_instance *mi, int enable) +__mirror_rxflow_instance(struct mirror_instance *mi, int enable) { lws_start_foreach_ll(struct per_session_data__lws_mirror *, pss, mi->same_mi_pss_list) { @@ -99,7 +108,7 @@ mirror_rxflow_instance(struct mirror_instance *mi, int enable) * Returns 0 if oldest unchanged or 1 if oldest changed from this call. */ static int -mirror_update_worst_tail(struct mirror_instance *mi) +__mirror_update_worst_tail(struct mirror_instance *mi) { uint32_t wai, worst = 0, worst_tail = 0, oldest; struct per_session_data__lws_mirror *worst_pss = NULL; @@ -130,7 +139,7 @@ mirror_update_worst_tail(struct mirror_instance *mi) if (!mi->rx_enabled && /* rx is disabled */ lws_ring_get_count_free_elements(mi->ring) >= RXFLOW_MAX) /* there is enough space, let's re-enable rx for our instance */ - mirror_rxflow_instance(mi, 1); + __mirror_rxflow_instance(mi, 1); /* if nothing in queue, no timeout needed */ if (!worst) @@ -158,7 +167,7 @@ mirror_update_worst_tail(struct mirror_instance *mi) } static void -mirror_callback_all_in_mi_on_writable(struct mirror_instance *mi) +__mirror_callback_all_in_mi_on_writable(struct mirror_instance *mi) { /* ask for WRITABLE callback for every wsi on this mi */ lws_start_foreach_ll(struct per_session_data__lws_mirror *, @@ -168,7 +177,7 @@ mirror_callback_all_in_mi_on_writable(struct mirror_instance *mi) } static void -mirror_destroy_message(void *_msg) +__mirror_destroy_message(void *_msg) { struct a_message *msg = _msg; @@ -213,6 +222,8 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, /* is there already a mirror instance of this name? */ + lws_pthread_mutex_lock(&v->lock); /* vhost lock { */ + lws_start_foreach_ll(struct mirror_instance *, mi1, v->mi_list) { count_mi++; @@ -233,14 +244,14 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, /* create one with this name, and join it */ mi = malloc(sizeof(*mi)); if (!mi) - return 1; + goto bail1; memset(mi, 0, sizeof(*mi)); mi->ring = lws_ring_create(sizeof(struct a_message), QUEUELEN, - mirror_destroy_message); + __mirror_destroy_message); if (!mi->ring) { free(mi); - return 1; + goto bail1; } mi->next = v->mi_list; @@ -248,6 +259,8 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, lws_snprintf(mi->name, sizeof(mi->name) - 1, "%s", pn); mi->rx_enabled = 1; + lws_pthread_mutex_init(&mi->lock); + lwsl_notice("Created new mi %p '%s'\n", mi, pn); } @@ -261,14 +274,22 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, pss->mi = mi; pss->tail = lws_ring_get_oldest_tail(mi->ring); pss->wsi = wsi; + + lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */ break; +bail1: + lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */ + return 1; + case LWS_CALLBACK_CLOSED: /* detach our pss from the mirror instance */ mi = pss->mi; if (!mi) break; + lws_pthread_mutex_lock(&v->lock); /* vhost lock { */ + /* remove our closing pss from its mirror instance list */ lws_start_foreach_llp(struct per_session_data__lws_mirror **, ppss, mi->same_mi_pss_list) { @@ -288,7 +309,10 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, * tail. If the oldest tail moves on, this call also * will re-enable rx flow control when appropriate. */ - mirror_update_worst_tail(mi); + lws_pthread_mutex_lock(&mi->lock); /* mi lock { */ + __mirror_update_worst_tail(mi); + lws_pthread_mutex_unlock(&mi->lock); /* } mi lock */ + lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */ break; } @@ -300,11 +324,14 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, *pmi = (*pmi)->next; lws_ring_destroy(mi->ring); + lws_pthread_mutex_destroy(&mi->lock); + free(mi); break; } } lws_end_foreach_llp(pmi, next); + lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */ break; case LWS_CALLBACK_CONFIRM_EXTENSION_OKAY: @@ -314,9 +341,18 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), lws_get_protocol(wsi), sizeof(struct per_vhost_data__lws_mirror)); + v = (struct per_vhost_data__lws_mirror *) + lws_protocol_vh_priv_get(lws_get_vhost(wsi), + lws_get_protocol(wsi)); + lws_pthread_mutex_init(&v->lock); + break; + + case LWS_CALLBACK_PROTOCOL_DESTROY: + lws_pthread_mutex_destroy(&v->lock); break; case LWS_CALLBACK_SERVER_WRITEABLE: + lws_pthread_mutex_lock(&pss->mi->lock); /* instance lock { */ oldest_tail = lws_ring_get_oldest_tail(pss->mi->ring); update_worst = oldest_tail == pss->tail; sent_something = 0; @@ -341,7 +377,7 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, if (n < 0) { lwsl_info("%s: WRITEABLE: %d\n", __func__, n); - return -1; + goto bail2; } sent_something = 1; lws_ring_consume(pss->mi->ring, &pss->tail, NULL, 1); @@ -354,7 +390,7 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, lws_callback_on_writable(wsi); if (!sent_something || !update_worst) - break; + goto done1; /* * We are no longer holding the oldest tail (since we sent @@ -367,15 +403,24 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, * all the tails, now we used some up we may have * changed the oldest fifo position and made some space. */ - mirror_update_worst_tail(pss->mi); + __mirror_update_worst_tail(pss->mi); + +done1: + lws_pthread_mutex_unlock(&pss->mi->lock); /* } instance lock */ break; +bail2: + lws_pthread_mutex_unlock(&pss->mi->lock); /* } instance lock */ + + return -1; + case LWS_CALLBACK_RECEIVE: + lws_pthread_mutex_lock(&pss->mi->lock); /* mi lock { */ n = (int)lws_ring_get_count_free_elements(pss->mi->ring); if (!n) { lwsl_notice("dropping!\n"); if (pss->mi->rx_enabled) - mirror_rxflow_instance(pss->mi, 0); + __mirror_rxflow_instance(pss->mi, 0); goto req_writable; } @@ -383,28 +428,31 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, amsg.len = len; if (!amsg.payload) { lwsl_notice("OOM: dropping\n"); - break; + goto done2; } memcpy((char *)amsg.payload + LWS_PRE, in, len); if (!lws_ring_insert(pss->mi->ring, &amsg, 1)) { - mirror_destroy_message(&amsg); + __mirror_destroy_message(&amsg); lwsl_notice("dropping!\n"); if (pss->mi->rx_enabled) - mirror_rxflow_instance(pss->mi, 0); + __mirror_rxflow_instance(pss->mi, 0); goto req_writable; } if (pss->mi->rx_enabled && lws_ring_get_count_free_elements(pss->mi->ring) < RXFLOW_MIN) - mirror_rxflow_instance(pss->mi, 0); + __mirror_rxflow_instance(pss->mi, 0); req_writable: - mirror_callback_all_in_mi_on_writable(pss->mi); + __mirror_callback_all_in_mi_on_writable(pss->mi); + +done2: + lws_pthread_mutex_unlock(&pss->mi->lock); /* } mi lock */ break; case LWS_CALLBACK_EVENT_WAIT_CANCELLED: - lwsl_notice("LWS_CALLBACK_EVENT_WAIT_CANCELLED\n"); + lwsl_info("LWS_CALLBACK_EVENT_WAIT_CANCELLED\n"); break; default: