ext: pmd: improve dealing with partial input usage with drain

https://github.com/warmcat/libwebsockets/issues/841
This commit is contained in:
Andy Green 2017-03-20 19:35:26 +08:00
parent 9207bc51bb
commit 316b0293e9
8 changed files with 86 additions and 23 deletions

View file

@ -202,7 +202,7 @@ lws_extension_callback_pm_deflate(struct lws_context *context,
* rx buffer by the caller, so this assumption is safe while
* we block new rx while draining the existing rx
*/
if (eff_buf->token && eff_buf->token_len) {
if (!priv->rx.avail_in && eff_buf->token && eff_buf->token_len) {
priv->rx.next_in = (unsigned char *)eff_buf->token;
priv->rx.avail_in = eff_buf->token_len;
}

View file

@ -40,7 +40,7 @@ lws_uv_idle(uv_idle_t *handle
struct lws_context_per_thread *pt = lws_container_of(handle,
struct lws_context_per_thread, uv_idle);
lwsl_debug("%s\n", __func__);
// lwsl_debug("%s\n", __func__);
/*
* is there anybody with pending stuff that needs service forcing?
@ -51,7 +51,7 @@ lws_uv_idle(uv_idle_t *handle
/* still somebody left who wants forced service? */
if (!lws_service_adjust_timeout(pt->context, 1, pt->tid))
/* yes... come back again later */
lwsl_debug("%s: done again\n", __func__);
// lwsl_debug("%s: done again\n", __func__);
return;
}

View file

@ -939,6 +939,24 @@ LWS_VISIBLE int lws_frame_is_binary(struct lws *wsi)
{
return wsi->u.ws.frame_is_binary;
}
static void
lws_remove_wsi_from_draining_ext_list(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
struct lws **w = &pt->rx_draining_ext_list;
wsi->u.ws.rx_draining_ext = 0;
/* remove us from context draining ext list */
while (*w) {
if (*w == wsi) {
*w = wsi->u.ws.rx_draining_ext_list;
break;
}
w = &((*w)->u.ws.rx_draining_ext_list);
}
wsi->u.ws.rx_draining_ext_list = NULL;
}
int
lws_rx_sm(struct lws *wsi, unsigned char c)
@ -951,23 +969,13 @@ lws_rx_sm(struct lws *wsi, unsigned char c)
if (wsi->socket_is_permanently_unusable)
return -1;
switch (wsi->lws_rx_parse_state) {
case LWS_RXPS_NEW:
if (wsi->u.ws.rx_draining_ext) {
struct lws **w = &pt->rx_draining_ext_list;
eff_buf.token = NULL;
eff_buf.token_len = 0;
wsi->u.ws.rx_draining_ext = 0;
/* remove us from context draining ext list */
while (*w) {
if (*w == wsi) {
*w = wsi->u.ws.rx_draining_ext_list;
break;
}
w = &((*w)->u.ws.rx_draining_ext_list);
}
wsi->u.ws.rx_draining_ext_list = NULL;
lws_remove_wsi_from_draining_ext_list(wsi);
rx_draining_ext = 1;
lwsl_err("%s: doing draining flow\n", __func__);
@ -1223,6 +1231,9 @@ handle_first:
case LWS_RXPS_PAYLOAD_UNTIL_LENGTH_EXHAUSTED:
assert(wsi->u.ws.rx_ubuf);
if (wsi->u.ws.rx_draining_ext)
goto drain_extension;
if (wsi->u.ws.rx_ubuf_head + LWS_PRE >=
wsi->u.ws.rx_ubuf_alloc) {
lwsl_err("Attempted overflow \n");
@ -1398,6 +1409,9 @@ drain_extension:
goto already_done;
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_RX, &eff_buf, 0);
/* eff_buf may be pointing somewhere completely different now,
* it's the output
*/
if (n < 0) {
/*
* we may rely on this to get RX, just drop connection
@ -1411,9 +1425,12 @@ drain_extension:
if (n && eff_buf.token_len) {
/* extension had more... main loop will come back */
// lwsl_notice("ext has stuff to drain\n");
wsi->u.ws.rx_draining_ext = 1;
wsi->u.ws.rx_draining_ext_list = pt->rx_draining_ext_list;
pt->rx_draining_ext_list = wsi;
} else {
lws_remove_wsi_from_draining_ext_list(wsi);
}
if (eff_buf.token_len > 0 ||

View file

@ -1914,7 +1914,7 @@ try_pollout:
if (accept_fd < 0) {
if (LWS_ERRNO == LWS_EAGAIN ||
LWS_ERRNO == LWS_EWOULDBLOCK) {
lwsl_err("accept asks to try again\n");
// lwsl_err("accept asks to try again\n");
break;
}
lwsl_err("ERROR on accept: %s\n", strerror(LWS_ERRNO));
@ -2073,6 +2073,7 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
}
if (wsi->u.ws.rx_draining_ext) {
// lwsl_notice("draining with 0\n");
m = lws_rx_sm(wsi, 0);
if (m < 0)
return -1;
@ -2084,7 +2085,8 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
wsi->rxflow_pos++;
/* consume payload bytes efficiently */
if (wsi->lws_rx_parse_state ==
if (
wsi->lws_rx_parse_state ==
LWS_RXPS_PAYLOAD_UNTIL_LENGTH_EXHAUSTED) {
m = lws_payload_until_length_exhausted(wsi, buf, &len);
if (wsi->rxflow_buffer)

View file

@ -666,6 +666,17 @@ completed:
}
#endif
static int
lws_is_ws_with_ext(struct lws *wsi)
{
#if defined(LWS_NO_EXTENSIONS)
return 0;
#else
return wsi->state == LWSS_ESTABLISHED &&
!!wsi->count_act_ext;
#endif
}
LWS_VISIBLE int
lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int tsi)
{
@ -943,9 +954,21 @@ read:
wsi->u.hdr.ah->rxpos;
} else {
if (wsi->mode != LWSCM_HTTP_CLIENT_ACCEPTED) {
/*
* extension may not consume everything (eg, pmd may be constrained
* as to what it can output...) has to go in per-wsi rx buf area.
* Otherwise in large temp serv_buf area.
*/
eff_buf.token = (char *)pt->serv_buf;
if (lws_is_ws_with_ext(wsi)) {
eff_buf.token_len = wsi->u.ws.rx_ubuf_alloc;
} else {
eff_buf.token_len = context->pt_serv_buf_size;
}
eff_buf.token_len = lws_ssl_capable_read(wsi,
pt->serv_buf, pending ? pending :
context->pt_serv_buf_size);
(unsigned char *)eff_buf.token, pending ? pending :
eff_buf.token_len);
switch (eff_buf.token_len) {
case 0:
lwsl_info("%s: zero length read\n", __func__);
@ -958,8 +981,7 @@ read:
lwsl_info("Closing when error\n");
goto close_and_handled;
}
eff_buf.token = (char *)pt->serv_buf;
// lwsl_notice("Actual RX %d\n", eff_buf.token_len);
}
}
@ -1016,6 +1038,8 @@ drain:
* around again it will pick up from where it
* left off.
*/
// lwsl_notice("doing lws_read from pt->serv_buf %p %p for len %d\n", pt->serv_buf, eff_buf.token, (int)eff_buf.token_len);
n = lws_read(wsi, (unsigned char *)eff_buf.token,
eff_buf.token_len);
if (n < 0) {
@ -1042,7 +1066,11 @@ drain:
pending = lws_ssl_pending(wsi);
if (pending) {
pending = pending > context->pt_serv_buf_size ?
if (lws_is_ws_with_ext(wsi))
pending = pending > wsi->u.ws.rx_ubuf_alloc ?
wsi->u.ws.rx_ubuf_alloc : pending;
else
pending = pending > context->pt_serv_buf_size ?
context->pt_serv_buf_size : pending;
goto read;
}

View file

@ -200,6 +200,11 @@ walk_final:
lws_callback_on_writable(wsi);
break;
case LWS_CALLBACK_RECEIVE:
lwsl_notice("pmd test: RX len %d\n", (int)len);
puts(in);
break;
case LWS_CALLBACK_CLOSED:
pss1 = vhd->live_pss_list;
pss2 = NULL;

View file

@ -131,6 +131,11 @@ callback_lws_status(struct lws *wsi, enum lws_callback_reasons reason,
update_status(wsi, pss);
break;
case LWS_CALLBACK_RECEIVE:
lwsl_notice("pmd test: RX len %d\n", (int)len);
puts(in);
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
m = lws_write(wsi, (unsigned char *)cache + LWS_PRE, cache_len,
LWS_WRITE_TEXT);

View file

@ -245,7 +245,8 @@ initiate the close, which it does with code 1001 and reason "Seeya".
<div id=s_status>Websocket connection not initialized</div>
</td>
<td colspan=1>
<span class="title">Server Info</span>
<span class="title">Server Info</span> <input type=button id=pmd value="Test pmd">
</td>
</tr><tr>
<td class="explain" colspan=2>
@ -347,6 +348,7 @@ document.getElementById('color').onclick = update_color;
document.getElementById('ot_open_btn').onclick = ot_open;
document.getElementById('ot_close_btn').onclick = ot_close;
document.getElementById('ot_req_close_btn').onclick = ot_req_close;
document.getElementById('pmd').onclick = on_pmd;
/*
* We display untrusted stuff in html context... reject anything
@ -614,6 +616,10 @@ function junk() {
socket_di.send(word);
}
function on_pmd() {
socket_status.send("{ \"RequestType\":\"DDoS\", \"blob\":\"\" }");
}
var socket_ot;
function ot_open() {