mirror of
https://github.com/warmcat/libwebsockets.git
synced 2025-03-09 00:00:04 +01:00
smd: add more scenarios to tests
Let's have 4 x forked clients all intercommunicate via the SS proxy, and add it to ctest.
This commit is contained in:
parent
8ff35b819a
commit
abc60c755a
6 changed files with 641 additions and 18 deletions
|
@ -376,7 +376,7 @@ lws_h2_issue_preface(struct lws *wsi)
|
|||
struct lws_h2_netconn *h2n = wsi->h2.h2n;
|
||||
struct lws_h2_protocol_send *pps;
|
||||
|
||||
lwsl_notice("%s: %s: fd %d\n", __func__, lws_wsi_tag(wsi), (int)wsi->desc.sockfd);
|
||||
lwsl_debug("%s: %s: fd %d\n", __func__, lws_wsi_tag(wsi), (int)wsi->desc.sockfd);
|
||||
|
||||
if (lws_issue_raw(wsi, (uint8_t *)preface, strlen(preface)) !=
|
||||
(int)strlen(preface))
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
#include <signal.h>
|
||||
|
||||
static int interrupted, ok, fail, _exp = 111;
|
||||
static lws_sorted_usec_list_t sul;
|
||||
static lws_sorted_usec_list_t sul, sul_initial_drain;
|
||||
struct lws_context *context;
|
||||
static pthread_t thread_spam;
|
||||
|
||||
|
@ -55,6 +55,24 @@ smd_cb2int(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
|
|||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* This is used in an smd participant that is deregistered before the message
|
||||
* can be delivered, it should never see any message
|
||||
*/
|
||||
|
||||
static int
|
||||
smd_cb3int(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
|
||||
void *buf, size_t len)
|
||||
{
|
||||
lwsl_err("%s: Countermanded ts %llu, len %d\n", __func__,
|
||||
(unsigned long long)timestamp, (int)len);
|
||||
lwsl_hexdump_err(buf, len);
|
||||
|
||||
fail++;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *
|
||||
_thread_spam(void *d)
|
||||
{
|
||||
|
@ -79,7 +97,7 @@ _thread_spam(void *d)
|
|||
#if defined(WIN32)
|
||||
Sleep(3);
|
||||
#else
|
||||
usleep(3000);
|
||||
usleep(1000);
|
||||
#endif
|
||||
}
|
||||
#if !defined(WIN32)
|
||||
|
@ -94,24 +112,60 @@ void sigint_handler(int sig)
|
|||
interrupted = 1;
|
||||
}
|
||||
|
||||
static int
|
||||
system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
|
||||
int current, int target)
|
||||
static void
|
||||
drained_cb(lws_sorted_usec_list_t *sul)
|
||||
{
|
||||
// struct lws_context *context = mgr->parent;
|
||||
|
||||
if (current != LWS_SYSTATE_OPERATIONAL || target != LWS_SYSTATE_OPERATIONAL)
|
||||
return 0;
|
||||
|
||||
lwsl_info("%s: operational\n", __func__);
|
||||
|
||||
/*
|
||||
* spawn the test thread, it's going to spam 100 messages at 20ms
|
||||
* spawn the test thread, it's going to spam 100 messages at 3ms
|
||||
* intervals... check we got everything
|
||||
*/
|
||||
|
||||
if (pthread_create(&thread_spam, NULL, _thread_spam, NULL))
|
||||
lwsl_err("%s: failed to create the spamming thread\n", __func__);
|
||||
}
|
||||
|
||||
static int
|
||||
system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
|
||||
int current, int target)
|
||||
{
|
||||
// struct lws_context *context = mgr->parent;
|
||||
int n;
|
||||
|
||||
if (current != LWS_SYSTATE_OPERATIONAL || target != LWS_SYSTATE_OPERATIONAL)
|
||||
return 0;
|
||||
|
||||
/*
|
||||
* Overflow the message queue too see if it handles it well, both
|
||||
* as overflowing and in recovery. These are all still going into the
|
||||
* smd buffer dll2, since we don't break for the event loop to have a
|
||||
* chance to deliver them.
|
||||
*/
|
||||
|
||||
n = 0;
|
||||
while (n++ < 100)
|
||||
if (lws_smd_msg_printf(context, LWSSMDCL_SYSTEM_STATE,
|
||||
"{\"s\":\"state\",\"test\":\"overflow\"}"))
|
||||
break;
|
||||
|
||||
lwsl_notice("%s: overflow test added %d messages\n", __func__, n);
|
||||
if (n == 100) {
|
||||
lwsl_err("%s: didn't overflow\n", __func__);
|
||||
interrupted = 1;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* So we have some normal messages from earlier and now the rest of the
|
||||
* smd buffer filled with junk overflow messages. Before we start the
|
||||
* actual spamming test from another thread, we need to return to the
|
||||
* event loop so these can be cleared first.
|
||||
*/
|
||||
|
||||
lws_sul_schedule(context, 0, &sul_initial_drain, drained_cb,
|
||||
5 * LWS_US_PER_MS);
|
||||
|
||||
|
||||
lwsl_info("%s: operational\n", __func__);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -123,6 +177,7 @@ main(int argc, const char **argv)
|
|||
lws_state_notify_link_t *na[] = { ¬ifier, NULL };
|
||||
int logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE;
|
||||
struct lws_context_creation_info info;
|
||||
struct lws_smd_peer *userreg;
|
||||
const char *p;
|
||||
void *retval;
|
||||
|
||||
|
@ -165,6 +220,20 @@ main(int argc, const char **argv)
|
|||
goto bail;
|
||||
}
|
||||
|
||||
/* temporarily register a messaging participant to hear a user class */
|
||||
|
||||
userreg = lws_smd_register(context, NULL, 0, 1 << LWSSMDCL_USER_BASE_BITNUM,
|
||||
smd_cb3int);
|
||||
if (!userreg) {
|
||||
lwsl_err("%s: smd register userclass failed\n", __func__);
|
||||
goto bail;
|
||||
}
|
||||
|
||||
/*
|
||||
* The event loop isn't started yet, so these smd messages are getting
|
||||
* buffered. Later we will deliberately overrun the buffer and wait
|
||||
* for that to be cleared before the spam thread test.
|
||||
*/
|
||||
|
||||
/* generate an INTERACTION class message */
|
||||
|
||||
|
@ -189,6 +258,22 @@ main(int argc, const char **argv)
|
|||
goto bail;
|
||||
}
|
||||
|
||||
/* generate a user class message... */
|
||||
|
||||
if (lws_smd_msg_printf(context, 1 << LWSSMDCL_USER_BASE_BITNUM,
|
||||
"{\"s\":\"userclass\"}")) {
|
||||
lwsl_err("%s: problem sending smd\n", __func__);
|
||||
goto bail;
|
||||
}
|
||||
|
||||
/*
|
||||
* ... and screw that user class message up by deregistering the only
|
||||
* handler before it can deliver it... it should not get delivered
|
||||
* and cleanly discarded
|
||||
*/
|
||||
|
||||
lws_smd_unregister(userreg);
|
||||
|
||||
/* the usual lws event loop */
|
||||
|
||||
while (!interrupted && lws_service(context, 0) >= 0)
|
||||
|
|
|
@ -46,13 +46,100 @@ if (requirements)
|
|||
if (HAS_LWS_WITH_SECURE_STREAMS_PROXY_API OR LWS_WITH_SECURE_STREAMS_PROXY_API)
|
||||
add_compile_options(-DLWS_SS_USE_SSPC)
|
||||
|
||||
add_executable(${PROJECT_NAME}-client minimal-secure-streams-smd.c)
|
||||
add_executable(${PROJECT_NAME}-client minimal-secure-streams-smd.c multi.c)
|
||||
|
||||
if (websockets_shared)
|
||||
target_link_libraries(${PROJECT_NAME}-client websockets_shared ${LIBWEBSOCKETS_DEP_LIBS})
|
||||
add_dependencies(${PROJECT_NAME}-client websockets_shared)
|
||||
else()
|
||||
target_link_libraries(${PROJECT_NAME}-client websockets ${LIBWEBSOCKETS_DEP_LIBS})
|
||||
endif()
|
||||
|
||||
#
|
||||
# Define test dep to bring up and take down the test
|
||||
# proxy
|
||||
#
|
||||
|
||||
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
|
||||
# uds abstract namespace for linux
|
||||
set(CTEST_SOCKET_PATH "@ctest-sspsmd_sspc-$ENV{SAI_PROJECT}-$ENV{SAI_OVN}")
|
||||
else()
|
||||
# filesystem socket for others
|
||||
set(CTEST_SOCKET_PATH "/tmp/ctest-sspsmd_sspc-$ENV{SAI_PROJECT}-$ENV{SAI_OVN}")
|
||||
endif()
|
||||
|
||||
add_test(NAME st_ssprxsmd_sspc COMMAND
|
||||
${CMAKE_SOURCE_DIR}/scripts/ctest-background.sh
|
||||
ssproxysmd_sspc $<TARGET_FILE:lws-minimal-secure-streams-proxy>
|
||||
-i ${CTEST_SOCKET_PATH} --ignore-sigterm -d1039)
|
||||
set_tests_properties(st_ssprxsmd_sspc PROPERTIES WORKING_DIRECTORY . FIXTURES_SETUP ssproxysmd_sspc TIMEOUT 800)
|
||||
|
||||
add_test(NAME ki_ssprxsmd_sspc COMMAND
|
||||
${CMAKE_SOURCE_DIR}/scripts/ctest-background-kill.sh
|
||||
ssproxysmd_sspc $<TARGET_FILE:lws-minimal-secure-streams-proxy>
|
||||
-i ${CTEST_SOCKET_PATH} --ignore-sigterm -d1039)
|
||||
set_tests_properties(ki_ssprxsmd_sspc PROPERTIES FIXTURES_CLEANUP ssproxysmd_sspc)
|
||||
|
||||
#
|
||||
# the client part that will connect to the proxy
|
||||
#
|
||||
|
||||
if (VALGRIND)
|
||||
message("testing via valgrind")
|
||||
add_test(NAME sspcsmd_sspc COMMAND
|
||||
${VALGRIND} --tool=memcheck --leak-check=yes --num-callers=20
|
||||
$<TARGET_FILE:lws-minimal-secure-streams-smd-client> -i +${CTEST_SOCKET_PATH})
|
||||
else()
|
||||
add_test(NAME sspcsmd_sspc COMMAND lws-minimal-secure-streams-smd-client -i +${CTEST_SOCKET_PATH})
|
||||
endif()
|
||||
set_tests_properties(sspcsmd_sspc PROPERTIES
|
||||
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams-smd
|
||||
FIXTURES_REQUIRED "ssproxysmd_sspc"
|
||||
TIMEOUT 80)
|
||||
|
||||
|
||||
#
|
||||
# Define test dep to bring up and take down the test
|
||||
# proxy
|
||||
#
|
||||
|
||||
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
|
||||
# uds abstract namespace for linux
|
||||
set(CTEST_SOCKET_PATH "@ctest-mul-sspsmd_sspc-$ENV{SAI_PROJECT}-$ENV{SAI_OVN}")
|
||||
else()
|
||||
# filesystem socket for others
|
||||
set(CTEST_SOCKET_PATH "/tmp/ctest-mul-sspsmd_sspc-$ENV{SAI_PROJECT}-$ENV{SAI_OVN}")
|
||||
endif()
|
||||
|
||||
add_test(NAME st_mulssprxsmd_sspc COMMAND
|
||||
${CMAKE_SOURCE_DIR}/scripts/ctest-background.sh
|
||||
mulssproxysmd_sspc $<TARGET_FILE:lws-minimal-secure-streams-proxy>
|
||||
-i ${CTEST_SOCKET_PATH} --ignore-sigterm -d1039)
|
||||
set_tests_properties(st_mulssprxsmd_sspc PROPERTIES WORKING_DIRECTORY . FIXTURES_SETUP mulssproxysmd_sspc TIMEOUT 800)
|
||||
|
||||
add_test(NAME ki_mulssprxsmd_sspc COMMAND
|
||||
${CMAKE_SOURCE_DIR}/scripts/ctest-background-kill.sh
|
||||
mulssproxysmd_sspc $<TARGET_FILE:lws-minimal-secure-streams-proxy>
|
||||
-i ${CTEST_SOCKET_PATH} --ignore-sigterm -d1039)
|
||||
set_tests_properties(ki_mulssprxsmd_sspc PROPERTIES FIXTURES_CLEANUP mulssproxysmd_sspc)
|
||||
|
||||
#
|
||||
# multi tests for the client part that will connect to the proxy
|
||||
#
|
||||
|
||||
if (VALGRIND)
|
||||
message("testing via valgrind")
|
||||
add_test(NAME mulsspcsmd_sspc COMMAND
|
||||
${VALGRIND} --tool=memcheck --leak-check=yes --num-callers=20
|
||||
$<TARGET_FILE:lws-minimal-secure-streams-smd-client> -i +${CTEST_SOCKET_PATH} --multi -d1039)
|
||||
else()
|
||||
add_test(NAME mulsspcsmd_sspc COMMAND lws-minimal-secure-streams-smd-client -i +${CTEST_SOCKET_PATH} --multi -d1039)
|
||||
endif()
|
||||
set_tests_properties(mulsspcsmd_sspc PROPERTIES
|
||||
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/minimal-examples/secure-streams/minimal-secure-streams-smd
|
||||
FIXTURES_REQUIRED "mulssproxysmd_sspc"
|
||||
TIMEOUT 80)
|
||||
|
||||
endif()
|
||||
|
||||
endif()
|
||||
|
|
|
@ -60,6 +60,11 @@ CPD detect trigger. The SS link is set up to only accept messages of classes
|
|||
LWSSMDCL_SYSTEM_STATE and LWSSMDCL_NETWORK, INTERACTION type messages are
|
||||
not accepted.
|
||||
|
||||
### multi via proxy
|
||||
|
||||
If the -client version is run with `--multi`, it spawns four worker processes
|
||||
which send and confirm SMD messages between each other via the SS proxy.
|
||||
|
||||
## build
|
||||
|
||||
```
|
||||
|
@ -71,6 +76,7 @@ not accepted.
|
|||
Commandline option|Meaning
|
||||
---|---
|
||||
-d <loglevel>|Debug verbosity in decimal, eg, -d15
|
||||
--multi|Fork four worker processes that send and check messages to each other over sspc proxy
|
||||
|
||||
```
|
||||
$ ./bin/lws-minimal-secure-streams-smd -d 1151
|
||||
|
|
|
@ -92,7 +92,7 @@ sul_tx_periodic_cb(lws_sorted_usec_list_t *sul)
|
|||
{
|
||||
myss_t *m = lws_container_of(sul, myss_t, sul);
|
||||
|
||||
lwsl_notice("%s: requesting TX\n", __func__);
|
||||
lwsl_info("%s: requesting TX\n", __func__);
|
||||
lws_ss_request_tx(m->ss);
|
||||
}
|
||||
|
||||
|
@ -102,7 +102,7 @@ myss_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len,
|
|||
{
|
||||
myss_t *m = (myss_t *)userobj;
|
||||
|
||||
lwsl_notice("%s: sending SS smd\n", __func__);
|
||||
lwsl_info("%s: sending SS smd\n", __func__);
|
||||
|
||||
/*
|
||||
* The SS RX isn't going to see INTERACTION messages, because its class
|
||||
|
@ -139,7 +139,7 @@ myss_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len,
|
|||
count_tx++;
|
||||
|
||||
lws_sul_schedule(lws_ss_get_context(m->ss), 0, &m->sul,
|
||||
sul_tx_periodic_cb, 250 * LWS_US_PER_MS);
|
||||
sul_tx_periodic_cb, 200 * LWS_US_PER_MS);
|
||||
|
||||
return LWSSSSRET_OK;
|
||||
}
|
||||
|
@ -244,6 +244,8 @@ sigint_handler(int sig)
|
|||
interrupted = 1;
|
||||
}
|
||||
|
||||
extern int smd_ss_multi_test(int argc, const char **argv);
|
||||
|
||||
int main(int argc, const char **argv)
|
||||
{
|
||||
struct lws_context_creation_info info;
|
||||
|
@ -252,6 +254,12 @@ int main(int argc, const char **argv)
|
|||
signal(SIGINT, sigint_handler);
|
||||
|
||||
memset(&info, 0, sizeof info);
|
||||
|
||||
#if defined(LWS_SS_USE_SSPC)
|
||||
if (lws_cmdline_option(argc, argv, "--multi"))
|
||||
return smd_ss_multi_test(argc, argv);
|
||||
#endif
|
||||
|
||||
lws_cmdline_option_handle_builtin(argc, argv, &info);
|
||||
|
||||
lwsl_user("LWS Secure Streams SMD test client [-d<verb>]\n");
|
||||
|
@ -262,6 +270,24 @@ int main(int argc, const char **argv)
|
|||
info.pss_policies_json = default_ss_policy;
|
||||
#else
|
||||
info.protocols = lws_sspc_protocols;
|
||||
{
|
||||
const char *p;
|
||||
|
||||
/* connect to ssproxy via UDS by default, else via
|
||||
* tcp connection to this port */
|
||||
if ((p = lws_cmdline_option(argc, argv, "-p")))
|
||||
info.ss_proxy_port = (uint16_t)atoi(p);
|
||||
|
||||
/* UDS "proxy.ss.lws" in abstract namespace, else this socket
|
||||
* path; when -p given this can specify the network interface
|
||||
* to bind to */
|
||||
if ((p = lws_cmdline_option(argc, argv, "-i")))
|
||||
info.ss_proxy_bind = p;
|
||||
|
||||
/* if -p given, -a specifies the proxy address to connect to */
|
||||
if ((p = lws_cmdline_option(argc, argv, "-a")))
|
||||
info.ss_proxy_address = p;
|
||||
}
|
||||
#endif
|
||||
info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
|
||||
LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
|
||||
|
|
|
@ -0,0 +1,419 @@
|
|||
/*
|
||||
* lws-minimal-secure-streams-smd
|
||||
*
|
||||
* Written in 2010-2021 by Andy Green <andy@warmcat.com>
|
||||
*
|
||||
* This file is made available under the Creative Commons CC0 1.0
|
||||
* Universal Public Domain Dedication.
|
||||
*
|
||||
*
|
||||
* This demonstrates a minimal http client using secure streams to access the
|
||||
* SMD api. This file is only built when LWS_SS_USE_SSPC defined.
|
||||
*
|
||||
* This is an alternative test implementation selected by --multi at runtime,
|
||||
* it's in its own file to stop muddying up the main test sources. It's only
|
||||
* available when built with SSPC / produces -client executable.
|
||||
*
|
||||
* We will fork several times, the original thread and the forks hook up to
|
||||
* the proxy with smd SS, each fork waits a second for everyone to have joined,
|
||||
* and then each fork (NOT the original process) sends a bunch of user messages
|
||||
* that all the forks should receive, having been distributed by SMD and the
|
||||
* ss proxy.
|
||||
*
|
||||
* The participants check they received all the messages expected from everyone
|
||||
* and then send a final message indicating success and exits. The original
|
||||
* fork is watching for these to arrive before the timeout, if so it's a PASS.
|
||||
*/
|
||||
|
||||
#include <libwebsockets.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
|
||||
static int bad = 1, interrupted;
|
||||
|
||||
/* number of forks */
|
||||
#define FORKS 4
|
||||
/* number of messages each will send, eg, 4 forks 64 message == 256 messages */
|
||||
#define MSGCOUNT 64
|
||||
|
||||
typedef struct myss {
|
||||
struct lws_ss_handle *ss;
|
||||
void *opaque_data;
|
||||
/* ... application specific state ... */
|
||||
uint64_t seen_mask[FORKS];
|
||||
int seen_msgs[FORKS];
|
||||
lws_sorted_usec_list_t sul;
|
||||
int count;
|
||||
char seen_all;
|
||||
char send_seen_all;
|
||||
char starting;
|
||||
} myss_t;
|
||||
|
||||
|
||||
/* secure streams payload interface */
|
||||
|
||||
static lws_ss_state_return_t
|
||||
multi_myss_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
|
||||
{
|
||||
myss_t *m = (myss_t *)userobj;
|
||||
const char *p;
|
||||
int fk, t, n;
|
||||
size_t al;
|
||||
|
||||
/* ignore our and other forks announcing their result */
|
||||
|
||||
if (lws_json_simple_find((const char *)buf, len, "\"seen_all\":", &al))
|
||||
return LWSSSSRET_OK;
|
||||
|
||||
/*
|
||||
* otherwise once we saw the expected messages, any other messages
|
||||
* coming in this class are wrong
|
||||
*/
|
||||
|
||||
if (m->seen_all) {
|
||||
lwsl_err("%s: unexpected extra messages\n", __func__);
|
||||
return LWSSSSRET_DESTROY_ME;
|
||||
}
|
||||
|
||||
p = lws_json_simple_find((const char *)buf, len, "\"fork\":", &al);
|
||||
if (!p)
|
||||
return LWSSSSRET_DESTROY_ME;
|
||||
fk = atoi(p);
|
||||
if (fk < 1 || fk > FORKS)
|
||||
return LWSSSSRET_DESTROY_ME;
|
||||
|
||||
p = lws_json_simple_find((const char *)buf, len, "\"test\":", &al);
|
||||
if (!p)
|
||||
return LWSSSSRET_DESTROY_ME;
|
||||
t = atoi(p);
|
||||
|
||||
if (t < 0 || t >= MSGCOUNT)
|
||||
return LWSSSSRET_DESTROY_ME;
|
||||
|
||||
m->seen_mask[fk - 1] |= 1ull << t;
|
||||
m->seen_msgs[fk - 1]++; /* keep an eye on dupes */
|
||||
|
||||
/* Have we seen a full set of messages from everyone? */
|
||||
|
||||
for (n = 0; n < FORKS; n++) {
|
||||
if (m->seen_msgs[n] != (int)MSGCOUNT)
|
||||
return LWSSSSRET_OK;
|
||||
if (m->seen_mask[n] != 0xffffffffffffffffull)
|
||||
return LWSSSSRET_OK;
|
||||
}
|
||||
|
||||
/*
|
||||
* Oh... so we have finished collecting messages
|
||||
*/
|
||||
|
||||
lwsl_user("%s: test thread %d: %s received all messages\n", __func__,
|
||||
(int)(intptr_t)lws_context_user(lws_ss_get_context(m->ss)),
|
||||
lws_ss_tag(m->ss));
|
||||
m->seen_all = m->send_seen_all = 1;
|
||||
|
||||
/*
|
||||
* Prepare to inform the original process we saw everything
|
||||
* from everyone OK
|
||||
*/
|
||||
|
||||
lws_ss_request_tx(m->ss);
|
||||
|
||||
return LWSSSSRET_OK;
|
||||
}
|
||||
|
||||
static void
|
||||
sul_multi_tx_periodic_cb(lws_sorted_usec_list_t *sul)
|
||||
{
|
||||
myss_t *m = lws_container_of(sul, myss_t, sul);
|
||||
|
||||
if (!m->send_seen_all && m->seen_all) {
|
||||
lws_ss_destroy(&m->ss);
|
||||
return;
|
||||
}
|
||||
|
||||
m->starting = 1;
|
||||
if (m->count < MSGCOUNT || m->send_seen_all)
|
||||
lws_ss_request_tx(m->ss);
|
||||
}
|
||||
|
||||
static lws_ss_state_return_t
|
||||
multi_myss_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, size_t *len,
|
||||
int *flags)
|
||||
{
|
||||
myss_t *m = (myss_t *)userobj;
|
||||
|
||||
/*
|
||||
* We want to send exactly MSGCOUNT user class smd messages
|
||||
*/
|
||||
|
||||
if (!m->starting || (m->count == MSGCOUNT && !m->send_seen_all))
|
||||
return LWSSSSRET_TX_DONT_SEND;
|
||||
|
||||
// lwsl_notice("%s: sending SS smd\n", __func__);
|
||||
|
||||
lws_ser_wu64be(buf, 1 << LWSSMDCL_USER_BASE_BITNUM);
|
||||
lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */
|
||||
|
||||
if (m->send_seen_all) {
|
||||
*len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)
|
||||
lws_snprintf((char *)buf + LWS_SMD_SS_RX_HEADER_LEN, *len,
|
||||
"{\"class\":\"user\",\"fork\": %d,\"seen_all\":true}",
|
||||
(int)(intptr_t)lws_context_user(lws_ss_get_context(m->ss)));
|
||||
|
||||
m->send_seen_all = 0;
|
||||
lwsl_info("%s: test thread %d: sent summary message\n", __func__,
|
||||
(int)(intptr_t)lws_context_user(lws_ss_get_context(m->ss)));
|
||||
} else
|
||||
*len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)
|
||||
lws_snprintf((char *)buf + LWS_SMD_SS_RX_HEADER_LEN, *len,
|
||||
"{\"class\":\"user\",\"fork\": %d,\"test\":%u}",
|
||||
(int)(intptr_t)lws_context_user(lws_ss_get_context(m->ss)),
|
||||
m->count++);
|
||||
|
||||
*flags = LWSSS_FLAG_SOM | LWSSS_FLAG_EOM;
|
||||
|
||||
lws_sul_schedule(lws_ss_get_context(m->ss), 0, &m->sul,
|
||||
sul_multi_tx_periodic_cb, 25 * LWS_US_PER_MS);
|
||||
|
||||
return LWSSSSRET_OK;
|
||||
}
|
||||
|
||||
static lws_ss_state_return_t
|
||||
multi_myss_state(void *userobj, void *h_src, lws_ss_constate_t state,
|
||||
lws_ss_tx_ordinal_t ack)
|
||||
{
|
||||
myss_t *m = (myss_t *)userobj;
|
||||
int n;
|
||||
|
||||
lwsl_notice("%s: %s: %s (%d), ord 0x%x\n", __func__, lws_ss_tag(m->ss),
|
||||
lws_ss_state_name((int)state), state, (unsigned int)ack);
|
||||
|
||||
switch (state) {
|
||||
case LWSSSCS_DESTROYING:
|
||||
lws_sul_cancel(&m->sul);
|
||||
interrupted = 1;
|
||||
return 0;
|
||||
|
||||
case LWSSSCS_CONNECTED:
|
||||
lwsl_notice("%s: CONNECTED: test fork %d\n", __func__,
|
||||
(int)(intptr_t)lws_context_user(lws_ss_get_context(m->ss)));
|
||||
/*
|
||||
* Because in this test everybody is watching and counting
|
||||
* everybody else's messages from different forks, we have to
|
||||
* hold off starting sending for 1s so all forks can join the
|
||||
* proxy first and not miss anything
|
||||
*/
|
||||
lws_sul_schedule(lws_ss_get_context(m->ss), 0, &m->sul,
|
||||
sul_multi_tx_periodic_cb, 1 * LWS_US_PER_SEC);
|
||||
m->starting = 0;
|
||||
return 0;
|
||||
case LWSSSCS_DISCONNECTED:
|
||||
for (n = 0; n < FORKS; n++)
|
||||
lwsl_notice("%s: testfork %d: peer %d: seen_msg = %d, "
|
||||
"seen make = 0x%llx\n", __func__,
|
||||
(int)(intptr_t)lws_context_user(lws_ss_get_context(m->ss)),
|
||||
n, m->seen_msgs[n],
|
||||
(unsigned long long)m->seen_mask[n]);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static const lws_ss_info_t ssi_multi_lws_smd = {
|
||||
.handle_offset = offsetof(myss_t, ss),
|
||||
.opaque_user_data_offset = offsetof(myss_t, opaque_data),
|
||||
.rx = multi_myss_rx,
|
||||
.tx = multi_myss_tx,
|
||||
.state = multi_myss_state,
|
||||
.user_alloc = sizeof(myss_t),
|
||||
.streamtype = LWS_SMD_STREAMTYPENAME,
|
||||
.manual_initial_tx_credit = 1 << LWSSMDCL_USER_BASE_BITNUM,
|
||||
};
|
||||
|
||||
static lws_ss_state_return_t
|
||||
multi_myss_rx_monitor(void *userobj, const uint8_t *buf, size_t len, int flags)
|
||||
{
|
||||
myss_t *m = (myss_t *)userobj;
|
||||
const char *p;
|
||||
size_t al;
|
||||
int fk, n;
|
||||
|
||||
/* ignore our and other forks announcing their result */
|
||||
|
||||
if (!lws_json_simple_find((const char *)buf, len, "\"seen_all\":", &al))
|
||||
return LWSSSSRET_OK;
|
||||
|
||||
p = lws_json_simple_find((const char *)buf, len, "\"fork\":", &al);
|
||||
if (!p)
|
||||
return LWSSSSRET_DESTROY_ME;
|
||||
fk = atoi(p);
|
||||
if (fk < 1 || fk > FORKS)
|
||||
return LWSSSSRET_DESTROY_ME;
|
||||
|
||||
if (m->seen_msgs[fk - 1])
|
||||
/* expected only once ... dupe */
|
||||
return LWSSSSRET_DESTROY_ME;
|
||||
|
||||
m->seen_msgs[fk - 1] = 1;
|
||||
|
||||
for (n = 0; n < FORKS; n++)
|
||||
if (!m->seen_msgs[n])
|
||||
return LWSSSSRET_OK;
|
||||
|
||||
/* the test has succeeded */
|
||||
|
||||
bad = 0;
|
||||
interrupted = 1;
|
||||
|
||||
return LWSSSSRET_OK;
|
||||
}
|
||||
|
||||
static const lws_ss_info_t ssi_multi_lws_smd_monitor = {
|
||||
.handle_offset = offsetof(myss_t, ss),
|
||||
.opaque_user_data_offset = offsetof(myss_t, opaque_data),
|
||||
.rx = multi_myss_rx_monitor,
|
||||
// .state = multi_myss_state_monitor,
|
||||
.user_alloc = sizeof(myss_t),
|
||||
.streamtype = LWS_SMD_STREAMTYPENAME,
|
||||
.manual_initial_tx_credit = 1 << LWSSMDCL_USER_BASE_BITNUM,
|
||||
};
|
||||
|
||||
/* for comparison, this is a non-SS lws_smd participant */
|
||||
|
||||
static int
|
||||
direct_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
|
||||
void *buf, size_t len)
|
||||
{
|
||||
struct lws_context **pctx = (struct lws_context **)opaque;
|
||||
|
||||
if (_class != LWSSMDCL_SYSTEM_STATE)
|
||||
return 0;
|
||||
|
||||
if (!lws_json_simple_strcmp(buf, len, "\"state\":", "OPERATIONAL")) {
|
||||
|
||||
/*
|
||||
* Create the SSPC link to lws_smd... notice in ssi_lws_smd
|
||||
* above, we tell this link to use the user class filter.
|
||||
*
|
||||
* If context->user is zero, we are the original process
|
||||
* monitoring the progress of the others, otherwise we are
|
||||
* 1 .. FORKS and producing / checking the smd messages
|
||||
*/
|
||||
|
||||
lwsl_info("%s: starting ss for test fork %d\n", __func__,
|
||||
(int)(intptr_t)lws_context_user(*pctx));
|
||||
|
||||
if (lws_ss_create(*pctx, 0, lws_context_user(*pctx) ?
|
||||
&ssi_multi_lws_smd /* forked process send / check */:
|
||||
&ssi_multi_lws_smd_monitor /* original monitors */,
|
||||
NULL, NULL, NULL, NULL)) {
|
||||
lwsl_err("%s: failed to create secure stream\n",
|
||||
__func__);
|
||||
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
sul_timeout_cb(lws_sorted_usec_list_t *sul)
|
||||
{
|
||||
interrupted = 1;
|
||||
}
|
||||
|
||||
int
|
||||
smd_ss_multi_test(int argc, const char **argv)
|
||||
{
|
||||
struct lws_context_creation_info info;
|
||||
lws_sorted_usec_list_t sul_timeout;
|
||||
struct lws_context *context;
|
||||
pid_t pid;
|
||||
int n;
|
||||
|
||||
lwsl_user("LWS Secure Streams SMD MULTI test client [-d<verb>]\n");
|
||||
|
||||
for (n = 0; n < FORKS; n++) {
|
||||
pid = fork();
|
||||
if (!pid) /* forked child */ {
|
||||
break;
|
||||
}
|
||||
lwsl_notice("%s: forked test process %u\n", __func__, pid);
|
||||
}
|
||||
|
||||
if (n == FORKS)
|
||||
/* the original process */
|
||||
n = -1; /* so original ends up with context.user as 0 below */
|
||||
|
||||
memset(&info, 0, sizeof info);
|
||||
memset(&sul_timeout, 0, sizeof sul_timeout);
|
||||
|
||||
lws_cmdline_option_handle_builtin(argc, argv, &info);
|
||||
|
||||
{
|
||||
const char *p;
|
||||
|
||||
/* connect to ssproxy via UDS by default, else via
|
||||
* tcp connection to this port */
|
||||
if ((p = lws_cmdline_option(argc, argv, "-p")))
|
||||
info.ss_proxy_port = (uint16_t)atoi(p);
|
||||
|
||||
/* UDS "proxy.ss.lws" in abstract namespace, else this socket
|
||||
* path; when -p given this can specify the network interface
|
||||
* to bind to */
|
||||
if ((p = lws_cmdline_option(argc, argv, "-i")))
|
||||
info.ss_proxy_bind = p;
|
||||
|
||||
/* if -p given, -a specifies the proxy address to connect to */
|
||||
if ((p = lws_cmdline_option(argc, argv, "-a")))
|
||||
info.ss_proxy_address = p;
|
||||
}
|
||||
|
||||
info.fd_limit_per_thread = 1 + 6 + 1;
|
||||
info.port = CONTEXT_PORT_NO_LISTEN;
|
||||
info.protocols = lws_sspc_protocols;
|
||||
info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
|
||||
LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
|
||||
|
||||
info.early_smd_cb = direct_smd_cb;
|
||||
info.early_smd_class_filter = 0xffffffff;
|
||||
info.early_smd_opaque = &context;
|
||||
|
||||
info.user = (void *)(intptr_t)(n + 1);
|
||||
|
||||
/* create the context */
|
||||
|
||||
context = lws_create_context(&info);
|
||||
if (!context) {
|
||||
lwsl_err("lws init failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!lws_create_vhost(context, &info)) {
|
||||
lwsl_err("%s: failed to create default vhost\n", __func__);
|
||||
goto bail;
|
||||
}
|
||||
|
||||
/* set up the test timeout */
|
||||
|
||||
lws_sul_schedule(context, 0, &sul_timeout, sul_timeout_cb,
|
||||
10 * LWS_US_PER_SEC);
|
||||
|
||||
/* the event loop */
|
||||
|
||||
while (lws_service(context, 0) >= 0 && !interrupted)
|
||||
;
|
||||
|
||||
bail:
|
||||
lws_context_destroy(context);
|
||||
|
||||
if (n == -1)
|
||||
lwsl_user("%s: finished %s\n", __func__, bad ? "FAIL" : "PASS");
|
||||
|
||||
return bad;
|
||||
}
|
Loading…
Add table
Reference in a new issue