diff --git a/READMEs/README.coding.md b/READMEs/README.coding.md index a6f8e231..218b4454 100644 --- a/READMEs/README.coding.md +++ b/READMEs/README.coding.md @@ -132,7 +132,7 @@ with the socket closing and the `wsi` freed. Websocket write activities should only take place in the `LWS_CALLBACK_SERVER_WRITEABLE` callback as described below. -[This network-programming necessity to link the issue of new data to +This network-programming necessity to link the issue of new data to the peer taking the previous data is not obvious to all users so let's repeat that in other words: @@ -155,12 +155,20 @@ websocket ones, you can combine them together with the websocket ones in one poll loop, see "External Polling Loop support" below, and still do it all in one thread / process context. -If you insist on trying to use it from multiple threads, take special care if -you might simultaneously create more than one context from different threads. - SSL_library_init() is called from the context create api and it also is not reentrant. So at least create the contexts sequentially. +If you must interoperate with other threads, you can use `lws_cancel_service()` +to notify lws that something has happened on another thread. lws will send +`LWS_CALLBACK_EVENT_WAIT_CANCELLED` events to all protocols, serialized with +the main event loop operations, ie, safely. + +You can handle this callback to check the reason you were notified and take +action using any usual lws api, since you are in a callback in the normal +service thread. + +`lws_cancel_service()` is very cheap for the other thread to call. + @section closing Closing connections from the user side When you want to close a connection, you do it by returning `-1` from a diff --git a/lib/context.c b/lib/context.c index d68f3f42..0462bd64 100644 --- a/lib/context.c +++ b/lib/context.c @@ -834,6 +834,77 @@ lws_init_vhost_client_ssl(const struct lws_context_creation_info *info, return lws_context_init_client_ssl(&i, vhost); } +LWS_VISIBLE void +lws_cancel_service_pt(struct lws *wsi) +{ + lws_plat_pipe_signal(wsi); +} + +LWS_VISIBLE void +lws_cancel_service(struct lws_context *context) +{ + struct lws_context_per_thread *pt = &context->pt[0]; + short m = context->count_threads; + + lwsl_notice("%s\n", __func__); + + while (m--) { + if (pt->pipe_wsi) + lws_plat_pipe_signal(pt->pipe_wsi); + pt++; + } +} + +int +lws_create_event_pipes(struct lws_context *context) +{ + struct lws *wsi; + int n; + + /* + * Create the pt event pipes... these are unique in that they are + * not bound to a vhost or protocol (both are NULL) + */ + + for (n = 0; n < context->count_threads; n++) { + if (context->pt[n].pipe_wsi) + continue; + + wsi = lws_zalloc(sizeof(*wsi), "event pipe wsi"); + if (!wsi) { + lwsl_err("Out of mem\n"); + return 1; + } + wsi->context = context; + wsi->mode = LWSCM_EVENT_PIPE; + wsi->protocol = NULL; + wsi->tsi = n; + wsi->vhost = NULL; + wsi->event_pipe = 1; + + if (lws_plat_pipe_create(wsi)) { + lws_free(wsi); + continue; + } + wsi->desc.sockfd = context->pt[n].dummy_pipe_fds[0]; + lwsl_debug("event pipe fd %d\n", wsi->desc.sockfd); + + context->pt[n].pipe_wsi = wsi; + + lws_libuv_accept(wsi, wsi->desc); + lws_libev_accept(wsi, wsi->desc); + lws_libevent_accept(wsi, wsi->desc); + + if (insert_wsi_socket_into_fds(context, wsi)) + return 1; + + lws_change_pollfd(context->pt[n].pipe_wsi, 0, LWS_POLLIN); + context->count_wsi_allocated++; + } + + return 0; +} + LWS_VISIBLE struct lws_context * lws_create_context(struct lws_context_creation_info *info) { @@ -1156,6 +1227,16 @@ lws_create_context(struct lws_context_creation_info *info) context->count_caps = info->count_caps; #endif + /* + * The event libs handle doing this when their event loop starts, + * if we are using the default poll() service, do it here + */ + + if (!LWS_LIBEV_ENABLED(context) && + !LWS_LIBUV_ENABLED(context) && + !LWS_LIBEVENT_ENABLED(context) && lws_create_event_pipes(context)) + goto bail; + /* * drop any root privs for this process * to listen on port < 1023 we would have needed root, but now we are @@ -1515,9 +1596,15 @@ lws_context_destroy(struct lws_context *context) if (!wsi) continue; - lws_close_free_wsi(wsi, - LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY - /* no protocol close */); + if (wsi->event_pipe) { + lws_plat_pipe_close(wsi); + remove_wsi_socket_from_fds(wsi); + lws_free(wsi); + context->count_wsi_allocated--; + } else + lws_close_free_wsi(wsi, + LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY + /* no protocol close */); n--; } lws_pt_mutex_destroy(pt); diff --git a/lib/event-libs/libev.c b/lib/event-libs/libev.c index 2410488f..9a9d6e06 100644 --- a/lib/event-libs/libev.c +++ b/lib/event-libs/libev.c @@ -92,6 +92,9 @@ lws_ev_initloop(struct lws_context *context, struct ev_loop *loop, int tsi) context->pt[tsi].io_loop_ev = loop; + if (lws_create_event_pipes(context)) + return -1; + /* * Initialize the accept w_accept with all the listening sockets * and register a callback for read operations diff --git a/lib/event-libs/libevent.c b/lib/event-libs/libevent.c index 08b5a8d3..ad12ac51 100644 --- a/lib/event-libs/libevent.c +++ b/lib/event-libs/libevent.c @@ -98,6 +98,9 @@ int tsi) context->pt[tsi].io_loop_event_base = loop; } + if (lws_create_event_pipes(context)) + return 1; + /* * Initialize all events with the listening sockets * and register a callback for read operations diff --git a/lib/event-libs/libuv.c b/lib/event-libs/libuv.c index 60c93390..e88f1cff 100644 --- a/lib/event-libs/libuv.c +++ b/lib/event-libs/libuv.c @@ -227,6 +227,9 @@ lws_uv_initloop(struct lws_context *context, uv_loop_t *loop, int tsi) } else first = 0; + if (lws_create_event_pipes(context)) + goto bail; + /* * Initialize the accept wsi read watcher with all the listening sockets * and register a callback for read operations @@ -247,6 +250,9 @@ lws_uv_initloop(struct lws_context *context, uv_loop_t *loop, int tsi) } return status; + +bail: + return -1; } static void lws_uv_close_cb(uv_handle_t *handle) @@ -325,7 +331,7 @@ lws_libuv_accept(struct lws *wsi, lws_sock_file_fd_type desc) return; wsi->w_read.context = context; - if (wsi->mode == LWSCM_RAW_FILEDESC) + if (wsi->mode == LWSCM_RAW_FILEDESC || wsi->event_pipe) uv_poll_init(pt->io_loop_uv, &wsi->w_read.uv_watcher, (int)desc.filefd); else diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index c371ad95..91bf5173 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -85,7 +85,7 @@ lws_free_wsi(struct lws *wsi) lws_header_table_force_to_detachable_state(wsi); lws_header_table_detach(wsi, 0); - if (wsi->vhost->lserv_wsi == wsi) + if (wsi->vhost && wsi->vhost->lserv_wsi == wsi) wsi->vhost->lserv_wsi = NULL; lws_pt_lock(pt); @@ -822,8 +822,9 @@ lws_close_free_wsi_final(struct lws *wsi) } /* outermost destroy notification for wsi (user_space still intact) */ - wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_WSI_DESTROY, - wsi->user_space, NULL, 0); + if (wsi->vhost) + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_WSI_DESTROY, + wsi->user_space, NULL, 0); #ifdef LWS_WITH_CGI if (wsi->cgi) { diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index 6721aa26..9c756d30 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -1416,6 +1416,12 @@ enum lws_callback_reasons { LWS_CALLBACK_CGI_PROCESS_ATTACH = 70, /**< CGI: Sent when the CGI process is spawned for the wsi. The * len parameter is the PID of the child process */ + LWS_CALLBACK_EVENT_WAIT_CANCELLED = 71, + /**< This is sent to every protocol of every vhost in response + * to lws_cancel_service() or lws_cancel_service_pt(). This + * callback is serialized in the lws event loop normally, even + * if the lws_cancel_service[_pt]() call was from a different + * thread. */ /****** add new things just above ---^ ******/ @@ -3440,15 +3446,8 @@ lws_service_tsi(struct lws_context *context, int timeout_ms, int tsi); * on one thread * \param wsi: Cancel service on the thread this wsi is serviced by * - * This function lets a call to lws_service() waiting for a timeout - * immediately return. - * - * It works by creating a phony event and then swallowing it silently. - * - * The reason it may be needed is when waiting in poll(), changes to - * the event masks are ignored by the OS until poll() is reentered. This - * lets you halt the poll() wait and make the reentry happen immediately - * instead of having the wait out the rest of the poll timeout. + * Same as lws_cancel_service(), but targets a single service thread, the one + * the wsi belongs to. You probably want to use lws_cancel_service() instead. */ LWS_VISIBLE LWS_EXTERN void lws_cancel_service_pt(struct lws *wsi); @@ -3457,12 +3456,13 @@ lws_cancel_service_pt(struct lws *wsi); * lws_cancel_service() - Cancel wait for new pending socket activity * \param context: Websocket context * - * This function let a call to lws_service() waiting for a timeout - * immediately return. + * This function creates an immediate "synchronous interrupt" to the lws poll() + * wait or event loop. As soon as possible in the serialzed service sequencing, + * a LWS_CALLBACK_EVENT_WAIT_CANCELLED callback is sent to every protocol on + * every vhost. * - * What it basically does is provide a fake event that will be swallowed, - * so the wait in poll() is ended. That's useful because poll() doesn't - * attend to changes in POLLIN/OUT/ERR until it re-enters the wait. + * lws_cancel_service() may be called from another thread while the context + * exists, and its effect will be immediately serialized. */ LWS_VISIBLE LWS_EXTERN void lws_cancel_service(struct lws_context *context); diff --git a/lib/plat/lws-plat-esp32.c b/lib/plat/lws-plat-esp32.c index 58b5281f..88ab7682 100644 --- a/lib/plat/lws-plat-esp32.c +++ b/lib/plat/lws-plat-esp32.c @@ -317,6 +317,24 @@ lws_plat_drop_app_privileges(struct lws_context_creation_info *info) { } + +int +lws_plat_pipe_create(struct lws *wsi) +{ + return 1; +} + +int +lws_plat_pipe_signal(struct lws *wsi) +{ + return 1; +} + +void +lws_plat_pipe_close(struct lws *wsi) +{ +} + LWS_VISIBLE int lws_plat_context_early_init(void) { diff --git a/lib/plat/lws-plat-esp8266.c b/lib/plat/lws-plat-esp8266.c index 90ddbdf3..60623e3d 100644 --- a/lib/plat/lws-plat-esp8266.c +++ b/lib/plat/lws-plat-esp8266.c @@ -41,6 +41,24 @@ time_t time(time_t *tloc) return 0; } + +int +lws_plat_pipe_create(struct lws *wsi) +{ + return 1; +} + +int +lws_plat_pipe_signal(struct lws *wsi) +{ + return 1; +} + +void +lws_plat_pipe_close(struct lws *wsi) +{ +} + LWS_VISIBLE int lws_get_random(struct lws_context *context, void *buf, int len) { @@ -171,16 +189,6 @@ lws_poll_listen_fd(struct lws_pollfd *fd) return 0; } -LWS_VISIBLE void -lws_cancel_service_pt(struct lws *wsi) -{ -} - -LWS_VISIBLE void -lws_cancel_service(struct lws_context *context) -{ -} - LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line) { extern void output_redirect(const char *str); diff --git a/lib/plat/lws-plat-optee.c b/lib/plat/lws-plat-optee.c index 55677af6..4863ba05 100644 --- a/lib/plat/lws-plat-optee.c +++ b/lib/plat/lws-plat-optee.c @@ -4,6 +4,23 @@ * included from libwebsockets.c for OPTEE builds */ +int +lws_plat_pipe_create(struct lws *wsi) +{ + return 1; +} + +int +lws_plat_pipe_signal(struct lws *wsi) +{ + return 1; +} + +void +lws_plat_pipe_close(struct lws *wsi) +{ +} + void TEE_GenerateRandom(void *randomBuffer, uint32_t randomBufferLen); unsigned long long time_in_microseconds(void) @@ -52,32 +69,6 @@ lws_poll_listen_fd(struct lws_pollfd *fd) return 0; } -LWS_VISIBLE void -lws_cancel_service_pt(struct lws *wsi) -{ -#if 0 - struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; - char buf = 0; - - if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1) - lwsl_err("Cannot write to dummy pipe"); -#endif -} - -LWS_VISIBLE void -lws_cancel_service(struct lws_context *context) -{ -#if 0 - struct lws_context_per_thread *pt = &context->pt[0]; - char buf = 0, m = context->count_threads; - - while (m--) { - if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1) - lwsl_err("Cannot write to dummy pipe"); - pt++; - } -#endif -} #if 0 LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line) { diff --git a/lib/plat/lws-plat-unix.c b/lib/plat/lws-plat-unix.c index 068222bb..6e824a34 100644 --- a/lib/plat/lws-plat-unix.c +++ b/lib/plat/lws-plat-unix.c @@ -29,6 +29,41 @@ #endif #include +int +lws_plat_pipe_create(struct lws *wsi) +{ + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + + return pipe(pt->dummy_pipe_fds); +} + +int +lws_plat_pipe_signal(struct lws *wsi) +{ + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + char buf = 0; + int n; + + n = write(pt->dummy_pipe_fds[1], &buf, 1); + + lwsl_debug("%s: fd %d %d\n", __func__, pt->dummy_pipe_fds[1], n); + + return n != 1; +} + +void +lws_plat_pipe_close(struct lws *wsi) +{ + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + + if (pt->dummy_pipe_fds[0] && pt->dummy_pipe_fds[0] != -1) + close(pt->dummy_pipe_fds[0]); + if (pt->dummy_pipe_fds[1] && pt->dummy_pipe_fds[1] != -1) + close(pt->dummy_pipe_fds[1]); + + pt->dummy_pipe_fds[0] = pt->dummy_pipe_fds[1] = -1; +} + unsigned long long time_in_microseconds(void) { struct timeval tv; @@ -77,29 +112,6 @@ lws_poll_listen_fd(struct lws_pollfd *fd) return poll(fd, 1, 0); } -LWS_VISIBLE void -lws_cancel_service_pt(struct lws *wsi) -{ - struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; - char buf = 0; - - if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1) - lwsl_err("Cannot write to dummy pipe"); -} - -LWS_VISIBLE void -lws_cancel_service(struct lws_context *context) -{ - struct lws_context_per_thread *pt = &context->pt[0]; - char buf = 0, m = context->count_threads; - - while (m--) { - if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1) - lwsl_err("Cannot write to dummy pipe"); - pt++; - } -} - LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line) { int syslog_level = LOG_DEBUG; @@ -126,7 +138,6 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) { struct lws_context_per_thread *pt; int n = -1, m, c; - char buf; /* stay dead once we are dead */ @@ -200,12 +211,6 @@ faked_service: c--; - if (pt->fds[n].fd == pt->dummy_pipe_fds[0]) { - if (read(pt->fds[n].fd, &buf, 1) != 1) - lwsl_err("Cannot read from dummy pipe."); - continue; - } - m = lws_service_fd_tsi(context, &pt->fds[n], tsi); if (m < 0) return -1; @@ -543,9 +548,6 @@ lws_plat_context_early_destroy(struct lws_context *context) LWS_VISIBLE void lws_plat_context_late_destroy(struct lws_context *context) { - struct lws_context_per_thread *pt = &context->pt[0]; - int m = context->count_threads; - #ifdef LWS_WITH_PLUGINS if (context->plugin_list) lws_plat_plugins_destroy(context); @@ -554,13 +556,6 @@ lws_plat_context_late_destroy(struct lws_context *context) if (context->lws_lookup) lws_free(context->lws_lookup); - while (m--) { - if (pt->dummy_pipe_fds[0]) - close(pt->dummy_pipe_fds[0]); - if (pt->dummy_pipe_fds[1]) - close(pt->dummy_pipe_fds[1]); - pt++; - } if (!context->fd_random) lwsl_err("ZERO RANDOM FD\n"); if (context->fd_random != LWS_INVALID_FILE) @@ -794,13 +789,11 @@ _lws_plat_file_write(lws_fop_fd_t fop_fd, lws_filepos_t *amount, return 0; } - LWS_VISIBLE int lws_plat_init(struct lws_context *context, struct lws_context_creation_info *info) { - struct lws_context_per_thread *pt = &context->pt[0]; - int n = context->count_threads, fd; + int fd; /* master context has the global fd lookup array */ context->lws_lookup = lws_zalloc(sizeof(struct lws *) * @@ -822,25 +815,9 @@ lws_plat_init(struct lws_context *context, return 1; } - if (!lws_libev_init_fd_table(context) && - !lws_libuv_init_fd_table(context) && - !lws_libevent_init_fd_table(context)) { - /* otherwise libev/uv/event handled it instead */ - - while (n--) { - if (pipe(pt->dummy_pipe_fds)) { - lwsl_err("Unable to create pipe\n"); - return 1; - } - - /* use the read end of pipe as first item */ - pt->fds[0].fd = pt->dummy_pipe_fds[0]; - pt->fds[0].events = LWS_POLLIN; - pt->fds[0].revents = 0; - pt->fds_count = 1; - pt++; - } - } + (void)lws_libev_init_fd_table(context); + (void)lws_libuv_init_fd_table(context); + (void)lws_libevent_init_fd_table(context); #ifdef LWS_WITH_PLUGINS if (info->plugin_dirs) diff --git a/lib/plat/lws-plat-win.c b/lib/plat/lws-plat-win.c index f5b178ce..8f1ecf75 100644 --- a/lib/plat/lws-plat-win.c +++ b/lib/plat/lws-plat-win.c @@ -3,6 +3,27 @@ #endif #include "private-libwebsockets.h" +int +lws_plat_pipe_create(struct lws *wsi) +{ + return 1; +} + +int +lws_plat_pipe_signal(struct lws *wsi) +{ + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + + WSASetEvent(pt->events[0]); + + return 0; +} + +void +lws_plat_pipe_close(struct lws *wsi) +{ +} + unsigned long long time_in_microseconds() { @@ -81,7 +102,7 @@ delete_from_fd(struct lws_context *context, lws_sockfd_type fd) if (context->fd_hashtable[h].wsi[n]->desc.sockfd == fd) { while (n < context->fd_hashtable[h].length) { context->fd_hashtable[h].wsi[n] = - context->fd_hashtable[h].wsi[n + 1]; + context->fd_hashtable[h].wsi[n + 1]; n++; } context->fd_hashtable[h].length--; @@ -94,8 +115,8 @@ delete_from_fd(struct lws_context *context, lws_sockfd_type fd) return 1; } -LWS_VISIBLE int lws_get_random(struct lws_context *context, - void *buf, int len) +LWS_VISIBLE int +lws_get_random(struct lws_context *context, void *buf, int len) { int n; char *p = (char *)buf; @@ -106,7 +127,8 @@ LWS_VISIBLE int lws_get_random(struct lws_context *context, return n; } -LWS_VISIBLE int lws_send_pipe_choked(struct lws *wsi) +LWS_VISIBLE int +lws_send_pipe_choked(struct lws *wsi) { /* treat the fact we got a truncated send pending as if we're choked */ if (wsi->trunc_len) @@ -115,7 +137,8 @@ LWS_VISIBLE int lws_send_pipe_choked(struct lws *wsi) return (int)wsi->sock_send_blocking; } -LWS_VISIBLE int lws_poll_listen_fd(struct lws_pollfd *fd) +LWS_VISIBLE int +lws_poll_listen_fd(struct lws_pollfd *fd) { fd_set readfds; struct timeval tv = { 0, 0 }; @@ -129,25 +152,7 @@ LWS_VISIBLE int lws_poll_listen_fd(struct lws_pollfd *fd) } LWS_VISIBLE void -lws_cancel_service(struct lws_context *context) -{ - struct lws_context_per_thread *pt = &context->pt[0]; - int n = context->count_threads; - - while (n--) { - WSASetEvent(pt->events[0]); - pt++; - } -} - -LWS_VISIBLE void -lws_cancel_service_pt(struct lws *wsi) -{ - struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; - WSASetEvent(pt->events[0]); -} - -LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line) +lwsl_emit_syslog(int level, const char *line) { lwsl_emit_stderr(level, line); } @@ -182,9 +187,8 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) context->service_tid_detected = 1; } - if (timeout_ms < 0) - { - if (lws_service_flag_pending(context, tsi)) { + if (timeout_ms < 0) { + if (lws_service_flag_pending(context, tsi)) { /* any socket with events to service? */ for (n = 0; n < (int)pt->fds_count; n++) { if (!pt->fds[n].revents) @@ -220,6 +224,9 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) if (n) i--; + /* + * any wsi has truncated, force him signalled + */ if (wsi->trunc_len) WSASetEvent(pt->events[0]); } @@ -236,29 +243,30 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) timeout_ms = 0; } - ev = WSAWaitForMultipleEvents( 1, pt->events , FALSE, timeout_ms, FALSE); + ev = WSAWaitForMultipleEvents(1, pt->events, FALSE, timeout_ms, FALSE); if (ev == WSA_WAIT_EVENT_0) { - unsigned int eIdx; + unsigned int eIdx, err; WSAResetEvent(pt->events[0]); for (eIdx = 0; eIdx < pt->fds_count; ++eIdx) { - if (WSAEnumNetworkEvents(pt->fds[eIdx].fd, 0, &networkevents) == SOCKET_ERROR) { - lwsl_err("WSAEnumNetworkEvents() failed with error %d\n", LWS_ERRNO); + if (WSAEnumNetworkEvents(pt->fds[eIdx].fd, 0, + &networkevents) == SOCKET_ERROR) { + lwsl_err("WSAEnumNetworkEvents() failed " + "with error %d\n", LWS_ERRNO); return -1; } pfd = &pt->fds[eIdx]; pfd->revents = (short)networkevents.lNetworkEvents; + err = networkevents.iErrorCode[FD_CONNECT_BIT]; + if ((networkevents.lNetworkEvents & FD_CONNECT) && - networkevents.iErrorCode[FD_CONNECT_BIT] && - networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EALREADY && - networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EINPROGRESS && - networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EWOULDBLOCK && - networkevents.iErrorCode[FD_CONNECT_BIT] != WSAEINVAL) { - lwsl_debug("Unable to connect errno=%d\n", - networkevents.iErrorCode[FD_CONNECT_BIT]); + err && err != LWS_EALREADY && + err != LWS_EINPROGRESS && err != LWS_EWOULDBLOCK && + err != WSAEINVAL) { + lwsl_debug("Unable to connect errno=%d\n", err); pfd->revents |= LWS_POLLHUP; } @@ -269,20 +277,18 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi) } /* if something closed, retry this slot */ if (pfd->revents & LWS_POLLHUP) - --eIdx; + --eIdx; - if( pfd->revents != 0 ) { + if (pfd->revents) lws_service_fd_tsi(context, pfd, tsi); - - } } } context->service_tid = 0; - if (ev == WSA_WAIT_TIMEOUT) { + if (ev == WSA_WAIT_TIMEOUT) lws_service_fd(context, NULL); - } + return 0;; } @@ -309,7 +315,7 @@ lws_plat_set_socket_options(struct lws_vhost *vhost, lws_sockfd_type fd) /* enable keepalive on this socket */ optval = 1; if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, - (const char *)&optval, optlen) < 0) + (const char *)&optval, optlen) < 0) return 1; alive.onoff = TRUE; @@ -317,7 +323,7 @@ lws_plat_set_socket_options(struct lws_vhost *vhost, lws_sockfd_type fd) alive.keepaliveinterval = vhost->ka_interval; if (WSAIoctl(fd, SIO_KEEPALIVE_VALS, &alive, sizeof(alive), - NULL, 0, &dwBytesRet, NULL, NULL)) + NULL, 0, &dwBytesRet, NULL, NULL)) return 1; } diff --git a/lib/pollfd.c b/lib/pollfd.c index 8bd887fa..6af37a9a 100644 --- a/lib/pollfd.c +++ b/lib/pollfd.c @@ -60,13 +60,15 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa) pfd = &pt->fds[wsi->position_in_fds_table]; pa->fd = wsi->desc.sockfd; + lwsl_debug("%s: fd %d old events %d\n", __func__, pa->fd, pfd->events); pa->prev_events = pfd->events; pa->events = pfd->events = (pfd->events & ~_and) | _or; if (wsi->http2_substream) return 0; - if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD, + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD, wsi->user_space, (void *)pa, 0)) { ret = -1; goto bail; @@ -113,7 +115,7 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa) } sampled_tid = context->service_tid; - if (sampled_tid) { + if (sampled_tid && wsi->vhost) { tid = wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0); if (tid == -1) { @@ -176,10 +178,11 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi) #endif assert(wsi); - assert(wsi->vhost); + assert(wsi->event_pipe || wsi->vhost); assert(lws_socket_is_valid(wsi->desc.sockfd)); - if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, wsi->user_space, (void *) &pa, 1)) return -1; @@ -202,7 +205,8 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi) lws_plat_insert_socket_into_fds(context, wsi); /* external POLL support via protocol 0 */ - if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD, + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD, wsi->user_space, (void *) &pa, 0)) ret = -1; #ifndef LWS_NO_SERVER @@ -212,7 +216,8 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi) #endif lws_pt_unlock(pt); - if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, wsi->user_space, (void *)&pa, 1)) ret = -1; @@ -244,7 +249,8 @@ remove_wsi_socket_from_fds(struct lws *wsi) } #endif - if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, wsi->user_space, (void *)&pa, 1)) return -1; @@ -287,19 +293,21 @@ remove_wsi_socket_from_fds(struct lws *wsi) wsi->position_in_fds_table = -1; /* remove also from external POLL support via protocol 0 */ - if (lws_socket_is_valid(wsi->desc.sockfd)) - if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_DEL_POLL_FD, - wsi->user_space, (void *) &pa, 0)) - ret = -1; + if (lws_socket_is_valid(wsi->desc.sockfd) && wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_DEL_POLL_FD, + wsi->user_space, (void *) &pa, 0)) + ret = -1; + #ifndef LWS_NO_SERVER - if (!context->being_destroyed) - /* if this made some room, accept connects on this thread */ - if ((unsigned int)pt->fds_count < context->fd_limit_per_thread - 1) - lws_accept_modulation(pt, 1); + if (!context->being_destroyed && + /* if this made some room, accept connects on this thread */ + (unsigned int)pt->fds_count < context->fd_limit_per_thread - 1) + lws_accept_modulation(pt, 1); #endif lws_pt_unlock(pt); - if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, wsi->user_space, (void *) &pa, 1)) ret = -1; #endif @@ -314,14 +322,16 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or) struct lws_pollargs pa; int ret = 0; - if (!wsi || !wsi->protocol || wsi->position_in_fds_table < 0) + if (!wsi || (!wsi->protocol && !wsi->event_pipe) || + wsi->position_in_fds_table < 0) return 1; context = lws_get_context(wsi); if (!context) return 1; - if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL, wsi->user_space, (void *) &pa, 0)) return -1; @@ -330,7 +340,8 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or) lws_pt_lock(pt); ret = _lws_change_pollfd(wsi, _and, _or, &pa); lws_pt_unlock(pt); - if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, + if (wsi->vhost && + wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL, wsi->user_space, (void *) &pa, 0)) ret = -1; diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 144258a9..a723d162 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -634,6 +634,7 @@ enum connection_mode { LWSCM_CGI, /* stdin, stdout, stderr for another cgi master wsi */ LWSCM_RAW, /* raw with bulk handling */ LWSCM_RAW_FILEDESC, /* raw without bulk handling */ + LWSCM_EVENT_PIPE, /* event pipe with no vhost or protocol binding */ /* HTTP Client related */ LWSCM_HTTP_CLIENT = LWSCM_FLAG_IMPLIES_CALLBACK_CLOSED_CLIENT_HTTP, @@ -894,9 +895,10 @@ struct lws_context_per_thread { unsigned char *serv_buf; #ifdef _WIN32 WSAEVENT *events; -#else - lws_sockfd_type dummy_pipe_fds[2]; #endif + lws_sockfd_type dummy_pipe_fds[2]; + struct lws *pipe_wsi; + unsigned int fds_count; uint32_t ah_pool_length; @@ -1993,6 +1995,7 @@ struct lws { unsigned int cgi_stdout_zero_length:1; unsigned int seen_zero_length_recv:1; unsigned int rxflow_will_be_applied:1; + unsigned int event_pipe:1; #if defined(LWS_WITH_ESP8266) unsigned int pending_send_completion:3; @@ -2616,6 +2619,15 @@ void lws_free(void *p); #define lws_free_set_NULL(P) do { lws_realloc(P, 0, "free"); (P) = NULL; } while(0) #endif +int +lws_plat_pipe_create(struct lws *wsi); +int +lws_plat_pipe_signal(struct lws *wsi); +void +lws_plat_pipe_close(struct lws *wsi); +int +lws_create_event_pipes(struct lws_context *context); + const struct lws_plat_file_ops * lws_vfs_select_fops(const struct lws_plat_file_ops *fops, const char *vfs_path, const char **vpath); diff --git a/lib/service.c b/lib/service.c index 81321a26..ec3b14f2 100644 --- a/lib/service.c +++ b/lib/service.c @@ -1167,6 +1167,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t /* no, here to service a socket descriptor */ wsi = wsi_from_fd(context, pollfd->fd); + if (!wsi) /* not lws connection ... leave revents alone and return */ return 0; @@ -1221,6 +1222,47 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t /* okay, what we came here to do... */ switch (wsi->mode) { + case LWSCM_EVENT_PIPE: + { + struct lws_vhost *v = context->vhost_list; + char s[10]; + + /* + * discard the byte(s) that signaled us + * We really don't care about the number of bytes, but coverity + * thinks we should. + */ + n = read(wsi->desc.sockfd, s, sizeof(s)); + (void)n; + if (n < 0) + goto close_and_handled; + + /* + * the poll() wait, or the event loop for libuv etc is a + * process-wide resource that we interrupted. So let every + * protocol that may be interested in the pipe event know that + * it happened. + */ + while (v) { + const struct lws_protocols *p = v->protocols; + wsi->vhost = v; + + for (n = 0; n < v->count_protocols; n++) { + wsi->protocol = p; + if (p->callback && p->callback(wsi, + LWS_CALLBACK_EVENT_WAIT_CANCELLED, + NULL, NULL, 0)) { + lwsl_info("closed in event cancel\n"); + goto close_and_handled; + } + p++; + } + v = v->vhost_next; + } + wsi->vhost = NULL; + wsi->protocol = NULL; + goto handled; + } case LWSCM_HTTP_SERVING: case LWSCM_HTTP_CLIENT: case LWSCM_HTTP_SERVING_ACCEPTED: diff --git a/plugins/protocol_lws_mirror.c b/plugins/protocol_lws_mirror.c index 90b5faa9..8affca99 100644 --- a/plugins/protocol_lws_mirror.c +++ b/plugins/protocol_lws_mirror.c @@ -402,6 +402,10 @@ req_writable: mirror_callback_all_in_mi_on_writable(pss->mi); break; + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + lwsl_notice("LWS_CALLBACK_EVENT_WAIT_CANCELLED\n"); + break; + default: break; } diff --git a/test-apps/test-server-libuv.c b/test-apps/test-server-libuv.c index c8b305f6..de9c11e5 100644 --- a/test-apps/test-server-libuv.c +++ b/test-apps/test-server-libuv.c @@ -168,6 +168,15 @@ static void timer_cb(uv_timer_t *t) } } +static void timer_test_cancel_cb(uv_timer_t *h) +{ + if (context) { + lwsl_notice("(doing cancel test)\n"); + lws_cancel_service(context); + } +} + + static void timer_close_cb(uv_handle_t *h) { lwsl_notice("timer close cb %p, loop has %d handles\n", @@ -205,6 +214,7 @@ int main(int argc, char **argv) uv_timer_t timer_outer; struct counter ctr; int foreign_libuv_loop = 0; + uv_timer_t timer_test_cancel; /* <--- only needed for foreign loop test --- */ #endif const char *iface = NULL; @@ -351,6 +361,9 @@ int main(int argc, char **argv) uv_signal_init(&loop, &signal_outer); uv_signal_start(&signal_outer, outer_signal_cb, SIGINT); + uv_timer_init(&loop, &timer_test_cancel); + uv_timer_start(&timer_test_cancel, timer_test_cancel_cb, 2000, 2000); + uv_timer_init(&loop, &timer_outer); timer_outer.data = &ctr; ctr.cur = 0; @@ -374,10 +387,10 @@ int main(int argc, char **argv) lws_uv_sigint_cfg(context, 1, signal_cb); #if UV_VERSION_MAJOR > 0 - if (foreign_libuv_loop) + if (foreign_libuv_loop) { /* we have our own uv loop outside of lws */ lws_uv_initloop(context, &loop, 0); - else + } else #endif { /* @@ -438,6 +451,7 @@ int main(int argc, char **argv) * outside of lws */ uv_timer_stop(&timer_outer); + uv_timer_stop(&timer_test_cancel); uv_close((uv_handle_t*)&timer_outer, timer_close_cb); uv_signal_stop(&signal_outer); diff --git a/test-apps/test-server.c b/test-apps/test-server.c index 1d36f9e0..105d4881 100644 --- a/test-apps/test-server.c +++ b/test-apps/test-server.c @@ -174,6 +174,7 @@ void sighandler(int sig) * port + 1 */ dynamic_vhost_enable ^= 1; + lws_cancel_service(context); lwsl_notice("SIGUSR1: dynamic_vhost_enable: %d\n", dynamic_vhost_enable); return;