1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

event lib: update http client multi to work with it and clean destroy flow

Add selectable event lib support to minimal-http-client-multi and
clean up context destroy flow so we can use lws_destroy_context() from
inside the callback to indicate we want to end the event loop, without
using the traditional "interrupted" flag and in a way that works no
matter which event loop backend is being used.
This commit is contained in:
Andy Green 2020-01-23 11:12:28 +00:00
parent 978f2a476a
commit 271ca836c8
14 changed files with 219 additions and 83 deletions

View file

@ -147,7 +147,6 @@ __lws_free_wsi(struct lws *wsi)
__lws_reset_wsi(wsi);
if (wsi->context->event_loop_ops->destroy_wsi)
wsi->context->event_loop_ops->destroy_wsi(wsi);

View file

@ -459,8 +459,11 @@ struct lws_context_per_thread {
unsigned char tid;
unsigned char inside_service:1;
unsigned char inside_lws_service:1;
unsigned char event_loop_foreign:1;
unsigned char event_loop_destroy_processing_done:1;
unsigned char destroy_self:1;
unsigned char is_destroyed:1;
#ifdef _WIN32
unsigned char interrupt_requested:1;
#endif

View file

@ -387,7 +387,7 @@ lws_buflist_aware_read(struct lws_context_per_thread *pt, struct lws *wsi,
if (!bns && /* only acknowledge error when we handled buflist content */
n == LWS_SSL_CAPABLE_ERROR) {
lwsl_notice("%s: SSL_CAPABLE_ERROR\n", __func__);
lwsl_debug("%s: SSL_CAPABLE_ERROR\n", __func__);
return -1;
}
@ -513,11 +513,15 @@ lws_service_do_ripe_rxflow(struct lws_context_per_thread *pt)
(unsigned long)wsi->wsistate);
if (!lws_is_flowcontrolled(wsi) &&
lwsi_state(wsi) != LRS_DEFERRING_ACTION &&
(wsi->role_ops->handle_POLLIN)(pt, wsi, &pfd) ==
lwsi_state(wsi) != LRS_DEFERRING_ACTION) {
pt->inside_lws_service = 1;
if ((wsi->role_ops->handle_POLLIN)(pt, wsi, &pfd) ==
LWS_HPI_RET_PLEASE_CLOSE_ME)
lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS,
"close_and_handled");
lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS,
"close_and_handled");
pt->inside_lws_service = 0;
}
} lws_end_foreach_dll_safe(d, d1);
@ -679,6 +683,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd,
}
#endif
wsi->could_have_pending = 0; /* clear back-to-back write detection */
pt->inside_lws_service = 1;
/* okay, what we came here to do... */
@ -690,6 +695,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd,
switch ((wsi->role_ops->handle_POLLIN)(pt, wsi, pollfd)) {
case LWS_HPI_RET_WSI_ALREADY_DIED:
pt->inside_lws_service = 0;
return 1;
case LWS_HPI_RET_HANDLED:
break;
@ -714,6 +720,7 @@ close_and_handled:
* we can't clear revents now because it'd be the wrong guy's
* revents
*/
pt->inside_lws_service = 0;
return 1;
default:
assert(0);
@ -722,6 +729,7 @@ close_and_handled:
handled:
#endif
pollfd->revents = 0;
pt->inside_lws_service = 0;
return 0;
}

View file

