1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

service: assert on thread shenanigans

This commit is contained in:
Andy Green 2021-08-10 06:35:59 +01:00
parent eb8c8354c4
commit f3d5b9b99a
15 changed files with 191 additions and 34 deletions

View file

@ -67,6 +67,8 @@ struct lws_event_loop_ops {
void (*destroy_pt)(struct lws_context *context, int tsi);
/* called just before wsi is freed */
void (*destroy_wsi)(struct lws *wsi);
/* return nonzero if caller thread is not loop service thread */
int (*foreign_thread)(struct lws_context *context, int tsi);
uint8_t flags;

View file

@ -1426,6 +1426,13 @@ hubbub_error
html_parser_cb(const hubbub_token *token, void *pw);
#endif
#if defined(_DEBUG)
void
lws_service_assert_loop_thread(struct lws_context *cx, int tsi);
#else
#define lws_service_assert_loop_thread(_cx, _tsi)
#endif
int
lws_threadpool_tsi_context(struct lws_context *context, int tsi);

View file

@ -24,6 +24,27 @@
#include "private-lib-core.h"
#if defined(_DEBUG)
void
lws_service_assert_loop_thread(struct lws_context *cx, int tsi)
{
if (!cx->event_loop_ops->foreign_thread)
/* we can't judge it */
return;
if (!cx->event_loop_ops->foreign_thread(cx, tsi))
/* OK */
return;
/*
* Lws apis are NOT THREADSAFE with the sole exception of
* lws_cancel_service(). If you look at the assert backtrace, you
* should see you're illegally calling an lws api from another thread.
*/
assert(0);
}
#endif
int
lws_callback_as_writeable(struct lws *wsi)
{

View file

@ -490,6 +490,7 @@ static const struct lws_event_loop_ops event_loop_ops_glib = {
/* run_pt */ elops_run_pt_glib,
/* destroy_pt */ elops_destroy_pt_glib,
/* destroy wsi */ elops_destroy_wsi_glib,
/* foreign_thread */ NULL,
/* flags */ LELOF_DESTROY_FINAL,

View file

@ -428,6 +428,7 @@ static const struct lws_event_loop_ops event_loop_ops_ev = {
/* run_pt */ elops_run_pt_ev,
/* destroy_pt */ elops_destroy_pt_ev,
/* destroy wsi */ elops_destroy_wsi_ev,
/* foreign_thread */ NULL,
/* flags */ 0,

View file

@ -490,6 +490,7 @@ static const struct lws_event_loop_ops event_loop_ops_event = {
/* run_pt */ elops_run_pt_event,
/* destroy_pt */ elops_destroy_pt_event,
/* destroy wsi */ elops_destroy_wsi_event,
/* foreign_thread */ NULL,
/* flags */ 0,

View file

@ -1,7 +1,7 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
* Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
@ -95,6 +95,7 @@ lws_io_cb(uv_poll_t *watcher, int status, int revents)
struct lws *wsi = (struct lws *)((uv_handle_t *)watcher)->data;
struct lws_context *context = wsi->a.context;
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
struct lws_pt_eventlibs_libuv *ptpriv = pt_to_priv_uv(pt);
struct lws_pollfd eventfd;
lws_context_lock(pt->context, __func__);
@ -103,6 +104,12 @@ lws_io_cb(uv_poll_t *watcher, int status, int revents)
if (pt->is_destroyed)
goto bail;
if (!ptpriv->thread_valid) {
/* record the thread id that gave us our first event */
ptpriv->uv_thread = uv_thread_self();
ptpriv->thread_valid = 1;
}
#if defined(WIN32) || defined(_WIN32)
eventfd.fd = watcher->socket;
#else
@ -144,7 +151,7 @@ lws_io_cb(uv_poll_t *watcher, int status, int revents)
return;
}
uv_idle_start(&pt_to_priv_uv(pt)->idle, lws_uv_idle);
uv_idle_start(&ptpriv->idle, lws_uv_idle);
return;
bail:
@ -489,6 +496,11 @@ elops_accept_uv(struct lws *wsi)
struct lws_pt_eventlibs_libuv *ptpriv = pt_to_priv_uv(pt);
struct lws_io_watcher_libuv *w_read = &wsi_to_priv_uv(wsi)->w_read;
if (!ptpriv->thread_valid) {
/* record the thread id that gave us our first event */
ptpriv->uv_thread = uv_thread_self();
ptpriv->thread_valid = 1;
}
w_read->context = wsi->a.context;
@ -864,6 +876,27 @@ lws_libuv_closehandle(struct lws *wsi)
uv_close(handle, lws_libuv_closewsi);
}
static int
elops_foreign_thread_uv(struct lws_context *cx, int tsi)
{
struct lws_context_per_thread *pt = &cx->pt[tsi];
struct lws_pt_eventlibs_libuv *ptpriv = pt_to_priv_uv(pt);
uv_thread_t th = uv_thread_self();
if (!ptpriv->thread_valid)
/*
* We can't judge it until we get the first event from the loop
*/
return 0;
/*
* This is the same thread that gave us the first event on this loop?
* Return 0 if so.
*/
return !uv_thread_equal(&th, &ptpriv->uv_thread);
}
static const struct lws_event_loop_ops event_loop_ops_uv = {
/* name */ "libuv",
/* init_context */ elops_init_context_uv,
@ -879,6 +912,7 @@ static const struct lws_event_loop_ops event_loop_ops_uv = {
/* run_pt */ elops_run_pt_uv,
/* destroy_pt */ elops_destroy_pt_uv,
/* destroy wsi */ NULL,
/* foreign_thread */ elops_foreign_thread_uv,
/* flags */ 0,

View file

@ -52,27 +52,32 @@ struct lws_signal_watcher_libuv {
};
struct lws_pt_eventlibs_libuv {
uv_loop_t *io_loop;
struct lws_context_per_thread *pt;
uv_signal_t signals[8];
uv_timer_t sultimer;
uv_idle_t idle;
uv_loop_t *io_loop;
struct lws_context_per_thread *pt;
uv_signal_t signals[8];
uv_timer_t sultimer;
uv_idle_t idle;
uv_thread_t uv_thread;
struct lws_signal_watcher_libuv w_sigint;
int extant_handles;
int extant_handles;
char thread_valid;
};
struct lws_context_eventlibs_libuv {
uv_loop_t loop;
uv_loop_t loop;
};
struct lws_io_watcher_libuv {
uv_poll_t *pwatcher;
struct lws_context *context;
uint8_t actual_events;
uv_poll_t *pwatcher;
struct lws_context *context;
uint8_t actual_events;
};
struct lws_wsi_eventlibs_libuv {
struct lws_io_watcher_libuv w_read;
struct lws_io_watcher_libuv w_read;
};
uv_loop_t *

View file

@ -1,7 +1,7 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
* Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
@ -25,28 +25,29 @@
#include <private-lib-core.h>
#include "private-lib-event-libs-poll.h"
static int
elops_foreign_thread_poll(struct lws_context *cx, int tsi)
{
struct lws_context_per_thread *pt = &cx->pt[tsi];
volatile struct lws_context_per_thread *vpt =
(volatile struct lws_context_per_thread *)pt;
/*
* To avoid mandating a specific threading library, we can check
* probabilistically by seeing if the lws default wait is still asleep
* at the time we are checking, if it is then we cannot be being called
* by the event loop loop thread.
*/
return vpt->inside_poll;
}
struct lws_event_loop_ops event_loop_ops_poll = {
/* name */ "poll",
/* init_context */ NULL,
/* destroy_context1 */ NULL,
/* destroy_context2 */ NULL,
/* init_vhost_listen_wsi */ NULL,
/* init_pt */ NULL,
/* wsi_logical_close */ NULL,
/* check_client_connect_ok */ NULL,
/* close_handle_manually */ NULL,
/* accept */ NULL,
/* io */ NULL,
/* run */ NULL,
/* destroy_pt */ NULL,
/* destroy wsi */ NULL,
.name = "poll",
/* flags */ LELOF_ISPOLL,
.foreign_thread = elops_foreign_thread_poll,
/* evlib_size_ctx */ 0,
/* evlib_size_pt */ 0,
/* evlib_size_vh */ 0,
/* evlib_size_wsi */ 0,
.flags = LELOF_ISPOLL,
};
const lws_plugin_evlib_t evlib_poll = {

View file

@ -296,6 +296,7 @@ static const struct lws_event_loop_ops event_loop_ops_uloop = {
/* run_pt */ elops_run_pt_uloop,
/* destroy_pt */ elops_destroy_pt_uloop,
/* destroy wsi */ elops_destroy_wsi_uloop,
/* foreign_thread */ NULL,
/* flags */ 0,

View file

@ -40,6 +40,7 @@ lws_plat_service(struct lws_context *context, int timeout_ms)
int
_lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
volatile struct lws_context_per_thread *vpt;
struct lws_context_per_thread *pt;
lws_usec_t timeout_us;
int n = -1, m, c, a = 0;
@ -50,6 +51,7 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
return 1;
pt = &context->pt[tsi];
vpt = (volatile struct lws_context_per_thread *)pt;
{
unsigned long m = lws_now_secs();
@ -138,7 +140,11 @@ again:
FD_SET(pt->fds[n].fd, &errfds);
}
vpt->inside_poll = 1;
lws_memory_barrier();
n = select(max_fd + 1, &readfds, &writefds, &errfds, ptv);
vpt->inside_poll = 0;
lws_memory_barrier();
n = 0;
for (m = 0; m < (int)pt->fds_count; m++) {

View file

@ -85,6 +85,8 @@ lws_ss_set_metadata(struct lws_ss_handle *h, const char *name,
{
lws_ss_metadata_t *omd = lws_ss_get_handle_metadata(h, name);
lws_service_assert_loop_thread(h->context, h->tsi);
if (omd)
return _lws_ss_set_metadata(omd, name, value, len);
@ -147,6 +149,8 @@ lws_ss_alloc_set_metadata(struct lws_ss_handle *h, const char *name,
{
lws_ss_metadata_t *omd = lws_ss_get_handle_metadata(h, name);
lws_service_assert_loop_thread(h->context, h->tsi);
if (!omd) {
lwsl_info("%s: unknown metadata %s\n", __func__, name);
return 1;
@ -164,6 +168,8 @@ lws_ss_get_metadata(struct lws_ss_handle *h, const char *name,
int n;
#endif
lws_service_assert_loop_thread(h->context, h->tsi);
if (omd) {
*value = omd->value__may_own_heap;
*len = omd->length;
@ -217,6 +223,8 @@ lws_ss_get_handle_metadata(struct lws_ss_handle *h, const char *name)
{
int n;
lws_service_assert_loop_thread(h->context, h->tsi);
for (n = 0; n < h->policy->metadata_count; n++)
if (!strcmp(name, h->metadata[n].name))
return &h->metadata[n];

View file

@ -608,6 +608,8 @@ lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
uint8_t *ua;
char *p;
lws_service_assert_loop_thread(context, tsi);
/* allocate the handle (including ssi), the user alloc,
* and the streamname */
@ -716,6 +718,8 @@ lws_sspc_destroy(lws_sspc_handle_t **ph)
return;
}
lws_service_assert_loop_thread(h->context, 0);
if (h->destroying)
return;
@ -783,6 +787,8 @@ lws_sspc_request_tx(lws_sspc_handle_t *h)
if (!h || !h->cwsi)
return LWSSSSRET_OK;
lws_service_assert_loop_thread(h->context, 0);
if (!h->us_earliest_write_req)
h->us_earliest_write_req = lws_now_usecs();
@ -824,6 +830,8 @@ lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len)
if (!h)
return LWSSSSRET_OK;
lws_service_assert_loop_thread(h->context, 0);
lwsl_sspc_notice(h, "setting writeable_len %u", (unsigned int)len);
h->writeable_len = len;
h->pending_writeable_len = 1;
@ -852,6 +860,8 @@ lws_sspc_client_connect(lws_sspc_handle_t *h)
if (!h || h->state == LPCSCLI_OPERATIONAL)
return 0;
lws_service_assert_loop_thread(h->context, 0);
assert(h->state == LPCSCLI_LOCAL_CONNECTED);
if (h->state == LPCSCLI_LOCAL_CONNECTED &&
h->conn_req_state == LWSSSPC_ONW_NONE)
@ -906,6 +916,8 @@ _lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
{
lws_sspc_metadata_t *md;
lws_service_assert_loop_thread(h->context, 0);
/*
* Are we replacing a pending metadata of the same name? It's not
* efficient to do this but user code can do what it likes... let's
@ -985,6 +997,8 @@ lws_sspc_get_metadata(struct lws_sspc_handle *h, const char *name,
* the same name first
*/
lws_service_assert_loop_thread(h->context, 0);
lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
lws_dll2_get_head(&h->metadata_owner_rx)) {
md = lws_container_of(d,
@ -1005,6 +1019,7 @@ lws_sspc_get_metadata(struct lws_sspc_handle *h, const char *name,
int
lws_sspc_add_peer_tx_credit(struct lws_sspc_handle *h, int32_t bump)
{
lws_service_assert_loop_thread(h->context, 0);
lwsl_sspc_notice(h, "%d\n", bump);
return _lws_sspc_set_metadata(h, "", NULL, 0, (int)bump);
}
@ -1012,12 +1027,14 @@ lws_sspc_add_peer_tx_credit(struct lws_sspc_handle *h, int32_t bump)
int
lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle *h)
{
lws_service_assert_loop_thread(h->context, 0);
return h->txc.peer_tx_cr_est;
}
void
lws_sspc_start_timeout(struct lws_sspc_handle *h, unsigned int timeout_ms)
{
lws_service_assert_loop_thread(h->context, 0);
if (!h->cwsi)
/* we can't fulfil it */
return;

View file

@ -518,6 +518,8 @@ _lws_ss_backoff(lws_ss_handle_t *h, lws_usec_t us_override)
uint64_t ms;
char conceal;
lws_service_assert_loop_thread(h->context, h->tsi);
if (h->seqstate == SSSEQ_RECONNECT_WAIT)
return LWSSSSRET_OK;
@ -577,6 +579,8 @@ lws_smd_ss_cb(void *opaque, lws_smd_class_t _class,
lws_ss_handle_t *h = (lws_ss_handle_t *)opaque;
uint8_t *p = (uint8_t *)buf - LWS_SMD_SS_RX_HEADER_LEN;
lws_service_assert_loop_thread(h->context, h->tsi);
/*
* When configured with SS enabled, lws over-allocates
* LWS_SMD_SS_RX_HEADER_LEN bytes behind the payload of the queued
@ -603,6 +607,8 @@ lws_ss_smd_tx_cb(lws_sorted_usec_list_t *sul)
lws_smd_class_t _class;
int flags = 0, n;
lws_service_assert_loop_thread(h->context, h->tsi);
if (!h->info.tx)
return;
@ -646,6 +652,8 @@ _lws_ss_client_connect(lws_ss_handle_t *h, int is_retry, void *conn_if_sspc_onw)
lws_strexp_t exp;
struct lws *wsi;
lws_service_assert_loop_thread(h->context, h->tsi);
if (!h->policy) {
lwsl_err("%s: ss with no policy\n", __func__);
@ -880,6 +888,9 @@ lws_ss_state_return_t
lws_ss_client_connect(lws_ss_handle_t *h)
{
lws_ss_state_return_t r;
lws_service_assert_loop_thread(h->context, h->tsi);
r = _lws_ss_client_connect(h, 0, 0);
_lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, h->wsi, &h);
return r;
@ -908,6 +919,8 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
char *p;
int n;
lws_service_assert_loop_thread(context, tsi);
#if defined(LWS_WITH_SECURE_STREAMS_CPP)
pol = ssi->policy;
if (!pol) {
@ -1320,6 +1333,8 @@ lws_ss_destroy(lws_ss_handle_t **ppss)
if (!h)
return;
lws_service_assert_loop_thread(h->context, h->tsi);
if (h == h->h_in_svc) {
lwsl_err("%s: illegal destroy, return LWSSSSRET_DESTROY_ME instead\n",
__func__);
@ -1508,6 +1523,8 @@ _lws_ss_request_tx(lws_ss_handle_t *h)
// lwsl_notice("%s: h %p, wsi %p\n", __func__, h, h->wsi);
lws_service_assert_loop_thread(h->context, h->tsi);
if (h->wsi) {
lws_callback_on_writable(h->wsi);
@ -1569,6 +1586,8 @@ _lws_ss_request_tx(lws_ss_handle_t *h)
lws_ss_state_return_t
lws_ss_request_tx_len(lws_ss_handle_t *h, unsigned long len)
{
lws_service_assert_loop_thread(h->context, h->tsi);
if (h->wsi && h->policy &&
(h->policy->protocol == LWSSSP_H1 ||
h->policy->protocol == LWSSSP_H2 ||
@ -1633,6 +1652,8 @@ lws_ss_add_peer_tx_credit(struct lws_ss_handle *h, int32_t bump)
{
const struct ss_pcols *ssp;
lws_service_assert_loop_thread(h->context, h->tsi);
ssp = ss_pcols[(int)h->policy->protocol];
if (h->wsi && ssp && ssp->tx_cr_add)
@ -1646,6 +1667,8 @@ lws_ss_get_est_peer_tx_credit(struct lws_ss_handle *h)
{
const struct ss_pcols *ssp;
lws_service_assert_loop_thread(h->context, h->tsi);
ssp = ss_pcols[(int)h->policy->protocol];
if (h->wsi && ssp && ssp->tx_cr_add)
@ -1681,6 +1704,8 @@ lws_ss_to_cb(lws_sorted_usec_list_t *sul)
void
lws_ss_start_timeout(struct lws_ss_handle *h, unsigned int timeout_ms)
{
lws_service_assert_loop_thread(h->context, h->tsi);
if (!timeout_ms && !h->policy->timeout_ms)
return;
@ -1692,6 +1717,7 @@ lws_ss_start_timeout(struct lws_ss_handle *h, unsigned int timeout_ms)
void
lws_ss_cancel_timeout(struct lws_ss_handle *h)
{
lws_service_assert_loop_thread(h->context, h->tsi);
lws_sul_cancel(&h->sul_timeout);
}

View file

@ -28,12 +28,22 @@
#include <pthread.h>
/*
* Define this to cause an ss api access from a foreign thread, it will
* assert. This is for testing lws, don't do this in your code.
*/
// #define DO_ILLEGAL_API_THREAD
static int interrupted, bad = 1, finished;
static lws_sorted_usec_list_t sul_timeout;
static struct lws_context *context;
static pthread_t pthread_spam;
static int wakes, started_thread;
#if defined(DO_ILLEGAL_API_THREAD)
static struct lws_ss_handle *ss; /* only needed for DO_ILLEGAL_API_THREAD */
#endif
/* the data shared between the spam thread and the lws event loop */
static pthread_mutex_t lock_shared;
@ -89,6 +99,16 @@ thread_spam(void *d)
__func__, shared_counter);
lws_cancel_service(context);
#if defined(DO_ILLEGAL_API_THREAD)
/*
* ILLEGAL...
* We cannot call any other lws api from a foreign thread
*/
if (ss)
lws_ss_request_tx(ss);
#endif
pthread_mutex_unlock(&lock_shared); /* } shared lock ------- */
usleep(100000); /* wait 100ms and signal main thread again */
@ -171,7 +191,13 @@ system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
* messages are coming
*/
if (lws_ss_create(context, 0, &ssi_lws_threads, NULL, NULL, NULL, NULL)) {
if (lws_ss_create(context, 0, &ssi_lws_threads, NULL,
#if defined(DO_ILLEGAL_API_THREAD)
&ss,
#else
NULL,
#endif
NULL, NULL)) {
lwsl_err("%s: failed to create secure stream\n",
__func__);