From 43f290adbc060627c1ffe97ed0c48f50c6014101 Mon Sep 17 00:00:00 2001 From: Andy Green Date: Mon, 16 Aug 2021 10:42:55 +0100 Subject: [PATCH] sspc: add LWSSSCS_UPSTREAM_LINK_RETRY state Add a transient state that indicates that we are retrying an upstream link before the sspc creation can proceed. The state() ack paramemeter shows the number of ms we have been retrying. This is only issued if proxy mode clients are having difficulty reaching their proxy, and are retrying. As a transient state it doesn't affect the ss overall state, which will not have reached CREATING yet. --- include/libwebsockets/lws-secure-streams.h | 7 +++++ .../private-lib-secure-streams.h | 1 + lib/secure-streams/secure-streams-client.c | 27 +++++++++++++------ lib/secure-streams/secure-streams.c | 9 +++++-- 4 files changed, 34 insertions(+), 10 deletions(-) diff --git a/include/libwebsockets/lws-secure-streams.h b/include/libwebsockets/lws-secure-streams.h index e4cb6ba5d..8157c7002 100644 --- a/include/libwebsockets/lws-secure-streams.h +++ b/include/libwebsockets/lws-secure-streams.h @@ -219,6 +219,13 @@ typedef enum { LWSSSCS_EVENT_WAIT_CANCELLED, /* somebody called lws_cancel_service */ + LWSSSCS_UPSTREAM_LINK_RETRY, /* if we are being proxied over some + * intermediate link, this transient + * state may be sent to indicate we are + * waiting to establish that link before + * creation can proceed.. ack is the + * number of ms we have been trying */ + LWSSSCS_SINK_JOIN, /* sinks get this when a new source * stream joins the sink */ LWSSSCS_SINK_PART, /* sinks get this when a new source diff --git a/lib/secure-streams/private-lib-secure-streams.h b/lib/secure-streams/private-lib-secure-streams.h index 2b5f61190..ebfa7efa0 100644 --- a/lib/secure-streams/private-lib-secure-streams.h +++ b/lib/secure-streams/private-lib-secure-streams.h @@ -330,6 +330,7 @@ typedef struct lws_sspc_handle { */ lws_usec_t us_earliest_write_req; + lws_usec_t us_start_upstream; unsigned long writeable_len; diff --git a/lib/secure-streams/secure-streams-client.c b/lib/secure-streams/secure-streams-client.c index e90cf79cd..76ea9522e 100644 --- a/lib/secure-streams/secure-streams-client.c +++ b/lib/secure-streams/secure-streams-client.c @@ -203,17 +203,14 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason, size_t pktsize = wsi->a.context->max_http_header_data; void *m = (void *)((uint8_t *)&h[1]); uint8_t *pkt = NULL, *p = NULL, *end = NULL; + lws_ss_state_return_t r; + uint64_t interval; const uint8_t *cp; uint8_t s[64]; lws_usec_t us; int flags, n; switch (reason) { - case LWS_CALLBACK_PROTOCOL_INIT: - break; - - case LWS_CALLBACK_PROTOCOL_DESTROY: - break; case LWS_CALLBACK_CONNECTING: /* @@ -224,6 +221,8 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason, break; case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + lwsl_warn("%s: CCE: %s\n", __func__, + in ? (const char *)in : "null"); #if defined(LWS_WITH_SYS_METRICS) /* * If any hanging caliper measurement, dump it, and free any tags @@ -234,6 +233,17 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason, h->cwsi = NULL; lws_sul_schedule(h->context, 0, &h->sul_retry, lws_sspc_sul_retry_cb, LWS_US_PER_SEC); + if (h->ssi.state) { + interval = (uint64_t)(lws_now_usecs() - h->us_start_upstream) / + LWS_US_PER_MS; + if (interval > 0xffffffffull) + interval = 0xffffffffull; + r = h->ssi.state(lws_sspc_to_user_object(h), NULL, + LWSSSCS_UPSTREAM_LINK_RETRY, + (uint32_t)interval); + if (r == LWSSSSRET_DESTROY_ME) + lws_sspc_destroy(&h); + } break; case LWS_CALLBACK_RAW_CONNECTED: @@ -253,6 +263,7 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason, */ lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3); lws_callback_on_writable(wsi); + h->us_start_upstream = 0; break; case LWS_CALLBACK_RAW_CLOSE: @@ -268,14 +279,13 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason, lws_dsh_destroy(&h->dsh); if (h->ss_dangling_connected && h->ssi.state) { - lws_ss_state_return_t ret_state; lwsl_sspc_notice(h, "setting _DISCONNECTED"); h->ss_dangling_connected = 0; h->prev_ss_state = LWSSSCS_DISCONNECTED; - ret_state = h->ssi.state(ss_to_userobj(h), NULL, + r = h->ssi.state(ss_to_userobj(h), NULL, LWSSSCS_DISCONNECTED, 0); - if (ret_state == LWSSSSRET_DESTROY_ME) { + if (r == LWSSSSRET_DESTROY_ME) { h->cwsi = NULL; lws_set_opaque_user_data(wsi, NULL); lws_sspc_destroy(&h); @@ -657,6 +667,7 @@ lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi, memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1); h->ssi.streamtype = (const char *)p; h->context = context; + h->us_start_upstream = lws_now_usecs(); if (!ssi->manual_initial_tx_credit) h->txc.peer_tx_cr_est = 500000000; diff --git a/lib/secure-streams/secure-streams.c b/lib/secure-streams/secure-streams.c index cfa355210..74d575732 100644 --- a/lib/secure-streams/secure-streams.c +++ b/lib/secure-streams/secure-streams.c @@ -68,6 +68,7 @@ static const char *state_names[] = { "LWSSSCS_SERVER_TXN", "LWSSSCS_SERVER_UPGRADE", "LWSSSCS_EVENT_WAIT_CANCELLED", + "LWSSSCS_UPSTREAM_LINK_RETRY", }; /* @@ -289,7 +290,9 @@ int lws_ss_check_next_state(lws_lifecycle_t *lc, uint8_t *prevstate, lws_ss_constate_t cs) { - if (cs >= LWSSSCS_USER_BASE || cs == LWSSSCS_EVENT_WAIT_CANCELLED) + if (cs >= LWSSSCS_USER_BASE || + cs == LWSSSCS_EVENT_WAIT_CANCELLED || + cs == LWSSSCS_UPSTREAM_LINK_RETRY) /* * we can't judge user or transient states, leave the old state * and just wave them through @@ -336,7 +339,9 @@ int lws_ss_check_next_state_ss(lws_ss_handle_t *ss, uint8_t *prevstate, lws_ss_constate_t cs) { - if (cs >= LWSSSCS_USER_BASE || cs == LWSSSCS_EVENT_WAIT_CANCELLED) + if (cs >= LWSSSCS_USER_BASE || + cs == LWSSSCS_EVENT_WAIT_CANCELLED || + cs == LWSSSCS_UPSTREAM_LINK_RETRY) /* * we can't judge user or transient states, leave the old state * and just wave them through