1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

solve flowcontrol problems

Problems with rx flow control implementation were the underlying cause
of the connection stalling issue that was covered up with the udelay()
patch that was removed recently.

This get rx flow control working properly and corrects problems with
fifo management in the test server mirror protocol code too.

The rxfow control api has been changed to just set a flag, so it's very cheap
to call from user code.  After the callbacks that might use the rxflow control
api the flag is checked and any pending actions done.

rx flow control now stops any rx packet coming immediately, with compessed
connections "just what was left in the pipe" might be hundreds of KBytes.  To
implement that the current packet being decoded is copied into a malloc'd buffer
by the rx processing code now.

When rxflow is allows to come again, the buffer is drained and freed before any
new packet content is accepted.

Signed-off-by: Andy Green <andy.green@linaro.org>
This commit is contained in:
Andy Green 2013-01-17 16:50:35 +08:00
parent 0303db482e
commit 706961dbb5
6 changed files with 165 additions and 44 deletions

View file

@ -36,7 +36,7 @@ else
dist_libwebsockets_la_SOURCES += md5.c sha-1.c
endif
libwebsockets_la_CFLAGS=-Wall -std=gnu99 -pedantic
libwebsockets_la_CFLAGS=-Wall -std=gnu99 -pedantic -g
libwebsockets_la_LDFLAGS=
if MINGW

View file

@ -163,7 +163,7 @@ bail:
* screwed.. close the connection... we will get a
* destroy callback to take care of closing nicely
*/
fprintf(stderr, "zlib error inflate %d: %s",
lwsl_err("zlib error inflate %d: %s\n",
n, conn->zs_in.msg);
return -1;
}

View file

