1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

rxflow remove recursion and simplify

Signed-off-by: Andy Green <andy.green@linaro.org>
This commit is contained in:
Andy Green 2013-03-16 11:24:23 +08:00
parent 37029d9081
commit ca0a129065
7 changed files with 100 additions and 74 deletions

View file

@ -626,6 +626,8 @@ check_accept:
memset(&wsi->u, 0, sizeof(wsi->u));
wsi->u.ws.rxflow_change_to = LWS_RXFLOW_ALLOW;
/*
* create the frame buffer for this connection according to the
* size mentioned in the protocol definition. If 0 there, then

View file

@ -239,6 +239,7 @@ libwebsocket_read(struct libwebsocket_context *context,
/* union transition */
memset(&wsi->u, 0, sizeof(wsi->u));
wsi->u.ws.rxflow_change_to = LWS_RXFLOW_ALLOW;
/*
* create the frame buffer for this connection according to the

View file

@ -841,6 +841,7 @@ libwebsocket_service_fd(struct libwebsocket_context *context,
struct timeval tv;
int timed_out = 0;
int our_fd = 0;
char draining_flow = 0;
#ifndef LWS_NO_EXTENSIONS
int more = 1;
@ -984,14 +985,25 @@ libwebsocket_service_fd(struct libwebsocket_context *context,
/* the guy requested a callback when it was OK to write */
if ((pollfd->revents & POLLOUT) &&
wsi->state == WSI_STATE_ESTABLISHED)
if (lws_handle_POLLOUT_event(context, wsi,
pollfd) < 0) {
wsi->state == WSI_STATE_ESTABLISHED &&
lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) {
lwsl_info("libwebsocket_service_fd: closing\n");
goto close_and_handled;
}
if (wsi->u.ws.rxflow_buffer &&
(wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)) {
lwsl_info("draining rxflow\n");
/* well, drain it */
eff_buf.token = (char *)wsi->u.ws.rxflow_buffer +
wsi->u.ws.rxflow_pos;
eff_buf.token_len = wsi->u.ws.rxflow_len -
wsi->u.ws.rxflow_pos;
draining_flow = 1;
goto drain;
}
/* any incoming data ready? */
if (!(pollfd->revents & POLLIN))
@ -1041,6 +1053,7 @@ read_pending:
*/
eff_buf.token = (char *)context->service_buffer;
drain:
#ifndef LWS_NO_EXTENSIONS
more = 1;
while (more) {
@ -1079,6 +1092,14 @@ read_pending:
eff_buf.token_len = 0;
}
#endif
if (draining_flow && wsi->u.ws.rxflow_buffer &&
wsi->u.ws.rxflow_pos == wsi->u.ws.rxflow_len) {
lwsl_info("flow buffer: drained\n");
free(wsi->u.ws.rxflow_buffer);
wsi->u.ws.rxflow_buffer = NULL;
/* having drained the rxflow buffer, can rearm POLLIN */
_libwebsocket_rx_flow_control(wsi);
}
#ifdef LWS_OPENSSL_SUPPORT
if (wsi->ssl && SSL_pending(wsi->ssl))
@ -1099,10 +1120,9 @@ read_pending:
goto handled;
close_and_handled:
libwebsocket_close_and_free_session(
context, wsi,
LWS_CLOSE_STATUS_NOSTATUS);
n = 0;
libwebsocket_close_and_free_session(context, wsi,
LWS_CLOSE_STATUS_NOSTATUS);
n = 1;
handled:
pollfd->revents = 0;
@ -1249,6 +1269,7 @@ int
libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
{
int n;
int m;
/* stay dead once we are dead */
@ -1266,11 +1287,17 @@ libwebsocket_service(struct libwebsocket_context *context, int timeout_ms)
/* any socket with events to service? */
for (n = 0; n < context->fds_count; n++)
if (context->fds[n].revents)
if (libwebsocket_service_fd(context,
&context->fds[n]) < 0)
return -1;
for (n = 0; n < context->fds_count; n++) {
if (!context->fds[n].revents)
continue;
m = libwebsocket_service_fd(context, &context->fds[n]);
if (m < 0)
return -1;
/* if something closed, retry this slot */
if (m)
n--;
}
return 0;
}
@ -1479,7 +1506,7 @@ lws_latency(struct libwebsocket_context *context, struct libwebsocket *wsi,
#ifdef LWS_NO_SERVER
int
_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
_libwebsocket_rx_flow_control(struct libswebsocket *wsi)
{
return 0;
}
@ -1488,34 +1515,33 @@ int
_libwebsocket_rx_flow_control(struct libwebsocket *wsi)
{
struct libwebsocket_context *context = wsi->protocol->owning_server;
int n;
if (!(wsi->u.ws.rxflow_change_to & 2))
/* there is no pending change */
if (!(wsi->u.ws.rxflow_change_to & LWS_RXFLOW_PENDING_CHANGE))
return 0;
wsi->u.ws.rxflow_change_to &= ~2;
lwsl_info("rxflow: wsi %p change_to %d\n",
wsi, wsi->u.ws.rxflow_change_to);
/* if we're letting it come again, did we interrupt anything? */
if ((wsi->u.ws.rxflow_change_to & 1) && wsi->u.ws.rxflow_buffer) {
n = libwebsocket_interpret_incoming_packet(wsi, NULL, 0);
if (n < 0) {
lwsl_info("libwebsocket_rx_flow_control: close req\n");
return -1;
}
if (n)
/* oh he stuck again, do nothing */
return 0;
/* stuff is still buffered, not ready to really accept new input */
if (wsi->u.ws.rxflow_buffer) {
/* get ourselves called back to deal with stashed buffer */
libwebsocket_callback_on_writable(context, wsi);
return 0;
}
if (wsi->u.ws.rxflow_change_to & 1)
/* pending is cleared, we can change rxflow state */
wsi->u.ws.rxflow_change_to &= ~LWS_RXFLOW_PENDING_CHANGE;
lwsl_info("rxflow: wsi %p change_to %d\n", wsi,
wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW);
/* adjust the pollfd for this wsi */
if (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)
context->fds[wsi->position_in_fds_table].events |= POLLIN;
else
context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
if (wsi->u.ws.rxflow_change_to & 1)
if (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_SET_MODE_POLL_FD,
@ -1544,7 +1570,11 @@ _libwebsocket_rx_flow_control(struct libwebsocket *wsi)
int
libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable)
{
wsi->u.ws.rxflow_change_to = 2 | !!enable;
if (enable == (wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW))
return 0;
lwsl_info("libwebsocket_rx_flow_control(0x%p, %d)\n", wsi, enable);
wsi->u.ws.rxflow_change_to = LWS_RXFLOW_PENDING_CHANGE | !!enable;
return 0;
}

View file

@ -1047,36 +1047,22 @@ illegal_ctl_length:
int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi,
unsigned char *buf, size_t len)
{
size_t n;
size_t n = 0;
int m;
int clear_rxflow = !!wsi->u.ws.rxflow_buffer;
struct libwebsocket_context *context = wsi->protocol->owning_server;
#if 0
lwsl_parser("received %d byte packet\n", (int)len);
lwsl_hexdump(buf, len);
#endif
if (buf && wsi->u.ws.rxflow_buffer)
lwsl_err("!!!! pending rxflow data loss\n");
/* let the rx protocol state machine have as much as it needs */
n = 0;
if (!buf) {
lwsl_info("dumping stored rxflow buffer len %d pos=%d\n",
wsi->u.ws.rxflow_len, wsi->u.ws.rxflow_pos);
buf = wsi->u.ws.rxflow_buffer;
n = wsi->u.ws.rxflow_pos;
len = wsi->u.ws.rxflow_len;
/* let's pretend he's already allowing input */
context->fds[wsi->position_in_fds_table].events |= POLLIN;
}
while (n < len) {
if (!(context->fds[wsi->position_in_fds_table].events &
POLLIN)) {
/* his RX is flowcontrolled */
/*
* we were accepting input but now we stopped doing so
*/
if (!(wsi->u.ws.rxflow_change_to & LWS_RXFLOW_ALLOW)) {
/* his RX is flowcontrolled, don't send remaining now */
if (!wsi->u.ws.rxflow_buffer) {
/* a new rxflow, buffer it and warn caller */
lwsl_info("new rxflow input buffer len %d\n",
@ -1087,24 +1073,21 @@ int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi,
wsi->u.ws.rxflow_pos = 0;
memcpy(wsi->u.ws.rxflow_buffer,
buf + n, len - n);
} else {
lwsl_info("re-using rxflow input buffer\n");
} else
/* rxflow while we were spilling prev rxflow */
wsi->u.ws.rxflow_pos = n;
}
lwsl_info("stalling in existing rxflow buffer");
return 1;
}
m = libwebsocket_rx_sm(wsi, buf[n]);
/* account for what we're using in rxflow buffer */
if (wsi->u.ws.rxflow_buffer)
wsi->u.ws.rxflow_pos++;
/* process the byte */
m = libwebsocket_rx_sm(wsi, buf[n++]);
if (m < 0)
return -1;
n++;
}
if (clear_rxflow) {
lwsl_info("flow: clearing it\n");
free(wsi->u.ws.rxflow_buffer);
wsi->u.ws.rxflow_buffer = NULL;
context->fds[wsi->position_in_fds_table].events &= ~POLLIN;
}
return 0;

View file

@ -227,6 +227,11 @@ enum connection_mode {
LWS_CONNMODE_SERVER_LISTENER,
};
enum {
LWS_RXFLOW_ALLOW = (1 << 0),
LWS_RXFLOW_PENDING_CHANGE = (1 << 1),
};
struct libwebsocket_protocols;
struct libwebsocket;

View file

@ -122,7 +122,7 @@ callback_lws_mirror(struct libwebsocket_context *context,
switch (reason) {
case LWS_CALLBACK_CLOSED:
fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED mirror_lifetime=%d\n", mirror_lifetime);
wsi_mirror = NULL;
break;

View file

@ -431,7 +431,7 @@ callback_dumb_increment(struct libwebsocket_context *context,
/* lws-mirror_protocol */
#define MAX_MESSAGE_QUEUE 128
#define MAX_MESSAGE_QUEUE 32
struct per_session_data__lws_mirror {
struct libwebsocket *wsi;
@ -461,8 +461,7 @@ callback_lws_mirror(struct libwebsocket_context *context,
switch (reason) {
case LWS_CALLBACK_ESTABLISHED:
lwsl_info("callback_lws_mirror: "
"LWS_CALLBACK_ESTABLISHED\n");
lwsl_info("callback_lws_mirror: LWS_CALLBACK_ESTABLISHED\n");
pss->ringbuffer_tail = ringbuffer_head;
pss->wsi = wsi;
break;
@ -488,9 +487,9 @@ callback_lws_mirror(struct libwebsocket_context *context,
lwsl_err("ERROR %d writing to mirror socket\n", n);
return -1;
}
if (n < ringbuffer[pss->ringbuffer_tail].len) {
lwsl_err("mirror partial write %d vs %d\n", n, ringbuffer[pss->ringbuffer_tail].len);
}
if (n < ringbuffer[pss->ringbuffer_tail].len)
lwsl_err("mirror partial write %d vs %d\n",
n, ringbuffer[pss->ringbuffer_tail].len);
if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
pss->ringbuffer_tail = 0;
@ -507,8 +506,13 @@ callback_lws_mirror(struct libwebsocket_context *context,
if (lws_send_pipe_choked(wsi)) {
libwebsocket_callback_on_writable(context, wsi);
return 0;
break;
}
/*
* for tests with chrome on same machine as client and
* server, this is needed to stop chrome choking
*/
usleep(1);
}
break;
@ -540,6 +544,7 @@ callback_lws_mirror(struct libwebsocket_context *context,
choke:
if (num_wsi_choked < sizeof wsi_choked / sizeof wsi_choked[0]) {
lwsl_debug("LWS_CALLBACK_RECEIVE: throttling %p\n", wsi);
libwebsocket_rx_flow_control(wsi, 0);
wsi_choked[num_wsi_choked++] = wsi;
}