1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

rx flow: handle partial flow buffer consumption

https://github.com/warmcat/libwebsockets/issues/1550

rx flow control needs to handle the situation that it is draining from
a previous rx flow control period, and the user code reasserts rx flow
control partway through that.

The accounting for the used rx then boils down to only trimming the
rxflow buflist we were "replaying" to consume however much we managed
to deliver of that this time before the rx flow control came again.

"Normal" rx consumption is wrong in this case, since we accounted for
it entirely in the rxflow cache buflist.

The patch recognizes this situation, does the accounting in the cache
buflist, and then lies to the caller that there was no rx consumption
to be accounted for at his level.
This commit is contained in:
Andy Green 2019-04-19 07:13:40 +01:00
parent 359aeb1093
commit bb0e7d986d
7 changed files with 69 additions and 22 deletions

View file

@ -786,6 +786,14 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or);
LWS_EXTERN int
__remove_wsi_socket_from_fds(struct lws *wsi);
enum {
LWSRXFC_ERROR = -1,
LWSRXFC_CACHED = 0,
LWSRXFC_ADDITIONAL = 1,
LWSRXFC_TRIMMED = 2,
};
LWS_EXTERN int
lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len);

View file

@ -295,23 +295,32 @@ __lws_service_timeout_check(struct lws *wsi, time_t sec)
return 0;
}
int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
int
lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
uint8_t *buffered;
size_t blen;
int ret = 0, m;
int ret = LWSRXFC_CACHED, m;
/* his RX is flowcontrolled, don't send remaining now */
blen = lws_buflist_next_segment_len(&wsi->buflist, &buffered);
if (blen) {
if (buf >= buffered && buf + len <= buffered + blen) {
/* rxflow while we were spilling prev rxflow */
lwsl_info("%s: staying in rxflow buf\n", __func__);
/*
* rxflow while we were spilling prev rxflow
*
* len indicates how much was unused, then... so trim
* the head buflist to match that situation
*/
return 1;
lws_buflist_use_segment(&wsi->buflist, blen - len);
lwsl_debug("%s: trim existing rxflow %d -> %d\n",
__func__, (int)blen, (int)len);
return LWSRXFC_TRIMMED;
}
ret = 1;
ret = LWSRXFC_ADDITIONAL;
}
/* a new rxflow, buffer it and warn caller */
@ -319,7 +328,7 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
m = lws_buflist_append_segment(&wsi->buflist, buf + n, len - n);
if (m < 0)
return -1;
return LWSRXFC_ERROR;
if (m) {
lwsl_debug("%s: added %p to rxflow list\n", __func__, wsi);
lws_dll_add_head(&wsi->dll_buflist, &pt->dll_head_buflist);

View file

@ -337,6 +337,8 @@ __lws_rx_flow_control(struct lws *wsi)
/* adjust the pollfd for this wsi */
if (wsi->rxflow_change_to & LWS_RXFLOW_ALLOW) {
lwsl_info("%s: reenable POLLIN\n", __func__);
// lws_buflist_describe(&wsi->buflist, NULL);
if (__lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
lwsl_info("%s: fail\n", __func__);
return -1;

View file

@ -66,22 +66,46 @@ lws_create_client_ws_object(const struct lws_client_connect_info *i,
int
lws_ws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len)
{
unsigned char *bufin = *buf;
if ((lwsi_state(wsi) != LRS_WAITING_PROXY_REPLY) &&
(lwsi_state(wsi) != LRS_H1C_ISSUE_HANDSHAKE) &&
(lwsi_state(wsi) != LRS_WAITING_SERVER_REPLY) &&
!lwsi_role_client(wsi))
return 0;
// lwsl_notice("%s: hs client gets %d in\n", __func__, (int)len);
lwsl_debug("%s: hs client feels it has %d in\n", __func__, (int)len);
while (len) {
/*
* we were accepting input but now we stopped doing so
*/
if (lws_is_flowcontrolled(wsi)) {
//lwsl_notice("%s: caching %ld\n", __func__, (long)len);
lws_rxflow_cache(wsi, *buf, 0, (int)len);
*buf += len;
lwsl_debug("%s: caching %ld\n", __func__, (long)len);
/*
* Since we cached the remaining available input, we
* can say we "consumed" it.
*
* But what about the case where the available input
* came out of the rxflow cache already? If we are
* effectively "putting it back in the cache", we have
* to place it at the cache head, not the tail as usual.
*/
if (lws_rxflow_cache(wsi, *buf, 0, (int)len) ==
LWSRXFC_TRIMMED) {
/*
* we dealt with it by trimming the existing
* rxflow cache HEAD to account for what we used.
*
* indicate we didn't use anything to the caller
* so he doesn't do any consumed processing
*/
lwsl_info("%s: trimming inside rxflow cache\n",
__func__);
*buf = bufin;
} else
*buf += len;
return 0;
}
#if !defined(LWS_WITHOUT_EXTENSIONS)

View file

@ -937,10 +937,6 @@ rops_handle_POLLIN_ws(struct lws_context_per_thread *pt, struct lws *wsi,
return LWS_HPI_RET_HANDLED;
}
//lwsl_notice("%s: wsi->ws->tx_draining_ext %d revents 0x%x 0x%x %d\n",
//__func__, wsi->ws->tx_draining_ext, pollfd->revents, wsi->wsistate,
//lwsi_state_can_handle_POLLOUT(wsi));
/* 1: something requested a callback when it was OK to write */
if ((pollfd->revents & LWS_POLLOUT) &&
@ -977,14 +973,21 @@ rops_handle_POLLIN_ws(struct lws_context_per_thread *pt, struct lws *wsi,
*/
return LWS_HPI_RET_HANDLED;
#endif
if (lws_is_flowcontrolled(wsi)) {
if ((pollfd->revents & LWS_POLLIN) && lws_is_flowcontrolled(wsi)) {
/* We cannot deal with any kind of new RX because we are
* RX-flowcontrolled.
*/
lwsl_info("flowcontrolled\n");
lwsl_info("%s: flowcontrolled, ignoring rx\n", __func__);
if (__lws_change_pollfd(wsi, LWS_POLLIN, 0))
return -1;
return LWS_HPI_RET_HANDLED;
}
if (lws_is_flowcontrolled(wsi))
return LWS_HPI_RET_HANDLED;
#if defined(LWS_WITH_HTTP2)
if (wsi->http2_substream || wsi->upgraded_to_http2) {
wsi1 = lws_get_network_wsi(wsi);

View file

@ -899,10 +899,11 @@ lws_parse_ws(struct lws *wsi, unsigned char **buf, size_t len)
* we were accepting input but now we stopped doing so
*/
if (wsi->rxflow_bitmap) {
lwsl_info("%s: doing rxflow\n", __func__);
lws_rxflow_cache(wsi, *buf, 0, (int)len);
lwsl_parser("%s: cached %ld\n", __func__, (long)len);
*buf += len; /* stashing it is taking care of it */
lwsl_info("%s: doing rxflow, caching %d\n", __func__,
(int)len);
if (lws_rxflow_cache(wsi, *buf, 0, (int)len) !=
LWSRXFC_TRIMMED)
*buf += len; /* stashing it is taking care of it */
return 1;
}
#if !defined(LWS_WITHOUT_EXTENSIONS)

View file

@ -161,7 +161,7 @@ callback_minimal_pmd_bulk(struct lws *wsi, enum lws_callback_reasons reason,
m = lws_write(wsi, start, n, flags);
lwsl_user("LWS_CALLBACK_SERVER_WRITEABLE: wrote %d\n", n);
if (m < n) {
lwsl_err("ERROR %d writing ws\n", n);
lwsl_err("ERROR %d / %d writing ws\n", m, n);
return -1;
}
if (pss->position_tx != MESSAGE_SIZE) /* if more to do... */