1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

callback each active extension on packet tx pre send

Signed-off-by: Andy Green <andy@warmcat.com>
This commit is contained in:
Andy Green 2011-03-06 13:14:42 +00:00
parent 98a717c7ed
commit 3b84c006c9
4 changed files with 215 additions and 34 deletions

View file

@ -413,6 +413,111 @@ int lws_send_pipe_choked(struct libwebsocket *wsi)
return 0;
}
static int
lws_handle_POLLOUT_event(struct libwebsocket_context *context,
struct libwebsocket *wsi, struct pollfd *pollfd)
{
struct lws_tokens eff_buf;
int n;
int ret;
int m;
if (!wsi->extension_data_pending)
goto user_service;
/*
* check in on the active extensions, see if they
* had pending stuff to spill... they need to get the
* first look-in otherwise sequence will be disordered
*
* NULL, zero-length eff_buf means just spill pending
*/
ret = 1;
while (ret == 1) {
/* default to nobody has more to spill */
ret = 0;
eff_buf.token = NULL;
eff_buf.token_len = 0;
/* give every extension a chance to spill */
for (n = 0; n < wsi->count_active_extensions; n++) {
m = wsi->active_extensions[n]->callback(
wsi->protocol->owning_server, wsi,
LWS_EXT_CALLBACK_PACKET_TX_PRESEND,
wsi->active_extensions_user[n], &eff_buf, 0);
if (m < 0) {
fprintf(stderr, "extension reports fatal error\n");
return -1;
}
if (m)
/*
* at least one extension told us he has more
* to spill, so we will go around again after
*/
ret = 1;
}
/* assuming they gave us something to send, send it */
if (eff_buf.token_len) {
if (lws_issue_raw(wsi, (unsigned char *)eff_buf.token,
eff_buf.token_len))
return -1;
} else
continue;
/* no extension has more to spill */
if (!ret)
continue;
/*
* There's more to spill from an extension, but we just sent
* something... did that leave the pipe choked?
*/
if (!lws_send_pipe_choked(wsi))
/* no we could add more */
continue;
fprintf(stderr, "choked in POLLOUT service\n");
/*
* Yes, he's choked. Leave the POLLOUT masked on so we will
* come back here when he is unchoked. Don't call the user
* callback to enforce ordering of spilling, he'll get called
* when we come back here and there's nothing more to spill.
*/
return 0;
}
wsi->extension_data_pending = 0;
user_service:
/* one shot */
pollfd->events &= ~POLLOUT;
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_CLEAR_MODE_POLL_FD,
(void *)(long)wsi->sock, NULL, POLLOUT);
wsi->protocol->callback(context, wsi,
LWS_CALLBACK_CLIENT_WRITEABLE,
wsi->user_space,
NULL, 0);
return 0;
}
/**
* libwebsocket_service_fd() - Service polled socket with something waiting
* @context: Websocket context
@ -428,7 +533,7 @@ int
libwebsocket_service_fd(struct libwebsocket_context *context,
struct pollfd *pollfd)
{
unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + MAX_BROADCAST_PAYLOAD +
unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 1 + MAX_BROADCAST_PAYLOAD +
LWS_SEND_BUFFER_POST_PADDING];
struct libwebsocket *wsi;
struct libwebsocket *new_wsi;
@ -712,24 +817,17 @@ libwebsocket_service_fd(struct libwebsocket_context *context,
return 1;
}
/* the guy requested a callback when it was OK to write */
/*
* either extension code with stuff to spill, or the user code,
* requested a callback when it was OK to write
*/
if (pollfd->revents & POLLOUT) {
/* one shot */
pollfd->events &= ~POLLOUT;
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_CLEAR_MODE_POLL_FD,
(void *)(long)wsi->sock, NULL, POLLOUT);
wsi->protocol->callback(context, wsi,
LWS_CALLBACK_CLIENT_WRITEABLE,
wsi->user_space,
NULL, 0);
}
if (pollfd->revents & POLLOUT)
if (lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) {
libwebsocket_close_and_free_session(context, wsi,
LWS_CLOSE_STATUS_NORMAL);
return 1;
}
/* any incoming data ready? */
@ -1397,20 +1495,13 @@ bail2:
/* the guy requested a callback when it was OK to write */
if (pollfd->revents & POLLOUT) {
if (pollfd->revents & POLLOUT)
if (lws_handle_POLLOUT_event(context, wsi, pollfd) < 0) {
libwebsocket_close_and_free_session(context, wsi,
LWS_CLOSE_STATUS_NORMAL);
return 1;
}
pollfd->events &= ~POLLOUT;
/* external POLL support via protocol 0 */
context->protocols[0].callback(context, wsi,
LWS_CALLBACK_CLEAR_MODE_POLL_FD,
(void *)(long)wsi->sock, NULL, POLLOUT);
wsi->protocol->callback(context, wsi,
LWS_CALLBACK_CLIENT_WRITEABLE,
wsi->user_space,
NULL, 0);
}
/* any incoming data ready? */

