pmd: split ebufs to track in and out
This commit is contained in:
parent
8d473ad78c
commit
7ca8b77f2c
|
@ -214,8 +214,6 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type,
|
|||
if (new_wsi->role_ops->adoption_cb[lwsi_role_server(new_wsi)])
|
||||
n = new_wsi->role_ops->adoption_cb[lwsi_role_server(new_wsi)];
|
||||
|
||||
lwsl_err("new wsi wsistate 0x%x\n", new_wsi->wsistate);
|
||||
|
||||
if (context->event_loop_ops->accept)
|
||||
if (context->event_loop_ops->accept(new_wsi))
|
||||
goto fail;
|
||||
|
|
|
@ -148,6 +148,16 @@ enum lws_parser_return {
|
|||
LPR_FORBIDDEN = -2
|
||||
};
|
||||
|
||||
enum pmd_return {
|
||||
PMDR_UNKNOWN,
|
||||
PMDR_DID_NOTHING,
|
||||
PMDR_HAS_PENDING,
|
||||
PMDR_EMPTY_NONFINAL,
|
||||
PMDR_EMPTY_FINAL,
|
||||
|
||||
PMDR_FAILED = -1
|
||||
};
|
||||
|
||||
typedef union {
|
||||
#ifdef LWS_WITH_IPV6
|
||||
struct sockaddr_in6 sa6;
|
||||
|
|
|
@ -324,7 +324,7 @@ __lws_rx_flow_control(struct lws *wsi)
|
|||
if (lws_buflist_next_segment_len(&wsi->buflist, NULL)) {
|
||||
/* get ourselves called back to deal with stashed buffer */
|
||||
lws_callback_on_writable(wsi);
|
||||
return 0;
|
||||
// return 0;
|
||||
}
|
||||
|
||||
/* now the pending is cleared, we can change rxflow state */
|
||||
|
|
|
@ -33,6 +33,11 @@ lws_send_pipe_choked(struct lws *wsi)
|
|||
struct lws_pollfd fds;
|
||||
struct lws *wsi_eff;
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
if (wsi->ws && wsi->ws->tx_draining_ext)
|
||||
return 1;
|
||||
#endif
|
||||
|
||||
#if defined(LWS_WITH_HTTP2)
|
||||
wsi_eff = lws_get_network_wsi(wsi);
|
||||
#else
|
||||
|
|
|
@ -29,16 +29,18 @@
|
|||
int lws_ws_client_rx_sm(struct lws *wsi, unsigned char c)
|
||||
{
|
||||
int callback_action = LWS_CALLBACK_CLIENT_RECEIVE;
|
||||
int handled, m;
|
||||
struct lws_ext_pm_deflate_rx_ebufs pmdrx;
|
||||
unsigned short close_code;
|
||||
struct lws_tokens ebuf;
|
||||
unsigned char *pp;
|
||||
int handled, m, n;
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
int rx_draining_ext = 0, n;
|
||||
int rx_draining_ext = 0;
|
||||
#endif
|
||||
|
||||
ebuf.token = NULL;
|
||||
ebuf.len = 0;
|
||||
pmdrx.eb_in.token = NULL;
|
||||
pmdrx.eb_in.len = 0;
|
||||
pmdrx.eb_out.token = NULL;
|
||||
pmdrx.eb_out.len = 0;
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
if (wsi->ws->rx_draining_ext) {
|
||||
|
@ -67,6 +69,14 @@ int lws_ws_client_rx_sm(struct lws *wsi, unsigned char c)
|
|||
switch (wsi->ws->opcode) {
|
||||
case LWSWSOPC_TEXT_FRAME:
|
||||
wsi->ws->rsv_first_msg = (c & 0x70);
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
/*
|
||||
* set the expectation that we will have to
|
||||
* fake up the zlib trailer to the inflator for
|
||||
* this frame
|
||||
*/
|
||||
wsi->ws->pmd_trailer_application = !!(c & 0x40);
|
||||
#endif
|
||||
wsi->ws->continuation_possible = 1;
|
||||
wsi->ws->check_utf8 = lws_check_opt(
|
||||
wsi->context->options,
|
||||
|
@ -76,6 +86,14 @@ int lws_ws_client_rx_sm(struct lws *wsi, unsigned char c)
|
|||
break;
|
||||
case LWSWSOPC_BINARY_FRAME:
|
||||
wsi->ws->rsv_first_msg = (c & 0x70);
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
/*
|
||||
* set the expectation that we will have to
|
||||
* fake up the zlib trailer to the inflator for
|
||||
* this frame
|
||||
*/
|
||||
wsi->ws->pmd_trailer_application = !!(c & 0x40);
|
||||
#endif
|
||||
wsi->ws->check_utf8 = 0;
|
||||
wsi->ws->continuation_possible = 1;
|
||||
wsi->ws->first_fragment = 1;
|
||||
|
@ -322,11 +340,18 @@ int lws_ws_client_rx_sm(struct lws *wsi, unsigned char c)
|
|||
if (wsi->ws->this_frame_masked && !wsi->ws->all_zero_nonce)
|
||||
c ^= wsi->ws->mask[(wsi->ws->mask_idx++) & 3];
|
||||
|
||||
/*
|
||||
* unmask and collect the payload body in
|
||||
* rx_ubuf_head + LWS_PRE
|
||||
*/
|
||||
|
||||
wsi->ws->rx_ubuf[LWS_PRE + (wsi->ws->rx_ubuf_head++)] = c;
|
||||
|
||||
if (--wsi->ws->rx_packet_length == 0) {
|
||||
/* spill because we have the whole frame */
|
||||
wsi->lws_rx_parse_state = LWS_RXPS_NEW;
|
||||
lwsl_debug("%s: spilling as we have the whole frame\n",
|
||||
__func__);
|
||||
goto spill;
|
||||
}
|
||||
|
||||
|
@ -343,6 +368,9 @@ int lws_ws_client_rx_sm(struct lws *wsi, unsigned char c)
|
|||
break;
|
||||
|
||||
/* spill because we filled our rx buffer */
|
||||
|
||||
lwsl_debug("%s: spilling as we filled our rx buffer\n",
|
||||
__func__);
|
||||
spill:
|
||||
|
||||
handled = 0;
|
||||
|
@ -481,111 +509,166 @@ ping_drop:
|
|||
|
||||
/*
|
||||
* No it's real payload, pass it up to the user callback.
|
||||
*
|
||||
* We have been statefully collecting it in the
|
||||
* LWS_RXPS_WS_FRAME_PAYLOAD clause above.
|
||||
*
|
||||
* It's nicely buffered with the pre-padding taken care of
|
||||
* so it can be sent straight out again using lws_write
|
||||
* so it can be sent straight out again using lws_write.
|
||||
*
|
||||
* However, now we have a chunk of it, we want to deal with it
|
||||
* all here. Since this may be input to permessage-deflate and
|
||||
* there are block limits on that for input and output, we may
|
||||
* need to iterate.
|
||||
*/
|
||||
if (handled)
|
||||
goto already_done;
|
||||
|
||||
ebuf.token = &wsi->ws->rx_ubuf[LWS_PRE];
|
||||
ebuf.len = wsi->ws->rx_ubuf_head;
|
||||
pmdrx.eb_in.token = &wsi->ws->rx_ubuf[LWS_PRE];
|
||||
pmdrx.eb_in.len = wsi->ws->rx_ubuf_head;
|
||||
|
||||
/* for the non-pm-deflate case */
|
||||
|
||||
pmdrx.eb_out = pmdrx.eb_in;
|
||||
|
||||
lwsl_debug("%s: starting disbursal of %d deframed rx\n",
|
||||
__func__, wsi->ws->rx_ubuf_head);
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
drain_extension:
|
||||
lwsl_ext("%s: passing %d to ext\n", __func__, ebuf.len);
|
||||
|
||||
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_RX, &ebuf, 0);
|
||||
lwsl_ext("Ext RX returned %d\n", n);
|
||||
if (n < 0) {
|
||||
wsi->socket_is_permanently_unusable = 1;
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
lwsl_debug("post inflate ebuf len %d\n", ebuf.len);
|
||||
do {
|
||||
|
||||
// lwsl_notice("%s: pmdrx.eb_in.len: %d\n", __func__,
|
||||
// (int)pmdrx.eb_in.len);
|
||||
|
||||
n = PMDR_DID_NOTHING;
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
if (rx_draining_ext && !ebuf.len) {
|
||||
lwsl_debug(" --- ending drain on 0 read result\n");
|
||||
goto already_done;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (wsi->ws->check_utf8 && !wsi->ws->defeat_check_utf8) {
|
||||
if (lws_check_utf8(&wsi->ws->utf8,
|
||||
(unsigned char *)ebuf.token,
|
||||
ebuf.len)) {
|
||||
lws_close_reason(wsi,
|
||||
LWS_CLOSE_STATUS_INVALID_PAYLOAD,
|
||||
(uint8_t *)"bad utf8", 8);
|
||||
goto utf8_fail;
|
||||
}
|
||||
|
||||
/* we are ending partway through utf-8 character? */
|
||||
if (!wsi->ws->rx_packet_length && wsi->ws->final &&
|
||||
wsi->ws->utf8
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
&& !n
|
||||
#endif
|
||||
) {
|
||||
lwsl_info("FINAL utf8 error\n");
|
||||
lws_close_reason(wsi,
|
||||
LWS_CLOSE_STATUS_INVALID_PAYLOAD,
|
||||
(uint8_t *)"partial utf8", 12);
|
||||
utf8_fail:
|
||||
lwsl_info("utf8 error\n");
|
||||
lwsl_hexdump_info(ebuf.token, ebuf.len);
|
||||
lwsl_ext("%s: +++ passing %d %p to ext\n", __func__,
|
||||
pmdrx.eb_in.len, pmdrx.eb_in.token);
|
||||
|
||||
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_RX,
|
||||
&pmdrx, 0);
|
||||
lwsl_ext("Ext RX returned %d\n", n);
|
||||
if (n < 0) {
|
||||
wsi->socket_is_permanently_unusable = 1;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (ebuf.len < 0 &&
|
||||
callback_action != LWS_CALLBACK_CLIENT_RECEIVE_PONG)
|
||||
goto already_done;
|
||||
|
||||
if (!ebuf.token)
|
||||
goto already_done;
|
||||
|
||||
ebuf.token[ebuf.len] = '\0';
|
||||
|
||||
if (!wsi->protocol->callback)
|
||||
goto already_done;
|
||||
|
||||
if (callback_action == LWS_CALLBACK_CLIENT_RECEIVE_PONG)
|
||||
lwsl_info("Client doing pong callback\n");
|
||||
|
||||
if (
|
||||
/* coverity says dead code otherwise */
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
n &&
|
||||
if (n == PMDR_DID_NOTHING)
|
||||
break;
|
||||
#endif
|
||||
ebuf.len)
|
||||
/* extension had more... main loop will come back
|
||||
* we want callback to be done with this set, if so,
|
||||
* because lws_is_final() hides it was final until the
|
||||
* last chunk
|
||||
*/
|
||||
lws_add_wsi_to_draining_ext_list(wsi);
|
||||
else
|
||||
lws_remove_wsi_from_draining_ext_list(wsi);
|
||||
lwsl_ext("%s: post inflate ebuf in len %d / out len %d\n",
|
||||
__func__, pmdrx.eb_in.len, pmdrx.eb_out.len);
|
||||
|
||||
if (lwsi_state(wsi) == LRS_RETURNED_CLOSE ||
|
||||
lwsi_state(wsi) == LRS_WAITING_TO_SEND_CLOSE ||
|
||||
lwsi_state(wsi) == LRS_AWAITING_CLOSE_ACK)
|
||||
goto already_done;
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
if (rx_draining_ext && !pmdrx.eb_out.len) {
|
||||
lwsl_debug(" --- ending drain on 0 read result\n");
|
||||
goto already_done;
|
||||
}
|
||||
|
||||
m = wsi->protocol->callback(wsi,
|
||||
(enum lws_callback_reasons)callback_action,
|
||||
wsi->user_space, ebuf.token, ebuf.len);
|
||||
if (n == PMDR_HAS_PENDING) { /* 1 means stuff to drain */
|
||||
/* extension had more... main loop will come back */
|
||||
lwsl_ext("%s: adding to draining ext list\n",
|
||||
__func__);
|
||||
lws_add_wsi_to_draining_ext_list(wsi);
|
||||
} else {
|
||||
lwsl_ext("%s: removing from draining ext list\n",
|
||||
__func__);
|
||||
lws_remove_wsi_from_draining_ext_list(wsi);
|
||||
}
|
||||
rx_draining_ext = wsi->ws->rx_draining_ext;
|
||||
#endif
|
||||
|
||||
wsi->ws->first_fragment = 0;
|
||||
if (wsi->ws->check_utf8 && !wsi->ws->defeat_check_utf8) {
|
||||
|
||||
// lwsl_notice("%s: bulk ws rx: input used %d, output %d\n",
|
||||
// __func__, wsi->ws->rx_ubuf_head, ebuf.len);
|
||||
if (lws_check_utf8(&wsi->ws->utf8,
|
||||
(unsigned char *)pmdrx.eb_out.token,
|
||||
pmdrx.eb_out.len)) {
|
||||
lws_close_reason(wsi,
|
||||
LWS_CLOSE_STATUS_INVALID_PAYLOAD,
|
||||
(uint8_t *)"bad utf8", 8);
|
||||
goto utf8_fail;
|
||||
}
|
||||
|
||||
/* if user code wants to close, let caller know */
|
||||
if (m)
|
||||
return 1;
|
||||
/* we are ending partway through utf-8 character? */
|
||||
if (!wsi->ws->rx_packet_length &&
|
||||
wsi->ws->final && wsi->ws->utf8
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
/* if ext not negotiated, going to be UNKNOWN */
|
||||
&& (n == PMDR_EMPTY_FINAL || n == PMDR_UNKNOWN)
|
||||
#endif
|
||||
) {
|
||||
lwsl_info("FINAL utf8 error\n");
|
||||
lws_close_reason(wsi,
|
||||
LWS_CLOSE_STATUS_INVALID_PAYLOAD,
|
||||
(uint8_t *)"partial utf8", 12);
|
||||
utf8_fail:
|
||||
lwsl_info("utf8 error\n");
|
||||
lwsl_hexdump_info(pmdrx.eb_out.token,
|
||||
pmdrx.eb_out.len);
|
||||
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pmdrx.eb_out.len < 0 &&
|
||||
callback_action != LWS_CALLBACK_CLIENT_RECEIVE_PONG)
|
||||
goto already_done;
|
||||
|
||||
if (!pmdrx.eb_out.token)
|
||||
goto already_done;
|
||||
|
||||
pmdrx.eb_out.token[pmdrx.eb_out.len] = '\0';
|
||||
|
||||
if (!wsi->protocol->callback)
|
||||
goto already_done;
|
||||
|
||||
if (callback_action == LWS_CALLBACK_CLIENT_RECEIVE_PONG)
|
||||
lwsl_info("Client doing pong callback\n");
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
if (n == PMDR_HAS_PENDING)
|
||||
/* extension had more... main loop will come back
|
||||
* we want callback to be done with this set, if so,
|
||||
* because lws_is_final() hides it was final until the
|
||||
* last chunk
|
||||
*/
|
||||
lws_add_wsi_to_draining_ext_list(wsi);
|
||||
else
|
||||
lws_remove_wsi_from_draining_ext_list(wsi);
|
||||
#endif
|
||||
|
||||
if (lwsi_state(wsi) == LRS_RETURNED_CLOSE ||
|
||||
lwsi_state(wsi) == LRS_WAITING_TO_SEND_CLOSE ||
|
||||
lwsi_state(wsi) == LRS_AWAITING_CLOSE_ACK)
|
||||
goto already_done;
|
||||
|
||||
/* if pmd not enabled, in == out */
|
||||
|
||||
if (n == PMDR_DID_NOTHING || n == PMDR_UNKNOWN)
|
||||
pmdrx.eb_in.len -= pmdrx.eb_out.len;
|
||||
|
||||
m = wsi->protocol->callback(wsi,
|
||||
(enum lws_callback_reasons)callback_action,
|
||||
wsi->user_space, pmdrx.eb_out.token,
|
||||
pmdrx.eb_out.len);
|
||||
|
||||
wsi->ws->first_fragment = 0;
|
||||
|
||||
lwsl_debug("%s: bulk ws rx: inp used %d, output %d\n",
|
||||
__func__, wsi->ws->rx_ubuf_head,
|
||||
pmdrx.eb_out.len);
|
||||
|
||||
/* if user code wants to close, let caller know */
|
||||
if (m)
|
||||
return 1;
|
||||
|
||||
} while (pmdrx.eb_in.len
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
|| rx_draining_ext
|
||||
#endif
|
||||
);
|
||||
|
||||
already_done:
|
||||
wsi->ws->rx_ubuf_head = 0;
|
||||
|
|
|
@ -112,7 +112,7 @@ lws_ws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len)
|
|||
if (wsi->ws->rx_draining_ext) {
|
||||
int m;
|
||||
|
||||
//lwsl_notice("%s: draining ext\n", __func__);
|
||||
lwsl_info("%s: draining ext\n", __func__);
|
||||
if (lwsi_role_client(wsi))
|
||||
m = lws_ws_client_rx_sm(wsi, 0);
|
||||
else
|
||||
|
@ -122,7 +122,10 @@ lws_ws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len)
|
|||
continue;
|
||||
}
|
||||
#endif
|
||||
/* caller will account for buflist usage */
|
||||
/*
|
||||
* caller will account for buflist usage by studying what
|
||||
* happened to *buf
|
||||
*/
|
||||
|
||||
if (lws_ws_client_rx_sm(wsi, *(*buf)++)) {
|
||||
lwsl_notice("%s: client_rx_sm exited, DROPPING %d\n",
|
||||
|
|
|
@ -63,6 +63,8 @@ lws_extension_pmdeflate_restrict_args(struct lws *wsi,
|
|||
}
|
||||
}
|
||||
|
||||
static unsigned char trail[] = { 0, 0, 0xff, 0xff };
|
||||
|
||||
LWS_VISIBLE int
|
||||
lws_extension_callback_pm_deflate(struct lws_context *context,
|
||||
const struct lws_extension *ext,
|
||||
|
@ -72,10 +74,12 @@ lws_extension_callback_pm_deflate(struct lws_context *context,
|
|||
{
|
||||
struct lws_ext_pm_deflate_priv *priv =
|
||||
(struct lws_ext_pm_deflate_priv *)user;
|
||||
struct lws_tokens *ebuf = (struct lws_tokens *)in;
|
||||
static unsigned char trail[] = { 0, 0, 0xff, 0xff };
|
||||
int n, ret = 0, was_fin = 0, extra;
|
||||
struct lws_ext_pm_deflate_rx_ebufs *pmdrx =
|
||||
(struct lws_ext_pm_deflate_rx_ebufs *)in;
|
||||
struct lws_ext_option_arg *oa;
|
||||
int n, ret = 0, was_fin = 0, m;
|
||||
unsigned int pen = 0;
|
||||
int penbits = 0;
|
||||
|
||||
switch (reason) {
|
||||
case LWS_EXT_CB_NAMED_OPTION_SET:
|
||||
|
@ -174,54 +178,80 @@ lws_extension_callback_pm_deflate(struct lws_context *context,
|
|||
if (priv->tx_init)
|
||||
(void)deflateEnd(&priv->tx);
|
||||
lws_free(priv);
|
||||
|
||||
return ret;
|
||||
|
||||
|
||||
case LWS_EXT_CB_PAYLOAD_RX:
|
||||
lwsl_ext(" %s: LWS_EXT_CB_PAYLOAD_RX: in %d, existing in %d\n",
|
||||
__func__, ebuf->len, priv->rx.avail_in);
|
||||
__func__, pmdrx->eb_in.len, priv->rx.avail_in);
|
||||
|
||||
/* if this frame is not marked as compressed, we ignore it */
|
||||
|
||||
if (!(wsi->ws->rsv_first_msg & 0x40) || (wsi->ws->opcode & 8))
|
||||
return 0;
|
||||
return PMDR_DID_NOTHING;
|
||||
|
||||
// lwsl_hexdump_debug(ebuf->token, ebuf->len);
|
||||
/*
|
||||
* we shouldn't come back in here if we already applied the
|
||||
* trailer for this compressed packet
|
||||
*/
|
||||
if (!wsi->ws->pmd_trailer_application)
|
||||
return PMDR_DID_NOTHING;
|
||||
|
||||
if (!priv->rx_init)
|
||||
pmdrx->eb_out.len = 0;
|
||||
|
||||
lwsl_ext("%s: LWS_EXT_CB_PAYLOAD_RX: in %d, "
|
||||
"existing avail in %d, pkt fin: %d\n", __func__,
|
||||
pmdrx->eb_in.len, priv->rx.avail_in,
|
||||
wsi->ws->final);
|
||||
|
||||
/* if needed, initialize the inflator */
|
||||
|
||||
if (!priv->rx_init) {
|
||||
if (inflateInit2(&priv->rx,
|
||||
-priv->args[PMD_SERVER_MAX_WINDOW_BITS]) != Z_OK) {
|
||||
lwsl_err("%s: iniflateInit failed\n", __func__);
|
||||
return -1;
|
||||
return PMDR_FAILED;
|
||||
}
|
||||
priv->rx_init = 1;
|
||||
if (!priv->buf_rx_inflated)
|
||||
priv->buf_rx_inflated = lws_malloc(LWS_PRE + 7 + 5 +
|
||||
priv->rx_init = 1;
|
||||
if (!priv->buf_rx_inflated)
|
||||
priv->buf_rx_inflated = lws_malloc(
|
||||
LWS_PRE + 7 + 5 +
|
||||
(1 << priv->args[PMD_RX_BUF_PWR2]),
|
||||
"pmd rx inflate buf");
|
||||
if (!priv->buf_rx_inflated) {
|
||||
lwsl_err("%s: OOM\n", __func__);
|
||||
return -1;
|
||||
if (!priv->buf_rx_inflated) {
|
||||
lwsl_err("%s: OOM\n", __func__);
|
||||
return PMDR_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
/*
|
||||
* We have to leave the input stream alone if we didn't
|
||||
* finish with it yet. The input stream is held in the wsi
|
||||
* rx buffer by the caller, so this assumption is safe while
|
||||
* we block new rx while draining the existing rx
|
||||
* don't give us new input while we still work through
|
||||
* the last input
|
||||
*/
|
||||
if (!priv->rx.avail_in && ebuf->token && ebuf->len) {
|
||||
priv->rx.next_in = (unsigned char *)ebuf->token;
|
||||
priv->rx.avail_in = ebuf->len;
|
||||
|
||||
if (priv->rx.avail_in && pmdrx->eb_in.token &&
|
||||
pmdrx->eb_in.len) {
|
||||
lwsl_warn("%s: priv->rx.avail_in %d while getting new in\n",
|
||||
__func__, priv->rx.avail_in);
|
||||
// assert(0);
|
||||
}
|
||||
#endif
|
||||
if (!priv->rx.avail_in && pmdrx->eb_in.token && pmdrx->eb_in.len) {
|
||||
priv->rx.next_in = (unsigned char *)pmdrx->eb_in.token;
|
||||
priv->rx.avail_in = pmdrx->eb_in.len;
|
||||
}
|
||||
|
||||
priv->rx.next_out = priv->buf_rx_inflated + LWS_PRE;
|
||||
ebuf->token = (char *)priv->rx.next_out;
|
||||
pmdrx->eb_out.token = (char *)priv->rx.next_out;
|
||||
priv->rx.avail_out = 1 << priv->args[PMD_RX_BUF_PWR2];
|
||||
|
||||
if (priv->rx_held_valid) {
|
||||
lwsl_ext("-- RX piling on held byte --\n");
|
||||
*(priv->rx.next_out++) = priv->rx_held;
|
||||
priv->rx.avail_out--;
|
||||
priv->rx_held_valid = 0;
|
||||
}
|
||||
pen = penbits = 0;
|
||||
deflatePending(&priv->rx, &pen, &penbits);
|
||||
pen |= penbits;
|
||||
|
||||
/* if...
|
||||
/* so... if...
|
||||
*
|
||||
* - he has no remaining input content for this message, and
|
||||
* - and this is the final fragment, and
|
||||
|
@ -230,15 +260,26 @@ lws_extension_callback_pm_deflate(struct lws_context *context,
|
|||
* ...then put back the 00 00 FF FF the sender stripped as our
|
||||
* input to zlib
|
||||
*/
|
||||
if (!priv->rx.avail_in && wsi->ws->final &&
|
||||
!wsi->ws->rx_packet_length) {
|
||||
lwsl_ext("RX APPEND_TRAILER-DO\n");
|
||||
if (!priv->rx.avail_in &&
|
||||
wsi->ws->final &&
|
||||
!wsi->ws->rx_packet_length &&
|
||||
wsi->ws->pmd_trailer_application) {
|
||||
lwsl_ext("%s: trailer apply 1\n", __func__);
|
||||
was_fin = 1;
|
||||
wsi->ws->pmd_trailer_application = 0;
|
||||
priv->rx.next_in = trail;
|
||||
priv->rx.avail_in = sizeof(trail);
|
||||
}
|
||||
|
||||
n = inflate(&priv->rx, Z_NO_FLUSH);
|
||||
/*
|
||||
* if after all that there's nothing pending and nothing to give
|
||||
* him right now, bail without having done anything
|
||||
*/
|
||||
|
||||
if (!priv->rx.avail_in && !pen)
|
||||
return PMDR_DID_NOTHING;
|
||||
|
||||
n = inflate(&priv->rx, was_fin ? Z_SYNC_FLUSH : Z_NO_FLUSH);
|
||||
lwsl_ext("inflate ret %d, avi %d, avo %d, wsifinal %d\n", n,
|
||||
priv->rx.avail_in, priv->rx.avail_out, wsi->ws->final);
|
||||
switch (n) {
|
||||
|
@ -246,29 +287,51 @@ lws_extension_callback_pm_deflate(struct lws_context *context,
|
|||
case Z_STREAM_ERROR:
|
||||
case Z_DATA_ERROR:
|
||||
case Z_MEM_ERROR:
|
||||
lwsl_notice("zlib error inflate %d: %s\n",
|
||||
n, priv->rx.msg);
|
||||
return -1;
|
||||
lwsl_err("%s: zlib error inflate %d: \"%s\"\n",
|
||||
__func__, n, priv->rx.msg);
|
||||
return PMDR_FAILED;
|
||||
}
|
||||
|
||||
/*
|
||||
* If we did not already send in the 00 00 FF FF, and he's
|
||||
* out of input, he did not EXACTLY fill the output buffer
|
||||
* (which is ambiguous and we will force it to go around
|
||||
* again by withholding a byte), and he's otherwise working on
|
||||
* being a FIN fragment, then do the FIN message processing
|
||||
* of faking up the 00 00 FF FF that the sender stripped.
|
||||
* track how much input was used, and advance it
|
||||
*/
|
||||
if (!priv->rx.avail_in && wsi->ws->final &&
|
||||
!wsi->ws->rx_packet_length && !was_fin &&
|
||||
priv->rx.avail_out /* ambiguous as to if it is the end */
|
||||
|
||||
pmdrx->eb_in.token = (char *)pmdrx->eb_in.token +
|
||||
(pmdrx->eb_in.len - priv->rx.avail_in);
|
||||
pmdrx->eb_in.len = priv->rx.avail_in;
|
||||
|
||||
pen = penbits = 0;
|
||||
deflatePending(&priv->rx, &pen, &penbits);
|
||||
pen |= penbits;
|
||||
|
||||
lwsl_debug("%s: %d %d %d %d %d %d\n", __func__,
|
||||
priv->rx.avail_in,
|
||||
wsi->ws->final,
|
||||
(int)wsi->ws->rx_packet_length,
|
||||
was_fin,
|
||||
wsi->ws->pmd_trailer_application,
|
||||
pen);
|
||||
|
||||
if (!priv->rx.avail_in &&
|
||||
wsi->ws->final &&
|
||||
!wsi->ws->rx_packet_length &&
|
||||
!was_fin &&
|
||||
wsi->ws->pmd_trailer_application &&
|
||||
!pen
|
||||
) {
|
||||
lwsl_ext("RX APPEND_TRAILER-DO\n");
|
||||
lwsl_ext("%s: RX trailer apply 2\n", __func__);
|
||||
|
||||
/* we overallocated just for this situation where
|
||||
* we might issue something */
|
||||
priv->rx.avail_out += 5;
|
||||
|
||||
was_fin = 1;
|
||||
wsi->ws->pmd_trailer_application = 0;
|
||||
priv->rx.next_in = trail;
|
||||
priv->rx.avail_in = sizeof(trail);
|
||||
n = inflate(&priv->rx, Z_SYNC_FLUSH);
|
||||
lwsl_ext("RX trailer inf returned %d, avi %d, avo %d\n",
|
||||
n, priv->rx.avail_in, priv->rx.avail_out);
|
||||
lwsl_ext("RX trailer infl ret %d, avi %d, avo %d\n",
|
||||
n, priv->rx.avail_in, priv->rx.avail_out);
|
||||
switch (n) {
|
||||
case Z_NEED_DICT:
|
||||
case Z_STREAM_ERROR:
|
||||
|
@ -278,55 +341,45 @@ lws_extension_callback_pm_deflate(struct lws_context *context,
|
|||
n, priv->rx.msg);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
/*
|
||||
* we must announce in our returncode now if there is more
|
||||
* output to be expected from inflate, so we can decide to
|
||||
* set the FIN bit on this bufferload or not. However zlib
|
||||
* is ambiguous when we exactly filled the inflate buffer. It
|
||||
* does not give us a clue as to whether we should understand
|
||||
* that to mean he ended on a buffer boundary, or if there is
|
||||
* more in the pipeline.
|
||||
*
|
||||
* So to work around that safely, if it used all output space
|
||||
* exactly, we ALWAYS say there is more coming and we withhold
|
||||
* the last byte of the buffer to guarantee that is true.
|
||||
*
|
||||
* That still leaves us at least one byte to finish with a FIN
|
||||
* on, even if actually nothing more is coming from the next
|
||||
* inflate action itself.
|
||||
*/
|
||||
if (!priv->rx.avail_out) { /* he used all available out buf */
|
||||
lwsl_ext("-- rx grabbing held --\n");
|
||||
/* snip the last byte and hold it for next time */
|
||||
priv->rx_held = *(--priv->rx.next_out);
|
||||
priv->rx_held_valid = 1;
|
||||
|
||||
assert(priv->rx.avail_out);
|
||||
|
||||
pen = penbits = 0;
|
||||
deflatePending(&priv->rx, &pen, &penbits);
|
||||
pen |= penbits;
|
||||
}
|
||||
|
||||
ebuf->len = lws_ptr_diff(priv->rx.next_out, ebuf->token);
|
||||
priv->count_rx_between_fin += ebuf->len;
|
||||
pmdrx->eb_out.len = lws_ptr_diff(priv->rx.next_out,
|
||||
pmdrx->eb_out.token);
|
||||
priv->count_rx_between_fin += pmdrx->eb_out.len;
|
||||
|
||||
lwsl_ext(" %s: RX leaving with new effbuff len %d, "
|
||||
"ret %d, rx.avail_in=%d, TOTAL RX since FIN %lu\n",
|
||||
__func__, ebuf->len, priv->rx_held_valid,
|
||||
priv->rx.avail_in,
|
||||
"rx.avail_in=%d, TOTAL RX since FIN %lu\n",
|
||||
__func__, pmdrx->eb_out.len, priv->rx.avail_in,
|
||||
(unsigned long)priv->count_rx_between_fin);
|
||||
|
||||
if (was_fin) {
|
||||
lwsl_ext("%s: was_fin\n", __func__);
|
||||
assert(!pen);
|
||||
priv->count_rx_between_fin = 0;
|
||||
if (priv->args[PMD_SERVER_NO_CONTEXT_TAKEOVER]) {
|
||||
lwsl_ext("PMD_SERVER_NO_CONTEXT_TAKEOVER\n");
|
||||
(void)inflateEnd(&priv->rx);
|
||||
priv->rx_init = 0;
|
||||
}
|
||||
|
||||
return PMDR_EMPTY_FINAL;
|
||||
}
|
||||
|
||||
// lwsl_hexdump_debug(ebuf->token, ebuf->len);
|
||||
if (pen || priv->rx.avail_in)
|
||||
return PMDR_HAS_PENDING;
|
||||
|
||||
return priv->rx_held_valid;
|
||||
return PMDR_EMPTY_NONFINAL;
|
||||
|
||||
case LWS_EXT_CB_PAYLOAD_TX:
|
||||
|
||||
/* initialize us if needed */
|
||||
|
||||
if (!priv->tx_init) {
|
||||
n = deflateInit2(&priv->tx, priv->args[PMD_COMP_LEVEL],
|
||||
Z_DEFLATED,
|
||||
|
@ -336,139 +389,160 @@ lws_extension_callback_pm_deflate(struct lws_context *context,
|
|||
Z_DEFAULT_STRATEGY);
|
||||
if (n != Z_OK) {
|
||||
lwsl_ext("inflateInit2 failed %d\n", n);
|
||||
return 1;
|
||||
return PMDR_FAILED;
|
||||
}
|
||||
priv->tx_init = 1;
|
||||
}
|
||||
priv->tx_init = 1;
|
||||
|
||||
if (!priv->buf_tx_deflated)
|
||||
priv->buf_tx_deflated = lws_malloc(LWS_PRE + 7 + 5 +
|
||||
(1 << priv->args[PMD_TX_BUF_PWR2]),
|
||||
"pmd tx deflate buf");
|
||||
if (!priv->buf_tx_deflated) {
|
||||
lwsl_err("%s: OOM\n", __func__);
|
||||
return -1;
|
||||
return PMDR_FAILED;
|
||||
}
|
||||
|
||||
if (ebuf->token) {
|
||||
lwsl_ext("%s: TX: ebuf length %d\n", __func__,
|
||||
ebuf->len);
|
||||
priv->tx.next_in = (unsigned char *)ebuf->token;
|
||||
priv->tx.avail_in = ebuf->len;
|
||||
}
|
||||
/* hook us up with any deflated input that the caller has */
|
||||
|
||||
#if 0
|
||||
for (n = 0; n < ebuf->len; n++) {
|
||||
printf("%02X ", (unsigned char)ebuf->token[n]);
|
||||
if ((n & 15) == 15)
|
||||
printf("\n");
|
||||
if (pmdrx->eb_in.token) {
|
||||
|
||||
assert(!priv->tx.avail_in);
|
||||
|
||||
priv->count_tx_between_fin += pmdrx->eb_in.len;
|
||||
lwsl_ext("%s: TX: eb_in length %d, "
|
||||
"TOTAL TX since FIN: %d\n", __func__,
|
||||
pmdrx->eb_in.len,
|
||||
(int)priv->count_tx_between_fin);
|
||||
priv->tx.next_in = (unsigned char *)pmdrx->eb_in.token;
|
||||
priv->tx.avail_in = pmdrx->eb_in.len;
|
||||
}
|
||||
printf("\n");
|
||||
#endif
|
||||
|
||||
priv->tx.next_out = priv->buf_tx_deflated + LWS_PRE + 5;
|
||||
ebuf->token = (char *)priv->tx.next_out;
|
||||
pmdrx->eb_out.token = (char *)priv->tx.next_out;
|
||||
priv->tx.avail_out = 1 << priv->args[PMD_TX_BUF_PWR2];
|
||||
|
||||
if (priv->tx.avail_in) {
|
||||
n = deflate(&priv->tx, Z_SYNC_FLUSH);
|
||||
if (n == Z_STREAM_ERROR) {
|
||||
lwsl_ext("%s: Z_STREAM_ERROR\n", __func__);
|
||||
return -1;
|
||||
}
|
||||
pen = penbits = 0;
|
||||
deflatePending(&priv->tx, &pen, &penbits);
|
||||
pen |= penbits;
|
||||
|
||||
if (!priv->tx.avail_in && (len & LWS_WRITE_NO_FIN)) {
|
||||
lwsl_ext("%s: no available in, pen: %u\n", __func__, pen);
|
||||
|
||||
if (!pen)
|
||||
return PMDR_DID_NOTHING;
|
||||
}
|
||||
|
||||
if (priv->tx_held_valid) {
|
||||
priv->tx_held_valid = 0;
|
||||
if ((int)priv->tx.avail_out ==
|
||||
1 << priv->args[PMD_TX_BUF_PWR2])
|
||||
/*
|
||||
* We can get a situation he took something in
|
||||
* but did not generate anything out, at the end
|
||||
* of a message (eg, next thing he sends is 80
|
||||
* 00, a zero length FIN, like Autobahn can
|
||||
* send).
|
||||
*
|
||||
* If we have come back as a FIN, we must not
|
||||
* place the pending trailer 00 00 FF FF, just
|
||||
* the 1 byte of live data
|
||||
*/
|
||||
|
||||
*(--ebuf->token) = priv->tx_held[0];
|
||||
else {
|
||||
/*
|
||||
* he generated some data on his own...
|
||||
* prepend the whole pending
|
||||
*/
|
||||
ebuf->token -= 5;
|
||||
for (n = 0; n < 5; n++)
|
||||
ebuf->token[n] = priv->tx_held[n];
|
||||
|
||||
}
|
||||
m = Z_NO_FLUSH;
|
||||
if (!(len & LWS_WRITE_NO_FIN)) {
|
||||
lwsl_ext("%s: deflate with SYNC_FLUSH, pkt len %d\n",
|
||||
__func__, (int)wsi->ws->rx_packet_length);
|
||||
m = Z_SYNC_FLUSH;
|
||||
}
|
||||
priv->compressed_out = 1;
|
||||
ebuf->len = lws_ptr_diff(priv->tx.next_out, ebuf->token);
|
||||
|
||||
n = deflate(&priv->tx, m);
|
||||
if (n == Z_STREAM_ERROR) {
|
||||
lwsl_notice("%s: Z_STREAM_ERROR\n", __func__);
|
||||
return PMDR_FAILED;
|
||||
}
|
||||
|
||||
pen = (!priv->tx.avail_out) && n != Z_STREAM_END;
|
||||
|
||||
lwsl_ext("%s: deflate ret %d, len 0x%x\n", __func__, n,
|
||||
(unsigned int)len);
|
||||
|
||||
if ((len & 0xf) == LWS_WRITE_TEXT)
|
||||
priv->tx_first_frame_type = LWSWSOPC_TEXT_FRAME;
|
||||
if ((len & 0xf) == LWS_WRITE_BINARY)
|
||||
priv->tx_first_frame_type = LWSWSOPC_BINARY_FRAME;
|
||||
|
||||
pmdrx->eb_out.len = lws_ptr_diff(priv->tx.next_out,
|
||||
pmdrx->eb_out.token);
|
||||
|
||||
if (m == Z_SYNC_FLUSH && !(len & LWS_WRITE_NO_FIN) && !pen &&
|
||||
pmdrx->eb_out.len < 4) {
|
||||
lwsl_err("%s: FAIL want to trim out length %d\n",
|
||||
__func__, (int)pmdrx->eb_out.len);
|
||||
assert(0);
|
||||
}
|
||||
|
||||
if (!(len & LWS_WRITE_NO_FIN) &&
|
||||
m == Z_SYNC_FLUSH &&
|
||||
!pen &&
|
||||
pmdrx->eb_out.len >= 4) {
|
||||
// lwsl_err("%s: Trimming 4 from end of write\n", __func__);
|
||||
priv->tx.next_out -= 4;
|
||||
priv->tx.avail_out += 4;
|
||||
priv->count_tx_between_fin = 0;
|
||||
|
||||
assert(priv->tx.next_out[0] == 0x00 &&
|
||||
priv->tx.next_out[1] == 0x00 &&
|
||||
priv->tx.next_out[2] == 0xff &&
|
||||
priv->tx.next_out[3] == 0xff);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* we must announce in our returncode now if there is more
|
||||
* output to be expected from inflate, so we can decide to
|
||||
* set the FIN bit on this bufferload or not. However zlib
|
||||
* is ambiguous when we exactly filled the inflate buffer. It
|
||||
* does not give us a clue as to whether we should understand
|
||||
* that to mean he ended on a buffer boundary, or if there is
|
||||
* more in the pipeline.
|
||||
*
|
||||
* Worse, the guy providing the stuff we are sending may not
|
||||
* know until after that this was, actually, the last chunk,
|
||||
* that can happen even if we did not fill the output buf, ie
|
||||
* he may send after this a zero-length FIN fragment.
|
||||
*
|
||||
* This is super difficult because we must snip the last 4
|
||||
* bytes in the case this is the last compressed output of the
|
||||
* message. The only way to deal with it is defer sending the
|
||||
* last 5 bytes of each frame until the next one, when we will
|
||||
* be in a position to understand if that has a FIN or not.
|
||||
* track how much input was used and advance it
|
||||
*/
|
||||
|
||||
extra = !!(len & LWS_WRITE_NO_FIN) || !priv->tx.avail_out;
|
||||
pmdrx->eb_in.token = (char *)pmdrx->eb_in.token +
|
||||
(pmdrx->eb_in.len - priv->tx.avail_in);
|
||||
pmdrx->eb_in.len = priv->tx.avail_in;
|
||||
|
||||
if (ebuf->len >= 4 + extra) {
|
||||
lwsl_ext("tx held %d\n", 4 + extra);
|
||||
priv->tx_held_valid = extra;
|
||||
for (n = 3 + extra; n >= 0; n--)
|
||||
priv->tx_held[n] = *(--priv->tx.next_out);
|
||||
ebuf->len -= 4 + extra;
|
||||
}
|
||||
lwsl_ext(" TX rewritten with new effbuff len %d, ret %d\n",
|
||||
ebuf->len, !priv->tx.avail_out);
|
||||
priv->compressed_out = 1;
|
||||
pmdrx->eb_out.len = lws_ptr_diff(priv->tx.next_out,
|
||||
pmdrx->eb_out.token);
|
||||
|
||||
return !priv->tx.avail_out; /* 1 == have more tx pending */
|
||||
lwsl_ext(" TX rewritten with new eb_in len %d, "
|
||||
"eb_out len %d, deflatePending %d\n",
|
||||
pmdrx->eb_in.len, pmdrx->eb_out.len, pen);
|
||||
|
||||
if (pmdrx->eb_in.len || pen)
|
||||
return PMDR_HAS_PENDING;
|
||||
|
||||
if (!(len & LWS_WRITE_NO_FIN))
|
||||
return PMDR_EMPTY_FINAL;
|
||||
|
||||
return PMDR_EMPTY_NONFINAL;
|
||||
|
||||
case LWS_EXT_CB_PACKET_TX_PRESEND:
|
||||
if (!priv->compressed_out)
|
||||
break;
|
||||
priv->compressed_out = 0;
|
||||
|
||||
if ((*(ebuf->token) & 0x80) &&
|
||||
/*
|
||||
* we may have not produced any output for the actual "first"
|
||||
* write... in that case, we need to fix up the inappropriate
|
||||
* use of CONTINUATION when the first real write does come.
|
||||
*/
|
||||
if (priv->tx_first_frame_type & 0xf) {
|
||||
*pmdrx->eb_in.token = ((*pmdrx->eb_in.token) & ~0xf) |
|
||||
(priv->tx_first_frame_type & 0xf);
|
||||
/*
|
||||
* We have now written the "first" fragment, only
|
||||
* do that once
|
||||
*/
|
||||
priv->tx_first_frame_type = 0;
|
||||
}
|
||||
|
||||
n = *(pmdrx->eb_in.token) & 15;
|
||||
|
||||
/* set RSV1, but not on CONTINUATION */
|
||||
if (n == LWSWSOPC_TEXT_FRAME || n == LWSWSOPC_BINARY_FRAME)
|
||||
*pmdrx->eb_in.token |= 0x40;
|
||||
|
||||
lwsl_ext("%s: PRESEND compressed: ws frame 0x%02X, len %d\n",
|
||||
__func__, ((*pmdrx->eb_in.token) & 0xff),
|
||||
pmdrx->eb_in.len);
|
||||
|
||||
if (((*pmdrx->eb_in.token) & 0x80) && /* fin */
|
||||
priv->args[PMD_CLIENT_NO_CONTEXT_TAKEOVER]) {
|
||||
lwsl_debug("PMD_CLIENT_NO_CONTEXT_TAKEOVER\n");
|
||||
(void)deflateEnd(&priv->tx);
|
||||
priv->tx_init = 0;
|
||||
}
|
||||
|
||||
n = *(ebuf->token) & 15;
|
||||
/* set RSV1, but not on CONTINUATION */
|
||||
if (n == LWSWSOPC_TEXT_FRAME || n == LWSWSOPC_BINARY_FRAME)
|
||||
*ebuf->token |= 0x40;
|
||||
#if 0
|
||||
for (n = 0; n < ebuf->len; n++) {
|
||||
printf("%02X ", (unsigned char)ebuf->token[n]);
|
||||
if ((n & 15) == 15)
|
||||
puts("\n");
|
||||
}
|
||||
puts("\n");
|
||||
#endif
|
||||
lwsl_ext("%s: tx opcode 0x%02X\n", __func__,
|
||||
(unsigned char)*ebuf->token);
|
||||
break;
|
||||
|
||||
default:
|
||||
|
|
|
@ -24,18 +24,19 @@ struct lws_ext_pm_deflate_priv {
|
|||
unsigned char *buf_rx_inflated; /* RX inflated output buffer */
|
||||
unsigned char *buf_tx_deflated; /* TX deflated output buffer */
|
||||
|
||||
unsigned char *buf_tx_holding;
|
||||
|
||||
size_t count_rx_between_fin;
|
||||
size_t count_tx_between_fin;
|
||||
|
||||
size_t len_tx_holding;
|
||||
|
||||
unsigned char args[PMD_ARG_COUNT];
|
||||
unsigned char tx_held[5];
|
||||
unsigned char rx_held;
|
||||
|
||||
unsigned char tx_first_frame_type;
|
||||
|
||||
unsigned char tx_init:1;
|
||||
unsigned char rx_init:1;
|
||||
unsigned char compressed_out:1;
|
||||
unsigned char rx_held_valid:1;
|
||||
unsigned char tx_held_valid:1;
|
||||
unsigned char rx_append_trailer:1;
|
||||
unsigned char pending_tx_trailer:1;
|
||||
};
|
||||
|
||||
|
|
|
@ -32,18 +32,21 @@ int
|
|||
lws_ws_rx_sm(struct lws *wsi, char already_processed, unsigned char c)
|
||||
{
|
||||
int callback_action = LWS_CALLBACK_RECEIVE;
|
||||
int ret = 0;
|
||||
struct lws_ext_pm_deflate_rx_ebufs pmdrx;
|
||||
unsigned short close_code;
|
||||
struct lws_tokens ebuf;
|
||||
unsigned char *pp;
|
||||
int ret = 0;
|
||||
int n = 0;
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
int rx_draining_ext = 0;
|
||||
int lin;
|
||||
#endif
|
||||
|
||||
ebuf.token = NULL;
|
||||
ebuf.len = 0;
|
||||
pmdrx.eb_in.token = NULL;
|
||||
pmdrx.eb_in.len = 0;
|
||||
pmdrx.eb_out.token = NULL;
|
||||
pmdrx.eb_out.len = 0;
|
||||
|
||||
if (wsi->socket_is_permanently_unusable)
|
||||
return -1;
|
||||
|
||||
|
@ -51,8 +54,10 @@ lws_ws_rx_sm(struct lws *wsi, char already_processed, unsigned char c)
|
|||
case LWS_RXPS_NEW:
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
if (wsi->ws->rx_draining_ext) {
|
||||
ebuf.token = NULL;
|
||||
ebuf.len = 0;
|
||||
pmdrx.eb_in.token = NULL;
|
||||
pmdrx.eb_in.len = 0;
|
||||
pmdrx.eb_out.token = NULL;
|
||||
pmdrx.eb_out.len = 0;
|
||||
lws_remove_wsi_from_draining_ext_list(wsi);
|
||||
rx_draining_ext = 1;
|
||||
lwsl_debug("%s: doing draining flow\n", __func__);
|
||||
|
@ -161,6 +166,14 @@ handle_first:
|
|||
return -1;
|
||||
}
|
||||
wsi->ws->rsv_first_msg = (c & 0x70);
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
/*
|
||||
* set the expectation that we will have to
|
||||
* fake up the zlib trailer to the inflator for this
|
||||
* frame
|
||||
*/
|
||||
wsi->ws->pmd_trailer_application = !!(c & 0x40);
|
||||
#endif
|
||||
wsi->ws->frame_is_binary =
|
||||
wsi->ws->opcode == LWSWSOPC_BINARY_FRAME;
|
||||
wsi->ws->first_fragment = 1;
|
||||
|
@ -557,114 +570,167 @@ ping_drop:
|
|||
|
||||
/*
|
||||
* No it's real payload, pass it up to the user callback.
|
||||
*
|
||||
* We have been statefully collecting it in the
|
||||
* LWS_RXPS_WS_FRAME_PAYLOAD clause above.
|
||||
*
|
||||
* It's nicely buffered with the pre-padding taken care of
|
||||
* so it can be sent straight out again using lws_write
|
||||
* so it can be sent straight out again using lws_write.
|
||||
*
|
||||
* However, now we have a chunk of it, we want to deal with it
|
||||
* all here. Since this may be input to permessage-deflate and
|
||||
* there are block limits on that for input and output, we may
|
||||
* need to iterate.
|
||||
*/
|
||||
|
||||
ebuf.token = &wsi->ws->rx_ubuf[LWS_PRE];
|
||||
ebuf.len = wsi->ws->rx_ubuf_head;
|
||||
pmdrx.eb_in.token = &wsi->ws->rx_ubuf[LWS_PRE];
|
||||
pmdrx.eb_in.len = wsi->ws->rx_ubuf_head;
|
||||
|
||||
if (wsi->ws->opcode == LWSWSOPC_PONG && !ebuf.len)
|
||||
/* for the non-pm-deflate case */
|
||||
|
||||
pmdrx.eb_out = pmdrx.eb_in;
|
||||
|
||||
if (wsi->ws->opcode == LWSWSOPC_PONG && !pmdrx.eb_in.len)
|
||||
goto already_done;
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
drain_extension:
|
||||
#endif
|
||||
// lwsl_notice("%s: passing %d to ext\n", __func__, ebuf.len);
|
||||
|
||||
if (lwsi_state(wsi) == LRS_RETURNED_CLOSE ||
|
||||
lwsi_state(wsi) == LRS_AWAITING_CLOSE_ACK)
|
||||
goto already_done;
|
||||
do {
|
||||
|
||||
// lwsl_notice("%s: pmdrx.eb_in.len: %d\n", __func__,
|
||||
// (int)pmdrx.eb_in.len);
|
||||
|
||||
if (lwsi_state(wsi) == LRS_RETURNED_CLOSE ||
|
||||
lwsi_state(wsi) == LRS_AWAITING_CLOSE_ACK)
|
||||
goto already_done;
|
||||
|
||||
n = PMDR_DID_NOTHING;
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
lin = ebuf.len;
|
||||
//if (lin)
|
||||
// lwsl_hexdump_notice(ebuf.token, ebuf.len);
|
||||
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_RX, &ebuf, 0);
|
||||
lwsl_debug("%s: ext says %d / ebuf.len %d\n", __func__,
|
||||
n, ebuf.len);
|
||||
if (wsi->ws->rx_draining_ext)
|
||||
already_processed &= ~ALREADY_PROCESSED_NO_CB;
|
||||
lin = pmdrx.eb_in.len;
|
||||
//if (lin)
|
||||
// lwsl_hexdump_notice(ebuf.token, ebuf.len);
|
||||
lwsl_ext("%s: +++ passing %d %p to ext\n", __func__,
|
||||
pmdrx.eb_in.len, pmdrx.eb_in.token);
|
||||
|
||||
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_RX, &pmdrx, 0);
|
||||
lwsl_debug("%s: ext says %d / ebuf.len %d\n", __func__,
|
||||
n, pmdrx.eb_out.len);
|
||||
if (wsi->ws->rx_draining_ext)
|
||||
already_processed &= ~ALREADY_PROCESSED_NO_CB;
|
||||
#endif
|
||||
/*
|
||||
* ebuf may be pointing somewhere completely different now,
|
||||
* it's the output
|
||||
*/
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
if (n < 0) {
|
||||
|
||||
/*
|
||||
* we may rely on this to get RX, just drop connection
|
||||
* ebuf may be pointing somewhere completely different
|
||||
* now, it's the output
|
||||
*/
|
||||
wsi->socket_is_permanently_unusable = 1;
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
if (
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
rx_draining_ext &&
|
||||
#endif
|
||||
ebuf.len == 0)
|
||||
goto already_done;
|
||||
|
||||
if (
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
n &&
|
||||
#endif
|
||||
ebuf.len)
|
||||
/* extension had more... main loop will come back */
|
||||
lws_add_wsi_to_draining_ext_list(wsi);
|
||||
else
|
||||
lws_remove_wsi_from_draining_ext_list(wsi);
|
||||
|
||||
if (wsi->ws->check_utf8 && !wsi->ws->defeat_check_utf8) {
|
||||
if (lws_check_utf8(&wsi->ws->utf8,
|
||||
(unsigned char *)ebuf.token,
|
||||
ebuf.len)) {
|
||||
lws_close_reason(wsi,
|
||||
LWS_CLOSE_STATUS_INVALID_PAYLOAD,
|
||||
(uint8_t *)"bad utf8", 8);
|
||||
goto utf8_fail;
|
||||
}
|
||||
|
||||
/* we are ending partway through utf-8 character? */
|
||||
if (!wsi->ws->rx_packet_length && wsi->ws->final &&
|
||||
wsi->ws->utf8 && !n) {
|
||||
lwsl_info("FINAL utf8 error\n");
|
||||
lws_close_reason(wsi,
|
||||
LWS_CLOSE_STATUS_INVALID_PAYLOAD,
|
||||
(uint8_t *)"partial utf8", 12);
|
||||
utf8_fail:
|
||||
lwsl_notice("utf8 error\n");
|
||||
lwsl_hexdump_notice(ebuf.token, ebuf.len);
|
||||
|
||||
if (n < 0) {
|
||||
/*
|
||||
* we may rely on this to get RX, just drop
|
||||
* connection
|
||||
*/
|
||||
wsi->socket_is_permanently_unusable = 1;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!wsi->wsistate_pre_close && (ebuf.len >= 0 ||
|
||||
callback_action == LWS_CALLBACK_RECEIVE_PONG)) {
|
||||
if (ebuf.len)
|
||||
ebuf.token[ebuf.len] = '\0';
|
||||
|
||||
if (wsi->protocol->callback &&
|
||||
!(already_processed & ALREADY_PROCESSED_NO_CB)) {
|
||||
if (callback_action == LWS_CALLBACK_RECEIVE_PONG)
|
||||
lwsl_info("Doing pong callback\n");
|
||||
|
||||
ret = user_callback_handle_rxflow(
|
||||
wsi->protocol->callback,
|
||||
wsi, (enum lws_callback_reasons)
|
||||
callback_action,
|
||||
wsi->user_space,
|
||||
ebuf.token,
|
||||
ebuf.len);
|
||||
}
|
||||
wsi->ws->first_fragment = 0;
|
||||
}
|
||||
if (n == PMDR_DID_NOTHING)
|
||||
break;
|
||||
#endif
|
||||
lwsl_notice("%s: post ext ret %d, ebuf in %d / out %d\n",
|
||||
__func__, n, pmdrx.eb_in.len,
|
||||
pmdrx.eb_out.len);
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
if (!lin)
|
||||
break;
|
||||
if (rx_draining_ext && !pmdrx.eb_out.len) {
|
||||
lwsl_debug(" --- ending drain on 0 read\n");
|
||||
goto already_done;
|
||||
}
|
||||
|
||||
if (n == PMDR_HAS_PENDING)
|
||||
/*
|
||||
* extension had more...
|
||||
* main loop will come back
|
||||
*/
|
||||
lws_add_wsi_to_draining_ext_list(wsi);
|
||||
else
|
||||
lws_remove_wsi_from_draining_ext_list(wsi);
|
||||
|
||||
rx_draining_ext = wsi->ws->rx_draining_ext;
|
||||
#endif
|
||||
|
||||
if (wsi->ws->check_utf8 && !wsi->ws->defeat_check_utf8) {
|
||||
if (lws_check_utf8(&wsi->ws->utf8,
|
||||
(unsigned char *)pmdrx.eb_out.token,
|
||||
pmdrx.eb_out.len)) {
|
||||
lws_close_reason(wsi,
|
||||
LWS_CLOSE_STATUS_INVALID_PAYLOAD,
|
||||
(uint8_t *)"bad utf8", 8);
|
||||
goto utf8_fail;
|
||||
}
|
||||
|
||||
/* we are ending partway through utf-8 character? */
|
||||
if (!wsi->ws->rx_packet_length &&
|
||||
wsi->ws->final && wsi->ws->utf8
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
/* if ext not negotiated, going to be UNKNOWN */
|
||||
&& (n == PMDR_EMPTY_FINAL || n == PMDR_UNKNOWN)
|
||||
#endif
|
||||
) {
|
||||
lwsl_info("FINAL utf8 error\n");
|
||||
lws_close_reason(wsi,
|
||||
LWS_CLOSE_STATUS_INVALID_PAYLOAD,
|
||||
(uint8_t *)"partial utf8", 12);
|
||||
utf8_fail:
|
||||
lwsl_notice("utf8 error\n");
|
||||
lwsl_hexdump_notice(pmdrx.eb_out.token,
|
||||
pmdrx.eb_out.len);
|
||||
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* if pmd not enabled, in == out */
|
||||
|
||||
if (n == PMDR_DID_NOTHING ||
|
||||
n == PMDR_UNKNOWN)
|
||||
pmdrx.eb_in.len -= pmdrx.eb_out.len;
|
||||
|
||||
if (!wsi->wsistate_pre_close &&
|
||||
(pmdrx.eb_out.len >= 0 ||
|
||||
callback_action == LWS_CALLBACK_RECEIVE_PONG ||
|
||||
n == PMDR_EMPTY_FINAL)) {
|
||||
if (pmdrx.eb_out.len)
|
||||
pmdrx.eb_out.token[pmdrx.eb_out.len] = '\0';
|
||||
|
||||
if (wsi->protocol->callback &&
|
||||
!(already_processed & ALREADY_PROCESSED_NO_CB)) {
|
||||
if (callback_action ==
|
||||
LWS_CALLBACK_RECEIVE_PONG)
|
||||
lwsl_info("Doing pong callback\n");
|
||||
|
||||
ret = user_callback_handle_rxflow(
|
||||
wsi->protocol->callback, wsi,
|
||||
(enum lws_callback_reasons)
|
||||
callback_action,
|
||||
wsi->user_space,
|
||||
pmdrx.eb_out.token,
|
||||
pmdrx.eb_out.len);
|
||||
}
|
||||
wsi->ws->first_fragment = 0;
|
||||
}
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
if (!lin)
|
||||
break;
|
||||
#endif
|
||||
|
||||
} while (pmdrx.eb_in.len
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
|| rx_draining_ext
|
||||
#endif
|
||||
);
|
||||
|
||||
already_done:
|
||||
wsi->ws->rx_ubuf_head = 0;
|
||||
break;
|
||||
|
@ -962,7 +1028,9 @@ rops_handle_POLLIN_ws(struct lws_context_per_thread *pt, struct lws *wsi,
|
|||
#endif
|
||||
}
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
if (wsi->ws->tx_draining_ext)
|
||||
if (wsi->ws->tx_draining_ext) {
|
||||
lws_handle_POLLOUT_event(wsi, pollfd);
|
||||
//lwsl_notice("%s: tx drain\n", __func__);
|
||||
/*
|
||||
* We cannot deal with new RX until the TX ext path has
|
||||
* been drained. It's because new rx will, eg, crap on
|
||||
|
@ -971,7 +1039,9 @@ rops_handle_POLLIN_ws(struct lws_context_per_thread *pt, struct lws *wsi,
|
|||
* TX ext drain path MUST go through event loop to avoid
|
||||
* blocking.
|
||||
*/
|
||||
lws_callback_on_writable(wsi);
|
||||
return LWS_HPI_RET_HANDLED;
|
||||
}
|
||||
#endif
|
||||
if ((pollfd->revents & LWS_POLLIN) && lws_is_flowcontrolled(wsi)) {
|
||||
/* We cannot deal with any kind of new RX because we are
|
||||
|
@ -1188,7 +1258,7 @@ int rops_handle_POLLOUT_ws(struct lws *wsi)
|
|||
{
|
||||
int write_type = LWS_WRITE_PONG;
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
struct lws_tokens ebuf;
|
||||
struct lws_ext_pm_deflate_rx_ebufs pmdrx;
|
||||
int ret, m;
|
||||
#endif
|
||||
int n;
|
||||
|
@ -1315,11 +1385,11 @@ int rops_handle_POLLOUT_ws(struct lws *wsi)
|
|||
}
|
||||
|
||||
/*
|
||||
* check in on the active extensions, see if they
|
||||
* had pending stuff to spill... they need to get the
|
||||
* first look-in otherwise sequence will be disordered
|
||||
* Check in on the active extensions, see if they had pending stuff to
|
||||
* spill... they need to get the first look-in otherwise sequence will
|
||||
* be disordered.
|
||||
*
|
||||
* NULL, zero-length ebuf means just spill pending
|
||||
* coming here with a NULL, zero-length ebuf means just spill pending
|
||||
*/
|
||||
|
||||
ret = 1;
|
||||
|
@ -1332,13 +1402,13 @@ int rops_handle_POLLOUT_ws(struct lws *wsi)
|
|||
/* default to nobody has more to spill */
|
||||
|
||||
ret = 0;
|
||||
ebuf.token = NULL;
|
||||
ebuf.len = 0;
|
||||
pmdrx.eb_in.token = NULL;
|
||||
pmdrx.eb_in.len = 0;
|
||||
|
||||
/* give every extension a chance to spill */
|
||||
|
||||
m = lws_ext_cb_active(wsi, LWS_EXT_CB_PACKET_TX_PRESEND,
|
||||
&ebuf, 0);
|
||||
&pmdrx, 0);
|
||||
if (m < 0) {
|
||||
lwsl_err("ext reports fatal error\n");
|
||||
return LWS_HP_RET_BAIL_DIE;
|
||||
|
@ -1352,9 +1422,9 @@ int rops_handle_POLLOUT_ws(struct lws *wsi)
|
|||
|
||||
/* assuming they gave us something to send, send it */
|
||||
|
||||
if (ebuf.len) {
|
||||
n = lws_issue_raw(wsi, (unsigned char *)ebuf.token,
|
||||
ebuf.len);
|
||||
if (pmdrx.eb_in.len) {
|
||||
n = lws_issue_raw(wsi, (unsigned char *)pmdrx.eb_in.token,
|
||||
pmdrx.eb_in.len);
|
||||
if (n < 0) {
|
||||
lwsl_info("closing from POLLOUT spill\n");
|
||||
return LWS_HP_RET_BAIL_DIE;
|
||||
|
@ -1362,9 +1432,9 @@ int rops_handle_POLLOUT_ws(struct lws *wsi)
|
|||
/*
|
||||
* Keep amount spilled small to minimize chance of this
|
||||
*/
|
||||
if (n != ebuf.len) {
|
||||
if (n != pmdrx.eb_in.len) {
|
||||
lwsl_err("Unable to spill ext %d vs %d\n",
|
||||
ebuf.len, n);
|
||||
pmdrx.eb_in.len, n);
|
||||
return LWS_HP_RET_BAIL_DIE;
|
||||
}
|
||||
} else
|
||||
|
@ -1573,10 +1643,10 @@ rops_write_role_protocol_ws(struct lws *wsi, unsigned char *buf, size_t len,
|
|||
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
|
||||
enum lws_write_protocol wpt;
|
||||
#endif
|
||||
struct lws_ext_pm_deflate_rx_ebufs pmdrx;
|
||||
int masked7 = lwsi_role_client(wsi);
|
||||
unsigned char is_masked_bit = 0;
|
||||
unsigned char *dropmask = NULL;
|
||||
struct lws_tokens ebuf;
|
||||
size_t orig_len = len;
|
||||
int pre = 0, n = 0;
|
||||
|
||||
|
@ -1599,7 +1669,7 @@ rops_write_role_protocol_ws(struct lws *wsi, unsigned char *buf, size_t len,
|
|||
wsi->ws->tx_draining_ext_list = NULL;
|
||||
|
||||
wpt = *wp;
|
||||
*wp = (wsi->ws->tx_draining_stashed_wp & 0xc0)|
|
||||
*wp = (wsi->ws->tx_draining_stashed_wp & 0xc0) |
|
||||
LWS_WRITE_CONTINUATION;
|
||||
|
||||
/*
|
||||
|
@ -1649,8 +1719,13 @@ rops_write_role_protocol_ws(struct lws *wsi, unsigned char *buf, size_t len,
|
|||
* a size that can be sent without partial sends or blocking, allows
|
||||
* interleaving of control frames and other connection service.
|
||||
*/
|
||||
ebuf.token = (char *)buf;
|
||||
ebuf.len = (int)len;
|
||||
|
||||
pmdrx.eb_in.token = (char *)buf;
|
||||
pmdrx.eb_in.len = (int)len;
|
||||
|
||||
/* for the non-pm-deflate case */
|
||||
|
||||
pmdrx.eb_out = pmdrx.eb_in;
|
||||
|
||||
switch ((int)*wp) {
|
||||
case LWS_WRITE_PING:
|
||||
|
@ -1659,18 +1734,19 @@ rops_write_role_protocol_ws(struct lws *wsi, unsigned char *buf, size_t len,
|
|||
break;
|
||||
default:
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
// lwsl_notice("LWS_EXT_CB_PAYLOAD_TX\n");
|
||||
// m = (int)ebuf.len;
|
||||
/* returns 0 if no more tx pending, 1 if more pending */
|
||||
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_TX, &ebuf, *wp);
|
||||
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_TX, &pmdrx, *wp);
|
||||
if (n < 0)
|
||||
return -1;
|
||||
// lwsl_notice("ext processed %d plaintext into %d compressed"
|
||||
// " (wp 0x%x)\n", m, (int)ebuf.len, *wp);
|
||||
lwsl_ext("%s: defl ext ret %d, ext in remaining %d, "
|
||||
"out %d compressed (wp 0x%x)\n", __func__, n,
|
||||
(int)pmdrx.eb_in.len, (int)pmdrx.eb_out.len, *wp);
|
||||
|
||||
if (n && ebuf.len) {
|
||||
lwsl_ext("write drain len %d (wp 0x%x) SETTING "
|
||||
"tx_draining_ext\n", (int)ebuf.len, *wp);
|
||||
if (n == PMDR_HAS_PENDING) {
|
||||
lwsl_ext("%s: HAS PENDING: write drain len %d "
|
||||
"(wp 0x%x) SETTING tx_draining_ext "
|
||||
"(remaining in %d)\n", __func__,
|
||||
(int)pmdrx.eb_out.len, *wp,
|
||||
(int)pmdrx.eb_in.len);
|
||||
/* extension requires further draining */
|
||||
wsi->ws->tx_draining_ext = 1;
|
||||
wsi->ws->tx_draining_ext_list =
|
||||
|
@ -1684,15 +1760,20 @@ rops_write_role_protocol_ws(struct lws *wsi, unsigned char *buf, size_t len,
|
|||
* fragments, so the last guy can use its FIN state.
|
||||
*/
|
||||
wsi->ws->tx_draining_stashed_wp = *wp;
|
||||
/* this is definitely not actually the last fragment
|
||||
* because the extension asserted he has more coming
|
||||
* So make sure this intermediate one doesn't go out
|
||||
* with a FIN.
|
||||
/*
|
||||
* Despite what we may have thought, this is definitely
|
||||
* NOT the last fragment, because the extension asserted
|
||||
* he has more coming. For example, the extension may
|
||||
* be compressing, and has saved up everything until the
|
||||
* end, where the output is larger than one chunk.
|
||||
*
|
||||
* Make sure this intermediate one doesn't actually
|
||||
* go out with a FIN.
|
||||
*/
|
||||
*wp |= LWS_WRITE_NO_FIN;
|
||||
}
|
||||
#endif
|
||||
if (ebuf.len && wsi->ws->stashed_write_pending) {
|
||||
if (pmdrx.eb_out.len && wsi->ws->stashed_write_pending) {
|
||||
wsi->ws->stashed_write_pending = 0;
|
||||
*wp = ((*wp) & 0xc0) | (int)wsi->ws->stashed_write_type;
|
||||
}
|
||||
|
@ -1703,13 +1784,13 @@ rops_write_role_protocol_ws(struct lws *wsi, unsigned char *buf, size_t len,
|
|||
* compression extension, it has already updated its state according
|
||||
* to this being issued
|
||||
*/
|
||||
if ((char *)buf != ebuf.token) {
|
||||
if ((char *)buf != pmdrx.eb_out.token) {
|
||||
/*
|
||||
* ext might eat it, but not have anything to issue yet.
|
||||
* In that case we have to follow his lead, but stash and
|
||||
* replace the write type that was lost here the first time.
|
||||
*/
|
||||
if (len && !ebuf.len) {
|
||||
if (len && !pmdrx.eb_out.len) {
|
||||
if (!wsi->ws->stashed_write_pending)
|
||||
wsi->ws->stashed_write_type =
|
||||
(char)(*wp) & 0x3f;
|
||||
|
@ -1723,8 +1804,8 @@ rops_write_role_protocol_ws(struct lws *wsi, unsigned char *buf, size_t len,
|
|||
wsi->ws->clean_buffer = 0;
|
||||
}
|
||||
|
||||
buf = (unsigned char *)ebuf.token;
|
||||
len = ebuf.len;
|
||||
buf = (unsigned char *)pmdrx.eb_out.token;
|
||||
len = pmdrx.eb_out.len;
|
||||
|
||||
if (!buf) {
|
||||
lwsl_err("null buf (%d)\n", (int)len);
|
||||
|
|
|
@ -139,11 +139,22 @@ struct _lws_websocket_related {
|
|||
unsigned int extension_data_pending:1;
|
||||
unsigned int rx_draining_ext:1;
|
||||
unsigned int tx_draining_ext:1;
|
||||
unsigned int pmd_trailer_application:1;
|
||||
|
||||
uint8_t count_act_ext;
|
||||
#endif
|
||||
};
|
||||
|
||||
/*
|
||||
* we need to separately track what's happening with both compressed rx in
|
||||
* and with inflated rx out that will be passed to the user code
|
||||
*/
|
||||
|
||||
struct lws_ext_pm_deflate_rx_ebufs {
|
||||
struct lws_tokens eb_in;
|
||||
struct lws_tokens eb_out;
|
||||
};
|
||||
|
||||
int
|
||||
lws_ws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len);
|
||||
|
||||
|
|
|
@ -704,9 +704,9 @@ bail:
|
|||
static int
|
||||
lws_ws_frame_rest_is_payload(struct lws *wsi, uint8_t **buf, size_t len)
|
||||
{
|
||||
struct lws_ext_pm_deflate_rx_ebufs pmdrx;
|
||||
unsigned int avail = (unsigned int)len;
|
||||
uint8_t *buffer = *buf, mask[4];
|
||||
struct lws_tokens ebuf;
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
unsigned int old_packet_length = (int)wsi->ws->rx_packet_length;
|
||||
#endif
|
||||
|
@ -741,10 +741,10 @@ lws_ws_frame_rest_is_payload(struct lws *wsi, uint8_t **buf, size_t len)
|
|||
if (!avail)
|
||||
return 0;
|
||||
|
||||
ebuf.token = (char *)buffer;
|
||||
ebuf.len = avail;
|
||||
|
||||
//lwsl_hexdump_notice(ebuf.token, ebuf.len);
|
||||
pmdrx.eb_in.token = (char *)buffer;
|
||||
pmdrx.eb_in.len = avail;
|
||||
pmdrx.eb_out.token = (char *)buffer;
|
||||
pmdrx.eb_out.len = avail;
|
||||
|
||||
if (!wsi->ws->all_zero_nonce) {
|
||||
|
||||
|
@ -777,10 +777,12 @@ lws_ws_frame_rest_is_payload(struct lws *wsi, uint8_t **buf, size_t len)
|
|||
|
||||
(*buf) += avail;
|
||||
len -= avail;
|
||||
wsi->ws->rx_packet_length -= avail;
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_RX, &ebuf, 0);
|
||||
lwsl_info("%s: ext says %d / ebuf.len %d\n", __func__, n, ebuf.len);
|
||||
n = lws_ext_cb_active(wsi, LWS_EXT_CB_PAYLOAD_RX, &pmdrx, 0);
|
||||
lwsl_info("%s: ext says %d / ebuf_out.len %d\n", __func__, n,
|
||||
pmdrx.eb_out.len);
|
||||
#endif
|
||||
/*
|
||||
* ebuf may be pointing somewhere completely different now,
|
||||
|
@ -794,12 +796,11 @@ lws_ws_frame_rest_is_payload(struct lws *wsi, uint8_t **buf, size_t len)
|
|||
*/
|
||||
lwsl_notice("%s: LWS_EXT_CB_PAYLOAD_RX blew out\n", __func__);
|
||||
wsi->socket_is_permanently_unusable = 1;
|
||||
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
wsi->ws->rx_packet_length -= avail;
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
/*
|
||||
* if we had an rx fragment right at the last compressed byte of the
|
||||
|
@ -811,15 +812,16 @@ lws_ws_frame_rest_is_payload(struct lws *wsi, uint8_t **buf, size_t len)
|
|||
* as the message completion.
|
||||
*/
|
||||
|
||||
if (!ebuf.len && /* zero-length inflation output */
|
||||
!n && /* nothing left to drain from the inflator */
|
||||
wsi->ws->count_act_ext && /* we are using pmd */
|
||||
if (!pmdrx.eb_out.len && /* zero-length inflation output */
|
||||
n == PMDR_EMPTY_FINAL && /* nothing to drain from the inflator */
|
||||
old_packet_length && /* we gave the inflator new input */
|
||||
!wsi->ws->rx_packet_length && /* raw ws packet payload all gone */
|
||||
wsi->ws->final && /* the raw ws packet is a FIN guy */
|
||||
wsi->protocol->callback &&
|
||||
!wsi->wsistate_pre_close) {
|
||||
|
||||
lwsl_ext("%s: issuing zero length FIN pkt\n", __func__);
|
||||
|
||||
if (user_callback_handle_rxflow(wsi->protocol->callback, wsi,
|
||||
LWS_CALLBACK_RECEIVE,
|
||||
wsi->user_space, NULL, 0))
|
||||
|
@ -829,22 +831,21 @@ lws_ws_frame_rest_is_payload(struct lws *wsi, uint8_t **buf, size_t len)
|
|||
}
|
||||
#endif
|
||||
|
||||
if (!ebuf.len)
|
||||
if (!pmdrx.eb_out.len)
|
||||
return avail;
|
||||
|
||||
if (
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
n &&
|
||||
#endif
|
||||
ebuf.len)
|
||||
if (n == PMDR_HAS_PENDING)
|
||||
/* extension had more... main loop will come back */
|
||||
lws_add_wsi_to_draining_ext_list(wsi);
|
||||
else
|
||||
lws_remove_wsi_from_draining_ext_list(wsi);
|
||||
#endif
|
||||
|
||||
if (wsi->ws->check_utf8 && !wsi->ws->defeat_check_utf8) {
|
||||
if (lws_check_utf8(&wsi->ws->utf8,
|
||||
(unsigned char *)ebuf.token, ebuf.len)) {
|
||||
(unsigned char *)pmdrx.eb_out.token,
|
||||
pmdrx.eb_out.len)) {
|
||||
lws_close_reason(wsi, LWS_CLOSE_STATUS_INVALID_PAYLOAD,
|
||||
(uint8_t *)"bad utf8", 8);
|
||||
goto utf8_fail;
|
||||
|
@ -859,7 +860,7 @@ lws_ws_frame_rest_is_payload(struct lws *wsi, uint8_t **buf, size_t len)
|
|||
|
||||
utf8_fail:
|
||||
lwsl_info("utf8 error\n");
|
||||
lwsl_hexdump_info(ebuf.token, ebuf.len);
|
||||
lwsl_hexdump_info(pmdrx.eb_out.token, pmdrx.eb_out.len);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
@ -869,14 +870,16 @@ utf8_fail:
|
|||
if (user_callback_handle_rxflow(wsi->protocol->callback, wsi,
|
||||
LWS_CALLBACK_RECEIVE,
|
||||
wsi->user_space,
|
||||
ebuf.token, ebuf.len))
|
||||
pmdrx.eb_out.token,
|
||||
pmdrx.eb_out.len))
|
||||
return -1;
|
||||
|
||||
wsi->ws->first_fragment = 0;
|
||||
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
lwsl_info("%s: input used %d, output %d, rem len %d, rx_draining_ext %d\n",
|
||||
__func__, avail, ebuf.len, (int)len, wsi->ws->rx_draining_ext);
|
||||
__func__, avail, pmdrx.eb_out.len, (int)len,
|
||||
wsi->ws->rx_draining_ext);
|
||||
#endif
|
||||
|
||||
return avail; /* how much we used from the input */
|
||||
|
@ -886,6 +889,7 @@ utf8_fail:
|
|||
int
|
||||
lws_parse_ws(struct lws *wsi, unsigned char **buf, size_t len)
|
||||
{
|
||||
unsigned char *bufin = *buf;
|
||||
int m, bulk = 0;
|
||||
|
||||
lwsl_debug("%s: received %d byte packet\n", __func__, (int)len);
|
||||
|
@ -900,10 +904,31 @@ lws_parse_ws(struct lws *wsi, unsigned char **buf, size_t len)
|
|||
*/
|
||||
if (wsi->rxflow_bitmap) {
|
||||
lwsl_info("%s: doing rxflow, caching %d\n", __func__,
|
||||
(int)len);
|
||||
if (lws_rxflow_cache(wsi, *buf, 0, (int)len) !=
|
||||
LWSRXFC_TRIMMED)
|
||||
*buf += len; /* stashing it is taking care of it */
|
||||
(int)len);
|
||||
/*
|
||||
* Since we cached the remaining available input, we
|
||||
* can say we "consumed" it.
|
||||
*
|
||||
* But what about the case where the available input
|
||||
* came out of the rxflow cache already? If we are
|
||||
* effectively "putting it back in the cache", we have
|
||||
* leave it where it is, already pointed to by the head.
|
||||
*/
|
||||
if (lws_rxflow_cache(wsi, *buf, 0, (int)len) ==
|
||||
LWSRXFC_TRIMMED) {
|
||||
/*
|
||||
* We dealt with it by trimming the existing
|
||||
* rxflow cache HEAD to account for what we used.
|
||||
*
|
||||
* indicate we didn't use anything to the caller
|
||||
* so he doesn't do any consumed processing
|
||||
*/
|
||||
lwsl_info("%s: trimming inside rxflow cache\n",
|
||||
__func__);
|
||||
*buf = bufin;
|
||||
} else
|
||||
*buf += len;
|
||||
|
||||
return 1;
|
||||
}
|
||||
#if !defined(LWS_WITHOUT_EXTENSIONS)
|
||||
|
@ -956,7 +981,7 @@ lws_parse_ws(struct lws *wsi, unsigned char **buf, size_t len)
|
|||
wsi->ws->rx_draining_ext);
|
||||
#endif
|
||||
m = lws_ws_rx_sm(wsi, ALREADY_PROCESSED_IGNORE_CHAR |
|
||||
ALREADY_PROCESSED_NO_CB, 0);
|
||||
ALREADY_PROCESSED_NO_CB, 0);
|
||||
}
|
||||
|
||||
if (m < 0) {
|
||||
|
|
|
@ -35,6 +35,7 @@ struct per_session_data__minimal_client_echo {
|
|||
uint32_t tail;
|
||||
char flow_controlled;
|
||||
uint8_t completed:1;
|
||||
uint8_t write_consume_pending:1;
|
||||
};
|
||||
|
||||
struct vhd_minimal_client_echo {
|
||||
|
@ -157,40 +158,44 @@ callback_minimal_client_echo(struct lws *wsi, enum lws_callback_reasons reason,
|
|||
case LWS_CALLBACK_CLIENT_WRITEABLE:
|
||||
|
||||
lwsl_user("LWS_CALLBACK_CLIENT_WRITEABLE\n");
|
||||
do {
|
||||
pmsg = lws_ring_get_element(pss->ring, &pss->tail);
|
||||
if (!pmsg) {
|
||||
lwsl_user(" (nothing in ring)\n");
|
||||
break;
|
||||
}
|
||||
|
||||
flags = lws_write_ws_flags(
|
||||
pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT,
|
||||
pmsg->first, pmsg->final);
|
||||
|
||||
/* notice we allowed for LWS_PRE in the payload already */
|
||||
m = lws_write(wsi, ((unsigned char *)pmsg->payload) +
|
||||
LWS_PRE, pmsg->len, flags);
|
||||
if (m < (int)pmsg->len) {
|
||||
lwsl_err("ERROR %d writing to ws socket\n", m);
|
||||
return -1;
|
||||
}
|
||||
|
||||
lwsl_user(" wrote %d: flags: 0x%x first: %d final %d\n",
|
||||
m, flags, pmsg->first, pmsg->final);
|
||||
|
||||
if ((*vhd->options & 1) && pmsg && pmsg->final)
|
||||
pss->completed = 1;
|
||||
|
||||
if (pss->write_consume_pending) {
|
||||
/* perform the deferred fifo consume */
|
||||
lws_ring_consume_single_tail(pss->ring, &pss->tail, 1);
|
||||
pss->write_consume_pending = 0;
|
||||
}
|
||||
pmsg = lws_ring_get_element(pss->ring, &pss->tail);
|
||||
if (!pmsg) {
|
||||
lwsl_user(" (nothing in ring)\n");
|
||||
break;
|
||||
}
|
||||
|
||||
} while (lws_ring_get_element(pss->ring, &pss->tail) &&
|
||||
!lws_send_pipe_choked(wsi));
|
||||
flags = lws_write_ws_flags(
|
||||
pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT,
|
||||
pmsg->first, pmsg->final);
|
||||
|
||||
/* more to do for us? */
|
||||
if (lws_ring_get_element(pss->ring, &pss->tail))
|
||||
/* come back as soon as we can write more */
|
||||
lws_callback_on_writable(wsi);
|
||||
/* notice we allowed for LWS_PRE in the payload already */
|
||||
m = lws_write(wsi, ((unsigned char *)pmsg->payload) +
|
||||
LWS_PRE, pmsg->len, flags);
|
||||
if (m < (int)pmsg->len) {
|
||||
lwsl_err("ERROR %d writing to ws socket\n", m);
|
||||
return -1;
|
||||
}
|
||||
|
||||
lwsl_user(" wrote %d: flags: 0x%x first: %d final %d\n",
|
||||
m, flags, pmsg->first, pmsg->final);
|
||||
|
||||
if ((*vhd->options & 1) && pmsg && pmsg->final)
|
||||
pss->completed = 1;
|
||||
|
||||
/*
|
||||
* Workaround deferred deflate in pmd extension by only
|
||||
* consuming the fifo entry when we are certain it has been
|
||||
* fully deflated at the next WRITABLE callback. You only need
|
||||
* this if you're using pmd.
|
||||
*/
|
||||
pss->write_consume_pending = 1;
|
||||
lws_callback_on_writable(wsi);
|
||||
|
||||
if (pss->flow_controlled &&
|
||||
(int)lws_ring_get_count_free_elements(pss->ring) > RING_DEPTH - 5) {
|
||||
|
|
|
@ -36,6 +36,7 @@ struct per_session_data__minimal_server_echo {
|
|||
uint32_t tail;
|
||||
uint8_t completed:1;
|
||||
uint8_t flow_controlled:1;
|
||||
uint8_t write_consume_pending:1;
|
||||
};
|
||||
|
||||
struct vhd_minimal_server_echo {
|
||||
|
@ -67,7 +68,7 @@ callback_minimal_server_echo(struct lws *wsi, enum lws_callback_reasons reason,
|
|||
lws_get_protocol(wsi));
|
||||
const struct msg *pmsg;
|
||||
struct msg amsg;
|
||||
int n, flags;
|
||||
int m, n, flags;
|
||||
|
||||
switch (reason) {
|
||||
|
||||
|
@ -103,39 +104,41 @@ callback_minimal_server_echo(struct lws *wsi, enum lws_callback_reasons reason,
|
|||
case LWS_CALLBACK_SERVER_WRITEABLE:
|
||||
|
||||
lwsl_user("LWS_CALLBACK_SERVER_WRITEABLE\n");
|
||||
do {
|
||||
int m;
|
||||
|
||||
pmsg = lws_ring_get_element(pss->ring, &pss->tail);
|
||||
if (!pmsg) {
|
||||
lwsl_user(" (nothing in ring)\n");
|
||||
break;
|
||||
}
|
||||
|
||||
flags = lws_write_ws_flags(
|
||||
pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT,
|
||||
pmsg->first, pmsg->final);
|
||||
|
||||
/* notice we allowed for LWS_PRE in the payload already */
|
||||
m = lws_write(wsi, ((unsigned char *)pmsg->payload) +
|
||||
LWS_PRE, pmsg->len, flags);
|
||||
if (m < (int)pmsg->len) {
|
||||
lwsl_err("ERROR %d writing to ws socket\n", m);
|
||||
return -1;
|
||||
}
|
||||
|
||||
lwsl_user(" wrote %d: flags: 0x%x first: %d final %d\n",
|
||||
m, flags, pmsg->first, pmsg->final);
|
||||
|
||||
if (pss->write_consume_pending) {
|
||||
/* perform the deferred fifo consume */
|
||||
lws_ring_consume_single_tail(pss->ring, &pss->tail, 1);
|
||||
pss->write_consume_pending = 0;
|
||||
}
|
||||
|
||||
} while (lws_ring_get_element(pss->ring, &pss->tail) &&
|
||||
!lws_send_pipe_choked(wsi));
|
||||
pmsg = lws_ring_get_element(pss->ring, &pss->tail);
|
||||
if (!pmsg) {
|
||||
lwsl_user(" (nothing in ring)\n");
|
||||
break;
|
||||
}
|
||||
|
||||
/* more to do for us? */
|
||||
if (lws_ring_get_element(pss->ring, &pss->tail))
|
||||
/* come back as soon as we can write more */
|
||||
lws_callback_on_writable(wsi);
|
||||
flags = lws_write_ws_flags(
|
||||
pmsg->binary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT,
|
||||
pmsg->first, pmsg->final);
|
||||
|
||||
/* notice we allowed for LWS_PRE in the payload already */
|
||||
m = lws_write(wsi, ((unsigned char *)pmsg->payload) +
|
||||
LWS_PRE, pmsg->len, flags);
|
||||
if (m < (int)pmsg->len) {
|
||||
lwsl_err("ERROR %d writing to ws socket\n", m);
|
||||
return -1;
|
||||
}
|
||||
|
||||
lwsl_user(" wrote %d: flags: 0x%x first: %d final %d\n",
|
||||
m, flags, pmsg->first, pmsg->final);
|
||||
/*
|
||||
* Workaround deferred deflate in pmd extension by only
|
||||
* consuming the fifo entry when we are certain it has been
|
||||
* fully deflated at the next WRITABLE callback. You only need
|
||||
* this if you're using pmd.
|
||||
*/
|
||||
pss->write_consume_pending = 1;
|
||||
lws_callback_on_writable(wsi);
|
||||
|
||||
if (pss->flow_controlled &&
|
||||
(int)lws_ring_get_count_free_elements(pss->ring) > RING_DEPTH - 5) {
|
||||
|
|
|
@ -52,21 +52,6 @@ cat << EOF >fuzzingserver.json
|
|||
}
|
||||
EOF
|
||||
|
||||
cat << EOF >fuzzingclient.json
|
||||
{
|
||||
"outdir": "./reports/servers",
|
||||
"servers": [
|
||||
{
|
||||
"url": "ws://127.0.0.1:9001"
|
||||
}
|
||||
],
|
||||
"cases": ["*"],
|
||||
"exclude-cases": ["2.10", "2.11", "12.3.1", "12.3.2", "12.4.*", "12.5.*" ],
|
||||
"exclude-agent-cases": {}
|
||||
}
|
||||
EOF
|
||||
|
||||
|
||||
PYTHONHASHSEED=0 wstest -m fuzzingserver &
|
||||
Q=$!
|
||||
sleep 2s
|
||||
|
@ -89,7 +74,7 @@ done
|
|||
|
||||
# generate the report in ./reports
|
||||
#
|
||||
$CLIE -s 127.0.0.1 -p 9001 -u "/updateReports?agent=libwebsockets" -o
|
||||
$CLIE -s 127.0.0.1 -p 9001 -u "/updateReports?agent=libwebsockets" -o -d3
|
||||
sleep 2s
|
||||
killall wstest
|
||||
sleep 1s
|
||||
|
|
|
@ -37,16 +37,6 @@ killall wstest 2>/dev/null
|
|||
# https://github.com/crossbario/autobahn-testsuite/issues/71
|
||||
|
||||
|
||||
cat << EOF >fuzzingserver.json
|
||||
{
|
||||
"url": "ws://127.0.0.1:9001",
|
||||
"outdir": "./reports/clients",
|
||||
"cases": ["*"],
|
||||
"exclude-cases": [ "2.10", "2.11", "12.3.1", "12.3.2", "12.4.*", "12.5.*"],
|
||||
"exclude-agent-cases": {}
|
||||
}
|
||||
EOF
|
||||
|
||||
cat << EOF >fuzzingclient.json
|
||||
{
|
||||
"outdir": "./reports/servers",
|
||||
|
@ -56,7 +46,7 @@ cat << EOF >fuzzingclient.json
|
|||
}
|
||||
],
|
||||
"cases": ["*"],
|
||||
"exclude-cases": ["2.10", "2.11", "12.3.1", "12.3.2", "12.4.*", "12.5.*" ],
|
||||
"exclude-cases": ["2.10", "2.11" ],
|
||||
"exclude-agent-cases": {}
|
||||
}
|
||||
EOF
|
||||
|
@ -66,7 +56,7 @@ echo "----------------------------------------------"
|
|||
echo "------- tests: autobahn as server"
|
||||
echo
|
||||
|
||||
$SERV -p 9001 -d7 &
|
||||
$SERV -p 9001 -d3 &
|
||||
wstest -m fuzzingclient
|
||||
R=$?
|
||||
echo "Autobahn client exit $R"
|
||||
|
|
|
@ -396,6 +396,18 @@ int ZEXPORT deflateSetHeader (strm, head)
|
|||
return Z_OK;
|
||||
}
|
||||
|
||||
/* ========================================================================= */
|
||||
int ZEXPORT deflatePending (strm, pending, bits)
|
||||
unsigned *pending;
|
||||
int *bits;
|
||||
z_streamp strm;
|
||||
{
|
||||
if (strm == Z_NULL || strm->state == Z_NULL) return Z_STREAM_ERROR;
|
||||
*pending = strm->state->pending;
|
||||
*bits = strm->state->bi_valid;
|
||||
return Z_OK;
|
||||
}
|
||||
|
||||
/* ========================================================================= */
|
||||
int ZEXPORT deflatePrime (strm, bits, value)
|
||||
z_streamp strm;
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
# define deflateInit2_ z_deflateInit2_
|
||||
# define deflateInit_ z_deflateInit_
|
||||
# define deflateParams z_deflateParams
|
||||
# define deflatePending z_deflatePending
|
||||
# define deflatePrime z_deflatePrime
|
||||
# define deflateReset z_deflateReset
|
||||
# define deflateSetDictionary z_deflateSetDictionary
|
||||
|
|
Loading…
Reference in New Issue