1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

smp: more lock assertions

This commit is contained in:
Andy Green 2021-04-04 04:06:24 +01:00
parent b582dd49fb
commit 7d8f742594
12 changed files with 138 additions and 58 deletions

View file

@ -110,10 +110,12 @@ lws_create_new_server_wsi(struct lws_vhost *vhost, int fixed_tsi, const char *de
}
/* if not a socket, it's a raw, non-ssl file descriptor */
/* if not a socket, it's a raw, non-ssl file descriptor
* req cx lock, acq pt lock, acq vh lock
*/
static struct lws *
lws_adopt_descriptor_vhost1(struct lws_vhost *vh, lws_adoption_type type,
__lws_adopt_descriptor_vhost1(struct lws_vhost *vh, lws_adoption_type type,
const char *vh_prot_name, struct lws *parent,
void *opaque, const char *fi_wsi_name)
{
@ -128,23 +130,21 @@ lws_adopt_descriptor_vhost1(struct lws_vhost *vh, lws_adoption_type type,
* we initialize it, it may become "live" concurrently unexpectedly...
*/
lws_context_lock(vh->context, __func__);
lws_context_assert_lock_held(vh->context);
n = -1;
if (parent)
n = parent->tsi;
new_wsi = lws_create_new_server_wsi(vh, n, "adopted");
if (!new_wsi) {
lws_context_unlock(vh->context);
if (!new_wsi)
return NULL;
}
/* bring in specific fault injection rules early */
lws_fi_inherit_copy(&new_wsi->fic, &context->fic, "wsi", fi_wsi_name);
if (lws_fi(&new_wsi->fic, "createfail")) {
lws_fi_destroy(&new_wsi->fic);
lws_context_unlock(vh->context);
return NULL;
}
@ -199,8 +199,6 @@ lws_adopt_descriptor_vhost1(struct lws_vhost *vh, lws_adoption_type type,
&new_wsi->a.vhost->vh_awaiting_socket_owner);
lws_vhost_unlock(new_wsi->a.vhost);
lws_context_unlock(vh->context);
return new_wsi;
bail:
@ -212,13 +210,11 @@ bail:
lws_fi_destroy(&new_wsi->fic);
lws_vhost_unbind_wsi(new_wsi);
lws_pt_unlock(pt);
__lws_vhost_unbind_wsi(new_wsi); /* req cx, acq vh lock */
lws_free(new_wsi);
lws_pt_unlock(pt);
lws_context_unlock(vh->context);
return NULL;
}
@ -442,7 +438,7 @@ lws_adopt_descriptor_vhost2(struct lws *new_wsi, lws_adoption_type type,
*
* !!! For mux protocols, this will cause an additional inactive ss
* representing the nwsi. Doing that allows us to support both h1
* (here) and h2 (at lws_wsi_server_new())
* (here) and h2 (at __lws_wsi_server_new())
*/
lwsl_info("%s: %s, vhost %s\n", __func__, new_wsi->lc.gutag,
@ -518,13 +514,15 @@ lws_adopt_descriptor_vhost_via_info(const lws_adopt_desc_t *info)
}
#endif
new_wsi = lws_adopt_descriptor_vhost1(info->vh, info->type,
lws_context_lock(info->vh->context, __func__);
new_wsi = __lws_adopt_descriptor_vhost1(info->vh, info->type,
info->vh_prot_name, info->parent,
info->opaque, info->fi_wsi_name);
if (!new_wsi) {
if (info->type & LWS_ADOPT_SOCKET)
compatible_close(info->fd.sockfd);
return NULL;
goto bail;
}
if (info->type & LWS_ADOPT_SOCKET &&
@ -537,7 +535,12 @@ lws_adopt_descriptor_vhost_via_info(const lws_adopt_desc_t *info)
lws_peer_add_wsi(info->vh->context, peer, new_wsi);
#endif
return lws_adopt_descriptor_vhost2(new_wsi, info->type, info->fd);
new_wsi = lws_adopt_descriptor_vhost2(new_wsi, info->type, info->fd);
bail:
lws_context_unlock(info->vh->context);
return new_wsi;
}
struct lws *
@ -793,10 +796,14 @@ lws_create_adopt_udp(struct lws_vhost *vhost, const char *ads, int port,
/* create the logical wsi without any valid fd */
wsi = lws_adopt_descriptor_vhost1(vhost, LWS_ADOPT_SOCKET |
lws_context_lock(vhost->context, __func__);
wsi = __lws_adopt_descriptor_vhost1(vhost, LWS_ADOPT_SOCKET |
LWS_ADOPT_RAW_SOCKET_UDP,
protocol_name, parent_wsi, opaque,
fi_wsi_name);
lws_context_unlock(vhost->context);
if (!wsi) {
lwsl_err("%s: udp wsi creation failed\n", __func__);
goto bail;
@ -888,6 +895,7 @@ lws_create_adopt_udp(struct lws_vhost *vhost, const char *ads, int port,
/* dns lookup is happening asynchronously */
// lwsl_notice("%s: returning wsi %p\n", __func__, wsi);
return wsi;
#endif
#if !defined(LWS_WITH_SYS_ASYNC_DNS)

View file

@ -243,12 +243,14 @@ solo:
!strcmp(meth, "MQTT")) &&
lws_dll2_is_detached(&wsi->dll2_cli_txn_queue) &&
lws_dll2_is_detached(&wsi->dll_cli_active_conns)) {
lws_context_lock(wsi->a.context, __func__);
lws_vhost_lock(wsi->a.vhost);
lwsl_info("%s: adding active conn %s\n", __func__, lws_wsi_tag(wsi));
/* caution... we will have to unpick this on oom4 path */
lws_dll2_add_head(&wsi->dll_cli_active_conns,
&wsi->a.vhost->dll_cli_active_conns_owner);
lws_vhost_unlock(wsi->a.vhost);
lws_context_unlock(wsi->a.context);
}
/*

View file

@ -168,12 +168,18 @@ __lws_reset_wsi(struct lws *wsi)
#endif
}
/* req cx lock */
void
__lws_free_wsi(struct lws *wsi)
{
struct lws_vhost *vh;
if (!wsi)
return;
lws_context_assert_lock_held(wsi->a.context);
#if defined(LWS_WITH_SECURE_STREAMS)
if (wsi->for_ss) {
/*
@ -191,10 +197,13 @@ __lws_free_wsi(struct lws *wsi)
__lws_reset_wsi(wsi);
__lws_wsi_remove_from_sul(wsi);
vh = wsi->a.vhost;
if (wsi->a.context->event_loop_ops->destroy_wsi)
wsi->a.context->event_loop_ops->destroy_wsi(wsi);
lws_vhost_unbind_wsi(wsi);
if (vh)
__lws_vhost_unbind_wsi(wsi); /* req cx + vh lock */
lwsl_debug("%s: %s, tsi fds count %d\n", __func__,
lws_wsi_tag(wsi),
@ -280,6 +289,8 @@ lws_addrinfo_clean(struct lws *wsi)
#endif
}
/* requires cx lock */
void
__lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason,
const char *caller)
@ -791,6 +802,9 @@ async_close:
__lws_close_free_wsi_final(wsi);
}
/* cx + vh lock */
void
__lws_close_free_wsi_final(struct lws *wsi)
{
@ -843,11 +857,17 @@ __lws_close_free_wsi_final(struct lws *wsi)
void
lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason, const char *caller)
{
struct lws_context *cx = wsi->a.context;
struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
lws_context_lock(cx, __func__);
lws_pt_lock(pt, __func__);
/* may destroy vhost, cannot hold vhost lock outside it */
__lws_close_free_wsi(wsi, reason, caller);
lws_pt_unlock(pt);
lws_context_unlock(cx);
}

View file

@ -561,6 +561,7 @@ lws_callback_on_writable(struct lws *wsi)
void
lws_same_vh_protocol_insert(struct lws *wsi, int n)
{
lws_context_lock(wsi->a.context, __func__);
lws_vhost_lock(wsi->a.vhost);
lws_dll2_remove(&wsi->same_vh_protocol);
@ -570,6 +571,7 @@ lws_same_vh_protocol_insert(struct lws *wsi, int n)
wsi->bound_vhost_index = (uint8_t)n;
lws_vhost_unlock(wsi->a.vhost);
lws_context_unlock(wsi->a.context);
}
void
@ -585,11 +587,13 @@ lws_same_vh_protocol_remove(struct lws *wsi)
if (!wsi->a.vhost)
return;
lws_context_lock(wsi->a.context, __func__);
lws_vhost_lock(wsi->a.vhost);
__lws_same_vh_protocol_remove(wsi);
lws_vhost_unlock(wsi->a.vhost);
lws_context_unlock(wsi->a.context);
}

View file

@ -433,8 +433,8 @@ struct lws_vhost {
char proxy_basic_auth_token[128];
#endif
#if LWS_MAX_SMP > 1
pthread_mutex_t lock;
char close_flow_vs_tsi[LWS_MAX_SMP];
struct lws_mutex_refcount mr;
char close_flow_vs_tsi[LWS_MAX_SMP];
#endif
#if defined(LWS_ROLE_H2)
@ -1150,7 +1150,7 @@ lws_adopt_socket_vhost(struct lws_vhost *vh, lws_sockfd_type accept_fd);
void
lws_vhost_bind_wsi(struct lws_vhost *vh, struct lws *wsi);
void
lws_vhost_unbind_wsi(struct lws *wsi);
__lws_vhost_unbind_wsi(struct lws *wsi); /* req cx + vh lock */
void
__lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs);

View file

@ -575,7 +575,7 @@ lws_create_vhost(struct lws_context *context,
#endif
#if LWS_MAX_SMP > 1
pthread_mutex_init(&vh->lock, NULL);
lws_mutex_refcount_init(&vh->mr);
#endif
if (!pcols && !info->pprotocols)
@ -1268,7 +1268,8 @@ lws_vhost_destroy1(struct lws_vhost *vh)
v->lserv_wsi = vh->lserv_wsi;
if (v->lserv_wsi) {
lws_vhost_unbind_wsi(vh->lserv_wsi);
/* req cx + vh lock */
__lws_vhost_unbind_wsi(vh->lserv_wsi);
lws_vhost_bind_wsi(v, v->lserv_wsi);
vh->lserv_wsi = NULL;
}
@ -1311,6 +1312,8 @@ destroy_ais(struct lws_dll2 *d, void *user)
/*
* Either start close or destroy any wsi on the vhost that belong to this pt,
* if SMP mark the vh that we have done it for
*
* Must not have lock on vh
*/
void
@ -1433,7 +1436,7 @@ __lws_vhost_destroy2(struct lws_vhost *vh)
#endif
#if LWS_MAX_SMP > 1
pthread_mutex_destroy(&vh->lock);
lws_mutex_refcount_destroy(&context->mr);
#endif
#if defined(LWS_WITH_UNIX_SOCK)
@ -1669,6 +1672,7 @@ lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
}
#endif
lws_context_lock(wsi->a.context, __func__); /* -------------- cx { */
lws_vhost_lock(wsi->a.vhost); /* ----------------------------------- { */
lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
@ -1738,6 +1742,7 @@ lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
wsi->client_h2_alpn = 1;
lws_wsi_h2_adopt(w, wsi);
lws_vhost_unlock(wsi->a.vhost); /* } ---------- */
lws_context_unlock(wsi->a.context); /* -------------- cx { */
*nwsi = w;
@ -1760,7 +1765,7 @@ lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
wsi->client_mux_substream = 1;
lws_vhost_unlock(wsi->a.vhost); /* } ---------- */
lws_context_unlock(wsi->a.context); /* -------------- cx { */
return ACTIVE_CONNS_MUXED;
}
@ -1794,6 +1799,7 @@ lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
* to take over parsing the rx.
*/
lws_vhost_unlock(wsi->a.vhost); /* } ---------- */
lws_context_unlock(wsi->a.context); /* -------------- cx { */
*nwsi = w;
@ -1804,6 +1810,7 @@ lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
solo:
lws_vhost_unlock(wsi->a.vhost); /* } ---------------------------------- */
lws_context_unlock(wsi->a.context); /* -------------- cx { */
/* there is nobody already connected in the same way */

View file

@ -79,7 +79,8 @@ static void
lws_sul_wsitimeout_cb(lws_sorted_usec_list_t *sul)
{
struct lws *wsi = lws_container_of(sul, struct lws, sul_timeout);
struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
struct lws_context *cx = wsi->a.context;
struct lws_context_per_thread *pt = &cx->pt[(int)wsi->tsi];
/* no need to log normal idle keepalive timeout */
// if (wsi->pending_timeout != PENDING_TIMEOUT_HTTP_KEEPALIVE_IDLE)
@ -115,9 +116,11 @@ lws_sul_wsitimeout_cb(lws_sorted_usec_list_t *sul)
(void *)"Timed out waiting SSL", 21);
#endif
lws_context_lock(cx, __func__);
lws_pt_lock(pt, __func__);
__lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS, "timeout");
lws_pt_unlock(pt);
lws_context_unlock(cx);
}
void

View file

@ -67,13 +67,17 @@ lws_vhost_bind_wsi(struct lws_vhost *vh, struct lws *wsi)
assert(wsi->a.vhost->count_bound_wsi > 0);
}
/* req cx lock... acquires vh lock */
void
lws_vhost_unbind_wsi(struct lws *wsi)
__lws_vhost_unbind_wsi(struct lws *wsi)
{
if (!wsi->a.vhost)
return;
lws_context_lock(wsi->a.context, __func__); /* ---------- context { */
lws_context_assert_lock_held(wsi->a.context);
lws_vhost_lock(wsi->a.vhost);
assert(wsi->a.vhost->count_bound_wsi > 0);
wsi->a.vhost->count_bound_wsi--;
@ -87,13 +91,16 @@ lws_vhost_unbind_wsi(struct lws *wsi)
* by any pt: nothing can be servicing any wsi belonging
* to it any more.
*
* Finalize the vh destruction
* Finalize the vh destruction... must drop vh lock
*/
lws_vhost_unlock(wsi->a.vhost);
__lws_vhost_destroy2(wsi->a.vhost);
wsi->a.vhost = NULL;
return;
}
wsi->a.vhost = NULL;
lws_context_unlock(wsi->a.context); /* } context ---------- */
lws_vhost_unlock(wsi->a.vhost);
wsi->a.vhost = NULL;
}
struct lws *
@ -1503,6 +1510,7 @@ lws_wsi_mux_apply_queue(struct lws *wsi)
{
/* we have a transaction queue that wants to pipeline */
lws_context_lock(wsi->a.context, __func__); /* -------------- cx { */
lws_vhost_lock(wsi->a.vhost);
lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
@ -1543,6 +1551,7 @@ lws_wsi_mux_apply_queue(struct lws *wsi)
} lws_end_foreach_dll_safe(d, d1);
lws_vhost_unlock(wsi->a.vhost);
lws_context_unlock(wsi->a.context); /* } cx -------------- */
return 0;
}

