1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

lws_retry: udp support

This commit is contained in:
Andy Green 2019-09-08 08:08:55 +01:00
parent bb7f96d32b
commit 04f99f1499
16 changed files with 264 additions and 29 deletions

View file

@ -16,6 +16,16 @@ various scenarios, CC0-licensed (public domain) for cut-and-paste, allow you to
News
----
## UDP integration with `lws_retry`
UDP support in lws has a new helper that allows `lws_retry` to be applied for
retries, and the ability to synthesize rx and tx udp packetloss systemwide to
confirm retry strategies. Since multiple transactions may be in flight on one
UDP socket, the support relies on an `lws_sul` in the transaction object to
manage the transaction retries individually.
See `READMEs/README.udp.md` for details.
## `lws_system`: system state and notification handlers
Lws now has the concept of systemwide state held in the context... this is to

50
READMEs/README.udp.md Normal file
View file

@ -0,0 +1,50 @@
## Using UDP in lws
UDP is supported in lws... the quickest way is to use the api
`lws_create_adopt_udp()` which returns a wsi bound to the provided
vhost, protocol, `lws_retry` struct, dns address and port.
The wsi can be treated normally and `lws_write()` used to write on
it.
## Implementing UDP retries
Retries are important in udp but, unlike tcp, there's no standardized ack
method. Lws allows you to bind an `lws_retry` struct describing
the policy to the udp wsi, but since one UDP socket may have many
transactions in flight, the `lws_sul` and the `uint16_t` used to count the
retries must live in the user's transaction object, like this:
```
...
lws_sorted_usec_list_t sul_write;
uint16_t retry_count_write;
...
```
In the `LWS_CALLBACK_RAW_WRITEABLE` callback, before doing the write,
set up the retry like this:
```
if (lws_dll2_is_detached(&transaction->sul_write.list) &&
lws_retry_sul_schedule_retry_wsi(wsi, &transaction->sul_write,
transaction_retry_write_cb,
&transaction->retry_count_write)) {
/* we have reached the end of our concealed retries */
lwsl_warn("%s: concealed retries done, failing\n", __func__);
goto retry_conn;
}
```
This manages the retry counter in the transaction object, guards against it wrapping,
selects the timeout using the policy bound to the wsi, and sets the `lws_sul` in the
transaction object to call the given callback if the sul time expires.
The callback should simply call `lws_callback_on_writable()` for the udp wsi.
## Simulating packetloss
Lws now allows you to set the amount of simulated packetloss on udp rx and tx in
the context creation info struct, using `.udp_loss_sim_tx_pc` and `.udp_loss_sim_rx_pc`.
The values are percentages between 0 and 100; 0, the default, means no packetloss.

View file