@ -371,6 +371,9 @@ just_kill_connection:
if (wsi->c_address)
free(wsi->c_address);
if (wsi->rxflow_buffer)
free(wsi->rxflow_buffer);
/* lwsl_info("closing fd=%d\n", wsi->sock); */
#ifdef LWS_OPENSSL_SUPPORT
@ -660,7 +663,8 @@ notify_action:
else
n = LWS_CALLBACK_SERVER_WRITEABLE;
wsi->protocol->callback(context, wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0);
user_callback_handle_rxflow(wsi->protocol->callback, context,
wsi, (enum libwebsocket_callback_reasons) n, wsi->user_space, NULL, 0);
return 0;
}
@ -902,7 +906,7 @@ libwebsocket_service_fd(struct libwebsocket_context *context,
LWS_CLOSE_STATUS_NOSTATUS);
else
if (wsi->state == WSI_STATE_HTTP && wsi->protocol->callback)
if (wsi->protocol->callback(context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space,
if (user_callback_handle_rxflow(wsi->protocol->callback, context, wsi, LWS_CALLBACK_HTTP_FILE_COMPLETION, wsi->user_space,
wsi->filepath, wsi->filepos))
libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS);
break;
@ -1117,7 +1121,7 @@ bail_prox_listener:
/* broadcast it to this connection */
new_wsi->protocol->callback(context, new_wsi,
user_callback_handle_rxflow(new_wsi->protocol->callback, context, new_wsi,
LWS_CALLBACK_BROADCAST,
new_wsi->user_space,
buf + LWS_SEND_BUFFER_PRE_PADDING, len);
@ -1531,6 +1535,51 @@ libwebsocket_get_socket_fd(struct libwebsocket *wsi)
return wsi->sock;
}
int
_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
{
struct libwebsocket_context *context = wsi->protocol->owning_server;
int n;
if (!(wsi->rxflow_change_to & 2))
return 0;
wsi->rxflow_change_to &= ~2;
lwsl_info("rxflow: wsi %p change_to %d\n", wsi, wsi->rxflow_change_to);
/* if we're letting it come again, did we interrupt anything? */
if ((wsi->rxflow_change_to & 1) && wsi->rxflow_buffer) {
n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0);
if (n < 0) {
libwebsocket_close_and_free_session(context, wsi, LWS_CLOSE_STATUS_NOSTATUS);
return -1;
}
if (n)
/* oh he stuck again, do nothing */
return 0;
}
if (wsi->rxflow_change_to & 1)
context->fds[wsi->position_in_fds_table].events |= POLLIN;
else
context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
if (wsi->rxflow_change_to & 1)
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_SET_MODE_POLL_FD,
(void *)(long)wsi->sock, NULL, POLLIN);
else
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_CLEAR_MODE_POLL_FD,
(void *)(long)wsi->sock, NULL, POLLIN);
return 1;
}
/**
* libwebsocket_rx_flow_control() - Enable and disable socket servicing for
* receieved packets.
@ -1545,36 +1594,12 @@ libwebsocket_get_socket_fd(struct libwebsocket *wsi)
int
libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
{
struct libwebsocket_context *context = wsi->protocol->owning_server;
int n;
wsi->rxflow_change_to = 2 | !!enable;
for (n = 0; n < context->fds_count; n++)
if (context->fds[n].fd == wsi->sock) {
if (enable)
context->fds[n].events |= POLLIN;
else
context->fds[n].events &= ~POLLIN;
return 0;
}
if (enable)
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_SET_MODE_POLL_FD,
(void *)(long)wsi->sock, NULL, POLLIN);
else
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_CLEAR_MODE_POLL_FD,
(void *)(long)wsi->sock, NULL, POLLIN);
#if 0
lwsl_err("libwebsocket_rx_flow_control unable to find socket\n");
#endif
return 1;
return 0;
}
/**
* libwebsocket_canonical_hostname() - returns this host's hostname
*
@ -1630,6 +1655,23 @@ OpenSSL_verify_callback(int preverify_ok, X509_STORE_CTX *x509_ctx)
}
#endif
int user_callback_handle_rxflow(callback_function callback_function,
struct libwebsocket_context * context,
struct libwebsocket *wsi,
enum libwebsocket_callback_reasons reason, void *user,
void *in, size_t len)
{
int n;
n = callback_function(context, wsi, reason, user, in, len);
if (n < 0)
return n;
_libwebsocket_rx_flow_control(wsi);
return 0;
}
/**
* libwebsocket_create_context() - Create the websocket handler
@ -2366,7 +2408,8 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol,
if (wsi->protocol != protocol)
continue;
wsi->protocol->callback(context, wsi,
user_callback_handle_rxflow(wsi->protocol->callback,
context, wsi,
LWS_CALLBACK_BROADCAST,
wsi->user_space,
buf, len);

View file

@ -664,6 +664,7 @@ handle_first:
break;
case LWS_RXPS_EAT_UNTIL_76_FF:
if (c == 0xff) {
wsi->lws_rx_parse_state = LWS_RXPS_NEW;
goto issue;
@ -675,7 +676,8 @@ handle_first:
break;
issue:
if (wsi->protocol->callback)
wsi->protocol->callback(wsi->protocol->owning_server,
user_callback_handle_rxflow(wsi->protocol->callback,
wsi->protocol->owning_server,
wsi, LWS_CALLBACK_RECEIVE,
wsi->user_space,
&wsi->rx_user_buffer[LWS_SEND_BUFFER_PRE_PADDING],
@ -865,7 +867,8 @@ spill:
eff_buf.token[eff_buf.token_len] = '\0';
if (wsi->protocol->callback)
wsi->protocol->callback(wsi->protocol->owning_server,
user_callback_handle_rxflow(wsi->protocol->callback,
wsi->protocol->owning_server,
wsi, LWS_CALLBACK_RECEIVE,
wsi->user_space,
eff_buf.token,
@ -895,18 +898,58 @@ int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi,
unsigned char *buf, size_t len)
{
size_t n;
int m;
int clear_rxflow = !!wsi->rxflow_buffer;
struct libwebsocket_context *context = wsi->protocol->owning_server;
#ifdef DEBUG
lwsl_parser("received %d byte packet\n", (int)len);
lwsl_hexdump(buf, len);
#endif
if (buf && wsi->rxflow_buffer)
lwsl_err("!!!! libwebsocket_interpret_incoming_packet: was pending rxflow, data loss\n");
/* let the rx protocol state machine have as much as it needs */
n = 0;
while (n < len)
if (libwebsocket_rx_sm(wsi, buf[n++]) < 0)
if (!buf) {
lwsl_info("dumping stored rxflow buffer len %d pos=%d\n", wsi->rxflow_len, wsi->rxflow_pos);
buf = wsi->rxflow_buffer;
n = wsi->rxflow_pos;
len = wsi->rxflow_len;
/* let's pretend he's already allowing input */
context->fds[wsi->position_in_fds_table].events |= POLLIN;
}
while (n < len) {
if (!(context->fds[wsi->position_in_fds_table].events & POLLIN)) {
/* his RX is flowcontrolled */
if (!wsi->rxflow_buffer) { /* a new rxflow in effect, buffer it and warn caller */
lwsl_info("new rxflow input buffer len %d\n", len - n);
wsi->rxflow_buffer = (unsigned char *)malloc(len - n);
wsi->rxflow_len = len - n;
wsi->rxflow_pos = 0;
memcpy(wsi->rxflow_buffer, buf + n, len - n);
} else {
lwsl_info("re-using rxflow input buffer\n");
/* rxflow while we were spilling previous rxflow buffer */
wsi->rxflow_pos = n;
}
return 1;
}
m = libwebsocket_rx_sm(wsi, buf[n]);
if (m < 0)
return -1;
n++;
}
if (clear_rxflow) {
lwsl_info("flow: clearing it\n");
free(wsi->rxflow_buffer);
wsi->rxflow_buffer = NULL;
context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
}
return 0;
}

