diff --git a/include/libwebsockets/lws-eventlib-exports.h b/include/libwebsockets/lws-eventlib-exports.h index 972d339ac..ceb885c3f 100644 --- a/include/libwebsockets/lws-eventlib-exports.h +++ b/include/libwebsockets/lws-eventlib-exports.h @@ -67,6 +67,8 @@ struct lws_event_loop_ops { void (*destroy_pt)(struct lws_context *context, int tsi); /* called just before wsi is freed */ void (*destroy_wsi)(struct lws *wsi); + /* return nonzero if caller thread is not loop service thread */ + int (*foreign_thread)(struct lws_context *context, int tsi); uint8_t flags; diff --git a/lib/core-net/private-lib-core-net.h b/lib/core-net/private-lib-core-net.h index fae9fc948..68add0ce0 100644 --- a/lib/core-net/private-lib-core-net.h +++ b/lib/core-net/private-lib-core-net.h @@ -1426,6 +1426,13 @@ hubbub_error html_parser_cb(const hubbub_token *token, void *pw); #endif +#if defined(_DEBUG) +void +lws_service_assert_loop_thread(struct lws_context *cx, int tsi); +#else +#define lws_service_assert_loop_thread(_cx, _tsi) +#endif + int lws_threadpool_tsi_context(struct lws_context *context, int tsi); diff --git a/lib/core-net/service.c b/lib/core-net/service.c index 8e99f3274..840f32929 100644 --- a/lib/core-net/service.c +++ b/lib/core-net/service.c @@ -24,6 +24,27 @@ #include "private-lib-core.h" +#if defined(_DEBUG) +void +lws_service_assert_loop_thread(struct lws_context *cx, int tsi) +{ + if (!cx->event_loop_ops->foreign_thread) + /* we can't judge it */ + return; + + if (!cx->event_loop_ops->foreign_thread(cx, tsi)) + /* OK */ + return; + + /* + * Lws apis are NOT THREADSAFE with the sole exception of + * lws_cancel_service(). If you look at the assert backtrace, you + * should see you're illegally calling an lws api from another thread. + */ + assert(0); +} +#endif + int lws_callback_as_writeable(struct lws *wsi) { diff --git a/lib/event-libs/glib/glib.c b/lib/event-libs/glib/glib.c index 615b867c8..14c779e15 100644 --- a/lib/event-libs/glib/glib.c +++ b/lib/event-libs/glib/glib.c @@ -490,6 +490,7 @@ static const struct lws_event_loop_ops event_loop_ops_glib = { /* run_pt */ elops_run_pt_glib, /* destroy_pt */ elops_destroy_pt_glib, /* destroy wsi */ elops_destroy_wsi_glib, + /* foreign_thread */ NULL, /* flags */ LELOF_DESTROY_FINAL, diff --git a/lib/event-libs/libev/libev.c b/lib/event-libs/libev/libev.c index 74758a8c7..4bdede78d 100644 --- a/lib/event-libs/libev/libev.c +++ b/lib/event-libs/libev/libev.c @@ -428,6 +428,7 @@ static const struct lws_event_loop_ops event_loop_ops_ev = { /* run_pt */ elops_run_pt_ev, /* destroy_pt */ elops_destroy_pt_ev, /* destroy wsi */ elops_destroy_wsi_ev, + /* foreign_thread */ NULL, /* flags */ 0, diff --git a/lib/event-libs/libevent/libevent.c b/lib/event-libs/libevent/libevent.c index 4fc2d915a..b7b310ca7 100644 --- a/lib/event-libs/libevent/libevent.c +++ b/lib/event-libs/libevent/libevent.c @@ -490,6 +490,7 @@ static const struct lws_event_loop_ops event_loop_ops_event = { /* run_pt */ elops_run_pt_event, /* destroy_pt */ elops_destroy_pt_event, /* destroy wsi */ elops_destroy_wsi_event, + /* foreign_thread */ NULL, /* flags */ 0, diff --git a/lib/event-libs/libuv/libuv.c b/lib/event-libs/libuv/libuv.c index 1c1d90d99..6ae5d3eed 100644 --- a/lib/event-libs/libuv/libuv.c +++ b/lib/event-libs/libuv/libuv.c @@ -1,7 +1,7 @@ /* * libwebsockets - small server side websockets and web server implementation * - * Copyright (C) 2010 - 2019 Andy Green + * Copyright (C) 2010 - 2021 Andy Green * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to @@ -95,6 +95,7 @@ lws_io_cb(uv_poll_t *watcher, int status, int revents) struct lws *wsi = (struct lws *)((uv_handle_t *)watcher)->data; struct lws_context *context = wsi->a.context; struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; + struct lws_pt_eventlibs_libuv *ptpriv = pt_to_priv_uv(pt); struct lws_pollfd eventfd; lws_context_lock(pt->context, __func__); @@ -103,6 +104,12 @@ lws_io_cb(uv_poll_t *watcher, int status, int revents) if (pt->is_destroyed) goto bail; + if (!ptpriv->thread_valid) { + /* record the thread id that gave us our first event */ + ptpriv->uv_thread = uv_thread_self(); + ptpriv->thread_valid = 1; + } + #if defined(WIN32) || defined(_WIN32) eventfd.fd = watcher->socket; #else @@ -144,7 +151,7 @@ lws_io_cb(uv_poll_t *watcher, int status, int revents) return; } - uv_idle_start(&pt_to_priv_uv(pt)->idle, lws_uv_idle); + uv_idle_start(&ptpriv->idle, lws_uv_idle); return; bail: @@ -489,6 +496,11 @@ elops_accept_uv(struct lws *wsi) struct lws_pt_eventlibs_libuv *ptpriv = pt_to_priv_uv(pt); struct lws_io_watcher_libuv *w_read = &wsi_to_priv_uv(wsi)->w_read; + if (!ptpriv->thread_valid) { + /* record the thread id that gave us our first event */ + ptpriv->uv_thread = uv_thread_self(); + ptpriv->thread_valid = 1; + } w_read->context = wsi->a.context; @@ -864,6 +876,27 @@ lws_libuv_closehandle(struct lws *wsi) uv_close(handle, lws_libuv_closewsi); } +static int +elops_foreign_thread_uv(struct lws_context *cx, int tsi) +{ + struct lws_context_per_thread *pt = &cx->pt[tsi]; + struct lws_pt_eventlibs_libuv *ptpriv = pt_to_priv_uv(pt); + uv_thread_t th = uv_thread_self(); + + if (!ptpriv->thread_valid) + /* + * We can't judge it until we get the first event from the loop + */ + return 0; + + /* + * This is the same thread that gave us the first event on this loop? + * Return 0 if so. + */ + + return !uv_thread_equal(&th, &ptpriv->uv_thread); +} + static const struct lws_event_loop_ops event_loop_ops_uv = { /* name */ "libuv", /* init_context */ elops_init_context_uv, @@ -879,6 +912,7 @@ static const struct lws_event_loop_ops event_loop_ops_uv = { /* run_pt */ elops_run_pt_uv, /* destroy_pt */ elops_destroy_pt_uv, /* destroy wsi */ NULL, + /* foreign_thread */ elops_foreign_thread_uv, /* flags */ 0, diff --git a/lib/event-libs/libuv/private-lib-event-libs-libuv.h b/lib/event-libs/libuv/private-lib-event-libs-libuv.h index e2625900a..48715110a 100644 --- a/lib/event-libs/libuv/private-lib-event-libs-libuv.h +++ b/lib/event-libs/libuv/private-lib-event-libs-libuv.h @@ -52,27 +52,32 @@ struct lws_signal_watcher_libuv { }; struct lws_pt_eventlibs_libuv { - uv_loop_t *io_loop; - struct lws_context_per_thread *pt; - uv_signal_t signals[8]; - uv_timer_t sultimer; - uv_idle_t idle; + uv_loop_t *io_loop; + struct lws_context_per_thread *pt; + uv_signal_t signals[8]; + uv_timer_t sultimer; + uv_idle_t idle; + + uv_thread_t uv_thread; + struct lws_signal_watcher_libuv w_sigint; - int extant_handles; + int extant_handles; + + char thread_valid; }; struct lws_context_eventlibs_libuv { - uv_loop_t loop; + uv_loop_t loop; }; struct lws_io_watcher_libuv { - uv_poll_t *pwatcher; - struct lws_context *context; - uint8_t actual_events; + uv_poll_t *pwatcher; + struct lws_context *context; + uint8_t actual_events; }; struct lws_wsi_eventlibs_libuv { - struct lws_io_watcher_libuv w_read; + struct lws_io_watcher_libuv w_read; }; uv_loop_t * diff --git a/lib/event-libs/poll/poll.c b/lib/event-libs/poll/poll.c index f3685bf28..aa45dc53e 100644 --- a/lib/event-libs/poll/poll.c +++ b/lib/event-libs/poll/poll.c @@ -1,7 +1,7 @@ /* * libwebsockets - small server side websockets and web server implementation * - * Copyright (C) 2010 - 2019 Andy Green + * Copyright (C) 2010 - 2021 Andy Green * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to @@ -25,28 +25,29 @@ #include #include "private-lib-event-libs-poll.h" +static int +elops_foreign_thread_poll(struct lws_context *cx, int tsi) +{ + struct lws_context_per_thread *pt = &cx->pt[tsi]; + volatile struct lws_context_per_thread *vpt = + (volatile struct lws_context_per_thread *)pt; + + /* + * To avoid mandating a specific threading library, we can check + * probabilistically by seeing if the lws default wait is still asleep + * at the time we are checking, if it is then we cannot be being called + * by the event loop loop thread. + */ + + return vpt->inside_poll; +} + struct lws_event_loop_ops event_loop_ops_poll = { - /* name */ "poll", - /* init_context */ NULL, - /* destroy_context1 */ NULL, - /* destroy_context2 */ NULL, - /* init_vhost_listen_wsi */ NULL, - /* init_pt */ NULL, - /* wsi_logical_close */ NULL, - /* check_client_connect_ok */ NULL, - /* close_handle_manually */ NULL, - /* accept */ NULL, - /* io */ NULL, - /* run */ NULL, - /* destroy_pt */ NULL, - /* destroy wsi */ NULL, + .name = "poll", - /* flags */ LELOF_ISPOLL, + .foreign_thread = elops_foreign_thread_poll, - /* evlib_size_ctx */ 0, - /* evlib_size_pt */ 0, - /* evlib_size_vh */ 0, - /* evlib_size_wsi */ 0, + .flags = LELOF_ISPOLL, }; const lws_plugin_evlib_t evlib_poll = { diff --git a/lib/event-libs/uloop/uloop.c b/lib/event-libs/uloop/uloop.c index 1214378b1..be9046c75 100644 --- a/lib/event-libs/uloop/uloop.c +++ b/lib/event-libs/uloop/uloop.c @@ -296,6 +296,7 @@ static const struct lws_event_loop_ops event_loop_ops_uloop = { /* run_pt */ elops_run_pt_uloop, /* destroy_pt */ elops_destroy_pt_uloop, /* destroy wsi */ elops_destroy_wsi_uloop, + /* foreign_thread */ NULL, /* flags */ 0, diff --git a/lib/plat/freertos/freertos-service.c b/lib/plat/freertos/freertos-service.c index 6170a1c55..3c60d0bd2 100644 --- a/lib/plat/freertos/freertos-service.c +++ b/lib/plat/freertos/freertos-service.c @@ -40,6 +40,7 @@ lws_plat_service(struct lws_context *context, int timeout_ms) int _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) { + volatile struct lws_context_per_thread *vpt; struct lws_context_per_thread *pt; lws_usec_t timeout_us; int n = -1, m, c, a = 0; @@ -50,6 +51,7 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) return 1; pt = &context->pt[tsi]; + vpt = (volatile struct lws_context_per_thread *)pt; { unsigned long m = lws_now_secs(); @@ -138,7 +140,11 @@ again: FD_SET(pt->fds[n].fd, &errfds); } + vpt->inside_poll = 1; + lws_memory_barrier(); n = select(max_fd + 1, &readfds, &writefds, &errfds, ptv); + vpt->inside_poll = 0; + lws_memory_barrier(); n = 0; for (m = 0; m < (int)pt->fds_count; m++) { diff --git a/lib/secure-streams/policy-common.c b/lib/secure-streams/policy-common.c index 0f4256b05..b748a77c1 100644 --- a/lib/secure-streams/policy-common.c +++ b/lib/secure-streams/policy-common.c @@ -85,6 +85,8 @@ lws_ss_set_metadata(struct lws_ss_handle *h, const char *name, { lws_ss_metadata_t *omd = lws_ss_get_handle_metadata(h, name); + lws_service_assert_loop_thread(h->context, h->tsi); + if (omd) return _lws_ss_set_metadata(omd, name, value, len); @@ -147,6 +149,8 @@ lws_ss_alloc_set_metadata(struct lws_ss_handle *h, const char *name, { lws_ss_metadata_t *omd = lws_ss_get_handle_metadata(h, name); + lws_service_assert_loop_thread(h->context, h->tsi); + if (!omd) { lwsl_info("%s: unknown metadata %s\n", __func__, name); return 1; @@ -164,6 +168,8 @@ lws_ss_get_metadata(struct lws_ss_handle *h, const char *name, int n; #endif + lws_service_assert_loop_thread(h->context, h->tsi); + if (omd) { *value = omd->value__may_own_heap; *len = omd->length; @@ -217,6 +223,8 @@ lws_ss_get_handle_metadata(struct lws_ss_handle *h, const char *name) { int n; + lws_service_assert_loop_thread(h->context, h->tsi); + for (n = 0; n < h->policy->metadata_count; n++) if (!strcmp(name, h->metadata[n].name)) return &h->metadata[n]; diff --git a/lib/secure-streams/secure-streams-client.c b/lib/secure-streams/secure-streams-client.c index c036ec520..157413ea2 100644 --- a/lib/secure-streams/secure-streams-client.c +++ b/lib/secure-streams/secure-streams-client.c @@ -608,6 +608,8 @@ lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi, uint8_t *ua; char *p; + lws_service_assert_loop_thread(context, tsi); + /* allocate the handle (including ssi), the user alloc, * and the streamname */ @@ -716,6 +718,8 @@ lws_sspc_destroy(lws_sspc_handle_t **ph) return; } + lws_service_assert_loop_thread(h->context, 0); + if (h->destroying) return; @@ -783,6 +787,8 @@ lws_sspc_request_tx(lws_sspc_handle_t *h) if (!h || !h->cwsi) return LWSSSSRET_OK; + lws_service_assert_loop_thread(h->context, 0); + if (!h->us_earliest_write_req) h->us_earliest_write_req = lws_now_usecs(); @@ -824,6 +830,8 @@ lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len) if (!h) return LWSSSSRET_OK; + lws_service_assert_loop_thread(h->context, 0); + lwsl_sspc_notice(h, "setting writeable_len %u", (unsigned int)len); h->writeable_len = len; h->pending_writeable_len = 1; @@ -852,6 +860,8 @@ lws_sspc_client_connect(lws_sspc_handle_t *h) if (!h || h->state == LPCSCLI_OPERATIONAL) return 0; + lws_service_assert_loop_thread(h->context, 0); + assert(h->state == LPCSCLI_LOCAL_CONNECTED); if (h->state == LPCSCLI_LOCAL_CONNECTED && h->conn_req_state == LWSSSPC_ONW_NONE) @@ -906,6 +916,8 @@ _lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name, { lws_sspc_metadata_t *md; + lws_service_assert_loop_thread(h->context, 0); + /* * Are we replacing a pending metadata of the same name? It's not * efficient to do this but user code can do what it likes... let's @@ -985,6 +997,8 @@ lws_sspc_get_metadata(struct lws_sspc_handle *h, const char *name, * the same name first */ + lws_service_assert_loop_thread(h->context, 0); + lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1, lws_dll2_get_head(&h->metadata_owner_rx)) { md = lws_container_of(d, @@ -1005,6 +1019,7 @@ lws_sspc_get_metadata(struct lws_sspc_handle *h, const char *name, int lws_sspc_add_peer_tx_credit(struct lws_sspc_handle *h, int32_t bump) { + lws_service_assert_loop_thread(h->context, 0); lwsl_sspc_notice(h, "%d\n", bump); return _lws_sspc_set_metadata(h, "", NULL, 0, (int)bump); } @@ -1012,12 +1027,14 @@ lws_sspc_add_peer_tx_credit(struct lws_sspc_handle *h, int32_t bump) int lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle *h) { + lws_service_assert_loop_thread(h->context, 0); return h->txc.peer_tx_cr_est; } void lws_sspc_start_timeout(struct lws_sspc_handle *h, unsigned int timeout_ms) { + lws_service_assert_loop_thread(h->context, 0); if (!h->cwsi) /* we can't fulfil it */ return; diff --git a/lib/secure-streams/secure-streams.c b/lib/secure-streams/secure-streams.c index 3048766b1..ced61f324 100644 --- a/lib/secure-streams/secure-streams.c +++ b/lib/secure-streams/secure-streams.c @@ -518,6 +518,8 @@ _lws_ss_backoff(lws_ss_handle_t *h, lws_usec_t us_override) uint64_t ms; char conceal; + lws_service_assert_loop_thread(h->context, h->tsi); + if (h->seqstate == SSSEQ_RECONNECT_WAIT) return LWSSSSRET_OK; @@ -577,6 +579,8 @@ lws_smd_ss_cb(void *opaque, lws_smd_class_t _class, lws_ss_handle_t *h = (lws_ss_handle_t *)opaque; uint8_t *p = (uint8_t *)buf - LWS_SMD_SS_RX_HEADER_LEN; + lws_service_assert_loop_thread(h->context, h->tsi); + /* * When configured with SS enabled, lws over-allocates * LWS_SMD_SS_RX_HEADER_LEN bytes behind the payload of the queued @@ -603,6 +607,8 @@ lws_ss_smd_tx_cb(lws_sorted_usec_list_t *sul) lws_smd_class_t _class; int flags = 0, n; + lws_service_assert_loop_thread(h->context, h->tsi); + if (!h->info.tx) return; @@ -646,6 +652,8 @@ _lws_ss_client_connect(lws_ss_handle_t *h, int is_retry, void *conn_if_sspc_onw) lws_strexp_t exp; struct lws *wsi; + lws_service_assert_loop_thread(h->context, h->tsi); + if (!h->policy) { lwsl_err("%s: ss with no policy\n", __func__); @@ -880,6 +888,9 @@ lws_ss_state_return_t lws_ss_client_connect(lws_ss_handle_t *h) { lws_ss_state_return_t r; + + lws_service_assert_loop_thread(h->context, h->tsi); + r = _lws_ss_client_connect(h, 0, 0); _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, h->wsi, &h); return r; @@ -908,6 +919,8 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi, char *p; int n; + lws_service_assert_loop_thread(context, tsi); + #if defined(LWS_WITH_SECURE_STREAMS_CPP) pol = ssi->policy; if (!pol) { @@ -1320,6 +1333,8 @@ lws_ss_destroy(lws_ss_handle_t **ppss) if (!h) return; + lws_service_assert_loop_thread(h->context, h->tsi); + if (h == h->h_in_svc) { lwsl_err("%s: illegal destroy, return LWSSSSRET_DESTROY_ME instead\n", __func__); @@ -1508,6 +1523,8 @@ _lws_ss_request_tx(lws_ss_handle_t *h) // lwsl_notice("%s: h %p, wsi %p\n", __func__, h, h->wsi); + lws_service_assert_loop_thread(h->context, h->tsi); + if (h->wsi) { lws_callback_on_writable(h->wsi); @@ -1569,6 +1586,8 @@ _lws_ss_request_tx(lws_ss_handle_t *h) lws_ss_state_return_t lws_ss_request_tx_len(lws_ss_handle_t *h, unsigned long len) { + lws_service_assert_loop_thread(h->context, h->tsi); + if (h->wsi && h->policy && (h->policy->protocol == LWSSSP_H1 || h->policy->protocol == LWSSSP_H2 || @@ -1633,6 +1652,8 @@ lws_ss_add_peer_tx_credit(struct lws_ss_handle *h, int32_t bump) { const struct ss_pcols *ssp; + lws_service_assert_loop_thread(h->context, h->tsi); + ssp = ss_pcols[(int)h->policy->protocol]; if (h->wsi && ssp && ssp->tx_cr_add) @@ -1646,6 +1667,8 @@ lws_ss_get_est_peer_tx_credit(struct lws_ss_handle *h) { const struct ss_pcols *ssp; + lws_service_assert_loop_thread(h->context, h->tsi); + ssp = ss_pcols[(int)h->policy->protocol]; if (h->wsi && ssp && ssp->tx_cr_add) @@ -1681,6 +1704,8 @@ lws_ss_to_cb(lws_sorted_usec_list_t *sul) void lws_ss_start_timeout(struct lws_ss_handle *h, unsigned int timeout_ms) { + lws_service_assert_loop_thread(h->context, h->tsi); + if (!timeout_ms && !h->policy->timeout_ms) return; @@ -1692,6 +1717,7 @@ lws_ss_start_timeout(struct lws_ss_handle *h, unsigned int timeout_ms) void lws_ss_cancel_timeout(struct lws_ss_handle *h) { + lws_service_assert_loop_thread(h->context, h->tsi); lws_sul_cancel(&h->sul_timeout); } diff --git a/minimal-examples/secure-streams/minimal-secure-streams-threads/minimal-secure-streams-threads.c b/minimal-examples/secure-streams/minimal-secure-streams-threads/minimal-secure-streams-threads.c index 57a11f8fb..bc4d42ffd 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams-threads/minimal-secure-streams-threads.c +++ b/minimal-examples/secure-streams/minimal-secure-streams-threads/minimal-secure-streams-threads.c @@ -28,12 +28,22 @@ #include +/* + * Define this to cause an ss api access from a foreign thread, it will + * assert. This is for testing lws, don't do this in your code. + */ +// #define DO_ILLEGAL_API_THREAD + static int interrupted, bad = 1, finished; static lws_sorted_usec_list_t sul_timeout; static struct lws_context *context; static pthread_t pthread_spam; static int wakes, started_thread; +#if defined(DO_ILLEGAL_API_THREAD) +static struct lws_ss_handle *ss; /* only needed for DO_ILLEGAL_API_THREAD */ +#endif + /* the data shared between the spam thread and the lws event loop */ static pthread_mutex_t lock_shared; @@ -89,6 +99,16 @@ thread_spam(void *d) __func__, shared_counter); lws_cancel_service(context); +#if defined(DO_ILLEGAL_API_THREAD) + /* + * ILLEGAL... + * We cannot call any other lws api from a foreign thread + */ + + if (ss) + lws_ss_request_tx(ss); +#endif + pthread_mutex_unlock(&lock_shared); /* } shared lock ------- */ usleep(100000); /* wait 100ms and signal main thread again */ @@ -171,7 +191,13 @@ system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link, * messages are coming */ - if (lws_ss_create(context, 0, &ssi_lws_threads, NULL, NULL, NULL, NULL)) { + if (lws_ss_create(context, 0, &ssi_lws_threads, NULL, +#if defined(DO_ILLEGAL_API_THREAD) + &ss, +#else + NULL, +#endif + NULL, NULL)) { lwsl_err("%s: failed to create secure stream\n", __func__);