1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ss: improve callback return consistency

Formalize the LWSSSSRET_ enums into a type "lws_ss_state_return_t"
returned by the rx, tx and state callbacks, and some private helpers
lws_ss_backoff() and lws_ss_event_helper().

Remove LWSSSSRET_SS_HANDLE_DESTROYED concept... the two helpers that could
have destroyed the ss and returned that, now return LWSSSSRET_DESTROY_ME
to the caller to perform or pass up to their caller instead.

Handle helper returns in all the ss protocols and update the rx / tx
calls to have their returns from rx / tx / event helper and ss backoff
all handled by unified code.
This commit is contained in:
Andy Green 2020-08-26 11:05:41 +01:00
parent e3e177a7d6
commit 4ae3ef51c1
17 changed files with 283 additions and 290 deletions

View file

@ -87,7 +87,7 @@ lws_sspc_destroy(struct lws_sspc_handle **ppss);
* write on this stream, the *tx callback will occur with an empty buffer for
* the stream owner to fill in.
*/
LWS_VISIBLE LWS_EXTERN void
LWS_VISIBLE LWS_EXTERN lws_ss_state_return_t
lws_sspc_request_tx(struct lws_sspc_handle *pss);
/**
@ -107,7 +107,7 @@ lws_sspc_request_tx(struct lws_sspc_handle *pss);
* hint to its upstream proxy, where it's available for use to produce the
* internet-capable protocol framing.
*/
LWS_VISIBLE LWS_EXTERN void
LWS_VISIBLE LWS_EXTERN lws_ss_state_return_t
lws_sspc_request_tx_len(struct lws_sspc_handle *h, unsigned long len);
/**
@ -207,9 +207,12 @@ lws_sspc_to_user_object(struct lws_sspc_handle *h);
LWS_VISIBLE LWS_EXTERN void
lws_sspc_change_handlers(struct lws_sspc_handle *h,
int (*rx)(void *userobj, const uint8_t *buf, size_t len, int flags),
int (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
size_t *len, int *flags),
int (*state)(void *userobj, void *h_src /* ss handle type */,
lws_ss_constate_t state, lws_ss_tx_ordinal_t ack));
lws_ss_state_return_t (*rx)(void *userobj, const uint8_t *buf,
size_t len, int flags),
lws_ss_state_return_t (*tx)(void *userobj, lws_ss_tx_ordinal_t ord,
uint8_t *buf, size_t *len, int *flags),
lws_ss_state_return_t (*state)(void *userobj, void *h_src
/* ss handle type */,
lws_ss_constate_t state,
lws_ss_tx_ordinal_t ack));

View file

