1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ah owns rxbuf

This is intended to solve a longstanding problem with the
relationship between http/1.1 keep-alive and the service
loop.

Ah now contain an rx buffer which is used during header
processing, and the ah may not be detached from the wsi
until the rx buffer is exhausted.

Having the rx buffer in the ah means we can delay using the
rx until a later service loop.

Ah which have pending rx force POLLIN service on the wsi
they are attached to automatically, so we can interleave
general service / connections with draining each ah rx
buffer.

The possible http/1.1 situations and their dispositions are:

 1) exactly one set of http headers come.  After processing,
    the ah is detached since no pending rx left.  If more
    headers come later, a fresh ah is acquired when available
    and the rx flow control blocks the read until then.

 2) more than one whole set of headers come and we remain in
    http mode (no upgrade).  The ah is left attached and
    returns to the service loop after the first set of headers.
    We will get forced service due to the ah having pending
    content (respecting flowcontrol) and process the pending
    rx in the ah.  If we use it all up, we will detach the
    ah.

 3) one set of http headers come with ws traffic appended.
    We service the headers, do the upgrade, and keep the ah
    until the remaining ws content is used.  When we
    exhausted the ws traffic in the ah rx buffer, we
    detach the ah.

Since there can be any amount of http/1.1 pipelining on a
connection, and each may be expensive to service, it's now
enforced there is a return to the service loop after each
header set is serviced on a connection.

When I added the forced service for ah with pending buffering,
I added support for it to the windows plat code.  However this
is untested.

Signed-off-by: Andy Green <andy.green@linaro.org>
This commit is contained in:
Andy Green 2016-02-15 12:37:04 +08:00
parent 8acdd2e7ed
commit 2c218e705f
10 changed files with 410 additions and 198 deletions

View file

@ -301,7 +301,9 @@ lws_client_connect_2(struct lws *wsi)
return wsi;
oom4:
lws_free_header_table(wsi);
/* we're closing, losing some rx is OK */
wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen;
lws_header_table_detach(wsi);
lws_free(wsi);
return NULL;
@ -402,7 +404,7 @@ lws_client_connect_via_info(struct lws_client_connect_info *i)
}
#endif
if (lws_allocate_header_table(wsi))
if (lws_header_table_attach(wsi))
goto bail;
/*
@ -465,7 +467,9 @@ lws_client_connect_via_info(struct lws_client_connect_info *i)
return lws_client_connect_2(wsi);
bail1:
lws_free_header_table(wsi);
/* we're closing, losing some rx is OK */
wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen;
lws_header_table_detach(wsi);
bail:
lws_free(wsi);

View file

@ -798,7 +798,7 @@ check_accept:
lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
/* free up his parsing allocations */
lws_free_header_table(wsi);
lws_header_table_detach(wsi);
lws_union_transition(wsi, LWSCM_WS_CLIENT);
wsi->state = LWSS_ESTABLISHED;

View file

@ -55,12 +55,14 @@
* into multiple fragments. They may contain unknown headers with arbitrary
* argument lengths. So, we parse using a single-character at a time state
* machine that is completely independent of packet size.
*
* Returns <0 for error or length of chars consumed from buf (up to len)
*/
LWS_VISIBLE int
lws_read(struct lws *wsi, unsigned char *buf, size_t len)
{
unsigned char *last_char;
unsigned char *last_char, *oldbuf = buf;
int body_chunk_len;
size_t n;
@ -90,7 +92,7 @@ lws_read(struct lws *wsi, unsigned char *buf, size_t len)
}
break;
#endif
http_new:
case LWSS_HTTP:
wsi->hdr_parsing_completed = 0;
/* fallthru */
@ -203,9 +205,9 @@ postbody_completion:
read_ok:
/* Nothing more to do for now */
lwsl_debug("%s: read_ok\n", __func__);
lwsl_info("%s: read_ok, used %d\n", __func__, buf - oldbuf);
return 0;
return buf - oldbuf;
http_complete:
lwsl_debug("%s: http_complete\n", __func__);
@ -215,11 +217,11 @@ http_complete:
if (lws_http_transaction_completed(wsi))
goto bail;
#endif
/* If we have more data, loop back around: */
if (len)
goto http_new;
return 0;
/* we may have next header set already, but return to event loop first
* so a heavily-pipelined http/1.1 connection cannot monopolize the
* service thread with GET hugefile.bin GET hugefile.bin etc
*/
goto read_ok;
bail:
lwsl_debug("closing connection at lws_read bail:\n");

View file