View file

@ -806,18 +806,10 @@ void lwsl_emit_stderr(int level, const char *line);
#define lws_context_lock(c, reason) lws_mutex_refcount_lock(&c->mr, reason)
#define lws_context_unlock(c) lws_mutex_refcount_unlock(&c->mr)
#define lws_context_assert_lock_held(c) lws_mutex_refcount_assert_held(&c->mr)
static LWS_INLINE void
lws_vhost_lock(struct lws_vhost *vhost)
{
pthread_mutex_lock(&vhost->lock);
}
static LWS_INLINE void
lws_vhost_unlock(struct lws_vhost *vhost)
{
pthread_mutex_unlock(&vhost->lock);
}
#define lws_vhost_assert_lock_held(v) lws_mutex_refcount_assert_held(&v->mr)
/* enforce context lock held */
#define lws_vhost_lock(v) lws_mutex_refcount_lock(&v->mr, __func__)
#define lws_vhost_unlock(v) lws_mutex_refcount_unlock(&v->mr)
#else
@ -829,6 +821,7 @@ lws_vhost_unlock(struct lws_vhost *vhost)
#define lws_context_lock(_a, _b) (void)(_a)
#define lws_context_unlock(_a) (void)(_a)
#define lws_context_assert_lock_held(_a) (void)(_a)
#define lws_vhost_assert_lock_held(_a) (void)(_a)
#define lws_vhost_lock(_a) (void)(_a)
#define lws_vhost_unlock(_a) (void)(_a)
#define lws_pt_stats_lock(_a) (void)(_a)