@ -81,11 +81,11 @@ typedef union {
#if !defined(LWS_PLAT_FREERTOS) && !defined(LWS_PLAT_OPTEE)
struct lws_udp {
struct sockaddr sa;
socklen_t salen;
struct sockaddr sa;
socklen_t salen;
struct sockaddr sa_pending;
socklen_t salen_pending;
struct sockaddr sa_pending;
socklen_t salen_pending;
};
#endif
@ -177,6 +177,7 @@ lws_adopt_socket_vhost_readbuf(struct lws_vhost *vhost,
* \param flags: 0 or LWS_CAUDP_NO_BIND
* \param protocol_name: Name of protocol on vhost to bind wsi to
* \param parent_wsi: NULL or parent wsi new wsi will be a child of
* \param retry_policy: NULL for vhost default policy else wsi specific policy
*
* Either returns new wsi bound to accept_fd, or closes accept_fd and
* returns NULL, having cleaned up any new wsi pieces.
@ -184,5 +185,5 @@ lws_adopt_socket_vhost_readbuf(struct lws_vhost *vhost,
LWS_VISIBLE LWS_EXTERN struct lws *
lws_create_adopt_udp(struct lws_vhost *vhost, const char *ads, int port,
int flags, const char *protocol_name,
struct lws *parent_wsi);
struct lws *parent_wsi, const lws_retry_bo_t *retry_policy);
///@}

View file

@ -698,6 +698,12 @@ struct lws_context_creation_info {
/**< CONTEXT: NULL, or pointer to an array of notifiers that should
* be registered during context creation, so they can see state change
* events from very early on. The array should end with a NULL. */
uint8_t udp_loss_sim_tx_pc;
/**< CONTEXT: percentage of udp writes we would otherwise have performed
* that are silently skipped, in order to simulate and test udp retry flow */
uint8_t udp_loss_sim_rx_pc;
/**< CONTEXT: percentage of udp reads we actually received
* that are discarded unhandled, in order to simulate and test udp retry flow */
/* Add new things just above here ---^
* This is part of the ABI, don't needlessly break compatibility

View file

@ -31,6 +31,8 @@ typedef struct lws_retry_bo {
uint8_t jitter_percent; /* % additional random jitter */
} lws_retry_bo_t;
#define LWS_RETRY_CONCEAL_ALWAYS (0xffff)
/**
* lws_retry_get_delay_ms() - get next delay from backoff table
*
@ -65,9 +67,29 @@ lws_retry_get_delay_ms(struct lws_context *context, const lws_retry_bo_t *retry,
*
* Helper that combines interpreting the retry table with scheduling a sul to
* the computed delay. If conceal is not set, it will not schedule the sul
* and return 1. Otherwise the sul is scheduled and it returns 0.
* and just return 1. Otherwise the sul is scheduled and it returns 0.
*/
LWS_VISIBLE LWS_EXTERN int
lws_retry_sul_schedule(struct lws_context *context, int tid,
lws_sorted_usec_list_t *sul, const lws_retry_bo_t *retry,
sul_cb_t cb, uint16_t *ctry);
/**
* lws_retry_sul_schedule_retry_wsi() - retry sul schedule helper using wsi
*
* \param wsi: the wsi to set the hrtimer sul on to the next retry interval
* \param sul: pointer to the sul to schedule
* \param cb: the callback for when the sul schedule time arrives
* \param ctry: pointer to the try counter
*
* Helper that uses context, tid and retry policy from a wsi to call
* lws_retry_sul_schedule.
*
* Since a udp connection can have many writes in flight, the retry count and
* the sul used to track each thing that wants to be written have to be handled
* individually, not the wsi. But the retry policy and the other things can
* be filled in from the wsi conveniently.
*/
LWS_VISIBLE LWS_EXTERN int
lws_retry_sul_schedule_retry_wsi(struct lws *wsi, lws_sorted_usec_list_t *sul,
sul_cb_t cb, uint16_t *ctry);

View file

@ -31,7 +31,8 @@ lws_get_idlest_tsi(struct lws_context *context)
int n = 0, hit = -1;
for (; n < context->count_threads; n++) {
lwsl_notice("%s: %d %d\n", __func__, context->pt[n].fds_count, context->fd_limit_per_thread - 1);
lwsl_debug("%s: %d %d\n", __func__, context->pt[n].fds_count,
context->fd_limit_per_thread - 1);
if ((unsigned int)context->pt[n].fds_count !=
context->fd_limit_per_thread - 1 &&
(unsigned int)context->pt[n].fds_count < lowest) {
@ -164,6 +165,14 @@ lws_adopt_descriptor_vhost1(struct lws_vhost *vh, lws_adoption_type type,
goto bail;
}
/*
* he's an allocated wsi, but he's not on any fds list or child list,
* join him to the vhost's list of these kinds of incomplete wsi until
* he gets another identity (he may do async dns now...)
*/
lws_dll2_add_head(&new_wsi->vh_awaiting_socket,
&new_wsi->vhost->vh_awaiting_socket_owner);
return new_wsi;
bail:
@ -256,6 +265,9 @@ lws_adopt_descriptor_vhost2(struct lws *new_wsi, lws_adoption_type type,
}
#endif
/* he has fds visibility now, remove from vhost orphan list */
lws_dll2_remove(&new_wsi->vh_awaiting_socket);
/*
* by deferring callback to this point, after insertion to fds,
* lws_callback_on_writable() can work from the callback
@ -423,8 +435,21 @@ lws_create_adopt_udp2(struct lws *wsi, const char *ads,
if (!wsi->dns_results)
wsi->dns_results_next = wsi->dns_results = r;
if (n < 0 || !r)
if (n < 0 || !r) {
/*
* DNS lookup failed: there are no usable results. Fail the
* overall connection request.
*/
lwsl_debug("%s: bad: n %d, r %p\n", __func__, n, r);
/*
* We didn't get a callback on a cache item and bump the
* refcount. So don't let the cleanup continue to think it
* needs to decrement any refcount.
*/
wsi->dns_results_next = wsi->dns_results = NULL;
goto bail;
}
while (wsi->dns_results_next) {
@ -478,7 +503,7 @@ lws_create_adopt_udp2(struct lws *wsi, const char *ads,
wsi->udp->salen = wsi->dns_results_next->ai_addrlen;
}
/* complete the udp socket adoption flow */
/* we connected: complete the udp socket adoption flow */
lws_addrinfo_clean(wsi);
return lws_adopt_descriptor_vhost2(wsi,
@ -500,10 +525,11 @@ bail:
struct lws *
lws_create_adopt_udp(struct lws_vhost *vhost, const char *ads, int port,
int flags, const char *protocol_name,
struct lws *parent_wsi)
struct lws *parent_wsi, const lws_retry_bo_t *retry_policy)
{
#if !defined(LWS_PLAT_OPTEE)
struct lws *wsi;
int n;
lwsl_info("%s: %s:%u\n", __func__, ads ? ads : "null", port);
@ -517,6 +543,10 @@ lws_create_adopt_udp(struct lws_vhost *vhost, const char *ads, int port,
}
wsi->do_bind = !!(flags & LWS_CAUDP_BIND);
wsi->c_port = port;
if (retry_policy)
wsi->retry_policy = retry_policy;
else
wsi->retry_policy = vhost->retry_policy;
#if !defined(LWS_WITH_SYS_ASYNC_DNS)
{
@ -562,11 +592,16 @@ lws_create_adopt_udp(struct lws_vhost *vhost, const char *ads, int port,
*
* Keep a refcount on the results and free it when we connected
* or definitively failed.
*
* Notice wsi has no socket at this point (we don't know what
* kind to ask for until we get the dns back). But it is bound
* to a vhost and can be cleaned up from that at vhost destroy.
*/
if (lws_async_dns_query(vhost->context, 0, ads,
n = lws_async_dns_query(vhost->context, 0, ads,
LWS_ADNS_RECORD_A,
lws_create_adopt_udp2, wsi, NULL) ==
LADNS_RET_FAILED) {
lws_create_adopt_udp2, wsi, NULL);
lwsl_debug("%s: dns query returned %d\n", __func__, n);
if (n == LADNS_RET_FAILED) {
lwsl_err("%s: async dns failed\n", __func__);
wsi = NULL;
/*
@ -575,11 +610,14 @@ lws_create_adopt_udp(struct lws_vhost *vhost, const char *ads, int port,
*/
goto bail;
}
} else
} else {
lwsl_debug("%s: fail on no ads\n", __func__);
wsi = lws_create_adopt_udp2(wsi, ads, NULL, 0, NULL);
}
/* dns lookup is happening asynchronously */
lwsl_debug("%s: returning wsi %p\n", __func__, wsi);
return wsi;
#endif
#if !defined(LWS_WITH_SYS_ASYNC_DNS)

View file

@ -71,6 +71,9 @@ __lws_reset_wsi(struct lws *wsi)
}
#endif
if (wsi->vhost)
lws_dll2_remove(&wsi->vh_awaiting_socket);
/*
* Protocol user data may be allocated either internally by lws
* or by specified the user. We should only free what we allocated.
@ -82,6 +85,7 @@ __lws_reset_wsi(struct lws *wsi)
lws_buflist_destroy_all_segments(&wsi->buflist);
lws_buflist_destroy_all_segments(&wsi->buflist_out);
lws_free_set_NULL(wsi->udp);
wsi->retry = 0;
#if defined(LWS_WITH_CLIENT)
lws_dll2_remove(&wsi->dll2_cli_txn_queue);
@ -145,8 +149,9 @@ __lws_free_wsi(struct lws *wsi)
lws_vhost_unbind_wsi(wsi);
lwsl_debug("%s: %p, remaining wsi %d\n", __func__, wsi,
wsi->context->count_wsi_allocated);
lwsl_debug("%s: %p, remaining wsi %d, tsi fds count %d\n", __func__, wsi,
wsi->context->count_wsi_allocated,
wsi->context->pt[(int)wsi->tsi].fds_count);
lws_free(wsi);
}
@ -215,7 +220,7 @@ lws_addrinfo_clean(struct lws *wsi)
return;
#if defined(LWS_WITH_SYS_ASYNC_DNS)
lws_async_dns_freeaddrinfo(wsi->dns_results);
lws_async_dns_freeaddrinfo(&wsi->dns_results);
#else
freeaddrinfo((struct addrinfo *)wsi->dns_results);
#endif
@ -230,7 +235,7 @@ __lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason,
struct lws_context_per_thread *pt;
struct lws *wsi1, *wsi2;
struct lws_context *context;
int n;
int n, ccb;
lwsl_info("%s: %p: caller: %s\n", __func__, wsi, caller);
@ -409,6 +414,8 @@ just_kill_connection:
if (wsi->http.buflist_post_body)
lws_buflist_destroy_all_segments(&wsi->http.buflist_post_body);
#endif
if (wsi->udp)
lws_free_set_NULL(wsi->udp);
if (wsi->role_ops->close_kill_connection)
wsi->role_ops->close_kill_connection(wsi, reason);
@ -536,18 +543,15 @@ just_kill_connection:
/* tell the user it's all over for this guy */
ccb = 0;
if ((lwsi_state_est_PRE_CLOSE(wsi) ||
/* raw skt adopted but didn't complete tls hs should CLOSE */
(wsi->role_ops == &role_ops_raw_skt && !lwsi_role_client(wsi)) ||
lwsi_state_PRE_CLOSE(wsi) == LRS_WAITING_SERVER_REPLY) &&
!wsi->told_user_closed &&
wsi->role_ops->close_cb[lwsi_role_server(wsi)]) {
const struct lws_protocols *pro = wsi->protocol;
if (!wsi->protocol && wsi->vhost && wsi->vhost->protocols)
pro = &wsi->vhost->protocols[0];
if (pro && (!wsi->upgraded_to_http2 || !lwsi_role_client(wsi)))
if (!wsi->upgraded_to_http2 || !lwsi_role_client(wsi))
ccb = 1;
/*
* The network wsi for a client h2 connection shouldn't
* call back for its role: the child stream connections
@ -555,9 +559,27 @@ just_kill_connection:
* one too many times as the children do it and then
* the closing network stream.
*/
}
if (!wsi->told_user_closed &&
!lws_dll2_is_detached(&wsi->vh_awaiting_socket))
/*
* He's a guy who got started with dns, but failed or was
* caught by a shutdown before he got the result. We have
* to issue him a close cb
*/
ccb = 1;
if (ccb) {
const struct lws_protocols *pro = wsi->protocol;
if (!wsi->protocol && wsi->vhost && wsi->vhost->protocols)
pro = &wsi->vhost->protocols[0];
if (pro)
pro->callback(wsi,
wsi->role_ops->close_cb[lwsi_role_server(wsi)],
wsi->user_space, NULL, 0);
wsi->role_ops->close_cb[lwsi_role_server(wsi)],
wsi->user_space, NULL, 0);
wsi->told_user_closed = 1;
}

View file

@ -426,11 +426,22 @@ lws_retry_sul_schedule(struct lws_context *context, int tid,
if (!conceal)
return 1;
lwsl_info("%s: sul %p: scheduling retry in %dms\n", __func__, sul,
(int)ms);
lws_sul_schedule(context, tid, sul, cb, ms * 1000);
return 0;
}
/*
 * Convenience form of lws_retry_sul_schedule() for callers that hold a wsi.
 *
 * The context, the service thread index and the retry policy are read from
 * the wsi; the sul, its callback and the try counter are supplied by the
 * caller and passed through unchanged.
 *
 * Returns whatever lws_retry_sul_schedule() returns for those arguments.
 */
int
lws_retry_sul_schedule_retry_wsi(struct lws *wsi, lws_sorted_usec_list_t *sul,
				 sul_cb_t cb, uint16_t *ctry)
{
	struct lws_context *cx = wsi->context;
	int tid = wsi->tsi;

	return lws_retry_sul_schedule(cx, tid, sul, wsi->retry_policy, cb,
				      ctry);
}
#if defined(LWS_WITH_IPV6)
LWS_EXTERN unsigned long
lws_get_addr_scope(const char *ipaddr)

View file

@ -336,6 +336,21 @@ lws_ssl_capable_write_no_ssl(struct lws *wsi, unsigned char *buf, int len)
if (lws_wsi_is_udp(wsi)) {
#if !defined(LWS_PLAT_FREERTOS) && !defined(LWS_PLAT_OPTEE)
if (wsi->context->udp_loss_sim_tx_pc) {
uint16_t u16;
/*
* We should randomly drop some of these
*/
if (lws_get_random(wsi->context, &u16, 2) == 2 &&
((u16 * 100) / 0xffff) <=
wsi->context->udp_loss_sim_tx_pc) {
lwsl_warn("%s: dropping udp tx\n", __func__);
/* pretend it was sent */
n = len;
goto post_send;
}
}
if (lws_has_buffered_out(wsi))
n = sendto(wsi->desc.sockfd, (const char *)buf,
len, 0, &wsi->udp->sa_pending,
@ -347,6 +362,8 @@ lws_ssl_capable_write_no_ssl(struct lws *wsi, unsigned char *buf, int len)
} else
n = send(wsi->desc.sockfd, (char *)buf, len, MSG_NOSIGNAL);
// lwsl_info("%s: sent len %d result %d", __func__, len, n);
post_send:
if (n >= 0)
return n;

View file

@ -304,8 +304,7 @@ typedef struct lws_dsh {
typedef struct lws_async_dns {
lws_sockaddr46 sa46; /* nameserver */
lws_dll2_owner_t waiting_send;
lws_dll2_owner_t waiting_resp;
lws_dll2_owner_t waiting;
lws_dll2_owner_t cached;
struct lws *wsi;
time_t time_set_server;
@ -534,6 +533,7 @@ struct lws_vhost {
#if defined(LWS_WITH_CLIENT)
struct lws_dll2_owner dll_cli_active_conns_owner;
#endif
struct lws_dll2_owner vh_awaiting_socket_owner;
#if defined(LWS_WITH_TLS)
struct lws_vhost_tls tls;
@ -614,6 +614,7 @@ struct lws {
struct lws_dll2 dll_buflist; /* guys with pending rxflow */
struct lws_dll2 same_vh_protocol;
struct lws_dll2 vh_awaiting_socket;
#if defined(LWS_WITH_SYS_ASYNC_DNS)
struct lws_dll2 adns; /* on adns list of guys to tell result */
lws_async_dns_cb_t adns_cb; /* callback with result */
@ -744,6 +745,7 @@ struct lws {
#endif
uint16_t c_port;
uint16_t retry;
/* chars */

View file

@ -1040,6 +1040,22 @@ __lws_vhost_destroy2(struct lws_vhost *vh)
struct lws wsi;
int n;
#if defined(LWS_WITH_CLIENT)
/*
* destroy any wsi that are associated with us but have no socket
* (and will otherwise be missed for destruction)
*/
lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
vh->vh_awaiting_socket_owner.head) {
struct lws *w =
lws_container_of(d, struct lws, vh_awaiting_socket);
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
"awaiting skt");
} lws_end_foreach_dll_safe(d, d1);
#endif
/*
* destroy any pending timed events
*/

View file

@ -149,6 +149,7 @@ lws_create_context(const struct lws_context_creation_info *info)
int lpf = info->fd_limit_per_thread;
if (lpf) {
lpf++;
#if defined(LWS_WITH_SYS_ASYNC_DNS)
lpf++;
#endif
@ -210,6 +211,14 @@ lws_create_context(const struct lws_context_creation_info *info)
context->groupname = info->groupname;
context->system_ops = info->system_ops;
context->pt_serv_buf_size = s1;
context->udp_loss_sim_tx_pc = info->udp_loss_sim_tx_pc;
context->udp_loss_sim_rx_pc = info->udp_loss_sim_rx_pc;
if (context->udp_loss_sim_tx_pc || context->udp_loss_sim_rx_pc)
lwsl_warn("%s: simulating udp loss tx: %d%%, rx: %d%%\n",
__func__, context->udp_loss_sim_tx_pc,
context->udp_loss_sim_rx_pc);
#if defined(LWS_WITH_NETWORK)
context->count_threads = count_threads;
#if defined(LWS_WITH_DETAILED_LATENCY)

View file

@ -445,6 +445,8 @@ struct lws_context {
unsigned short deprecation_pending_listen_close_count;
uint8_t max_fi;
uint8_t udp_loss_sim_tx_pc;
uint8_t udp_loss_sim_rx_pc;
#if defined(LWS_WITH_STATS)
uint8_t updated;

View file

@ -345,6 +345,18 @@ lws_client_connect_3_connect(struct lws *wsi, const char *ads,
__func__, result, result->ai_next);
}
#if defined(LWS_WITH_DETAILED_LATENCY)
if (lwsi_state(wsi) == LRS_WAITING_DNS &&
wsi->context->detailed_latency_cb) {
wsi->detlat.type = LDLT_NAME_RESOLUTION;
wsi->detlat.latencies[LAT_DUR_PROXY_CLIENT_REQ_TO_WRITE] =
lws_now_usecs() -
wsi->detlat.earliest_write_req_pre_write;
wsi->detlat.latencies[LAT_DUR_USERCB] = 0;
lws_det_lat_cb(wsi->context, &wsi->detlat);
wsi->detlat.earliest_write_req_pre_write = lws_now_usecs();
}
#endif
#if defined(LWS_CLIENT_HTTP_PROXYING) && \
(defined(LWS_ROLE_H1) || defined(LWS_ROLE_H2))

View file

@ -92,10 +92,27 @@ rops_handle_POLLIN_raw_skt(struct lws_context_per_thread *pt, struct lws *wsi,
goto try_pollout;
}
if (wsi->context->udp_loss_sim_rx_pc) {
uint16_t u16;
/*
* We should randomly drop some of these
*/
if (lws_get_random(wsi->context, &u16, 2) == 2 &&
((u16 * 100) / 0xffff) <=
wsi->context->udp_loss_sim_rx_pc) {
lwsl_warn("%s: dropping udp rx\n", __func__);
/* pretend it was handled */
n = ebuf.len;
goto post_rx;
}
}
n = user_callback_handle_rxflow(wsi->protocol->callback,
wsi, LWS_CALLBACK_RAW_RX,
wsi->user_space, ebuf.token,
ebuf.len);
post_rx:
if (n < 0) {
lwsl_info("LWS_CALLBACK_RAW_RX_fail\n");
goto fail;

View file

@ -170,7 +170,7 @@ int main(int argc, const char **argv)
* Create our own "foreign" UDP socket bound to 7681/udp
*/
if (!lws_create_adopt_udp(vhost, NULL, 7681, LWS_CAUDP_BIND,
protocols[0].name, NULL)) {
protocols[0].name, NULL, NULL)) {
lwsl_err("%s: foreign socket adoption failed\n", __func__);
goto bail;
}