@ -128,7 +128,9 @@ __lws_sul_service_ripe(lws_dll2_owner_t *own, lws_usec_t usnow)
/* his moment has come... remove him from timeout list */
lws_dll2_remove(&sul->list);
sul->us = 0;
pt->inside_lws_service = 1;
sul->cb(sul);
pt->inside_lws_service = 0;
/*
* The callback may have done any mixture of delete

View file

@ -936,7 +936,8 @@ lws_destroy_event_pipe(struct lws *wsi)
lwsl_info("%s\n", __func__);
__remove_wsi_socket_from_fds(wsi);
if (wsi->context->event_loop_ops->wsi_logical_close) {
if (!wsi->context->event_loop_ops->destroy_wsi &&
wsi->context->event_loop_ops->wsi_logical_close) {
wsi->context->event_loop_ops->wsi_logical_close(wsi);
lws_plat_pipe_close(wsi);
return;

View file

@ -1003,6 +1003,48 @@ lws_context_destroy2(struct lws_context *context)
lws_context_destroy3(context);
}
#if defined(LWS_WITH_NETWORK)
static void
lws_pt_destroy(struct lws_context_per_thread *pt)
{
volatile struct lws_foreign_thread_pollfd *ftp, *next;
volatile struct lws_context_per_thread *vpt;
assert(!pt->is_destroyed);
pt->destroy_self = 0;
vpt = (volatile struct lws_context_per_thread *)pt;
ftp = vpt->foreign_pfd_list;
while (ftp) {
next = ftp->next;
lws_free((void *)ftp);
ftp = next;
}
vpt->foreign_pfd_list = NULL;
if (pt->pipe_wsi)
lws_destroy_event_pipe(pt->pipe_wsi);
pt->pipe_wsi = NULL;
while (pt->fds_count) {
struct lws *wsi = wsi_from_fd(pt->context, pt->fds[0].fd);
if (!wsi)
break;
lws_close_free_wsi(wsi,
LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY,
"ctx destroy"
/* no protocol close */);
}
lws_pt_mutex_destroy(pt);
pt->is_destroyed = 1;
lwsl_info("%s: pt destroyed\n", __func__);
}
#endif
/*
* Begin the context takedown
*/
@ -1011,20 +1053,21 @@ void
lws_context_destroy(struct lws_context *context)
{
#if defined(LWS_WITH_NETWORK)
volatile struct lws_foreign_thread_pollfd *ftp, *next;
volatile struct lws_context_per_thread *vpt;
struct lws_vhost *vh = NULL;
int n, m;
int m, deferred_pt = 0;
#endif
if (!context)
if (!context || context->inside_context_destroy)
return;
context->inside_context_destroy = 1;
#if defined(LWS_WITH_NETWORK)
if (context->finalize_destroy_after_internal_loops_stopped) {
if (context->event_loop_ops->destroy_context2)
context->event_loop_ops->destroy_context2(context);
lws_context_destroy3(context);
/* context is invalid, no need to reset inside flag */
return;
}
#endif
@ -1032,20 +1075,19 @@ lws_context_destroy(struct lws_context *context)
if (!context->being_destroyed2) {
lws_context_destroy2(context);
return;
goto out;
}
lwsl_info("%s: ctx %p: already being destroyed\n",
__func__, context);
lws_context_destroy3(context);
/* context is invalid, no need to reset inside flag */
return;
}
lwsl_info("%s: ctx %p\n", __func__, context);
context->being_destroyed = 1;
context->being_destroyed1 = 1;
context->requested_kill = 1;
#if defined(LWS_WITH_NETWORK)
lws_state_transition(&context->mgr_system, LWS_SYSTATE_POLICY_INVALID);
@ -1054,32 +1096,27 @@ lws_context_destroy(struct lws_context *context)
while (m--) {
struct lws_context_per_thread *pt = &context->pt[m];
vpt = (volatile struct lws_context_per_thread *)pt;
ftp = vpt->foreign_pfd_list;
while (ftp) {
next = ftp->next;
lws_free((void *)ftp);
ftp = next;
}
vpt->foreign_pfd_list = NULL;
if (pt->is_destroyed)
continue;
for (n = 0; (unsigned int)n < context->pt[m].fds_count; n++) {
struct lws *wsi = wsi_from_fd(context, pt->fds[n].fd);
if (!wsi)
continue;
if (wsi->event_pipe)
lws_destroy_event_pipe(wsi);
else
lws_close_free_wsi(wsi,
LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY,
"ctx destroy"
/* no protocol close */);
n--;
if (pt->inside_lws_service) {
pt->destroy_self = 1;
deferred_pt = 1;
continue;
}
lws_pt_mutex_destroy(pt);
lws_pt_destroy(pt);
}
if (deferred_pt) {
lwsl_info("%s: waiting for deferred pt close\n", __func__);
lws_cancel_service(context);
goto out;
}
context->being_destroyed1 = 1;
context->requested_kill = 1;
/*
* inform all the protocols that they are done and will have no more
* callbacks.
@ -1119,7 +1156,7 @@ lws_context_destroy(struct lws_context *context)
if (context->event_loop_ops->destroy_context1) {
context->event_loop_ops->destroy_context1(context);
return;
goto out;
}
#endif
@ -1131,5 +1168,11 @@ lws_context_destroy(struct lws_context *context)
#endif
#endif
context->inside_context_destroy = 0;
lws_context_destroy2(context);
return;
out:
context->inside_context_destroy = 0;
}

View file

@ -468,6 +468,7 @@ struct lws_context {
unsigned short ip_limit_wsi;
#endif
unsigned int deprecated:1;
unsigned int inside_context_destroy:1;
unsigned int being_destroyed:1;
unsigned int being_destroyed1:1;
unsigned int being_destroyed2:1;

View file

@ -70,15 +70,18 @@ lws_ev_idle_cb(struct ev_loop *loop, struct ev_idle *handle, int revents)
/* there is nobody who needs service forcing, shut down idle */
if (!reschedule)
ev_idle_stop(loop, handle);
if (pt->destroy_self)
lws_context_destroy(pt->context);
}
static void
lws_accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
struct lws_context_per_thread *pt;
struct lws_io_watcher *lws_io = lws_container_of(watcher,
struct lws_io_watcher, ev.watcher);
struct lws_context *context = lws_io->context;
struct lws_context_per_thread *pt;
struct lws_pollfd eventfd;
struct lws *wsi;
@ -124,9 +127,9 @@ elops_init_pt_ev(struct lws_context *context, void *_loop, int tsi)
{
struct lws_context_per_thread *pt = &context->pt[tsi];
struct ev_signal *w_sigint = &context->pt[tsi].w_sigint.ev.watcher;
struct ev_loop *loop = (struct ev_loop *)_loop;
struct lws_vhost *vh = context->vhost_list;
const char *backend_name;
struct ev_loop *loop = (struct ev_loop *)_loop;
int status = 0;
int backend;
@ -267,7 +270,7 @@ elops_io_ev(struct lws *wsi, int flags)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
if (!pt->ev.io_loop)
if (!pt->ev.io_loop || pt->is_destroyed)
return;
assert((flags & (LWS_EV_START | LWS_EV_STOP)) &&
@ -284,6 +287,9 @@ elops_io_ev(struct lws *wsi, int flags)
if (flags & LWS_EV_READ)
ev_io_stop(pt->ev.io_loop, &wsi->w_read.ev.watcher);
}
if (pt->destroy_self)
lws_context_destroy(pt->context);
}
static void

