mirror of
https://github.com/warmcat/libwebsockets.git
synced 2025-03-09 00:00:04 +01:00
rxflow buflist: handle forced service
This commit is contained in:
parent
654adaf82a
commit
62af7934c8
17 changed files with 232 additions and 67 deletions
|
@ -59,6 +59,8 @@ lws_uv_idle(uv_idle_t *handle
|
|||
struct lws_context_per_thread, uv_idle);
|
||||
lws_usec_t us;
|
||||
|
||||
lws_service_do_ripe_rxflow(pt);
|
||||
|
||||
/*
|
||||
* is there anybody with pending stuff that needs service forcing?
|
||||
*/
|
||||
|
|
|
@ -963,7 +963,7 @@ int
|
|||
lws_buflist_append_segment(struct lws_buflist **head, uint8_t *buf, size_t len)
|
||||
{
|
||||
int first = !*head;
|
||||
void *p;
|
||||
void *p = *head;
|
||||
|
||||
assert(buf);
|
||||
assert(len);
|
||||
|
@ -972,7 +972,7 @@ lws_buflist_append_segment(struct lws_buflist **head, uint8_t *buf, size_t len)
|
|||
while (*head)
|
||||
head = &((*head)->next);
|
||||
|
||||
lwsl_info("%s: len %u\n", __func__, (uint32_t)len);
|
||||
lwsl_info("%s: len %u first %d %p\n", __func__, (uint32_t)len, first, p);
|
||||
|
||||
*head = (struct lws_buflist *)
|
||||
lws_malloc(sizeof(**head) + len, __func__);
|
||||
|
@ -1060,7 +1060,7 @@ lws_buflist_use_segment(struct lws_buflist **head, size_t len)
|
|||
if (!*head)
|
||||
return 0;
|
||||
|
||||
return (*head)->len;
|
||||
return (*head)->len - (*head)->pos;
|
||||
}
|
||||
|
||||
/* ... */
|
||||
|
|
|
@ -5724,6 +5724,16 @@ lws_dll_lws_remove(struct lws_dll_lws *_a)
|
|||
} \
|
||||
}
|
||||
|
||||
#define lws_start_foreach_dll(___type, ___it, ___start) \
|
||||
{ \
|
||||
___type ___it = ___start; \
|
||||
while (___it) {
|
||||
|
||||
#define lws_end_foreach_dll(___it) \
|
||||
___it = (___it)->next; \
|
||||
} \
|
||||
}
|
||||
|
||||
struct lws_buflist;
|
||||
|
||||
/**
|
||||
|
|
|
@ -266,6 +266,8 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
|
|||
if (!pt->rx_draining_ext_list && !n) /* poll timeout */ {
|
||||
#endif
|
||||
lws_service_fd_tsi(context, NULL, tsi);
|
||||
lws_service_do_ripe_rxflow(pt);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -296,6 +298,8 @@ faked_service:
|
|||
n--;
|
||||
}
|
||||
|
||||
lws_service_do_ripe_rxflow(pt);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -959,7 +959,7 @@ struct lws_context_per_thread {
|
|||
struct lws *tx_draining_ext_list;
|
||||
struct lws_dll_lws dll_head_timeout;
|
||||
struct lws_dll_lws dll_head_hrtimer;
|
||||
struct lws_dll_lws dll_head_rxflow;
|
||||
struct lws_dll_lws dll_head_rxflow; /* guys with pending rxflow */
|
||||
#if defined(LWS_WITH_LIBUV) || defined(LWS_WITH_LIBEVENT)
|
||||
struct lws_context *context;
|
||||
#endif
|
||||
|
@ -2066,7 +2066,8 @@ struct lws {
|
|||
|
||||
struct lws_dll_lws dll_timeout;
|
||||
struct lws_dll_lws dll_hrtimer;
|
||||
struct lws_dll_lws dll_rxflow;
|
||||
struct lws_dll_lws dll_rxflow; /* guys with pending rxflow */
|
||||
|
||||
#if defined(LWS_WITH_PEER_LIMITS)
|
||||
struct lws_peer *peer;
|
||||
#endif
|
||||
|
@ -2236,6 +2237,9 @@ struct lws {
|
|||
|
||||
#define lws_is_flowcontrolled(w) (!!(wsi->rxflow_bitmap))
|
||||
|
||||
void
|
||||
lws_service_do_ripe_rxflow(struct lws_context_per_thread *pt);
|
||||
|
||||
LWS_EXTERN int log_level;
|
||||
|
||||
LWS_EXTERN int
|
||||
|
|
|
@ -54,8 +54,10 @@ lws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len)
|
|||
}
|
||||
/* account for what we're using in rxflow buffer */
|
||||
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL) &&
|
||||
!lws_buflist_use_segment(&wsi->buflist_rxflow, 1))
|
||||
!lws_buflist_use_segment(&wsi->buflist_rxflow, 1)) {
|
||||
lwsl_debug("%s: removed wsi %p from rxflow list\n", __func__, wsi);
|
||||
lws_dll_lws_remove(&wsi->dll_rxflow);
|
||||
}
|
||||
|
||||
if (lws_client_rx_sm(wsi, *(*buf)++)) {
|
||||
lwsl_debug("client_rx_sm exited\n");
|
||||
|
|
|
@ -178,7 +178,7 @@ postbody_completion:
|
|||
if (!wsi->cgi)
|
||||
#endif
|
||||
{
|
||||
lwsl_info("HTTP_BODY_COMPLETION\n");
|
||||
lwsl_info("HTTP_BODY_COMPLETION: %p (%s)\n", wsi, wsi->protocol->name);
|
||||
n = wsi->protocol->callback(wsi,
|
||||
LWS_CALLBACK_HTTP_BODY_COMPLETION,
|
||||
wsi->user_space, NULL, 0);
|
||||
|
|
|
@ -597,6 +597,8 @@ static void lws_h2_set_bin(struct lws *wsi, int n, unsigned char *buf)
|
|||
*buf = wsi->h2.h2n->set.s[n];
|
||||
}
|
||||
|
||||
/* we get called on the network connection */
|
||||
|
||||
int lws_h2_do_pps_send(struct lws *wsi)
|
||||
{
|
||||
struct lws_h2_netconn *h2n = wsi->h2.h2n;
|
||||
|
@ -743,8 +745,10 @@ int lws_h2_do_pps_send(struct lws *wsi)
|
|||
goto bail;
|
||||
}
|
||||
cwsi = lws_h2_wsi_from_id(wsi, pps->u.rs.sid);
|
||||
if (cwsi)
|
||||
if (cwsi) {
|
||||
lwsl_debug("%s: closing cwsi %p %s %s (wsi %p)\n", __func__, cwsi, cwsi->role_ops->name, cwsi->protocol->name, wsi);
|
||||
lws_close_free_wsi(cwsi, 0, "reset stream");
|
||||
}
|
||||
break;
|
||||
|
||||
case LWS_H2_PPS_UPDATE_WINDOW:
|
||||
|
@ -1399,13 +1403,13 @@ lws_h2_parse_end_of_frame(struct lws *wsi)
|
|||
}
|
||||
}
|
||||
|
||||
wsi->vhost->conn_stats.h2_trans++;
|
||||
p = lws_hdr_simple_ptr(h2n->swsi, WSI_TOKEN_HTTP_COLON_METHOD);
|
||||
if (!strcmp(p, "POST"))
|
||||
h2n->swsi->ah->frag_index[WSI_TOKEN_POST_URI] =
|
||||
h2n->swsi->ah->frag_index[WSI_TOKEN_HTTP_COLON_PATH];
|
||||
|
||||
wsi->vhost->conn_stats.h2_trans++;
|
||||
|
||||
lwsl_debug("%s: setting DEF_ACT from 0x%x\n", __func__, h2n->swsi->wsistate);
|
||||
lwsi_set_state(h2n->swsi, LRS_DEFERRING_ACTION);
|
||||
lws_callback_on_writable(h2n->swsi);
|
||||
break;
|
||||
|
@ -1769,6 +1773,26 @@ lws_h2_parser(struct lws *wsi, unsigned char *in, lws_filepos_t inlen,
|
|||
break;
|
||||
} else {
|
||||
|
||||
if (lwsi_state(h2n->swsi) == LRS_DEFERRING_ACTION) {
|
||||
m = lws_buflist_append_segment(
|
||||
&h2n->swsi->buflist_rxflow,
|
||||
in - 1, n);
|
||||
if (m < 0)
|
||||
return -1;
|
||||
if (m) {
|
||||
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
|
||||
lwsl_debug("%s: added %p to rxflow list\n", __func__, wsi);
|
||||
lws_dll_lws_add_front(&h2n->swsi->dll_rxflow, &pt->dll_head_rxflow);
|
||||
}
|
||||
in += n - 1;
|
||||
h2n->inside += n;
|
||||
h2n->count += n - 1;
|
||||
inlen -= n - 1;
|
||||
|
||||
lwsl_debug("%s: deferred %d\n", __func__, n);
|
||||
goto do_windows;
|
||||
}
|
||||
|
||||
h2n->swsi->outer_will_close = 1;
|
||||
/*
|
||||
* choose the length for this go so that we end at
|
||||
|
@ -1782,8 +1806,9 @@ lws_h2_parser(struct lws *wsi, unsigned char *in, lws_filepos_t inlen,
|
|||
* can return 0 in POST body with
|
||||
* content len exhausted somehow.
|
||||
*/
|
||||
if (n <= 0) {
|
||||
lwsl_debug("%s: lws_read_h1 told %d %d / %d\n",
|
||||
if (n < 0 ||
|
||||
(!n && !lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL))) {
|
||||
lwsl_info("%s: lws_read_h1 told %d %d / %d\n",
|
||||
__func__, n, h2n->count, h2n->length);
|
||||
in += h2n->length - h2n->count;
|
||||
h2n->inside = h2n->length;
|
||||
|
@ -1800,6 +1825,7 @@ lws_h2_parser(struct lws *wsi, unsigned char *in, lws_filepos_t inlen,
|
|||
h2n->count += n - 1;
|
||||
}
|
||||
|
||||
do_windows:
|
||||
/* account for both network and stream wsi windows */
|
||||
|
||||
wsi->h2.peer_tx_cr_est -= n;
|
||||
|
@ -2172,7 +2198,7 @@ lws_read_h2(struct lws *wsi, unsigned char *buf, lws_filepos_t len)
|
|||
|
||||
m = lws_h2_parser(wsi, buf, len, &body_chunk_len);
|
||||
if (m && m != 2) {
|
||||
lwsl_debug("%s: http2_parser bailed\n", __func__);
|
||||
lwsl_debug("%s: http2_parser bailed: %d\n", __func__, m);
|
||||
lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS,
|
||||
"lws_read_h2 bail");
|
||||
|
||||
|
@ -2185,11 +2211,6 @@ lws_read_h2(struct lws *wsi, unsigned char *buf, lws_filepos_t len)
|
|||
break;
|
||||
}
|
||||
|
||||
/* account for what we're using in rxflow buffer */
|
||||
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL) &&
|
||||
!lws_buflist_use_segment(&wsi->buflist_rxflow, body_chunk_len))
|
||||
lws_dll_lws_remove(&wsi->dll_rxflow);
|
||||
|
||||
buf += body_chunk_len;
|
||||
len -= body_chunk_len;
|
||||
}
|
||||
|
|
|
@ -91,7 +91,7 @@ rops_handle_POLLIN_h2(struct lws_context_per_thread *pt, struct lws *wsi,
|
|||
unsigned int pending = 0;
|
||||
char draining_flow = 0;
|
||||
struct lws *wsi1;
|
||||
int n;
|
||||
int n, m;
|
||||
|
||||
#ifdef LWS_WITH_CGI
|
||||
if (wsi->cgi && (pollfd->revents & LWS_POLLOUT)) {
|
||||
|
@ -155,11 +155,13 @@ rops_handle_POLLIN_h2(struct lws_context_per_thread *pt, struct lws *wsi,
|
|||
wsi->ws->tx_draining_ext = 0;
|
||||
}
|
||||
|
||||
#if 0 /* not so for h2 */
|
||||
if (lws_is_flowcontrolled(wsi))
|
||||
/* We cannot deal with any kind of new RX because we are
|
||||
* RX-flowcontrolled.
|
||||
*/
|
||||
return LWS_HPI_RET_HANDLED;
|
||||
#endif
|
||||
|
||||
if (wsi->http2_substream || wsi->upgraded_to_http2) {
|
||||
wsi1 = lws_get_network_wsi(wsi);
|
||||
|
@ -313,14 +315,6 @@ drain:
|
|||
/* service incoming data */
|
||||
|
||||
if (eff_buf.token_len) {
|
||||
/*
|
||||
* if draining from rxflow buffer, not
|
||||
* critical to track what was used since at the
|
||||
* use it bumps wsi->rxflow_pos. If we come
|
||||
* around again it will pick up from where it
|
||||
* left off.
|
||||
*/
|
||||
|
||||
if (lwsi_role_h2(wsi) && lwsi_state(wsi) != LRS_BODY)
|
||||
n = lws_read_h2(wsi, (unsigned char *)eff_buf.token,
|
||||
eff_buf.token_len);
|
||||
|
@ -333,6 +327,17 @@ drain:
|
|||
n = 0;
|
||||
return LWS_HPI_RET_DIE;
|
||||
}
|
||||
|
||||
if (draining_flow) {
|
||||
m = lws_buflist_use_segment(&wsi->buflist_rxflow, n);
|
||||
lwsl_info("%s: draining rxflow: used %d, next %d\n",
|
||||
__func__, n, m);
|
||||
if (!m) {
|
||||
lwsl_notice("%s: removed wsi %p from rxflow list\n",
|
||||
__func__, wsi);
|
||||
lws_dll_lws_remove(&wsi->dll_rxflow);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
eff_buf.token = NULL;
|
||||
|
@ -386,6 +391,10 @@ int rops_handle_POLLOUT_h2(struct lws *wsi)
|
|||
#endif
|
||||
) && wsi->h2.h2n->pps) {
|
||||
lwsl_info("servicing pps\n");
|
||||
/*
|
||||
* this is called on the network connection, but may close
|
||||
* substreams... that may affect callers
|
||||
*/
|
||||
if (lws_h2_do_pps_send(wsi)) {
|
||||
wsi->socket_is_permanently_unusable = 1;
|
||||
return LWS_HP_RET_BAIL_DIE;
|
||||
|
@ -418,9 +427,13 @@ rops_service_flag_pending_h2(struct lws_context *context, int tsi)
|
|||
struct lws_context_per_thread *pt = &context->pt[tsi];
|
||||
struct allocated_headers *ah;
|
||||
int forced = 0;
|
||||
#endif
|
||||
|
||||
/* POLLIN faking (the pt lock is taken by the parent) */
|
||||
|
||||
|
||||
|
||||
#if !defined(LWS_ROLE_H1)
|
||||
/*
|
||||
* 3) For any wsi who have an ah with pending RX who did not
|
||||
* complete their current headers, and are not flowcontrolled,
|
||||
|
@ -621,7 +634,8 @@ rops_close_kill_connection_h2(struct lws *wsi, enum lws_close_status reason)
|
|||
wsi->h2.parent_wsi);
|
||||
lws_start_foreach_llp(struct lws **, w,
|
||||
wsi->h2.parent_wsi->h2.child_list) {
|
||||
lwsl_info(" \\---- child %p\n", *w);
|
||||
lwsl_info(" \\---- child %s %p\n",
|
||||
(*w)->role_ops ? (*w)->role_ops->name : "?", *w);
|
||||
} lws_end_foreach_llp(w, h2.sibling_list);
|
||||
}
|
||||
|
||||
|
@ -632,7 +646,9 @@ rops_close_kill_connection_h2(struct lws *wsi, enum lws_close_status reason)
|
|||
lwsl_info(" parent %p: closing children: list:\n", wsi);
|
||||
lws_start_foreach_llp(struct lws **, w,
|
||||
wsi->h2.child_list) {
|
||||
lwsl_info(" \\---- child %p\n", *w);
|
||||
lwsl_info(" \\---- child %s %p\n",
|
||||
(*w)->role_ops ? (*w)->role_ops->name : "?",
|
||||
*w);
|
||||
} lws_end_foreach_llp(w, h2.sibling_list);
|
||||
/* trigger closing of all of our http2 children first */
|
||||
lws_start_foreach_llp(struct lws **, w,
|
||||
|
@ -652,6 +668,7 @@ rops_close_kill_connection_h2(struct lws *wsi, enum lws_close_status reason)
|
|||
if (wsi->upgraded_to_http2) {
|
||||
/* remove pps */
|
||||
struct lws_h2_protocol_send *w = wsi->h2.h2n->pps, *w1;
|
||||
|
||||
while (w) {
|
||||
w1 = w->next;
|
||||
free(w);
|
||||
|
@ -770,16 +787,57 @@ lws_h2_dump_waiting_children(struct lws *wsi)
|
|||
|
||||
wsi = wsi->h2.child_list;
|
||||
while (wsi) {
|
||||
if (wsi->h2.requested_POLLOUT)
|
||||
lwsl_info(" * %p %s\n", wsi, wsi->protocol->name);
|
||||
else
|
||||
lwsl_info(" %p %s\n", wsi, wsi->protocol->name);
|
||||
lwsl_info(" %c %p %s %s\n",
|
||||
wsi->h2.requested_POLLOUT ? '*' : ' ',
|
||||
wsi, wsi->role_ops->name, wsi->protocol->name);
|
||||
|
||||
wsi = wsi->h2.sibling_list;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static int
|
||||
lws_h2_bind_for_post_before_action(struct lws *wsi)
|
||||
{
|
||||
const char *p;
|
||||
|
||||
p = lws_hdr_simple_ptr(wsi, WSI_TOKEN_HTTP_COLON_METHOD);
|
||||
if (!strcmp(p, "POST")) {
|
||||
const struct lws_protocols *pp;
|
||||
const char *name;
|
||||
const struct lws_http_mount *hit =
|
||||
lws_find_mount(wsi,
|
||||
lws_hdr_simple_ptr(wsi,
|
||||
WSI_TOKEN_HTTP_COLON_PATH),
|
||||
lws_hdr_total_length(wsi,
|
||||
WSI_TOKEN_HTTP_COLON_PATH));
|
||||
|
||||
lwsl_debug("%s: %s: hit %p: %s\n", __func__,
|
||||
lws_hdr_simple_ptr(wsi, WSI_TOKEN_HTTP_COLON_PATH),
|
||||
hit, hit ? hit->origin : "null");
|
||||
if (hit) {
|
||||
name = hit->origin;
|
||||
if (hit->protocol)
|
||||
name = hit->protocol;
|
||||
|
||||
pp = lws_vhost_name_to_protocol(wsi->vhost, name);
|
||||
if (!pp) {
|
||||
lwsl_err("Unable to find plugin '%s'\n", name);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (lws_bind_protocol(wsi, pp))
|
||||
return 1;
|
||||
}
|
||||
|
||||
lwsl_notice("%s: setting LRS_BODY from 0x%x (%s)\n", __func__,
|
||||
wsi->wsistate, wsi->protocol->name);
|
||||
lwsi_set_state(wsi, LRS_BODY);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* we are the 'network wsi' for potentially many muxed child wsi with
|
||||
* no network connection of their own, who have to use us for all their
|
||||
|
@ -886,6 +944,8 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
|
|||
|
||||
lwsi_set_state(w, LRS_ESTABLISHED);
|
||||
|
||||
lws_h2_bind_for_post_before_action(w);
|
||||
|
||||
lwsl_info(" h2 action start...\n");
|
||||
n = lws_http_action(w);
|
||||
lwsl_info(" h2 action result %d "
|
||||
|
|
|
@ -109,6 +109,7 @@ lws_client_connect_2(struct lws *wsi)
|
|||
lwsl_info("%s: just join h2 directly\n",
|
||||
__func__);
|
||||
|
||||
wsi->client_h2_alpn = 1;
|
||||
lws_wsi_h2_adopt(w, wsi);
|
||||
lws_vhost_unlock(wsi->vhost);
|
||||
|
||||
|
|
|
@ -422,9 +422,9 @@ start_ws_handshake:
|
|||
|
||||
case LRS_ISSUE_HTTP_BODY:
|
||||
if (wsi->client_http_body_pending) {
|
||||
lws_set_timeout(wsi,
|
||||
PENDING_TIMEOUT_CLIENT_ISSUE_PAYLOAD,
|
||||
context->timeout_secs);
|
||||
//lws_set_timeout(wsi,
|
||||
// PENDING_TIMEOUT_CLIENT_ISSUE_PAYLOAD,
|
||||
// context->timeout_secs);
|
||||
/* user code must ask for writable callback */
|
||||
break;
|
||||
}
|
||||
|
@ -658,13 +658,17 @@ lws_client_interpret_server_handshake(struct lws *wsi)
|
|||
/* we are being an http client...
|
||||
*/
|
||||
#if defined(LWS_ROLE_H2)
|
||||
if (wsi->client_h2_alpn)
|
||||
if (wsi->client_h2_alpn || wsi->client_h2_substream) {
|
||||
lwsl_debug("%s: %p: transitioning to h2 client\n", __func__, wsi);
|
||||
lws_role_transition(wsi, LWSIFR_CLIENT,
|
||||
LRS_ESTABLISHED, &role_ops_h2);
|
||||
else
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
lwsl_debug("%s: %p: transitioning to h1 client\n", __func__, wsi);
|
||||
lws_role_transition(wsi, LWSIFR_CLIENT,
|
||||
LRS_ESTABLISHED, &role_ops_h1);
|
||||
}
|
||||
|
||||
wsi->ah = ah;
|
||||
ah->http_response = 0;
|
||||
|
|
|
@ -1718,6 +1718,7 @@ lws_http_transaction_completed(struct lws *wsi)
|
|||
* until we can verify POLLOUT. The part of this that confirms POLLOUT
|
||||
* with no partials is in lws_server_socket_service() below.
|
||||
*/
|
||||
lwsl_debug("%s: setting DEF_ACT from 0x%x\n", __func__, wsi->wsistate);
|
||||
lwsi_set_state(wsi, LRS_DEFERRING_ACTION);
|
||||
wsi->http.tx_content_length = 0;
|
||||
wsi->http.tx_content_remain = 0;
|
||||
|
|
|
@ -1093,13 +1093,6 @@ drain:
|
|||
/* service incoming data */
|
||||
|
||||
if (eff_buf.token_len) {
|
||||
/*
|
||||
* if draining from rxflow buffer, not
|
||||
* critical to track what was used since at the
|
||||
* use it bumps wsi->rxflow_pos. If we come
|
||||
* around again it will pick up from where it
|
||||
* left off.
|
||||
*/
|
||||
#if defined(LWS_ROLE_H2)
|
||||
if (lwsi_role_h2(wsi) && lwsi_state(wsi) != LRS_BODY)
|
||||
n = lws_read_h2(wsi, (unsigned char *)eff_buf.token,
|
||||
|
@ -1114,6 +1107,14 @@ drain:
|
|||
n = 0;
|
||||
return LWS_HPI_RET_DIE;
|
||||
}
|
||||
if (draining_flow) {
|
||||
m = lws_buflist_use_segment(&wsi->buflist_rxflow, n);
|
||||
lwsl_debug("%s: draining rxflow: used %d, next %d\n", __func__, n, m);
|
||||
if (!m) {
|
||||
lwsl_notice("%s: removed wsi %p from rxflow list\n", __func__, wsi);
|
||||
lws_dll_lws_remove(&wsi->dll_rxflow);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
eff_buf.token = NULL;
|
||||
|
|
|
@ -549,7 +549,6 @@ handshake_0405(struct lws_context *context, struct lws *wsi)
|
|||
|
||||
return 0;
|
||||
|
||||
|
||||
bail:
|
||||
/* caller will free up his parsing allocations */
|
||||
return -1;
|
||||
|
@ -561,6 +560,9 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
|
|||
{
|
||||
int m, draining_flow = 0;
|
||||
|
||||
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL))
|
||||
draining_flow = 1;
|
||||
|
||||
lwsl_parser("%s: received %d byte packet\n", __func__, (int)len);
|
||||
|
||||
/* let the rx protocol state machine have as much as it needs */
|
||||
|
@ -583,22 +585,13 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
|
|||
continue;
|
||||
}
|
||||
|
||||
/* account for what we're using in rxflow buffer */
|
||||
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
|
||||
draining_flow = 1;
|
||||
if (!lws_buflist_use_segment(&wsi->buflist_rxflow, 1))
|
||||
lws_dll_lws_remove(&wsi->dll_rxflow);
|
||||
}
|
||||
|
||||
/* consume payload bytes efficiently */
|
||||
if (wsi->lws_rx_parse_state ==
|
||||
LWS_RXPS_PAYLOAD_UNTIL_LENGTH_EXHAUSTED) {
|
||||
m = lws_payload_until_length_exhausted(wsi, buf, &len);
|
||||
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
|
||||
draining_flow = 1;
|
||||
if (!lws_buflist_use_segment(&wsi->buflist_rxflow, m))
|
||||
lws_dll_lws_remove(&wsi->dll_rxflow);
|
||||
}
|
||||
if (draining_flow &&
|
||||
!lws_buflist_use_segment(&wsi->buflist_rxflow, m))
|
||||
lws_dll_lws_remove(&wsi->dll_rxflow);
|
||||
}
|
||||
|
||||
/* process the byte */
|
||||
|
@ -607,8 +600,11 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
|
|||
return -1;
|
||||
len--;
|
||||
|
||||
if (draining_flow && /* were draining, now nothing left */
|
||||
!lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
|
||||
/* account for what we're using in rxflow buffer */
|
||||
if (draining_flow &&
|
||||
!lws_buflist_use_segment(&wsi->buflist_rxflow, 1)) {
|
||||
lws_dll_lws_remove(&wsi->dll_rxflow);
|
||||
|
||||
lwsl_debug("%s: %p flow buf: drained\n", __func__, wsi);
|
||||
|
||||
/* having drained the rxflow buffer, can rearm POLLIN */
|
||||
|
|
|
@ -136,11 +136,12 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
|
|||
if (pollfd) {
|
||||
int eff = vwsi->leave_pollout_active;
|
||||
|
||||
if (!eff)
|
||||
if (!eff) {
|
||||
if (lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
|
||||
lwsl_info("failed at set pollfd\n");
|
||||
goto bail_die;
|
||||
}
|
||||
}
|
||||
|
||||
vwsi->handling_pollout = 0;
|
||||
|
||||
|
@ -162,7 +163,9 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
|
|||
|
||||
if (lwsi_role_client(wsi) &&
|
||||
!wsi->hdr_parsing_completed &&
|
||||
lwsi_state(wsi) != LRS_H2_WAITING_TO_SEND_HEADERS)
|
||||
lwsi_state(wsi) != LRS_H2_WAITING_TO_SEND_HEADERS &&
|
||||
lwsi_state(wsi) != LRS_ISSUE_HTTP_BODY
|
||||
)
|
||||
goto bail_ok;
|
||||
|
||||
|
||||
|
@ -301,10 +304,13 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
|
|||
/* a new rxflow, buffer it and warn caller */
|
||||
|
||||
m = lws_buflist_append_segment(&wsi->buflist_rxflow, buf + n, len - n);
|
||||
|
||||
if (m < 0)
|
||||
return -1;
|
||||
if (m)
|
||||
if (m) {
|
||||
lwsl_debug("%s: added %p to rxflow list\n", __func__, wsi);
|
||||
lws_dll_lws_add_front(&wsi->dll_rxflow, &pt->dll_head_rxflow);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -394,6 +400,43 @@ lws_read_or_use_preamble(struct lws_context_per_thread *pt, struct lws *wsi)
|
|||
return len;
|
||||
}
|
||||
|
||||
void
|
||||
lws_service_do_ripe_rxflow(struct lws_context_per_thread *pt)
|
||||
{
|
||||
struct lws_pollfd pfd;
|
||||
|
||||
if (!pt->dll_head_rxflow.next)
|
||||
return;
|
||||
|
||||
/*
|
||||
* service all guys with pending rxflow that reached a state they can
|
||||
* accept the pending data
|
||||
*/
|
||||
|
||||
lws_pt_lock(pt, __func__);
|
||||
|
||||
lws_start_foreach_dll_safe(struct lws_dll_lws *, d, d1,
|
||||
pt->dll_head_rxflow.next) {
|
||||
struct lws *wsi = lws_container_of(d, struct lws, dll_rxflow);
|
||||
|
||||
pfd.events = POLLIN;
|
||||
pfd.revents = POLLIN;
|
||||
pfd.fd = -1;
|
||||
|
||||
lwsl_debug("%s: rxflow processing: %p 0x%x\n", __func__, wsi,
|
||||
wsi->wsistate);
|
||||
|
||||
if (!lws_is_flowcontrolled(wsi) &&
|
||||
lwsi_state(wsi) != LRS_DEFERRING_ACTION &&
|
||||
(wsi->role_ops->handle_POLLIN)(pt, wsi, &pfd) ==
|
||||
LWS_HPI_RET_CLOSE_HANDLED)
|
||||
lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS,
|
||||
"close_and_handled");
|
||||
|
||||
} lws_end_foreach_dll_safe(d, d1);
|
||||
|
||||
lws_pt_unlock(pt);
|
||||
}
|
||||
|
||||
/*
|
||||
* guys that need POLLIN service again without waiting for network action
|
||||
|
@ -413,6 +456,20 @@ lws_service_flag_pending(struct lws_context *context, int tsi)
|
|||
|
||||
lws_pt_lock(pt, __func__);
|
||||
|
||||
/*
|
||||
* 1) If there is any wsi with rxflow buffered and in a state to process
|
||||
* it, we should not wait in poll
|
||||
*/
|
||||
|
||||
lws_start_foreach_dll(struct lws_dll_lws *, d, pt->dll_head_rxflow.next) {
|
||||
struct lws *wsi = lws_container_of(d, struct lws, dll_rxflow);
|
||||
|
||||
if (lwsi_state(wsi) != LRS_DEFERRING_ACTION) {
|
||||
forced = 1;
|
||||
break;
|
||||
}
|
||||
} lws_end_foreach_dll(d);
|
||||
|
||||
#if defined(LWS_ROLE_WS)
|
||||
forced |= role_ops_ws.service_flag_pending(context, tsi);
|
||||
#endif
|
||||
|
|
|
@ -59,7 +59,7 @@ int main(int argc, char **argv)
|
|||
info.port = 7681;
|
||||
info.mounts = &mount;
|
||||
info.error_document_404 = "/404.html";
|
||||
|
||||
info.max_http_header_pool = 32;
|
||||
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
|
||||
info.ssl_cert_filepath = "localhost-100y.cert";
|
||||
info.ssl_private_key_filepath = "localhost-100y.key";
|
||||
|
|
|
@ -139,7 +139,7 @@ callback_post_demo(struct lws *wsi, enum lws_callback_reasons reason,
|
|||
break;
|
||||
|
||||
case LWS_CALLBACK_HTTP_BODY_COMPLETION:
|
||||
lwsl_debug("LWS_CALLBACK_HTTP_BODY_COMPLETION\n");
|
||||
lwsl_debug("LWS_CALLBACK_HTTP_BODY_COMPLETION: %p\n", wsi);
|
||||
/* call to inform no more payload data coming */
|
||||
lws_spa_finalize(pss->spa);
|
||||
|
||||
|
@ -194,12 +194,14 @@ callback_post_demo(struct lws *wsi, enum lws_callback_reasons reason,
|
|||
break;
|
||||
|
||||
case LWS_CALLBACK_HTTP_WRITEABLE:
|
||||
if (!pss->result_len)
|
||||
if (!pss->result_len) {
|
||||
lwsl_debug("nothing in result_len\n");
|
||||
break;
|
||||
}
|
||||
lwsl_debug("LWS_CALLBACK_HTTP_WRITEABLE: sending %d\n",
|
||||
pss->result_len);
|
||||
n = lws_write(wsi, (unsigned char *)pss->result + LWS_PRE,
|
||||
pss->result_len, LWS_WRITE_HTTP);
|
||||
pss->result_len, LWS_WRITE_HTTP_FINAL);
|
||||
if (n < 0)
|
||||
return 1;
|
||||
goto try_to_reuse;
|
||||
|
|
Loading…
Add table
Reference in a new issue