@ -52,18 +52,12 @@ lws_free_wsi(struct lws *wsi)
lws_free_set_NULL(wsi->rxflow_buffer);
lws_free_set_NULL(wsi->trunc_alloc);
/*
* These union members have an ah at the start
*
* struct _lws_http_mode_related http;
* struct _lws_http2_related http2;
* struct _lws_header_related hdr;
*
* basically ws-related union member does not
*/
if (wsi->mode != LWSCM_WS_CLIENT &&
wsi->mode != LWSCM_WS_SERVING)
lws_free_header_table(wsi);
if (wsi->u.hdr.ah) {
/* we're closing, losing some rx is OK */
wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen;
lws_header_table_detach(wsi);
}
wsi->context->count_wsi_allocated--;
lwsl_debug("%s: %p, remaining wsi %d\n", __func__, wsi,
@ -411,10 +405,12 @@ just_kill_connection:
wsi->socket_is_permanently_unusable = 1;
#ifdef LWS_USE_LIBUV
/* libuv has to do his own close handle processing asynchronously */
lws_libuv_closehandle(wsi);
if (LWS_LIBUV_ENABLED(context)) {
/* libuv has to do his own close handle processing asynchronously */
lws_libuv_closehandle(wsi);
return;
return;
}
#endif
lws_close_free_wsi_final(wsi);

View file

@ -122,12 +122,8 @@ LWS_VISIBLE int
lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
struct lws_context_per_thread *pt = &context->pt[tsi];
struct lws *wsi;
int n, m, c;
char buf;
#ifdef LWS_OPENSSL_SUPPORT
struct lws *wsi_next;
#endif
/* stay dead once we are dead */
@ -148,17 +144,7 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
}
context->service_tid = context->service_tid_detected;
/* if we know we are draining rx ext, do not wait in poll */
if (pt->rx_draining_ext_list)
timeout_ms = 0;
#ifdef LWS_OPENSSL_SUPPORT
/* if we know we have non-network pending data, do not wait in poll */
if (lws_ssl_anybody_has_buffered_read_tsi(context, tsi)) {
timeout_ms = 0;
lwsl_err("ssl buffered read\n");
}
#endif
timeout_ms = lws_service_adjust_timeout(context, timeout_ms, tsi);
n = poll(pt->fds, pt->fds_count, timeout_ms);
@ -178,46 +164,11 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
return 0;
}
/*
* For all guys with already-available ext data to drain, if they are
* not flowcontrolled, fake their POLLIN status
*/
wsi = pt->rx_draining_ext_list;
while (wsi) {
pt->fds[wsi->position_in_fds_table].revents |=
pt->fds[wsi->position_in_fds_table].events & POLLIN;
wsi = wsi->u.ws.rx_draining_ext_list;
}
c = n;
#ifdef LWS_OPENSSL_SUPPORT
/*
* For all guys with buffered SSL read data already saved up, if they
* are not flowcontrolled, fake their POLLIN status so they'll get
* service to use up the buffered incoming data, even though their
* network socket may have nothing
*/
wsi = pt->pending_read_list;
while (wsi) {
wsi_next = wsi->pending_read_list_next;
pt->fds[wsi->position_in_fds_table].revents |=
pt->fds[wsi->position_in_fds_table].events & POLLIN;
if (pt->fds[wsi->position_in_fds_table].revents & POLLIN)
/*
* he's going to get serviced now, take him off the
* list of guys with buffered SSL. If he still has some
* at the end of the service, he'll get put back on the
* list then.
*/
lws_ssl_remove_wsi_from_buffered_list(wsi);
wsi = wsi_next;
}
#endif
lws_service_flag_pending(context, tsi);
/* any socket with events to service? */
c = n;
for (n = 0; n < pt->fds_count && c; n++) {
if (!pt->fds[n].revents)
continue;
@ -383,7 +334,8 @@ lws_plat_context_late_destroy(struct lws_context *context)
/* cast a struct sockaddr_in6 * into addr for ipv6 */
LWS_VISIBLE int
lws_interface_to_sa(int ipv6, const char *ifname, struct sockaddr_in *addr, size_t addrlen)
lws_interface_to_sa(int ipv6, const char *ifname, struct sockaddr_in *addr,
size_t addrlen)
{
int rc = -1;

View file

@ -153,16 +153,15 @@ LWS_VISIBLE void lwsl_emit_syslog(int level, const char *line)
LWS_VISIBLE int
lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
int n;
unsigned int i;
DWORD ev;
struct lws_context_per_thread *pt = &context->pt[tsi];
WSANETWORKEVENTS networkevents;
struct lws_pollfd *pfd;
struct lws *wsi;
struct lws_context_per_thread *pt = &context->pt[tsi];
unsigned int i;
DWORD ev;
int n, m;
/* stay dead once we are dead */
if (context == NULL)
return 1;
@ -196,6 +195,9 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
}
}
/* if we know something needs service already, don't wait in poll */
timeout_ms = lws_service_adjust_timeout(context, timeout_ms, tsi);
ev = WSAWaitForMultipleEvents(pt->fds_count + 1, pt->events,
FALSE, timeout_ms, FALSE);
context->service_tid = 0;
@ -215,8 +217,9 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
pfd = &pt->fds[ev - WSA_WAIT_EVENT_0 - 1];
if (WSAEnumNetworkEvents(pfd->fd,
pt->events[ev - WSA_WAIT_EVENT_0],
/* eh... is one event at a time the best windows can do? */
if (WSAEnumNetworkEvents(pfd->fd, pt->events[ev - WSA_WAIT_EVENT_0],
&networkevents) == SOCKET_ERROR) {
lwsl_err("WSAEnumNetworkEvents() failed with error %d\n",
LWS_ERRNO);
@ -231,7 +234,27 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
wsi->sock_send_blocking = 0;
}
return lws_service_fd(context, pfd);
/* if someone faked their LWS_POLLIN, then go through all active fds */
if (lws_service_flag_pending(context, tsi)) {
/* any socket with events to service? */
for (n = 0; n < (int)pt->fds_count; n++) {
if (!pt->fds[n].revents)
continue;
m = lws_service_fd_tsi(context, &pt->fds[n], tsi);
if (m < 0)
return -1;
/* if something closed, retry this slot */
if (m)
n--;
}
return 0;
}
/* otherwise just do the one... must be a way to improve that... */
return lws_service_fd_tsi(context, pfd, tsi);
}
LWS_VISIBLE int

View file

@ -61,19 +61,30 @@ lextable_decode(int pos, char c)
}
void
lws_reset_header_table(struct lws *wsi)
lws_header_table_reset(struct lws *wsi)
{
if (!wsi->u.hdr.ah)
return;
struct allocated_headers *ah = wsi->u.hdr.ah;
/* if we have the idea we're resetting 'our' ah, must be bound to one */
assert(ah);
/* ah also concurs with ownership */
assert(ah->wsi == wsi);
/* init the ah to reflect no headers or data have appeared yet */
memset(wsi->u.hdr.ah->frag_index, 0, sizeof(wsi->u.hdr.ah->frag_index));
wsi->u.hdr.ah->nfrag = 0;
wsi->u.hdr.ah->pos = 0;
memset(ah->frag_index, 0, sizeof(ah->frag_index));
ah->nfrag = 0;
ah->pos = 0;
/* and reset the rx state */
ah->rxpos = 0;
ah->rxlen = 0;
/* since we will restart the ah, our new headers are not completed */
wsi->hdr_parsing_completed = 0;
}
int LWS_WARN_UNUSED_RESULT
lws_allocate_header_table(struct lws *wsi)
lws_header_table_attach(struct lws *wsi)
{
struct lws_context *context = wsi->context;
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
@ -133,6 +144,7 @@ lws_allocate_header_table(struct lws *wsi)
wsi->u.hdr.ah = &pt->ah_pool[n];
wsi->u.hdr.ah->in_use = 1;
pt->ah_pool[n].wsi = wsi; /* mark our owner */
pt->ah_count_in_use++;
_lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa);
@ -143,7 +155,7 @@ lws_allocate_header_table(struct lws *wsi)
lws_pt_unlock(pt);
reset:
lws_reset_header_table(wsi);
lws_header_table_reset(wsi);
time(&wsi->u.hdr.ah->assigned);
return 0;
@ -154,7 +166,7 @@ bail:
return 1;
}
int lws_free_header_table(struct lws *wsi)
int lws_header_table_detach(struct lws *wsi)
{
struct lws_context *context = wsi->context;
struct allocated_headers *ah = wsi->u.hdr.ah;
@ -163,8 +175,18 @@ int lws_free_header_table(struct lws *wsi)
struct lws **pwsi;
time_t now;
lwsl_info("%s: wsi %p: ah %p (tsi=%d, count = %d)\n", __func__, (void *)wsi,
(void *)wsi->u.hdr.ah, wsi->tsi, pt->ah_count_in_use);
lwsl_info("%s: wsi %p: ah %p (tsi=%d, count = %d)\n", __func__,
(void *)wsi, (void *)wsi->u.hdr.ah, wsi->tsi,
pt->ah_count_in_use);
assert(ah);
/* may not be detached while he still has unprocessed rx */
if (ah->rxpos != ah->rxlen) {
lwsl_err("%s: %p: rxpos:%d, rxlen:%d\n", __func__, wsi,
ah->rxpos, ah->rxlen);
assert(ah->rxpos == ah->rxlen);
}
lws_pt_lock(pt);
@ -173,8 +195,8 @@ int lws_free_header_table(struct lws *wsi)
if (wsi->socket_is_permanently_unusable)
while (*pwsi) {
if (*pwsi == wsi) {
lwsl_info("%s: wsi %p, removing from wait list\n",
__func__, wsi);
lwsl_info("%s: wsi %p, remv wait\n",
__func__, wsi);
*pwsi = wsi->u.hdr.ah_wait_list;
wsi->u.hdr.ah_wait_list = NULL;
pt->ah_wait_list_length--;
@ -189,11 +211,13 @@ int lws_free_header_table(struct lws *wsi)
if (now - wsi->u.hdr.ah->assigned > 3)
lwsl_err("header assign - free time %d\n",
(int)(now - wsi->u.hdr.ah->assigned));
/* if we think we're freeing one, there should be one to free */
/* if we think we're detaching one, there should be one in use */
assert(pt->ah_count_in_use > 0);
/* and he should have been in use */
/* and this specific one should have been in use */
assert(wsi->u.hdr.ah->in_use);
wsi->u.hdr.ah = NULL;
ah->wsi = NULL; /* no owner */
if (!*pwsi) {
ah->in_use = 0;
@ -212,13 +236,17 @@ int lws_free_header_table(struct lws *wsi)
lwsl_info("last wsi in wait list %p\n", wsi);
wsi->u.hdr.ah = ah;
lws_reset_header_table(wsi);
ah->wsi = wsi; /* new owner */
lws_header_table_reset(wsi);
time(&wsi->u.hdr.ah->assigned);
assert(wsi->position_in_fds_table != -1);
lwsl_info("%s: Enabling %p POLLIN\n", __func__, wsi);
/* his wait is over, let him progress */
/* he has been stuck waiting for an ah, but now his wait is over,
* let him progress
*/
_lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa);
/* point prev guy to next guy in list instead */
@ -459,7 +487,8 @@ issue_char(struct lws *wsi, unsigned char c)
return -1;
wsi->u.hdr.ah->data[wsi->u.hdr.ah->pos++] = '\0';
lwsl_warn("header %i exceeds limit %d\n",
wsi->u.hdr.parser_state, wsi->u.hdr.current_token_limit);
wsi->u.hdr.parser_state,
wsi->u.hdr.current_token_limit);
}
return 1;
@ -679,7 +708,7 @@ check_eol:
/* bail at EOL */
if (wsi->u.hdr.parser_state != WSI_TOKEN_CHALLENGE &&
c == '\x0d') {
c == '\x0d') {
c = '\0';
wsi->u.hdr.parser_state = WSI_TOKEN_SKIPPING_SAW_CR;
lwsl_parser("*\n");

View file

@ -489,6 +489,7 @@ struct lws_fragments {
*/
struct allocated_headers {
struct lws *wsi; /* owner */
char *data; /* prepared by context init to point to dedicated storage */
/*
* the randomly ordered fragments, indexed by frag_index and
@ -502,6 +503,10 @@ struct allocated_headers {
* the actual header data gets dumped as it comes in, into data[]
*/
unsigned char frag_index[WSI_TOKEN_COUNT];
unsigned char rx[2048];
unsigned int rxpos;
unsigned int rxlen;
#ifndef LWS_NO_CLIENT
char initial_handshake_hash_base64[30];
unsigned short c_port;
@ -946,6 +951,8 @@ struct _lws_header_related {
};
struct _lws_websocket_related {
/* cheapest way to deal with ah overlap with ws union transition */
struct _lws_header_related *hdr;
char *rx_ubuf;
unsigned int rx_ubuf_alloc;
struct lws *rx_draining_ext_list;
@ -1119,6 +1126,12 @@ lws_http_action(struct lws *wsi);
LWS_EXTERN int
lws_b64_selftest(void);
LWS_EXTERN int
lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi);
LWS_EXTERN int
lws_service_flag_pending(struct lws_context *context, int tsi);
#if defined(_WIN32) || defined(MBED_OPERATORS)
LWS_EXTERN struct lws *
wsi_from_fd(const struct lws_context *context, lws_sockfd_type fd);
@ -1244,13 +1257,13 @@ LWS_EXTERN int
lws_plat_set_socket_options(struct lws_context *context, lws_sockfd_type fd);
LWS_EXTERN int LWS_WARN_UNUSED_RESULT
lws_allocate_header_table(struct lws *wsi);
lws_header_table_attach(struct lws *wsi);
LWS_EXTERN int
lws_free_header_table(struct lws *wsi);
lws_header_table_detach(struct lws *wsi);
LWS_EXTERN void
lws_reset_header_table(struct lws *wsi);
lws_header_table_reset(struct lws *wsi);
LWS_EXTERN char * LWS_WARN_UNUSED_RESULT
lws_hdr_simple_ptr(struct lws *wsi, enum lws_token_indexes h);

View file

@ -292,33 +292,20 @@ lws_http_action(struct lws *wsi)
n = wsi->protocol->callback(wsi, LWS_CALLBACK_FILTER_HTTP_CONNECTION,
wsi->user_space, uri_ptr, uri_len);
if (!n) {
/*
* if there is content supposed to be coming,
* put a timeout on it having arrived
*/
lws_set_timeout(wsi, PENDING_TIMEOUT_HTTP_CONTENT,
AWAITING_TIMEOUT);
n = wsi->protocol->callback(wsi, LWS_CALLBACK_HTTP,
wsi->user_space, uri_ptr, uri_len);
}
/*
* If we are in keepalive, we may already have the next header set
* pipelined in the lws_read buffer above us... if so, we must hold
* the ah so it's still bound when we want to process the next headers.
*
*/
if (connection_type == HTTP_CONNECTION_CLOSE ||
!wsi->u.hdr.more_rx_waiting)
/* now drop the header info we kept a pointer to */
lws_free_header_table(wsi);
if (n) {
lwsl_info("LWS_CALLBACK_HTTP closing\n");
return 1; /* struct ah ptr already nuked */ }
return 1;
}
/*
* if there is content supposed to be coming,
* put a timeout on it having arrived
*/
lws_set_timeout(wsi, PENDING_TIMEOUT_HTTP_CONTENT,
AWAITING_TIMEOUT);
n = wsi->protocol->callback(wsi, LWS_CALLBACK_HTTP,
wsi->user_space, uri_ptr, uri_len);
/*
* If we're not issuing a file, check for content_length or
@ -336,7 +323,9 @@ lws_http_action(struct lws *wsi)
return 0;
bail_nuke_ah:
lws_free_header_table(wsi);
/* we're closing, losing some rx is OK */
wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen;
lws_header_table_detach(wsi);
return 1;
}
@ -346,12 +335,15 @@ int
lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len)
{
struct lws_context *context = lws_get_context(wsi);
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
struct _lws_header_related hdr;
struct allocated_headers *ah;
int protocol_len, n, hit;
char protocol_list[128];
char protocol_name[32];
char *p;
assert(len < 10000000);
assert(wsi->u.hdr.ah);
while (len--) {
@ -563,10 +555,34 @@ upgrade_ws:
goto bail_nuke_ah;
}
/* drop the header info -- no bail_nuke_ah after this */
lws_free_header_table(wsi);
/* we are upgrading to ws, so http/1.1 and keepalive +
* pipelined header considerations about keeping the ah around
* no longer apply. However it's common for the first ws
* protocol data to have been coalesced with the browser
* upgrade request and to already be in the ah rx buffer.
*/
lwsl_err("%s: %p: inheriting ah in ws mode (rxpos: %d, rxlen: %d)\n",
__func__, wsi, wsi->u.hdr.ah->rxpos, wsi->u.hdr.ah->rxlen);
lws_pt_lock(pt);
hdr = wsi->u.hdr;
lws_union_transition(wsi, LWSCM_WS_SERVING);
/*
* first service is WS mode will notice this, use the RX and
* then detach the ah (caution: we are not in u.hdr union
* mode any more then... ah_temp member is at start the same
* though)
*
* Because rxpos/rxlen shows something in the ah, we will get
* service guaranteed next time around the event loop
*
* All union members begin with hdr, so we can use it even
* though we transitioned to ws union mode (the ah detach
* code uses it anyway).
*/
wsi->u.hdr = hdr;
lws_pt_unlock(pt);
/*
* create the frame buffer for this connection according to the
@ -601,7 +617,10 @@ upgrade_ws:
bail_nuke_ah:
/* drop the header info */
lws_free_header_table(wsi);
/* we're closing, losing some rx is OK */
wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen;
lws_header_table_detach(wsi);
return 1;
}
@ -718,10 +737,11 @@ lws_http_transaction_completed(struct lws *wsi)
* that is already at least the start of another header set, simply
* reset the existing header table and keep it.
*/
if (!wsi->u.hdr.more_rx_waiting)
lws_free_header_table(wsi);
if (wsi->u.hdr.ah &&
wsi->u.hdr.ah->rxpos == wsi->u.hdr.ah->rxlen)
lws_header_table_detach(wsi);
else
lws_reset_header_table(wsi);
lws_header_table_reset(wsi);
/* If we're (re)starting on headers, need other implied init */
wsi->u.hdr.ues = URIES_IDLE;
@ -804,11 +824,11 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi,
{
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
lws_sockfd_type accept_fd = LWS_SOCK_INVALID;
struct allocated_headers *ah;
#if LWS_POSIX
struct sockaddr_in cli_addr;
socklen_t clilen;
#endif
int n, len;
switch (wsi->mode) {
@ -825,11 +845,12 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi,
if (!(pollfd->revents & LWS_POLLOUT))
break;
if (lws_issue_raw(wsi, wsi->trunc_alloc + wsi->trunc_offset,
if (lws_issue_raw(wsi, wsi->trunc_alloc +
wsi->trunc_offset,
wsi->trunc_len) < 0)
goto fail;
/*
* we can't afford to allow input processing send
* we can't afford to allow input processing to send
* something new, so spin around the event loop until
* he doesn't have any partials
*/
@ -841,34 +862,63 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi,
if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
goto try_pollout;
if ((wsi->state == LWSS_HTTP ||
wsi->state == LWSS_HTTP_ISSUING_FILE ||
wsi->state == LWSS_HTTP_HEADERS) && !wsi->u.hdr.ah)
if (lws_allocate_header_table(wsi))
goto try_pollout;
/* these states imply we MUST have an ah attached */
/*
* This is a good situation, the ah allocated and we know there
* is header data pending. However, in http1.1 / keepalive
* case, back-to-back header sets pipelined into one packet
* is common.
*
* Ah is defined to be required to stay attached to the wsi
* until the current set of header data completes, which may
* involve network roundtrips if fragmented. Typically the
* header set is not fragmented and gets done atomically.
*
* When we complete processing for the first header set, we
* normally drop the ah in lws_http_transaction_completed()
* since we do not know how long it would be held waiting for
* the start of the next header set to arrive.
*
* However if there is pending data, http1.1 / keepalive mode
* is active, we need to retain (just reset) the ah after
* dealing with each header set instead of dropping it.
*
* Otherwise the remaining header data has nowhere to be stored.
*/
if (wsi->state == LWSS_HTTP ||
wsi->state == LWSS_HTTP_ISSUING_FILE ||
wsi->state == LWSS_HTTP_HEADERS) {
if (!wsi->u.hdr.ah)
if (lws_header_table_attach(wsi))
goto try_pollout;
ah = wsi->u.hdr.ah;
lwsl_debug("%s: %p: rxpos:%d rxlen:%d\n", __func__, wsi,
ah->rxpos, ah->rxlen);
/* if nothing in ah rx buffer, get some fresh rx */
if (ah->rxpos == ah->rxlen) {
ah->rxlen = lws_ssl_capable_read(wsi, ah->rx,
sizeof(ah->rx));
ah->rxpos = 0;
lwsl_debug("%s: wsi %p, ah->rxlen = %d\r\n",
__func__, wsi, ah->rxlen);
switch (ah->rxlen) {
case 0:
lwsl_info("%s: read 0 len\n", __func__);
/* lwsl_info(" state=%d\n", wsi->state); */
// if (!wsi->hdr_parsing_completed)
// lws_header_table_detach(wsi);
/* fallthru */
case LWS_SSL_CAPABLE_ERROR:
goto fail;
case LWS_SSL_CAPABLE_MORE_SERVICE:
ah->rxlen = ah->rxpos = 0;
goto try_pollout;
}
}
assert(ah->rxpos != ah->rxlen && ah->rxlen);
/* just ignore incoming if waiting for close */
if (wsi->state != LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE) {
n = lws_read(wsi, ah->rx + ah->rxpos,
ah->rxlen - ah->rxpos);
if (n < 0) /* we closed wsi */
return 1;
if (wsi->u.hdr.ah) {
if ( wsi->u.hdr.ah->rxlen)
wsi->u.hdr.ah->rxpos += n;
if (wsi->u.hdr.ah->rxpos == wsi->u.hdr.ah->rxlen &&
(wsi->mode != LWSCM_HTTP_SERVING &&
wsi->mode != LWSCM_HTTP_SERVING_ACCEPTED &&
wsi->mode != LWSCM_HTTP2_SERVING))
lws_header_table_detach(wsi);
}
break;
}
goto try_pollout;
}
len = lws_ssl_capable_read(wsi, pt->serv_buf,
LWS_MAX_SOCKET_IO_BUF);
@ -877,8 +927,8 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi,
case 0:
lwsl_info("%s: read 0 len\n", __func__);
/* lwsl_info(" state=%d\n", wsi->state); */
if (!wsi->hdr_parsing_completed)
lws_free_header_table(wsi);
// if (!wsi->hdr_parsing_completed)
// lws_header_table_detach(wsi);
/* fallthru */
case LWS_SSL_CAPABLE_ERROR:
goto fail;

View file

@ -319,10 +319,13 @@ lws_service_timeout_check(struct lws *wsi, unsigned int sec)
lws_pt_lock(pt);
pwsi = &pt->ah_wait_list;
if (!pwsi)
return 0;
while (*pwsi) {
if (*pwsi == wsi)
break;
pwsi = &(*pwsi)->u.hdr.ah_wait_list;
lwsl_err("%s: pwsi=%p\n", __func__, pwsi);
}
lws_pt_unlock(pt);
@ -363,6 +366,124 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
return 0;
}
/* this is used by the platform service code to stop us waiting for network
* activity in poll() when we have something that already needs service
*/
int
lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi)
{
struct lws_context_per_thread *pt = &context->pt[tsi];
int n;
/* Figure out if we really want to wait in poll()
* We only need to wait if really nothing already to do and we have
* to wait for something from network
*/
/* 1) if we know we are draining rx ext, do not wait in poll */
if (pt->rx_draining_ext_list)
timeout_ms = 0;
#ifdef LWS_OPENSSL_SUPPORT
/* 2) if we know we have non-network pending data, do not wait in poll */
if (lws_ssl_anybody_has_buffered_read_tsi(context, tsi)) {
timeout_ms = 0;
lwsl_err("ssl buffered read\n");
}
#endif
/* 3) if any ah has pending rx, do not wait in poll */
for (n = 0; n < context->max_http_header_pool; n++)
if (pt->ah_pool[n].rxpos != pt->ah_pool[n].rxlen) {
/* any ah with pending rx must be attached to someone */
assert(pt->ah_pool[n].wsi);
timeout_ms = 0;
break;
}
return timeout_ms;
}
/*
* guys that need POLLIN service again without waiting for network action
* can force POLLIN here if not flowcontrolled, so they will get service.
*
* Return nonzero if anybody got their POLLIN faked
*/
int
lws_service_flag_pending(struct lws_context *context, int tsi)
{
struct lws_context_per_thread *pt = &context->pt[tsi];
#ifdef LWS_OPENSSL_SUPPORT
struct lws *wsi_next;
#endif
struct lws *wsi;
int forced = 0;
int n;
/* POLLIN faking */
/*
* 1) For all guys with already-available ext data to drain, if they are
* not flowcontrolled, fake their POLLIN status
*/
wsi = pt->rx_draining_ext_list;
while (wsi) {
pt->fds[wsi->position_in_fds_table].revents |=
pt->fds[wsi->position_in_fds_table].events & LWS_POLLIN;
if (pt->fds[wsi->position_in_fds_table].revents &
LWS_POLLIN)
forced = 1;
wsi = wsi->u.ws.rx_draining_ext_list;
}
#ifdef LWS_OPENSSL_SUPPORT
/*
* 2) For all guys with buffered SSL read data already saved up, if they
* are not flowcontrolled, fake their POLLIN status so they'll get
* service to use up the buffered incoming data, even though their
* network socket may have nothing
*/
wsi = pt->pending_read_list;
while (wsi) {
wsi_next = wsi->pending_read_list_next;
pt->fds[wsi->position_in_fds_table].revents |=
pt->fds[wsi->position_in_fds_table].events & LWS_POLLIN;
if (pt->fds[wsi->position_in_fds_table].revents & LWS_POLLIN) {
forced = 1;
/*
* he's going to get serviced now, take him off the
* list of guys with buffered SSL. If he still has some
* at the end of the service, he'll get put back on the
* list then.
*/
lws_ssl_remove_wsi_from_buffered_list(wsi);
}
wsi = wsi_next;
}
#endif
/*
* 3) For any wsi who have an ah with pending RX who did not
* complete their current headers, and are not flowcontrolled,
* fake their POLLIN status so they will be able to drain the
* rx buffered in the ah
*/
for (n = 0; n < context->max_http_header_pool; n++)
if (pt->ah_pool[n].rxpos != pt->ah_pool[n].rxlen &&
!pt->ah_pool[n].wsi->hdr_parsing_completed) {
pt->fds[pt->ah_pool[n].wsi->position_in_fds_table].revents |=
pt->fds[pt->ah_pool[n].wsi->position_in_fds_table].events &
LWS_POLLIN;
if (pt->fds[pt->ah_pool[n].wsi->position_in_fds_table].revents &
LWS_POLLIN)
forced = 1;
}
return forced;
}
/**
* lws_service_fd() - Service polled socket with something waiting
* @context: Websocket context
@ -519,7 +640,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
lwsl_info("lws_service_fd: closing\n");
goto close_and_handled;
}
#if 1
if (wsi->state == LWSS_RETURNED_CLOSE_ALREADY ||
wsi->state == LWSS_AWAITING_CLOSE_ACK) {
/*
@ -530,7 +651,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
lws_rx_flow_control(wsi, 1);
wsi->u.ws.tx_draining_ext = 0;
}
#endif
if (wsi->u.ws.tx_draining_ext) {
/* we cannot deal with new RX until the TX ext
* path has been drained. It's because new
@ -592,27 +713,42 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
goto drain;
}
/* 4: any incoming data ready?
/* 4: any incoming (or ah-stashed incoming rx) data ready?
* notice if rx flow going off raced poll(), rx flow wins
*/
if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
break;
read:
eff_buf.token_len = lws_ssl_capable_read(wsi, pt->serv_buf,
pending ? pending : LWS_MAX_SOCKET_IO_BUF);
switch (eff_buf.token_len) {
case 0:
lwsl_info("service_fd: closing due to 0 length read\n");
goto close_and_handled;
case LWS_SSL_CAPABLE_MORE_SERVICE:
lwsl_info("SSL Capable more service\n");
n = 0;
goto handled;
case LWS_SSL_CAPABLE_ERROR:
lwsl_info("Closing when error\n");
goto close_and_handled;
}
/* all the union members start with hdr, so even in ws mode
* we can deal with the ah via u.hdr
*/
if (wsi->u.hdr.ah) {
lwsl_err("%s: %p: using inherited ah rx\n", __func__, wsi);
eff_buf.token_len = wsi->u.hdr.ah->rxlen -
wsi->u.hdr.ah->rxpos;
eff_buf.token = (char *)wsi->u.hdr.ah->rx +
wsi->u.hdr.ah->rxpos;
} else {
eff_buf.token_len = lws_ssl_capable_read(wsi, pt->serv_buf,
pending ? pending : LWS_MAX_SOCKET_IO_BUF);
switch (eff_buf.token_len) {
case 0:
lwsl_info("service_fd: closing due to 0 length read\n");
goto close_and_handled;
case LWS_SSL_CAPABLE_MORE_SERVICE:
lwsl_info("SSL Capable more service\n");
n = 0;
goto handled;
case LWS_SSL_CAPABLE_ERROR:
lwsl_info("Closing when error\n");
goto close_and_handled;
}
eff_buf.token = (char *)pt->serv_buf;
}
/*
* give any active extensions a chance to munge the buffer
* before parse. We pass in a pointer to an lws_tokens struct
@ -624,15 +760,12 @@ read:
* extension callback handling, just the normal input buffer is
* used then so it is efficient.
*/
eff_buf.token = (char *)pt->serv_buf;
drain:
do {
more = 0;
m = lws_ext_cb_active(wsi,
LWS_EXT_CB_PACKET_RX_PREPARSE, &eff_buf, 0);
m = lws_ext_cb_active(wsi, LWS_EXT_CB_PACKET_RX_PREPARSE,
&eff_buf, 0);
if (m < 0)
goto close_and_handled;
if (m)
@ -661,6 +794,16 @@ drain:
eff_buf.token_len = 0;
} while (more);
if (wsi->u.hdr.ah) {
lwsl_err("%s: %p: detaching inherited used ah\n", __func__, wsi);
/* show we used all the pending rx up */
wsi->u.hdr.ah->rxpos = wsi->u.hdr.ah->rxlen;
/* we can run the normal ah detach flow despite
* being in ws union mode, since all union members
* start with hdr */
lws_header_table_detach(wsi);
}
pending = lws_ssl_pending(wsi);
if (pending) {
handle_pending:
@ -670,7 +813,7 @@ handle_pending:
}
if (draining_flow && wsi->rxflow_buffer &&
wsi->rxflow_pos == wsi->rxflow_len) {
wsi->rxflow_pos == wsi->rxflow_len) {
lwsl_info("flow buffer: drained\n");
lws_free_set_NULL(wsi->rxflow_buffer);
/* having drained the rxflow buffer, can rearm POLLIN */