@ -262,14 +262,13 @@ typedef enum {
* wants to do
*/
enum lws_ss_state_return_t {
typedef enum lws_ss_state_return {
LWSSSSRET_TX_DONT_SEND = 1, /* (*tx) only */
LWSSSSRET_OK = 0,
LWSSSSRET_DISCONNECT_ME = -1,
LWSSSSRET_DESTROY_ME = -2,
LWSSSSRET_SS_HANDLE_DESTROYED = -3,
};
LWSSSSRET_OK = 0, /* no error */
LWSSSSRET_DISCONNECT_ME = -1, /* caller should disconnect us */
LWSSSSRET_DESTROY_ME = -2, /* caller should destroy us */
} lws_ss_state_return_t;
/**
* lws_ss_info_t: information about stream to be created
@ -309,15 +308,15 @@ typedef struct lws_ss_info {
/**< offset of opaque user data ptr in user_alloc type, set to
offsetof(mytype, opaque_ud_member) */
int (*rx)(void *userobj, const uint8_t *buf, size_t len,
int flags);
lws_ss_state_return_t (*rx)(void *userobj, const uint8_t *buf,
size_t len, int flags);
/**< callback with rx payload for this stream */
int (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
size_t *len, int *flags);
lws_ss_state_return_t (*tx)(void *userobj, lws_ss_tx_ordinal_t ord,
uint8_t *buf, size_t *len, int *flags);
/**< callback to send payload on this stream... 0 = send as set in
* len and flags, 1 = do not send anything (ie, not even 0 len frame) */
int (*state)(void *userobj, void *h_src /* ss handle type */,
lws_ss_constate_t state, lws_ss_tx_ordinal_t ack);
lws_ss_state_return_t (*state)(void *userobj, void *h_src /* ss handle type */,
lws_ss_constate_t state, lws_ss_tx_ordinal_t ack);
/**< advisory cb about state of stream and QoS status if applicable...
* h_src is only used with sinks and LWSSSCS_SINK_JOIN/_PART events.
* Return nonzero to indicate you want to destroy the stream. */
@ -392,8 +391,10 @@ lws_ss_destroy(struct lws_ss_handle **ppss);
* Schedules a write on the stream represented by \p pss. When it's possible to
* write on this stream, the *tx callback will occur with an empty buffer for
* the stream owner to fill in.
*
* Returns 0 or LWSSSSRET_SS_HANDLE_DESTROYED
*/
LWS_VISIBLE LWS_EXTERN void
LWS_VISIBLE LWS_EXTERN lws_ss_state_return_t
lws_ss_request_tx(struct lws_ss_handle *pss);
/**
@ -409,7 +410,7 @@ lws_ss_request_tx(struct lws_ss_handle *pss);
* This api variant should be used when it's possible the payload will go out
* over h1 with x-web-form-urlencoded or similar Content-Type.
*/
LWS_VISIBLE LWS_EXTERN void
LWS_VISIBLE LWS_EXTERN lws_ss_state_return_t
lws_ss_request_tx_len(struct lws_ss_handle *pss, unsigned long len);
/**

View file

@ -322,7 +322,7 @@ lws_process_ws_upgrade2(struct lws *wsi)
* want to treat subsequent payloads differently
*/
lws_ss_event_helper(wsi->a.vhost->ss_handle,
(void)lws_ss_event_helper(wsi->a.vhost->ss_handle,
LWSSSCS_SERVER_UPGRADE);
}
#endif

View file

@ -34,9 +34,24 @@ If the underlying protocol gives indications of transaction success, such as,
eg, a 200 for http, or an ACK from MQTT, the stream state is called back with
an `LWSSSCS_QOS_ACK_REMOTE` or `LWSSSCS_QOS_NACK_REMOTE`.
State callbacks and tx() can indicate they want to drop the connection
(`LWSSSRET_DISCONNECT_ME`) or destroy the whole logical Secure Stream
(`LWSSSRET_DESTROY_ME`).
## SS Callback return handling
SS state(), rx() and tx() can indicate with their return code some common
situations that should be handled by the caller.
Constant|Scope|Meaning
---|---|---
LWSSSSRET_TX_DONT_SEND|tx|This opportunity to send something was passed on
LWSSSSRET_OK|state, rx, tx|No error, continue doing what we're doing
LWSSSSRET_DISCONNECT_ME|state, rx|assertively disconnect from peer
LWSSSSRET_DESTROY_ME|state, rx|Caller should now destroy the stream itself
LWSSSSRET_SS_HANDLE_DESTROYED|state|Something handled a request to destroy the stream
Destruction of the stream we're calling back on inside the callback is tricky,
it's preferable to return `LWSSSSRET_DESTROY_ME` if it is required, and let the
caller handle it. But in some cases, helpers called from the callbacks may
destroy the handle themselves, in that case the handler should return
`LWSSSSRET_SS_HANDLE_DESTROYED` indicating that the handle is already destroyed.
## Secure Streams SERVER State lifecycle
@ -593,6 +608,9 @@ listens on a Unix Domain Socket and is connected to by one or more other
processes that pass their SS API activity to the proxy for fulfilment (or
onward proxying).
Each Secure Stream that is created then in turn creates a private Unix Domain
Socket connection to the proxy for each stream.
In this case the proxy uses secure-streams.c and policy.c as before to fulfil
the inbound proxy streams, but uses secure-streams-serialize.c to serialize and
deserialize the proxied SS API activity. The proxy clients define

View file

@ -175,7 +175,7 @@ lws_ss_policy_ref_trust_store(struct lws_context *context,
pol->trust.store->ssx509[0]->ca_der_len;
#endif
i.port = CONTEXT_PORT_NO_LISTEN;
lwsl_notice("%s: %s trust store initial '%s'\n", __func__,
lwsl_info("%s: %s trust store initial '%s'\n", __func__,
i.vhost_name, pol->trust.store->ssx509[0]->vhost_name);
v = lws_create_vhost(context, &i);

View file

@ -378,12 +378,16 @@ lws_ss_policy_set(struct lws_context *context, const char *name);
int
lws_ss_sys_fetch_policy(struct lws_context *context);
int
lws_ss_state_return_t
lws_ss_event_helper(lws_ss_handle_t *h, lws_ss_constate_t cs);
int
lws_ss_state_return_t
lws_ss_backoff(lws_ss_handle_t *h);
int
_lws_ss_handle_state_ret(lws_ss_state_return_t r, struct lws *wsi,
lws_ss_handle_t **ph);
int
lws_ss_set_timeout_us(lws_ss_handle_t *h, lws_usec_t us);

View file

@ -221,6 +221,7 @@ secstream_h1(struct lws *wsi, enum lws_callback_reasons reason, void *user,
*start = p,
#endif
*end = &buf[sizeof(buf) - 1];
lws_ss_state_return_t r;
int f = 0, m, status;
char conceal_eom = 0;
size_t buflen;
@ -234,15 +235,14 @@ secstream_h1(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lwsl_info("%s: h: %p, %s CLIENT_CONNECTION_ERROR: %s\n", __func__,
h, h->policy->streamtype, in ? (char *)in : "(null)");
/* already disconnected, no action for DISCONNECT_ME */
if (lws_ss_event_helper(h, LWSSSCS_UNREACHABLE))
/* h has been destroyed */
break;
r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
if (r)
return _lws_ss_handle_state_ret(r, wsi, &h);
h->wsi = NULL;
lwsl_debug("a4\n");
if (lws_ss_backoff(h))
/* was destroyed */
return -1;
r = lws_ss_backoff(h);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
case LWS_CALLBACK_CLIENT_HTTP_REDIRECT:
@ -262,20 +262,22 @@ secstream_h1(struct lws *wsi, enum lws_callback_reasons reason, void *user,
__func__, h,
h->policy ? h->policy->streamtype : "no policy");
h->wsi = NULL;
//bad = status != 200;
//lws_cancel_service(lws_get_context(wsi)); /* abort poll wait */
if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) &&
#if defined(LWS_WITH_SERVER)
!(h->info.flags & LWSSSINFLAGS_ACCEPTED) && /* not server */
#endif
!h->txn_ok && !wsi->a.context->being_destroyed) {
if (lws_ss_backoff(h))
break;
r = lws_ss_backoff(h);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
} else
h->seqstate = SSSEQ_IDLE;
/* already disconnected, no action for DISCONNECT_ME */
lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
/* may have been destroyed */
r = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
@ -308,9 +310,9 @@ secstream_h1(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->seqstate = SSSEQ_CONNECTED;
lws_sul_cancel(&h->sul);
if (lws_ss_event_helper(h, LWSSSCS_CONNECTED))
/* was destroyed */
return -1;
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
/*
* Since it's an http transaction we initiated... this is
@ -456,9 +458,11 @@ malformed:
h->policy->protocol == LWSSSP_H2) &&
h->being_serialized && (
!strcmp(h->policy->u.http.method, "PUT") ||
!strcmp(h->policy->u.http.method, "POST")))
if (lws_ss_event_helper(h, LWSSSCS_CONNECTED))
return LWSSSSRET_SS_HANDLE_DESTROYED;
!strcmp(h->policy->u.http.method, "POST"))) {
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r)
return _lws_ss_handle_state_ret(r, wsi, &h);
}
break;
@ -485,11 +489,9 @@ malformed:
// lwsl_notice("%s: HTTP_READ: client side sent len %d fl 0x%x\n",
// __func__, (int)len, (int)f);
m = h->info.rx(ss_to_userobj(h), (const uint8_t *)in, len, f);
if (m == LWSSSSRET_DESTROY_ME)
goto do_destroy_me;
if (m < 0)
return -1;
r = h->info.rx(ss_to_userobj(h), (const uint8_t *)in, len, f);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
return 0; /* don't passthru */
@ -517,16 +519,12 @@ malformed:
h->txn_ok = 1;
if (h->u.http.good_respcode) {
if (lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE))
/* was destroyed */
return -1;
} else
if (lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE))
/* was destroyed */
return -1;
r = lws_ss_event_helper(h, h->u.http.good_respcode ?
LWSSSCS_QOS_ACK_REMOTE :
LWSSSCS_QOS_NACK_REMOTE);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
//bad = status != 200;
lws_cancel_service(lws_get_context(wsi)); /* abort poll wait */
break;
@ -598,25 +596,11 @@ malformed:
#else
buflen = lws_ptr_diff(end, p);
#endif
switch(h->info.tx(ss_to_userobj(h), h->txord++, p, &buflen, &f)) {
case LWSSSSRET_DISCONNECT_ME:
lwsl_debug("%s: tx handler asked to close conn\n", __func__);
return -1; /* close connection */
case LWSSSSRET_DESTROY_ME:
do_destroy_me:
lws_set_opaque_user_data(wsi, NULL);
h->wsi = NULL;
lws_ss_destroy(&h);
return -1; /* close connection */
case LWSSSSRET_TX_DONT_SEND:
/* don't want to send anything */
lwsl_debug("%s: dont want to write\n", __func__);
r = h->info.tx(ss_to_userobj(h), h->txord++, p, &buflen, &f);
if (r == LWSSSSRET_TX_DONT_SEND)
return 0;
default:
break;
}
if (r < 0)
return _lws_ss_handle_state_ret(r, wsi, &h);
// lwsl_notice("%s: WRITEABLE: user tx says len %d fl 0x%x\n",
// __func__, (int)buflen, (int)f);
@ -717,10 +701,9 @@ do_destroy_me:
}
}
if (lws_ss_event_helper(h, LWSSSCS_SERVER_TXN))
/* was destroyed */
return -1;
r = lws_ss_event_helper(h, LWSSSCS_SERVER_TXN);
if (r)
return _lws_ss_handle_state_ret(r, wsi, &h);
return 0;
#endif

