From d702b83d1025bc526c8b74205cd1e11ebfb3049e Mon Sep 17 00:00:00 2001 From: Andy Green Date: Sat, 13 Oct 2018 11:59:04 +0800 Subject: [PATCH] uv: allocate watcher Until now the uv watcher has been composed in the wsi. This works fine except in the case of a client wsi that meets a redirect when the event loop is libuv with its requirement for handle close via the event loop. We want to reuse the wsi, since the originator of it has a copy of the wsi pointer, and we want to conceal the redirect. Since the redirect is commonly to a different IP, we want to keep the wsi alive while closing its socket cleanly. That's not too difficult, unless you are using uv. With UV the comoposed watcher is a disaster, since after the close is requested the wsi will start to reconnect. We tried to deal with that by copying the uv handle and freeing it when the handle close finalizes. But it turns out the handle is in a linked-list scheme in uv. This patch hopefully finally solves it by giving the uv handle its own allocation from the start. When we want to close the socket and reuse the wsi, we simply take responsibility for freeing the handle and set the wsi watcher pointer to NULL. --- lib/core/adopt.c | 3 +- lib/core/context.c | 3 +- lib/event-libs/libev/libev.c | 4 +- lib/event-libs/libevent/libevent.c | 4 +- lib/event-libs/libuv/libuv.c | 126 ++++++++++++++++------- lib/event-libs/libuv/private.h | 2 +- lib/event-libs/private.h | 2 +- lib/roles/cgi/cgi-server.c | 3 +- lib/roles/http/client/client-handshake.c | 6 +- 9 files changed, 106 insertions(+), 47 deletions(-) diff --git a/lib/core/adopt.c b/lib/core/adopt.c index 99ca856cc..d16713795 100644 --- a/lib/core/adopt.c +++ b/lib/core/adopt.c @@ -195,7 +195,8 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type, lwsl_debug("new wsi wsistate 0x%x\n", new_wsi->wsistate); if (context->event_loop_ops->accept) - context->event_loop_ops->accept(new_wsi); + if (context->event_loop_ops->accept(new_wsi)) + goto fail; if (!(type & LWS_ADOPT_ALLOW_SSL)) { lws_pt_lock(pt, __func__); diff --git a/lib/core/context.c b/lib/core/context.c index ba133eb00..1a39b10ab 100644 --- a/lib/core/context.c +++ b/lib/core/context.c @@ -786,7 +786,8 @@ lws_create_event_pipes(struct lws_context *context) lwsl_debug("event pipe fd %d\n", wsi->desc.sockfd); if (context->event_loop_ops->accept) - context->event_loop_ops->accept(wsi); + if (context->event_loop_ops->accept(wsi)) + return 1; if (__insert_wsi_socket_into_fds(context, wsi)) return 1; diff --git a/lib/event-libs/libev/libev.c b/lib/event-libs/libev/libev.c index 3a9853d45..26f41a638 100644 --- a/lib/event-libs/libev/libev.c +++ b/lib/event-libs/libev/libev.c @@ -243,7 +243,7 @@ elops_init_context_ev(struct lws_context *context, return 0; } -static void +static int elops_accept_ev(struct lws *wsi) { int fd; @@ -258,6 +258,8 @@ elops_accept_ev(struct lws *wsi) ev_io_init(&wsi->w_read.ev.watcher, lws_accept_cb, fd, EV_READ); ev_io_init(&wsi->w_write.ev.watcher, lws_accept_cb, fd, EV_WRITE); + + return 0; } static void diff --git a/lib/event-libs/libevent/libevent.c b/lib/event-libs/libevent/libevent.c index f4f5cfd57..9cbe511be 100644 --- a/lib/event-libs/libevent/libevent.c +++ b/lib/event-libs/libevent/libevent.c @@ -213,7 +213,7 @@ elops_init_context_event(struct lws_context *context, return 0; } -static void +static int elops_accept_event(struct lws *wsi) { struct lws_context *context = lws_get_context(wsi); @@ -235,6 +235,8 @@ elops_accept_event(struct lws *wsi) (EV_READ | EV_PERSIST), lws_event_cb, &wsi->w_read); wsi->w_write.event.watcher = event_new(pt->event.io_loop, fd, (EV_WRITE | EV_PERSIST), lws_event_cb, &wsi->w_write); + + return 0; } static void diff --git a/lib/event-libs/libuv/libuv.c b/lib/event-libs/libuv/libuv.c index e04f45148..3de132f03 100644 --- a/lib/event-libs/libuv/libuv.c +++ b/lib/event-libs/libuv/libuv.c @@ -79,9 +79,7 @@ lws_uv_idle(uv_idle_t *handle static void lws_io_cb(uv_poll_t *watcher, int status, int revents) { - struct lws_io_watcher *lws_io = lws_container_of(watcher, - struct lws_io_watcher, uv.watcher); - struct lws *wsi = lws_container_of(lws_io, struct lws, w_read); + struct lws *wsi = (struct lws *)((uv_handle_t *)watcher)->data; struct lws_context *context = wsi->context; struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; struct lws_pollfd eventfd; @@ -153,7 +151,7 @@ lws_libuv_stop(struct lws_context *context) pt = &context->pt[m]; if (pt->pipe_wsi) { - uv_poll_stop(&pt->pipe_wsi->w_read.uv.watcher); + uv_poll_stop(pt->pipe_wsi->w_read.uv.pwatcher); lws_destroy_event_pipe(pt->pipe_wsi); pt->pipe_wsi = NULL; } @@ -311,19 +309,13 @@ lws_uv_getloop(struct lws_context *context, int tsi) return NULL; } -static void -lws_libuv_closewsi_m(uv_handle_t* handle) -{ - lws_sockfd_type sockfd = (lws_sockfd_type)(lws_intptr_t)handle->data; - lwsl_debug("%s: sockfd %d\n", __func__, sockfd); - compatible_close(sockfd); - lws_free(handle); -} - int lws_libuv_check_watcher_active(struct lws *wsi) { - uv_handle_t *h = (void *)&wsi->w_read.uv.watcher; + uv_handle_t *h = (uv_handle_t *)wsi->w_read.uv.pwatcher; + + if (!h) + return 0; return uv_is_active(h); } @@ -571,7 +563,7 @@ elops_wsi_logical_close_uv(struct lws *wsi) if (wsi->listener || wsi->event_pipe) { lwsl_debug("%s: %p: %d %d stop listener / pipe poll\n", __func__, wsi, wsi->listener, wsi->event_pipe); - uv_poll_stop(&wsi->w_read.uv.watcher); + uv_poll_stop(wsi->w_read.uv.pwatcher); } lwsl_debug("%s: lws_libuv_closehandle: wsi %p\n", __func__, wsi); /* @@ -594,31 +586,61 @@ elops_check_client_connect_ok_uv(struct lws *wsi) } static void -elops_close_handle_manually_uv(struct lws *wsi) +lws_libuv_closewsi_m(uv_handle_t* handle) { - struct lws_io_watcher *h = (void *)&wsi->w_read.uv.watcher, *nh; - - nh = lws_malloc(sizeof(*h), __func__); - *nh = *h; - - lwsl_debug("%s: lws_libuv_closehandle: wsi %p\n", __func__, wsi); - ((uv_handle_t *)nh)->data = (void *)(lws_intptr_t)wsi->desc.sockfd; - /* required to defer actual deletion until libuv has processed it */ - uv_close((uv_handle_t *)nh, lws_libuv_closewsi_m); + lws_sockfd_type sockfd = (lws_sockfd_type)(lws_intptr_t)handle->data; + lwsl_debug("%s: sockfd %d\n", __func__, sockfd); + compatible_close(sockfd); + lws_free(handle); } static void +elops_close_handle_manually_uv(struct lws *wsi) +{ + uv_handle_t *h = (uv_handle_t *)wsi->w_read.uv.pwatcher; + + lwsl_debug("%s: lws_libuv_closehandle: wsi %p\n", __func__, wsi); + + /* + * the "manual" variant only closes the handle itself and the + * related fd. handle->data is the fd. + */ + h->data = (void *)(lws_intptr_t)wsi->desc.sockfd; + + /* + * We take responsibility to close / destroy these now. + * Remove any trace from the wsi. + */ + + wsi->desc.sockfd = LWS_SOCK_INVALID; + wsi->w_read.uv.pwatcher = NULL; + + uv_close(h, lws_libuv_closewsi_m); +} + +static int elops_accept_uv(struct lws *wsi) { struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; wsi->w_read.context = wsi->context; + + wsi->w_read.uv.pwatcher = + lws_malloc(sizeof(*wsi->w_read.uv.pwatcher), "uvh"); + if (!wsi->w_read.uv.pwatcher) + return -1; + if (wsi->role_ops->file_handle) - uv_poll_init(pt->uv.io_loop, &wsi->w_read.uv.watcher, + uv_poll_init(pt->uv.io_loop, wsi->w_read.uv.pwatcher, (int)(long long)wsi->desc.filefd); else - uv_poll_init_socket(pt->uv.io_loop, &wsi->w_read.uv.watcher, - wsi->desc.sockfd); + uv_poll_init_socket(pt->uv.io_loop, + wsi->w_read.uv.pwatcher, + wsi->desc.sockfd); + + ((uv_handle_t *)wsi->w_read.uv.pwatcher)->data = (void *)wsi; + + return 0; } static void @@ -650,7 +672,7 @@ elops_io_uv(struct lws *wsi, int flags) if (flags & LWS_EV_READ) current_events |= UV_READABLE; - uv_poll_start(&w->uv.watcher, current_events, lws_io_cb); + uv_poll_start(w->uv.pwatcher, current_events, lws_io_cb); } else { if (flags & LWS_EV_WRITE) current_events &= ~UV_WRITABLE; @@ -659,9 +681,9 @@ elops_io_uv(struct lws *wsi, int flags) current_events &= ~UV_READABLE; if (!(current_events & (UV_READABLE | UV_WRITABLE))) - uv_poll_stop(&w->uv.watcher); + uv_poll_stop(w->uv.pwatcher); else - uv_poll_start(&w->uv.watcher, current_events, + uv_poll_start(w->uv.pwatcher, current_events, lws_io_cb); } @@ -684,14 +706,23 @@ elops_init_vhost_listen_wsi_uv(struct lws *wsi) return 0; wsi->w_read.context = wsi->context; - n = uv_poll_init_socket(pt->uv.io_loop, - &wsi->w_read.uv.watcher, wsi->desc.sockfd); + + wsi->w_read.uv.pwatcher = + lws_malloc(sizeof(*wsi->w_read.uv.pwatcher), "uvh"); + if (!wsi->w_read.uv.pwatcher) + return -1; + + n = uv_poll_init_socket(pt->uv.io_loop, wsi->w_read.uv.pwatcher, + wsi->desc.sockfd); if (n) { - lwsl_err("uv_poll_init failed %d, sockfd=%p\n", - n, (void *)(lws_intptr_t)wsi->desc.sockfd); + lwsl_err("uv_poll_init failed %d, sockfd=%p\n", n, + (void *)(lws_intptr_t)wsi->desc.sockfd); return -1; } + + ((uv_handle_t *)wsi->w_read.uv.pwatcher)->data = (void *)wsi; + elops_io_uv(wsi, LWS_EV_START | LWS_EV_READ); return 0; @@ -835,8 +866,7 @@ elops_init_pt_uv(struct lws_context *context, void *_loop, int tsi) static void lws_libuv_closewsi(uv_handle_t* handle) { - struct lws *n = NULL, *wsi = (struct lws *)(((char *)handle) - - (char *)(&n->w_read.uv.watcher)); + struct lws *wsi = (struct lws *)handle->data; struct lws_context *context = lws_get_context(wsi); struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; int lspd = 0, m; @@ -858,6 +888,9 @@ lws_libuv_closewsi(uv_handle_t* handle) __lws_close_free_wsi_final(wsi); lws_pt_unlock(pt); + /* it's our job to close the handle finally */ + lws_free(handle); + if (lspd == 2 && context->deprecation_cb) { lwsl_notice("calling deprecation callback\n"); context->deprecation_cb(); @@ -902,6 +935,11 @@ lws_libuv_closewsi(uv_handle_t* handle) void lws_libuv_closehandle(struct lws *wsi) { + uv_handle_t* handle; + + if (!wsi->w_read.uv.pwatcher) + return; + if (wsi->told_event_loop_closed) { assert(0); return; @@ -911,8 +949,18 @@ lws_libuv_closehandle(struct lws *wsi) wsi->told_event_loop_closed = 1; - /* required to defer actual deletion until libuv has processed it */ - uv_close((uv_handle_t*)&wsi->w_read.uv.watcher, lws_libuv_closewsi); + /* + * The normal close path attaches the related wsi as the + * handle->data. + */ + + handle = (uv_handle_t *)wsi->w_read.uv.pwatcher; + + /* ensure we can only do this once */ + + wsi->w_read.uv.pwatcher = NULL; + + uv_close(handle, lws_libuv_closewsi); } struct lws_event_loop_ops event_loop_ops_uv = { diff --git a/lib/event-libs/libuv/private.h b/lib/event-libs/libuv/private.h index 58a263915..7e19c0e8b 100644 --- a/lib/event-libs/libuv/private.h +++ b/lib/event-libs/libuv/private.h @@ -58,7 +58,7 @@ struct lws_context_eventlibs_libuv { }; struct lws_io_watcher_libuv { - uv_poll_t watcher; + uv_poll_t *pwatcher; }; struct lws_signal_watcher_libuv { diff --git a/lib/event-libs/private.h b/lib/event-libs/private.h index c36d39c8c..58bca946b 100644 --- a/lib/event-libs/private.h +++ b/lib/event-libs/private.h @@ -41,7 +41,7 @@ struct lws_event_loop_ops { /* close handle manually */ void (*close_handle_manually)(struct lws *wsi); /* event loop accept processing */ - void (*accept)(struct lws *wsi); + int (*accept)(struct lws *wsi); /* control wsi active events */ void (*io)(struct lws *wsi, int flags); /* run the event loop for a pt */ diff --git a/lib/roles/cgi/cgi-server.c b/lib/roles/cgi/cgi-server.c index 3105b704d..a73299bd9 100644 --- a/lib/roles/cgi/cgi-server.c +++ b/lib/roles/cgi/cgi-server.c @@ -164,7 +164,8 @@ lws_cgi(struct lws *wsi, const char * const *exec_array, int script_uri_path_len for (n = 0; n < 3; n++) { if (wsi->context->event_loop_ops->accept) - wsi->context->event_loop_ops->accept(cgi->stdwsi[n]); + if (wsi->context->event_loop_ops->accept(cgi->stdwsi[n])) + goto bail3; if (__insert_wsi_socket_into_fds(wsi->context, cgi->stdwsi[n])) goto bail3; diff --git a/lib/roles/http/client/client-handshake.c b/lib/roles/http/client/client-handshake.c index 173ac4665..fc645b32c 100644 --- a/lib/roles/http/client/client-handshake.c +++ b/lib/roles/http/client/client-handshake.c @@ -420,7 +420,11 @@ ads_known: lwsi_set_state(wsi, LRS_WAITING_CONNECT); if (wsi->context->event_loop_ops->accept) - wsi->context->event_loop_ops->accept(wsi); + if (wsi->context->event_loop_ops->accept(wsi)) { + compatible_close(wsi->desc.sockfd); + cce = "event loop accept failed"; + goto oom4; + } if (__insert_wsi_socket_into_fds(wsi->context, wsi)) { compatible_close(wsi->desc.sockfd);