1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

sspc: add LWSSSCS_UPSTREAM_LINK_RETRY state

Add a transient state that indicates that we are retrying an upstream link
before the sspc creation can proceed.

The state() ack paramemeter shows the number of ms we have been retrying.

This is only issued if proxy mode clients are having difficulty reaching
their proxy, and are retrying.

As a transient state it doesn't affect the ss overall state, which will not
have reached CREATING yet.
This commit is contained in:
Andy Green 2021-08-16 10:42:55 +01:00
parent 770dc7bc85
commit 43f290adbc
4 changed files with 34 additions and 10 deletions

View file

@ -219,6 +219,13 @@ typedef enum {
LWSSSCS_EVENT_WAIT_CANCELLED, /* somebody called lws_cancel_service */
LWSSSCS_UPSTREAM_LINK_RETRY, /* if we are being proxied over some
* intermediate link, this transient
* state may be sent to indicate we are
* waiting to establish that link before
* creation can proceed.. ack is the
* number of ms we have been trying */
LWSSSCS_SINK_JOIN, /* sinks get this when a new source
* stream joins the sink */
LWSSSCS_SINK_PART, /* sinks get this when a new source

View file

@ -330,6 +330,7 @@ typedef struct lws_sspc_handle {
*/
lws_usec_t us_earliest_write_req;
lws_usec_t us_start_upstream;
unsigned long writeable_len;

View file

@ -203,17 +203,14 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
size_t pktsize = wsi->a.context->max_http_header_data;
void *m = (void *)((uint8_t *)&h[1]);
uint8_t *pkt = NULL, *p = NULL, *end = NULL;
lws_ss_state_return_t r;
uint64_t interval;
const uint8_t *cp;
uint8_t s[64];
lws_usec_t us;
int flags, n;
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
break;
case LWS_CALLBACK_PROTOCOL_DESTROY:
break;
case LWS_CALLBACK_CONNECTING:
/*
@ -224,6 +221,8 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
break;
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
lwsl_warn("%s: CCE: %s\n", __func__,
in ? (const char *)in : "null");
#if defined(LWS_WITH_SYS_METRICS)
/*
* If any hanging caliper measurement, dump it, and free any tags
@ -234,6 +233,17 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
h->cwsi = NULL;
lws_sul_schedule(h->context, 0, &h->sul_retry,
lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
if (h->ssi.state) {
interval = (uint64_t)(lws_now_usecs() - h->us_start_upstream) /
LWS_US_PER_MS;
if (interval > 0xffffffffull)
interval = 0xffffffffull;
r = h->ssi.state(lws_sspc_to_user_object(h), NULL,
LWSSSCS_UPSTREAM_LINK_RETRY,
(uint32_t)interval);
if (r == LWSSSSRET_DESTROY_ME)
lws_sspc_destroy(&h);
}
break;
case LWS_CALLBACK_RAW_CONNECTED:
@ -253,6 +263,7 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
*/
lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3);
lws_callback_on_writable(wsi);
h->us_start_upstream = 0;
break;
case LWS_CALLBACK_RAW_CLOSE:
@ -268,14 +279,13 @@ callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
lws_dsh_destroy(&h->dsh);
if (h->ss_dangling_connected && h->ssi.state) {
lws_ss_state_return_t ret_state;
lwsl_sspc_notice(h, "setting _DISCONNECTED");
h->ss_dangling_connected = 0;
h->prev_ss_state = LWSSSCS_DISCONNECTED;
ret_state = h->ssi.state(ss_to_userobj(h), NULL,
r = h->ssi.state(ss_to_userobj(h), NULL,
LWSSSCS_DISCONNECTED, 0);
if (ret_state == LWSSSSRET_DESTROY_ME) {
if (r == LWSSSSRET_DESTROY_ME) {
h->cwsi = NULL;
lws_set_opaque_user_data(wsi, NULL);
lws_sspc_destroy(&h);
@ -657,6 +667,7 @@ lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1);
h->ssi.streamtype = (const char *)p;
h->context = context;
h->us_start_upstream = lws_now_usecs();
if (!ssi->manual_initial_tx_credit)
h->txc.peer_tx_cr_est = 500000000;

View file

@ -68,6 +68,7 @@ static const char *state_names[] = {
"LWSSSCS_SERVER_TXN",
"LWSSSCS_SERVER_UPGRADE",
"LWSSSCS_EVENT_WAIT_CANCELLED",
"LWSSSCS_UPSTREAM_LINK_RETRY",
};
/*
@ -289,7 +290,9 @@ int
lws_ss_check_next_state(lws_lifecycle_t *lc, uint8_t *prevstate,
lws_ss_constate_t cs)
{
if (cs >= LWSSSCS_USER_BASE || cs == LWSSSCS_EVENT_WAIT_CANCELLED)
if (cs >= LWSSSCS_USER_BASE ||
cs == LWSSSCS_EVENT_WAIT_CANCELLED ||
cs == LWSSSCS_UPSTREAM_LINK_RETRY)
/*
* we can't judge user or transient states, leave the old state
* and just wave them through
@ -336,7 +339,9 @@ int
lws_ss_check_next_state_ss(lws_ss_handle_t *ss, uint8_t *prevstate,
lws_ss_constate_t cs)
{
if (cs >= LWSSSCS_USER_BASE || cs == LWSSSCS_EVENT_WAIT_CANCELLED)
if (cs >= LWSSSCS_USER_BASE ||
cs == LWSSSCS_EVENT_WAIT_CANCELLED ||
cs == LWSSSCS_UPSTREAM_LINK_RETRY)
/*
* we can't judge user or transient states, leave the old state
* and just wave them through