View file

@ -33,6 +33,7 @@ secstream_h2(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
lws_ss_state_return_t r;
int n;
switch (reason) {
@ -63,18 +64,13 @@ secstream_h2(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_COMPLETED_CLIENT_HTTP:
// lwsl_err("%s: h2 COMPLETED_CLIENT_HTTP\n", __func__);
n = h->info.rx(ss_to_userobj(h), NULL, 0, LWSSS_FLAG_EOM);
r = h->info.rx(ss_to_userobj(h), NULL, 0, LWSSS_FLAG_EOM);
/* decouple the fates of the wsi and the ss */
h->wsi = NULL;
h->txn_ok = 1;
//bad = status != 200;
lws_cancel_service(lws_get_context(wsi)); /* abort poll wait */
if (n == LWSSSSRET_DESTROY_ME) {
lws_set_opaque_user_data(wsi, NULL);
lwsl_info("%s: destroying ss handle\n", __func__);
lws_ss_destroy(&h);
return -1;
}
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
return 0;
case LWS_CALLBACK_WSI_TX_CREDIT_GET:

View file

@ -31,6 +31,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
lws_mqtt_publish_param_t mqpp, *pmqpp;
uint8_t buf[LWS_PRE + 1400];
lws_ss_state_return_t r;
size_t buflen;
int f = 0;
@ -42,22 +43,28 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
in ? (char *)in : "(null)");
if (!h)
break;
lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
h->wsi = NULL;
if (h->u.mqtt.heap_baggage) {
lws_free(h->u.mqtt.heap_baggage);
h->u.mqtt.heap_baggage = NULL;
}
lws_ss_backoff(h);
/* may have been destroyed */
if (r == LWSSSSRET_DESTROY_ME)
return _lws_ss_handle_state_ret(r, wsi, &h);
r = lws_ss_backoff(h);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
if (!h)
break;
lws_sul_cancel(&h->sul_timeout);
f = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
r= lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
if (h->wsi)
lws_set_opaque_user_data(h->wsi, NULL);
h->wsi = NULL;
@ -67,16 +74,15 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->u.mqtt.heap_baggage = NULL;
}
if (f) {
lws_ss_destroy(&h);
break;
}
if (r)
return _lws_ss_handle_state_ret(r, wsi, &h);
if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) &&
!h->txn_ok && !wsi->a.context->being_destroyed)
if (lws_ss_backoff(h))
/* has been destroyed */
return -1;
!h->txn_ok && !wsi->a.context->being_destroyed) {
r = lws_ss_backoff(h);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
}
break;
case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
@ -88,7 +94,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->retry = 0;
h->seqstate = SSSEQ_CONNECTED;
lws_sul_cancel(&h->sul);
lws_ss_event_helper(h, LWSSSCS_CONNECTED);
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
if (h->policy->u.mqtt.topic)
lws_callback_on_writable(wsi);
break;
@ -108,9 +116,10 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->subseq = 1;
if (h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload,
len, f) < 0)
return -1;
r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload,
len, f);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
return 0; /* don't passthru */
@ -121,7 +130,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_MQTT_ACK:
lws_sul_cancel(&h->sul_timeout);
lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
@ -163,25 +174,13 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
buflen = sizeof(buf) - LWS_PRE;
switch(h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
&buflen, &f)) {
case LWSSSSRET_DISCONNECT_ME:
lwsl_debug("%s: tx handler asked to close conn\n", __func__);
return -1; /* close connection */
case LWSSSSRET_DESTROY_ME:
lws_set_opaque_user_data(wsi, NULL);
h->wsi = NULL;
lws_ss_destroy(&h);
return -1; /* close connection */
case LWSSSSRET_TX_DONT_SEND:
/* don't want to send anything */
lwsl_debug("%s: dont want to write\n", __func__);
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
&buflen, &f);
if (r == LWSSSSRET_TX_DONT_SEND)
return 0;
default:
break;
}
if (r < 0)
return _lws_ss_handle_state_ret(r, wsi, &h);
memset(&mqpp, 0, sizeof(mqpp));
/* this is the string-substituted h->policy->u.mqtt.topic */

