pollout: handle request for pollout during pollout service

This commit is contained in:
Andy Green 2017-04-05 08:30:55 +08:00
parent 4ae029c3a1
commit 89212d6668
3 changed files with 93 additions and 25 deletions

View file

@ -33,6 +33,26 @@ _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
if (!wsi || wsi->position_in_fds_table < 0) if (!wsi || wsi->position_in_fds_table < 0)
return 0; return 0;
if (wsi->handling_pollout && !_and && _or == LWS_POLLOUT) {
/*
* Happening alongside service thread handling POLLOUT.
* The danger is when he is finished, he will disable POLLOUT,
* countermanding what we changed here.
*
* Instead of changing the fds, inform the service thread
* what happened, and ask it to leave POLLOUT active on exit
*/
wsi->leave_pollout_active = 1;
/*
* by definition service thread is not in poll wait, so no need
* to cancel service
*/
lwsl_debug("%s: using leave_pollout_active\n", __func__);
return 0;
}
context = wsi->context; context = wsi->context;
pt = &context->pt[(int)wsi->tsi]; pt = &context->pt[(int)wsi->tsi];
assert(wsi->position_in_fds_table >= 0 && assert(wsi->position_in_fds_table >= 0 &&

View file

@ -1502,7 +1502,7 @@ struct lws {
unsigned int sending_chunked:1; unsigned int sending_chunked:1;
unsigned int already_did_cce:1; unsigned int already_did_cce:1;
unsigned int told_user_closed:1; unsigned int told_user_closed:1;
unsigned int :1;
#if defined(LWS_WITH_ESP8266) #if defined(LWS_WITH_ESP8266)
unsigned int pending_send_completion:3; unsigned int pending_send_completion:3;
unsigned int close_is_pending_send_completion:1; unsigned int close_is_pending_send_completion:1;
@ -1532,6 +1532,10 @@ struct lws {
unsigned int redirect_to_https:1; unsigned int redirect_to_https:1;
#endif #endif
/* volatile to make sure code is aware other thread can change */
volatile unsigned int handling_pollout:1;
volatile unsigned int leave_pollout_active:1;
#ifndef LWS_NO_CLIENT #ifndef LWS_NO_CLIENT
unsigned short c_port; unsigned short c_port;
#endif #endif

View file

@ -64,6 +64,14 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
//lwsl_err("%s: %p\n", __func__, wsi); //lwsl_err("%s: %p\n", __func__, wsi);
wsi->leave_pollout_active = 0;
wsi->handling_pollout = 1;
/*
* if another thread wants POLLOUT on us, from here on while
* handling_pollout is set, he will only set leave_pollout_active.
* If we are going to disable POLLOUT, we will check that first.
*/
/* /*
* user callback is lowest priority to get these notifications * user callback is lowest priority to get these notifications
* actually, since other pending things cannot be disordered * actually, since other pending things cannot be disordered
@ -77,13 +85,13 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
if (lws_issue_raw(wsi, wsi->trunc_alloc + wsi->trunc_offset, if (lws_issue_raw(wsi, wsi->trunc_alloc + wsi->trunc_offset,
wsi->trunc_len) < 0) { wsi->trunc_len) < 0) {
lwsl_info("%s signalling to close\n", __func__); lwsl_info("%s signalling to close\n", __func__);
return -1; goto bail_die;
} }
/* leave POLLOUT active either way */ /* leave POLLOUT active either way */
return 0; goto bail_ok;
} else } else
if (wsi->state == LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE) if (wsi->state == LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE)
return -1; /* retry closing now */ goto bail_die; /* retry closing now */
if (wsi->mode == LWSCM_WSCL_ISSUE_HTTP_BODY) if (wsi->mode == LWSCM_WSCL_ISSUE_HTTP_BODY)
goto user_service; goto user_service;
@ -105,7 +113,7 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
wsi->pps = LWS_PPS_NONE; wsi->pps = LWS_PPS_NONE;
lws_rx_flow_control(wsi, 1); lws_rx_flow_control(wsi, 1);
return 0; /* leave POLLOUT active */ goto bail_ok; /* leave POLLOUT active */
} }
#endif #endif
@ -134,16 +142,16 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
n = lws_write(wsi, &wsi->u.ws.ping_payload_buf[LWS_PRE], n = lws_write(wsi, &wsi->u.ws.ping_payload_buf[LWS_PRE],
wsi->u.ws.ping_payload_len, write_type); wsi->u.ws.ping_payload_len, write_type);
if (n < 0) if (n < 0)
return -1; goto bail_die;
/* well he is sent, mark him done */ /* well he is sent, mark him done */
wsi->u.ws.ping_pending_flag = 0; wsi->u.ws.ping_pending_flag = 0;
if (wsi->u.ws.payload_is_close) if (wsi->u.ws.payload_is_close)
/* oh... a close frame was it... then we are done */ /* oh... a close frame was it... then we are done */
return -1; goto bail_die;
/* otherwise for PING, leave POLLOUT active either way */ /* otherwise for PING, leave POLLOUT active either way */
return 0; goto bail_ok;
} }
if (wsi->state == LWSS_ESTABLISHED && if (wsi->state == LWSS_ESTABLISHED &&
@ -155,7 +163,7 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
n = lws_write(wsi, &wsi->u.ws.ping_payload_buf[LWS_PRE], n = lws_write(wsi, &wsi->u.ws.ping_payload_buf[LWS_PRE],
0, LWS_WRITE_PING); 0, LWS_WRITE_PING);
if (n < 0) if (n < 0)
return -1; goto bail_die;
/* /*
* we apparently were able to send the PING in a reasonable time * we apparently were able to send the PING in a reasonable time
@ -166,7 +174,7 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
lws_set_timeout(wsi, PENDING_TIMEOUT_WS_PONG_CHECK_GET_PONG, lws_set_timeout(wsi, PENDING_TIMEOUT_WS_PONG_CHECK_GET_PONG,
wsi->context->timeout_secs); wsi->context->timeout_secs);
return 0; goto bail_ok;
} }
/* Priority 4: if we are closing, not allowed to send more data frags /* Priority 4: if we are closing, not allowed to send more data frags
@ -185,16 +193,16 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
if (wsi->state == LWSS_ESTABLISHED && wsi->u.ws.tx_draining_ext) { if (wsi->state == LWSS_ESTABLISHED && wsi->u.ws.tx_draining_ext) {
lwsl_ext("SERVICING TX EXT DRAINING\n"); lwsl_ext("SERVICING TX EXT DRAINING\n");
if (lws_write(wsi, NULL, 0, LWS_WRITE_CONTINUATION) < 0) if (lws_write(wsi, NULL, 0, LWS_WRITE_CONTINUATION) < 0)
return -1; goto bail_die;
/* leave POLLOUT active */ /* leave POLLOUT active */
return 0; goto bail_ok;
} }
/* Priority 6: user can get the callback /* Priority 6: user can get the callback
*/ */
m = lws_ext_cb_active(wsi, LWS_EXT_CB_IS_WRITEABLE, NULL, 0); m = lws_ext_cb_active(wsi, LWS_EXT_CB_IS_WRITEABLE, NULL, 0);
if (m) if (m)
return -1; goto bail_die;
#ifndef LWS_NO_EXTENSIONS #ifndef LWS_NO_EXTENSIONS
if (!wsi->extension_data_pending) if (!wsi->extension_data_pending)
goto user_service; goto user_service;
@ -225,7 +233,7 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
&eff_buf, 0); &eff_buf, 0);
if (m < 0) { if (m < 0) {
lwsl_err("ext reports fatal error\n"); lwsl_err("ext reports fatal error\n");
return -1; goto bail_die;
} }
if (m) if (m)
/* /*
@ -241,7 +249,7 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
eff_buf.token_len); eff_buf.token_len);
if (n < 0) { if (n < 0) {
lwsl_info("closing from POLLOUT spill\n"); lwsl_info("closing from POLLOUT spill\n");
return -1; goto bail_die;
} }
/* /*
* Keep amount spilled small to minimize chance of this * Keep amount spilled small to minimize chance of this
@ -249,7 +257,7 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
if (n != eff_buf.token_len) { if (n != eff_buf.token_len) {
lwsl_err("Unable to spill ext %d vs %d\n", lwsl_err("Unable to spill ext %d vs %d\n",
eff_buf.token_len, n); eff_buf.token_len, n);
return -1; goto bail_die;
} }
} else } else
continue; continue;
@ -277,7 +285,7 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
* when we come back here and there's nothing more to spill. * when we come back here and there's nothing more to spill.
*/ */
return 0; goto bail_ok;
} }
#ifndef LWS_NO_EXTENSIONS #ifndef LWS_NO_EXTENSIONS
wsi->extension_data_pending = 0; wsi->extension_data_pending = 0;
@ -285,15 +293,31 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
user_service: user_service:
/* one shot */ /* one shot */
if (pollfd) if (pollfd) {
int eff = wsi->leave_pollout_active;
if (!eff)
if (lws_change_pollfd(wsi, LWS_POLLOUT, 0)) { if (lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
lwsl_info("failed at set pollfd\n"); lwsl_info("failed at set pollfd\n");
return 1; goto bail_die;
}
wsi->handling_pollout = 0;
/* cannot get leave_pollout_active set after the above */
if (!eff && wsi->leave_pollout_active)
/* got set inbetween sampling eff and clearing
* handling_pollout, force POLLOUT on */
lws_calllback_as_writeable(wsi);
wsi->leave_pollout_active = 0;
} }
if (wsi->mode != LWSCM_WSCL_ISSUE_HTTP_BODY && if (wsi->mode != LWSCM_WSCL_ISSUE_HTTP_BODY &&
!wsi->hdr_parsing_completed) !wsi->hdr_parsing_completed)
return 0; goto bail_ok;
#ifdef LWS_WITH_CGI #ifdef LWS_WITH_CGI
user_service_go_again: user_service_go_again:
@ -321,7 +345,7 @@ user_service_go_again:
wsi->u.http2.requested_POLLOUT = 0; wsi->u.http2.requested_POLLOUT = 0;
if (!wsi->u.http2.initialized) { if (!wsi->u.http2.initialized) {
lwsl_info("pollout on uninitialized http2 conn\n"); lwsl_info("pollout on uninitialized http2 conn\n");
return 0; goto bail_ok;
} }
lwsl_info("%s: doing children\n", __func__); lwsl_info("%s: doing children\n", __func__);
@ -344,10 +368,30 @@ user_service_go_again:
lwsl_info("%s: completed\n", __func__); lwsl_info("%s: completed\n", __func__);
return 0; goto bail_ok;
notify: notify:
#endif #endif
wsi->handling_pollout = 0;
wsi->leave_pollout_active = 0;
return lws_calllback_as_writeable(wsi); return lws_calllback_as_writeable(wsi);
/*
* since these don't disable the POLLOUT, they are always doing the
* right thing for leave_pollout_active whether it was set or not.
*/
bail_ok:
wsi->handling_pollout = 0;
wsi->leave_pollout_active = 0;
return 0;
bail_die:
wsi->handling_pollout = 0;
wsi->leave_pollout_active = 0;
return -1;
} }
int int