
bsd: support foreign thread event changes while in poll wait

https://github.com/warmcat/libwebsockets/issues/314
Andy Green 2017-11-12 09:16:46 +08:00
parent 74fddbc09e
commit 1d5bd23268
6 changed files with 232 additions and 57 deletions


@@ -122,52 +122,93 @@ all the server resources.
@section evtloop Libwebsockets is singlethreaded
Libwebsockets works in a serialized event loop, in a single thread. It supports
not only the default poll() backend, but also the libuv, libev, and libevent
event loop libraries, which take the same locking-free, nonblocking event loop
approach and are not threadsafe. There are several advantages to this technique,
but one disadvantage: it doesn't integrate easily if there are multiple threads
that want to use libwebsockets.
However, integration with multithreaded apps is possible if you follow some
guidelines.
1) Aside from two APIs, directly calling lws APIs from other threads is not allowed.
2) If you want to keep a list of live wsi, you need to use lifecycle callbacks on
the protocol in the service thread to manage the list, with your own locking.
Typically you use an ESTABLISHED callback to add ws wsi to your list and a CLOSED
callback to remove them.
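
As a minimal sketch of this guideline (assuming pthreads; the list and lock
names are illustrative, not lws APIs):

```c
#include <libwebsockets.h>
#include <pthread.h>
#include <stdlib.h>

struct live_wsi {
	struct live_wsi *next;
	struct lws *wsi;
};

static struct live_wsi *live_list;
static pthread_mutex_t live_lock = PTHREAD_MUTEX_INITIALIZER;

static int
callback_track(struct lws *wsi, enum lws_callback_reasons reason,
	       void *user, void *in, size_t len)
{
	struct live_wsi *p, **pp;

	switch (reason) {
	case LWS_CALLBACK_ESTABLISHED:
		/* service thread: add the new ws connection to our list */
		p = malloc(sizeof(*p));
		if (!p)
			return -1;
		p->wsi = wsi;
		pthread_mutex_lock(&live_lock);
		p->next = live_list;
		live_list = p;
		pthread_mutex_unlock(&live_lock);
		break;

	case LWS_CALLBACK_CLOSED:
		/* service thread: the wsi is going away, unlink it so no
		 * other thread can see a stale pointer */
		pthread_mutex_lock(&live_lock);
		for (pp = &live_list; *pp; pp = &(*pp)->next)
			if ((*pp)->wsi == wsi) {
				p = *pp;
				*pp = p->next;
				free(p);
				break;
			}
		pthread_mutex_unlock(&live_lock);
		break;

	default:
		break;
	}

	return 0;
}
```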
3) LWS regulates your write activity by being able to let you know when you may
write more on a connection. That reflects the reality that you cannot successfully
send data to a peer that has no room for it, so you should not generate or buffer
write data until you know the peer connection can take more.
Other libraries pretend that the guy doing the writing is the boss who decides
what happens, and absorb as much as you want to write into local buffering. That
does not scale to a lot of connections, because it will exhaust your memory and
waste time needlessly copying data around.
The truth is the receiver, along with the network between you, is the boss who
decides what will happen. If he stops accepting data, no data will move. LWS is
designed to reflect that.
If you have something to send, you call `lws_callback_on_writable()` on the
connection, and when it is writeable, you will get a `LWS_CALLBACK_SERVER_WRITEABLE`
callback, where you should generate the data to send and send it with `lws_write()`.
You cannot send data using `lws_write()` outside of the WRITEABLE callback.
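
Putting this together, a minimal sketch of the pattern (assuming the protocol
was registered with `sizeof(struct per_session)` as its per-session user data
size; the struct and flag names are illustrative):

```c
#include <libwebsockets.h>

/* illustrative per-session state, allocated by lws as the "user" arg */
struct per_session {
	int have_reply_to_send;
};

static int
callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
		 void *user, void *in, size_t len)
{
	struct per_session *pss = (struct per_session *)user;
	unsigned char buf[LWS_PRE + 128];
	int n;

	switch (reason) {
	case LWS_CALLBACK_RECEIVE:
		/* do NOT lws_write() here: just note that we owe a reply
		 * and ask for a writeable callback */
		pss->have_reply_to_send = 1;
		lws_callback_on_writable(wsi);
		break;

	case LWS_CALLBACK_SERVER_WRITEABLE:
		if (!pss->have_reply_to_send)
			break;
		/* lws needs LWS_PRE bytes of headroom before the payload */
		n = lws_snprintf((char *)buf + LWS_PRE, 128, "hello");
		if (lws_write(wsi, buf + LWS_PRE, n, LWS_WRITE_TEXT) < n)
			return -1;
		pss->have_reply_to_send = 0;
		break;

	default:
		break;
	}

	return 0;
}
```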
4) For multithreaded apps, this corresponds to a need to be able to provoke the
`lws_callback_on_writable()` action and to wake the service thread from its event
loop wait (sleeping in `poll()` or `epoll()` or whatever). The rules above
mean directly sending data on the connection from another thread is out of the
question.
Therefore the two APIs mentioned above that may be used from another thread are:

 - For LWS using the default poll() event loop: `lws_callback_on_writable()`

 - For LWS using the libuv/libev/libevent event loops: `lws_cancel_service()`
If you are using the default poll() event loop, one "foreign thread" at a time may
call `lws_callback_on_writable()` directly for a wsi. You need to use your own
locking around that to serialize multiple thread access to it.
If you implement LWS_CALLBACK_GET_THREAD_ID in protocols[0], then LWS will detect
when it has been called from a foreign thread and automatically use
`lws_cancel_service()` to additionally wake the service loop from its wait.
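
For example, a sketch along the lines of the lws pthreads test app (it assumes
pthread_t is convertible to an integer on the platform, which is true on, eg,
Linux glibc):

```c
#include <libwebsockets.h>
#include <pthread.h>

/* in protocols[0]'s callback */
static int
callback_proto0(struct lws *wsi, enum lws_callback_reasons reason,
		void *user, void *in, size_t len)
{
	switch (reason) {
	case LWS_CALLBACK_GET_THREAD_ID:
		/* lws compares this id against the one sampled in the
		 * service thread to detect foreign-thread callers */
		return (int)(unsigned long)pthread_self();
	default:
		break;
	}

	return 0;
}
```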
The libuv/libev/libevent event loops cannot handle being called from other
threads at all, so there is a slightly different scheme: you may call
`lws_cancel_service()` to force the event loop to end immediately. This then
broadcasts a callback (in the service thread context), `LWS_CALLBACK_EVENT_WAIT_CANCELLED`,
to all protocols on all vhosts, where you can perform your own locking and walk
your own list of wsi that need `lws_callback_on_writable()` called on them.
`lws_cancel_service()` is very cheap to call.
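
A sketch of that scheme (a single pending slot for brevity; the names are
illustrative, and a real app would combine this with the wsi list from
guideline 2 above and clear the slot in the CLOSED callback):

```c
#include <libwebsockets.h>
#include <pthread.h>

/* illustrative shared state between foreign and service threads */
static struct lws *pending_wsi;
static pthread_mutex_t pending_lock = PTHREAD_MUTEX_INITIALIZER;

/* foreign thread: note which connection wants to write, then poke the
 * event loop awake; lws_cancel_service() is safe to call from here */
void
request_write_from_other_thread(struct lws_context *context, struct lws *wsi)
{
	pthread_mutex_lock(&pending_lock);
	pending_wsi = wsi;
	pthread_mutex_unlock(&pending_lock);

	lws_cancel_service(context);
}

/* protocol callback, running in the service thread */
static int
callback_cancel(struct lws *wsi, enum lws_callback_reasons reason,
		void *user, void *in, size_t len)
{
	switch (reason) {
	case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
		/* serialized with the event loop, so lws APIs are safe */
		pthread_mutex_lock(&pending_lock);
		if (pending_wsi) {
			lws_callback_on_writable(pending_wsi);
			pending_wsi = NULL;
		}
		pthread_mutex_unlock(&pending_lock);
		break;
	default:
		break;
	}

	return 0;
}
```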
5) The obverse of this truism about the receiver being the boss is the case where
we are receiving. If we get into a situation where we actually can't usefully
receive any more, perhaps because we are passing the data on and the guy we want
to send to can't receive any more, then we should "turn off RX" by using the
RX flow control API, `lws_rx_flow_control(wsi, 0)`. When something happens that
means we can accept more RX (eg, we learn our onward connection is writeable),
we can call it again with a nonzero argument to re-enable RX on the incoming wsi.
LWS stops calling back about RX as soon as you use flow control to disable RX;
it buffers the data internally if necessary, so you will only see RX when you
can handle it. While RX is disabled by flow control, LWS stops taking new data
in... this makes the situation known to the sender by TCP "backpressure": the
tx window fills and the sender finds he cannot write any more to the connection.
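
A sketch of such a proxying callback (the `onward_sink_full()` helper is
hypothetical, standing in for whatever tells you the onward path is congested):

```c
#include <libwebsockets.h>

/* hypothetical helper: nonzero while our onward sink cannot accept more */
extern int onward_sink_full(void);

static int
callback_proxy(struct lws *wsi, enum lws_callback_reasons reason,
	       void *user, void *in, size_t len)
{
	switch (reason) {
	case LWS_CALLBACK_RECEIVE:
		/* ... pass (in, len) onward here ... */

		if (onward_sink_full())
			/* stop RX callbacks; lws buffers anything in flight
			 * and stops reading, so TCP backpressure stalls the
			 * remote sender */
			lws_rx_flow_control(wsi, 0);
		break;

	default:
		break;
	}

	return 0;
}

/* service thread: called when we learn the onward connection can accept
 * more, eg, from its writeable callback */
static void
onward_drained(struct lws *incoming_wsi)
{
	lws_rx_flow_control(incoming_wsi, 1);
}
```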
See the mirror protocol implementations for example code.
If you need to service other socket or file descriptors as well as the
websocket ones, you can combine them together with the websocket ones
in one poll loop, see "External Polling Loop support" below, and
still do it all in one thread / process context. If the need is less
architectural, you can also create RAW mode client and serving sockets; this
is how the lws plugin for the ssh server works.

SSL_library_init() is called from the context create api and it also is not
reentrant, so at least create the contexts sequentially.
@section closing Closing connections from the user side


@@ -1559,7 +1559,9 @@ lws_vhost_destroy(struct lws_vhost *vh)
LWS_VISIBLE void
lws_context_destroy(struct lws_context *context)
{
volatile struct lws_foreign_thread_pollfd *ftp, *next;
struct lws_context_per_thread *pt;
volatile struct lws_context_per_thread *vpt;
struct lws_vhost *vh = NULL;
struct lws wsi;
int n, m;
@@ -1590,6 +1592,15 @@ lws_context_destroy(struct lws_context *context)
while (m--) {
pt = &context->pt[m];
vpt = (volatile struct lws_context_per_thread *)pt;
/* free any foreign-thread pollfd changes that were never applied */
ftp = vpt->foreign_pfd_list;
while (ftp) {
next = ftp->next;
lws_free((void *)ftp);
ftp = next;
}
vpt->foreign_pfd_list = NULL;
for (n = 0; (unsigned int)n < context->pt[m].fds_count; n++) {
struct lws *wsi = wsi_from_fd(context, pt->fds[n].fd);


@@ -136,6 +136,8 @@ LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
LWS_VISIBLE LWS_EXTERN int
_lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
volatile struct lws_foreign_thread_pollfd *ftp, *next;
volatile struct lws_context_per_thread *vpt;
struct lws_context_per_thread *pt;
int n = -1, m, c;
@@ -145,6 +147,7 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
return 1;
pt = &context->pt[tsi];
vpt = (volatile struct lws_context_per_thread *)pt;
lws_stats_atomic_bump(context, pt, LWSSTATS_C_SERVICE_ENTRY, 1);
@@ -180,7 +183,41 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
timeout_ms = 0;
}
vpt->inside_poll = 1;
lws_memory_barrier();
n = poll(pt->fds, pt->fds_count, timeout_ms);
vpt->inside_poll = 0;
lws_memory_barrier();
/* Collision will be rare and brief. Just spin until it completes */
while (vpt->foreign_spinlock)
;
/*
* At this point we are not inside a foreign thread pollfd change,
* and we have marked ourselves as outside the poll() wait. So we
* are the only guys that can modify the lws_foreign_thread_pollfd
* list on the pt. Drain the list and apply the changes to the
* affected pollfds in the correct order.
*/
ftp = vpt->foreign_pfd_list;
//lwsl_notice("cleared list %p\n", ftp);
while (ftp) {
struct lws *wsi;
struct lws_pollfd *pfd;
next = ftp->next;
pfd = &vpt->fds[ftp->fd_index];
if (lws_sockfd_valid(pfd->fd)) {
wsi = wsi_from_fd(context, pfd->fd);
if (wsi)
lws_change_pollfd(wsi, ftp->_and, ftp->_or);
}
lws_free((void *)ftp);
ftp = next;
}
vpt->foreign_pfd_list = NULL;
lws_memory_barrier();
#ifdef LWS_OPENSSL_SUPPORT
if (!n && !pt->rx_draining_ext_list &&


@@ -24,6 +24,9 @@
int
_lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
{
#if !defined(LWS_WITH_LIBUV) && !defined(LWS_WITH_LIBEV) && !defined(LWS_WITH_LIBEVENT)
volatile struct lws_context_per_thread *vpt;
#endif
struct lws_context_per_thread *pt;
struct lws_context *context;
int ret = 0, pa_events = 1;
@@ -33,7 +36,8 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
if (!wsi || wsi->position_in_fds_table < 0)
return 0;
if (((volatile struct lws *)wsi)->handling_pollout &&
    !_and && _or == LWS_POLLOUT) {
/*
* Happening alongside service thread handling POLLOUT.
* The danger is when he is finished, he will disable POLLOUT,
@@ -42,7 +46,7 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
* Instead of changing the fds, inform the service thread
* what happened, and ask it to leave POLLOUT active on exit
*/
((volatile struct lws *)wsi)->leave_pollout_active = 1;
/*
* by definition service thread is not in poll wait, so no need
* to cancel service
@@ -55,9 +59,72 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
context = wsi->context;
pt = &context->pt[(int)wsi->tsi];
assert(wsi->position_in_fds_table >= 0 &&
wsi->position_in_fds_table < (int)pt->fds_count);
#if !defined(LWS_WITH_LIBUV) && !defined(LWS_WITH_LIBEV) && !defined(LWS_WITH_LIBEVENT)
/*
* This only applies when we use the default poll() event loop.
*
* BSD can revert pa->events at any time, when the kernel decides to
* exit from poll(). We can't protect against it using locking.
*
* Therefore we must check first if the service thread is in poll()
* wait; if so, we know we must be being called from a foreign thread,
* and we must keep a strictly ordered list of changes we made instead
* of trying to apply them, since when poll() exits, which may happen
* at any time it would revert our changes.
*
* The plat code will apply them when it leaves the poll() wait
* before doing anything else.
*/
vpt = (volatile struct lws_context_per_thread *)pt;
vpt->foreign_spinlock = 1;
lws_memory_barrier();
if (vpt->inside_poll) {
struct lws_foreign_thread_pollfd *ftp, **ftp1;
/*
* We are certainly a foreign thread trying to change events
* while the service thread is in the poll() wait.
*
* Create a list of changes to be applied after poll() exit,
* instead of trying to apply them now.
*/
ftp = lws_malloc(sizeof(*ftp), "ftp");
if (!ftp) {
vpt->foreign_spinlock = 0;
lws_memory_barrier();
ret = -1;
goto bail;
}
ftp->_and = _and;
ftp->_or = _or;
ftp->fd_index = wsi->position_in_fds_table;
ftp->next = NULL;
/* place at END of list to maintain order */
ftp1 = (struct lws_foreign_thread_pollfd **)
&vpt->foreign_pfd_list;
while (*ftp1)
ftp1 = &((*ftp1)->next);
*ftp1 = ftp;
vpt->foreign_spinlock = 0;
lws_memory_barrier();
lws_cancel_service_pt(wsi);
return 0;
}
vpt->foreign_spinlock = 0;
lws_memory_barrier();
#endif
pfd = &pt->fds[wsi->position_in_fds_table];
pa->fd = wsi->desc.sockfd;
lwsl_debug("%s: fd %d old events %d\n", __func__, pa->fd, pfd->events);
@@ -107,13 +174,11 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
#endif
if (pa_events) {
if (lws_plat_change_pollfd(context, wsi, pfd)) {
lwsl_info("%s failed\n", __func__);
ret = -1;
goto bail;
}
sampled_tid = context->service_tid;
if (sampled_tid && wsi->vhost) {
tid = wsi->vhost->protocols[0].callback(wsi,
@@ -126,6 +191,7 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
lws_cancel_service_pt(wsi);
}
}
bail:
return ret;
}


@@ -534,6 +534,14 @@ enum lws_ssl_capable_status {
LWS_SSL_CAPABLE_MORE_SERVICE = -4, /* general retry */
};
#if defined(__clang__)
#define lws_memory_barrier() __sync_synchronize()
#elif defined(__GNUC__)
#define lws_memory_barrier() __sync_synchronize()
#else
#define lws_memory_barrier()
#endif
enum lws_websocket_opcodes_07 {
LWSWSOPC_CONTINUATION = 0,
LWSWSOPC_TEXT_FRAME = 1,
@@ -776,6 +784,13 @@ struct lws_fd_hashtable {
};
#endif
struct lws_foreign_thread_pollfd {
struct lws_foreign_thread_pollfd *next;
int fd_index;
int _and;
int _or;
};
/*
* This is totally opaque to code using the library. It's exported as a
* forward-reference pointer-only declaration; the user can use the pointer with
@@ -847,6 +862,7 @@ struct lws_context_per_thread {
pthread_mutex_t lock;
#endif
struct lws_pollfd *fds;
volatile struct lws_foreign_thread_pollfd * volatile foreign_pfd_list;
#if defined(LWS_WITH_ESP8266)
struct lws **lws_vs_fds_index;
#endif
@@ -899,6 +915,9 @@ struct lws_context_per_thread {
lws_sockfd_type dummy_pipe_fds[2];
struct lws *pipe_wsi;
volatile unsigned char inside_poll;
volatile unsigned char foreign_spinlock;
unsigned int fds_count;
uint32_t ah_pool_length;
@@ -2026,10 +2045,6 @@ struct lws {
unsigned int redirect_to_https:1;
#endif
#ifndef LWS_NO_CLIENT
unsigned short c_port;
#endif
@@ -2059,6 +2074,9 @@ struct lws {
#if defined(LWS_WITH_CGI) || !defined(LWS_NO_CLIENT)
char reason_bf; /* internal writeable callback reason bitfield */
#endif
/* volatile to make sure code is aware other thread can change */
volatile char handling_pollout;
volatile char leave_pollout_active;
};
#define lws_is_flowcontrolled(w) (!!(wsi->rxflow_bitmap))


@@ -76,9 +76,10 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
struct lws **wsi2, *wsi2a;
#endif
int ret, m, n;
volatile struct lws *vwsi = (volatile struct lws *)wsi;
vwsi->leave_pollout_active = 0;
vwsi->handling_pollout = 1;
/*
* if another thread wants POLLOUT on us, from here on while
* handling_pollout is set, he will only set leave_pollout_active.
@@ -332,14 +333,14 @@ user_service:
/* one shot */
if (wsi->parent_carries_io) {
vwsi->handling_pollout = 0;
vwsi->leave_pollout_active = 0;
return lws_calllback_as_writeable(wsi);
}
if (pollfd) {
int eff = vwsi->leave_pollout_active;
if (!eff)
if (lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
@@ -347,7 +348,7 @@ user_service:
goto bail_die;
}
vwsi->handling_pollout = 0;
/* cannot get leave_pollout_active set after the above */
if (!eff && wsi->leave_pollout_active)
@@ -355,7 +356,7 @@ user_service:
* handling_pollout, force POLLOUT on */
lws_calllback_as_writeable(wsi);
vwsi->leave_pollout_active = 0;
}
if (wsi->mode != LWSCM_WSCL_ISSUE_HTTP_BODY &&
@@ -460,7 +461,7 @@ user_service_go_again:
if (w->state == LWSS_HTTP_ISSUING_FILE) {
((volatile struct lws *)w)->leave_pollout_active = 0;
/* >0 == completion, <0 == error
*
@@ -528,12 +529,13 @@ next_child:
notify:
#endif
vwsi = (volatile struct lws *)wsi;
vwsi->leave_pollout_active = 0;
n = lws_calllback_as_writeable(wsi);
vwsi->handling_pollout = 0;
if (vwsi->leave_pollout_active)
lws_change_pollfd(wsi, 0, LWS_POLLOUT);
return n;
@@ -544,14 +546,14 @@ notify:
*/
bail_ok:
vwsi->handling_pollout = 0;
vwsi->leave_pollout_active = 0;
return 0;
bail_die:
vwsi->handling_pollout = 0;
vwsi->leave_pollout_active = 0;
return -1;
}