View file

@ -33,6 +33,7 @@ secstream_raw(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
uint8_t buf[LWS_PRE + 1520], *p = &buf[LWS_PRE],
*end = &buf[sizeof(buf) - 1];
lws_ss_state_return_t r;
size_t buflen;
int f = 0;
@ -43,10 +44,13 @@ secstream_raw(struct lws *wsi, enum lws_callback_reasons reason, void *user,
assert(h->policy);
lwsl_info("%s: h: %p, %s CLIENT_CONNECTION_ERROR: %s\n", __func__,
h, h->policy->streamtype, in ? (char *)in : "(null)");
lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
if (r == LWSSSSRET_DESTROY_ME)
return _lws_ss_handle_state_ret(r, wsi, &h);
h->wsi = NULL;
lws_ss_backoff(h);
/* may have been destroyed */
r = lws_ss_backoff(h);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
case LWS_CALLBACK_RAW_CLOSE:
@ -61,12 +65,16 @@ secstream_raw(struct lws *wsi, enum lws_callback_reasons reason, void *user,
#if defined(LWS_WITH_SERVER)
!(h->info.flags & LWSSSINFLAGS_ACCEPTED) && /* not server */
#endif
!h->txn_ok && !wsi->a.context->being_destroyed)
if (lws_ss_backoff(h))
/* has been destroyed */
break;
!h->txn_ok && !wsi->a.context->being_destroyed) {
r = lws_ss_backoff(h);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
}
/* wsi is going down anyway */
lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
r = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
if (r == LWSSSSRET_DESTROY_ME)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
case LWS_CALLBACK_RAW_CONNECTED:
@ -75,7 +83,9 @@ secstream_raw(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->retry = 0;
h->seqstate = SSSEQ_CONNECTED;
lws_sul_cancel(&h->sul);
lws_ss_event_helper(h, LWSSSCS_CONNECTED);
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
lws_validity_confirmed(wsi);
break;
@ -89,8 +99,9 @@ secstream_raw(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (!h || !h->info.rx)
return 0;
if (h->info.rx(ss_to_userobj(h), (const uint8_t *)in, len, 0) < 0)
return -1;
r = h->info.rx(ss_to_userobj(h), (const uint8_t *)in, len, 0);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
return 0; /* don't passthru */
@ -100,24 +111,11 @@ secstream_raw(struct lws *wsi, enum lws_callback_reasons reason, void *user,
return 0;
buflen = lws_ptr_diff(end, p);
switch(h->info.tx(ss_to_userobj(h), h->txord++, p, &buflen, &f)) {
case LWSSSSRET_DISCONNECT_ME:
lwsl_debug("%s: tx handler asked to close conn\n", __func__);
return -1; /* close connection */
case LWSSSSRET_DESTROY_ME:
lws_set_opaque_user_data(wsi, NULL);
h->wsi = NULL;
lws_ss_destroy(&h);
return -1; /* close connection */
case LWSSSSRET_TX_DONT_SEND:
/* don't want to send anything */
lwsl_debug("%s: dont want to write\n", __func__);
r = h->info.tx(ss_to_userobj(h), h->txord++, p, &buflen, &f);
if (r == LWSSSSRET_TX_DONT_SEND)
return 0;
default:
break;
}
if (r < 0)
return _lws_ss_handle_state_ret(r, wsi, &h);
/*
* flags are ignored with raw, there are no protocol payload

View file

@ -30,6 +30,7 @@ secstream_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user,
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
uint8_t buf[LWS_PRE + 1400];
lws_ss_state_return_t r;
int f = 0, f1, n;
size_t buflen;
@ -41,13 +42,14 @@ secstream_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user,
in ? (char *)in : "(null)");
if (!h)
break;
if (lws_ss_event_helper(h, LWSSSCS_UNREACHABLE))
/* h has been destroyed */
break;
r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
if (r == LWSSSSRET_DESTROY_ME)
return _lws_ss_handle_state_ret(r, wsi, &h);
h->wsi = NULL;
lws_ss_backoff(h);
/* may have been destroyed */
r = lws_ss_backoff(h);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
case LWS_CALLBACK_CLOSED: /* server */
@ -55,9 +57,10 @@ secstream_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (!h)
break;
lws_sul_cancel(&h->sul_timeout);
if (lws_ss_event_helper(h, LWSSSCS_DISCONNECTED))
/* has been destroyed */
break;
r = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
if (r == LWSSSSRET_DESTROY_ME)
return _lws_ss_handle_state_ret(r, wsi, &h);
if (h->wsi)
lws_set_opaque_user_data(h->wsi, NULL);
h->wsi = NULL;
@ -68,10 +71,13 @@ secstream_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user,
#if defined(LWS_WITH_SERVER)
!(h->info.flags & LWSSSINFLAGS_ACCEPTED) && /* not server */
#endif
!h->txn_ok && !wsi->a.context->being_destroyed)
lws_ss_backoff(h);
!h->txn_ok && !wsi->a.context->being_destroyed) {
r = lws_ss_backoff(h);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
}
}
/* may have been destroyed */
break;
case LWS_CALLBACK_ESTABLISHED:
@ -79,8 +85,9 @@ secstream_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->retry = 0;
h->seqstate = SSSEQ_CONNECTED;
lws_sul_cancel(&h->sul);
if (lws_ss_event_helper(h, LWSSSCS_CONNECTED))
return -1;
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
break;
case LWS_CALLBACK_RECEIVE:
@ -96,8 +103,9 @@ secstream_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->subseq = 1;
if (h->info.rx(ss_to_userobj(h), (const uint8_t *)in, len, f) < 0)
return -1;
r = h->info.rx(ss_to_userobj(h), (const uint8_t *)in, len, f);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
return 0; /* don't passthru */
@ -113,27 +121,12 @@ secstream_ws(struct lws *wsi, enum lws_callback_reasons reason, void *user,
}
buflen = sizeof(buf) - LWS_PRE;
n = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
&buflen, &f);
switch (n) {
case LWSSSSRET_DISCONNECT_ME:
lwsl_debug("%s: tx handler asked to close conn\n", __func__);
return -1; /* close connection */
case LWSSSSRET_DESTROY_ME:
lws_set_opaque_user_data(wsi, NULL);
h->wsi = NULL;
lws_ss_destroy(&h);
return -1; /* close connection */
case LWSSSSRET_TX_DONT_SEND:
/* don't want to send anything */
lwsl_debug("%s: dont want to write\n", __func__);
if (r == LWSSSSRET_TX_DONT_SEND)
return 0;
default:
break;
}
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret(r, wsi, &h);
f1 = lws_write_ws_flags(h->policy->u.http.u.ws.binary ?
LWS_WRITE_BINARY : LWS_WRITE_TEXT,

View file

@ -535,11 +535,11 @@ lws_sspc_destroy(lws_sspc_handle_t **ph)
free(h);
}
void
lws_ss_state_return_t
lws_sspc_request_tx(lws_sspc_handle_t *h)
{
if (!h || !h->cwsi)
return;
return LWSSSSRET_OK;
if (!h->us_earliest_write_req)
h->us_earliest_write_req = lws_now_usecs();
@ -549,6 +549,8 @@ lws_sspc_request_tx(lws_sspc_handle_t *h)
h->conn_req_state = LWSSSPC_ONW_REQ;
lws_callback_on_writable(h->cwsi);
return LWSSSSRET_OK;
}
/*
@ -567,7 +569,7 @@ lws_sspc_request_tx(lws_sspc_handle_t *h)
* length at this point.
*/
void
lws_ss_state_return_t
lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len)
{
/*
@ -578,7 +580,7 @@ lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len)
*/
if (!h)
return;
return LWSSSSRET_OK;
lwsl_notice("%s: setting h %p writeable_len %u\n", __func__, h,
(unsigned int)len);
@ -598,6 +600,8 @@ lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len)
*/
lws_callback_on_writable(h->cwsi);
return LWSSSSRET_OK;
}
int
@ -757,10 +761,10 @@ lws_sspc_to_user_object(struct lws_sspc_handle *h)
void
lws_sspc_change_handlers(struct lws_sspc_handle *h,
int (*rx)(void *userobj, const uint8_t *buf, size_t len, int flags),
int (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
lws_ss_state_return_t (*rx)(void *userobj, const uint8_t *buf, size_t len, int flags),
lws_ss_state_return_t (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
size_t *len, int *flags),
int (*state)(void *userobj, void *h_src /* ss handle type */,
lws_ss_state_return_t (*state)(void *userobj, void *h_src /* ss handle type */,
lws_ss_constate_t state, lws_ss_tx_ordinal_t ack))
{
if (rx)

View file

@ -84,7 +84,7 @@ typedef struct ss_proxy_onward {
/* secure streams payload interface */
static int
static lws_ss_state_return_t
ss_proxy_onward_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
{
ss_proxy_t *m = (ss_proxy_t *)userobj;
@ -116,7 +116,7 @@ ss_proxy_onward_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
* we are transmitting buffered payload originally from the client on to the ss
*/
static int
static lws_ss_state_return_t
ss_proxy_onward_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
size_t *len, int *flags)
{
@ -161,7 +161,7 @@ ss_proxy_onward_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
return 0;
}
static int
static lws_ss_state_return_t
ss_proxy_onward_state(void *userobj, void *sh,
lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
{

View file

@ -77,13 +77,13 @@ lws_ss_state_name(int state)
return state_names[state];
}
int
lws_ss_state_return_t
lws_ss_event_helper(lws_ss_handle_t *h, lws_ss_constate_t cs)
{
int n;
lws_ss_state_return_t r;
if (!h)
return 0;
return LWSSSSRET_OK;
#if defined(LWS_WITH_SEQUENCER)
/*
@ -95,48 +95,31 @@ lws_ss_event_helper(lws_ss_handle_t *h, lws_ss_constate_t cs)
(void *)h, NULL);
#endif
#if 0
if (h->h_sink && h->h_sink->info.state) {
n = h->h_sink->info.state(h->sink_obj, h->h_sink, cs, 0);
if (n) {
lws_set_timeout(h->wsi, 1, LWS_TO_KILL_ASYNC);
h->wsi = NULL; /* stop destroy trying to repeat this */
if (n == LWSSSSRET_DESTROY_ME) {
lwsl_info("%s: DESTROYING ss %p\n", __func__, h);
lws_ss_destroy(&h);
return 1;
}
}
}
#endif
if (h->info.state) {
n = h->info.state(ss_to_userobj(h), NULL, cs, 0);
r = h->info.state(ss_to_userobj(h), NULL, cs, 0);
#if defined(LWS_WITH_SERVER)
if ((h->info.flags & LWSSSINFLAGS_ACCEPTED) &&
cs == LWSSSCS_DISCONNECTED)
n = LWSSSSRET_DESTROY_ME;
r = LWSSSSRET_DESTROY_ME;
#endif
if (n) {
if (cs == LWSSSCS_CREATING)
/* just let caller handle it */
return 1;
if (h->wsi)
lws_set_timeout(h->wsi, 1, LWS_TO_KILL_ASYNC);
if (n == LWSSSSRET_DESTROY_ME) {
lwsl_info("%s: DESTROYING ss %p\n", __func__, h);
/* disconnect ss from the wsi */
if (h->wsi)
lws_set_opaque_user_data(h->wsi, NULL);
h->wsi = NULL; /* stop destroy trying to repeat this */
lws_ss_destroy(&h);
return 1;
}
h->wsi = NULL; /* stop destroy trying to repeat this */
}
return r;
}
return 0;
return LWSSSSRET_OK;
}
int
_lws_ss_handle_state_ret(lws_ss_state_return_t r, struct lws *wsi,
lws_ss_handle_t **ph)
{
if (r == LWSSSSRET_DESTROY_ME) {
if (wsi)
lws_set_opaque_user_data(wsi, NULL);
(*ph)->wsi = NULL;
lws_ss_destroy(ph);
}
return -1; /* close connection */
}
static void
@ -202,14 +185,14 @@ lws_ss_set_timeout_us(lws_ss_handle_t *h, lws_usec_t us)
return 0;
}
int
lws_ss_state_return_t
lws_ss_backoff(lws_ss_handle_t *h)
{
uint64_t ms;
char conceal;
if (h->seqstate == SSSEQ_RECONNECT_WAIT)
return 0;
return LWSSSSRET_OK;
/* figure out what we should do about another retry */
@ -219,10 +202,8 @@ lws_ss_backoff(lws_ss_handle_t *h)
if (!conceal) {
lwsl_info("%s: ss %p: abandon conn attempt \n",__func__, h);
h->seqstate = SSSEQ_IDLE;
if (lws_ss_event_helper(h, LWSSSCS_ALL_RETRIES_FAILED))
lwsl_notice("%s: was desroyed on ARF event\n", __func__);
/* may have been destroyed */
return 1;
return lws_ss_event_helper(h, LWSSSCS_ALL_RETRIES_FAILED);
}
h->seqstate = SSSEQ_RECONNECT_WAIT;
@ -230,7 +211,7 @@ lws_ss_backoff(lws_ss_handle_t *h)
lwsl_info("%s: ss %p: retry wait %"PRIu64"ms\n", __func__, h, ms);
return 0;
return LWSSSSRET_OK;
}
#if defined(LWS_WITH_SYS_SMD)
@ -317,6 +298,7 @@ _lws_ss_client_connect(lws_ss_handle_t *h, int is_retry)
size_t used_in, used_out;
union lws_ss_contemp ct;
char path[1024], ep[96];
lws_ss_state_return_t r;
int port, _port, tls;
lws_strexp_t exp;
@ -339,10 +321,8 @@ _lws_ss_client_connect(lws_ss_handle_t *h, int is_retry)
#if defined(LWS_WITH_SYS_SMD)
if (h->policy == &pol_smd) {
if (h->u.smd.smd_peer) {
// lwsl_notice("%s: peer already set\n", __func__);
if (h->u.smd.smd_peer)
return 0;
}
// lwsl_notice("%s: received connect for _lws_smd, registering for class mask 0x%x\n",
// __func__, h->info.manual_initial_tx_credit);
@ -477,17 +457,18 @@ _lws_ss_client_connect(lws_ss_handle_t *h, int is_retry)
i.alpn, i.address, i.path);
h->txn_ok = 0;
if (lws_ss_event_helper(h, LWSSSCS_CONNECTING))
return LWSSSSRET_SS_HANDLE_DESTROYED;
r = lws_ss_event_helper(h, LWSSSCS_CONNECTING);
if (r)
return r;
if (!lws_client_connect_via_info(&i)) {
if (lws_ss_event_helper(h, LWSSSCS_UNREACHABLE)) {
/* was destroyed */
lwsl_err("%s: client connect UNREACHABLE destroyed the ss\n", __func__);
return LWSSSSRET_SS_HANDLE_DESTROYED;
}
if (lws_ss_backoff(h))
return LWSSSSRET_SS_HANDLE_DESTROYED;
r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
if (r)
return r;
r = lws_ss_backoff(h);
if (r)
return r;
return 1;
}
@ -657,6 +638,7 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
*/
if (!(ssi->flags & LWSSSINFLAGS_PROXIED) &&
pol == &pol_smd) {
lws_ss_state_return_t r;
/*
* So he has asked to be wired up to SMD over a SS link.
* Register him as an smd participant in his own right.
@ -671,10 +653,12 @@ lws_ss_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
if (!h->u.smd.smd_peer)
goto late_bail;
lwsl_info("%s: registered SS SMD\n", __func__);
if (lws_ss_event_helper(h, LWSSSCS_CONNECTING))
return -1;
if (lws_ss_event_helper(h, LWSSSCS_CONNECTED))
return -1;
r = lws_ss_event_helper(h, LWSSSCS_CONNECTING);
if (r)
return r;
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r)
return r;
}
#endif
@ -842,7 +826,7 @@ lws_ss_destroy(lws_ss_handle_t **ppss)
lws_dll2_remove(&h->to_list);
lws_sul_cancel(&h->sul_timeout);
lws_ss_event_helper(h, LWSSSCS_DESTROYING);
(void)lws_ss_event_helper(h, LWSSSCS_DESTROYING);
lws_pt_unlock(pt);
/* in proxy case, metadata value on heap may need cleaning up */
@ -902,17 +886,17 @@ lws_ss_server_ack(struct lws_ss_handle *h, int nack)
h->txn_resp_set = 1;
}
void
lws_ss_state_return_t
lws_ss_request_tx(lws_ss_handle_t *h)
{
int n;
lws_ss_state_return_t r;
// lwsl_notice("%s: h %p, wsi %p\n", __func__, h, h->wsi);
if (h->wsi) {
lws_callback_on_writable(h->wsi);
return;
return LWSSSSRET_OK;
}
/*
@ -931,30 +915,34 @@ lws_ss_request_tx(lws_ss_handle_t *h)
lws_sul_schedule(h->context, 0, &h->u.smd.sul_write,
lws_ss_smd_tx_cb, 1);
return;
return LWSSSSRET_OK;
}
#endif
if (h->seqstate != SSSEQ_IDLE &&
h->seqstate != SSSEQ_DO_RETRY)
return;
return LWSSSSRET_OK;
h->seqstate = SSSEQ_TRY_CONNECT;
lws_ss_event_helper(h, LWSSSCS_POLL);
r = lws_ss_event_helper(h, LWSSSCS_POLL);
if (r)
return r;
/*
* Retries operate via lws_ss_request_tx(), explicitly ask for a
* reconnection to clear the retry limit
*/
n = _lws_ss_client_connect(h, 1);
if (n == LWSSSSRET_SS_HANDLE_DESTROYED)
return;
r = _lws_ss_client_connect(h, 1);
if (r == LWSSSSRET_DESTROY_ME)
return r;
if (n)
lws_ss_backoff(h);
if (r)
return lws_ss_backoff(h);
return LWSSSSRET_OK;
}
void
lws_ss_state_return_t
lws_ss_request_tx_len(lws_ss_handle_t *h, unsigned long len)
{
if (h->wsi &&
@ -965,7 +953,7 @@ lws_ss_request_tx_len(lws_ss_handle_t *h, unsigned long len)
else
h->writeable_len = len;
lws_ss_request_tx(h);
return lws_ss_request_tx(h);
}
/*
@ -1039,12 +1027,16 @@ static void
lws_ss_to_cb(lws_sorted_usec_list_t *sul)
{
lws_ss_handle_t *h = lws_container_of(sul, lws_ss_handle_t, sul_timeout);
lws_ss_state_return_t r;
lwsl_info("%s: ss %p timeout fired\n", __func__, h);
lws_ss_event_helper(h, LWSSSCS_TIMEOUT);
/* may have been destroyed */
r = lws_ss_event_helper(h, LWSSSCS_TIMEOUT);
if (r == LWSSSSRET_DESTROY_ME) {
if (h->wsi)
lws_set_timeout(h->wsi, 1, LWS_TO_KILL_ASYNC);
_lws_ss_handle_state_ret(r, NULL, &h);
}
}
void
@ -1066,11 +1058,13 @@ lws_ss_cancel_timeout(struct lws_ss_handle *h)
void
lws_ss_change_handlers(struct lws_ss_handle *h,
int (*rx)(void *userobj, const uint8_t *buf, size_t len, int flags),
int (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
size_t *len, int *flags),
int (*state)(void *userobj, void *h_src /* ss handle type */,
lws_ss_constate_t state, lws_ss_tx_ordinal_t ack))
lws_ss_state_return_t (*rx)(void *userobj, const uint8_t *buf,
size_t len, int flags),
lws_ss_state_return_t (*tx)(void *userobj, lws_ss_tx_ordinal_t ord,
uint8_t *buf, size_t *len, int *flags),
lws_ss_state_return_t (*state)(void *userobj, void *h_src /* ss handle type */,
lws_ss_constate_t state,
lws_ss_tx_ordinal_t ack))
{
if (rx)
h->info.rx = rx;

View file

@ -110,7 +110,7 @@ auth_api_amazon_com_parser_cb(struct lejp_ctx *ctx, char reason)
/* secure streams payload interface */
static int
static lws_ss_state_return_t
ss_api_amazon_auth_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
{
ss_api_amazon_auth_t *m = (ss_api_amazon_auth_t *)userobj;
@ -159,7 +159,7 @@ ss_api_amazon_auth_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
return 0;
}
static int
static lws_ss_state_return_t
ss_api_amazon_auth_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
size_t *len, int *flags)
{
@ -197,7 +197,7 @@ ss_api_amazon_auth_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
return 0;
}
static int
static lws_ss_state_return_t
ss_api_amazon_auth_state(void *userobj, void *sh, lws_ss_constate_t state,
lws_ss_tx_ordinal_t ack)
{

View file

@ -34,7 +34,7 @@ typedef struct ss_cpd {
lws_sorted_usec_list_t sul;
} ss_cpd_t;
static int
static lws_ss_state_return_t
ss_cpd_state(void *userobj, void *sh, lws_ss_constate_t state,
lws_ss_tx_ordinal_t ack)
{

View file

@ -38,7 +38,7 @@ typedef struct ss_fetch_policy {
/* secure streams payload interface */
static int
static lws_ss_state_return_t
ss_fetch_policy_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
{
ss_fetch_policy_t *m = (ss_fetch_policy_t *)userobj;
@ -46,12 +46,12 @@ ss_fetch_policy_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
if (flags & LWSSS_FLAG_SOM) {
if (lws_ss_policy_parse_begin(context, 0))
return 1;
return LWSSSSRET_OK;
m->partway = 1;
}
if (len && lws_ss_policy_parse(context, buf, len) < 0)
return 1;
return LWSSSSRET_OK;
if (flags & LWSSS_FLAG_EOM)
m->partway = 2;
@ -59,7 +59,7 @@ ss_fetch_policy_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
return 0;
}
static int
static lws_ss_state_return_t
ss_fetch_policy_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
size_t *len, int *flags)
{
@ -90,7 +90,7 @@ policy_set(lws_sorted_usec_list_t *sul)
}
}
static int
static lws_ss_state_return_t
ss_fetch_policy_state(void *userobj, void *sh, lws_ss_constate_t state,
lws_ss_tx_ordinal_t ack)
{