From d892b86b93691231ca4f5fb7b91ddc4419e28d67 Mon Sep 17 00:00:00 2001 From: Andy Green Date: Tue, 1 Feb 2022 09:16:21 +0000 Subject: [PATCH] lws_flow helpers Add a generic struct to manage a buflist with an incrementally-consumable head, and helpers to deal with retiring the last segment and starting the new head. The lws_flow is added to using the buflist member directly, it autohandles SS window management to try to keep the total buffered at the client to the window member limit. --- include/libwebsockets/lws-misc.h | 60 ++++++++++++++++++++++++++++++++ lib/core/buflist.c | 49 ++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/include/libwebsockets/lws-misc.h b/include/libwebsockets/lws-misc.h index 481fbd70d..233cb3456 100644 --- a/include/libwebsockets/lws-misc.h +++ b/include/libwebsockets/lws-misc.h @@ -178,6 +178,66 @@ lws_buflist_destroy_all_segments(struct lws_buflist **head); LWS_VISIBLE LWS_EXTERN void lws_buflist_describe(struct lws_buflist **head, void *id, const char *reason); + +/* + * Optional helpers for closely-managed stream flow control. These are useful + * when there is no memory for large rx buffers and instead tx credit is being + * used to regulate the server sending data. + * + * When combined with stateful consumption-on-demand, this can be very effective + * at managing data flows through restricted circumstances. These helpers + * implement a golden implementation that can be bound to a stream in its priv + * data. + * + * The helper is sophisticated enough to contain a buflist to manage overflows + * on heap and preferentially drain it. RX goes through heap to guarantee the + * consumer can exit cleanly at any time. + */ + +enum { + LWSDLOFLOW_STATE_READ, /* default, we want input */ + LWSDLOFLOW_STATE_READ_COMPLETED, /* we do not need further rx, every- + * thing is locally buffered or used */ + LWSDLOFLOW_STATE_READ_FAILED, /* operation has fatal error */ +}; + +struct lws_ss_handle; + +typedef struct lws_flow { + lws_dll2_t list; + + struct lws_ss_handle *h; + struct lws_buflist *bl; + + const uint8_t *data; + size_t len; /* bytes left in data */ + uint32_t blseglen; /* bytes issued */ + int32_t window; + + uint8_t state; +} lws_flow_t; + +/** + * lws_flow_feed() - consume waiting data if ready for it + * + * \param flow: pointer to the flow struct managing waiting data + * + * This will bring out waiting data from the flow buflist when it is needed. + */ +LWS_VISIBLE LWS_EXTERN lws_stateful_ret_t +lws_flow_feed(lws_flow_t *flow); + +/** + * lws_flow_req() - request remote data if we have run low + * + * \param flow: pointer to the flow struct managing waiting data + * + * When the estimated remote tx credit is below flow->window, accounting for + * what is in the buflist, add to the peer tx credit so it can send us more. + */ +LWS_VISIBLE LWS_EXTERN lws_stateful_ret_t +lws_flow_req(lws_flow_t *flow); + /** * lws_ptr_diff(): helper to report distance between pointers as an int * diff --git a/lib/core/buflist.c b/lib/core/buflist.c index b89330ad7..dbe544e5c 100644 --- a/lib/core/buflist.c +++ b/lib/core/buflist.c @@ -271,3 +271,52 @@ lws_buflist_describe(struct lws_buflist **head, void *id, const char *reason) } } #endif + +lws_stateful_ret_t +lws_flow_feed(lws_flow_t *flow) +{ + if (flow->len) + return LWS_SRET_OK; + + if (flow->blseglen) + lws_buflist_use_segment(&flow->bl, flow->blseglen); + + flow->len = lws_buflist_next_segment_len(&flow->bl, + (uint8_t **)&flow->data); + flow->blseglen = (uint32_t)flow->len; + + return flow->len || + flow->state != LWSDLOFLOW_STATE_READ ? LWS_SRET_OK : + LWS_SRET_WANT_INPUT; +} + +lws_stateful_ret_t +lws_flow_req(lws_flow_t *flow) +{ +#if defined(LWS_WITH_CLIENT) && defined(LWS_WITH_SECURE_STREAMS) + int32_t est, ask; +#endif + + lws_flow_feed(flow); + + if (!flow->h || flow->state != LWSDLOFLOW_STATE_READ) + return LWS_SRET_OK; + +#if defined(LWS_WITH_CLIENT) && defined(LWS_WITH_SECURE_STREAMS) + if (flow->window) { + est = lws_ss_get_est_peer_tx_credit(flow->h) + + (int)lws_buflist_total_len(&flow->bl) - + (int)flow->blseglen + (int)flow->len; + + if (est < flow->window) { + ask = (int32_t)(flow->window - est); + if (ask > (flow->window / 2) || !est) + lws_ss_add_peer_tx_credit(flow->h, ask); + } + } +#endif + + return flow->len || + flow->state != LWSDLOFLOW_STATE_READ ? LWS_SRET_OK : + LWS_SRET_WANT_INPUT; +}