mirror of
https://github.com/warmcat/libwebsockets.git
synced 2025-03-30 00:00:16 +01:00
lws_flow helpers
Add a generic struct to manage a buflist with an incrementally-consumable head, and helpers to deal with retiring the last segment and starting the new head. The lws_flow is added to using the buflist member directly, it autohandles SS window management to try to keep the total buffered at the client to the window member limit.
This commit is contained in:
parent
9b92c47101
commit
d892b86b93
2 changed files with 109 additions and 0 deletions
|
@ -178,6 +178,66 @@ lws_buflist_destroy_all_segments(struct lws_buflist **head);
|
|||
LWS_VISIBLE LWS_EXTERN void
|
||||
lws_buflist_describe(struct lws_buflist **head, void *id, const char *reason);
|
||||
|
||||
|
||||
/*
|
||||
* Optional helpers for closely-managed stream flow control. These are useful
|
||||
* when there is no memory for large rx buffers and instead tx credit is being
|
||||
* used to regulate the server sending data.
|
||||
*
|
||||
* When combined with stateful consumption-on-demand, this can be very effective
|
||||
* at managing data flows through restricted circumstances. These helpers
|
||||
* implement a golden implementation that can be bound to a stream in its priv
|
||||
* data.
|
||||
*
|
||||
* The helper is sophisticated enough to contain a buflist to manage overflows
|
||||
* on heap and preferentially drain it. RX goes through heap to guarantee the
|
||||
* consumer can exit cleanly at any time.
|
||||
*/
|
||||
|
||||
enum {
|
||||
LWSDLOFLOW_STATE_READ, /* default, we want input */
|
||||
LWSDLOFLOW_STATE_READ_COMPLETED, /* we do not need further rx, every-
|
||||
* thing is locally buffered or used */
|
||||
LWSDLOFLOW_STATE_READ_FAILED, /* operation has fatal error */
|
||||
};
|
||||
|
||||
struct lws_ss_handle;
|
||||
|
||||
typedef struct lws_flow {
|
||||
lws_dll2_t list;
|
||||
|
||||
struct lws_ss_handle *h;
|
||||
struct lws_buflist *bl;
|
||||
|
||||
const uint8_t *data;
|
||||
size_t len; /* bytes left in data */
|
||||
uint32_t blseglen; /* bytes issued */
|
||||
int32_t window;
|
||||
|
||||
uint8_t state;
|
||||
} lws_flow_t;
|
||||
|
||||
/**
|
||||
* lws_flow_feed() - consume waiting data if ready for it
|
||||
*
|
||||
* \param flow: pointer to the flow struct managing waiting data
|
||||
*
|
||||
* This will bring out waiting data from the flow buflist when it is needed.
|
||||
*/
|
||||
LWS_VISIBLE LWS_EXTERN lws_stateful_ret_t
|
||||
lws_flow_feed(lws_flow_t *flow);
|
||||
|
||||
/**
|
||||
* lws_flow_req() - request remote data if we have run low
|
||||
*
|
||||
* \param flow: pointer to the flow struct managing waiting data
|
||||
*
|
||||
* When the estimated remote tx credit is below flow->window, accounting for
|
||||
* what is in the buflist, add to the peer tx credit so it can send us more.
|
||||
*/
|
||||
LWS_VISIBLE LWS_EXTERN lws_stateful_ret_t
|
||||
lws_flow_req(lws_flow_t *flow);
|
||||
|
||||
/**
|
||||
* lws_ptr_diff(): helper to report distance between pointers as an int
|
||||
*
|
||||
|
|
|
@ -271,3 +271,52 @@ lws_buflist_describe(struct lws_buflist **head, void *id, const char *reason)
|
|||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
lws_stateful_ret_t
|
||||
lws_flow_feed(lws_flow_t *flow)
|
||||
{
|
||||
if (flow->len)
|
||||
return LWS_SRET_OK;
|
||||
|
||||
if (flow->blseglen)
|
||||
lws_buflist_use_segment(&flow->bl, flow->blseglen);
|
||||
|
||||
flow->len = lws_buflist_next_segment_len(&flow->bl,
|
||||
(uint8_t **)&flow->data);
|
||||
flow->blseglen = (uint32_t)flow->len;
|
||||
|
||||
return flow->len ||
|
||||
flow->state != LWSDLOFLOW_STATE_READ ? LWS_SRET_OK :
|
||||
LWS_SRET_WANT_INPUT;
|
||||
}
|
||||
|
||||
lws_stateful_ret_t
|
||||
lws_flow_req(lws_flow_t *flow)
|
||||
{
|
||||
#if defined(LWS_WITH_CLIENT) && defined(LWS_WITH_SECURE_STREAMS)
|
||||
int32_t est, ask;
|
||||
#endif
|
||||
|
||||
lws_flow_feed(flow);
|
||||
|
||||
if (!flow->h || flow->state != LWSDLOFLOW_STATE_READ)
|
||||
return LWS_SRET_OK;
|
||||
|
||||
#if defined(LWS_WITH_CLIENT) && defined(LWS_WITH_SECURE_STREAMS)
|
||||
if (flow->window) {
|
||||
est = lws_ss_get_est_peer_tx_credit(flow->h) +
|
||||
(int)lws_buflist_total_len(&flow->bl) -
|
||||
(int)flow->blseglen + (int)flow->len;
|
||||
|
||||
if (est < flow->window) {
|
||||
ask = (int32_t)(flow->window - est);
|
||||
if (ask > (flow->window / 2) || !est)
|
||||
lws_ss_add_peer_tx_credit(flow->h, ask);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
return flow->len ||
|
||||
flow->state != LWSDLOFLOW_STATE_READ ? LWS_SRET_OK :
|
||||
LWS_SRET_WANT_INPUT;
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue