
cancel_service: change to event-loop agnostic implementation

- Add platform helpers for pipe creation.

- Change the direct-to-fds implementation to create a wsi for each
  pt and use the normal apis to bind it to the event loop.

- Modify context creation and destruction to create and remove the
  event pipe wsis.

- Create the event pipe wsis during context creation if using the
  default poll() event loop, or otherwise when the other event loops
  start.

- Add a handler that calls back user code with
  LWS_CALLBACK_EVENT_WAIT_CANCELLED.

This patch allows you to call `lws_cancel_service(struct lws_context *context)`
from another thread.

It's very cheap for the other thread to call and is safe without
locking.

Every protocol on every vhost receives a LWS_CALLBACK_EVENT_WAIT_CANCELLED
callback from the service thread, serialized normally in the event loop.
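
A minimal usage sketch (`worker()`, `work_pending` and the protocol callback
are illustrative names, not part of this patch): the foreign thread calls only
`lws_cancel_service()`, and everything else happens from the callback on the
service thread.

static volatile int work_pending;	/* set by the foreign thread */

static void *
worker(void *arg)
{
	struct lws_context *context = arg;

	/* ... produce some work for the service thread ... */
	work_pending = 1;

	/* cheap and lock-free to call from here */
	lws_cancel_service(context);

	return NULL;
}

static int
callback_myproto(struct lws *wsi, enum lws_callback_reasons reason,
		 void *user, void *in, size_t len)
{
	switch (reason) {
	case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
		/* serialized on the service thread: any usual lws api
		 * is safe to use here */
		if (work_pending) {
			work_pending = 0;
			lws_callback_on_writable_all_protocol(
				lws_get_context(wsi), lws_get_protocol(wsi));
		}
		break;
	default:
		break;
	}

	return 0;
}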
Andy Green 2017-10-24 11:59:44 +08:00
parent 97cd393649
commit 74fddbc09e
18 changed files with 384 additions and 192 deletions

View file

@ -132,7 +132,7 @@ with the socket closing and the `wsi` freed.
Websocket write activities should only take place in the
`LWS_CALLBACK_SERVER_WRITEABLE` callback as described below.
[This network-programming necessity to link the issue of new data to
This network-programming necessity to link the issue of new data to
the peer taking the previous data is not obvious to all users so let's
repeat that in other words:
@ -155,12 +155,20 @@ websocket ones, you can combine them together with the websocket ones
in one poll loop, see "External Polling Loop support" below, and
still do it all in one thread / process context.
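
To illustrate the writeable-callback rule above (the payload and protocol are
hypothetical; the point is that `lws_write()` is only called from inside the
callback):

	/* when you have data for the peer, don't write it directly... */
	lws_callback_on_writable(wsi);

	/* ...in your protocol callback, lws tells you when to write */
	case LWS_CALLBACK_SERVER_WRITEABLE: {
		/* lws needs LWS_PRE bytes of headroom before the payload */
		unsigned char buf[LWS_PRE + 128];
		int n = snprintf((char *)buf + LWS_PRE, 128, "hello");

		if (lws_write(wsi, buf + LWS_PRE, n, LWS_WRITE_TEXT) < n)
			return -1; /* fatal, close the connection */
		break;
	}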
If you insist on trying to use it from multiple threads, take special care if
you might simultaneously create more than one context from different threads.
SSL_library_init() is called from the context create api and it also is not
reentrant. So at least create the contexts sequentially.
If you must interoperate with other threads, you can use `lws_cancel_service()`
to notify lws that something has happened on another thread. lws will send
`LWS_CALLBACK_EVENT_WAIT_CANCELLED` events to all protocols, serialized with
the main event loop operations, ie, safely.
You can handle this callback to check the reason you were notified and take
action using any usual lws api, since you are in a callback in the normal
service thread.
`lws_cancel_service()` is very cheap for the other thread to call.
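
For example, a sketch of handing work over with a mutex-protected queue
(`struct msg`, `q_lock` and `q_head` are illustrative, not lws apis):

	static pthread_mutex_t q_lock = PTHREAD_MUTEX_INITIALIZER;
	static struct msg { struct msg *next; /* payload */ } *q_head;

	/* foreign thread: enqueue under q_lock, then lws_cancel_service() */

	case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
		pthread_mutex_lock(&q_lock);
		while (q_head) {
			struct msg *m = q_head;

			q_head = m->next;
			pthread_mutex_unlock(&q_lock);

			/* act on m using any usual lws api, then free it */
			free(m);

			pthread_mutex_lock(&q_lock);
		}
		pthread_mutex_unlock(&q_lock);
		break;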
@section closing Closing connections from the user side
When you want to close a connection, you do it by returning `-1` from a

View file

@ -834,6 +834,77 @@ lws_init_vhost_client_ssl(const struct lws_context_creation_info *info,
return lws_context_init_client_ssl(&i, vhost);
}
LWS_VISIBLE void
lws_cancel_service_pt(struct lws *wsi)
{
lws_plat_pipe_signal(wsi);
}
LWS_VISIBLE void
lws_cancel_service(struct lws_context *context)
{
struct lws_context_per_thread *pt = &context->pt[0];
short m = context->count_threads;
lwsl_notice("%s\n", __func__);
while (m--) {
if (pt->pipe_wsi)
lws_plat_pipe_signal(pt->pipe_wsi);
pt++;
}
}
int
lws_create_event_pipes(struct lws_context *context)
{
struct lws *wsi;
int n;
/*
* Create the pt event pipes... these are unique in that they are
* not bound to a vhost or protocol (both are NULL)
*/
for (n = 0; n < context->count_threads; n++) {
if (context->pt[n].pipe_wsi)
continue;
wsi = lws_zalloc(sizeof(*wsi), "event pipe wsi");
if (!wsi) {
lwsl_err("Out of mem\n");
return 1;
}
wsi->context = context;
wsi->mode = LWSCM_EVENT_PIPE;
wsi->protocol = NULL;
wsi->tsi = n;
wsi->vhost = NULL;
wsi->event_pipe = 1;
if (lws_plat_pipe_create(wsi)) {
lws_free(wsi);
continue;
}
wsi->desc.sockfd = context->pt[n].dummy_pipe_fds[0];
lwsl_debug("event pipe fd %d\n", wsi->desc.sockfd);
context->pt[n].pipe_wsi = wsi;
lws_libuv_accept(wsi, wsi->desc);
lws_libev_accept(wsi, wsi->desc);
lws_libevent_accept(wsi, wsi->desc);
if (insert_wsi_socket_into_fds(context, wsi))
return 1;
lws_change_pollfd(context->pt[n].pipe_wsi, 0, LWS_POLLIN);
context->count_wsi_allocated++;
}
return 0;
}
LWS_VISIBLE struct lws_context *
lws_create_context(struct lws_context_creation_info *info)
{
@ -1156,6 +1227,16 @@ lws_create_context(struct lws_context_creation_info *info)
context->count_caps = info->count_caps;
#endif
/*
* The event libs handle doing this when their event loop starts,
* if we are using the default poll() service, do it here
*/
if (!LWS_LIBEV_ENABLED(context) &&
!LWS_LIBUV_ENABLED(context) &&
!LWS_LIBEVENT_ENABLED(context) && lws_create_event_pipes(context))
goto bail;
/*
* drop any root privs for this process
* to listen on port < 1023 we would have needed root, but now we are
@ -1515,9 +1596,15 @@ lws_context_destroy(struct lws_context *context)
if (!wsi)
continue;
lws_close_free_wsi(wsi,
LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY
/* no protocol close */);
if (wsi->event_pipe) {
lws_plat_pipe_close(wsi);
remove_wsi_socket_from_fds(wsi);
lws_free(wsi);
context->count_wsi_allocated--;
} else
lws_close_free_wsi(wsi,
LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY
/* no protocol close */);
n--;
}
lws_pt_mutex_destroy(pt);

View file

@ -92,6 +92,9 @@ lws_ev_initloop(struct lws_context *context, struct ev_loop *loop, int tsi)
context->pt[tsi].io_loop_ev = loop;
if (lws_create_event_pipes(context))
return -1;
/*
* Initialize the accept w_accept with all the listening sockets
* and register a callback for read operations

View file

@ -98,6 +98,9 @@ int tsi)
context->pt[tsi].io_loop_event_base = loop;
}
if (lws_create_event_pipes(context))
return 1;
/*
* Initialize all events with the listening sockets
* and register a callback for read operations

View file

@ -227,6 +227,9 @@ lws_uv_initloop(struct lws_context *context, uv_loop_t *loop, int tsi)
} else
first = 0;
if (lws_create_event_pipes(context))
goto bail;
/*
* Initialize the accept wsi read watcher with all the listening sockets
* and register a callback for read operations
@ -247,6 +250,9 @@ lws_uv_initloop(struct lws_context *context, uv_loop_t *loop, int tsi)
}
return status;
bail:
return -1;
}
static void lws_uv_close_cb(uv_handle_t *handle)
@ -325,7 +331,7 @@ lws_libuv_accept(struct lws *wsi, lws_sock_file_fd_type desc)
return;
wsi->w_read.context = context;
if (wsi->mode == LWSCM_RAW_FILEDESC)
if (wsi->mode == LWSCM_RAW_FILEDESC || wsi->event_pipe)
uv_poll_init(pt->io_loop_uv, &wsi->w_read.uv_watcher,
(int)desc.filefd);
else

View file

@ -85,7 +85,7 @@ lws_free_wsi(struct lws *wsi)
lws_header_table_force_to_detachable_state(wsi);
lws_header_table_detach(wsi, 0);
if (wsi->vhost->lserv_wsi == wsi)
if (wsi->vhost && wsi->vhost->lserv_wsi == wsi)
wsi->vhost->lserv_wsi = NULL;
lws_pt_lock(pt);
@ -822,8 +822,9 @@ lws_close_free_wsi_final(struct lws *wsi)
}
/* outermost destroy notification for wsi (user_space still intact) */
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_WSI_DESTROY,
wsi->user_space, NULL, 0);
if (wsi->vhost)
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_WSI_DESTROY,
wsi->user_space, NULL, 0);
#ifdef LWS_WITH_CGI
if (wsi->cgi) {

View file

@ -1416,6 +1416,12 @@ enum lws_callback_reasons {
LWS_CALLBACK_CGI_PROCESS_ATTACH = 70,
/**< CGI: Sent when the CGI process is spawned for the wsi. The
* len parameter is the PID of the child process */
LWS_CALLBACK_EVENT_WAIT_CANCELLED = 71,
/**< This is sent to every protocol of every vhost in response
* to lws_cancel_service() or lws_cancel_service_pt(). This
* callback is serialized in the lws event loop normally, even
* if the lws_cancel_service[_pt]() call was from a different
* thread. */
/****** add new things just above ---^ ******/
@ -3440,15 +3446,8 @@ lws_service_tsi(struct lws_context *context, int timeout_ms, int tsi);
* on one thread
* \param wsi: Cancel service on the thread this wsi is serviced by
*
* This function lets a call to lws_service() waiting for a timeout
* immediately return.
*
* It works by creating a phony event and then swallowing it silently.
*
* The reason it may be needed is when waiting in poll(), changes to
* the event masks are ignored by the OS until poll() is reentered. This
* lets you halt the poll() wait and make the reentry happen immediately
* instead of having the wait out the rest of the poll timeout.
* Same as lws_cancel_service(), but targets a single service thread, the one
* the wsi belongs to. You probably want to use lws_cancel_service() instead.
*/
LWS_VISIBLE LWS_EXTERN void
lws_cancel_service_pt(struct lws *wsi);
@ -3457,12 +3456,13 @@ lws_cancel_service_pt(struct lws *wsi);
* lws_cancel_service() - Cancel wait for new pending socket activity
* \param context: Websocket context
*
* This function let a call to lws_service() waiting for a timeout
* immediately return.
* This function creates an immediate "synchronous interrupt" to the lws poll()
wait or event loop. As soon as possible in the serialized service sequencing,
* a LWS_CALLBACK_EVENT_WAIT_CANCELLED callback is sent to every protocol on
* every vhost.
*
* What it basically does is provide a fake event that will be swallowed,
* so the wait in poll() is ended. That's useful because poll() doesn't
* attend to changes in POLLIN/OUT/ERR until it re-enters the wait.
* lws_cancel_service() may be called from another thread while the context
* exists, and its effect will be immediately serialized.
*/
LWS_VISIBLE LWS_EXTERN void
lws_cancel_service(struct lws_context *context);
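
A sketch of the distinction between the two apis (assuming you hold a valid
`wsi` bound to the target service thread):

	/* wake only the service thread (pt) this wsi belongs to */
	lws_cancel_service_pt(wsi);

	/* wake every service thread in the context */
	lws_cancel_service(context);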

View file

@ -317,6 +317,24 @@ lws_plat_drop_app_privileges(struct lws_context_creation_info *info)
{
}
int
lws_plat_pipe_create(struct lws *wsi)
{
return 1;
}
int
lws_plat_pipe_signal(struct lws *wsi)
{
return 1;
}
void
lws_plat_pipe_close(struct lws *wsi)
{
}
LWS_VISIBLE int
lws_plat_context_early_init(void)
{

View file

@ -41,6 +41,24 @@ time_t time(time_t *tloc)
return 0;
}
int
lws_plat_pipe_create(struct lws *wsi)
{
return 1;
}
int
lws_plat_pipe_signal(struct lws *wsi)
{
return 1;
}
void
lws_plat_pipe_close(struct lws *wsi)
{
}
LWS_VISIBLE int
lws_get_random(struct lws_context *context, void *buf, int len)
{
@ -171,16 +189,6 @@ lws_poll_listen_fd(struct lws_pollfd *fd)
return 0;
}
LWS_VISIBLE void
lws_cancel_service_pt(struct lws *wsi)
{
}
LWS_VISIBLE void
lws_cancel_service(struct lws_context *context)
{
}
LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
{
extern void output_redirect(const char *str);

View file

@ -4,6 +4,23 @@
* included from libwebsockets.c for OPTEE builds
*/
int
lws_plat_pipe_create(struct lws *wsi)
{
return 1;
}
int
lws_plat_pipe_signal(struct lws *wsi)
{
return 1;
}
void
lws_plat_pipe_close(struct lws *wsi)
{
}
void TEE_GenerateRandom(void *randomBuffer, uint32_t randomBufferLen);
unsigned long long time_in_microseconds(void)
@ -52,32 +69,6 @@ lws_poll_listen_fd(struct lws_pollfd *fd)
return 0;
}
LWS_VISIBLE void
lws_cancel_service_pt(struct lws *wsi)
{
#if 0
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
char buf = 0;
if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
lwsl_err("Cannot write to dummy pipe");
#endif
}
LWS_VISIBLE void
lws_cancel_service(struct lws_context *context)
{
#if 0
struct lws_context_per_thread *pt = &context->pt[0];
char buf = 0, m = context->count_threads;
while (m--) {
if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
lwsl_err("Cannot write to dummy pipe");
pt++;
}
#endif
}
#if 0
LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
{

View file

@ -29,6 +29,41 @@
#endif
#include <dirent.h>
int
lws_plat_pipe_create(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
return pipe(pt->dummy_pipe_fds);
}
int
lws_plat_pipe_signal(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
char buf = 0;
int n;
n = write(pt->dummy_pipe_fds[1], &buf, 1);
lwsl_debug("%s: fd %d %d\n", __func__, pt->dummy_pipe_fds[1], n);
return n != 1;
}
void
lws_plat_pipe_close(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
if (pt->dummy_pipe_fds[0] && pt->dummy_pipe_fds[0] != -1)
close(pt->dummy_pipe_fds[0]);
if (pt->dummy_pipe_fds[1] && pt->dummy_pipe_fds[1] != -1)
close(pt->dummy_pipe_fds[1]);
pt->dummy_pipe_fds[0] = pt->dummy_pipe_fds[1] = -1;
}
unsigned long long time_in_microseconds(void)
{
struct timeval tv;
@ -77,29 +112,6 @@ lws_poll_listen_fd(struct lws_pollfd *fd)
return poll(fd, 1, 0);
}
LWS_VISIBLE void
lws_cancel_service_pt(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
char buf = 0;
if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
lwsl_err("Cannot write to dummy pipe");
}
LWS_VISIBLE void
lws_cancel_service(struct lws_context *context)
{
struct lws_context_per_thread *pt = &context->pt[0];
char buf = 0, m = context->count_threads;
while (m--) {
if (write(pt->dummy_pipe_fds[1], &buf, sizeof(buf)) != 1)
lwsl_err("Cannot write to dummy pipe");
pt++;
}
}
LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
{
int syslog_level = LOG_DEBUG;
@ -126,7 +138,6 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
struct lws_context_per_thread *pt;
int n = -1, m, c;
char buf;
/* stay dead once we are dead */
@ -200,12 +211,6 @@ faked_service:
c--;
if (pt->fds[n].fd == pt->dummy_pipe_fds[0]) {
if (read(pt->fds[n].fd, &buf, 1) != 1)
lwsl_err("Cannot read from dummy pipe.");
continue;
}
m = lws_service_fd_tsi(context, &pt->fds[n], tsi);
if (m < 0)
return -1;
@ -543,9 +548,6 @@ lws_plat_context_early_destroy(struct lws_context *context)
LWS_VISIBLE void
lws_plat_context_late_destroy(struct lws_context *context)
{
struct lws_context_per_thread *pt = &context->pt[0];
int m = context->count_threads;
#ifdef LWS_WITH_PLUGINS
if (context->plugin_list)
lws_plat_plugins_destroy(context);
@ -554,13 +556,6 @@ lws_plat_context_late_destroy(struct lws_context *context)
if (context->lws_lookup)
lws_free(context->lws_lookup);
while (m--) {
if (pt->dummy_pipe_fds[0])
close(pt->dummy_pipe_fds[0]);
if (pt->dummy_pipe_fds[1])
close(pt->dummy_pipe_fds[1]);
pt++;
}
if (!context->fd_random)
lwsl_err("ZERO RANDOM FD\n");
if (context->fd_random != LWS_INVALID_FILE)
@ -794,13 +789,11 @@ _lws_plat_file_write(lws_fop_fd_t fop_fd, lws_filepos_t *amount,
return 0;
}
LWS_VISIBLE int
lws_plat_init(struct lws_context *context,
struct lws_context_creation_info *info)
{
struct lws_context_per_thread *pt = &context->pt[0];
int n = context->count_threads, fd;
int fd;
/* master context has the global fd lookup array */
context->lws_lookup = lws_zalloc(sizeof(struct lws *) *
@ -822,25 +815,9 @@ lws_plat_init(struct lws_context *context,
return 1;
}
if (!lws_libev_init_fd_table(context) &&
!lws_libuv_init_fd_table(context) &&
!lws_libevent_init_fd_table(context)) {
/* otherwise libev/uv/event handled it instead */
while (n--) {
if (pipe(pt->dummy_pipe_fds)) {
lwsl_err("Unable to create pipe\n");
return 1;
}
/* use the read end of pipe as first item */
pt->fds[0].fd = pt->dummy_pipe_fds[0];
pt->fds[0].events = LWS_POLLIN;
pt->fds[0].revents = 0;
pt->fds_count = 1;
pt++;
}
}
(void)lws_libev_init_fd_table(context);
(void)lws_libuv_init_fd_table(context);
(void)lws_libevent_init_fd_table(context);
#ifdef LWS_WITH_PLUGINS
if (info->plugin_dirs)

View file

@ -3,6 +3,27 @@
#endif
#include "private-libwebsockets.h"
int
lws_plat_pipe_create(struct lws *wsi)
{
return 1;
}
int
lws_plat_pipe_signal(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
WSASetEvent(pt->events[0]);
return 0;
}
void
lws_plat_pipe_close(struct lws *wsi)
{
}
unsigned long long
time_in_microseconds()
{
@ -81,7 +102,7 @@ delete_from_fd(struct lws_context *context, lws_sockfd_type fd)
if (context->fd_hashtable[h].wsi[n]->desc.sockfd == fd) {
while (n < context->fd_hashtable[h].length) {
context->fd_hashtable[h].wsi[n] =
context->fd_hashtable[h].wsi[n + 1];
context->fd_hashtable[h].wsi[n + 1];
n++;
}
context->fd_hashtable[h].length--;
@ -94,8 +115,8 @@ delete_from_fd(struct lws_context *context, lws_sockfd_type fd)
return 1;
}
LWS_VISIBLE int lws_get_random(struct lws_context *context,
void *buf, int len)
LWS_VISIBLE int
lws_get_random(struct lws_context *context, void *buf, int len)
{
int n;
char *p = (char *)buf;
@ -106,7 +127,8 @@ LWS_VISIBLE int lws_get_random(struct lws_context *context,
return n;
}
LWS_VISIBLE int lws_send_pipe_choked(struct lws *wsi)
LWS_VISIBLE int
lws_send_pipe_choked(struct lws *wsi)
{
/* treat the fact we got a truncated send pending as if we're choked */
if (wsi->trunc_len)
@ -115,7 +137,8 @@ LWS_VISIBLE int lws_send_pipe_choked(struct lws *wsi)
return (int)wsi->sock_send_blocking;
}
LWS_VISIBLE int lws_poll_listen_fd(struct lws_pollfd *fd)
LWS_VISIBLE int
lws_poll_listen_fd(struct lws_pollfd *fd)
{
fd_set readfds;
struct timeval tv = { 0, 0 };
@ -129,25 +152,7 @@ LWS_VISIBLE int lws_poll_listen_fd(struct lws_pollfd *fd)
}
LWS_VISIBLE void
lws_cancel_service(struct lws_context *context)
{
struct lws_context_per_thread *pt = &context->pt[0];
int n = context->count_threads;
while (n--) {
WSASetEvent(pt->events[0]);
pt++;
}
}
LWS_VISIBLE void
lws_cancel_service_pt(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
WSASetEvent(pt->events[0]);
}
LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
lwsl_emit_syslog(int level, const char *line)
{
lwsl_emit_stderr(level, line);
}
@ -182,9 +187,8 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
context->service_tid_detected = 1;
}
if (timeout_ms < 0)
{
if (lws_service_flag_pending(context, tsi)) {
if (timeout_ms < 0) {
if (lws_service_flag_pending(context, tsi)) {
/* any socket with events to service? */
for (n = 0; n < (int)pt->fds_count; n++) {
if (!pt->fds[n].revents)
@ -220,6 +224,9 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
if (n)
i--;
/*
* any wsi has truncated, force him signalled
*/
if (wsi->trunc_len)
WSASetEvent(pt->events[0]);
}
@ -236,29 +243,30 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
timeout_ms = 0;
}
ev = WSAWaitForMultipleEvents( 1, pt->events , FALSE, timeout_ms, FALSE);
ev = WSAWaitForMultipleEvents(1, pt->events, FALSE, timeout_ms, FALSE);
if (ev == WSA_WAIT_EVENT_0) {
unsigned int eIdx;
unsigned int eIdx, err;
WSAResetEvent(pt->events[0]);
for (eIdx = 0; eIdx < pt->fds_count; ++eIdx) {
if (WSAEnumNetworkEvents(pt->fds[eIdx].fd, 0, &networkevents) == SOCKET_ERROR) {
lwsl_err("WSAEnumNetworkEvents() failed with error %d\n", LWS_ERRNO);
if (WSAEnumNetworkEvents(pt->fds[eIdx].fd, 0,
&networkevents) == SOCKET_ERROR) {
lwsl_err("WSAEnumNetworkEvents() failed "
"with error %d\n", LWS_ERRNO);
return -1;
}
pfd = &pt->fds[eIdx];
pfd->revents = (short)networkevents.lNetworkEvents;
err = networkevents.iErrorCode[FD_CONNECT_BIT];
if ((networkevents.lNetworkEvents & FD_CONNECT) &&
networkevents.iErrorCode[FD_CONNECT_BIT] &&
networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EALREADY &&
networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EINPROGRESS &&
networkevents.iErrorCode[FD_CONNECT_BIT] != LWS_EWOULDBLOCK &&
networkevents.iErrorCode[FD_CONNECT_BIT] != WSAEINVAL) {
lwsl_debug("Unable to connect errno=%d\n",
networkevents.iErrorCode[FD_CONNECT_BIT]);
err && err != LWS_EALREADY &&
err != LWS_EINPROGRESS && err != LWS_EWOULDBLOCK &&
err != WSAEINVAL) {
lwsl_debug("Unable to connect errno=%d\n", err);
pfd->revents |= LWS_POLLHUP;
}
@ -269,20 +277,18 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
}
/* if something closed, retry this slot */
if (pfd->revents & LWS_POLLHUP)
--eIdx;
--eIdx;
if( pfd->revents != 0 ) {
if (pfd->revents)
lws_service_fd_tsi(context, pfd, tsi);
}
}
}
context->service_tid = 0;
if (ev == WSA_WAIT_TIMEOUT) {
if (ev == WSA_WAIT_TIMEOUT)
lws_service_fd(context, NULL);
}
return 0;
}
@ -309,7 +315,7 @@ lws_plat_set_socket_options(struct lws_vhost *vhost, lws_sockfd_type fd)
/* enable keepalive on this socket */
optval = 1;
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE,
(const char *)&optval, optlen) < 0)
(const char *)&optval, optlen) < 0)
return 1;
alive.onoff = TRUE;
@ -317,7 +323,7 @@ lws_plat_set_socket_options(struct lws_vhost *vhost, lws_sockfd_type fd)
alive.keepaliveinterval = vhost->ka_interval;
if (WSAIoctl(fd, SIO_KEEPALIVE_VALS, &alive, sizeof(alive),
NULL, 0, &dwBytesRet, NULL, NULL))
NULL, 0, &dwBytesRet, NULL, NULL))
return 1;
}

View file

@ -60,13 +60,15 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
pfd = &pt->fds[wsi->position_in_fds_table];
pa->fd = wsi->desc.sockfd;
lwsl_debug("%s: fd %d old events %d\n", __func__, pa->fd, pfd->events);
pa->prev_events = pfd->events;
pa->events = pfd->events = (pfd->events & ~_and) | _or;
if (wsi->http2_substream)
return 0;
if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD,
if (wsi->vhost &&
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD,
wsi->user_space, (void *)pa, 0)) {
ret = -1;
goto bail;
@ -113,7 +115,7 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
}
sampled_tid = context->service_tid;
if (sampled_tid) {
if (sampled_tid && wsi->vhost) {
tid = wsi->vhost->protocols[0].callback(wsi,
LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0);
if (tid == -1) {
@ -176,10 +178,11 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
#endif
assert(wsi);
assert(wsi->vhost);
assert(wsi->event_pipe || wsi->vhost);
assert(lws_socket_is_valid(wsi->desc.sockfd));
if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
if (wsi->vhost &&
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
wsi->user_space, (void *) &pa, 1))
return -1;
@ -202,7 +205,8 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
lws_plat_insert_socket_into_fds(context, wsi);
/* external POLL support via protocol 0 */
if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD,
if (wsi->vhost &&
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD,
wsi->user_space, (void *) &pa, 0))
ret = -1;
#ifndef LWS_NO_SERVER
@ -212,7 +216,8 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
#endif
lws_pt_unlock(pt);
if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
if (wsi->vhost &&
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
wsi->user_space, (void *)&pa, 1))
ret = -1;
@ -244,7 +249,8 @@ remove_wsi_socket_from_fds(struct lws *wsi)
}
#endif
if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
if (wsi->vhost &&
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
wsi->user_space, (void *)&pa, 1))
return -1;
@ -287,19 +293,21 @@ remove_wsi_socket_from_fds(struct lws *wsi)
wsi->position_in_fds_table = -1;
/* remove also from external POLL support via protocol 0 */
if (lws_socket_is_valid(wsi->desc.sockfd))
if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_DEL_POLL_FD,
wsi->user_space, (void *) &pa, 0))
ret = -1;
if (lws_socket_is_valid(wsi->desc.sockfd) && wsi->vhost &&
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_DEL_POLL_FD,
wsi->user_space, (void *) &pa, 0))
ret = -1;
#ifndef LWS_NO_SERVER
if (!context->being_destroyed)
/* if this made some room, accept connects on this thread */
if ((unsigned int)pt->fds_count < context->fd_limit_per_thread - 1)
lws_accept_modulation(pt, 1);
if (!context->being_destroyed &&
/* if this made some room, accept connects on this thread */
(unsigned int)pt->fds_count < context->fd_limit_per_thread - 1)
lws_accept_modulation(pt, 1);
#endif
lws_pt_unlock(pt);
if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
if (wsi->vhost &&
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
wsi->user_space, (void *) &pa, 1))
ret = -1;
#endif
@ -314,14 +322,16 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or)
struct lws_pollargs pa;
int ret = 0;
if (!wsi || !wsi->protocol || wsi->position_in_fds_table < 0)
if (!wsi || (!wsi->protocol && !wsi->event_pipe) ||
wsi->position_in_fds_table < 0)
return 1;
context = lws_get_context(wsi);
if (!context)
return 1;
if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
if (wsi->vhost &&
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
wsi->user_space, (void *) &pa, 0))
return -1;
@ -330,7 +340,8 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or)
lws_pt_lock(pt);
ret = _lws_change_pollfd(wsi, _and, _or, &pa);
lws_pt_unlock(pt);
if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
if (wsi->vhost &&
wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
wsi->user_space, (void *) &pa, 0))
ret = -1;

View file

@ -634,6 +634,7 @@ enum connection_mode {
LWSCM_CGI, /* stdin, stdout, stderr for another cgi master wsi */
LWSCM_RAW, /* raw with bulk handling */
LWSCM_RAW_FILEDESC, /* raw without bulk handling */
LWSCM_EVENT_PIPE, /* event pipe with no vhost or protocol binding */
/* HTTP Client related */
LWSCM_HTTP_CLIENT = LWSCM_FLAG_IMPLIES_CALLBACK_CLOSED_CLIENT_HTTP,
@ -894,9 +895,10 @@ struct lws_context_per_thread {
unsigned char *serv_buf;
#ifdef _WIN32
WSAEVENT *events;
#else
lws_sockfd_type dummy_pipe_fds[2];
#endif
lws_sockfd_type dummy_pipe_fds[2];
struct lws *pipe_wsi;
unsigned int fds_count;
uint32_t ah_pool_length;
@ -1993,6 +1995,7 @@ struct lws {
unsigned int cgi_stdout_zero_length:1;
unsigned int seen_zero_length_recv:1;
unsigned int rxflow_will_be_applied:1;
unsigned int event_pipe:1;
#if defined(LWS_WITH_ESP8266)
unsigned int pending_send_completion:3;
@ -2616,6 +2619,15 @@ void lws_free(void *p);
#define lws_free_set_NULL(P) do { lws_realloc(P, 0, "free"); (P) = NULL; } while(0)
#endif
int
lws_plat_pipe_create(struct lws *wsi);
int
lws_plat_pipe_signal(struct lws *wsi);
void
lws_plat_pipe_close(struct lws *wsi);
int
lws_create_event_pipes(struct lws_context *context);
const struct lws_plat_file_ops *
lws_vfs_select_fops(const struct lws_plat_file_ops *fops, const char *vfs_path,
const char **vpath);

View file

@ -1167,6 +1167,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
/* no, here to service a socket descriptor */
wsi = wsi_from_fd(context, pollfd->fd);
if (!wsi)
/* not lws connection ... leave revents alone and return */
return 0;
@ -1221,6 +1222,47 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
/* okay, what we came here to do... */
switch (wsi->mode) {
case LWSCM_EVENT_PIPE:
{
struct lws_vhost *v = context->vhost_list;
char s[10];
/*
* discard the byte(s) that signaled us
* We really don't care about the number of bytes, but coverity
* thinks we should.
*/
n = read(wsi->desc.sockfd, s, sizeof(s));
(void)n;
if (n < 0)
goto close_and_handled;
/*
* the poll() wait, or the event loop for libuv etc is a
* process-wide resource that we interrupted. So let every
* protocol that may be interested in the pipe event know that
* it happened.
*/
while (v) {
const struct lws_protocols *p = v->protocols;
wsi->vhost = v;
for (n = 0; n < v->count_protocols; n++) {
wsi->protocol = p;
if (p->callback && p->callback(wsi,
LWS_CALLBACK_EVENT_WAIT_CANCELLED,
NULL, NULL, 0)) {
lwsl_info("closed in event cancel\n");
goto close_and_handled;
}
p++;
}
v = v->vhost_next;
}
wsi->vhost = NULL;
wsi->protocol = NULL;
goto handled;
}
case LWSCM_HTTP_SERVING:
case LWSCM_HTTP_CLIENT:
case LWSCM_HTTP_SERVING_ACCEPTED:

View file

@ -402,6 +402,10 @@ req_writable:
mirror_callback_all_in_mi_on_writable(pss->mi);
break;
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
lwsl_notice("LWS_CALLBACK_EVENT_WAIT_CANCELLED\n");
break;
default:
break;
}

View file

@ -168,6 +168,15 @@ static void timer_cb(uv_timer_t *t)
}
}
static void timer_test_cancel_cb(uv_timer_t *h)
{
if (context) {
lwsl_notice("(doing cancel test)\n");
lws_cancel_service(context);
}
}
static void timer_close_cb(uv_handle_t *h)
{
lwsl_notice("timer close cb %p, loop has %d handles\n",
@ -205,6 +214,7 @@ int main(int argc, char **argv)
uv_timer_t timer_outer;
struct counter ctr;
int foreign_libuv_loop = 0;
uv_timer_t timer_test_cancel;
/* <--- only needed for foreign loop test --- */
#endif
const char *iface = NULL;
@ -351,6 +361,9 @@ int main(int argc, char **argv)
uv_signal_init(&loop, &signal_outer);
uv_signal_start(&signal_outer, outer_signal_cb, SIGINT);
uv_timer_init(&loop, &timer_test_cancel);
uv_timer_start(&timer_test_cancel, timer_test_cancel_cb, 2000, 2000);
uv_timer_init(&loop, &timer_outer);
timer_outer.data = &ctr;
ctr.cur = 0;
@ -374,10 +387,10 @@ int main(int argc, char **argv)
lws_uv_sigint_cfg(context, 1, signal_cb);
#if UV_VERSION_MAJOR > 0
if (foreign_libuv_loop)
if (foreign_libuv_loop) {
/* we have our own uv loop outside of lws */
lws_uv_initloop(context, &loop, 0);
else
} else
#endif
{
/*
@ -438,6 +451,7 @@ int main(int argc, char **argv)
* outside of lws */
uv_timer_stop(&timer_outer);
uv_timer_stop(&timer_test_cancel);
uv_close((uv_handle_t*)&timer_outer, timer_close_cb);
uv_signal_stop(&signal_outer);

View file

@ -174,6 +174,7 @@ void sighandler(int sig)
* port + 1
*/
dynamic_vhost_enable ^= 1;
lws_cancel_service(context);
lwsl_notice("SIGUSR1: dynamic_vhost_enable: %d\n",
dynamic_vhost_enable);
return;