View file

@ -48,6 +48,9 @@ lws_event_idle_timer_cb(int fd, short event, void *p)
struct timeval tv;
lws_usec_t us;
if (pt->is_destroyed)
return;
lws_service_do_ripe_rxflow(pt);
/*
@ -80,6 +83,10 @@ lws_event_idle_timer_cb(int fd, short event, void *p)
evtimer_add(pt->event.hrtimer, &tv);
}
lws_pt_unlock(pt);
if (pt->destroy_self)
lws_context_destroy(pt->context);
}
static void
@ -117,13 +124,20 @@ lws_event_cb(evutil_socket_t sock_fd, short revents, void *ctx)
}
wsi = wsi_from_fd(context, sock_fd);
if (!wsi) {
if (!wsi)
return;
}
pt = &context->pt[(int)wsi->tsi];
if (pt->is_destroyed)
return;
lws_service_fd_tsi(context, &eventfd, wsi->tsi);
if (pt->destroy_self) {
lws_context_destroy(pt->context);
return;
}
/* set the idle timer for 1ms ahead */
tv.tv_sec = 0;
@ -252,7 +266,8 @@ elops_io_event(struct lws *wsi, int flags)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
if (!pt->event.io_loop || wsi->context->being_destroyed)
if (!pt->event.io_loop || wsi->context->being_destroyed ||
pt->is_destroyed)
return;
assert((flags & (LWS_EV_START | LWS_EV_STOP)) &&
@ -319,14 +334,32 @@ elops_destroy_pt_event(struct lws_context *context, int tsi)
static void
elops_destroy_wsi_event(struct lws *wsi)
{
struct lws_context_per_thread *pt;
if (!wsi)
return;
if (wsi->w_read.event.watcher)
event_free(wsi->w_read.event.watcher);
pt = &wsi->context->pt[(int)wsi->tsi];
if (pt->is_destroyed)
return;
if (wsi->w_write.event.watcher)
if (wsi->w_read.event.watcher) {
event_free(wsi->w_read.event.watcher);
wsi->w_read.event.watcher = NULL;
}
if (wsi->w_write.event.watcher) {
event_free(wsi->w_write.event.watcher);
wsi->w_write.event.watcher = NULL;
}
}
static int
elops_wsi_logical_close_event(struct lws *wsi)
{
elops_destroy_wsi_event(wsi);
return 0;
}
static int
@ -411,7 +444,7 @@ struct lws_event_loop_ops event_loop_ops_event = {
/* destroy_context2 */ elops_destroy_context2_event,
/* init_vhost_listen_wsi */ elops_init_vhost_listen_wsi_event,
/* init_pt */ elops_init_pt_event,
/* wsi_logical_close */ NULL,
/* wsi_logical_close */ elops_wsi_logical_close_event,
/* check_client_connect_ok */ NULL,
/* close_handle_manually */ NULL,
/* accept */ elops_accept_event,

View file

@ -84,6 +84,9 @@ lws_io_cb(uv_poll_t *watcher, int status, int revents)
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
struct lws_pollfd eventfd;
if (pt->is_destroyed)
return;
#if defined(WIN32) || defined(_WIN32)
eventfd.fd = watcher->socket;
#else
@ -116,6 +119,11 @@ lws_io_cb(uv_poll_t *watcher, int status, int revents)
}
lws_service_fd_tsi(context, &eventfd, wsi->tsi);
if (pt->destroy_self) {
lws_context_destroy(pt->context);
return;
}
uv_idle_start(&pt->uv.idle, lws_uv_idle);
}
@ -497,7 +505,7 @@ elops_destroy_context1_uv(struct lws_context *context)
UV_RUN_NOWAIT)))
;
if (m)
lwsl_err("%s: tsi %d: not all closed\n",
lwsl_info("%s: tsi %d: not all closed\n",
__func__, n);
}