View file

@ -75,6 +75,7 @@ enum libwebsocket_extension_callback_reasons {
LWS_EXT_CALLBACK_CONSTRUCT,
LWS_EXT_CALLBACK_DESTROY,
LWS_EXT_CALLBACK_PACKET_RX_PREPARSE,
LWS_EXT_CALLBACK_PACKET_TX_PRESEND,
};
enum libwebsocket_write_protocol {

View file

@ -1109,9 +1109,12 @@ int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
size_t len, enum libwebsocket_write_protocol protocol)
{
int n;
int m;
int pre = 0;
int post = 0;
unsigned int shift = 7;
struct lws_tokens eff_buf;
int ret;
if (len == 0 && protocol != LWS_WRITE_CLOSE) {
fprintf(stderr, "zero length libwebsocket_write attempt\n");
@ -1326,10 +1329,95 @@ int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf,
send_raw:
if (lws_issue_raw(wsi, buf - pre, len + pre + post))
return -1;
if (protocol == LWS_WRITE_HTTP) {
if (lws_issue_raw(wsi, (unsigned char *)buf - pre,
len + pre + post))
return -1;
debug("written %d bytes to client\n", (int)len);
return 0;
}
/*
* give any active extensions a chance to munge the buffer
* before send. We pass in a pointer to an lws_tokens struct
* prepared with the default buffer and content length that's in
* there. Rather than rewrite the default buffer, extensions
* that expect to grow the buffer can adapt .token to
* point to their own per-connection buffer in the extension
* user allocation. By default with no extensions or no
* extension callback handling, just the normal input buffer is
* used then so it is efficient.
*
* callback returns 1 in case it wants to spill more buffers
*/
eff_buf.token = (char *)buf - pre;
eff_buf.token_len = len + pre + post;
/*
* while we have original buf to spill ourselves, or extensions report
* more in their pipeline
*/
ret = 1;
while (ret == 1) {
/* default to nobody has more to spill */
ret = 0;
/* show every extension the new incoming data */
for (n = 0; n < wsi->count_active_extensions; n++) {
m = wsi->active_extensions[n]->callback(
wsi->protocol->owning_server, wsi,
LWS_EXT_CALLBACK_PACKET_TX_PRESEND,
wsi->active_extensions_user[n], &eff_buf, 0);
if (m < 0) {
fprintf(stderr, "Extension reports fatal error\n");
return -1;
}
if (m)
/*
* at least one extension told us he has more
* to spill, so we will go around again after
*/
ret = 1;
}
/* assuming they left us something to send, send it */
if (eff_buf.token_len)
if (lws_issue_raw(wsi, (unsigned char *)eff_buf.token,
eff_buf.token_len))
return -1;
/* we used up what we had */
eff_buf.token = NULL;
eff_buf.token_len = 0;
/*
* Did that leave the pipe choked?
*/
if (!lws_send_pipe_choked(wsi))
/* no we could add more */
continue;
fprintf(stderr, "choked\n");
/*
* Yes, he's choked. Don't spill the rest now get a callback
* when he is ready to send and take care of it there
*/
libwebsocket_callback_on_writable(
wsi->protocol->owning_server, wsi);
wsi->extension_data_pending = 1;
ret = 0;
}
debug("written %d bytes to client\n", eff_buf.token_len);
return 0;
}

View file

@ -248,6 +248,7 @@ struct libwebsocket {
int sock;
enum lws_rx_parse_state lws_rx_parse_state;
char extension_data_pending;
/* 04 protocol specific */