1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

uv: allocate watcher

Until now the uv watcher has been composed in the wsi.

This works fine except in the case of a client wsi that
meets a redirect when the event loop is libuv with its
requirement for handle close via the event loop.

We want to reuse the wsi, since the originator of it has
a copy of the wsi pointer, and we want to conceal the
redirect.  Since the redirect is commonly to a different
IP, we want to keep the wsi alive while closing its
socket cleanly.  That's not too difficult, unless you are
using uv.

With UV the composed watcher is a disaster, since after
the close is requested the wsi will start to reconnect.
We tried to deal with that by copying the uv handle and
freeing it when the handle close finalizes.  But it turns
out the handle is in a linked-list scheme in uv.

This patch hopefully finally solves it by giving the uv
handle its own allocation from the start.  When we want
to close the socket and reuse the wsi, we simply take
responsibility for freeing the handle and set the wsi
watcher pointer to NULL.
This commit is contained in:
Andy Green 2018-10-13 11:59:04 +08:00
parent b5227df2d7
commit d702b83d10
9 changed files with 106 additions and 47 deletions

View file

@ -195,7 +195,8 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type,
lwsl_debug("new wsi wsistate 0x%x\n", new_wsi->wsistate);
if (context->event_loop_ops->accept)
context->event_loop_ops->accept(new_wsi);
if (context->event_loop_ops->accept(new_wsi))
goto fail;
if (!(type & LWS_ADOPT_ALLOW_SSL)) {
lws_pt_lock(pt, __func__);

View file

@ -786,7 +786,8 @@ lws_create_event_pipes(struct lws_context *context)
lwsl_debug("event pipe fd %d\n", wsi->desc.sockfd);
if (context->event_loop_ops->accept)
context->event_loop_ops->accept(wsi);
if (context->event_loop_ops->accept(wsi))
return 1;
if (__insert_wsi_socket_into_fds(context, wsi))
return 1;

View file

@ -243,7 +243,7 @@ elops_init_context_ev(struct lws_context *context,
return 0;
}
static void
static int
elops_accept_ev(struct lws *wsi)
{
int fd;
@ -258,6 +258,8 @@ elops_accept_ev(struct lws *wsi)
ev_io_init(&wsi->w_read.ev.watcher, lws_accept_cb, fd, EV_READ);
ev_io_init(&wsi->w_write.ev.watcher, lws_accept_cb, fd, EV_WRITE);
return 0;
}
static void

View file

@ -213,7 +213,7 @@ elops_init_context_event(struct lws_context *context,
return 0;
}
static void
static int
elops_accept_event(struct lws *wsi)
{
struct lws_context *context = lws_get_context(wsi);
@ -235,6 +235,8 @@ elops_accept_event(struct lws *wsi)
(EV_READ | EV_PERSIST), lws_event_cb, &wsi->w_read);
wsi->w_write.event.watcher = event_new(pt->event.io_loop, fd,
(EV_WRITE | EV_PERSIST), lws_event_cb, &wsi->w_write);
return 0;
}
static void

View file