View file

@ -68,6 +68,9 @@ __lws_shadow_wsi(struct lws_dbus_ctx *ctx, DBusWatch *w, int fd, int create_ok)
if (!create_ok)
return NULL;
lws_context_assert_lock_held(wsi->a.context);
lws_vhost_assert_lock_held(wsi->a.vhost);
/* requires context lock */
wsi = __lws_wsi_create_with_role(ctx->vh->context, ctx->tsi, NULL);
if (wsi == NULL) {
@ -89,7 +92,7 @@ __lws_shadow_wsi(struct lws_dbus_ctx *ctx, DBusWatch *w, int fd, int create_ok)
lws_vhost_bind_wsi(ctx->vh, wsi);
if (__insert_wsi_socket_into_fds(ctx->vh->context, wsi)) {
lwsl_err("inserting wsi socket into fds failed\n");
lws_vhost_unbind_wsi(wsi);
__lws_vhost_unbind_wsi(wsi); /* cx + vh lock */
lws_free(wsi);
return NULL;
}
@ -98,7 +101,7 @@ __lws_shadow_wsi(struct lws_dbus_ctx *ctx, DBusWatch *w, int fd, int create_ok)
}
/*
* Requires vhost lock
* Requires cx + vhost lock
*/
static int
@ -106,6 +109,9 @@ __lws_shadow_wsi_destroy(struct lws_dbus_ctx *ctx, struct lws *wsi)
{
lwsl_info("%s: destroying shadow wsi\n", __func__);
lws_context_assert_lock_held(wsi->a.context);
lws_vhost_assert_lock_held(wsi->a.vhost);
if (__remove_wsi_socket_from_fds(wsi)) {
lwsl_err("%s: unable to remove %d from fds\n", __func__,
wsi->desc.sockfd);
@ -113,7 +119,7 @@ __lws_shadow_wsi_destroy(struct lws_dbus_ctx *ctx, struct lws *wsi)
return 1;
}
lws_vhost_unbind_wsi(wsi);
__lws_vhost_unbind_wsi(wsi);
lws_free(wsi);
@ -191,8 +197,9 @@ lws_dbus_add_watch(DBusWatch *w, void *data)
return TRUE;
}
/* cx + vh lock */
static int
check_destroy_shadow_wsi(struct lws_dbus_ctx *ctx, struct lws *wsi)
__check_destroy_shadow_wsi(struct lws_dbus_ctx *ctx, struct lws *wsi)
{
int n;
@ -499,7 +506,7 @@ rops_handle_POLLIN_dbus(struct lws_context_per_thread *pt, struct lws *wsi,
handle_dispatch_status(NULL, DBUS_DISPATCH_DATA_REMAINS, NULL);
check_destroy_shadow_wsi(ctx, wsi);
__check_destroy_shadow_wsi(ctx, wsi);
} else
if (ctx->dbs)
/* ??? */

View file

@ -210,9 +210,11 @@ lws_h2_update_peer_txcredit_thresh(struct lws *wsi, unsigned int sid, int thresh
return lws_h2_update_peer_txcredit(wsi, sid, bump);
}
struct lws *
lws_wsi_server_new(struct lws_vhost *vh, struct lws *parent_wsi,
unsigned int sid)
/* cx + vh lock */
static struct lws *
__lws_wsi_server_new(struct lws_vhost *vh, struct lws *parent_wsi,
unsigned int sid)
{
struct lws *nwsi = lws_get_network_wsi(parent_wsi);
struct lws_h2_netconn *h2n = nwsi->h2.h2n;
@ -221,6 +223,9 @@ lws_wsi_server_new(struct lws_vhost *vh, struct lws *parent_wsi,
struct lws *wsi;
const char *p;
lws_context_assert_lock_held(vh->context);
lws_vhost_assert_lock_held(vh);
/*
* The identifier of a newly established stream MUST be numerically
* greater than all streams that the initiating endpoint has opened or
@ -310,7 +315,7 @@ bail1:
if (wsi->user_space)
lws_free_set_NULL(wsi->user_space);
vh->protocols[0].callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
lws_vhost_unbind_wsi(wsi);
__lws_vhost_unbind_wsi(wsi);
lws_free(wsi);
return NULL;
@ -795,7 +800,15 @@ int lws_h2_do_pps_send(struct lws *wsi)
* we need to treat the headers from the upgrade as the
* first job. So these need to get shifted to sid 1.
*/
h2n->swsi = lws_wsi_server_new(wsi->a.vhost, wsi, 1);
lws_context_lock(wsi->a.context, "h2 mig");
lws_vhost_lock(wsi->a.vhost);
h2n->swsi = __lws_wsi_server_new(wsi->a.vhost, wsi, 1);
lws_vhost_unlock(wsi->a.vhost);
lws_context_unlock(wsi->a.context);
if (!h2n->swsi)
goto bail;
@ -1260,8 +1273,15 @@ lws_h2_parse_frame_header(struct lws *wsi)
* of a new stream
*/
h2n->swsi = lws_wsi_server_new(wsi->a.vhost, wsi,
h2n->sid);
lws_context_lock(wsi->a.context, "h2 new str");
lws_vhost_lock(wsi->a.vhost);
h2n->swsi = __lws_wsi_server_new(wsi->a.vhost, wsi,
h2n->sid);
lws_vhost_unlock(wsi->a.vhost);
lws_context_unlock(wsi->a.context);
if (!h2n->swsi) {
lws_h2_goaway(wsi, H2_ERR_PROTOCOL_ERROR,
"OOM");
@ -1442,7 +1462,14 @@ lws_h2_parse_end_of_frame(struct lws *wsi)
* we need to treat the headers from the upgrade as the
* first job. So these need to get shifted to sid 1.
*/
h2n->swsi = lws_wsi_server_new(wsi->a.vhost, wsi, 1);
lws_context_lock(wsi->a.context, "h2 mig");
lws_vhost_lock(wsi->a.vhost);
h2n->swsi = __lws_wsi_server_new(wsi->a.vhost, wsi, 1);
lws_vhost_unlock(wsi->a.vhost);
lws_context_unlock(wsi->a.context);
if (!h2n->swsi)
return 1;
h2n->sid = 1;

View file

@ -1209,7 +1209,7 @@ bail1:
w->a.vhost->protocols[0].callback(w,
LWS_CALLBACK_WSI_DESTROY,
NULL, NULL, 0);
lws_vhost_unbind_wsi(w);
__lws_vhost_unbind_wsi(w); /* cx + vh lock */
lws_free(w);
return 0;