multithread stability

Signed-off-by: Andy Green <andy.green@linaro.org>
This commit is contained in:
Andy Green 2016-01-26 20:56:56 +08:00
parent dcbe30a28e
commit 8c1f6026a7
21 changed files with 761 additions and 502 deletions

View file

@ -85,10 +85,16 @@ set(LWS_WITHOUT_CLIENT ON)
set(LWS_WITHOUT_TESTAPPS ON)
set(LWS_WITHOUT_EXTENSIONS ON)
set(LWS_MBED3 ON)
# this implies no pthreads in the lib
set(LWS_MAX_SMP 1)
endif()
if (WIN32)
# this implies no pthreads in the lib
set(LWS_MAX_SMP 1)
endif()
# Allow the user to override installation directories.
set(LWS_INSTALL_LIB_DIR lib CACHE PATH "Installation directory for libraries")

View file

@ -80,7 +80,7 @@ lws_create_context(struct lws_context_creation_info *info)
int pid_daemon = get_daemonize_pid();
#endif
char *p;
int n;
int n, m;
lwsl_notice("Initial logging level %d\n", log_level);
@ -96,7 +96,7 @@ lws_create_context(struct lws_context_creation_info *info)
#endif
lws_feature_status_libev(info);
#endif
lwsl_info(" LWS_MAX_HEADER_LEN : %u\n", LWS_MAX_HEADER_LEN);
lwsl_info(" LWS_DEF_HEADER_LEN : %u\n", LWS_DEF_HEADER_LEN);
lwsl_info(" LWS_MAX_PROTOCOLS : %u\n", LWS_MAX_PROTOCOLS);
lwsl_info(" LWS_MAX_SMP : %u\n", LWS_MAX_SMP);
lwsl_info(" SPEC_LATEST_SUPPORTED : %u\n", SPEC_LATEST_SUPPORTED);
@ -129,7 +129,6 @@ lws_create_context(struct lws_context_creation_info *info)
if (context->count_threads > LWS_MAX_SMP)
context->count_threads = LWS_MAX_SMP;
context->lserv_seen = 0;
context->protocols = info->protocols;
context->token_limits = info->token_limits;
context->listen_port = info->port;
@ -141,8 +140,18 @@ lws_create_context(struct lws_context_creation_info *info)
context->ka_interval = info->ka_interval;
context->ka_probes = info->ka_probes;
/* we zalloc only the used ones, so the memory is not wasted
* allocating for unused threads
if (info->max_http_header_data)
context->max_http_header_data = info->max_http_header_data;
else
context->max_http_header_data = LWS_DEF_HEADER_LEN;
if (info->max_http_header_pool)
context->max_http_header_pool = info->max_http_header_pool;
else
context->max_http_header_pool = LWS_DEF_HEADER_POOL;
/*
* Allocate the per-thread storage for scratchpad buffers,
* and header data pool
*/
for (n = 0; n < context->count_threads; n++) {
context->pt[n].serv_buf = lws_zalloc(LWS_MAX_SOCKET_IO_BUF);
@ -150,6 +159,22 @@ lws_create_context(struct lws_context_creation_info *info)
lwsl_err("OOM\n");
return NULL;
}
context->pt[n].http_header_data = lws_malloc(context->max_http_header_data *
context->max_http_header_pool);
if (!context->pt[n].http_header_data)
goto bail;
context->pt[n].ah_pool = lws_zalloc(sizeof(struct allocated_headers) *
context->max_http_header_pool);
for (m = 0; m < context->max_http_header_pool; m++)
context->pt[n].ah_pool[m].data =
(char *)context->pt[n].http_header_data +
(m * context->max_http_header_data);
if (!context->pt[n].ah_pool)
goto bail;
lws_pt_mutex_init(&context->pt[n]);
}
if (info->fd_limit_per_thread)
@ -180,44 +205,18 @@ lws_create_context(struct lws_context_creation_info *info)
context->lws_ev_sigint_cb = &lws_sigint_cb;
#endif /* LWS_USE_LIBEV */
lwsl_info(" mem: context: %5u bytes (%d + (%d x %d))\n",
lwsl_info(" mem: context: %5u bytes (%d ctx + (%d thr x %d))\n",
sizeof(struct lws_context) +
(context->count_threads * LWS_MAX_SOCKET_IO_BUF),
sizeof(struct lws_context),
context->count_threads,
LWS_MAX_SOCKET_IO_BUF);
/*
* allocate and initialize the pool of
* allocated_header structs + data
*/
if (info->max_http_header_data)
context->max_http_header_data = info->max_http_header_data;
else
context->max_http_header_data = LWS_MAX_HEADER_LEN;
if (info->max_http_header_pool)
context->max_http_header_pool = info->max_http_header_pool;
else
context->max_http_header_pool = LWS_MAX_HEADER_POOL;
context->http_header_data = lws_malloc(context->max_http_header_data *
context->max_http_header_pool);
if (!context->http_header_data)
goto bail;
context->ah_pool = lws_zalloc(sizeof(struct allocated_headers) *
context->max_http_header_pool);
if (!context->ah_pool)
goto bail;
for (n = 0; n < context->max_http_header_pool; n++)
context->ah_pool[n].data = (char *)context->http_header_data +
(n * context->max_http_header_data);
/* this is per context */
lwsl_info(" mem: http hdr rsvd: %5u bytes ((%u + %u) x %u)\n",
lwsl_info(" mem: http hdr rsvd: %5u bytes (%u thr x (%u + %u) x %u))\n",
(context->max_http_header_data +
sizeof(struct allocated_headers)) *
context->max_http_header_pool,
context->max_http_header_pool * context->count_threads,
context->count_threads,
context->max_http_header_data,
sizeof(struct allocated_headers),
context->max_http_header_pool);
@ -229,10 +228,13 @@ lws_create_context(struct lws_context_creation_info *info)
goto bail;
}
lwsl_info(" mem: pollfd map: %5u\n", n);
#if LWS_MAX_SMP > 1
/* each thread serves his own chunk of fds */
for (n = 1; n < (int)info->count_threads; n++)
context->pt[n].fds = context->pt[0].fds +
(n * context->fd_limit_per_thread);
context->pt[n].fds = context->pt[n - 1].fds +
context->fd_limit_per_thread;
#endif
if (lws_plat_init(context, info))
goto bail;
@ -345,7 +347,7 @@ lws_context_destroy(struct lws_context *context)
#endif
while (m--)
for (n = 0; n < context->pt[m].fds_count; n++) {
for (n = 0; (unsigned int)n < context->pt[m].fds_count; n++) {
struct lws *wsi = wsi_from_fd(context, context->pt[m].fds[n].fd);
if (!wsi)
continue;
@ -384,17 +386,18 @@ lws_context_destroy(struct lws_context *context)
ev_signal_stop(context->io_loop, &context->w_sigint.watcher);
#endif /* LWS_USE_LIBEV */
for (n = 0; n < context->count_threads; n++)
for (n = 0; n < context->count_threads; n++) {
lws_free_set_NULL(context->pt[n].serv_buf);
if (context->pt[n].ah_pool)
lws_free(context->pt[n].ah_pool);
if (context->pt[n].http_header_data)
lws_free(context->pt[n].http_header_data);
}
lws_plat_context_early_destroy(context);
lws_ssl_context_destroy(context);
if (context->pt[0].fds)
lws_free(context->pt[0].fds);
if (context->ah_pool)
lws_free(context->ah_pool);
if (context->http_header_data)
lws_free(context->http_header_data);
lws_plat_context_late_destroy(context);

View file

@ -100,6 +100,7 @@ http_new:
wsi->u.hdr.lextable_pos = 0;
/* fallthru */
case LWSS_HTTP_HEADERS:
assert(wsi->u.hdr.ah);
lwsl_parser("issuing %d bytes to parser\n", (int)len);
if (lws_handshake_client(wsi, &buf, len))
@ -182,6 +183,7 @@ postbody_completion:
case LWSS_ESTABLISHED:
case LWSS_AWAITING_CLOSE_ACK:
case LWSS_SHUTDOWN:
if (lws_handshake_client(wsi, &buf, len))
goto bail;
switch (wsi->mode) {

View file

@ -81,6 +81,7 @@ lws_initloop(struct lws_context *context, struct ev_loop *loop)
const char * backend_name;
int status = 0;
int backend;
int m = 0; /* !!! TODO add pt support */
if (!loop)
loop = ev_default_loop(0);
@ -91,7 +92,7 @@ lws_initloop(struct lws_context *context, struct ev_loop *loop)
* Initialize the accept w_accept with the listening socket
* and register a callback for read operations
*/
ev_io_init(w_accept, lws_accept_cb, context->lserv_fd, EV_READ);
ev_io_init(w_accept, lws_accept_cb, context->pt[m].lserv_fd, EV_READ);
ev_io_start(context->io_loop,w_accept);
/* Register the signal watcher unless the user says not to */

View file

@ -1,7 +1,7 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010-2014 Andy Green <andy@warmcat.com>
* Copyright (C) 2010-2016 Andy Green <andy@warmcat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@ -63,33 +63,72 @@ lws_free_wsi(struct lws *wsi)
*/
if (wsi->mode != LWSCM_WS_CLIENT &&
wsi->mode != LWSCM_WS_SERVING)
if (wsi->u.hdr.ah)
lws_free_header_table(wsi);
lws_free_header_table(wsi);
lws_free(wsi);
}
static void
lws_remove_from_timeout_list(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
if (!wsi->timeout_list_prev)
return;
lws_pt_lock(pt);
if (wsi->timeout_list)
wsi->timeout_list->timeout_list_prev = wsi->timeout_list_prev;
*wsi->timeout_list_prev = wsi->timeout_list;
wsi->timeout_list_prev = NULL;
wsi->timeout_list = NULL;
lws_pt_unlock(pt);
}
/**
* lws_set_timeout() - marks the wsi as subject to a timeout
*
* You will not need this unless you are doing something special
*
* @wsi: Websocket connection instance
* @reason: timeout reason
* @secs: how many seconds
*/
LWS_VISIBLE void
lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
time_t now;
lws_pt_lock(pt);
time(&now);
if (!wsi->pending_timeout && reason) {
wsi->timeout_list = pt->timeout_list;
if (wsi->timeout_list)
wsi->timeout_list->timeout_list_prev = &wsi->timeout_list;
wsi->timeout_list_prev = &pt->timeout_list;
*wsi->timeout_list_prev = wsi;
}
wsi->pending_timeout_limit = now + secs;
wsi->pending_timeout = reason;
lws_pt_unlock(pt);
if (!reason)
lws_remove_from_timeout_list(wsi);
}
void
lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
{
struct lws_context *context;
struct lws_context_per_thread *pt;
int n, m, ret, old_state;
int n, m, ret;
struct lws_tokens eff_buf;
if (!wsi)
@ -97,7 +136,6 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
context = wsi->context;
pt = &context->pt[(int)wsi->tsi];
old_state = wsi->state;
if (wsi->mode == LWSCM_HTTP_SERVING_ACCEPTED &&
wsi->u.http.fd != LWS_INVALID_FILE) {
@ -108,10 +146,13 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
wsi->user_space, NULL, 0);
}
if (wsi->socket_is_permanently_unusable ||
reason == LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY)
reason == LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY ||
wsi->state == LWSS_SHUTDOWN)
goto just_kill_connection;
switch (old_state) {
wsi->state_pre_close = wsi->state;
switch (wsi->state_pre_close) {
case LWSS_DEAD_SOCKET:
return;
@ -203,7 +244,7 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
* LWSS_AWAITING_CLOSE_ACK and will skip doing this a second time.
*/
if (old_state == LWSS_ESTABLISHED &&
if (wsi->state_pre_close == LWSS_ESTABLISHED &&
(wsi->u.ws.close_in_ping_buffer_len || /* already a reason */
(reason != LWS_CLOSE_STATUS_NOSTATUS &&
(reason != LWS_CLOSE_STATUS_NOSTATUS_CONTEXT_DESTROY)))) {
@ -218,8 +259,7 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
reason & 0xff;
}
n = lws_write(wsi, &wsi->u.ws.ping_payload_buf[
LWS_PRE],
n = lws_write(wsi, &wsi->u.ws.ping_payload_buf[LWS_PRE],
wsi->u.ws.close_in_ping_buffer_len,
LWS_WRITE_CLOSE);
if (n >= 0) {
@ -246,7 +286,28 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason)
just_kill_connection:
lwsl_debug("close: just_kill_connection: %p\n", wsi);
#if LWS_POSIX
/*
* Testing with ab shows that we have to stage the socket close when
* the system is under stress... shutdown any further TX, change the
* state to one that won't emit anything more, and wait with a timeout
* for the POLLIN to show a zero-size rx before coming back and doing
* the actual close.
*/
if (wsi->state != LWSS_SHUTDOWN) {
lwsl_info("%s: shutting down connection: %p\n", __func__, wsi);
n = shutdown(wsi->sock, SHUT_WR);
if (n)
lwsl_debug("closing: shutdown ret %d\n", LWS_ERRNO);
wsi->state = LWSS_SHUTDOWN;
lws_change_pollfd(wsi, LWS_POLLOUT, LWS_POLLIN);
lws_set_timeout(wsi, PENDING_TIMEOUT_SHUTDOWN_FLUSH,
AWAITING_TIMEOUT);
return;
}
#endif
lwsl_info("%s: real just_kill_connection: %p\n", __func__, wsi);
/*
* we won't be servicing or receiving anything further from this guy
@ -262,7 +323,7 @@ just_kill_connection:
lws_free_set_NULL(wsi->rxflow_buffer);
if (old_state == LWSS_ESTABLISHED ||
if (wsi->state_pre_close == LWSS_ESTABLISHED ||
wsi->mode == LWSCM_WS_SERVING ||
wsi->mode == LWSCM_WS_CLIENT) {
@ -308,10 +369,10 @@ just_kill_connection:
/* tell the user it's all over for this guy */
if (wsi->protocol && wsi->protocol->callback &&
((old_state == LWSS_ESTABLISHED) ||
(old_state == LWSS_RETURNED_CLOSE_ALREADY) ||
(old_state == LWSS_AWAITING_CLOSE_ACK) ||
(old_state == LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE))) {
((wsi->state_pre_close == LWSS_ESTABLISHED) ||
(wsi->state_pre_close == LWSS_RETURNED_CLOSE_ALREADY) ||
(wsi->state_pre_close == LWSS_AWAITING_CLOSE_ACK) ||
(wsi->state_pre_close == LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE))) {
lwsl_debug("calling back CLOSED\n");
wsi->protocol->callback(wsi, LWS_CALLBACK_CLOSED,
wsi->user_space, NULL, 0);
@ -327,7 +388,7 @@ just_kill_connection:
wsi->user_space, NULL, 0);
} else
lwsl_debug("not calling back closed mode=%d state=%d\n",
wsi->mode, old_state);
wsi->mode, wsi->state_pre_close);
/* deallocate any active extension contexts */
@ -341,12 +402,10 @@ just_kill_connection:
LWS_EXT_CB_DESTROY_ANY_WSI_CLOSING, NULL, 0) < 0)
lwsl_warn("ext destroy wsi failed\n");
wsi->socket_is_permanently_unusable = 1;
if (!lws_ssl_close(wsi) && lws_socket_is_valid(wsi->sock)) {
#if LWS_POSIX
n = shutdown(wsi->sock, SHUT_RDWR);
if (n)
lwsl_debug("closing: shutdown ret %d\n", LWS_ERRNO);
n = compatible_close(wsi->sock);
if (n)
lwsl_debug("closing: close ret %d\n", LWS_ERRNO);
@ -555,7 +614,7 @@ lws_callback_all_protocol(struct lws_context *context,
{
struct lws_context_per_thread *pt = &context->pt[0];
struct lws *wsi;
int n, m = context->count_threads;
unsigned int n, m = context->count_threads;
while (m--) {
for (n = 0; n < pt->fds_count; n++) {
@ -571,39 +630,6 @@ lws_callback_all_protocol(struct lws_context *context,
return 0;
}
/**
* lws_set_timeout() - marks the wsi as subject to a timeout
*
* You will not need this unless you are doing something special
*
* @wsi: Websocket connection instance
* @reason: timeout reason
* @secs: how many seconds
*/
LWS_VISIBLE void
lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs)
{
time_t now;
time(&now);
if (!wsi->pending_timeout) {
wsi->timeout_list = wsi->context->timeout_list;
if (wsi->timeout_list)
wsi->timeout_list->timeout_list_prev = &wsi->timeout_list;
wsi->timeout_list_prev = &wsi->context->timeout_list;
*wsi->timeout_list_prev = wsi;
}
wsi->pending_timeout_limit = now + secs;
wsi->pending_timeout = reason;
if (!reason)
lws_remove_from_timeout_list(wsi);
}
#if LWS_POSIX
/**
@ -704,7 +730,7 @@ lws_rx_flow_allow_all_protocol(const struct lws_context *context,
{
const struct lws_context_per_thread *pt = &context->pt[0];
struct lws *wsi;
int n, m = context->count_threads;
unsigned int n, m = context->count_threads;
while (m--) {
for (n = 0; n < pt->fds_count; n++) {

View file

@ -1497,6 +1497,7 @@ enum pending_timeout {
PENDING_TIMEOUT_HTTP_CONTENT = 10,
PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND = 11,
PENDING_FLUSH_STORED_SEND_BEFORE_CLOSE = 12,
PENDING_TIMEOUT_SHUTDOWN_FLUSH = 13,
/****** add new things just above ---^ ******/
};

View file

@ -122,7 +122,7 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
struct lws_context_per_thread *pt = &context->pt[tsi];
struct lws *wsi;
int n, m;
int n, m, c;
char buf;
#ifdef LWS_OPENSSL_SUPPORT
struct lws *wsi_next;
@ -215,10 +215,13 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
/* any socket with events to service? */
for (n = 0; n < pt->fds_count; n++) {
c = n;
for (n = 0; n < pt->fds_count && c; n++) {
if (!pt->fds[n].revents)
continue;
c--;
if (pt->fds[n].fd == pt->dummy_pipe_fds[0]) {
if (read(pt->fds[n].fd, &buf, 1) != 1)
lwsl_err("Cannot read from dummy pipe.");
@ -457,6 +460,8 @@ LWS_VISIBLE void
lws_plat_delete_socket_from_fds(struct lws_context *context,
struct lws *wsi, int m)
{
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
pt->fds_count--;
}
LWS_VISIBLE void

View file

@ -152,7 +152,7 @@ LWS_VISIBLE int
lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
{
int n;
int i;
unsigned int i;
DWORD ev;
WSANETWORKEVENTS networkevents;
struct lws_pollfd *pfd;
@ -177,7 +177,7 @@ lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
for (i = 0; i < pt->fds_count; ++i) {
pfd = &pt->fds[i];
if (pfd->fd == context->lserv_fd)
if (pfd->fd == pt->lserv_fd)
continue;
if (pfd->events & LWS_POLLOUT) {
@ -372,7 +372,7 @@ lws_plat_delete_socket_from_fds(struct lws_context *context,
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
WSACloseEvent(pt->events[m + 1]);
pt->events[m + 1] = pt->events[pt->fds_count + 1];
pt->events[m + 1] = pt->events[pt->fds_count--];
}
LWS_VISIBLE void

View file

@ -30,7 +30,7 @@ lws_0405_frame_mask_generate(struct lws *wsi)
wsi->u.ws.mask[2] = 0;
wsi->u.ws.mask[3] = 0;
#else
int n;
int n;
/* fetch the per-frame nonce */
n = lws_get_random(lws_get_context(wsi), wsi->u.ws.mask, 4);
@ -643,6 +643,7 @@ lws_ssl_capable_write_no_ssl(struct lws *wsi, unsigned char *buf, int len)
#if LWS_POSIX
n = send(wsi->sock, (char *)buf, len, MSG_NOSIGNAL);
lwsl_info("%s: sent len %d result %d", __func__, len, n);
if (n >= 0)
return n;
@ -662,7 +663,7 @@ lws_ssl_capable_write_no_ssl(struct lws *wsi, unsigned char *buf, int len)
// !!!
#endif
lwsl_debug("ERROR writing len %d to skt %d\n", len, n);
lwsl_debug("ERROR writing len %d to skt fd %d err %d / errno %d\n", len, wsi->sock, n, LWS_ERRNO);
return LWS_SSL_CAPABLE_ERROR;
}
#endif

View file

@ -27,8 +27,8 @@ unsigned char lextable[] = {
#define FAIL_CHAR 0x08
int
LWS_WARN_UNUSED_RESULT lextable_decode(int pos, char c)
int LWS_WARN_UNUSED_RESULT
lextable_decode(int pos, char c)
{
if (c >= 'A' && c <= 'Z')
c += 'a' - 'A';
@ -60,82 +60,173 @@ LWS_WARN_UNUSED_RESULT lextable_decode(int pos, char c)
}
}
static void
lws_reset_header_table(struct lws *wsi)
{
/* init the ah to reflect no headers or data have appeared yet */
memset(wsi->u.hdr.ah->frag_index, 0, sizeof(wsi->u.hdr.ah->frag_index));
wsi->u.hdr.ah->nfrag = 0;
wsi->u.hdr.ah->pos = 0;
}
int LWS_WARN_UNUSED_RESULT
lws_allocate_header_table(struct lws *wsi)
{
struct lws_context *context = wsi->context;
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
struct lws_pollargs pa;
struct lws **pwsi;
int n;
lwsl_debug("%s: wsi %p: ah %p\n", __func__, (void *)wsi,
(void *)wsi->u.hdr.ah);
lwsl_info("%s: wsi %p: ah %p (tsi %d)\n", __func__, (void *)wsi,
(void *)wsi->u.hdr.ah, wsi->tsi);
/* if we are already bound to one, just clear it down */
if (wsi->u.hdr.ah)
if (wsi->u.hdr.ah) {
lwsl_err("cleardown\n");
goto reset;
}
lws_pt_lock(pt);
pwsi = &pt->ah_wait_list;
while (*pwsi) {
if (*pwsi == wsi) {
/* if already waiting on list, if no new ah just ret */
if (pt->ah_count_in_use ==
context->max_http_header_pool) {
lwsl_err("ah wl denied\n");
goto bail;
}
/* new ah.... remove ourselves from waiting list */
*pwsi = wsi->u.hdr.ah_wait_list;
wsi->u.hdr.ah_wait_list = NULL;
pt->ah_wait_list_length--;
break;
}
pwsi = &(*pwsi)->u.hdr.ah_wait_list;
}
/*
* server should have suppressed the accept of a new wsi before this
* became the case. If initiating multiple client connects, make sure
* the ah pool is big enough to cope, or be prepared to retry
* pool is all busy... add us to waiting list and return that we
* weren't able to deliver it right now
*/
if (context->ah_count_in_use == context->max_http_header_pool) {
lwsl_err("No free ah\n");
return -1;
if (pt->ah_count_in_use == context->max_http_header_pool) {
lwsl_err("%s: adding %p to ah waiting list\n", __func__, wsi);
wsi->u.hdr.ah_wait_list = pt->ah_wait_list;
pt->ah_wait_list = wsi;
pt->ah_wait_list_length++;
/* we cannot accept input then */
_lws_change_pollfd(wsi, LWS_POLLIN, 0, &pa);
goto bail;
}
for (n = 0; n < context->max_http_header_pool; n++)
if (!context->ah_pool[n].in_use)
if (!pt->ah_pool[n].in_use)
break;
/* if the count of in use said something free... */
assert(n != context->max_http_header_pool);
wsi->u.hdr.ah = &context->ah_pool[n];
wsi->u.hdr.ah = &pt->ah_pool[n];
wsi->u.hdr.ah->in_use = 1;
pt->ah_count_in_use++;
context->ah_count_in_use++;
/* if we used up all the ah, defeat accepting new server connections */
if (context->ah_count_in_use == context->max_http_header_pool)
if (_lws_server_listen_accept_flow_control(context, 0))
return 1;
_lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa);
lwsl_debug("%s: wsi %p: ah %p: count %d (on exit)\n",
__func__, (void *)wsi, (void *)wsi->u.hdr.ah,
context->ah_count_in_use);
lwsl_info("%s: wsi %p: ah %p: count %d (on exit)\n", __func__,
(void *)wsi, (void *)wsi->u.hdr.ah, pt->ah_count_in_use);
lws_pt_unlock(pt);
reset:
/* init the ah to reflect no headers or data have appeared yet */
memset(wsi->u.hdr.ah->frag_index, 0, sizeof(wsi->u.hdr.ah->frag_index));
wsi->u.hdr.ah->nfrag = 0;
wsi->u.hdr.ah->pos = 0;
lws_reset_header_table(wsi);
time(&wsi->u.hdr.ah->assigned);
return 0;
bail:
lws_pt_unlock(pt);
return 1;
}
int lws_free_header_table(struct lws *wsi)
{
struct lws_context *context = wsi->context;
struct allocated_headers *ah = wsi->u.hdr.ah;
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
struct lws_pollargs pa;
struct lws **pwsi;
time_t now;
lwsl_debug("%s: wsi %p: ah %p (count = %d)\n", __func__, (void *)wsi,
(void *)wsi->u.hdr.ah, context->ah_count_in_use);
lwsl_info("%s: wsi %p: ah %p (tsi=%d, count = %d)\n", __func__, (void *)wsi,
(void *)wsi->u.hdr.ah, wsi->tsi, pt->ah_count_in_use);
assert(wsi->u.hdr.ah);
if (!wsi->u.hdr.ah)
return 0;
lws_pt_lock(pt);
pwsi = &pt->ah_wait_list;
if (!wsi->u.hdr.ah) { /* remove from wait list if that's all */
if (wsi->socket_is_permanently_unusable)
while (*pwsi) {
if (*pwsi == wsi) {
lwsl_info("%s: wsi %p, removing from wait list\n",
__func__, wsi);
*pwsi = wsi->u.hdr.ah_wait_list;
wsi->u.hdr.ah_wait_list = NULL;
pt->ah_wait_list_length--;
goto bail;
}
pwsi = &(*pwsi)->u.hdr.ah_wait_list;
}
goto bail;
}
time(&now);
if (now - wsi->u.hdr.ah->assigned > 3)
lwsl_err("header assign - free time %d\n",
(int)(now - wsi->u.hdr.ah->assigned));
/* if we think we're freeing one, there should be one to free */
assert(context->ah_count_in_use > 0);
assert(pt->ah_count_in_use > 0);
/* and he should have been in use */
assert(wsi->u.hdr.ah->in_use);
wsi->u.hdr.ah->in_use = 0;
/* if we just freed up one ah, allow new server connection */
if (context->ah_count_in_use == context->max_http_header_pool)
if (_lws_server_listen_accept_flow_control(context, 1))
return 1;
context->ah_count_in_use--;
wsi->u.hdr.ah = NULL;
if (!*pwsi) {
ah->in_use = 0;
pt->ah_count_in_use--;
goto bail;
}
/* somebody else on same tsi is waiting, give it to him */
lwsl_info("pt wait list %p\n", *pwsi);
while ((*pwsi)->u.hdr.ah_wait_list)
pwsi = &(*pwsi)->u.hdr.ah_wait_list;
wsi = *pwsi;
lwsl_info("last wsi in wait list %p\n", wsi);
wsi->u.hdr.ah = ah;
lws_reset_header_table(wsi);
time(&wsi->u.hdr.ah->assigned);
assert(wsi->position_in_fds_table != -1);
lwsl_info("%s: Enabling %p POLLIN\n", __func__, wsi);
/* his wait is over, let him progress */
_lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa);
/* point prev guy to next guy in list instead */
*pwsi = wsi->u.hdr.ah_wait_list;
wsi->u.hdr.ah_wait_list = NULL;
pt->ah_wait_list_length--;
assert(!!pt->ah_wait_list_length == !!(int)(long)pt->ah_wait_list);
bail:
lws_pt_unlock(pt);
return 0;
}
@ -386,6 +477,8 @@ lws_parse(struct lws *wsi, unsigned char c)
struct lws_context *context = wsi->context;
unsigned int n, m, enc = 0;
assert(wsi->u.hdr.ah);
switch (wsi->u.hdr.parser_state) {
default:
@ -651,8 +744,7 @@ swallow:
lwsl_parser("known hdr %d\n", n);
for (m = 0; m < ARRAY_SIZE(methods); m++)
if (n == methods[m] &&
ah->frag_index[
methods[m]]) {
ah->frag_index[methods[m]]) {
lwsl_warn("Duplicated method\n");
return -1;
}

View file

@ -21,11 +21,80 @@
#include "private-libwebsockets.h"
int
_lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
{
struct lws_context *context;
struct lws_context_per_thread *pt;
int ret = 0, pa_events = 1;
struct lws_pollfd *pfd;
int sampled_tid, tid;
if (!wsi || wsi->position_in_fds_table < 0)
return 0;
context = wsi->context;
pt = &context->pt[(int)wsi->tsi];
assert(wsi->position_in_fds_table >= 0 &&
wsi->position_in_fds_table < pt->fds_count);
pfd = &pt->fds[wsi->position_in_fds_table];
pa->fd = wsi->sock;
pa->prev_events = pfd->events;
pa->events = pfd->events = (pfd->events & ~_and) | _or;
if (context->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD,
wsi->user_space, (void *)pa, 0)) {
ret = -1;
goto bail;
}
/*
* if we changed something in this pollfd...
* ... and we're running in a different thread context
* than the service thread...
* ... and the service thread is waiting ...
* then cancel it to force a restart with our changed events
*/
#if LWS_POSIX
pa_events = pa->prev_events != pa->events;
#endif
if (pa_events) {
if (lws_plat_change_pollfd(context, wsi, pfd)) {
lwsl_info("%s failed\n", __func__);
ret = -1;
goto bail;
}
sampled_tid = context->service_tid;
if (sampled_tid) {
tid = context->protocols[0].callback(wsi,
LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0);
if (tid == -1) {
ret = -1;
goto bail;
}
if (tid != sampled_tid)
lws_cancel_service_pt(wsi);
}
}
bail:
return ret;
}
int
insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
{
struct lws_pollargs pa = { wsi->sock, LWS_POLLIN, 0 };
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
int ret = 0;
#ifndef LWS_NO_SERVER
struct lws_pollargs pa1;
#endif
lwsl_info("%s: %p: tsi=%d, sock=%d, pos-in-fds=%d\n",
__func__, wsi, wsi->tsi, wsi->sock, pt->fds_count);
if ((unsigned int)pt->fds_count >= context->fd_limit_per_thread) {
lwsl_err("Too many fds (%d)\n", context->max_fds);
@ -47,83 +116,105 @@ insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
wsi->user_space, (void *) &pa, 1))
return -1;
lws_pt_lock(pt);
pt->count_conns++;
insert_wsi(context, wsi);
wsi->position_in_fds_table = pt->fds_count;
pt->fds[pt->fds_count].fd = wsi->sock;
pt->fds[pt->fds_count].events = LWS_POLLIN;
/* don't apply this logic to the listening socket... */
// if (wsi->mode != LWSCM_SERVER_LISTENER && !wsi->u.hdr.ah)
// pt->fds[pt->fds_count].events = 0;
pa.events = pt->fds[pt->fds_count].events;
lws_plat_insert_socket_into_fds(context, wsi);
/* external POLL support via protocol 0 */
if (context->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD,
wsi->user_space, (void *) &pa, 0))
return -1;
ret = -1;
#ifndef LWS_NO_SERVER
/* if no more room, defeat accepts on this thread */
if ((unsigned int)pt->fds_count == context->fd_limit_per_thread - 1)
_lws_change_pollfd(pt->wsi_listening, LWS_POLLIN, 0, &pa1);
#endif
lws_pt_unlock(pt);
if (context->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
wsi->user_space, (void *)&pa, 1))
return -1;
ret = -1;
return 0;
return ret;
}
int
remove_wsi_socket_from_fds(struct lws *wsi)
{
int m;
int m, ret = 0;
struct lws *end_wsi;
struct lws_pollargs pa = { wsi->sock, 0, 0 };
#ifndef LWS_NO_SERVER
struct lws_pollargs pa1;
#endif
struct lws_context *context = wsi->context;
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE);
--pt->fds_count;
#if !defined(_WIN32) && !defined(MBED_OPERATORS)
if (wsi->sock > context->max_fds) {
lwsl_err("Socket fd %d too high (%d)\n",
wsi->sock, context->max_fds);
lwsl_err("fd %d too high (%d)\n", wsi->sock, context->max_fds);
return 1;
}
#endif
lwsl_info("%s: wsi=%p, sock=%d, fds pos=%d\n", __func__,
wsi, wsi->sock, wsi->position_in_fds_table);
if (context->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
wsi->user_space, (void *)&pa, 1))
return -1;
m = wsi->position_in_fds_table; /* replace the contents for this */
lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE);
/* have the last guy take up the vacant slot */
pt->fds[m] = pt->fds[pt->fds_count];
lws_pt_lock(pt);
lwsl_info("%s: wsi=%p, sock=%d, fds pos=%d, end guy pos=%d, endfd=%d\n",
__func__, wsi, wsi->sock, wsi->position_in_fds_table,
pt->fds_count, pt->fds[pt->fds_count].fd);
/* the guy who is to be deleted's slot index in pt->fds */
m = wsi->position_in_fds_table;
/* have the last guy take up the now vacant slot */
pt->fds[m] = pt->fds[pt->fds_count - 1];
lws_plat_delete_socket_from_fds(context, wsi, m);
/*
* end guy's fds_lookup entry remains unchanged
* (still same fd pointing to same wsi)
*/
/* end guy's "position in fds table" changed */
/* end guy's "position in fds table" is now the deletion guy's old one */
end_wsi = wsi_from_fd(context, pt->fds[pt->fds_count].fd);
assert(end_wsi);
end_wsi->position_in_fds_table = m;
/* deletion guy's lws_lookup entry needs nuking */
delete_from_fd(context, wsi->sock);
/* removed wsi has no position any more */
wsi->position_in_fds_table = -1;
/* remove also from external POLL support via protocol 0 */
if (lws_socket_is_valid(wsi->sock)) {
if (lws_socket_is_valid(wsi->sock))
if (context->protocols[0].callback(wsi, LWS_CALLBACK_DEL_POLL_FD,
wsi->user_space, (void *) &pa, 0))
return -1;
}
wsi->user_space, (void *) &pa, 0))
ret = -1;
#ifndef LWS_NO_SERVER
/* if this made some room, accept connects on this thread */
if ((unsigned int)pt->fds_count < context->fd_limit_per_thread - 1)
_lws_change_pollfd(pt->wsi_listening, 0, LWS_POLLIN, &pa1);
#endif
lws_pt_unlock(pt);
if (context->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
wsi->user_space, (void *) &pa, 1))
return -1;
ret = -1;
return 0;
return ret;
}
int
@ -131,11 +222,8 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or)
{
struct lws_context *context;
struct lws_context_per_thread *pt;
int tid;
int sampled_tid;
struct lws_pollfd *pfd;
int ret = 0;
struct lws_pollargs pa;
int pa_events = 1;
if (!wsi || !wsi->protocol || wsi->position_in_fds_table < 0)
return 1;
@ -144,55 +232,20 @@ lws_change_pollfd(struct lws *wsi, int _and, int _or)
if (!context)
return 1;
pt = &context->pt[(int)wsi->tsi];
pfd = &pt->fds[wsi->position_in_fds_table];
pa.fd = wsi->sock;
if (context->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
wsi->user_space, (void *) &pa, 0))
return -1;
pa.prev_events = pfd->events;
pa.events = pfd->events = (pfd->events & ~_and) | _or;
if (context->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD,
wsi->user_space, (void *) &pa, 0))
return -1;
/*
* if we changed something in this pollfd...
* ... and we're running in a different thread context
* than the service thread...
* ... and the service thread is waiting ...
* then cancel it to force a restart with our changed events
*/
#if LWS_POSIX
pa_events = pa.prev_events != pa.events;
#endif
if (pa_events) {
if (lws_plat_change_pollfd(context, wsi, pfd)) {
lwsl_info("%s failed\n", __func__);
return 1;
}
sampled_tid = context->service_tid;
if (sampled_tid) {
tid = context->protocols[0].callback(wsi,
LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0);
if (tid == -1)
return -1;
if (tid != sampled_tid)
lws_cancel_service_pt(wsi);
}
}
pt = &context->pt[(int)wsi->tsi];
lws_pt_lock(pt);
ret = _lws_change_pollfd(wsi, _and, _or, &pa);
lws_pt_unlock(pt);
if (context->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
wsi->user_space, (void *) &pa, 0))
return -1;
ret = -1;
return 0;
return ret;
}
@ -287,7 +340,7 @@ lws_callback_on_writable_all_protocol(const struct lws_context *context,
{
const struct lws_context_per_thread *pt = &context->pt[0];
struct lws *wsi;
int n, m = context->count_threads;
unsigned int n, m = context->count_threads;
while (m--) {
for (n = 0; n < pt->fds_count; n++) {

View file

@ -1,7 +1,7 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2015 Andy Green <andy@warmcat.com>
* Copyright (C) 2010 - 2016 Andy Green <andy@warmcat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@ -34,6 +34,9 @@
#include <limits.h>
#include <stdarg.h>
#include <assert.h>
#if LWS_MAX_SMP > 1
#include <pthread.h>
#endif
#ifdef LWS_HAVE_SYS_STAT_H
#include <sys/stat.h>
@ -60,6 +63,7 @@
#define MSG_NOSIGNAL 0
#define SHUT_RDWR SD_BOTH
#define SOL_TCP IPPROTO_TCP
#define SHUT_WR SD_SEND
#define compatible_close(fd) closesocket(fd)
#define lws_set_blocking_send(wsi) wsi->sock_send_blocking = 1
@ -280,11 +284,11 @@ extern "C" {
#endif
#endif
#ifndef LWS_MAX_HEADER_LEN
#define LWS_MAX_HEADER_LEN 1024
#ifndef LWS_DEF_HEADER_LEN
#define LWS_DEF_HEADER_LEN 1024
#endif
#ifndef LWS_MAX_HEADER_POOL
#define LWS_MAX_HEADER_POOL 16
#ifndef LWS_DEF_HEADER_POOL
#define LWS_DEF_HEADER_POOL 16
#endif
#ifndef LWS_MAX_PROTOCOLS
#define LWS_MAX_PROTOCOLS 5
@ -299,7 +303,7 @@ extern "C" {
#define SPEC_LATEST_SUPPORTED 13
#endif
#ifndef AWAITING_TIMEOUT
#define AWAITING_TIMEOUT 5
#define AWAITING_TIMEOUT 20
#endif
#ifndef CIPHERS_LIST_STRING
#define CIPHERS_LIST_STRING "DEFAULT"
@ -315,12 +319,6 @@ extern "C" {
#define SYSTEM_RANDOM_FILEPATH "/dev/urandom"
#endif
/*
* if not in a connection storm, check for incoming
* connections this many normal connection services
*/
#define LWS_lserv_mod 10
enum lws_websocket_opcodes_07 {
LWSWSOPC_CONTINUATION = 0,
LWSWSOPC_TEXT_FRAME = 1,
@ -347,6 +345,7 @@ enum lws_connection_states {
LWSS_RETURNED_CLOSE_ALREADY,
LWSS_AWAITING_CLOSE_ACK,
LWSS_FLUSHING_STORED_SEND_BEFORE_CLOSE,
LWSS_SHUTDOWN,
LWSS_HTTP2_AWAIT_CLIENT_PREFACE,
LWSS_HTTP2_ESTABLISHED_PRE_SETTINGS,
@ -411,6 +410,7 @@ enum connection_mode {
/* transient, ssl delay hiding */
LWSCM_SSL_ACK_PENDING,
LWSCM_SSL_INIT,
/* transient modes */
LWSCM_WSCL_WAITING_CONNECT,
@ -481,6 +481,7 @@ struct allocated_headers {
* lws_fragments->nfrag for continuation.
*/
struct lws_fragments frags[WSI_TOKEN_COUNT * 2];
time_t assigned;
/*
* for each recognized token, frag_index says which frag[] his data
* starts in (0 means the token did not appear)
@ -503,12 +504,26 @@ struct allocated_headers {
*/
struct lws_context_per_thread {
#if LWS_MAX_SMP > 1
pthread_mutex_t lock;
#endif
struct lws_pollfd *fds;
struct lws *rx_draining_ext_list;
struct lws *tx_draining_ext_list;
struct lws *timeout_list;
void *http_header_data;
struct allocated_headers *ah_pool;
struct lws *ah_wait_list;
int ah_wait_list_length;
#ifdef LWS_OPENSSL_SUPPORT
struct lws *pending_read_list; /* linked list */
#endif
#ifndef LWS_NO_SERVER
struct lws *wsi_listening;
#endif
lws_sockfd_type lserv_fd;
unsigned long count_conns;
/*
* usable by anything in the service code, but only if the scope
* does not last longer than the service action (since next service
@ -520,7 +535,9 @@ struct lws_context_per_thread {
#else
int dummy_pipe_fds[2];
#endif
int fds_count;
unsigned int fds_count;
short ah_count_in_use;
};
/*
@ -551,14 +568,8 @@ struct lws_context {
const char *iface;
const struct lws_token_limits *token_limits;
void *user_space;
struct lws *timeout_list;
#ifndef LWS_NO_SERVER
struct lws *wsi_listening;
#endif
const struct lws_protocols *protocols;
void *http_header_data;
struct allocated_headers *ah_pool;
#ifdef LWS_OPENSSL_SUPPORT
SSL_CTX *ssl_ctx;
@ -576,8 +587,6 @@ struct lws_context {
char worst_latency_info[256];
#endif
lws_sockfd_type lserv_fd;
int max_fds;
int listen_port;
#ifdef LWS_USE_LIBEV
@ -587,8 +596,6 @@ struct lws_context {
int fd_random;
int lserv_mod;
int lserv_count;
int lserv_seen;
unsigned int http_proxy_port;
unsigned int options;
unsigned int fd_limit_per_thread;
@ -624,7 +631,6 @@ struct lws_context {
short max_http_header_data;
short max_http_header_pool;
short ah_count_in_use;
short count_threads;
unsigned int being_destroyed:1;
@ -697,6 +703,8 @@ enum uri_esc_states {
struct _lws_http_mode_related {
/* MUST be first in struct */
struct allocated_headers *ah; /* mirroring _lws_header_related */
struct lws *ah_wait_list;
struct lws *new_wsi_list;
unsigned long filepos;
unsigned long filelen;
lws_filefd_type fd;
@ -857,6 +865,7 @@ struct _lws_http2_related {
struct _lws_header_related {
/* MUST be first in struct */
struct allocated_headers *ah;
struct lws *ah_wait_list;
enum uri_path_states ups;
enum uri_esc_states ues;
short lextable_pos;
@ -986,6 +995,7 @@ struct lws {
unsigned char ietf_spec_revision;
char mode; /* enum connection_mode */
char state; /* enum lws_connection_states */
char state_pre_close;
char lws_rx_parse_state; /* enum lws_rx_parse_state */
char rx_frame_type; /* enum lws_write_protocol */
char pending_timeout; /* enum pending_timeout */
@ -1049,7 +1059,7 @@ LWS_EXTERN int
delete_from_fd(struct lws_context *context, lws_sockfd_type fd);
#else
#define wsi_from_fd(A,B) A->lws_lookup[B]
#define insert_wsi(A,B) A->lws_lookup[B->sock]=B
#define insert_wsi(A,B) assert(A->lws_lookup[B->sock] == 0); A->lws_lookup[B->sock]=B
#define delete_from_fd(A,B) A->lws_lookup[B]=0
#endif
@ -1222,7 +1232,7 @@ enum lws_ssl_capable_status {
#define lws_ssl_capable_read lws_ssl_capable_read_no_ssl
#define lws_ssl_capable_write lws_ssl_capable_write_no_ssl
#define lws_ssl_pending lws_ssl_pending_no_ssl
#define lws_server_socket_service_ssl(_a, _b, _c, _d) (0)
#define lws_server_socket_service_ssl(_b, _c) (0)
#define lws_ssl_close(_a) (0)
#define lws_ssl_context_destroy(_a)
#define lws_ssl_remove_wsi_from_buffered_list(_a)
@ -1236,9 +1246,7 @@ lws_ssl_capable_write(struct lws *wsi, unsigned char *buf, int len);
LWS_EXTERN int LWS_WARN_UNUSED_RESULT
lws_ssl_pending(struct lws *wsi);
LWS_EXTERN int LWS_WARN_UNUSED_RESULT
lws_server_socket_service_ssl(struct lws **wsi, struct lws *new_wsi,
lws_sockfd_type accept_fd,
struct lws_pollfd *pollfd);
lws_server_socket_service_ssl(struct lws *new_wsi, lws_sockfd_type accept_fd);
LWS_EXTERN int
lws_ssl_close(struct lws *wsi);
LWS_EXTERN void
@ -1265,6 +1273,29 @@ lws_context_init_http2_ssl(struct lws_context *context);
#endif
#endif
#if LWS_MAX_SMP > 1
static LWS_INLINE void
lws_pt_mutex_init(struct lws_context_per_thread *pt)
{
pthread_mutex_init(&pt->lock, NULL);
}
static LWS_INLINE void
lws_pt_lock(struct lws_context_per_thread *pt)
{
pthread_mutex_lock(&pt->lock);
}
static LWS_INLINE void
lws_pt_unlock(struct lws_context_per_thread *pt)
{
pthread_mutex_unlock(&pt->lock);
}
#else
#define lws_pt_mutex_init(_a) (void)(_a)
#define lws_pt_lock(_a) (void)(_a)
#define lws_pt_unlock(_a) (void)(_a)
#endif
LWS_EXTERN int LWS_WARN_UNUSED_RESULT
lws_ssl_capable_read_no_ssl(struct lws *wsi, unsigned char *buf, int len);
@ -1297,6 +1328,9 @@ lws_decode_ssl_error(void);
LWS_EXTERN int
_lws_rx_flow_control(struct lws *wsi);
LWS_EXTERN int
_lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa);
#ifndef LWS_NO_SERVER
LWS_EXTERN int
lws_server_socket_service(struct lws_context *context, struct lws *wsi,
@ -1304,7 +1338,7 @@ lws_server_socket_service(struct lws_context *context, struct lws *wsi,
LWS_EXTERN int
lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len);
LWS_EXTERN int
_lws_server_listen_accept_flow_control(struct lws_context *context, int on);
_lws_server_listen_accept_flow_control(struct lws *twsi, int on);
#else
#define lws_server_socket_service(_a, _b, _c) (0)
#define lws_handshake_server(_a, _b, _c) (0)

View file

@ -166,14 +166,13 @@ handshake_0405(struct lws_context *context, struct lws *wsi)
int accept_len;
if (!lws_hdr_total_length(wsi, WSI_TOKEN_HOST) ||
!lws_hdr_total_length(wsi, WSI_TOKEN_KEY)) {
!lws_hdr_total_length(wsi, WSI_TOKEN_KEY)) {
lwsl_parser("handshake_04 missing pieces\n");
/* completed header processing, but missing some bits */
goto bail;
}
if (lws_hdr_total_length(wsi, WSI_TOKEN_KEY) >=
MAX_WEBSOCKET_04_KEY_LEN) {
if (lws_hdr_total_length(wsi, WSI_TOKEN_KEY) >= MAX_WEBSOCKET_04_KEY_LEN) {
lwsl_warn("Client key too long %d\n", MAX_WEBSOCKET_04_KEY_LEN);
goto bail;
}
@ -183,8 +182,8 @@ handshake_0405(struct lws_context *context, struct lws *wsi)
* overflow
*/
n = sprintf((char *)pt->serv_buf,
"%s258EAFA5-E914-47DA-95CA-C5AB0DC85B11",
lws_hdr_simple_ptr(wsi, WSI_TOKEN_KEY));
"%s258EAFA5-E914-47DA-95CA-C5AB0DC85B11",
lws_hdr_simple_ptr(wsi, WSI_TOKEN_KEY));
lws_SHA1(pt->serv_buf, n, hash);

View file

@ -33,8 +33,9 @@ int lws_context_init_server(struct lws_context_creation_info *info,
socklen_t len = sizeof(struct sockaddr);
struct sockaddr_in sin;
struct sockaddr *v;
int n, opt = 1;
int n, opt = 1, limit = 1;
#endif
int m = 0;
lws_sockfd_type sockfd;
struct lws *wsi;
@ -44,6 +45,11 @@ int lws_context_init_server(struct lws_context_creation_info *info,
return 0;
#if LWS_POSIX
#if defined(__linux__)
limit = context->count_threads;
#endif
for (m = 0; m < limit; m++) {
#ifdef LWS_USE_IPV6
if (LWS_IPV6_ENABLED(context))
sockfd = socket(AF_INET6, SOCK_STREAM, 0);
@ -69,6 +75,13 @@ int lws_context_init_server(struct lws_context_creation_info *info,
compatible_close(sockfd);
return 1;
}
#if defined(__linux__) && defined(SO_REUSEPORT)
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT,
(const void *)&opt, sizeof(opt)) < 0) {
compatible_close(sockfd);
return 1;
}
#endif
#endif
lws_plat_set_socket_options(context, sockfd);
@ -122,19 +135,19 @@ int lws_context_init_server(struct lws_context_creation_info *info,
wsi->sock = sockfd;
wsi->mode = LWSCM_SERVER_LISTENER;
wsi->protocol = context->protocols;
wsi->tsi = m;
context->wsi_listening = wsi;
context->pt[m].wsi_listening = wsi;
if (insert_wsi_socket_into_fds(context, wsi))
goto bail;
context->lserv_mod = LWS_lserv_mod;
context->lserv_count = 0;
context->lserv_fd = sockfd;
context->pt[m].lserv_fd = sockfd;
#if LWS_POSIX
listen(sockfd, LWS_SOMAXCONN);
listen(wsi->sock, LWS_SOMAXCONN);
} /* for each thread able to independently lister */
#else
mbed3_tcp_stream_bind(sockfd, info->port, wsi);
mbed3_tcp_stream_bind(wsi->sock, info->port, wsi);
#endif
lwsl_notice(" Listening on port %d\n", info->port);
@ -147,15 +160,17 @@ bail:
}
int
_lws_server_listen_accept_flow_control(struct lws_context *context, int on)
_lws_server_listen_accept_flow_control(struct lws *twsi, int on)
{
struct lws *wsi = context->wsi_listening;
struct lws_context_per_thread *pt = &twsi->context->pt[(int)twsi->tsi];
struct lws *wsi = pt->wsi_listening;
int n;
if (!wsi || context->being_destroyed)
if (!wsi || twsi->context->being_destroyed)
return 0;
lwsl_debug("%s: wsi %p: state %d\n", __func__, (void *)wsi, on);
lwsl_debug("%s: Thr %d: LISTEN wsi %p: state %d\n",
__func__, twsi->tsi, (void *)wsi, on);
if (on)
n = lws_change_pollfd(wsi, 0, LWS_POLLIN);
@ -324,7 +339,7 @@ int lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len)
char protocol_name[32];
char *p;
/* LWSCM_WS_SERVING */
assert(wsi->u.hdr.ah);
while (len--) {
@ -576,10 +591,33 @@ bail_nuke_ah:
return 1;
}
static int
lws_get_idlest_tsi(struct lws_context *context)
{
unsigned int lowest = ~0;
int n = 0, hit = -1;
for (; n < context->count_threads; n++) {
if ((unsigned int)context->pt[n].fds_count != context->fd_limit_per_thread - 1 &&
(unsigned int)context->pt[n].fds_count < lowest) {
lowest = context->pt[n].fds_count;
hit = n;
}
}
return hit;
}
struct lws *
lws_create_new_server_wsi(struct lws_context *context)
{
struct lws *new_wsi;
int n = lws_get_idlest_tsi(context);
if (n < 0) {
lwsl_err("no space for new conn\n");
return NULL;
}
new_wsi = lws_zalloc(sizeof(struct lws));
if (new_wsi == NULL) {
@ -587,6 +625,9 @@ lws_create_new_server_wsi(struct lws_context *context)
return NULL;
}
new_wsi->tsi = n;
lwsl_info("Accepted %p to tsi %d\n", new_wsi, new_wsi->tsi);
new_wsi->context = context;
new_wsi->pending_timeout = NO_PENDING_TIMEOUT;
new_wsi->rxflow_change_to = LWS_RXFLOW_ALLOW;
@ -601,11 +642,6 @@ lws_create_new_server_wsi(struct lws_context *context)
new_wsi->use_ssl = LWS_SSL_ENABLED(context);
#endif
if (lws_allocate_header_table(new_wsi)) {
lws_free(new_wsi);
return NULL;
}
/*
* these can only be set once the protocol is known
* we set an unestablished connection's protocol pointer
@ -617,12 +653,13 @@ lws_create_new_server_wsi(struct lws_context *context)
new_wsi->ietf_spec_revision = 0;
new_wsi->sock = LWS_SOCK_INVALID;
/*
* outermost create notification for wsi
* no user_space because no protocol selection
*/
context->protocols[0].callback(new_wsi, LWS_CALLBACK_WSI_CREATE, NULL,
NULL, 0);
context->protocols[0].callback(new_wsi, LWS_CALLBACK_WSI_CREATE,
NULL, NULL, 0);
return new_wsi;
}
@ -642,7 +679,7 @@ lws_http_transaction_completed(struct lws *wsi)
lwsl_debug("%s: wsi %p\n", __func__, wsi);
/* if we can't go back to accept new headers, drop the connection */
if (wsi->u.http.connection_type != HTTP_CONNECTION_KEEP_ALIVE) {
lwsl_info("%s: close connection\n", __func__);
lwsl_info("%s: %p: close connection\n", __func__, wsi);
return 1;
}
@ -655,34 +692,75 @@ lws_http_transaction_completed(struct lws *wsi)
lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
if (lws_allocate_header_table(wsi))
return 1;
lwsl_info("On waiting list for header table");
/* If we're (re)starting on headers, need other implied init */
wsi->u.hdr.ues = URIES_IDLE;
lwsl_info("%s: keep-alive await new transaction\n", __func__);
lwsl_info("%s: %p: keep-alive await new transaction\n", __func__, wsi);
return 0;
}
static int
lws_get_idlest_tsi(struct lws_context *context)
/*
* either returns new wsi bound to accept_fd, or closes accept_fd and
* returns NULL, having cleaned up any new wsi pieces
*/
LWS_VISIBLE struct lws *
lws_adopt_socket(struct lws_context *context, lws_sockfd_type accept_fd)
{
unsigned int lowest = ~0;
int n, hit = 0;
struct lws *new_wsi = lws_create_new_server_wsi(context);
if (!new_wsi) {
compatible_close(accept_fd);
return NULL;
}
for (n = 0; n < context->count_threads; n++)
if ((unsigned int)context->pt[n].fds_count < lowest) {
lowest = context->pt[n].fds_count;
hit = n;
}
new_wsi->sock = accept_fd;
return hit;
/* the transport is accepted... give him time to negotiate */
lws_set_timeout(new_wsi, PENDING_TIMEOUT_ESTABLISH_WITH_SERVER,
AWAITING_TIMEOUT);
#if LWS_POSIX == 0
mbed3_tcp_stream_accept(accept_fd, new_wsi);
#endif
/*
* A new connection was accepted. Give the user a chance to
* set properties of the newly created wsi. There's no protocol
* selected yet so we issue this to protocols[0]
*/
if ((context->protocols[0].callback)(new_wsi,
LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED, NULL, NULL, 0)) {
compatible_close(new_wsi->sock);
lws_free(new_wsi);
return NULL;
}
lws_libev_accept(new_wsi, new_wsi->sock);
if (!LWS_SSL_ENABLED(context)) {
if (insert_wsi_socket_into_fds(context, new_wsi))
goto fail;
} else {
new_wsi->mode = LWSCM_SSL_INIT;
if (lws_server_socket_service_ssl(new_wsi, accept_fd))
goto fail;
}
return new_wsi;
fail:
lwsl_err("%s: fail\n", __func__);
lws_close_free_wsi(new_wsi, LWS_CLOSE_STATUS_NOSTATUS);
return NULL;
}
LWS_VISIBLE
int lws_server_socket_service(struct lws_context *context,
struct lws *wsi, struct lws_pollfd *pollfd)
LWS_VISIBLE int
lws_server_socket_service(struct lws_context *context, struct lws *wsi,
struct lws_pollfd *pollfd)
{
lws_sockfd_type accept_fd = LWS_SOCK_INVALID;
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
@ -690,7 +768,7 @@ int lws_server_socket_service(struct lws_context *context,
struct sockaddr_in cli_addr;
socklen_t clilen;
#endif
struct lws *new_wsi = NULL;
int n, len;
switch (wsi->mode) {
@ -698,6 +776,7 @@ int lws_server_socket_service(struct lws_context *context,
case LWSCM_HTTP_SERVING:
case LWSCM_HTTP_SERVING_ACCEPTED:
case LWSCM_HTTP2_SERVING:
case LWSS_SHUTDOWN:
/* handle http headers coming in */
@ -720,15 +799,19 @@ int lws_server_socket_service(struct lws_context *context,
/* any incoming data ready? */
if (!(pollfd->revents & pollfd->events && LWS_POLLIN))
if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
goto try_pollout;
if (wsi->state == LWSS_HTTP && !wsi->u.hdr.ah)
if (lws_allocate_header_table(wsi))
goto try_pollout;
len = lws_ssl_capable_read(wsi, pt->serv_buf,
LWS_MAX_SOCKET_IO_BUF);
lwsl_debug("%s: read %d\r\n", __func__, len);
lwsl_debug("%s: wsi %p read %d\r\n", __func__, wsi, len);
switch (len) {
case 0:
lwsl_info("lws_server_skt_srv: read 0 len\n");
lwsl_info("%s: read 0 len\n", __func__);
/* lwsl_info(" state=%d\n", wsi->state); */
if (!wsi->hdr_parsing_completed)
lws_free_header_table(wsi);
@ -748,7 +831,6 @@ int lws_server_socket_service(struct lws_context *context,
n = lws_read(wsi, pt->serv_buf, len);
if (n < 0) /* we closed wsi */
return 1;
/* hum he may have used up the
* writability above */
break;
@ -761,8 +843,10 @@ try_pollout:
break;
/* one shot */
if (lws_change_pollfd(wsi, LWS_POLLOUT, 0))
if (lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
lwsl_notice("%s a\n", __func__);
goto fail;
}
lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_WRITE);
@ -771,15 +855,19 @@ try_pollout:
wsi->protocol->callback,
wsi, LWS_CALLBACK_HTTP_WRITEABLE,
wsi->user_space, NULL, 0);
if (n < 0)
if (n < 0) {
lwsl_info("writeable_fail\n");
goto fail;
}
break;
}
/* >0 == completion, <0 == error */
n = lws_serve_http_file_fragment(wsi);
if (n < 0 || (n > 0 && lws_http_transaction_completed(wsi)))
if (n < 0 || (n > 0 && lws_http_transaction_completed(wsi))) {
lwsl_info("completed\n");
goto fail;
}
break;
case LWSCM_SERVER_LISTENER:
@ -787,91 +875,65 @@ try_pollout:
#if LWS_POSIX
/* pollin means a client has connected to us then */
if (!(pollfd->revents & LWS_POLLIN))
break;
do {
if (!(pollfd->revents & LWS_POLLIN) || !(pollfd->events & LWS_POLLIN))
break;
/* listen socket got an unencrypted connection... */
/* listen socket got an unencrypted connection... */
clilen = sizeof(cli_addr);
lws_latency_pre(context, wsi);
accept_fd = accept(pollfd->fd, (struct sockaddr *)&cli_addr,
&clilen);
lws_latency(context, wsi,
"unencrypted accept LWSCM_SERVER_LISTENER",
accept_fd, accept_fd >= 0);
if (accept_fd < 0) {
if (LWS_ERRNO == LWS_EAGAIN ||
LWS_ERRNO == LWS_EWOULDBLOCK) {
lwsl_debug("accept asks to try again\n");
clilen = sizeof(cli_addr);
lws_latency_pre(context, wsi);
accept_fd = accept(pollfd->fd, (struct sockaddr *)&cli_addr,
&clilen);
lws_latency(context, wsi, "listener accept", accept_fd,
accept_fd >= 0);
if (accept_fd < 0) {
if (LWS_ERRNO == LWS_EAGAIN ||
LWS_ERRNO == LWS_EWOULDBLOCK) {
lwsl_err("accept asks to try again\n");
break;
}
lwsl_err("ERROR on accept: %s\n", strerror(LWS_ERRNO));
break;
}
lwsl_warn("ERROR on accept: %s\n", strerror(LWS_ERRNO));
break;
}
lws_plat_set_socket_options(context, accept_fd);
#else
/* not very beautiful... */
accept_fd = (lws_sockfd_type)pollfd;
#endif
/*
* look at who we connected to and give user code a chance
* to reject based on client IP. There's no protocol selected
* yet so we issue this to protocols[0]
*/
lws_plat_set_socket_options(context, accept_fd);
if ((context->protocols[0].callback)(wsi,
LWS_CALLBACK_FILTER_NETWORK_CONNECTION,
NULL, (void *)(long)accept_fd, 0)) {
lwsl_debug("Callback denied network connection\n");
compatible_close(accept_fd);
break;
}
new_wsi = lws_create_new_server_wsi(context);
if (new_wsi == NULL) {
compatible_close(accept_fd);
break;
}
new_wsi->sock = accept_fd;
new_wsi->tsi = lws_get_idlest_tsi(context);
lwsl_info("Accepted to tsi %d\n", new_wsi->tsi);
/* the transport is accepted... give him time to negotiate */
lws_set_timeout(new_wsi, PENDING_TIMEOUT_ESTABLISH_WITH_SERVER,
AWAITING_TIMEOUT);
#if LWS_POSIX == 0
mbed3_tcp_stream_accept(accept_fd, new_wsi);
#endif
/*
* A new connection was accepted. Give the user a chance to
* set properties of the newly created wsi. There's no protocol
* selected yet so we issue this to protocols[0]
*/
(context->protocols[0].callback)(new_wsi,
LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED,
NULL, NULL, 0);
lws_libev_accept(new_wsi, accept_fd);
if (!LWS_SSL_ENABLED(context)) {
#if LWS_POSIX
lwsl_debug("accepted new conn port %u on fd=%d\n",
ntohs(cli_addr.sin_port), accept_fd);
#else
/* not very beautiful... */
accept_fd = (lws_sockfd_type)pollfd;
#endif
if (insert_wsi_socket_into_fds(context, new_wsi))
goto fail;
}
break;
/*
* look at who we connected to and give user code a chance
* to reject based on client IP. There's no protocol selected
* yet so we issue this to protocols[0]
*/
if ((context->protocols[0].callback)(wsi,
LWS_CALLBACK_FILTER_NETWORK_CONNECTION,
NULL, (void *)(long)accept_fd, 0)) {
lwsl_debug("Callback denied network connection\n");
compatible_close(accept_fd);
break;
}
if (!lws_adopt_socket(context, accept_fd))
/* already closed cleanly as necessary */
return 1;
#if LWS_POSIX
} while (pt->fds_count < context->fd_limit_per_thread - 1 &&
lws_poll_listen_fd(&pt->fds[wsi->position_in_fds_table]) > 0);
#endif
return 0;
default:
break;
}
if (!lws_server_socket_service_ssl(&wsi, new_wsi, accept_fd, pollfd))
if (!lws_server_socket_service_ssl(wsi, accept_fd))
return 0;
fail:

View file

@ -294,6 +294,9 @@ notify:
int
lws_service_timeout_check(struct lws *wsi, unsigned int sec)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
struct lws **pwsi;
/*
* if extensions want in on it (eg, we are a mux parent)
* give them a chance to service child timeouts
@ -309,8 +312,25 @@ lws_service_timeout_check(struct lws *wsi, unsigned int sec)
* connection
*/
if ((time_t)sec > wsi->pending_timeout_limit) {
lwsl_info("wsi %p: TIMEDOUT WAITING on %d\n",
(void *)wsi, wsi->pending_timeout);
#if LWS_POSIX
lwsl_notice("wsi %p: TIMEDOUT WAITING on %d (did hdr %d, ah %p, wl %d, pfd events %d)\n",
(void *)wsi, wsi->pending_timeout,
wsi->hdr_parsing_completed, wsi->u.hdr.ah,
pt->ah_wait_list_length,
pt->fds[wsi->sock].events);
#endif
lws_pt_lock(pt);
pwsi = &pt->ah_wait_list;
while (*pwsi) {
if (*pwsi == wsi)
break;
pwsi = &(*pwsi)->u.hdr.ah_wait_list;
}
lws_pt_unlock(pt);
if (!*pwsi)
lwsl_err("*** not on ah wait list ***\n");
/*
* Since he failed a timeout, he already had a chance to do
* something and was unable to... that includes situations like
@ -372,9 +392,6 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
LWS_VISIBLE int
lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int tsi)
{
#if LWS_POSIX
int idx = 0;
#endif
struct lws_context_per_thread *pt = &context->pt[tsi];
lws_sockfd_type our_fd = 0, tmp_fd;
struct lws_tokens eff_buf;
@ -386,10 +403,6 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
int n, m;
int more;
#if LWS_POSIX
if (context->lserv_fd)
idx = wsi_from_fd(context, context->lserv_fd)->position_in_fds_table;
#endif
/*
* you can call us with pollfd = NULL to just allow the once-per-second
* global timeout checks; if less than a second since the last check
@ -409,7 +422,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
if (pollfd)
our_fd = pollfd->fd;
wsi = context->timeout_list;
wsi = context->pt[tsi].timeout_list;
while (wsi) {
/* we have to take copies, because he may be deleted */
wsi1 = wsi->timeout_list;
@ -423,6 +436,18 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
}
wsi = wsi1;
}
#if 1
{
char s[300], *p = s;
for (n = 0; n < context->count_threads; n++)
p += sprintf(p, " %7lu (%5d), ",
context->pt[n].count_conns,
context->pt[n].fds_count);
lwsl_notice("load: %s\n", s);
}
#endif
}
/* the socket we came to service timed out, nothing to do */
@ -445,44 +470,10 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t
*/
#if LWS_POSIX
/*
* deal with listen service piggybacking
* every lserv_mod services of other fds, we
* sneak one in to service the listen socket if there's anything waiting
*
* To handle connection storms, as found in ab, if we previously saw a
* pending connection here, it causes us to check again next time.
*/
if (context->lserv_fd && pollfd != &pt->fds[idx]) {
context->lserv_count++;
if (context->lserv_seen ||
context->lserv_count == context->lserv_mod) {
context->lserv_count = 0;
m = 1;
if (context->lserv_seen > 5)
m = 2;
while (m--) {
/*
* even with extpoll, we prepared this
* internal fds for listen
*/
n = lws_poll_listen_fd(&pt->fds[idx]);
if (n <= 0) {
if (context->lserv_seen)
context->lserv_seen--;
break;
}
/* there's a conn waiting for us */
lws_service_fd(context, &pt->fds[idx]);
context->lserv_seen++;
}
}
}
/* handle session socket closed */
if ((!(pollfd->revents & LWS_POLLIN)) &&
if ((!(pollfd->revents & pollfd->events & LWS_POLLIN)) &&
(pollfd->revents & LWS_POLLHUP)) {
lwsl_debug("Session Socket %p (fd=%d) dead\n",

View file

@ -592,11 +592,8 @@ lws_ssl_close(struct lws *wsi)
/* leave all wsi close processing to the caller */
LWS_VISIBLE int
lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi,
lws_sockfd_type accept_fd,
struct lws_pollfd *pollfd)
lws_server_socket_service_ssl(struct lws *wsi, lws_sockfd_type accept_fd)
{
struct lws *wsi = *pwsi;
struct lws_context *context = wsi->context;
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
int n, m;
@ -606,43 +603,41 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi,
if (!LWS_SSL_ENABLED(context))
return 0;
lwsl_err("%s: mode %d, state %d\n", __func__, wsi->mode, wsi->state);
switch (wsi->mode) {
case LWSCM_SERVER_LISTENER:
case LWSCM_SSL_INIT:
if (!new_wsi) {
lwsl_err("no new_wsi\n");
if (!wsi)
return 0;
}
new_wsi->ssl = SSL_new(context->ssl_ctx);
if (new_wsi->ssl == NULL) {
wsi->ssl = SSL_new(context->ssl_ctx);
if (wsi->ssl == NULL) {
lwsl_err("SSL_new failed: %s\n",
ERR_error_string(SSL_get_error(new_wsi->ssl, 0), NULL));
ERR_error_string(SSL_get_error(wsi->ssl, 0), NULL));
lws_decode_ssl_error();
compatible_close(accept_fd);
goto fail;
}
SSL_set_ex_data(new_wsi->ssl,
SSL_set_ex_data(wsi->ssl,
openssl_websocket_private_data_index, context);
SSL_set_fd(new_wsi->ssl, accept_fd);
SSL_set_fd(wsi->ssl, accept_fd);
#ifdef USE_WOLFSSL
#ifdef USE_OLD_CYASSL
CyaSSL_set_using_nonblock(new_wsi->ssl, 1);
CyaSSL_set_using_nonblock(wsi->ssl, 1);
#else
wolfSSL_set_using_nonblock(new_wsi->ssl, 1);
wolfSSL_set_using_nonblock(wsi->ssl, 1);
#endif
#else
SSL_set_mode(new_wsi->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
bio = SSL_get_rbio(new_wsi->ssl);
SSL_set_mode(wsi->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
bio = SSL_get_rbio(wsi->ssl);
if (bio)
BIO_set_nbio(bio, 1); /* nonblocking */
else
lwsl_notice("NULL rbio\n");
bio = SSL_get_wbio(new_wsi->ssl);
bio = SSL_get_wbio(wsi->ssl);
if (bio)
BIO_set_nbio(bio, 1); /* nonblocking */
else
@ -655,8 +650,6 @@ lws_server_socket_service_ssl(struct lws **pwsi, struct lws *new_wsi,
* pieces come if we're not sorted yet
*/
*pwsi = new_wsi;
wsi = *pwsi;
wsi->mode = LWSCM_SSL_ACK_PENDING;
if (insert_wsi_socket_into_fds(context, wsi))
goto fail;
@ -754,7 +747,7 @@ go_again:
break;
}
lwsl_debug("SSL_accept failed skt %u: %s\n",
pollfd->fd, ERR_error_string(m, NULL));
wsi->sock, ERR_error_string(m, NULL));
goto fail;
accepted:

View file

@ -35,6 +35,8 @@
* using this protocol, including the sender
*/
extern int debug_level;
enum demo_protocols {
/* always first */
PROTOCOL_HTTP = 0,
@ -117,8 +119,8 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
{
struct per_session_data__http *pss =
(struct per_session_data__http *)user;
static unsigned char buffer[4096];
unsigned long amount, file_len;
unsigned char buffer[4096 + LWS_PRE];
unsigned long amount, file_len, sent;
char leaf_path[1024];
const char *mimetype;
char *other_headers;
@ -136,15 +138,16 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
switch (reason) {
case LWS_CALLBACK_HTTP:
dump_handshake_info(wsi);
if (debug_level & LLL_INFO) {
dump_handshake_info(wsi);
/* dump the individual URI Arg parameters */
n = 0;
while (lws_hdr_copy_fragment(wsi, buf, sizeof(buf),
WSI_TOKEN_HTTP_URI_ARGS, n) > 0) {
lwsl_info("URI Arg %d: %s\n", ++n, buf);
/* dump the individual URI Arg parameters */
n = 0;
while (lws_hdr_copy_fragment(wsi, buf, sizeof(buf),
WSI_TOKEN_HTTP_URI_ARGS, n) > 0) {
lwsl_info("URI Arg %d: %s\n", ++n, buf);
}
}
if (len < 1) {
lws_return_http_status(wsi,
HTTP_STATUS_BAD_REQUEST, NULL);
@ -153,8 +156,7 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
/* this example server has no concept of directories */
if (strchr((const char *)in + 1, '/')) {
lws_return_http_status(wsi,
HTTP_STATUS_FORBIDDEN, NULL);
lws_return_http_status(wsi, HTTP_STATUS_FORBIDDEN, NULL);
goto try_to_reuse;
}
@ -177,8 +179,10 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
pss->fd = lws_plat_file_open(wsi, leaf_path, &file_len,
LWS_O_RDONLY);
if (pss->fd == LWS_INVALID_FILE)
if (pss->fd == LWS_INVALID_FILE) {
lwsl_err("faild to open file %s\n", leaf_path);
return -1;
}
/*
* we will send a big jpeg file, but it could be
@ -219,9 +223,11 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
* this is mandated by changes in HTTP2
*/
*p = '\0';
lwsl_info("%s\n", buffer + LWS_PRE);
n = lws_write(wsi, buffer + LWS_PRE, p - (buffer + LWS_PRE),
LWS_WRITE_HTTP_HEADERS);
if (n < 0) {
lws_plat_file_close(wsi, pss->fd);
return -1;
@ -284,7 +290,6 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
* we'll get a LWS_CALLBACK_HTTP_FILE_COMPLETION callback when
* it's done
*/
break;
case LWS_CALLBACK_HTTP_BODY:
@ -308,9 +313,15 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
goto try_to_reuse;
case LWS_CALLBACK_HTTP_WRITEABLE:
lwsl_info("LWS_CALLBACK_HTTP_WRITEABLE\n");
if (pss->fd == LWS_INVALID_FILE)
goto try_to_reuse;
/*
* we can send more of whatever it is we were sending
*/
sent = 0;
do {
/* we'd like the send this much */
n = sizeof(buffer) - LWS_PRE;
@ -328,15 +339,16 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
n = m;
n = lws_plat_file_read(wsi, pss->fd,
&amount, buffer +
LWS_PRE, n);
&amount, buffer + LWS_PRE, n);
/* problem reading, close conn */
if (n < 0)
if (n < 0) {
lwsl_err("problem reading file\n");
goto bail;
}
n = (int)amount;
/* sent it all, close conn */
if (n == 0)
goto flush_bail;
goto penultimate;
/*
* To support HTTP2, must take care about preamble space
*
@ -344,46 +356,28 @@ int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user,
* is handled by the library itself if you sent a
* content-length header
*/
m = lws_write(wsi, buffer + LWS_PRE,
n, LWS_WRITE_HTTP);
if (m < 0)
m = lws_write(wsi, buffer + LWS_PRE, n, LWS_WRITE_HTTP);
if (m < 0) {
lwsl_err("write failed\n");
/* write failed, close conn */
goto bail;
/*
* http2 won't do this
*/
if (m != n)
/* partial write, adjust */
if (lws_plat_file_seek_cur(wsi, pss->fd, m - n) ==
(unsigned long)-1)
goto bail;
}
if (m) /* while still active, extend timeout */
lws_set_timeout(wsi,
PENDING_TIMEOUT_HTTP_CONTENT, 5);
/* if we have indigestion, let him clear it
* before eating more */
if (lws_partial_buffered(wsi))
break;
} while (!lws_send_pipe_choked(wsi));
lws_set_timeout(wsi, PENDING_TIMEOUT_HTTP_CONTENT, 5);
sent += m;
} while (!lws_send_pipe_choked(wsi) && (sent < 1024 * 1024));
later:
lws_callback_on_writable(wsi);
break;
flush_bail:
/* true if still partial pending */
if (lws_partial_buffered(wsi)) {
lws_callback_on_writable(wsi);
break;
}
penultimate:
lws_plat_file_close(wsi, pss->fd);
pss->fd = LWS_INVALID_FILE;
goto try_to_reuse;
bail:
lws_plat_file_close(wsi, pss->fd);
return -1;
/*

View file

@ -23,6 +23,7 @@
int close_testing;
int max_poll_elements;
int debug_level = 7;
volatile int force_exit = 0;
struct lws_context *context;
struct lws_plat_file_ops fops_plat;
@ -192,7 +193,6 @@ int main(int argc, char **argv)
ev_timer timeout_watcher;
char cert_path[1024];
char key_path[1024];
int debug_level = 7;
int use_ssl = 0;
int opts = 0;
int n = 0;

View file

@ -22,7 +22,7 @@
/* lws-mirror_protocol */
#define MAX_MESSAGE_QUEUE 32
#define MAX_MESSAGE_QUEUE 512
struct a_message {
void *payload;
@ -38,7 +38,7 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
{
struct per_session_data__lws_mirror *pss =
(struct per_session_data__lws_mirror *)user;
int n;
int n, m;
switch (reason) {
@ -59,19 +59,16 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
if (close_testing)
break;
while (pss->ringbuffer_tail != ringbuffer_head) {
m = ringbuffer[pss->ringbuffer_tail].len;
n = lws_write(wsi, (unsigned char *)
ringbuffer[pss->ringbuffer_tail].payload +
LWS_PRE,
ringbuffer[pss->ringbuffer_tail].len,
LWS_WRITE_TEXT);
LWS_PRE, m, LWS_WRITE_TEXT);
if (n < 0) {
lwsl_err("ERROR %d writing to mirror socket\n", n);
return -1;
}
if (n < (int)ringbuffer[pss->ringbuffer_tail].len)
lwsl_err("mirror partial write %d vs %d\n",
n, ringbuffer[pss->ringbuffer_tail].len);
if (n < m)
lwsl_err("mirror partial write %d vs %d\n", n, m);
if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1))
pss->ringbuffer_tail = 0;
@ -83,8 +80,7 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
lws_rx_flow_allow_all_protocol(lws_get_context(wsi),
lws_get_protocol(wsi));
if (lws_partial_buffered(wsi) ||
lws_send_pipe_choked(wsi)) {
if (lws_send_pipe_choked(wsi)) {
lws_callback_on_writable(wsi);
break;
}
@ -101,11 +97,10 @@ callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason,
if (ringbuffer[ringbuffer_head].payload)
free(ringbuffer[ringbuffer_head].payload);
ringbuffer[ringbuffer_head].payload =
malloc(LWS_PRE + len);
ringbuffer[ringbuffer_head].payload = malloc(LWS_PRE + len);
ringbuffer[ringbuffer_head].len = len;
memcpy((char *)ringbuffer[ringbuffer_head].payload +
LWS_PRE, in, len);
LWS_PRE, in, len);
if (ringbuffer_head == (MAX_MESSAGE_QUEUE - 1))
ringbuffer_head = 0;
else

View file

@ -24,6 +24,7 @@
int close_testing;
int max_poll_elements;
int debug_level = 7;
#ifdef EXTERNAL_POLL
struct lws_pollfd *pollfds;
@ -185,7 +186,6 @@ int main(int argc, char **argv)
pthread_t pthread_dumb, pthread_service[32];
char cert_path[1024];
char key_path[1024];
int debug_level = 7;
int threads = 1;
int use_ssl = 0;
void *retval;
@ -335,6 +335,7 @@ int main(int argc, char **argv)
info.options = opts;
info.count_threads = threads;
info.extensions = exts;
info.max_http_header_pool = 4;
context = lws_create_context(&info);
if (context == NULL) {

View file

@ -23,6 +23,7 @@
int close_testing;
int max_poll_elements;
int debug_level = 7;
#ifdef EXTERNAL_POLL
struct lws_pollfd *pollfds;
@ -171,7 +172,6 @@ int main(int argc, char **argv)
const char *iface = NULL;
char cert_path[1024];
char key_path[1024];
int debug_level = 7;
int use_ssl = 0;
int opts = 0;
int n = 0;