@ -79,9 +79,7 @@ lws_uv_idle(uv_idle_t *handle
static void
lws_io_cb(uv_poll_t *watcher, int status, int revents)
{
struct lws_io_watcher *lws_io = lws_container_of(watcher,
struct lws_io_watcher, uv.watcher);
struct lws *wsi = lws_container_of(lws_io, struct lws, w_read);
struct lws *wsi = (struct lws *)((uv_handle_t *)watcher)->data;
struct lws_context *context = wsi->context;
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
struct lws_pollfd eventfd;
@ -153,7 +151,7 @@ lws_libuv_stop(struct lws_context *context)
pt = &context->pt[m];
if (pt->pipe_wsi) {
uv_poll_stop(&pt->pipe_wsi->w_read.uv.watcher);
uv_poll_stop(pt->pipe_wsi->w_read.uv.pwatcher);
lws_destroy_event_pipe(pt->pipe_wsi);
pt->pipe_wsi = NULL;
}
@ -311,19 +309,13 @@ lws_uv_getloop(struct lws_context *context, int tsi)
return NULL;
}
/*
 * uv close callback for the "manual" close path: here handle->data
 * carries the raw socket fd (not a wsi pointer as in the normal path).
 * Runs after libuv has finalized the close, so it is safe to close the
 * fd and free the separately-allocated watcher handle.
 */
static void
lws_libuv_closewsi_m(uv_handle_t* handle)
{
/* the manual-close path stashed the fd in handle->data */
lws_sockfd_type sockfd = (lws_sockfd_type)(lws_intptr_t)handle->data;
lwsl_debug("%s: sockfd %d\n", __func__, sockfd);
compatible_close(sockfd);
/* the watcher has its own allocation; we own freeing it here */
lws_free(handle);
}
int
lws_libuv_check_watcher_active(struct lws *wsi)
{
uv_handle_t *h = (void *)&wsi->w_read.uv.watcher;
uv_handle_t *h = (uv_handle_t *)wsi->w_read.uv.pwatcher;
if (!h)
return 0;
return uv_is_active(h);
}
@ -571,7 +563,7 @@ elops_wsi_logical_close_uv(struct lws *wsi)
if (wsi->listener || wsi->event_pipe) {
lwsl_debug("%s: %p: %d %d stop listener / pipe poll\n",
__func__, wsi, wsi->listener, wsi->event_pipe);
uv_poll_stop(&wsi->w_read.uv.watcher);
uv_poll_stop(wsi->w_read.uv.pwatcher);
}
lwsl_debug("%s: lws_libuv_closehandle: wsi %p\n", __func__, wsi);
/*
@ -594,31 +586,61 @@ elops_check_client_connect_ok_uv(struct lws *wsi)
}
static void
elops_close_handle_manually_uv(struct lws *wsi)
lws_libuv_closewsi_m(uv_handle_t* handle)
{
struct lws_io_watcher *h = (void *)&wsi->w_read.uv.watcher, *nh;
nh = lws_malloc(sizeof(*h), __func__);
*nh = *h;
lwsl_debug("%s: lws_libuv_closehandle: wsi %p\n", __func__, wsi);
((uv_handle_t *)nh)->data = (void *)(lws_intptr_t)wsi->desc.sockfd;
/* required to defer actual deletion until libuv has processed it */
uv_close((uv_handle_t *)nh, lws_libuv_closewsi_m);
lws_sockfd_type sockfd = (lws_sockfd_type)(lws_intptr_t)handle->data;
lwsl_debug("%s: sockfd %d\n", __func__, sockfd);
compatible_close(sockfd);
lws_free(handle);
}
/*
 * "Manual" close: detach the allocated uv poll watcher from the wsi and
 * ask libuv to close just the handle and its fd, leaving the wsi itself
 * alive so it can be reused (eg, to conceal a client redirect).
 * The actual fd close / handle free happens later in
 * lws_libuv_closewsi_m(), once libuv has processed the close.
 */