View file

@ -345,6 +345,10 @@ struct libwebsocket {
int sock;
int position_in_fds_table;
unsigned char *rxflow_buffer;
int rxflow_len;
int rxflow_pos;
int rxflow_change_to;
enum lws_rx_parse_state lws_rx_parse_state;
char extension_data_pending;
@ -480,6 +484,15 @@ extern int
lws_issue_raw_ext_access(struct libwebsocket *wsi,
unsigned char *buf, size_t len);
extern int
_libwebsocket_rx_flow_control(struct libwebsocket *wsi);
extern int
user_callback_handle_rxflow(callback_function, struct libwebsocket_context * context,
struct libwebsocket *wsi,
enum libwebsocket_callback_reasons reason, void *user,
void *in, size_t len);
#ifndef LWS_OPENSSL_SUPPORT
unsigned char *

View file

@ -334,6 +334,8 @@ struct a_message {
static struct a_message ringbuffer[MAX_MESSAGE_QUEUE];
static int ringbuffer_head;
static struct libwebsocket *wsi_choked[20];
static int num_wsi_choked;
static int
callback_lws_mirror(struct libwebsocket_context *context,
@ -365,7 +367,7 @@ callback_lws_mirror(struct libwebsocket_context *context,
LWS_WRITE_TEXT);
if (n < 0) {
fprintf(stderr, "ERROR %d writing to socket\n", n);
exit(1);
return 1;
}
if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
@ -373,9 +375,14 @@ callback_lws_mirror(struct libwebsocket_context *context,
else
pss->ringbuffer_tail++;
if (((ringbuffer_head - pss->ringbuffer_tail) %
MAX_MESSAGE_QUEUE) < (MAX_MESSAGE_QUEUE - 15))
libwebsocket_rx_flow_control(wsi, 1);
if (((ringbuffer_head - pss->ringbuffer_tail) &
(MAX_MESSAGE_QUEUE - 1)) < (MAX_MESSAGE_QUEUE - 15)) {
for (n = 0; n < num_wsi_choked; n++)
libwebsocket_rx_flow_control(wsi_choked[n], 1);
num_wsi_choked = 0;
}
// fprintf(stderr, "tx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1));
libwebsocket_callback_on_writable(context, wsi);
@ -390,6 +397,12 @@ callback_lws_mirror(struct libwebsocket_context *context,
case LWS_CALLBACK_RECEIVE:
if (((ringbuffer_head - pss->ringbuffer_tail) &
(MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) {
fprintf(stderr, "dropping!\n");
goto choke;
}
if (ringbuffer[ringbuffer_head].payload)
free(ringbuffer[ringbuffer_head].payload);
@ -404,13 +417,22 @@ callback_lws_mirror(struct libwebsocket_context *context,
else
ringbuffer_head++;
if (((ringbuffer_head - pss->ringbuffer_tail) %
MAX_MESSAGE_QUEUE) > (MAX_MESSAGE_QUEUE - 10))
libwebsocket_rx_flow_control(wsi, 0);
if (((ringbuffer_head - pss->ringbuffer_tail) &
(MAX_MESSAGE_QUEUE - 1)) < (MAX_MESSAGE_QUEUE - 10))
goto done;
choke:
if (num_wsi_choked < sizeof wsi_choked / sizeof wsi_choked[0]) {
libwebsocket_rx_flow_control(wsi, 0);
wsi_choked[num_wsi_choked++] = wsi;
}
// fprintf(stderr, "rx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1));
done:
libwebsocket_callback_on_writable_all_protocol(
libwebsockets_get_protocol(wsi));
break;
/*
* this just demonstrates how to use the protocol filter. If you won't
* study and reject connections based on header content, you don't need