1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-30 00:00:16 +01:00

lws_system: helpers for attaching to existing event loop from other threads

In the case code is composed into a single process, but it isn't monolithic in the
sense it's made up of modular "applications" that are written separate projects,
provide a way for the "applications" to request a callback from the lws event loop
thread context safely.

From the callback the applications can set up their operations on the lws event
loop and drop their own thread.

Since it requires system-specific locking to be threadsafe, provide a non-threadsafe
helper and then indirect the actual usage through a user-defined lws_system ops
function pointer that wraps the unsafe api with the system locking to make it safe.
This commit is contained in:
Andy Green 2019-12-31 15:24:58 +00:00
parent 6e35da95e5
commit d8ccfc2370
10 changed files with 662 additions and 2 deletions

View file

@ -108,15 +108,36 @@ typedef enum { /* keep system_state_names[] in sync in context.c */
LWS_SYSTATE_OPERATIONAL, /* user code can operate normally */
LWS_SYSTATE_POLICY_INVALID, /* user code is changing its policies
LWS_SYSTATE_POLICY_INVALID, /* user code is changing its policies
* drop everything done with old
* policy, switch to new then enter
* LWS_SYSTATE_POLICY_VALID */
} lws_system_states_t;
typedef void (*lws_attach_cb_t)(struct lws_context *context, int tsi, void *opaque);
struct lws_attach_item;
typedef struct lws_system_ops {
int (*reboot)(void);
int (*set_clock)(lws_usec_t us);
int (*attach)(struct lws_context *context, int tsi, lws_attach_cb_t cb,
lws_system_states_t state, void *opaque,
struct lws_attach_item **get);
/**< if \p get is NULL, add an attach callback request to the pt for
* \p cb with arg \p opaque, that should be called when we're at or past
* system state \p state.
*
* If \p get is non-NULL, look for the first listed item on the pt whose
* state situation is ready, and set *get to point to it. If no items,
* or none where the system state is right, set *get to NULL.
*
* It's done like this so (*attach) can perform system-specific
* locking outside of lws core, for both getting and adding items the
* same so it is thread-safe. A non-threadsafe helper
* __lws_system_attach() is provided to do the actual work inside the
* system-specific locking.
*/
} lws_system_ops_t;
/**
@ -159,6 +180,44 @@ lws_system_get_ops(struct lws_context *context);
LWS_EXTERN LWS_VISIBLE struct lws_context *
lws_system_context_from_system_mgr(lws_state_manager_t *mgr);
/**
* __lws_system_attach() - get and set items on context attach list
*
* \param context: context to get or set attach items to
* \param tsi: thread service index (normally 0)
* \param cb: callback to call from context event loop thread
* \param state: the lws_system state we have to be in or have passed through
* \param opaque: optional pointer to user specific info given to callback
* \param get: NULL, or pointer to pointer to take detached tail item on exit
*
* This allows other threads to enqueue callback requests to happen from a pt's
* event loop thread safely. The callback gets the context pointer and a user
* opaque pointer that can be optionally given when the item is added to the
* attach list.
*
* This api is the no-locking core function for getting and setting items on the
* pt's attach list. The lws_system operation (*attach) is the actual
* api that user and internal code calls for this feature, it should perform
* system-specific locking, call this helper, release the locking and then
* return the result. This api is public only so it can be used in the locked
* implementation of (*attach).
*
* If get is NULL, then the call adds to the head of the pt attach list using
* cb, state, and opaque; if get is non-NULL, then *get is set to the first
* waiting attached item that meets the state criteria and that item is removed
* from the list.
*
* This is a non-threadsafe helper only designed to be called from
* implementations of struct lws_system's (*attach) operation where system-
* specific locking has been applied around it, making it threadsafe.
*/
LWS_EXTERN LWS_VISIBLE int
__lws_system_attach(struct lws_context *context, int tsi, lws_attach_cb_t cb,
lws_system_states_t state, void *opaque,
struct lws_attach_item **get);
typedef int (*dhcpc_cb_t)(void *opaque, int af, uint8_t *ip, int ip_len);
/**

View file

@ -355,6 +355,7 @@ struct lws_context_per_thread {
#endif
struct lws_dll2_owner dll_buflist_owner; /* guys with pending rxflow */
struct lws_dll2_owner seq_owner; /* list of lws_sequencer-s */
lws_dll2_owner_t attach_owner; /* pending lws_attach */
struct lws_dll2_owner pt_sul_owner;