static void
elops_close_handle_manually_uv(struct lws *wsi)
{
uv_handle_t *h = (uv_handle_t *)wsi->w_read.uv.pwatcher;
lwsl_debug("%s: lws_libuv_closehandle: wsi %p\n", __func__, wsi);
/*
* the "manual" variant only closes the handle itself and the
* related fd. handle->data is the fd.
*/
h->data = (void *)(lws_intptr_t)wsi->desc.sockfd;
/*
* We take responsibility to close / destroy these now.
* Remove any trace from the wsi.
*/
wsi->desc.sockfd = LWS_SOCK_INVALID;
/* NULLing the pointer ensures this close can only happen once */
wsi->w_read.uv.pwatcher = NULL;
uv_close(h, lws_libuv_closewsi_m);
}
static int
elops_accept_uv(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
wsi->w_read.context = wsi->context;
wsi->w_read.uv.pwatcher =
lws_malloc(sizeof(*wsi->w_read.uv.pwatcher), "uvh");
if (!wsi->w_read.uv.pwatcher)
return -1;
if (wsi->role_ops->file_handle)
uv_poll_init(pt->uv.io_loop, &wsi->w_read.uv.watcher,
uv_poll_init(pt->uv.io_loop, wsi->w_read.uv.pwatcher,
(int)(long long)wsi->desc.filefd);
else
uv_poll_init_socket(pt->uv.io_loop, &wsi->w_read.uv.watcher,
wsi->desc.sockfd);
uv_poll_init_socket(pt->uv.io_loop,
wsi->w_read.uv.pwatcher,
wsi->desc.sockfd);
((uv_handle_t *)wsi->w_read.uv.pwatcher)->data = (void *)wsi;
return 0;
}
static void
@ -650,7 +672,7 @@ elops_io_uv(struct lws *wsi, int flags)
if (flags & LWS_EV_READ)
current_events |= UV_READABLE;
uv_poll_start(&w->uv.watcher, current_events, lws_io_cb);
uv_poll_start(w->uv.pwatcher, current_events, lws_io_cb);
} else {
if (flags & LWS_EV_WRITE)
current_events &= ~UV_WRITABLE;
@ -659,9 +681,9 @@ elops_io_uv(struct lws *wsi, int flags)
current_events &= ~UV_READABLE;
if (!(current_events & (UV_READABLE | UV_WRITABLE)))
uv_poll_stop(&w->uv.watcher);
uv_poll_stop(w->uv.pwatcher);
else
uv_poll_start(&w->uv.watcher, current_events,
uv_poll_start(w->uv.pwatcher, current_events,
lws_io_cb);
}
@ -684,14 +706,23 @@ elops_init_vhost_listen_wsi_uv(struct lws *wsi)
return 0;
wsi->w_read.context = wsi->context;
n = uv_poll_init_socket(pt->uv.io_loop,
&wsi->w_read.uv.watcher, wsi->desc.sockfd);
wsi->w_read.uv.pwatcher =
lws_malloc(sizeof(*wsi->w_read.uv.pwatcher), "uvh");
if (!wsi->w_read.uv.pwatcher)
return -1;
n = uv_poll_init_socket(pt->uv.io_loop, wsi->w_read.uv.pwatcher,
wsi->desc.sockfd);
if (n) {
lwsl_err("uv_poll_init failed %d, sockfd=%p\n",
n, (void *)(lws_intptr_t)wsi->desc.sockfd);
lwsl_err("uv_poll_init failed %d, sockfd=%p\n", n,
(void *)(lws_intptr_t)wsi->desc.sockfd);
return -1;
}
((uv_handle_t *)wsi->w_read.uv.pwatcher)->data = (void *)wsi;
elops_io_uv(wsi, LWS_EV_START | LWS_EV_READ);
return 0;
@ -835,8 +866,7 @@ elops_init_pt_uv(struct lws_context *context, void *_loop, int tsi)
static void
lws_libuv_closewsi(uv_handle_t* handle)
{
struct lws *n = NULL, *wsi = (struct lws *)(((char *)handle) -
(char *)(&n->w_read.uv.watcher));
struct lws *wsi = (struct lws *)handle->data;
struct lws_context *context = lws_get_context(wsi);
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
int lspd = 0, m;
@ -858,6 +888,9 @@ lws_libuv_closewsi(uv_handle_t* handle)
__lws_close_free_wsi_final(wsi);
lws_pt_unlock(pt);
/* it's our job to close the handle finally */
lws_free(handle);
if (lspd == 2 && context->deprecation_cb) {
lwsl_notice("calling deprecation callback\n");
context->deprecation_cb();
@ -902,6 +935,11 @@ lws_libuv_closewsi(uv_handle_t* handle)
void
lws_libuv_closehandle(struct lws *wsi)
{
uv_handle_t* handle;
if (!wsi->w_read.uv.pwatcher)
return;
if (wsi->told_event_loop_closed) {
assert(0);
return;
@ -911,8 +949,18 @@ lws_libuv_closehandle(struct lws *wsi)
wsi->told_event_loop_closed = 1;
/* required to defer actual deletion until libuv has processed it */
uv_close((uv_handle_t*)&wsi->w_read.uv.watcher, lws_libuv_closewsi);
/*
* The normal close path attaches the related wsi as the
* handle->data.
*/
handle = (uv_handle_t *)wsi->w_read.uv.pwatcher;
/* ensure we can only do this once */
wsi->w_read.uv.pwatcher = NULL;
uv_close(handle, lws_libuv_closewsi);
}
struct lws_event_loop_ops event_loop_ops_uv = {

View file

@ -58,7 +58,7 @@ struct lws_context_eventlibs_libuv {
};
struct lws_io_watcher_libuv {
uv_poll_t watcher;
uv_poll_t *pwatcher;
};
struct lws_signal_watcher_libuv {

View file

@ -41,7 +41,7 @@ struct lws_event_loop_ops {
/* close handle manually */
void (*close_handle_manually)(struct lws *wsi);
/* event loop accept processing */
void (*accept)(struct lws *wsi);
int (*accept)(struct lws *wsi);
/* control wsi active events */
void (*io)(struct lws *wsi, int flags);
/* run the event loop for a pt */

View file

@ -164,7 +164,8 @@ lws_cgi(struct lws *wsi, const char * const *exec_array, int script_uri_path_len
for (n = 0; n < 3; n++) {
if (wsi->context->event_loop_ops->accept)
wsi->context->event_loop_ops->accept(cgi->stdwsi[n]);
if (wsi->context->event_loop_ops->accept(cgi->stdwsi[n]))
goto bail3;
if (__insert_wsi_socket_into_fds(wsi->context, cgi->stdwsi[n]))
goto bail3;

View file

@ -420,7 +420,11 @@ ads_known:
lwsi_set_state(wsi, LRS_WAITING_CONNECT);
if (wsi->context->event_loop_ops->accept)
wsi->context->event_loop_ops->accept(wsi);
if (wsi->context->event_loop_ops->accept(wsi)) {
compatible_close(wsi->desc.sockfd);
cce = "event loop accept failed";
goto oom4;
}
if (__insert_wsi_socket_into_fds(wsi->context, wsi)) {
compatible_close(wsi->desc.sockfd);