1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ss: wire up EVENT_WAIT_CANCELLED

Currently the lws_cancel_service() api only manifests itself at lws level.
This adds a state LWSSSCS_EVENT_WAIT_CANCELLED that is broadcast to all
SS in the event loop getting the cancel service api call, and allows
SS-level user code to pick up handling events from other threads.

There's a new example minimal-secure-streams-threads which shows the
pattern for other threads to communicate with and trigger the event in the
lws service thread.
This commit is contained in:
Andy Green 2021-04-13 08:51:27 +01:00
parent 5b6a89f79d
commit 68d9f3a7f2
9 changed files with 474 additions and 5 deletions

View file

@ -217,6 +217,8 @@ typedef enum {
LWSSSCS_SERVER_TXN,
LWSSSCS_SERVER_UPGRADE, /* the server protocol upgraded */
LWSSSCS_EVENT_WAIT_CANCELLED, /* somebody called lws_cancel_service */
LWSSSCS_SINK_JOIN, /* sinks get this when a new source
* stream joins the sink */
LWSSSCS_SINK_PART, /* sinks get this when a new source

View file

@ -93,6 +93,13 @@ rops_handle_POLLIN_pipe(struct lws_context_per_thread *pt, struct lws *wsi,
} lws_end_foreach_dll_safe(d, d1);
}
#endif
#if defined(LWS_WITH_SECURE_STREAMS)
lws_dll2_foreach_safe(&pt->ss_owner, NULL, lws_ss_cancel_notify_dll);
#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API) && defined(LWS_WITH_CLIENT)
lws_dll2_foreach_safe(&pt->ss_client_owner, NULL, lws_sspc_cancel_notify_dll);
#endif
#endif
/*

View file

@ -500,6 +500,12 @@ lws_ss_check_next_state(lws_lifecycle_t *lc, uint8_t *prevstate,
void
lws_proxy_clean_conn_ss(struct lws *wsi);
int
lws_ss_cancel_notify_dll(struct lws_dll2 *d, void *user);
int
lws_sspc_cancel_notify_dll(struct lws_dll2 *d, void *user);
#if defined(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY)
int
lws_ss_policy_unref_trust_store(struct lws_context *context,

View file

@ -978,3 +978,14 @@ lws_sspc_tag(struct lws_sspc_handle *h)
return "[null sspc]";
return lws_lc_tag(&h->lc);
}
int
lws_sspc_cancel_notify_dll(struct lws_dll2 *d, void *user)
{
lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list);
lws_sspc_event_helper(h, LWSSSCS_EVENT_WAIT_CANCELLED, 0);
return 0;
}

View file

@ -270,6 +270,9 @@ lws_ss_serialize_state(struct lws *wsi, struct lws_dsh *dsh, lws_ss_constate_t s
uint8_t pre[12];
int n = 4;
if (state == LWSSSCS_EVENT_WAIT_CANCELLED)
return 0;
lwsl_info("%s: %s, ord 0x%x\n", __func__, lws_ss_state_name((int)state),
(unsigned int)ack);
@ -763,9 +766,11 @@ payload_ff:
parser);
h->txc.peer_tx_cr_est -= n;
if (client_pss_to_sspc_h(pss, ssi)) {
if (ssi->rx && client_pss_to_sspc_h(pss, ssi)) {
/* we still have an sspc handle */
int ret = ssi->rx(client_pss_to_userdata(pss),
int ret;
ret = ssi->rx(client_pss_to_userdata(pss),
(uint8_t *)cp, (unsigned int)n, (int)flags);
if (client_pss_to_sspc_h(pss, ssi) &&

View file

@ -67,6 +67,7 @@ static const char *state_names[] = {
"LWSSSCS_TIMEOUT",
"LWSSSCS_SERVER_TXN",
"LWSSSCS_SERVER_UPGRADE",
"LWSSSCS_EVENT_WAIT_CANCELLED",
};
/*
@ -274,10 +275,10 @@ int
lws_ss_check_next_state(lws_lifecycle_t *lc, uint8_t *prevstate,
lws_ss_constate_t cs)
{
if (cs >= LWSSSCS_USER_BASE)
if (cs >= LWSSSCS_USER_BASE || cs == LWSSSCS_EVENT_WAIT_CANCELLED)
/*
* we can't judge user states, leave the old state and
* just wave them through
* we can't judge user or transient states, leave the old state
* and just wave them through
*/
return 0;
@ -1481,6 +1482,17 @@ lws_ss_destroy_dll(struct lws_dll2 *d, void *user)
return 0;
}
int
lws_ss_cancel_notify_dll(struct lws_dll2 *d, void *user)
{
lws_ss_handle_t *h = lws_container_of(d, lws_ss_handle_t, list);
if (lws_ss_event_helper(h, LWSSSCS_EVENT_WAIT_CANCELLED))
lwsl_warn("%s: cancel event ignores return\n", __func__);
return 0;
}
struct lws_sequencer *
lws_ss_get_sequencer(lws_ss_handle_t *h)
{

View file

@ -0,0 +1,130 @@
project(lws-minimal-secure-streams-threads C)
cmake_minimum_required(VERSION 2.8.12)
find_package(libwebsockets CONFIG REQUIRED)
list(APPEND CMAKE_MODULE_PATH ${LWS_CMAKE_DIR})
include(CheckCSourceCompiles)
include(LwsCheckRequirements)
set(requirements 1)
require_pthreads(requirements)
require_lws_config(LWS_ROLE_H1 1 requirements)
require_lws_config(LWS_WITH_CLIENT 1 requirements)
require_lws_config(LWS_WITH_SECURE_STREAMS 1 requirements)
require_lws_config(LWS_WITH_SYS_SMD 1 requirements)
require_lws_config(LWS_WITH_SECURE_STREAMS_STATIC_POLICY_ONLY 0 requirements)
require_lws_config(LWS_WITH_SYS_STATE 1 requirements)
if (requirements AND NOT WIN32)
# win32 has problems with pthreads.h and timespec struct redef
add_executable(${PROJECT_NAME} minimal-secure-streams-threads.c)
find_program(VALGRIND "valgrind")
if (LWS_CTEST_INTERNET_AVAILABLE AND NOT WIN32)
if (VALGRIND)
add_test(NAME ss-threads COMMAND
${VALGRIND} --tool=memcheck --leak-check=yes --num-callers=20
$<TARGET_FILE:lws-minimal-secure-streams-threads>)
else()
add_test(NAME ss-threads COMMAND lws-minimal-secure-streams-threads)
endif()
set_tests_properties(ss-threads
PROPERTIES
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams-threads
TIMEOUT 10)
endif()
if (websockets_shared)
target_link_libraries(${PROJECT_NAME} websockets_shared ${LIBWEBSOCKETS_DEP_LIBS})
add_dependencies(${PROJECT_NAME} websockets_shared)
else()
target_link_libraries(${PROJECT_NAME} websockets ${LIBWEBSOCKETS_DEP_LIBS})
endif()
CHECK_C_SOURCE_COMPILES("#include <libwebsockets.h>\nint main(void) {\ni#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API)\n return 0;\n #else\n fail\n #endif\n return 0;\n}\n" HAS_LWS_WITH_SECURE_STREAMS_PROXY_API)
if (HAS_LWS_WITH_SECURE_STREAMS_PROXY_API OR LWS_WITH_SECURE_STREAMS_PROXY_API)
add_compile_options(-DLWS_SS_USE_SSPC)
add_executable(${PROJECT_NAME}-client minimal-secure-streams-threads.c)
if (websockets_shared)
target_link_libraries(${PROJECT_NAME}-client websockets_shared ${LIBWEBSOCKETS_DEP_LIBS})
add_dependencies(${PROJECT_NAME}-client websockets_shared)
else()
target_link_libraries(${PROJECT_NAME}-client websockets ${LIBWEBSOCKETS_DEP_LIBS})
endif()
#
# Define test dep to bring up and take down the test
# proxy
#
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
# uds abstract namespace for linux
set(CTEST_SOCKET_PATH "@ctest-sspthreads_sspc-$ENV{SAI_PROJECT}-$ENV{SAI_OVN}")
else()
# filesystem socket for others
set(CTEST_SOCKET_PATH "/tmp/ctest-sspthreads_sspc-$ENV{SAI_PROJECT}-$ENV{SAI_OVN}")
endif()
add_test(NAME st_ssprxthreads_sspc COMMAND
${CMAKE_SOURCE_DIR}/scripts/ctest-background.sh
ssproxythreads_sspc $<TARGET_FILE:lws-minimal-secure-streams-proxy>
-i ${CTEST_SOCKET_PATH} -d1039)
set_tests_properties(st_ssprxthreads_sspc PROPERTIES WORKING_DIRECTORY . FIXTURES_SETUP ssproxythreads_sspc TIMEOUT 800)
add_test(NAME ki_ssprxthreads_sspc COMMAND
${CMAKE_SOURCE_DIR}/scripts/ctest-background-kill.sh
ssproxythreads_sspc $<TARGET_FILE:lws-minimal-secure-streams-proxy>
-i ${CTEST_SOCKET_PATH} -d1039)
set_tests_properties(ki_ssprxthreads_sspc PROPERTIES FIXTURES_CLEANUP ssproxythreads_sspc)
#
# the client part that will connect to the proxy
#
if (VALGRIND)
message("testing via valgrind")
add_test(NAME sspcthreads_sspc COMMAND
${VALGRIND} --tool=memcheck --leak-check=yes --num-callers=20
$<TARGET_FILE:lws-minimal-secure-streams-threads-client> -i +${CTEST_SOCKET_PATH})
else()
add_test(NAME sspcthreads_sspc COMMAND lws-minimal-secure-streams-threads-client -i +${CTEST_SOCKET_PATH})
endif()
set_tests_properties(sspcthreads_sspc PROPERTIES
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams-threads
FIXTURES_REQUIRED "ssproxythreads_sspc"
TIMEOUT 80)
#
# Define test dep to bring up and take down the test
# proxy
#
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
# uds abstract namespace for linux
set(CTEST_SOCKET_PATH "@ctest-mul-sspthreads_sspc-$ENV{SAI_PROJECT}-$ENV{SAI_OVN}")
else()
# filesystem socket for others
set(CTEST_SOCKET_PATH "/tmp/ctest-mul-sspthreads_sspc-$ENV{SAI_PROJECT}-$ENV{SAI_OVN}")
endif()
add_test(NAME st_mulssprxthreads_sspc COMMAND
${CMAKE_SOURCE_DIR}/scripts/ctest-background.sh
mulssproxythreads_sspc $<TARGET_FILE:lws-minimal-secure-streams-proxy>
-i ${CTEST_SOCKET_PATH} -d1039)
set_tests_properties(st_mulssprxthreads_sspc PROPERTIES WORKING_DIRECTORY . FIXTURES_SETUP mulssproxythreads_sspc TIMEOUT 800)
add_test(NAME ki_mulssprxthreads_sspc COMMAND
${CMAKE_SOURCE_DIR}/scripts/ctest-background-kill.sh
mulssproxythreads_sspc $<TARGET_FILE:lws-minimal-secure-streams-proxy>
-i ${CTEST_SOCKET_PATH} -d1039)
set_tests_properties(ki_mulssprxthreads_sspc PROPERTIES FIXTURES_CLEANUP mulssproxythreads_sspc)
endif()
endif()

View file

@ -0,0 +1,27 @@
# lws minimal secure streams threads
This application creates a thread and calls `lws_cancel_service()`
at 10Hz.
It creates a Secure Stream and checks that it is getting the
`LWSSSCS_EVENT_WAIT_CANCELLED` state for each `lws_cancel_service()`.
It also demonstrates how to protect a shared data area between the
thread(s) and the lws event loop thread to put data there that
describes what the thread wants the service loop to do.
It exits after 3s with a 0 return code if the SS saw the expected
amount of messages.
## build
```
$ cmake . && make
```
## usage
Commandline option|Meaning
---|---
-d <loglevel>|Debug verbosity in decimal, eg, -d15

View file

@ -0,0 +1,269 @@
/*
* lws-minimal-secure-streams-threads
*
* Written in 2010-2021 by Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
*
* This demonstrates how other threads can wake the lws event loop and ask it
* to do things via lws_cancel_service(), notifying Secure Streams using the
* LWSSSCS_EVENT_WAIT_CANCELLED state callback.
*
* Because of what we're testing, we don't actually connect the SS just create
* it and wait for the states we are testing for to come at 10Hz.
*
* We run the test for 3s and check we got an appropriate amount of wakes
* to call it a success.
*
* You can use the same pattern to have any amount of shared data protected by
* the mutex, containing whatever the other threads want the lws event loop
* thread to do for them.
*/
#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
static int interrupted, bad = 1, finished;
static lws_sorted_usec_list_t sul_timeout;
static struct lws_context *context;
static pthread_t pthread_spam;
static int wakes, started_thread;
/* the data shared between the spam thread and the lws event loop */
static pthread_mutex_t lock_shared;
static int shared_counter;
#if !defined(LWS_SS_USE_SSPC)
static const char * const default_ss_policy =
"{"
"\"schema-version\":1,"
"\"s\": ["
"{"
"\"mintest\": {"
"\"endpoint\": \"connectivitycheck.android.com\","
"\"http_url\": \"generate_204\","
"\"port\": 80,"
"\"protocol\": \"h1\","
"\"http_method\": \"GET\","
"\"opportunistic\": true,"
"\"http_expect\": 204,"
"\"http_fail_redirect\": true"
"}"
"}"
"]"
"}"
;
#endif
typedef struct myss {
struct lws_ss_handle *ss;
void *opaque_data;
/* ... application specific state ... */
} myss_t;
static void *
thread_spam(void *d)
{
do {
pthread_mutex_lock(&lock_shared); /* --------- shared lock { */
/*
* prepare the shared data area to indicate whatever it is that
* we want doing on the main event loop. In this case, we just
* bump a counter, but it can be any amount of data prepared,
* eg, whole info struct for a connection we want.
*/
shared_counter++;
lwsl_notice("%s: cancelling wait from spam thread: %d\n",
__func__, shared_counter);
lws_cancel_service(context);
pthread_mutex_unlock(&lock_shared); /* } shared lock ------- */
usleep(100000); /* wait 100ms and signal main thread again */
} while (!finished);
pthread_exit(NULL);
return NULL;
}
static lws_ss_state_return_t
myss_state(void *userobj, void *h_src, lws_ss_constate_t state,
lws_ss_tx_ordinal_t ack)
{
// myss_t *m = (myss_t *)userobj;
void *retval;
switch (state) {
case LWSSSCS_CREATING:
if (pthread_create(&pthread_spam, NULL, thread_spam, NULL)) {
lwsl_err("thread creation failed\n");
return LWSSSSRET_DESTROY_ME;
}
started_thread = 1;
break;
case LWSSSCS_DESTROYING:
finished = 1;
if (started_thread)
pthread_join(pthread_spam, &retval);
break;
case LWSSSCS_EVENT_WAIT_CANCELLED:
pthread_mutex_lock(&lock_shared); /* --------- shared lock { */
lwsl_notice("%s: LWSSSCS_EVENT_WAIT_CANCELLED: %d, shared: %d\n",
__func__, ++wakes, shared_counter);
pthread_mutex_unlock(&lock_shared); /* } shared lock ------- */
break;
default:
break;
}
return LWSSSSRET_OK;
}
static const lws_ss_info_t ssi_lws_threads = {
.handle_offset = offsetof(myss_t, ss),
.opaque_user_data_offset = offsetof(myss_t, opaque_data),
/* we don't actually do any rx or tx in this test */
.state = myss_state,
.user_alloc = sizeof(myss_t),
.streamtype = "mintest",
.manual_initial_tx_credit = 0,
};
static void
sul_timeout_cb(lws_sorted_usec_list_t *sul)
{
lwsl_notice("%s: test finishing\n", __func__);
interrupted = 1;
}
static void
sigint_handler(int sig)
{
interrupted = 1;
}
static int
system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
int current, int target)
{
if (current != LWS_SYSTATE_OPERATIONAL || target != LWS_SYSTATE_OPERATIONAL)
return 0;
/* the test SS.. not going to connect it, just see if the cancel_service
* messages are coming
*/
if (lws_ss_create(context, 0, &ssi_lws_threads, NULL, NULL, NULL, NULL)) {
lwsl_err("%s: failed to create secure stream\n",
__func__);
return -1;
}
/* set up the test timeout */
lws_sul_schedule(context, 0, &sul_timeout, sul_timeout_cb,
3 * LWS_US_PER_SEC);
return 0;
}
int main(int argc, const char **argv)
{
lws_state_notify_link_t notifier = { {0}, system_notify_cb, "app" };
lws_state_notify_link_t *na[] = { &notifier, NULL };
struct lws_context_creation_info info;
signal(SIGINT, sigint_handler);
memset(&info, 0, sizeof info);
lws_cmdline_option_handle_builtin(argc, argv, &info);
lwsl_user("LWS Secure Streams threads test client [-d<verb>]\n");
info.fd_limit_per_thread = 1 + 6 + 1;
info.port = CONTEXT_PORT_NO_LISTEN;
#if !defined(LWS_SS_USE_SSPC)
info.pss_policies_json = default_ss_policy;
#else
info.protocols = lws_sspc_protocols;
{
const char *p;
/* connect to ssproxy via UDS by default, else via
* tcp connection to this port */
if ((p = lws_cmdline_option(argc, argv, "-p")))
info.ss_proxy_port = (uint16_t)atoi(p);
/* UDS "proxy.ss.lws" in abstract namespace, else this socket
* path; when -p given this can specify the network interface
* to bind to */
if ((p = lws_cmdline_option(argc, argv, "-i")))
info.ss_proxy_bind = p;
/* if -p given, -a specifies the proxy address to connect to */
if ((p = lws_cmdline_option(argc, argv, "-a")))
info.ss_proxy_address = p;
}
#endif
info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
info.register_notifier_list = na;
/* create the context */
context = lws_create_context(&info);
if (!context) {
lwsl_err("lws init failed\n");
return 1;
}
#if defined(LWS_SS_USE_SSPC)
if (!lws_create_vhost(context, &info)) {
lwsl_err("%s: failed to create default vhost\n", __func__);
goto bail;
}
#endif
/* the event loop */
while (lws_service(context, 0) >= 0 && !interrupted)
;
/* compare what happened with what we expect */
if (wakes > 10)
/* OSX can do the usleep thread slower than 100ms */
bad = 0;
lwsl_notice("wakes %d\n", wakes);
#if defined(LWS_SS_USE_SSPC)
bail:
#endif
lws_sul_cancel(&sul_timeout);
lws_context_destroy(context);
lwsl_user("Completed: %s\n", bad ? "failed" : "OK");
return bad;
}