View file

@ -207,6 +207,11 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
if (_lws_plat_service_forced_tsi(context, tsi) < 0)
return -1;
if (pt->destroy_self) {
lws_context_destroy(pt->context);
return -1;
}
return 0;
}

View file

@ -611,7 +611,7 @@ rops_destroy_role_h2(struct lws *wsi)
struct allocated_headers *ah;
/* we may not have an ah, but may be on the waiting list... */
lwsl_info("%s: ah det due to close\n", __func__);
lwsl_info("%s: wsi %p: ah det due to close\n", __func__, wsi);
__lws_header_table_detach(wsi, 0);
ah = pt->http.ah_list;

View file

@ -22,4 +22,6 @@ Option|Meaning
--h1|Force http/1 only
-l|Connect to server on https://localhost:7681 instead of https://warmcat.com:443
-n|Read numbered files like /1.png, /2.png etc. Default is just read /
--uv|Use libuv event loop if lws built for it
--event|Use libevent event loop if lws built for it
--ev|Use libev event loop if lws built for it

View file

@ -1,7 +1,7 @@
/*
* lws-minimal-http-client-multi
*
* Written in 2010-2019 by Andy Green <andy@warmcat.com>
* Written in 2010-2020 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
@ -40,10 +40,10 @@ struct cliuser {
int index;
};
static int interrupted, completed, failed, numbered, stagger_idx;
static struct lws *client_wsi[COUNT];
static int completed, failed, numbered, stagger_idx;
static lws_sorted_usec_list_t sul_stagger;
static struct lws_client_connect_info i;
static struct lws *client_wsi[COUNT];
struct lws_context *context;
static int
@ -65,11 +65,7 @@ callback_http(struct lws *wsi, enum lws_callback_reasons reason,
in ? (char *)in : "(null)");
client_wsi[idx] = NULL;
failed++;
if (++completed == COUNT) {
lwsl_err("Done: failed: %d\n", failed);
interrupted = 1;
}
break;
goto finished;
/* chunks of chunked content, with header removed */
case LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ:
@ -103,16 +99,7 @@ callback_http(struct lws *wsi, enum lws_callback_reasons reason,
lwsl_user("LWS_CALLBACK_COMPLETED_CLIENT_HTTP %p: idx %d\n",
wsi, idx);
client_wsi[idx] = NULL;
if (++completed == COUNT) {
if (!failed)
lwsl_user("Done: all OK\n");
else
lwsl_err("Done: failed: %d\n", failed);
interrupted = 1;
/* so we exit immediately */
lws_cancel_service(lws_get_context(wsi));
}
break;
goto finished;
case LWS_CALLBACK_CLOSED_CLIENT_HTTP:
lwsl_info("%s: closed: %p\n", __func__, client_wsi[idx]);
@ -124,10 +111,7 @@ callback_http(struct lws *wsi, enum lws_callback_reasons reason,
*/
client_wsi[idx] = NULL;
failed++;
if (++completed == COUNT) {
lwsl_err("Done: failed: %d\n", failed);
interrupted = 1;
}
goto finished;
}
break;
@ -136,6 +120,23 @@ callback_http(struct lws *wsi, enum lws_callback_reasons reason,
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
finished:
if (++completed == COUNT) {
if (!failed)
lwsl_user("Done: all OK\n");
else
lwsl_err("Done: failed: %d\n", failed);
//interrupted = 1;
/*
* This is how we can exit the event loop even when it's an
* event library backing it... it will start and stage the
* destroy to happen after we exited this service for each pt
*/
lws_context_destroy(lws_get_context(wsi));
}
return 0;
}
static const struct lws_protocols protocols[] = {
@ -143,10 +144,24 @@ static const struct lws_protocols protocols[] = {
{ NULL, NULL, 0, 0 }
};
static void
signal_cb(void *handle, int signum)
{
switch (signum) {
case SIGTERM:
case SIGINT:
break;
default:
lwsl_err("%s: signal %d\n", __func__, signum);
break;
}
lws_context_destroy(context);
}
static void
sigint_handler(int sig)
{
interrupted = 1;
signal_cb(NULL, sig);
}
#if defined(WIN32)
@ -199,7 +214,7 @@ lws_try_client_connection(struct lws_client_connect_info *i, int m)
failed++;
if (++completed == COUNT) {
lwsl_user("Done: failed: %d\n", failed);
interrupted = 1;
lws_context_destroy(context);
}
} else
lwsl_user("started connection %p: idx %d (%s)\n",
@ -233,8 +248,7 @@ int main(int argc, const char **argv)
struct lws_context_creation_info info;
unsigned long long start;
const char *p;
int n = 0, m, staggered = 0, logs =
LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
int m, staggered = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
/* for LLL_ verbosity above NOTICE to be built into lws,
* lws must have been configured and built with
* -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */
@ -242,10 +256,23 @@ int main(int argc, const char **argv)
/* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
/* | LLL_DEBUG */;
signal(SIGINT, sigint_handler);
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
info.signal_cb = signal_cb;
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
if (lws_cmdline_option(argc, argv, "--uv"))
info.options |= LWS_SERVER_OPTION_LIBUV;
else
if (lws_cmdline_option(argc, argv, "--event"))
info.options |= LWS_SERVER_OPTION_LIBEVENT;
else
if (lws_cmdline_option(argc, argv, "--ev"))
info.options |= LWS_SERVER_OPTION_LIBEV;
else
signal(SIGINT, sigint_handler);
staggered = !!lws_cmdline_option(argc, argv, "-s");
if ((p = lws_cmdline_option(argc, argv, "-d")))
logs = atoi(p);
@ -255,8 +282,6 @@ int main(int argc, const char **argv)
lwsl_user(" [--h1 (http/1 only)] [-l (localhost)] [-d <logs>]\n");
lwsl_user(" [-n (numbered)]\n");
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
info.protocols = protocols;
/*
@ -341,8 +366,8 @@ int main(int argc, const char **argv)
100 * LWS_US_PER_MS);
start = us();
while (n >= 0 && !interrupted)
n = lws_service(context, 0);
while (!lws_service(context, 0))
;
lwsl_user("Duration: %lldms\n", (us() - start) / 1000);
lws_context_destroy(context);