diff --git a/READMEs/README.coding.md b/READMEs/README.coding.md
index 218b4454a..3dd680116 100644
--- a/READMEs/README.coding.md
+++ b/READMEs/README.coding.md
@@ -122,52 +122,93 @@ all the server resources.
 
 @section evtloop Libwebsockets is singlethreaded
 
-Libwebsockets works in a serialized event loop, in a single thread.
+Libwebsockets works in a serialized event loop, in a single thread.  It supports
+not only the default poll() backend, but also the libuv, libev, and libevent
+event loop libraries, which take the same locking-free, nonblocking event loop
+approach and are likewise not threadsafe.  There are several advantages to this
+technique, but one disadvantage: it doesn't integrate easily if there are
+multiple threads that want to use libwebsockets.
 
-Directly performing websocket actions from other threads is not allowed.
-Aside from the internal data being inconsistent in `forked()` processes,
-the scope of a `wsi` (`struct websocket`) can end at any time during service
-with the socket closing and the `wsi` freed.
+However, integration with multithreaded apps is possible if you follow some
+guidelines.
 
-Websocket write activities should only take place in the
-`LWS_CALLBACK_SERVER_WRITEABLE` callback as described below.
+1) Aside from two APIs, directly calling lws APIs from other threads is not
+allowed.
 
-This network-programming necessity to link the issue of new data to
-the peer taking the previous data is not obvious to all users so let's
-repeat that in other words:
+2) If you want to keep a list of live wsi, you need to use lifecycle callbacks
+on the protocol in the service thread to manage the list, with your own locking.
+Typically you use an ESTABLISHED callback to add ws wsi to your list and a
+CLOSED callback to remove them.
 
-***ONLY DO LWS_WRITE FROM THE WRITEABLE CALLBACK***
+3) LWS regulates your write activity by being able to let you know when you may
+write more on a connection.  That reflects the reality that you cannot
+successfully send data to a peer that has no room for it, so you should not
+generate or buffer write data until you know the peer connection can take more.
 
-There is another network-programming truism that surprises some people which
-is if the sink for the data cannot accept more:
+Other libraries pretend that the guy doing the writing is the boss who decides
+what happens, and absorb as much as you want to write into local buffering.
+That does not scale to a lot of connections, because it will exhaust your
+memory and waste time copying data around in memory needlessly.
 
-***YOU MUST PERFORM RX FLOW CONTROL*** to stop taking new input. TCP will make
-this situation known to the upstream sender by making it impossible for him to
-send anything more on the connection until we start accepting things again.
+The truth is the receiver, along with the network between you, is the boss who
+decides what will happen.  If he stops accepting data, no data will move.  LWS
+is designed to reflect that.
+
+If you have something to send, you call `lws_callback_on_writable()` on the
+connection, and when it is writeable, you will get a
+`LWS_CALLBACK_SERVER_WRITEABLE` callback, where you should generate the data to
+send and send it with `lws_write()`.
+
+You cannot send data using `lws_write()` outside of the WRITEABLE callback.
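+
+As a rough sketch of that pattern inside a protocol callback (the `pss`
+per-session struct and its `have_msg` / `buf` / `len` members are illustrative
+application-side names, not lws APIs):
+
+```
+	/* when you have something to send, just ask for a callback */
+	pss->have_msg = 1;
+	lws_callback_on_writable(wsi);
+
+	/* ...later, in your protocol callback, in the service thread... */
+	case LWS_CALLBACK_SERVER_WRITEABLE:
+		if (!pss->have_msg)
+			break;
+		/* the buffer must reserve LWS_PRE bytes before the payload */
+		if (lws_write(wsi, &pss->buf[LWS_PRE], pss->len,
+			      LWS_WRITE_TEXT) < (int)pss->len)
+			return -1; /* fatal: close the connection */
+		pss->have_msg = 0;
+		break;
+```
+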
+4) For multithreaded apps, this corresponds to a need to be able to provoke the
+`lws_callback_on_writable()` action and to wake the service thread from its
+event loop wait (sleeping in `poll()` or `epoll()` or whatever).  The rules
+above mean that directly sending data on the connection from another thread is
+out of the question.
+
+Therefore the two APIs mentioned above that may be used from another thread are:
+
+ - For LWS using the default poll() event loop, `lws_callback_on_writable()`
+
+ - For LWS using libuv/libev/libevent event loop, `lws_cancel_service()`
+
+If you are using the default poll() event loop, one "foreign thread" at a time
+may call `lws_callback_on_writable()` directly for a wsi.  You need to use your
+own locking around that to serialize access to it from multiple threads.
+
+If you implement `LWS_CALLBACK_GET_THREAD_ID` in protocols[0], then LWS will
+detect when it has been called from a foreign thread and automatically use
+`lws_cancel_service()` to additionally wake the service loop from its wait.
+
+The libuv/libev/libevent event loops cannot handle being called from other
+threads at all, so there is a slightly different scheme: you may call
+`lws_cancel_service()` to force the event loop to end immediately.  This then
+broadcasts a callback (in the service thread context),
+`LWS_CALLBACK_EVENT_WAIT_CANCELLED`, to all protocols on all vhosts, where you
+can perform your own locking and walk a list of wsi that need
+`lws_callback_on_writable()` calling on them.
+
+`lws_cancel_service()` is very cheap to call.
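+
+A minimal sketch of that scheme, assuming your own `conn` list, `conn_lock`
+mutex and `needs_write` flag (none of these names are lws APIs; they stand for
+whatever bookkeeping your application keeps):
+
+```
+	/* foreign (non-service) thread: mark the work under your own lock,
+	 * then wake the lws service thread; never touch the wsi directly */
+	pthread_mutex_lock(&conn_lock);
+	conn->needs_write = 1;
+	pthread_mutex_unlock(&conn_lock);
+	lws_cancel_service(context);
+
+	/* protocol callback: these all run in the lws service thread */
+	case LWS_CALLBACK_ESTABLISHED:
+		/* add the wsi to your own list, under conn_lock */
+		break;
+	case LWS_CALLBACK_CLOSED:
+		/* remove the wsi from your list, under conn_lock */
+		break;
+	case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
+		pthread_mutex_lock(&conn_lock);
+		for (conn = conn_list; conn; conn = conn->next)
+			if (conn->needs_write) {
+				conn->needs_write = 0;
+				lws_callback_on_writable(conn->wsi);
+			}
+		pthread_mutex_unlock(&conn_lock);
+		break;
+```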
+
+5) The obverse of this truism about the receiver being the boss is the case
+where we are receiving.  If we get into a situation where we actually can't
+usefully receive any more, perhaps because we are passing the data on and the
+guy we want to send to can't receive any more, then we should "turn off RX" by
+using the RX flow control API, `lws_rx_flow_control(wsi, 0)`.  When something
+happens that means we can accept more RX (eg, we learn our onward connection is
+writeable) we can call it again to re-enable it on the incoming wsi.
+
+LWS stops calling back about RX as soon as you use flow control to disable RX;
+it buffers the data internally if necessary.  So you will only see RX when you
+can handle it.  While RX is disabled by flow control, LWS stops taking new data
+in... this makes the situation known to the sender by TCP "backpressure": the
+tx window fills and the sender finds he cannot write any more to the connection.
 
 See the mirror protocol implementations for example code.
 
-Only live connections appear in the user callbacks, so this removes any
-possibility of trying to used closed and freed wsis.
-
 If you need to service other socket or file descriptors as well as the
 websocket ones, you can combine them together with the websocket ones in one
 poll loop, see "External Polling Loop support" below, and
-still do it all in one thread / process context.
-
-SSL_library_init() is called from the context create api and it also is not
-reentrant. So at least create the contexts sequentially.
-
-If you must interoperate with other threads, you can use `lws_cancel_service()`
-to notify lws that something has happened on another thread. lws will send
-`LWS_CALLBACK_EVENT_WAIT_CANCELLED` events to all protocols, serialized with
-the main event loop operations, ie, safely.
-
-You can handle this callback to check the reason you were notified and take
-action using any usual lws api, since you are in a callback in the normal
-service thread.
-
-`lws_cancel_service()` is very cheap for the other thread to call.
+still do it all in one thread / process context.  If the need is less
+architectural, you can also create RAW mode client and serving sockets; this
+is how the lws plugin for the ssh server works.
 
 @section closing Closing connections from the user side
 
diff --git a/lib/context.c b/lib/context.c
index 0462bd640..447968dcd 100644
--- a/lib/context.c
+++ b/lib/context.c
@@ -1559,7 +1559,9 @@ lws_vhost_destroy(struct lws_vhost *vh)
 LWS_VISIBLE void
 lws_context_destroy(struct lws_context *context)
 {
+	volatile struct lws_foreign_thread_pollfd *ftp, *next;
 	struct lws_context_per_thread *pt;
+	volatile struct lws_context_per_thread *vpt;
 	struct lws_vhost *vh = NULL;
 	struct lws wsi;
 	int n, m;
@@ -1590,6 +1592,15 @@ lws_context_destroy(struct lws_context *context)
 
 	while (m--) {
 		pt = &context->pt[m];
+		vpt = (volatile struct lws_context_per_thread *)pt;
+
+		ftp = vpt->foreign_pfd_list;
+		while (ftp) {
+			next = ftp->next;
+			lws_free((void *)ftp);
+			ftp = next;
+		}
+		vpt->foreign_pfd_list = NULL;
 
 		for (n = 0; (unsigned int)n < context->pt[m].fds_count; n++) {
 			struct lws *wsi = wsi_from_fd(context, pt->fds[n].fd);
diff --git a/lib/plat/lws-plat-unix.c b/lib/plat/lws-plat-unix.c
index 6e824a349..829354217 100644
--- a/lib/plat/lws-plat-unix.c
+++ b/lib/plat/lws-plat-unix.c
@@ -136,6 +136,8 @@ LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
 LWS_VISIBLE LWS_EXTERN int
 _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 {
+	volatile struct lws_foreign_thread_pollfd *ftp, *next;
+	volatile struct lws_context_per_thread *vpt;
 	struct lws_context_per_thread *pt;
 	int n = -1, m, c;
 
@@ -145,6 +147,7 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 		return 1;
 
 	pt = &context->pt[tsi];
+	vpt = (volatile struct lws_context_per_thread *)pt;
 
 	lws_stats_atomic_bump(context, pt, LWSSTATS_C_SERVICE_ENTRY, 1);
 
@@ -180,7 +183,41 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
 		timeout_ms = 0;
 	}
 
+	vpt->inside_poll = 1;
+	lws_memory_barrier();
 	n = poll(pt->fds, pt->fds_count, timeout_ms);
+	vpt->inside_poll = 0;
+	lws_memory_barrier();
+
+	/* Collision will be rare and brief.  Just spin until it completes */
+	while (vpt->foreign_spinlock)
+		;
+
+	/*
+	 * At this point we are not inside a foreign thread pollfd change,
+	 * and we have marked ourselves as outside the poll() wait.  So we
+	 * are the only guys that can modify the lws_foreign_thread_pollfd
+	 * list on the pt.  Drain the list and apply the changes to the
+	 * affected pollfds in the correct order.
+	 */
+	ftp = vpt->foreign_pfd_list;
+	//lwsl_notice("cleared list %p\n", ftp);
+	while (ftp) {
+		struct lws *wsi;
+		struct lws_pollfd *pfd;
+
+		next = ftp->next;
+		pfd = &vpt->fds[ftp->fd_index];
+		if (lws_sockfd_valid(pfd->fd)) {
+			wsi = wsi_from_fd(context, pfd->fd);
+			if (wsi)
+				lws_change_pollfd(wsi, ftp->_and, ftp->_or);
+		}
+		lws_free((void *)ftp);
+		ftp = next;
+	}
+	vpt->foreign_pfd_list = NULL;
+	lws_memory_barrier();
 
 #ifdef LWS_OPENSSL_SUPPORT
 	if (!n && !pt->rx_draining_ext_list &&
diff --git a/lib/pollfd.c b/lib/pollfd.c
index 6af37a9a6..18b8c73b1 100644
--- a/lib/pollfd.c
+++ b/lib/pollfd.c
@@ -24,6 +24,9 @@ int
 _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
 {
+#if !defined(LWS_WITH_LIBUV) && !defined(LWS_WITH_LIBEV) && !defined(LWS_WITH_LIBEVENT)
+	volatile struct lws_context_per_thread *vpt;
+#endif
 	struct lws_context_per_thread *pt;
 	struct lws_context *context;
 	int ret = 0, pa_events = 1;
@@ -33,7 +36,8 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
 	if (!wsi || wsi->position_in_fds_table < 0)
 		return 0;
 
-	if (wsi->handling_pollout && !_and && _or == LWS_POLLOUT) {
+	if (((volatile struct lws *)wsi)->handling_pollout &&
+	    !_and && _or == LWS_POLLOUT) {
 		/*
 		 * Happening alongside service thread handling POLLOUT.
 		 * The danger is when he is finished, he will disable POLLOUT,
@@ -42,7 +46,7 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
 		 * Instead of changing the fds, inform the service thread
 		 * what happened, and ask it to leave POLLOUT active on exit
 		 */
-		wsi->leave_pollout_active = 1;
+		((volatile struct lws *)wsi)->leave_pollout_active = 1;
 		/*
 		 * by definition service thread is not in poll wait, so no need
 		 * to cancel service
@@ -55,9 +59,72 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
 
 	context = wsi->context;
 	pt = &context->pt[(int)wsi->tsi];
+	assert(wsi->position_in_fds_table >= 0 && wsi->position_in_fds_table < (int)pt->fds_count);
+#if !defined(LWS_WITH_LIBUV) && !defined(LWS_WITH_LIBEV) && !defined(LWS_WITH_LIBEVENT)
+	/*
+	 * This only applies when we use the default poll() event loop.
+	 *
+	 * BSD can revert pa->events at any time, when the kernel decides to
+	 * exit from poll().  We can't protect against it using locking.
+	 *
+	 * Therefore we must check first if the service thread is in poll()
+	 * wait; if so, we know we must be being called from a foreign thread,
+	 * and we must keep a strictly ordered list of changes we made instead
+	 * of trying to apply them, since when poll() exits, which may happen
+	 * at any time, it would revert our changes.
+	 *
+	 * The plat code will apply them when it leaves the poll() wait
+	 * before doing anything else.
+	 */
+
+	vpt = (volatile struct lws_context_per_thread *)pt;
+
+	vpt->foreign_spinlock = 1;
+	lws_memory_barrier();
+
+	if (vpt->inside_poll) {
+		struct lws_foreign_thread_pollfd *ftp, **ftp1;
+		/*
+		 * We are certainly a foreign thread trying to change events
+		 * while the service thread is in the poll() wait.
+		 *
+		 * Create a list of changes to be applied after poll() exit,
+		 * instead of trying to apply them now.
+		 */
+		ftp = lws_malloc(sizeof(*ftp), "ftp");
+		if (!ftp) {
+			vpt->foreign_spinlock = 0;
+			lws_memory_barrier();
+			ret = -1;
+			goto bail;
+		}
+
+		ftp->_and = _and;
+		ftp->_or = _or;
+		ftp->fd_index = wsi->position_in_fds_table;
+		ftp->next = NULL;
+
+		/* place at END of list to maintain order */
+		ftp1 = (struct lws_foreign_thread_pollfd **)
+					&vpt->foreign_pfd_list;
+		while (*ftp1)
+			ftp1 = &((*ftp1)->next);
+
+		*ftp1 = ftp;
+		vpt->foreign_spinlock = 0;
+		lws_memory_barrier();
+		lws_cancel_service_pt(wsi);
+
+		return 0;
+	}
+
+	vpt->foreign_spinlock = 0;
+	lws_memory_barrier();
+#endif
+
 	pfd = &pt->fds[wsi->position_in_fds_table];
 	pa->fd = wsi->desc.sockfd;
 	lwsl_debug("%s: fd %d old events %d\n", __func__, pa->fd, pfd->events);
@@ -107,13 +174,11 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
 #endif
 
 	if (pa_events) {
-
 		if (lws_plat_change_pollfd(context, wsi, pfd)) {
 			lwsl_info("%s failed\n", __func__);
 			ret = -1;
 			goto bail;
 		}
-
 		sampled_tid = context->service_tid;
 		if (sampled_tid && wsi->vhost) {
 			tid = wsi->vhost->protocols[0].callback(wsi,
@@ -126,6 +191,7 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
 			lws_cancel_service_pt(wsi);
 		}
 	}
+
 bail:
 	return ret;
 }
diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h
index a723d162b..1a5b25f74 100644
--- a/lib/private-libwebsockets.h
+++ b/lib/private-libwebsockets.h
@@ -534,6 +534,14 @@ enum lws_ssl_capable_status {
 	LWS_SSL_CAPABLE_MORE_SERVICE = -4, /* general retry */
 };
 
+#if defined(__clang__)
+#define lws_memory_barrier() __sync_synchronize()
+#elif defined(__GNUC__)
+#define lws_memory_barrier() __sync_synchronize()
+#else
+#define lws_memory_barrier()
+#endif
+
 enum lws_websocket_opcodes_07 {
 	LWSWSOPC_CONTINUATION = 0,
 	LWSWSOPC_TEXT_FRAME = 1,
@@ -776,6 +784,13 @@ struct lws_fd_hashtable {
 };
 #endif
 
+struct lws_foreign_thread_pollfd {
+	struct lws_foreign_thread_pollfd *next;
+	int fd_index;
+	int _and;
+	int _or;
+};
+
 /*
  * This is totally opaque to code using the library.  It's exported as a
  * forward-reference pointer-only declaration; the user can use the pointer with
@@ -847,6 +862,7 @@ struct lws_context_per_thread {
 	pthread_mutex_t lock;
 #endif
 	struct lws_pollfd *fds;
+	volatile struct lws_foreign_thread_pollfd * volatile foreign_pfd_list;
 #if defined(LWS_WITH_ESP8266)
 	struct lws **lws_vs_fds_index;
 #endif
@@ -899,6 +915,9 @@ struct lws_context_per_thread {
 	lws_sockfd_type dummy_pipe_fds[2];
 	struct lws *pipe_wsi;
 
+	volatile unsigned char inside_poll;
+	volatile unsigned char foreign_spinlock;
+
 	unsigned int fds_count;
 	uint32_t ah_pool_length;
 
@@ -2026,10 +2045,6 @@ struct lws {
 	unsigned int redirect_to_https:1;
 #endif
 
-	/* volatile to make sure code is aware other thread can change */
-	volatile unsigned int handling_pollout:1;
-	volatile unsigned int leave_pollout_active:1;
-
 #ifndef LWS_NO_CLIENT
 	unsigned short c_port;
 #endif
@@ -2059,6 +2074,9 @@ struct lws {
 #if defined(LWS_WITH_CGI) || !defined(LWS_NO_CLIENT)
 	char reason_bf; /* internal writeable callback reason bitfield */
 #endif
+	/* volatile to make sure code is aware other thread can change */
+	volatile char handling_pollout;
+	volatile char leave_pollout_active;
 };
 
 #define lws_is_flowcontrolled(w) (!!(wsi->rxflow_bitmap))
diff --git a/lib/service.c b/lib/service.c
index ec3b14f22..fa1cc8bd9 100644
--- a/lib/service.c
+++ b/lib/service.c
@@ -76,9 +76,10 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
 	struct lws **wsi2, *wsi2a;
 #endif
 	int ret, m, n;
+	volatile struct lws *vwsi = (volatile struct lws *)wsi;
 
-	wsi->leave_pollout_active = 0;
-	wsi->handling_pollout = 1;
+	vwsi->leave_pollout_active = 0;
+	vwsi->handling_pollout = 1;
 	/*
 	 * if another thread wants POLLOUT on us, from here on while
 	 * handling_pollout is set, he will only set leave_pollout_active.
@@ -332,14 +333,14 @@ user_service:
 	/* one shot */
 
 	if (wsi->parent_carries_io) {
-		wsi->handling_pollout = 0;
-		wsi->leave_pollout_active = 0;
+		vwsi->handling_pollout = 0;
+		vwsi->leave_pollout_active = 0;
 
 		return lws_calllback_as_writeable(wsi);
 	}
 
 	if (pollfd) {
-		int eff = wsi->leave_pollout_active;
+		int eff = vwsi->leave_pollout_active;
 
 		if (!eff)
 			if (lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
@@ -347,7 +348,7 @@ user_service:
 				goto bail_die;
 			}
 
-		wsi->handling_pollout = 0;
+		vwsi->handling_pollout = 0;
 
 		/* cannot get leave_pollout_active set after the above */
 		if (!eff && wsi->leave_pollout_active)
@@ -355,7 +356,7 @@ user_service:
 			 * handling_pollout, force POLLOUT on */
 			lws_calllback_as_writeable(wsi);
 
-		wsi->leave_pollout_active = 0;
+		vwsi->leave_pollout_active = 0;
 	}
 
 	if (wsi->mode != LWSCM_WSCL_ISSUE_HTTP_BODY &&
@@ -460,7 +461,7 @@ user_service_go_again:
 
 		if (w->state == LWSS_HTTP_ISSUING_FILE) {
 
-			w->leave_pollout_active = 0;
+			((volatile struct lws *)w)->leave_pollout_active = 0;
 
 			/* >0 == completion, <0 == error
 			 *
@@ -528,12 +529,13 @@ next_child:
 
 notify:
 #endif
-	wsi->leave_pollout_active = 0;
+	vwsi = (volatile struct lws *)wsi;
+	vwsi->leave_pollout_active = 0;
 
 	n = lws_calllback_as_writeable(wsi);
-	wsi->handling_pollout = 0;
+	vwsi->handling_pollout = 0;
 
-	if (wsi->leave_pollout_active)
+	if (vwsi->leave_pollout_active)
 		lws_change_pollfd(wsi, 0, LWS_POLLOUT);
 
 	return n;
@@ -544,14 +546,14 @@ notify:
 	 */
 
 bail_ok:
-	wsi->handling_pollout = 0;
-	wsi->leave_pollout_active = 0;
+	vwsi->handling_pollout = 0;
+	vwsi->leave_pollout_active = 0;
 
 	return 0;
 
 bail_die:
-	wsi->handling_pollout = 0;
-	wsi->leave_pollout_active = 0;
+	vwsi->handling_pollout = 0;
+	vwsi->leave_pollout_active = 0;
 
 	return -1;
 }