ext: pmd: improve dealing with partial input usage with drain
https://github.com/warmcat/libwebsockets/issues/841
This commit is contained in:
parent
f9e16eacd9
commit
7bc6f5699c
8 changed files with 88 additions and 23 deletions
|
@ -202,7 +202,7 @@ lws_extension_callback_pm_deflate(struct lws_context *context,
|
|||
* rx buffer by the caller, so this assumption is safe while
|
||||
* we block new rx while draining the existing rx
|
||||
*/
|
||||
if (eff_buf->token && eff_buf->token_len) {
|
||||
if (!priv->rx.avail_in && eff_buf->token && eff_buf->token_len) {
|
||||
priv->rx.next_in = (unsigned char *)eff_buf->token;
|
||||
priv->rx.avail_in = eff_buf->token_len;
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ lws_uv_idle(uv_idle_t *handle
|
|||
struct lws_context_per_thread *pt = lws_container_of(handle,
|
||||
struct lws_context_per_thread, uv_idle);
|
||||
|
||||
lwsl_debug("%s\n", __func__);
|
||||
// lwsl_debug("%s\n", __func__);
|
||||
|
||||
/*
|
||||
* is there anybody with pending stuff that needs service forcing?
|
||||
|
@ -51,7 +51,7 @@ lws_uv_idle(uv_idle_t *handle
|
|||
/* still somebody left who wants forced service? */
|
||||
if (!lws_service_adjust_timeout(pt->context, 1, pt->tid))
|
||||
/* yes... come back again later */
|
||||
lwsl_debug("%s: done again\n", __func__);
|
||||
// lwsl_debug("%s: done again\n", __func__);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -986,6 +986,24 @@ LWS_VISIBLE int lws_frame_is_binary(struct lws *wsi)
|
|||
{
|
||||
return wsi->u.ws.frame_is_binary;
|
||||
}
|
||||
static void
|
||||
lws_remove_wsi_from_draining_ext_list(struct lws *wsi)
|
||||
{
|
||||
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
|
||||
struct lws **w = &pt->rx_draining_ext_list;
|
||||
|
||||
wsi->u.ws.rx_draining_ext = 0;
|
||||
/* remove us from context draining ext list */
|
||||
while (*w) {
|
||||
if (*w == wsi) {
|
||||
*w = wsi->u.ws.rx_draining_ext_list;
|
||||
break;
|
||||
}
|
||||
w = &((*w)->u.ws.rx_draining_ext_list);
|
||||
}
|
||||
wsi->u.ws.rx_draining_ext_list = NULL;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
lws_rx_sm(struct lws *wsi, unsigned char c)
|
||||
|
@ -995,26 +1013,19 @@ lws_rx_sm(struct lws *wsi, unsigned char c)
|
|||
int ret = 0, n, rx_draining_ext = 0;
|
||||
struct lws_tokens eff_buf;
|
||||
|
||||
eff_buf.token = NULL;
|
||||
eff_buf.token_len = 0;
|
||||
|
||||
if (wsi->socket_is_permanently_unusable)
|
||||
return -1;
|
||||
|
||||
|
||||
switch (wsi->lws_rx_parse_state) {
|
||||
case LWS_RXPS_NEW:
|
||||
if (wsi->u.ws.rx_draining_ext) {
|
||||
struct lws **w = &pt->rx_draining_ext_list;
|
||||
|
||||
eff_buf.token = NULL;
|
||||
eff_buf.token_len = 0;
|
||||
wsi->u.ws.rx_draining_ext = 0;
|
||||
/* remove us from context draining ext list */
|
||||
while (*w) {
|
||||
if (*w == wsi) {
|
||||
*w = wsi->u.ws.rx_draining_ext_list;
|
||||
break;
|
||||
}
|
||||
w = &((*w)->u.ws.rx_draining_ext_list);
|
||||
}
|
||||
wsi->u.ws.rx_draining_ext_list = NULL;
|
||||
lws_remove_wsi_from_draining_ext_list(wsi);
|
||||
rx_draining_ext = 1;
|
||||
lwsl_err("%s: doing draining flow\n", __func__);
|
||||
|
||||
|
@ -1270,6 +1281,9 @@ handle_first:
|
|||
case LWS_RXPS_PAYLOAD_UNTIL_LENGTH_EXHAUSTED:
|
||||
assert(wsi->u.ws.rx_ubuf);
|
||||
|
||||
if (wsi->u.ws.rx_draining_ext)
|
||||
goto drain_extension;
|
||||
|
||||
if (wsi->u.ws.rx_ubuf_head + LWS_PRE >=
|
||||
wsi->u.ws.rx_ubuf_alloc) {
|
||||
lwsl_err("Attempted overflow \n");
|
||||
|
@ -1430,6 +1444,9 @@ drain_extension:
|
|||
goto already_done;
|
||||
|
||||
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_RX, &eff_buf, 0);
|
||||
/* eff_buf may be pointing somewhere completely different now,
|
||||
* it's the output
|
||||
*/
|
||||
if (n < 0) {
|
||||
/*
|
||||
* we may rely on this to get RX, just drop connection
|
||||
|
@ -1443,9 +1460,12 @@ drain_extension:
|
|||
|
||||
if (n && eff_buf.token_len) {
|
||||
/* extension had more... main loop will come back */
|
||||
// lwsl_notice("ext has stuff to drain\n");
|
||||
wsi->u.ws.rx_draining_ext = 1;
|
||||
wsi->u.ws.rx_draining_ext_list = pt->rx_draining_ext_list;
|
||||
pt->rx_draining_ext_list = wsi;
|
||||
} else {
|
||||
lws_remove_wsi_from_draining_ext_list(wsi);
|
||||
}
|
||||
|
||||
if (eff_buf.token_len > 0 ||
|
||||
|
|
|
@ -1690,7 +1690,7 @@ try_pollout:
|
|||
if (accept_fd < 0) {
|
||||
if (LWS_ERRNO == LWS_EAGAIN ||
|
||||
LWS_ERRNO == LWS_EWOULDBLOCK) {
|
||||
lwsl_err("accept asks to try again\n");
|
||||
// lwsl_err("accept asks to try again\n");
|
||||
break;
|
||||
}
|
||||
lwsl_err("ERROR on accept: %s\n", strerror(LWS_ERRNO));
|
||||
|
@ -1854,6 +1854,7 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
|
|||
}
|
||||
|
||||
if (wsi->u.ws.rx_draining_ext) {
|
||||
// lwsl_notice("draining with 0\n");
|
||||
m = lws_rx_sm(wsi, 0);
|
||||
if (m < 0)
|
||||
return -1;
|
||||
|
@ -1865,7 +1866,8 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
|
|||
wsi->rxflow_pos++;
|
||||
|
||||
/* consume payload bytes efficiently */
|
||||
if (wsi->lws_rx_parse_state ==
|
||||
if (
|
||||
wsi->lws_rx_parse_state ==
|
||||
LWS_RXPS_PAYLOAD_UNTIL_LENGTH_EXHAUSTED) {
|
||||
m = lws_payload_until_length_exhausted(wsi, buf, &len);
|
||||
if (wsi->rxflow_buffer)
|
||||
|
|
|
@ -630,6 +630,17 @@ completed:
|
|||
}
|
||||
#endif
|
||||
|
||||
static int
|
||||
lws_is_ws_with_ext(struct lws *wsi)
|
||||
{
|
||||
#if defined(LWS_NO_EXTENSIONS)
|
||||
return 0;
|
||||
#else
|
||||
return wsi->state == LWSS_ESTABLISHED &&
|
||||
!!wsi->count_act_ext;
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
* lws_service_fd() - Service polled socket with something waiting
|
||||
* @context: Websocket context
|
||||
|
@ -894,9 +905,21 @@ read:
|
|||
wsi->u.hdr.ah->rxpos;
|
||||
} else {
|
||||
if (wsi->mode != LWSCM_HTTP_CLIENT_ACCEPTED) {
|
||||
/*
|
||||
* extension may not consume everything (eg, pmd may be constrained
|
||||
* as to what it can output...) has to go in per-wsi rx buf area.
|
||||
* Otherwise in large temp serv_buf area.
|
||||
*/
|
||||
eff_buf.token = (char *)pt->serv_buf;
|
||||
if (lws_is_ws_with_ext(wsi)) {
|
||||
eff_buf.token_len = wsi->u.ws.rx_ubuf_alloc;
|
||||
} else {
|
||||
eff_buf.token_len = LWS_MAX_SOCKET_IO_BUF;
|
||||
}
|
||||
|
||||
eff_buf.token_len = lws_ssl_capable_read(wsi,
|
||||
pt->serv_buf, pending ? pending :
|
||||
LWS_MAX_SOCKET_IO_BUF);
|
||||
(unsigned char *)eff_buf.token, pending ? pending :
|
||||
eff_buf.token_len);
|
||||
switch (eff_buf.token_len) {
|
||||
case 0:
|
||||
lwsl_info("%s: zero length read\n", __func__);
|
||||
|
@ -909,8 +932,7 @@ read:
|
|||
lwsl_info("Closing when error\n");
|
||||
goto close_and_handled;
|
||||
}
|
||||
|
||||
eff_buf.token = (char *)pt->serv_buf;
|
||||
// lwsl_notice("Actual RX %d\n", eff_buf.token_len);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -966,6 +988,8 @@ drain:
|
|||
* around again it will pick up from where it
|
||||
* left off.
|
||||
*/
|
||||
// lwsl_notice("doing lws_read from pt->serv_buf %p %p for len %d\n", pt->serv_buf, eff_buf.token, (int)eff_buf.token_len);
|
||||
|
||||
n = lws_read(wsi, (unsigned char *)eff_buf.token,
|
||||
eff_buf.token_len);
|
||||
if (n < 0) {
|
||||
|
@ -992,7 +1016,11 @@ drain:
|
|||
|
||||
pending = lws_ssl_pending(wsi);
|
||||
if (pending) {
|
||||
pending = pending > LWS_MAX_SOCKET_IO_BUF ?
|
||||
if (lws_is_ws_with_ext(wsi))
|
||||
pending = pending > wsi->u.ws.rx_ubuf_alloc ?
|
||||
wsi->u.ws.rx_ubuf_alloc : pending;
|
||||
else
|
||||
pending = pending > LWS_MAX_SOCKET_IO_BUF ?
|
||||
LWS_MAX_SOCKET_IO_BUF : pending;
|
||||
goto read;
|
||||
}
|
||||
|
|
|
@ -151,6 +151,11 @@ callback_lws_status(struct lws *wsi, enum lws_callback_reasons reason,
|
|||
}
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_RECEIVE:
|
||||
lwsl_notice("pmd test: RX len %d\n", (int)len);
|
||||
puts(in);
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
pp = &list;
|
||||
while (*pp) {
|
||||
|
|
|
@ -131,6 +131,11 @@ callback_lws_status(struct lws *wsi, enum lws_callback_reasons reason,
|
|||
update_status(wsi, pss);
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_RECEIVE:
|
||||
lwsl_notice("pmd test: RX len %d\n", (int)len);
|
||||
puts(in);
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE:
|
||||
m = lws_write(wsi, (unsigned char *)cache + LWS_PRE, cache_len,
|
||||
LWS_WRITE_TEXT);
|
||||
|
|
|
@ -234,7 +234,8 @@ initiate the close, which it does with code 1001 and reason "Seeya".
|
|||
<div id=s_status>Websocket connection not initialized</div>
|
||||
</td>
|
||||
<td colspan=1>
|
||||
<span class="title">Server Info</span>
|
||||
<span class="title">Server Info</span> <input type=button id=pmd value="Test pmd" onclick=on_pmd>
|
||||
|
||||
</td>
|
||||
</tr><tr>
|
||||
<td class="explain" colspan=2>
|
||||
|
@ -550,6 +551,10 @@ function junk() {
|
|||
socket_di.send(word);
|
||||
}
|
||||
|
||||
function on_pmd() {
|
||||
socket_status.send("{ \"RequestType\":\"DDoS\", \"blob\":\"\" }");
|
||||
}
|
||||
|
||||
var socket_ot;
|
||||
|
||||
function ot_open() {
|
||||
|
|
Loading…
Add table
Reference in a new issue