View file

@ -107,6 +107,13 @@ lws_sul_schedule(struct lws_context *context, int tsi,
lws_usec_t
__lws_sul_service_ripe(lws_dll2_owner_t *own, lws_usec_t usnow)
{
struct lws_context_per_thread *pt = (struct lws_context_per_thread *)
lws_container_of(own, struct lws_context_per_thread,
pt_sul_owner);
if (pt->attach_owner.count)
lws_system_do_attach(pt);
while (lws_dll2_get_head(own)) {
/* .list is always first member in lws_sorted_usec_list_t */

View file

@ -103,6 +103,15 @@ lws_state_notify_protocol_init(struct lws_state_manager *mgr,
{
struct lws_context *context = lws_container_of(mgr, struct lws_context,
mgr_system);
int n;
/*
* Deal with any attachments that were waiting for the right state
* to come along
*/
for (n = 0; n < context->count_threads; n++)
lws_system_do_attach(&context->pt[n]);
#if defined(LWS_WITH_SYS_DHCP_CLIENT)
if (current == LWS_SYSTATE_DHCP) {

View file

@ -279,6 +279,14 @@ typedef struct lws_system_blob {
char is_direct;
} lws_system_blob_t;
typedef struct lws_attach_item {
lws_dll2_t list;
lws_attach_cb_t cb;
void *opaque;
lws_system_states_t state;
} lws_attach_item_t;
/*
* the rest is managed per-context, that includes
*
@ -500,9 +508,12 @@ LWS_EXTERN int
lws_find_string_in_file(const char *filename, const char *str, int stringlen);
#endif
signed char char_to_hex(const char c);
#if defined(LWS_WITH_NETWORK)
int
lws_system_do_attach(struct lws_context_per_thread *pt);
#endif
struct lws_buflist {
struct lws_buflist *next;

View file

@ -11,3 +11,58 @@ If any system helper is enabled for build, lws creates an additional vhost
features are bound to this. In the context object, this is available as
`.vhost_system`.
# Attaching to an existing context from other threads
To simplify the case different pieces of code want to attach to a single
lws_context at runtime, from different thread contexts, lws_system has an api
via an lws_system operation function pointer where the other threads can use
platform-specific locking to request callbacks to their own code from the
lws event loop thread context safely.
For convenience, the callback can be delayed until the system has entered or
passed a specified system state, eg, LWS_SYSTATE_OPERATIONAL so the code will
only get called back after the network, ntpclient and auth have been done.
Additionally an opaque pointer can be passed to the callback when it is called
from the lws event loop context.
## Implementing the system-specific locking
`lws_system_ops_t` struct has a member `.attach`
```
int (*attach)(struct lws_context *context, int tsi, lws_attach_cb_t *cb,
lws_system_states_t state, void *opaque,
struct lws_attach_item **get);
```
This should be defined in user code as setting locking, then passing the
arguments through to a non-threadsafe helper
```
int
__lws_system_attach(struct lws_context *context, int tsi, lws_attach_cb_t *cb,
lws_system_states_t state, void *opaque,
struct lws_attach_item **get);
```
that does the actual attach work. When it returns, the locking should be
unlocked and the return passed back.
## Attaching the callback request
User code should call the lws_system_ops_t `.attach` function like
```
lws_system_get_ops(context)->attach(...);
```
The callback function which will be called from the lws event loop context
should look like this
```
void my_callback(struct lws_context *context, int tsi, void *opaque);
```
with the callback function name passed into the (*attach)() call above. When
the callback happens, the opaque user pointer set at the (*attach)() call is
passed back to it as an argument.

View file

@ -141,3 +141,120 @@ lws_system_get_blob(struct lws_context *context, lws_system_blob_item_t type,
return &context->system_blobs[type + idx];
}
#if defined(LWS_WITH_NETWORK)
/*
* Caller must protect the whole call with system-specific locking
*/
int
__lws_system_attach(struct lws_context *context, int tsi, lws_attach_cb_t cb,
lws_system_states_t state, void *opaque,
struct lws_attach_item **get)
{
struct lws_context_per_thread *pt = &context->pt[tsi];
struct lws_attach_item *item;
if (!get) {
/*
* allocate and add to the head of the pt's attach list
*/
item = lws_zalloc(sizeof(*item), __func__);
if (!item)
return 1;
item->cb = cb;
item->opaque = opaque;
item->state = state;
lws_dll2_add_head(&item->list, &pt->attach_owner);
lws_cancel_service(context);
return 0;
}
*get = NULL;
if (!pt->attach_owner.count)
return 0;
/*
* If any, return the first guy whose state requirement matches
*/
lws_start_foreach_dll(struct lws_dll2 *, d,
lws_dll2_get_head(&pt->attach_owner)) {
item = lws_container_of(d, lws_attach_item_t, list);
if (pt->context->mgr_system.state >= (int)item->state) {
*get = item;
lws_dll2_remove(d);
/*
* We detached it, but the caller now has the
* responsibility to lws_free() *get.
*/
return 0;
}
} lws_end_foreach_dll(d);
/* nobody ready to go... leave *get as NULL and return cleanly */
return 0;
}
int
lws_system_do_attach(struct lws_context_per_thread *pt)
{
/*
* If nothing to do, we just return immediately
*/
while (pt->attach_owner.count) {
struct lws_attach_item *item;
/*
* If anybody used the attach apis, there must be an
* implementation of the (*attach) lws_system op function
*/
assert(pt->context->system_ops->attach);
if (!pt->context->system_ops->attach) {
lwsl_err("%s: define (*attach)\n", __func__);
return 1;
}
/*
* System locking is applied only around this next call, while
* we detach and get a pointer to the tail attach item. We
* become responsible to free what we have detached.
*/
if (pt->context->system_ops->attach(pt->context, pt->tid, NULL,
0, NULL, &item)) {
lwsl_err("%s: attach problem\n", __func__);
return 1;
}
if (!item)
/* there's nothing more to do at the moment */
return 0;
/*
* Do the callback from the lws event loop thread
*/
item->cb(pt->context, pt->tid, item->opaque);
/* it's done, destroy the item */
lws_free(item);
}
return 0;
}
#endif

View file

@ -0,0 +1,91 @@
cmake_minimum_required(VERSION 2.8)
include(CheckIncludeFile)
include(CheckCSourceCompiles)
Project(lws-minimal-http-client-attach)
set(SAMP lws-minimal-http-client-attach)
set(SRCS minimal-http-client-attach.c)
MACRO(require_pthreads result)
CHECK_INCLUDE_FILE(pthread.h LWS_HAVE_PTHREAD_H)
if (NOT LWS_HAVE_PTHREAD_H)
if (LWS_WITH_MINIMAL_EXAMPLES)
set(result 0)
else()
message(FATAL_ERROR "threading support requires pthreads")
endif()
endif()
ENDMACRO()
# If we are being built as part of lws, confirm current build config supports
# reqconfig, else skip building ourselves.
#
# If we are being built externally, confirm installed lws was configured to
# support reqconfig, else error out with a helpful message about the problem.
#
MACRO(require_lws_config reqconfig _val result)
if (DEFINED ${reqconfig})
if (${reqconfig})
set (rq 1)
else()
set (rq 0)
endif()
else()
set(rq 0)
endif()
if (${_val} EQUAL ${rq})
set(SAME 1)
else()
set(SAME 0)
endif()
if (LWS_WITH_MINIMAL_EXAMPLES AND NOT ${SAME})
if (${_val})
message("${SAMP}: skipping as lws being built without ${reqconfig}")
else()
message("${SAMP}: skipping as lws built with ${reqconfig}")
endif()
set(${result} 0)
else()
if (LWS_WITH_MINIMAL_EXAMPLES)
set(MET ${SAME})
else()
CHECK_C_SOURCE_COMPILES("#include <libwebsockets.h>\nint main(void) {\n#if defined(${reqconfig})\n return 0;\n#else\n fail;\n#endif\n return 0;\n}\n" HAS_${reqconfig})
if (NOT DEFINED HAS_${reqconfig} OR NOT HAS_${reqconfig})
set(HAS_${reqconfig} 0)
else()
set(HAS_${reqconfig} 1)
endif()
if ((HAS_${reqconfig} AND ${_val}) OR (NOT HAS_${reqconfig} AND NOT ${_val}))
set(MET 1)
else()
set(MET 0)
endif()
endif()
if (NOT MET)
if (${_val})
message(FATAL_ERROR "This project requires lws must have been configured with ${reqconfig}")
else()
message(FATAL_ERROR "Lws configuration of ${reqconfig} is incompatible with this project")
endif()
endif()
endif()
ENDMACRO()
set(requirements 1)
require_pthreads(requirements)
require_lws_config(LWS_ROLE_H1 1 requirements)
require_lws_config(LWS_WITH_CLIENT 1 requirements)
if (requirements)
add_executable(${SAMP} ${SRCS})
if (websockets_shared)
target_link_libraries(${SAMP} websockets_shared pthread)
add_dependencies(${SAMP} websockets_shared)
else()
target_link_libraries(${SAMP} websockets pthread)
endif()
endif()

View file

@ -0,0 +1,35 @@
# lws minimal http client attach
This demonstrates how other threads can reach out to an existing lws_context
and join its event loop cleanly and safely.
## build
```
$ cmake . && make
```
Pthreads is required on your system.
## usage
```
$ ./lws-minimal-http-client-attach
[2019/12/31 18:30:49:3495] U: main: main thread tid 0x503e1c0
[2019/12/31 18:30:50:3584] U: LWS minimal http client attach
[2019/12/31 18:30:50:4002] U: lws_create: tid 0x5c41700
[2019/12/31 18:30:50:5727] E: callback_ntpc: set up system ops for set_clock
[2019/12/31 18:30:50:2110] N: callback_ntpc: Unix time: 1577817053
[2019/12/31 18:30:50:2136] U: attach_callback: called from tid 0x5c41700
[2019/12/31 18:30:51:8733] U: Connected to 46.105.127.147, http response: 200
[2019/12/31 18:30:51:8818] U: RECEIVE_CLIENT_HTTP_READ: read 4087
[2019/12/31 18:30:51:8823] U: RECEIVE_CLIENT_HTTP_READ: read 4096
[2019/12/31 18:30:51:8846] U: RECEIVE_CLIENT_HTTP_READ: read 4087
[2019/12/31 18:30:51:8847] U: RECEIVE_CLIENT_HTTP_READ: read 4096
[2019/12/31 18:30:51:8855] U: RECEIVE_CLIENT_HTTP_READ: read 4087
[2019/12/31 18:30:51:8856] U: RECEIVE_CLIENT_HTTP_READ: read 4096
[2019/12/31 18:30:51:8860] U: RECEIVE_CLIENT_HTTP_READ: read 1971
[2019/12/31 18:30:51:8873] U: LWS_CALLBACK_COMPLETED_CLIENT_HTTP
[2019/12/31 18:30:51:9629] U: main: finished
```

View file

@ -0,0 +1,275 @@
/*
* lws-minimal-http-client-attach
*
* Written in 2010-2019 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* This demonstrates how to use the lws_system (*attach) api to allow a
* different thread to arrange to join an existing lws event loop safely. The
* attached stuff does an http client GET from the lws event loop, even though
* it was originally requested from a different thread than the lws event loop.
*/
#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
static struct lws_context *context;
static pthread_t lws_thread;
static pthread_mutex_t lock;
static int interrupted, bad = 1, status;
static int
callback_http(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
switch (reason) {
/* because we are protocols[0] ... */
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
in ? (char *)in : "(null)");
interrupted = 1;
break;
case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP:
{
char buf[128];
lws_get_peer_simple(wsi, buf, sizeof(buf));
status = lws_http_client_http_response(wsi);
lwsl_user("Connected to %s, http response: %d\n",
buf, status);
}
break;
/* chunks of chunked content, with header removed */
case LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ:
lwsl_user("RECEIVE_CLIENT_HTTP_READ: read %d\n", (int)len);
#if 0 /* enable to dump the html */
{
const char *p = in;
while (len--)
if (*p < 0x7f)
putchar(*p++);
else
putchar('.');
}
#endif
return 0; /* don't passthru */
/* uninterpreted http content */
case LWS_CALLBACK_RECEIVE_CLIENT_HTTP:
{
char buffer[1024 + LWS_PRE];
char *px = buffer + LWS_PRE;
int lenx = sizeof(buffer) - LWS_PRE;
if (lws_http_client_read(wsi, &px, &lenx) < 0)
return -1;
}
return 0; /* don't passthru */
case LWS_CALLBACK_COMPLETED_CLIENT_HTTP:
lwsl_user("LWS_CALLBACK_COMPLETED_CLIENT_HTTP\n");
interrupted = 1;
bad = status != 200;
lws_cancel_service(lws_get_context(wsi)); /* abort poll wait */
break;
case LWS_CALLBACK_CLOSED_CLIENT_HTTP:
interrupted = 1;
bad = status != 200;
lws_cancel_service(lws_get_context(wsi)); /* abort poll wait */
break;
default:
break;
}
return lws_callback_http_dummy(wsi, reason, user, in, len);
}
static const struct lws_protocols protocols[] = {
{
"http",
callback_http,
0,
0,
},
{ NULL, NULL, 0, 0 }
};
void sigint_handler(int sig)
{
interrupted = 1;
}
static void
attach_callback(struct lws_context *context, int tsi, void *opaque)
{
struct lws_client_connect_info i;
/*
* Even though it was asked for from a different thread, we are called
* back by lws from the lws event loop thread context
*/
lwsl_user("%s: called from tid %p\n", __func__, (void *)pthread_self());
/*
* We can set up our operations on the lws event loop and return so
* they can happen asynchronously
*/
memset(&i, 0, sizeof i); /* otherwise uninitialized garbage */
i.context = context;
i.ssl_connection = LCCSCF_USE_SSL;
i.ssl_connection |= LCCSCF_H2_QUIRK_OVERFLOWS_TXCR |
LCCSCF_H2_QUIRK_NGHTTP2_END_STREAM;
i.port = 443;
i.address = "warmcat.com";
i.path = "/";
i.host = i.address;
i.origin = i.address;
i.method = "GET";
i.protocol = protocols[0].name;
lws_client_connect_via_info(&i);
}
static int
lws_attach_with_pthreads_locking(struct lws_context *context, int tsi,
lws_attach_cb_t cb, lws_system_states_t state,
void *opaque, struct lws_attach_item **get)
{
int n;
pthread_mutex_lock(&lock);
/*
* We just provide system-specific locking around the lws non-threadsafe
* helper that adds and removes things from the pt list
*/
n = __lws_system_attach(context, tsi, cb, state, opaque, get);
pthread_mutex_unlock(&lock);
return n;
}
lws_system_ops_t ops = {
.attach = lws_attach_with_pthreads_locking
};
/*
* We made this into a different thread to model it being run from completely
* different codebase that's all linked together
*/
static void *
lws_create(void *d)
{
struct lws_context_creation_info info;
lwsl_user("%s: tid %p\n", __func__, (void *)pthread_self());
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
info.port = CONTEXT_PORT_NO_LISTEN;
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
info.system_ops = &ops;
info.protocols = protocols;
context = lws_create_context(&info);
if (!context) {
lwsl_err("lws init failed\n");
goto bail;
}
/* start the event loop */
while (!interrupted)
if (lws_service(context, 0))
interrupted = 1;
lws_context_destroy(context);
bail:
pthread_exit(NULL);
return NULL;
}
int main(int argc, const char **argv)
{
int n = 0, logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE;
const char *p;
void *retval;
signal(SIGINT, sigint_handler);
if ((p = lws_cmdline_option(argc, argv, "-d")))
logs = atoi(p);
lws_set_log_level(logs, NULL);
lwsl_user("%s: main thread tid %p\n", __func__, (void *)pthread_self());
lwsl_user("LWS minimal http client attach\n");
pthread_mutex_init(&lock, NULL);
/*
* The idea of the example is we're going to split the lws context and
* event loop off to be created from its own thread... this is like it
* was actually started by some completely different code...
*/
if (pthread_create(&lws_thread, NULL, lws_create, NULL)) {
lwsl_err("thread creation failed\n");
goto bail1;
}
/*
* Now on the original / different thread representing a different
* codebase that wants to join this existing event loop, we'll ask to
* get a callback from the event loop context when the event loop
* thread is operational. We have to wait around a bit because we
* may run before the lws context was created.
*/
while (!context && n++ < 30)
usleep(10000);
if (!context) {
lwsl_err("%s: context didn't start\n", __func__);
goto bail;
}
/*
* From our different, non event loop thread, ask for our attach
* callback to get called when lws system state is OPERATIONAL
*/
lws_system_get_ops(context)->attach(context, 0, attach_callback,
LWS_SYSTATE_OPERATIONAL,
NULL, NULL);
/*
* That's all we wanted to do with our thread. Just wait for the lws
* thread to exit as well.
*/
bail:
pthread_join(lws_thread, &retval);
bail1:
pthread_mutex_destroy(&lock);
lwsl_user("%s: finished\n", __func__);
return 0;
}