diff --git a/.travis.yml b/.travis.yml index cf439b4de..be87f71cf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,6 +19,7 @@ env: - LWS_METHOD=nologs CMAKE_ARGS="-DLWS_WITH_NO_LOGS=ON" - LWS_METHOD=smp CMAKE_ARGS="-DLWS_MAX_SMP=32 -DLWS_WITH_MINIMAL_EXAMPLES=1" - LWS_METHOD=nows CMAKE_ARGS="-DLWS_ROLE_WS=0" + - LWS_METHOD=threadpool CMAKE_ARGS="-DLWS_WITH_THREADPOOL=1 -DLWS_WITH_MINIMAL_EXAMPLES=1" os: - linux diff --git a/CMakeLists.txt b/CMakeLists.txt index 51c85c52d..bec9b4027 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,6 +37,7 @@ option(LWS_WITH_PEER_LIMITS "Track peers and restrict resources a single peer ca option(LWS_WITH_ACCESS_LOG "Support generating Apache-compatible access logs" OFF) option(LWS_WITH_RANGES "Support http ranges (RFC7233)" OFF) option(LWS_WITH_SERVER_STATUS "Support json + jscript server monitoring" OFF) +option(LWS_WITH_THREADPOOL "Managed worker thread pool support (relies on pthreads)" ON) option(LWS_WITH_HTTP_STREAM_COMPRESSION "Support HTTP stream compression" OFF) option(LWS_WITH_HTTP_BROTLI "Also offer brotli http stream compression (requires LWS_WITH_HTTP_STREAM_COMPRESSION)" OFF) option(LWS_WITH_ACME "Enable support for ACME automatic cert acquisition + maintenance (letsencrypt etc)" OFF) @@ -289,6 +290,7 @@ endif() if (WIN32) set(LWS_MAX_SMP 1) +set(LWS_WITH_THREADPOOL 0) endif() @@ -749,6 +751,10 @@ set(SOURCES lib/misc/base64-decode.c lib/misc/lws-ring.c lib/roles/pipe/ops-pipe.c) + +if (LWS_WITH_THREADPOOL AND UNIX AND LWS_HAVE_PTHREAD_H) + list(APPEND SOURCES lib/misc/threadpool/threadpool.c) +endif() if (LWS_ROLE_H1 OR LWS_ROLE_H2) list(APPEND SOURCES diff --git a/README.md b/README.md index d2c67b3a7..0eab09c71 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,10 @@ News ---- +## v3.0.1 released + +See the git log for the list of fixes. 
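This patch also introduces the `LWS_WITH_THREADPOOL` cmake option (default ON, forced off on WIN32, and only compiling lib/misc/threadpool/threadpool.c where pthreads are available). As a rough sketch, building libwebsockets with the option enabled, mirroring the CI flags added to .travis.yml above (the out-of-tree build directory is an assumption, not part of the patch):

```
$ mkdir build && cd build
$ cmake .. -DLWS_WITH_THREADPOOL=1 -DLWS_WITH_MINIMAL_EXAMPLES=1
$ make
```

With `LWS_WITH_MINIMAL_EXAMPLES` also enabled, the minimal-ws-server-threadpool example referenced in the threadpool README below should be built along with the library.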
+ ## v3.0.0 released See the changelog for info https://libwebsockets.org/git/libwebsockets/tree/changelog?h=v3.0-stable diff --git a/cmake/lws_config.h.in b/cmake/lws_config.h.in index 6c6418c04..a8e62901f 100644 --- a/cmake/lws_config.h.in +++ b/cmake/lws_config.h.in @@ -182,5 +182,6 @@ #cmakedefine LWS_WITH_HTTP_STREAM_COMPRESSION #cmakedefine LWS_WITH_HTTP_BROTLI +#cmakedefine LWS_WITH_THREADPOOL ${LWS_SIZEOFPTR_CODE} diff --git a/doc-assets/threadpool-states.svg b/doc-assets/threadpool-states.svg new file mode 100644 index 000000000..024c03f1b --- /dev/null +++ b/doc-assets/threadpool-states.svg @@ -0,0 +1,153 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + image/svg+xml + + + + + + + + + + + + queued + + running + + finished + + stopped + + stopping + + + + + + + + + threadpoolworkerthreadtakestask + workerproducesa bufferof output + + + nothingmoreto do + buffersent onand moreto do + problemssending + + + sync + + + free task + + + + sync + worker thread context + lws service thread context + + + wait untilthe lwsservicethreadknows thetask is done + + + + newwsi onmount + + protocol_HTTPcallback + + + enqueue threadpool task + + + + protocolWRITEABLE + cancel service + + + + + protocolWRITEABLE + cancel service + + + + + lws_threadpool_task_sync + lws_threadpool_task_status_wsi + move to"done queue"idlingworker thread + + + + wsi hasunexpect-edly gone + write thebuffer onthe wsi + acknowledgethe task hasended + Threadpool + + + + + + + synchronization with the lws service thread(syncs to the correct service thread for the wsi) + + diff --git a/doc-assets/threadpool.svg b/doc-assets/threadpool.svg new file mode 100644 index 000000000..08a4f6a64 --- /dev/null +++ b/doc-assets/threadpool.svg @@ -0,0 +1,2 @@ + +Worker threadsTask queueDone queueSYNCLWSServiceThreadreapAll communication with tasks happensin lws service thread context, via theWRITEABLE callbacktasktasktasktasktasktasktasktasktaskwsi (may be detached)task function pointercleanup function pointeruser private pointerThreadpoolenqueue diff --git a/include/libwebsockets.h b/include/libwebsockets.h index 6080c6a30..ebea94506 100644 --- a/include/libwebsockets.h +++ b/include/libwebsockets.h @@ -407,6 +407,7 @@ struct lws; #include #include #include +#include #if defined(LWS_WITH_TLS) diff --git a/include/libwebsockets/lws-callbacks.h b/include/libwebsockets/lws-callbacks.h index dbf547ad7..c0a65cbb8 100644 --- a/include/libwebsockets/lws-callbacks.h +++ b/include/libwebsockets/lws-callbacks.h @@ -729,8 +729,7 @@ enum lws_callback_reasons { * these callbacks. The deadline can be continuously extended into the * future by later calls to lws_set_timer_usecs() before the deadline * expires, or cancelled by lws_set_timer_usecs(wsi, -1); - * See the note on lws_set_timer_usecs() about which event loops are - * supported. 
*/ + */ LWS_CALLBACK_EVENT_WAIT_CANCELLED = 71, /**< This is sent to every protocol of every vhost in response diff --git a/include/libwebsockets/lws-context-vhost.h b/include/libwebsockets/lws-context-vhost.h index 76bd3b681..44a6ed91b 100644 --- a/include/libwebsockets/lws-context-vhost.h +++ b/include/libwebsockets/lws-context-vhost.h @@ -867,33 +867,5 @@ struct lws_http_mount { void *_unused[2]; /**< dummy */ }; -/** - * lws_http_compression_apply() - apply an http compression transform - * - * \param wsi: the wsi to apply the compression transform to - * \param name: NULL, or the name of the compression transform, eg, "deflate" - * \param p: pointer to pointer to headers buffer - * \param end: pointer to end of headers buffer - * \param decomp: 0 = add compressor to wsi, 1 = add decompressor - * - * This allows transparent compression of dynamically generated HTTP. The - * requested compression (eg, "deflate") is only applied if the client headers - * indicated it was supported (and it has support in lws), otherwise it's a NOP. - * - * If the requested compression method is NULL, then the supported compression - * formats are tried, and for non-decompression (server) mode the first that's - * found on the client's accept-encoding header is chosen. - * - * NOTE: the compression transform, same as h2 support, relies on the user - * code using LWS_WRITE_HTTP and then LWS_WRITE_HTTP_FINAL on the last part - * written. The internal lws fileserving code already does this. - * - * If the library was built without the cmake option - * LWS_WITH_HTTP_STREAM_COMPRESSION set, then a NOP is provided for this api, - * allowing user code to build either way and use compression if available. - */ -LWS_VISIBLE int -lws_http_compression_apply(struct lws *wsi, const char *name, - unsigned char **p, unsigned char *end, char decomp); ///@} ///@} diff --git a/include/libwebsockets/lws-http.h b/include/libwebsockets/lws-http.h index aad6cbe41..97a2c712c 100644 --- a/include/libwebsockets/lws-http.h +++ b/include/libwebsockets/lws-http.h @@ -646,5 +646,34 @@ lws_http_redirect(struct lws *wsi, int code, const unsigned char *loc, int len, */ LWS_VISIBLE LWS_EXTERN int LWS_WARN_UNUSED_RESULT lws_http_transaction_completed(struct lws *wsi); + +/** + * lws_http_compression_apply() - apply an http compression transform + * + * \param wsi: the wsi to apply the compression transform to + * \param name: NULL, or the name of the compression transform, eg, "deflate" + * \param p: pointer to pointer to headers buffer + * \param end: pointer to end of headers buffer + * \param decomp: 0 = add compressor to wsi, 1 = add decompressor + * + * This allows transparent compression of dynamically generated HTTP. The + * requested compression (eg, "deflate") is only applied if the client headers + * indicated it was supported (and it has support in lws), otherwise it's a NOP. + * + * If the requested compression method is NULL, then the supported compression + * formats are tried, and for non-decompression (server) mode the first that's + * found on the client's accept-encoding header is chosen. + * + * NOTE: the compression transform, same as h2 support, relies on the user + * code using LWS_WRITE_HTTP and then LWS_WRITE_HTTP_FINAL on the last part + * written. The internal lws fileserving code already does this. + * + * If the library was built without the cmake option + * LWS_WITH_HTTP_STREAM_COMPRESSION set, then a NOP is provided for this api, + * allowing user code to build either way and use compression if available. 
+ */ +LWS_VISIBLE int +lws_http_compression_apply(struct lws *wsi, const char *name, + unsigned char **p, unsigned char *end, char decomp); ///@} diff --git a/include/libwebsockets/lws-threadpool.h b/include/libwebsockets/lws-threadpool.h new file mode 100644 index 000000000..258ee1abc --- /dev/null +++ b/include/libwebsockets/lws-threadpool.h @@ -0,0 +1,225 @@ +/* + * libwebsockets - small server side websockets and web server implementation + * + * Copyright (C) 2010-2018 Andy Green + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation: + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301 USA + * + * included from libwebsockets.h + */ + +/** \defgroup threadpool Threadpool related functions + * ##Threadpool + * \ingroup lwsapi + * + * This allows you to create one or more pool of threads which can run tasks + * associated with a wsi. If the pool is busy, tasks wait on a queue. + * + * Tasks don't have to be atomic, if they will take more than a few tens of ms + * they should return back to the threadpool worker with a return of 0. This + * will allow them to abort cleanly. + */ +//@{ + +struct lws_threadpool; +struct lws_threadpool_task; + +enum lws_threadpool_task_status { + LWS_TP_STATUS_QUEUED, + LWS_TP_STATUS_RUNNING, + LWS_TP_STATUS_SYNCING, + LWS_TP_STATUS_STOPPING, + LWS_TP_STATUS_FINISHED, /* lws_threadpool_task_status() frees task */ + LWS_TP_STATUS_STOPPED, /* lws_threadpool_task_status() frees task */ +}; + +enum lws_threadpool_task_return { + /** Still work to do, just confirming not being stopped */ + LWS_TP_RETURN_CHECKING_IN, + /** Still work to do, enter cond_wait until service thread syncs. This + * is used if you have filled your buffer(s) of data to the service + * thread and are blocked until the service thread completes sending at + * least one. + */ + LWS_TP_RETURN_SYNC, + /** No more work to do... */ + LWS_TP_RETURN_FINISHED, + /** Responding to request to stop */ + LWS_TP_RETURN_STOPPED +}; + +struct lws_threadpool_create_args { + int threads; + int max_queue_depth; +}; + +struct lws_threadpool_task_args { + struct lws *wsi; /**< user must set to wsi task is bound to */ + void *user; /**< user may set (user-private pointer) */ + const char *name; /**< user may set to describe task */ + enum lws_threadpool_task_return (*task)(void *user, + enum lws_threadpool_task_status s); + /**< user must set to actual task function */ + void (*cleanup)(struct lws *wsi, void *user); + /**< socket lifecycle may end while task is not stoppable, so the task + * must be able to detach from any wsi and clean itself up when it does + * stop. If NULL, no cleanup necessary, otherwise point to a user- + * supplied function that destroys the stuff in \p user. + * + * wsi may be NULL on entry, indicating the task got detached due to the + * wsi closing before. 
+ */ +}; + +/** + * lws_threadpool_create() - create a pool of worker threads + * + * \param context: the lws_context the threadpool will exist inside + * \param args: argument struct prepared by caller + * \param format: printf-type format for the task name + * \param ...: printf type args for the task name format + * + * Creates a pool of worker threads with \p threads and a queue of up to + * \p max_queue_depth waiting tasks if all the threads are busy. + * + * Returns NULL if OOM, or a struct lws_threadpool pointer that must be + * destroyed by lws_threadpool_destroy(). + */ +LWS_VISIBLE LWS_EXTERN struct lws_threadpool * +lws_threadpool_create(struct lws_context *context, + const struct lws_threadpool_create_args *args, + const char *format, ...) LWS_FORMAT(3); + +/** + * lws_threadpool_finish() - Stop all pending and running tasks + * + * \param tp: the threadpool object + * + * Marks the threadpool as under destruction. Removes everything from the + * pending queue and completes those tasks as LWS_TP_STATUS_STOPPED. + * + * Running tasks will also get LWS_TP_STATUS_STOPPED as soon as they + * "resurface". + * + * This doesn't reap tasks or free the threadpool, the reaping is done by the + * lws_threadpool_task_status() on the done task. + */ +LWS_VISIBLE LWS_EXTERN void +lws_threadpool_finish(struct lws_threadpool *tp); + +/** + * lws_threadpool_destroy() - Destroy a threadpool + * + * \param tp: the threadpool object + * + * Waits for all worker threads to stop, ends the threads and frees the tp. + */ +LWS_VISIBLE LWS_EXTERN void +lws_threadpool_destroy(struct lws_threadpool *tp); + +/** + * lws_threadpool_enqueue() - Queue the task and run it on a worker thread when possible + * + * \param tp: the threadpool to queue / run on + * \param args: information about what to run + * \param format: printf-type format for the task name + * \param ...: printf type args for the task name format + * + * This asks for a task to run ASAP on a worker thread in threadpool \p tp. + * + * The args defines the wsi, a user-private pointer, a task name and + * a pointer to the task function. + * + * Returns NULL or an opaque pointer to the queued (or running, or completed) + * task. + * + * Once a task is created and enqueued, it can only be destroyed by calling + * lws_threadpool_task_status() on it after it has reached the state + * LWS_TP_STATUS_FINISHED or LWS_TP_STATUS_STOPPED. + */ +LWS_VISIBLE LWS_EXTERN struct lws_threadpool_task * +lws_threadpool_enqueue(struct lws_threadpool *tp, + const struct lws_threadpool_task_args *args, + const char *format, ...) LWS_FORMAT(3); + +/** + * lws_threadpool_dequeue() - Dequeue or try to stop a running task + * + * \param wsi: the wsi whose current task we want to eliminate + * + * Returns 0 if the task was dequeued or already completed, or 1 if the task + * has been asked to stop asynchronously. + * + * This doesn't free the task. It only shortcuts it to state + * LWS_TP_STATUS_STOPPED. lws_threadpool_task_status() must be performed on + * the task separately once it is in LWS_TP_STATUS_STOPPED to free the task. + */ +LWS_VISIBLE LWS_EXTERN int +lws_threadpool_dequeue(struct lws *wsi); + +/** + * lws_threadpool_task_status() - Query the status of a wsi's task, reaping it if done + * + * \param wsi: the wsi to query the current task of + * \param task: receives a pointer to the opaque task + * \param user: receives a void * pointer to the task user data + * + * This is the equivalent of posix waitpid()...
it returns the status of the + * task, and if the task is in state LWS_TP_STATUS_FINISHED or + * LWS_TP_STATUS_STOPPED, frees \p task. If in another state, the task + * continues to exist. + * + * This is designed to be called from the service thread. + * + * Its use is to make sure the service thread has seen the state of the task + * before deleting it. + */ +LWS_VISIBLE LWS_EXTERN enum lws_threadpool_task_status +lws_threadpool_task_status_wsi(struct lws *wsi, + struct lws_threadpool_task **task, void **user); + +/** + * lws_threadpool_task_sync() - Indicate to a stalled task it may continue + * + * \param task: the task to unblock + * \param stop: 0 = run after unblock, 1 = when he unblocks, stop him + * + * Inform the task that the service thread has finished with the shared data + * and that the task, if blocked in LWS_TP_RETURN_SYNC, may continue. + * + * If the lws service context determined that the task must be aborted, it + * should still call this but with stop = 1, causing the task to finish. + */ +LWS_VISIBLE LWS_EXTERN void +lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop); + +/** + * lws_threadpool_dump() - dump the state of a threadpool to the log + * + * \param tp: The threadpool to dump + * + * This locks the threadpool and then dumps the pending queue, the worker + * threads and the done queue, together with time information for how long + * the tasks have been in their current state, how long they have occupied a + * thread, etc. + * + * This only does anything on lws builds with CMAKE_BUILD_TYPE=DEBUG, otherwise + * while it still exists, it's a NOP. + */ + +LWS_VISIBLE LWS_EXTERN void +lws_threadpool_dump(struct lws_threadpool *tp); +//@} diff --git a/include/libwebsockets/lws-timeout-timer.h b/include/libwebsockets/lws-timeout-timer.h index 2631b71cf..b7f04e582 100644 --- a/include/libwebsockets/lws-timeout-timer.h +++ b/include/libwebsockets/lws-timeout-timer.h @@ -61,6 +61,8 @@ enum pending_timeout { PENDING_TIMEOUT_UDP_IDLE = 26, PENDING_TIMEOUT_CLIENT_CONN_IDLE = 27, PENDING_TIMEOUT_LAGGING = 28, + PENDING_TIMEOUT_THREADPOOL = 29, + PENDING_TIMEOUT_THREADPOOL_TASK = 30, /****** add new things just above ---^ ******/ diff --git a/lib/core/connect.c b/lib/core/connect.c index c5aa1b1de..6e730f3da 100644 --- a/lib/core/connect.c +++ b/lib/core/connect.c @@ -147,7 +147,7 @@ lws_client_connect_via_info(const struct lws_client_connect_info *i) lwsl_info("%s: protocol binding to %s\n", __func__, local); p = lws_vhost_name_to_protocol(wsi->vhost, local); if (p) - lws_bind_protocol(wsi, p); + lws_bind_protocol(wsi, p, __func__); } /* diff --git a/lib/core/libwebsockets.c b/lib/core/libwebsockets.c index b3a831400..91ce98c40 100644 --- a/lib/core/libwebsockets.c +++ b/lib/core/libwebsockets.c @@ -537,7 +537,7 @@ lws_remove_child_from_any_parent(struct lws *wsi) } int -lws_bind_protocol(struct lws *wsi, const struct lws_protocols *p) +lws_bind_protocol(struct lws *wsi, const struct lws_protocols *p, const char *reason) { // if (wsi->protocol == p) // return 0; @@ -546,7 +546,7 @@ lws_bind_protocol(struct lws *wsi, const struct lws_protocols *p) if (wsi->protocol && wsi->protocol_bind_balance) { wsi->protocol->callback(wsi, wsi->role_ops->protocol_unbind_cb[!!lwsi_role_server(wsi)], - wsi->user_space, NULL, 0); + wsi->user_space, (void *)reason, 0); wsi->protocol_bind_balance = 0; } if (!wsi->user_space_externally_allocated) @@ -759,7 +759,7 @@ __lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason, const char * wsi->protocol->callback(wsi, 
wsi->role_ops->protocol_unbind_cb[ !!lwsi_role_server(wsi)], - wsi->user_space, NULL, 0); + wsi->user_space, (void *)__func__, 0); wsi->protocol_bind_balance = 0; } @@ -794,7 +794,7 @@ just_kill_connection: wsi->protocol->callback(wsi, wsi->role_ops->protocol_unbind_cb[ !!lwsi_role_server(wsi)], - wsi->user_space, NULL, 0); + wsi->user_space, (void *)__func__, 0); wsi->protocol_bind_balance = 0; } diff --git a/lib/core/output.c b/lib/core/output.c index a10d6a3b5..d43735ade 100644 --- a/lib/core/output.c +++ b/lib/core/output.c @@ -29,7 +29,7 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len) struct lws_context *context = lws_get_context(wsi); struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; size_t real_len = len; - unsigned int n; + unsigned int n, m; // lwsl_notice("%s: len %d\n", __func__, (int)len); @@ -104,13 +104,15 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len) /* nope, send it on the socket directly */ lws_latency_pre(context, wsi); - n = lws_ssl_capable_write(wsi, buf, n); - lws_latency(context, wsi, "send lws_issue_raw", n, n == len); + m = lws_ssl_capable_write(wsi, buf, n); + lws_latency(context, wsi, "send lws_issue_raw", n, n == m); + + lwsl_info("%s: ssl_capable_write (%d) says %d\n", __func__, n, m); /* something got written, it can have been truncated now */ wsi->could_have_pending = 1; - switch (n) { + switch (m) { case LWS_SSL_CAPABLE_ERROR: /* we're going to close, let close know sends aren't possible */ wsi->socket_is_permanently_unusable = 1; @@ -121,7 +123,7 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len) * ie, implying treat it was a truncated send so it gets * retried */ - n = 0; + m = 0; break; } @@ -131,17 +133,17 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len) * send in the buflist. */ if (lws_has_buffered_out(wsi)) { - if (n) { - lwsl_info("%p partial adv %d (vs %ld)\n", wsi, n, + if (m) { + lwsl_info("%p partial adv %d (vs %ld)\n", wsi, m, (long)real_len); - lws_buflist_use_segment(&wsi->buflist_out, n); + lws_buflist_use_segment(&wsi->buflist_out, m); } if (!lws_has_buffered_out(wsi)) { lwsl_info("%s: wsi %p: buflist_out flushed\n", __func__, wsi); - n = (int)real_len; + m = (int)real_len; if (lwsi_state(wsi) == LRS_FLUSHING_BEFORE_CLOSE) { lwsl_info("** %p signalling to close now\n", wsi); return -1; /* retry closing now */ @@ -162,7 +164,7 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len) /* always callback on writeable */ lws_callback_on_writable(wsi); - return n; + return m; } #if defined(LWS_WITH_HTTP_STREAM_COMPRESSION) @@ -170,9 +172,9 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len) lws_callback_on_writable(wsi); #endif - if ((unsigned int)n == real_len) + if (m == real_len) /* what we just sent went out cleanly */ - return n; + return m; /* * We were not able to send everything... and we were not sending from @@ -180,13 +182,13 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len) * buffering the unsent remainder on it. * (it will get first priority next time the socket is writable). 
*/ - lwsl_debug("%p new partial sent %d from %lu total\n", wsi, n, + lwsl_debug("%p new partial sent %d from %lu total\n", wsi, m, (unsigned long)real_len); - lws_buflist_append_segment(&wsi->buflist_out, buf + n, real_len - n); + lws_buflist_append_segment(&wsi->buflist_out, buf + m, real_len - m); lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_C_WRITE_PARTIALS, 1); - lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_B_PARTIALS_ACCEPTED_PARTS, n); + lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_B_PARTIALS_ACCEPTED_PARTS, m); #if !defined(LWS_WITH_ESP32) if (lws_wsi_is_udp(wsi)) { diff --git a/lib/core/pollfd.c b/lib/core/pollfd.c index 514ea255e..f2deea9ff 100644 --- a/lib/core/pollfd.c +++ b/lib/core/pollfd.c @@ -474,7 +474,7 @@ lws_same_vh_protocol_insert(struct lws *wsi, int n) { if (wsi->same_vh_protocol_prev || wsi->same_vh_protocol_next) { lws_same_vh_protocol_remove(wsi); - lwsl_notice("Attempted to attach wsi twice to same vh prot\n"); + lwsl_info("Attempted to attach wsi twice to same vh prot\n"); } lws_vhost_lock(wsi->vhost); diff --git a/lib/core/private.h b/lib/core/private.h index 612d02f18..af9741fd7 100644 --- a/lib/core/private.h +++ b/lib/core/private.h @@ -631,6 +631,11 @@ struct lws_context { struct lws_vhost *vhost_pending_destruction_list; struct lws_plugin *plugin_list; struct lws_deferred_free *deferred_free_list; + +#if defined(LWS_WITH_THREADPOOL) + struct lws_threadpool *tp_list_head; +#endif + #if defined(LWS_WITH_PEER_LIMITS) struct lws_peer **pl_hash_table; struct lws_peer *peer_wait_list; @@ -873,6 +878,10 @@ struct lws { struct lws_dll_lws dll_hrtimer; struct lws_dll_lws dll_buflist; /* guys with pending rxflow */ +#if defined(LWS_WITH_THREADPOOL) + struct lws_threadpool_task *tp_task; +#endif + #if defined(LWS_WITH_PEER_LIMITS) struct lws_peer *peer; #endif @@ -1350,7 +1359,8 @@ int lws_protocol_init(struct lws_context *context); int -lws_bind_protocol(struct lws *wsi, const struct lws_protocols *p); +lws_bind_protocol(struct lws *wsi, const struct lws_protocols *p, + const char *reason); const struct lws_http_mount * lws_find_mount(struct lws *wsi, const char *uri_ptr, int uri_len); @@ -1488,6 +1498,8 @@ hubbub_error html_parser_cb(const hubbub_token *token, void *pw); #endif +int +lws_threadpool_tsi_context(struct lws_context *context, int tsi); void __lws_remove_from_timeout_list(struct lws *wsi); diff --git a/lib/core/service.c b/lib/core/service.c index 3c4b8f106..9e6c5e709 100644 --- a/lib/core/service.c +++ b/lib/core/service.c @@ -99,7 +99,7 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd) wsi->http.comp_ctx.may_have_more) { enum lws_write_protocol wp = LWS_WRITE_HTTP; - lwsl_debug("%s: completing comp partial (buflist_comp %p, may %d)\n", + lwsl_info("%s: completing comp partial (buflist_comp %p, may %d)\n", __func__, wsi->http.comp_ctx.buflist_comp, wsi->http.comp_ctx.may_have_more ); @@ -334,9 +334,10 @@ lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi) { struct lws_context_per_thread *pt = &context->pt[tsi]; - /* Figure out if we really want to wait in poll() - * We only need to wait if really nothing already to do and we have - * to wait for something from network + /* + * Figure out if we really want to wait in poll()... 
we only need to + * wait if really nothing already to do and we have to wait for + * something from network */ #if defined(LWS_ROLE_WS) && !defined(LWS_WITHOUT_EXTENSIONS) /* 1) if we know we are draining rx ext, do not wait in poll */ @@ -347,11 +348,12 @@ lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi) /* 2) if we know we have non-network pending data, do not wait in poll */ if (pt->context->tls_ops && - pt->context->tls_ops->fake_POLLIN_for_buffered) - if (pt->context->tls_ops->fake_POLLIN_for_buffered(pt)) + pt->context->tls_ops->fake_POLLIN_for_buffered && + pt->context->tls_ops->fake_POLLIN_for_buffered(pt)) return 0; - /* 3) If there is any wsi with rxflow buffered and in a state to process + /* + * 3) If there is any wsi with rxflow buffered and in a state to process * it, we should not wait in poll */ @@ -361,6 +363,12 @@ lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi) if (lwsi_state(wsi) != LRS_DEFERRING_ACTION) return 0; + /* + * 4) If any guys with http compression to spill, we shouldn't wait in + * poll but hurry along and service them + */ + + } lws_end_foreach_dll(d); return timeout_ms; diff --git a/lib/misc/threadpool/README.md b/lib/misc/threadpool/README.md new file mode 100644 index 000000000..7d51c17e3 --- /dev/null +++ b/lib/misc/threadpool/README.md @@ -0,0 +1,175 @@ +## Threadpool + +### Overview + +![overview](/doc-assets/threadpool.svg) + +An api that lets you create a pool of worker threads, and a queue of tasks that +are bound to a wsi. Tasks in their own thread synchronize communication to the +lws service thread of the wsi via `LWS_CALLBACK_SERVER_WRITEABLE` and friends. + +Tasks can produce some output, then return that they want to "sync" with the +service thread. That causes a `LWS_CALLBACK_SERVER_WRITEABLE` in the service +thread context, where the output can be consumed, and the task told to continue, +or completed tasks be reaped. + +ALL of the details related to thread synchronization and an associated wsi in +the lws service thread context are handled by the threadpool api, without needing +any pthreads in user code. + +### Example + +https://libwebsockets.org/git/libwebsockets/tree/minimal-examples/ws-server/minimal-ws-server-threadpool + +### Lifecycle considerations + +#### Tasks vs wsi + +Although all tasks start out as being associated to a wsi, in fact the lifetime +of a task and that of the wsi are not necessarily linked. + +You may start a long task, eg, that runs atomically in its thread for 30s, and +at any time the client may close the connection, eg, close a browser window. + +There are arrangements that a task can "check in" periodically with lws to see +if it has been asked to stop, allowing the task lifetime to be related to the +wsi lifetime somewhat, but some tasks are going to be atomic and longlived. + +For that reason, at wsi close an ongoing task can detach from the wsi and +continue until it ends or understands it has been asked to stop. To make +that work, the task is created with a `cleanup` callback that performs any +freeing independent of still having a wsi around to do it... the task takes over +responsibility to free the user pointer on destruction when the task is created. + +![Threadpool States](/doc-assets/threadpool-states.svg) + +#### Reaping completed tasks + +Once created, although tasks may run asynchronously, the task itself does not +get destroyed on completion but added to a "done queue". 
Only when the lws +service thread context queries the task state with `lws_threadpool_task_status()` +may the task be reaped and memory freed. + +This is analogous to unix processes and `wait()`. + +If a task became detached from its wsi, then joining the done queue is enough +to get the task reaped, since there's nobody left any more to synchronize the +reaping with. + +### User interface + +The api is declared at https://libwebsockets.org/git/libwebsockets/tree/include/libwebsockets/lws-threadpool.h + +#### Threadpool creation / destruction + +The threadpool should be created at program or vhost init using +`lws_threadpool_create()` and destroyed on exit or vhost destruction using +first `lws_threadpool_finish()` and then `lws_threadpool_destroy()`. + +Threadpools should be named; varargs are provided on the create function +to facilitate eg, naming the threadpool by the vhost it's associated with. + +Threadpool creation takes an args struct with the following members: + +Member|function +---|--- +threads|The maximum number of independent threads in the pool +max_queue_depth|The maximum number of tasks allowed to wait for a place in the pool + +#### Task creation / destruction + +Tasks are created and queued using `lws_threadpool_enqueue()`, which takes an +args struct with the following members: + +Member|function +---|--- +wsi|The wsi the task is initially associated with +user|An opaque user-private pointer used for communication with the lws service thread and private state / data +task|A pointer to the function that will run in the pool thread +cleanup|A pointer to a function that will clean up finished or stopped tasks (perhaps freeing user) + +Tasks should also have a name; the creation function again provides varargs +to simplify naming the task with string elements related to who started it +and why. + +#### The task function itself + +The task function receives the task user pointer and the task state. The +possible task states are: + +State|Meaning +---|--- +LWS_TP_STATUS_QUEUED|Task is still waiting for a pool thread +LWS_TP_STATUS_RUNNING|Task is supposed to do its work +LWS_TP_STATUS_SYNCING|Task is blocked waiting for sync from lws service thread +LWS_TP_STATUS_STOPPING|Task has been asked to stop but didn't stop yet +LWS_TP_STATUS_FINISHED|Task has reported it has completed +LWS_TP_STATUS_STOPPED|Task has aborted + +The task function will only be told `LWS_TP_STATUS_RUNNING` or +`LWS_TP_STATUS_STOPPING` in its status argument... RUNNING means continue with the +user task and STOPPING means clean up and return `LWS_TP_RETURN_STOPPED`. + +If possible every 100ms or so the task should return `LWS_TP_RETURN_CHECKING_IN` +to allow lws to inform it reasonably quickly that it has been asked to stop +(eg, because the related wsi has closed), or if it can continue. If not +possible, it's okay but eg exiting the application may experience delays +until the running task finishes, and since the wsi may have gone, the work +is wasted. + +The task function may return one of: + +Return|Meaning +---|--- +LWS_TP_RETURN_CHECKING_IN|Still wants to run, but confirming nobody asked him to stop.
Will be called again immediately with `LWS_TP_STATUS_RUNNING` or `LWS_TP_STATUS_STOPPING` +LWS_TP_RETURN_SYNC|Task wants to trigger a WRITABLE callback and block until lws service thread restarts it with `lws_threadpool_task_sync()` +LWS_TP_RETURN_FINISHED|Task has finished, successfully as far as it goes +LWS_TP_RETURN_STOPPED|Task has finished, aborting in response to a request to stop + +#### Synchronizing + +The task can choose to "SYNC" with the lws service thread, in other words +cause a WRITABLE callback on the associated wsi in the lws service thread +context and block itself until it hears back from there via +`lws_threadpool_task_sync()` to resume the task. + +This is typically used when, eg, the task has filled its buffer, or ringbuffer, +and needs to pause operations until what's done has been sent and some buffer +space is open again. + +In the WRITABLE callback, in lws service thread context, the buffer can be +sent with `lws_write()` and then `lws_threadpool_task_sync()` to allow the task +to fill another buffer and continue that way. + +If the WRITABLE callback determines that the task should stop, it can just call +`lws_threadpool_task_sync()` with the second argument as 1, to force the task +to stop immediately after it resumes. + +#### The cleanup function + +When a finished task is reaped, or a task that became detached from its initial +wsi completes or is stopped, it calls the `.cleanup` function defined in the +task creation args struct to free anything related to the user pointer. + +With threadpool, responsibility for freeing allocations used by the task belongs +strictly with the task, via the `.cleanup` function, once the task has been +enqueued. That's different from a typical non-threadpool protocol where the +wsi lifecycle controls deallocation. This reflects the fact that the task +may outlive the wsi. + +#### Protecting against WRITABLE and / or SYNC duplication + +Care should be taken that data prepared by the task thread in the user priv +memory should only be sent once. For example, after sending data from a user +priv buffer of a given length stored in the priv, zero down the length. + +Task execution and the SYNC writable callbacks are mutually exclusive, so there +is no danger of collision between the task thread and the lws service thread if +the reason for the callback is a SYNC operation from the task thread. + +### Thread overcommit + +If the tasks running on the threads are ultimately network-bound for all or some +of their processing (via the SYNC with the WRITEABLE callback), it's possible +to overcommit the number of threads in the pool compared to the number of +threads the processor has in hardware to get better occupancy in the CPU. diff --git a/lib/misc/threadpool/threadpool.c b/lib/misc/threadpool/threadpool.c new file mode 100644 index 000000000..127bf025b --- /dev/null +++ b/lib/misc/threadpool/threadpool.c @@ -0,0 +1,979 @@ +/* + * libwebsockets - threadpool api + * + * Copyright (C) 2018 Andy Green + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation: + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details.
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301 USA + */ + +#include "core/private.h" + +#include +#include +#include + +struct lws_threadpool; + +struct lws_threadpool_task { + struct lws_threadpool_task *task_queue_next; + + struct lws_threadpool *tp; + char name[32]; + struct lws_threadpool_task_args args; + + lws_usec_t created; + lws_usec_t acquired; + lws_usec_t done; + lws_usec_t entered_state; + + lws_usec_t acc_running; + lws_usec_t acc_syncing; + + pthread_cond_t wake_idle; + + enum lws_threadpool_task_status status; + + int late_sync_retries; + + char wanted_writeable_cb; +}; + +struct lws_pool { + struct lws_threadpool *tp; + pthread_t thread; + pthread_mutex_t lock; /* part of task wake_idle */ + struct lws_threadpool_task *task; + lws_usec_t acquired; + int worker_index; +}; + +struct lws_threadpool { + pthread_mutex_t lock; /* protects all pool lists */ + pthread_cond_t wake_idle; + struct lws_pool *pool_list; + + struct lws_context *context; + struct lws_threadpool *tp_list; /* context list of threadpools */ + + struct lws_threadpool_task *task_queue_head; + struct lws_threadpool_task *task_done_head; + + char name[32]; + + int threads_in_pool; + int queue_depth; + int done_queue_depth; + int max_queue_depth; + int running_tasks; + + unsigned int destroying:1; +}; + +static int +ms_delta(lws_usec_t now, lws_usec_t then) +{ + return (int)((now - then) / 1000); +} + +static void +us_accrue(lws_usec_t *acc, lws_usec_t then) +{ + lws_usec_t now = lws_now_usecs(); + + *acc += now - then; +} + +static int +pc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us) +{ + lws_usec_t delta = (now - then) + 1; + + return (int)((us * 100) / delta); +} + +static void +__lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len) +{ + lws_usec_t now = lws_now_usecs(); + char *end = buf + len - 1; + int syncms = 0, runms = 0; + + if (!task->acquired) { + buf += lws_snprintf(buf, end - buf, + "task: %s, QUEUED queued: %dms", + task->name, ms_delta(now, task->created)); + + return; + } + + if (task->acc_running) + runms = task->acc_running; + + if (task->acc_syncing) + syncms = task->acc_syncing; + + if (!task->done) { + buf += lws_snprintf(buf, end - buf, + "task: %s, ONGOING state %d (%dms) alive: %dms " + "(queued %dms, acquired: %dms, " + "run: %d%%, sync: %d%%)", task->name, task->status, + ms_delta(now, task->entered_state), + ms_delta(now, task->created), + ms_delta(task->acquired, task->created), + ms_delta(now, task->acquired), + pc_delta(now, task->acquired, runms), + pc_delta(now, task->acquired, syncms)); + + return; + } + + buf += lws_snprintf(buf, end - buf, + "task: %s, DONE state %d lived: %dms " + "(queued %dms, on thread: %dms, " + "ran: %d%%, synced: %d%%)", task->name, task->status, + ms_delta(task->done, task->created), + ms_delta(task->acquired, task->created), + ms_delta(task->done, task->acquired), + pc_delta(task->done, task->acquired, runms), + pc_delta(task->done, task->acquired, syncms)); +} + +void +lws_threadpool_dump(struct lws_threadpool *tp) +{ +#if defined(_DEBUG) + struct lws_threadpool_task **c; + char buf[160]; + int n, count; + + pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ + + lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__, + tp->name, tp->queue_depth, tp->running_tasks, + tp->done_queue_depth); + + count = 0; + 
c = &tp->task_queue_head; + while (*c) { + struct lws_threadpool_task *task = *c; + __lws_threadpool_task_dump(task, buf, sizeof(buf)); + lwsl_thread(" - %s\n", buf); + count++; + + c = &(*c)->task_queue_next; + } + + if (count != tp->queue_depth) + lwsl_err("%s: tp says queue depth %d, but actually %d\n", + __func__, tp->queue_depth, count); + + count = 0; + for (n = 0; n < tp->threads_in_pool; n++) { + struct lws_pool *pool = &tp->pool_list[n]; + struct lws_threadpool_task *task = pool->task; + + if (task) { + __lws_threadpool_task_dump(task, buf, sizeof(buf)); + lwsl_thread(" - worker %d: %s\n", n, buf); + count++; + } + } + + if (count != tp->running_tasks) + lwsl_err("%s: tp says %d running_tasks, but actually %d\n", + __func__, tp->running_tasks, count); + + count = 0; + c = &tp->task_done_head; + while (*c) { + struct lws_threadpool_task *task = *c; + __lws_threadpool_task_dump(task, buf, sizeof(buf)); + lwsl_thread(" - %s\n", buf); + count++; + + c = &(*c)->task_queue_next; + } + + if (count != tp->done_queue_depth) + lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n", + __func__, tp->done_queue_depth, count); + + pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */ +#endif +} + +static void +state_transition(struct lws_threadpool_task *task, + enum lws_threadpool_task_status status) +{ + task->entered_state = lws_now_usecs(); + task->status = status; +} + +static void +lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task) +{ + if (task->args.cleanup) + task->args.cleanup(task->args.wsi, task->args.user); + + if (task->args.wsi) + task->args.wsi->tp_task = NULL; + + lwsl_thread("%s: tp %p: cleaned finished task for wsi %p\n", + __func__, task->tp, task->args.wsi); + + lws_free(task); +} + +static void +__lws_threadpool_reap(struct lws_threadpool_task *task) +{ + struct lws_threadpool_task **c, *t = NULL; + struct lws_threadpool *tp = task->tp; + + /* remove the task from the done queue */ + + c = &tp->task_done_head; + + while (*c) { + if ((*c) == task) { + t = *c; + *c = t->task_queue_next; + t->task_queue_next = NULL; + tp->done_queue_depth--; + + lwsl_thread("%s: tp %s: reaped task wsi %p\n", __func__, + tp->name, task->args.wsi); + + break; + } + c = &(*c)->task_queue_next; + } + + if (!t) + lwsl_err("%s: task %p not in done queue\n", __func__, task); + + /* call the task's cleanup and delete the task itself */ + + lws_threadpool_task_cleanup_destroy(task); +} + +/* + * this gets called from each tsi service context after the service was + * cancelled... we need to ask for the writable callback from the matching + * tsi context for any wsis bound to a worked thread that need it + */ + +int +lws_threadpool_tsi_context(struct lws_context *context, int tsi) +{ + struct lws_threadpool_task **c, *task = NULL; + struct lws_threadpool *tp; + struct lws *wsi; + + lws_context_lock(context, __func__); + + tp = context->tp_list_head; + while (tp) { + int n; + + /* for the running (syncing...) tasks... */ + + for (n = 0; n < tp->threads_in_pool; n++) { + struct lws_pool *pool = &tp->pool_list[n]; + + task = pool->task; + if (!task) + continue; + + wsi = task->args.wsi; + if (!wsi || wsi->tsi != tsi || + !task->wanted_writeable_cb) + continue; + + task->wanted_writeable_cb = 0; + lws_memory_barrier(); + + /* + * finally... we can ask for the callback on + * writable from the correct service thread + * context + */ + + lws_callback_on_writable(wsi); + } + + /* for the done tasks... 
*/ + + c = &tp->task_done_head; + + while (*c) { + task = *c; + wsi = task->args.wsi; + + if (wsi && wsi->tsi == tsi && + task->wanted_writeable_cb) { + + task->wanted_writeable_cb = 0; + lws_memory_barrier(); + + /* + * finally... we can ask for the callback on + * writable from the correct service thread + * context + */ + + lws_callback_on_writable(wsi); + } + + c = &task->task_queue_next; + } + + tp = tp->tp_list; + } + + lws_context_unlock(context); + + return 0; +} + +static int +lws_threadpool_worker_sync(struct lws_pool *pool, + struct lws_threadpool_task *task) +{ + enum lws_threadpool_task_status temp; + struct timespec abstime; + struct lws *wsi; + int tries = 15; + + /* block until writable acknowledges */ + lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task); + pthread_mutex_lock(&pool->lock); /* ======================= pool lock */ + + lwsl_info("%s: %s: task %p (%s): syncing with wsi %p\n", __func__, + pool->tp->name, task, task->name, task->args.wsi); + + temp = task->status; + state_transition(task, LWS_TP_STATUS_SYNCING); + while (tries--) { + wsi = task->args.wsi; + + /* + * if the wsi is no longer attached to this task, there is + * nothing we can sync to usefully. Since the work wants to + * sync, it means we should react to the situation by telling + * the task it can't continue usefully by stopping it. + */ + + if (!wsi) { + lwsl_thread("%s: %s: task %p (%s): No longer bound to any " + "wsi to sync to\n", __func__, pool->tp->name, + task, task->name); + + state_transition(task, LWS_TP_STATUS_STOPPING); + goto done; + } + + /* + * So tries times this is the maximum time between SYNC asking + * for a callback on writable and actually getting it we are + * willing to sit still for. + * + * If it is exceeded, we will stop the task. + */ + abstime.tv_sec = time(NULL) + 2; + abstime.tv_nsec = 0; + + task->wanted_writeable_cb = 1; + lws_memory_barrier(); + + /* + * This will cause lws_threadpool_tsi_context() to get called + * from each tsi service context, where we can safely ask for + * a callback on writeable on the wsi we are associated with. + */ + lws_cancel_service(lws_get_context(wsi)); + + /* + * so the danger here is that we asked for a writable callback + * on the wsi, but for whatever reason, we are never going to + * get one. To avoid deadlocking forever, we allow a set time + * for the sync to happen naturally, otherwise the cond wait + * times out and we stop the task. + */ + + if (pthread_cond_timedwait(&task->wake_idle, &pool->lock, + &abstime) == ETIMEDOUT) { + task->late_sync_retries++; + if (!tries) { + lwsl_err("%s: %s: task %p (%s): SYNC timed out " + "(associated wsi %p)\n", + __func__, pool->tp->name, task, + task->name, task->args.wsi); + + state_transition(task, LWS_TP_STATUS_STOPPING); + goto done; + } + + continue; + } else + break; + } + + if (task->status == LWS_TP_STATUS_SYNCING) + state_transition(task, temp); + + lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task); + +done: + pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */ + + return 0; +} + +static void * +lws_threadpool_worker(void *d) +{ + struct lws_threadpool_task **c, **c2, *task; + struct lws_pool *pool = d; + struct lws_threadpool *tp = pool->tp; + char buf[160]; + + while (!tp->destroying) { + + /* we have no running task... 
wait and get one from the queue */ + + pthread_mutex_lock(&tp->lock); /* =================== tp lock */ + + /* + * if there's no task already waiting in the queue, wait for + * the wake_idle condition to signal us that might have changed + */ + while (!tp->task_queue_head && !tp->destroying) + pthread_cond_wait(&tp->wake_idle, &tp->lock); + + if (tp->destroying) { + pthread_mutex_unlock(&tp->lock); /* ------ tp unlock */ + continue; + } + + c = &tp->task_queue_head; + c2 = NULL; + task = NULL; + pool->task = NULL; + + /* look at the queue tail */ + while (*c) { + c2 = c; + c = &(*c)->task_queue_next; + } + + /* is there a task at the queue tail? */ + if (c2 && *c2) { + pool->task = task = *c2; + task->acquired = pool->acquired = lws_now_usecs(); + /* remove it from the queue */ + *c2 = task->task_queue_next; + task->task_queue_next = NULL; + tp->queue_depth--; + /* mark it as running */ + state_transition(task, LWS_TP_STATUS_RUNNING); + } + + /* someone else got it first... wait and try again */ + if (!task) { + pthread_mutex_unlock(&tp->lock); /* ------ tp unlock */ + continue; + } + + task->wanted_writeable_cb = 0; + + /* we have acquired a new task */ + + __lws_threadpool_task_dump(task, buf, sizeof(buf)); + + lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n", + __func__, tp->name, pool->worker_index, buf); + tp->running_tasks++; + + pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */ + + /* + * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to + * "resurface" periodically, and get called again with + * cont = 1 immediately to indicate it is picking up where it + * left off if the task is not being "stopped". + * + * This allows long tasks to respond to requests to stop in + * a clean and opaque way. + * + * 2) The task can return with LWS_TP_RETURN_SYNC to register + * a "callback on writable" request on the service thread and + * block until it hears back from the WRITABLE handler. + * + * This allows the work on the thread to be synchronized to the + * previous work being dispatched cleanly. + * + * 3) The task can return with LWS_TP_RETURN_FINISHED to + * indicate its work is completed nicely. + * + * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate + * it stopped and cleaned up after incomplete work. 
+ */ + + do { + lws_usec_t then; + int n; + + if (tp->destroying || !task->args.wsi) + state_transition(task, LWS_TP_STATUS_STOPPING); + + then = lws_now_usecs(); + n = task->args.task(task->args.user, task->status); + us_accrue(&task->acc_running, then); + switch (n) { + case LWS_TP_RETURN_CHECKING_IN: + /* if not destroying the tp, continue */ + break; + case LWS_TP_RETURN_SYNC: + /* block until writable acknowledges */ + then = lws_now_usecs(); + lws_threadpool_worker_sync(pool, task); + us_accrue(&task->acc_syncing, then); + break; + case LWS_TP_RETURN_FINISHED: + state_transition(task, LWS_TP_STATUS_FINISHED); + break; + case LWS_TP_RETURN_STOPPED: + state_transition(task, LWS_TP_STATUS_STOPPED); + break; + } + } while (task->status == LWS_TP_STATUS_RUNNING); + + pthread_mutex_lock(&tp->lock); /* =================== tp lock */ + + tp->running_tasks--; + + if (pool->task->status == LWS_TP_STATUS_STOPPING) + state_transition(task, LWS_TP_STATUS_STOPPED); + + /* move the task to the done queue */ + + pool->task->task_queue_next = tp->task_done_head; + tp->task_done_head = task; + tp->done_queue_depth++; + pool->task->done = lws_now_usecs(); + + if (!pool->task->args.wsi && + (pool->task->status == LWS_TP_STATUS_STOPPED || + pool->task->status == LWS_TP_STATUS_FINISHED)) { + + __lws_threadpool_task_dump(pool->task, buf, sizeof(buf)); + lwsl_thread("%s: %s: worker %d REAPING: %s\n", + __func__, tp->name, pool->worker_index, + buf); + + /* + * there is no longer any wsi attached, so nothing is + * going to take care of reaping us. So we must take + * care of it ourselves. + */ + __lws_threadpool_reap(pool->task); + } else { + + __lws_threadpool_task_dump(pool->task, buf, sizeof(buf)); + lwsl_thread("%s: %s: worker %d DONE: %s\n", + __func__, tp->name, pool->worker_index, + buf); + + /* signal the associated wsi to take a fresh look at + * task status */ + + if (pool->task->args.wsi) { + task->wanted_writeable_cb = 1; + + lws_cancel_service( + lws_get_context(pool->task->args.wsi)); + } + } + + pool->task = NULL; + pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */ + } + + /* threadpool is being destroyed */ + + pthread_exit(NULL); + + return NULL; +} + +struct lws_threadpool * +lws_threadpool_create(struct lws_context *context, + const struct lws_threadpool_create_args *args, + const char *format, ...) 
+{ + struct lws_threadpool *tp; + va_list ap; + int n; + + tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * args->threads), + "threadpool alloc"); + if (!tp) + return NULL; + + memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * args->threads)); + tp->pool_list = (struct lws_pool *)(tp + 1); + tp->max_queue_depth = args->max_queue_depth; + + va_start(ap, format); + n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap); + va_end(ap); + + lws_context_lock(context, __func__); + + tp->context = context; + tp->tp_list = context->tp_list_head; + context->tp_list_head = tp; + + lws_context_unlock(context); + + pthread_mutex_init(&tp->lock, NULL); + pthread_cond_init(&tp->wake_idle, NULL); + + for (n = 0; n < args->threads; n++) { + tp->pool_list[n].tp = tp; + tp->pool_list[n].worker_index = n; + pthread_mutex_init(&tp->pool_list[n].lock, NULL); + if (pthread_create(&tp->pool_list[n].thread, NULL, + lws_threadpool_worker, &tp->pool_list[n])) { + lwsl_err("thread creation failed\n"); + } else + tp->threads_in_pool++; + } + + return tp; +} + +void +lws_threadpool_finish(struct lws_threadpool *tp) +{ + struct lws_threadpool_task **c, *task; + + pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ + + /* nothing new can start, running jobs will abort as STOPPED and the + * pool threads will exit ASAP (they are joined in destroy) */ + tp->destroying = 1; + + /* stop everyone in the pending queue and move to the done queue */ + + c = &tp->task_queue_head; + while (*c) { + task = *c; + *c = task->task_queue_next; + task->task_queue_next = tp->task_done_head; + tp->task_done_head = task; + state_transition(task, LWS_TP_STATUS_STOPPED); + tp->queue_depth--; + tp->done_queue_depth++; + task->done = lws_now_usecs(); + + c = &task->task_queue_next; + } + + pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */ + + pthread_cond_broadcast(&tp->wake_idle); +} + +void +lws_threadpool_destroy(struct lws_threadpool *tp) +{ + struct lws_threadpool_task *task, *next; + struct lws_threadpool **ptp; + void *retval; + int n; + + /* remove us from the context list of threadpools */ + + lws_context_lock(tp->context, __func__); + + ptp = &tp->context->tp_list_head; + while (*ptp) { + if (*ptp == tp) { + *ptp = tp->tp_list; + break; + } + ptp = &(*ptp)->tp_list; + } + + lws_context_unlock(tp->context); + + + pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ + + tp->destroying = 1; + pthread_cond_broadcast(&tp->wake_idle); + pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */ + + lws_threadpool_dump(tp); + + for (n = 0; n < tp->threads_in_pool; n++) { + task = tp->pool_list[n].task; + + /* he could be sitting waiting for SYNC */ + + if (task != NULL) + pthread_cond_broadcast(&task->wake_idle); + + pthread_join(tp->pool_list[n].thread, &retval); + pthread_mutex_destroy(&tp->pool_list[n].lock); + } + lwsl_info("%s: all threadpools exited\n", __func__); + + task = tp->task_done_head; + while (task) { + next = task->task_queue_next; + lws_threadpool_task_cleanup_destroy(task); + tp->done_queue_depth--; + task = next; + } + + pthread_mutex_destroy(&tp->lock); + + lws_free(tp); +} + +/* + * we want to stop and destroy the task and related priv. The wsi may no + * longer exist. 
+ */ + +int +lws_threadpool_dequeue(struct lws *wsi) +{ + struct lws_threadpool *tp; + struct lws_threadpool_task **c, *task; + int n; + + task = wsi->tp_task; + if (!task) + return 0; + + tp = task->tp; + pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ + + c = &tp->task_queue_head; + + /* is he queued waiting for a chance to run? Mark him as stopped and + * move him on to the done queue */ + + while (*c) { + if ((*c) == task) { + *c = task->task_queue_next; + task->task_queue_next = tp->task_done_head; + tp->task_done_head = task; + state_transition(task, LWS_TP_STATUS_STOPPED); + tp->queue_depth--; + tp->done_queue_depth++; + task->done = lws_now_usecs(); + + lwsl_debug("%s: tp %p: removed queued task wsi %p\n", + __func__, tp, task->args.wsi); + + break; + } + c = &(*c)->task_queue_next; + } + + /* is he on the done queue? */ + + c = &tp->task_done_head; + while (*c) { + if ((*c) == task) { + *c = task->task_queue_next; + task->task_queue_next = NULL; + lws_threadpool_task_cleanup_destroy(task); + tp->done_queue_depth--; + goto bail; + } + c = &(*c)->task_queue_next; + } + + /* he's not in the queue... is he already running on a thread? */ + + for (n = 0; n < tp->threads_in_pool; n++) { + if (!tp->pool_list[n].task || tp->pool_list[n].task != task) + continue; + + /* + * ensure we don't collide with tests or changes in the + * worker thread + */ + pthread_mutex_lock(&tp->pool_list[n].lock); + + /* + * mark him as having been requested to stop... + * the caller will hear about it in his service thread + * context as a request to close + */ + state_transition(task, LWS_TP_STATUS_STOPPING); + + /* disconnect from wsi, and wsi from task */ + + task->args.wsi->tp_task = NULL; + task->args.wsi = NULL; + + pthread_mutex_unlock(&tp->pool_list[n].lock); + + lwsl_debug("%s: tp %p: request stop running task " + "for wsi %p\n", __func__, tp, task->args.wsi); + + break; + } + + if (n == tp->threads_in_pool) { + /* can't find it */ + lwsl_notice("%s: tp %p: no task for wsi %p, decoupling\n", + __func__, tp, task->args.wsi); + task->args.wsi->tp_task = NULL; + task->args.wsi = NULL; + } + +bail: + pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */ + + return 0; +} + +struct lws_threadpool_task * +lws_threadpool_enqueue(struct lws_threadpool *tp, + const struct lws_threadpool_task_args *args, + const char *format, ...) 
+{ + struct lws_threadpool_task *task = NULL; + va_list ap; + + if (tp->destroying) + return NULL; + + pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ + + /* + * if there's room on the queue, the job always goes on the queue + * first, then any free thread may pick it up after the wake_idle + */ + + if (tp->queue_depth == tp->max_queue_depth) { + lwsl_notice("%s: queue reached limit %d\n", __func__, + tp->max_queue_depth); + + goto bail; + } + + /* + * create the task object + */ + + task = lws_malloc(sizeof(*task), __func__); + if (!task) + goto bail; + + memset(task, 0, sizeof(*task)); + pthread_cond_init(&task->wake_idle, NULL); + task->args = *args; + task->tp = tp; + task->created = lws_now_usecs(); + + va_start(ap, format); + vsnprintf(task->name, sizeof(task->name) - 1, format, ap); + va_end(ap); + + /* + * add him on the tp task queue + */ + + task->task_queue_next = tp->task_queue_head; + state_transition(task, LWS_TP_STATUS_QUEUED); + tp->task_queue_head = task; + tp->queue_depth++; + + /* + * mark the wsi itself as depending on this tp (so wsi close for + * whatever reason can clean up) + */ + + args->wsi->tp_task = task; + + lwsl_thread("%s: tp %s: enqueued task %p (%s) for wsi %p, depth %d\n", + __func__, tp->name, task, task->name, args->wsi, + tp->queue_depth); + + /* alert any idle thread there's something new on the task list */ + + lws_memory_barrier(); + pthread_cond_signal(&tp->wake_idle); + +bail: + pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */ + + return task; +} + +/* this should be called from the service thread */ + +enum lws_threadpool_task_status +lws_threadpool_task_status_wsi(struct lws *wsi, + struct lws_threadpool_task **task, void **user) +{ + enum lws_threadpool_task_status status; + struct lws_threadpool *tp; + char buf[160]; + + *task = wsi->tp_task; + if (!*task) + return -1; + + tp = (*task)->tp; + *user = (*task)->args.user; + status = (*task)->status; + + if (status == LWS_TP_STATUS_FINISHED || + status == LWS_TP_STATUS_STOPPED) { + + pthread_mutex_lock(&tp->lock); /* ================ tpool lock */ + __lws_threadpool_task_dump(*task, buf, sizeof(buf)); + lwsl_thread("%s: %s: service thread REAPING: %s\n", + __func__, tp->name, buf); + __lws_threadpool_reap(*task); + lws_memory_barrier(); + pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */ + } + + return status; +} + +void +lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop) +{ + lwsl_debug("%s\n", __func__); + + if (stop) + state_transition(task, LWS_TP_STATUS_STOPPING); + + pthread_cond_signal(&task->wake_idle); +} diff --git a/lib/roles/h1/ops-h1.c b/lib/roles/h1/ops-h1.c index f47ae9c7f..61910c7c1 100644 --- a/lib/roles/h1/ops-h1.c +++ b/lib/roles/h1/ops-h1.c @@ -42,7 +42,7 @@ lws_read_h1(struct lws *wsi, unsigned char *buf, lws_filepos_t len) lws_filepos_t body_chunk_len; size_t n; - // lwsl_notice("%s: h1 path: wsi state 0x%x\n", __func__, lwsi_state(wsi)); + lwsl_debug("%s: h1 path: wsi state 0x%x\n", __func__, lwsi_state(wsi)); switch (lwsi_state(wsi)) { @@ -225,7 +225,7 @@ ws_mode: break; case LRS_DEFERRING_ACTION: - lwsl_debug("%s: LRS_DEFERRING_ACTION\n", __func__); + lwsl_notice("%s: LRS_DEFERRING_ACTION\n", __func__); break; case LRS_SSL_ACK_PENDING: @@ -543,6 +543,37 @@ rops_handle_POLLIN_h1(struct lws_context_per_thread *pt, struct lws *wsi, } #endif + + /* Priority 2: pre- compression transform */ + +#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION) + if (wsi->http.comp_ctx.buflist_comp || + 
wsi->http.comp_ctx.may_have_more) { + enum lws_write_protocol wp = LWS_WRITE_HTTP; + + lwsl_info("%s: completing comp partial (buflist_comp %p, may %d)\n", + __func__, wsi->http.comp_ctx.buflist_comp, + wsi->http.comp_ctx.may_have_more + ); + + if (wsi->role_ops->write_role_protocol(wsi, NULL, 0, &wp) < 0) { + lwsl_info("%s signalling to close\n", __func__); + return LWS_HPI_RET_PLEASE_CLOSE_ME; + } + lws_callback_on_writable(wsi); + + if (!wsi->http.comp_ctx.buflist_comp && + !wsi->http.comp_ctx.may_have_more && + wsi->http.deferred_transaction_completed) { + wsi->http.deferred_transaction_completed = 0; + if (lws_http_transaction_completed(wsi)) + return LWS_HPI_RET_PLEASE_CLOSE_ME; + } + + return LWS_HPI_RET_HANDLED; + } +#endif + if (lws_is_flowcontrolled(wsi)) /* We cannot deal with any kind of new RX because we are * RX-flowcontrolled. @@ -652,7 +683,7 @@ rops_write_role_protocol_h1(struct lws *wsi, unsigned char *buf, size_t len, if (n) return n; - lwsl_debug("%s: %p: transformed %d bytes to %d " + lwsl_info("%s: %p: transformed %d bytes to %d " "(wp 0x%x, more %d)\n", __func__, wsi, (int)len, (int)o, (int)*wp, wsi->http.comp_ctx.may_have_more); @@ -665,7 +696,7 @@ rops_write_role_protocol_h1(struct lws *wsi, unsigned char *buf, size_t len, * pipelining */ n = lws_snprintf(c, sizeof(c), "%X\x0d\x0a", (int)o); - // lwsl_notice("%s: chunk %s\n", __func__, c); + lwsl_info("%s: chunk (%d) %s", __func__, (int)o, c); out -= n; o += n; memcpy(out, c, n); @@ -673,6 +704,7 @@ rops_write_role_protocol_h1(struct lws *wsi, unsigned char *buf, size_t len, out[o++] = '\x0a'; if (((*wp) & 0x1f) == LWS_WRITE_HTTP_FINAL) { + lwsl_info("%s: final chunk\n", __func__); out[o++] = '0'; out[o++] = '\x0d'; out[o++] = '\x0a'; @@ -902,7 +934,7 @@ rops_perform_user_POLLOUT_h1(struct lws *wsi) wsi->http.comp_ctx.may_have_more) { enum lws_write_protocol wp = LWS_WRITE_HTTP; - lwsl_debug("%s: completing comp partial" + lwsl_info("%s: completing comp partial" "(buflist_comp %p, may %d)\n", __func__, wsi->http.comp_ctx.buflist_comp, wsi->http.comp_ctx.may_have_more); diff --git a/lib/roles/h2/http2.c b/lib/roles/h2/http2.c index 5ade1c7f3..38e53d578 100644 --- a/lib/roles/h2/http2.c +++ b/lib/roles/h2/http2.c @@ -1424,6 +1424,10 @@ lws_h2_parse_end_of_frame(struct lws *wsi) } } +#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION) + lws_http_compression_validate(h2n->swsi); +#endif + wsi->vhost->conn_stats.h2_trans++; p = lws_hdr_simple_ptr(h2n->swsi, WSI_TOKEN_HTTP_COLON_METHOD); if (!strcmp(p, "POST")) diff --git a/lib/roles/h2/ops-h2.c b/lib/roles/h2/ops-h2.c index c92dd4634..2c6b2675d 100644 --- a/lib/roles/h2/ops-h2.c +++ b/lib/roles/h2/ops-h2.c @@ -363,7 +363,7 @@ rops_write_role_protocol_h2(struct lws *wsi, unsigned char *buf, size_t len, size_t olen = len; int n; #if defined(LWS_WITH_HTTP_STREAM_COMPRESSION) - unsigned char mtubuf[1450 + LWS_PRE]; + unsigned char mtubuf[4096 + LWS_PRE]; #endif /* if not in a state to send stuff, then just send nothing */ @@ -396,7 +396,7 @@ rops_write_role_protocol_h2(struct lws *wsi, unsigned char *buf, size_t len, if (n) return n; - lwsl_debug("%s: %p: transformed %d bytes to %d " + lwsl_info("%s: %p: transformed %d bytes to %d " "(wp 0x%x, more %d)\n", __func__, wsi, (int)len, (int)o, (int)*wp, wsi->http.comp_ctx.may_have_more); @@ -776,7 +776,7 @@ lws_h2_bind_for_post_before_action(struct lws *wsi) return 1; } - if (lws_bind_protocol(wsi, pp)) + if (lws_bind_protocol(wsi, pp, __func__)) return 1; } @@ -885,7 +885,7 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi) 
w->http.comp_ctx.may_have_more) { enum lws_write_protocol wp = LWS_WRITE_HTTP; - lwsl_debug("%s: completing comp partial" + lwsl_info("%s: completing comp partial" "(buflist_comp %p, may %d)\n", __func__, w->http.comp_ctx.buflist_comp, w->http.comp_ctx.may_have_more); diff --git a/lib/roles/http/client/client.c b/lib/roles/http/client/client.c index d80c64a04..214b7be2e 100644 --- a/lib/roles/http/client/client.c +++ b/lib/roles/http/client/client.c @@ -720,7 +720,7 @@ lws_client_interpret_server_handshake(struct lws *wsi) * set-cookie:.test=LWS_1456736240_336776_COOKIE;Max-Age=360000 */ - wsi->http.connection_type = HTTP_CONNECTION_KEEP_ALIVE; + wsi->http.conn_type = HTTP_CONNECTION_KEEP_ALIVE; if (!wsi->client_h2_substream) { p = lws_hdr_simple_ptr(wsi, WSI_TOKEN_HTTP); if (wsi->do_ws && !p) { @@ -730,7 +730,7 @@ lws_client_interpret_server_handshake(struct lws *wsi) } if (!p) { p = lws_hdr_simple_ptr(wsi, WSI_TOKEN_HTTP1_0); - wsi->http.connection_type = HTTP_CONNECTION_CLOSE; + wsi->http.conn_type = HTTP_CONNECTION_CLOSE; } if (!p) { cce = "HS: URI missing"; @@ -828,7 +828,7 @@ lws_client_interpret_server_handshake(struct lws *wsi) /* if h1 KA is allowed, enable the queued pipeline guys */ if (!wsi->client_h2_alpn && !wsi->client_h2_substream && w == wsi) { /* ie, coming to this for the first time */ - if (wsi->http.connection_type == HTTP_CONNECTION_KEEP_ALIVE) + if (wsi->http.conn_type == HTTP_CONNECTION_KEEP_ALIVE) wsi->keepalive_active = 1; else { /* @@ -916,7 +916,7 @@ lws_client_interpret_server_handshake(struct lws *wsi) wsi->http.rx_content_length; } else /* can't do 1.1 without a content length or chunked */ if (!wsi->chunked) - wsi->http.connection_type = + wsi->http.conn_type = HTTP_CONNECTION_CLOSE; /* @@ -1033,7 +1033,7 @@ lws_generate_client_handshake(struct lws *wsi, char *pkt) return NULL; } - lws_bind_protocol(wsi, pr); + lws_bind_protocol(wsi, pr, __func__); } if ((wsi->protocol->callback)(wsi, LWS_CALLBACK_RAW_ADOPT, diff --git a/lib/roles/http/header.c b/lib/roles/http/header.c index fe41f9c30..85939343a 100644 --- a/lib/roles/http/header.c +++ b/lib/roles/http/header.c @@ -141,6 +141,10 @@ lws_add_http_common_headers(struct lws *wsi, unsigned int code, const char *content_type, lws_filepos_t content_len, unsigned char **p, unsigned char *end) { + const char *ka[] = { "close", "keep-alive" }; + int types[] = { HTTP_CONNECTION_CLOSE, HTTP_CONNECTION_KEEP_ALIVE }, + t = 0; + if (lws_add_http_header_status(wsi, code, p, end)) return 1; @@ -149,16 +153,60 @@ lws_add_http_common_headers(struct lws *wsi, unsigned int code, (int)strlen(content_type), p, end)) return 1; - if (content_len != LWS_ILLEGAL_HTTP_CONTENT_LEN) { - if (lws_add_http_header_content_length(wsi, content_len, p, end)) +#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION) + if (!wsi->http.lcs && + (!strncmp(content_type, "text/", 5) || + !strcmp(content_type, "application/javascript") || + !strcmp(content_type, "image/svg+xml"))) + lws_http_compression_apply(wsi, NULL, p, end, 0); +#endif + + /* + * if we decided to compress it, we don't know the content length... + * the compressed data will go out chunked on h1 + */ + if ( +#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION) + !wsi->http.lcs && +#endif + content_len != LWS_ILLEGAL_HTTP_CONTENT_LEN) { + if (lws_add_http_header_content_length(wsi, content_len, + p, end)) return 1; } else { - if (lws_add_http_header_by_token(wsi, WSI_TOKEN_CONNECTION, - (unsigned char *)"close", 5, - p, end)) - return 1; + /* there was no length... 
it normally means CONNECTION_CLOSE */ +#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION) - wsi->http.connection_type = HTTP_CONNECTION_CLOSE; + if (!wsi->http2_substream && wsi->http.lcs) { + /* so... + * - h1 connection + * - http compression transform active + * - did not send content length + * + * then mark as chunked... + */ + wsi->http.comp_ctx.chunking = 1; + if (lws_add_http_header_by_token(wsi, + WSI_TOKEN_HTTP_TRANSFER_ENCODING, + (unsigned char *)"chunked", 7, p, end)) + return -1; + + /* ... but h1 compression is chunked, if active we can + * still pipeline + */ + if (wsi->http.lcs && + wsi->http.conn_type == HTTP_CONNECTION_KEEP_ALIVE) + t = 1; + } +#endif + if (!wsi->http2_substream) { + if (lws_add_http_header_by_token(wsi, WSI_TOKEN_CONNECTION, + (unsigned char *)ka[t], + (int)strlen(ka[t]), p, end)) + return 1; + + wsi->http.conn_type = types[t]; + } } return 0; @@ -246,6 +294,7 @@ lws_add_http_header_status(struct lws *wsi, unsigned int _code, end)) return 1; } + headers = wsi->vhost->headers; while (headers) { if (lws_add_http_header_by_name(wsi, @@ -303,9 +352,9 @@ lws_return_http_status(struct lws *wsi, unsigned int code, code == HTTP_STATUS_NOT_FOUND) /* we should do a redirect, and do the 404 there */ if (lws_http_redirect(wsi, HTTP_STATUS_FOUND, - (uint8_t *)wsi->vhost->http.error_document_404, - (int)strlen(wsi->vhost->http.error_document_404), - &p, end) > 0) + (uint8_t *)wsi->vhost->http.error_document_404, + (int)strlen(wsi->vhost->http.error_document_404), + &p, end) > 0) return 0; #endif diff --git a/lib/roles/http/private.h b/lib/roles/http/private.h index f901ea08e..366435396 100644 --- a/lib/roles/http/private.h +++ b/lib/roles/http/private.h @@ -39,7 +39,7 @@ enum http_version { HTTP_VERSION_2 }; -enum http_connection_type { +enum http_conn_type { HTTP_CONNECTION_CLOSE, HTTP_CONNECTION_KEEP_ALIVE }; @@ -226,10 +226,11 @@ struct _lws_http_mode_related { #if defined(LWS_WITH_HTTP_STREAM_COMPRESSION) struct lws_compression_support *lcs; lws_comp_ctx_t comp_ctx; + unsigned char comp_accept_mask; #endif enum http_version request_version; - enum http_connection_type connection_type; + enum http_conn_type conn_type; lws_filepos_t tx_content_length; lws_filepos_t tx_content_remain; lws_filepos_t rx_content_length; diff --git a/lib/roles/http/server/parsers.c b/lib/roles/http/server/parsers.c index ca05a7836..b7dcdb49b 100644 --- a/lib/roles/http/server/parsers.c +++ b/lib/roles/http/server/parsers.c @@ -1123,6 +1123,7 @@ excessive: set_parsing_complete: if (ah->ues != URIES_IDLE) goto forbid; + if (lws_hdr_total_length(wsi, WSI_TOKEN_UPGRADE)) { if (lws_hdr_total_length(wsi, WSI_TOKEN_VERSION)) wsi->rx_frame_type = /* temp for ws version index */ diff --git a/lib/roles/http/server/rewrite.c b/lib/roles/http/server/rewrite.c index 61bb613d9..1afcce151 100644 --- a/lib/roles/http/server/rewrite.c +++ b/lib/roles/http/server/rewrite.c @@ -37,7 +37,7 @@ LWS_EXTERN int lws_rewrite_parse(struct lws_rewrite *r, const unsigned char *in, int in_len) { - if (hubbub_parser_parse_chunk(r->parser, in, in_len) != HUBBUB_OK) + if (r && hubbub_parser_parse_chunk(r->parser, in, in_len) != HUBBUB_OK) return -1; return 0; diff --git a/lib/roles/http/server/server.c b/lib/roles/http/server/server.c index e17028a37..8cff753ba 100644 --- a/lib/roles/http/server/server.c +++ b/lib/roles/http/server/server.c @@ -673,7 +673,7 @@ lws_http_serve(struct lws *wsi, char *uri, const char *origin, const struct lws_protocols *pp = lws_vhost_name_to_protocol( wsi->vhost, m->protocol); - if 
(lws_bind_protocol(wsi, pp)) + if (lws_bind_protocol(wsi, pp, __func__)) return -1; args.p = (char *)p; args.max_len = lws_ptr_diff(end, p); @@ -887,7 +887,7 @@ int lws_http_action(struct lws *wsi) { struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; - enum http_connection_type connection_type; + enum http_conn_type conn_type; enum http_version request_version; char content_length_str[32]; struct lws_process_html_args args; @@ -971,9 +971,9 @@ lws_http_action(struct lws *wsi) /* HTTP/1.1 defaults to "keep-alive", 1.0 to "close" */ if (request_version == HTTP_VERSION_1_1) - connection_type = HTTP_CONNECTION_KEEP_ALIVE; + conn_type = HTTP_CONNECTION_KEEP_ALIVE; else - connection_type = HTTP_CONNECTION_CLOSE; + conn_type = HTTP_CONNECTION_CLOSE; /* Override default if http "Connection:" header: */ if (lws_hdr_total_length(wsi, WSI_TOKEN_CONNECTION)) { @@ -982,12 +982,12 @@ lws_http_action(struct lws *wsi) WSI_TOKEN_CONNECTION); http_conn_str[sizeof(http_conn_str) - 1] = '\0'; if (!strcasecmp(http_conn_str, "keep-alive")) - connection_type = HTTP_CONNECTION_KEEP_ALIVE; + conn_type = HTTP_CONNECTION_KEEP_ALIVE; else if (!strcasecmp(http_conn_str, "close")) - connection_type = HTTP_CONNECTION_CLOSE; + conn_type = HTTP_CONNECTION_CLOSE; } - wsi->http.connection_type = connection_type; + wsi->http.conn_type = conn_type; } n = wsi->protocol->callback(wsi, LWS_CALLBACK_FILTER_HTTP_CONNECTION, @@ -1040,7 +1040,7 @@ lws_http_action(struct lws *wsi) lwsl_info("no hit\n"); - if (lws_bind_protocol(wsi, &wsi->vhost->protocols[0])) + if (lws_bind_protocol(wsi, &wsi->vhost->protocols[0], "no mount hit")) return 1; lwsi_set_state(wsi, LRS_DOING_TRANSACTION); @@ -1260,7 +1260,7 @@ lws_http_action(struct lws *wsi) return 1; } - if (lws_bind_protocol(wsi, pp)) + if (lws_bind_protocol(wsi, pp, "http action CALLBACK bind")) return 1; args.p = uri_ptr; @@ -1344,7 +1344,9 @@ lws_http_action(struct lws *wsi) lws_vhost_name_to_protocol( wsi->vhost, hit->protocol); - if (lws_bind_protocol(wsi, pp)) + lwsi_set_state(wsi, LRS_DOING_TRANSACTION); + + if (lws_bind_protocol(wsi, pp, "http_action HTTP")) return 1; m = pp->callback(wsi, LWS_CALLBACK_HTTP, @@ -1497,7 +1499,8 @@ raw_transition: lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0); lws_bind_protocol(wsi, &wsi->vhost->protocols[ wsi->vhost-> - raw_protocol_index]); + raw_protocol_index], + __func__); lwsl_info("transition to raw vh %s prot %d\n", wsi->vhost->name, wsi->vhost->raw_protocol_index); @@ -1635,6 +1638,10 @@ raw_transition: lwsi_set_state(wsi, LRS_ESTABLISHED); wsi->http.fop_fd = NULL; +#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION) + lws_http_compression_validate(wsi); +#endif + lwsl_debug("%s: wsi %p: ah %p\n", __func__, (void *)wsi, (void *)wsi->http.ah); @@ -1737,7 +1744,7 @@ lws_http_transaction_completed(struct lws *wsi) return 0; } - lwsl_info("%s: wsi %p\n", __func__, wsi); + lwsl_debug("%s: wsi %p\n", __func__, wsi); #if defined(LWS_WITH_HTTP_STREAM_COMPRESSION) lws_http_compression_destroy(wsi); @@ -1760,12 +1767,12 @@ lws_http_transaction_completed(struct lws *wsi) if (wsi->seen_zero_length_recv) return 1; - if (wsi->http.connection_type != HTTP_CONNECTION_KEEP_ALIVE) { + if (wsi->http.conn_type != HTTP_CONNECTION_KEEP_ALIVE) { lwsl_info("%s: %p: close connection\n", __func__, wsi); return 1; } - if (lws_bind_protocol(wsi, &wsi->vhost->protocols[0])) + if (lws_bind_protocol(wsi, &wsi->vhost->protocols[0], __func__)) return 1; /* @@ -1804,7 +1811,7 @@ lws_http_transaction_completed(struct lws *wsi) if (wsi->http.ah) { // 
lws_buflist_describe(&wsi->buflist, wsi); if (!lws_buflist_next_segment_len(&wsi->buflist, NULL)) { - lwsl_info("%s: %p: nothing in buflist so detaching ah\n", + lwsl_debug("%s: %p: nothing in buflist so detaching ah\n", __func__, wsi); lws_header_table_detach(wsi, 1); #ifdef LWS_WITH_TLS @@ -1840,13 +1847,14 @@ lws_http_transaction_completed(struct lws *wsi) if (wsi->http.ah) wsi->http.ah->ues = URIES_IDLE; - //lwsi_set_state(wsi, LRS_ESTABLISHED); + //lwsi_set_state(wsi, LRS_ESTABLISHED); // !!! } else if (lws_buflist_next_segment_len(&wsi->buflist, NULL)) if (lws_header_table_attach(wsi, 0)) lwsl_debug("acquired ah\n"); - lwsl_info("%s: %p: keep-alive await new transaction\n", __func__, wsi); + lwsl_debug("%s: %p: keep-alive await new transaction (state 0x%x)\n", + __func__, wsi, wsi->wsistate); lws_callback_on_writable(wsi); return 0; @@ -2096,11 +2104,6 @@ lws_serve_http_file(struct lws *wsi, const char *file, const char *content_type, (unsigned char *)cc, cclen, &p, end)) return -1; - if (wsi->http.connection_type == HTTP_CONNECTION_KEEP_ALIVE) - if (lws_add_http_header_by_token(wsi, WSI_TOKEN_CONNECTION, - (unsigned char *)"keep-alive", 10, &p, end)) - return -1; - if (other_headers) { if ((end - p) < other_headers_len) return -1; @@ -2159,7 +2162,7 @@ LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi) wsi->http.comp_ctx.may_have_more) { enum lws_write_protocol wp = LWS_WRITE_HTTP; - lwsl_debug("%s: completing comp partial (buflist_comp %p, may %d)\n", + lwsl_info("%s: completing comp partial (buflist_comp %p, may %d)\n", __func__, wsi->http.comp_ctx.buflist_comp, wsi->http.comp_ctx.may_have_more); diff --git a/lib/roles/pipe/ops-pipe.c b/lib/roles/pipe/ops-pipe.c index 27b3739ff..659c9bd93 100644 --- a/lib/roles/pipe/ops-pipe.c +++ b/lib/roles/pipe/ops-pipe.c @@ -39,6 +39,18 @@ rops_handle_POLLIN_pipe(struct lws_context_per_thread *pt, struct lws *wsi, if (n < 0) return LWS_HPI_RET_PLEASE_CLOSE_ME; #endif + +#if defined(LWS_WITH_THREADPOOL) + /* + * threadpools that need to call for on_writable callbacks do it by + * marking the task as needing one for its wsi, then cancelling service. + * + * Each tsi will call this to perform the actual callback_on_writable + * from the correct service thread context + */ + lws_threadpool_tsi_context(pt->context, pt->tid); +#endif + /* * the poll() wait, or the event loop for libuv etc is a * process-wide resource that we interrupted. 
So let every diff --git a/lib/roles/raw-skt/ops-raw-skt.c b/lib/roles/raw-skt/ops-raw-skt.c index d1e0ef0b9..f22949303 100644 --- a/lib/roles/raw-skt/ops-raw-skt.c +++ b/lib/roles/raw-skt/ops-raw-skt.c @@ -156,11 +156,12 @@ rops_adoption_bind_raw_skt(struct lws *wsi, int type, const char *vh_prot_name) LRS_ESTABLISHED, &role_ops_raw_skt); if (vh_prot_name) - lws_bind_protocol(wsi, wsi->protocol); + lws_bind_protocol(wsi, wsi->protocol, __func__); else /* this is the only time he will transition */ lws_bind_protocol(wsi, - &wsi->vhost->protocols[wsi->vhost->raw_protocol_index]); + &wsi->vhost->protocols[wsi->vhost->raw_protocol_index], + __func__); return 1; /* bound */ } diff --git a/lib/roles/ws/server-ws.c b/lib/roles/ws/server-ws.c index c0d719eb6..8c5f25c99 100644 --- a/lib/roles/ws/server-ws.c +++ b/lib/roles/ws/server-ws.c @@ -326,7 +326,8 @@ lws_process_ws_upgrade(struct lws *wsi) !strcmp(wsi->vhost->protocols[n].name, protocol_name)) { lws_bind_protocol(wsi, - &wsi->vhost->protocols[n]); + &wsi->vhost->protocols[n], + "ws upgrade select pcol"); hit = 1; break; } @@ -353,7 +354,8 @@ lws_process_ws_upgrade(struct lws *wsi) wsi->vhost->default_protocol_index); n = wsi->vhost->default_protocol_index; lws_bind_protocol(wsi, &wsi->vhost->protocols[ - (int)wsi->vhost->default_protocol_index]); + (int)wsi->vhost->default_protocol_index], + "ws upgrade default pcol"); } /* allocate the ws struct for the wsi */ diff --git a/minimal-examples/ws-server/README.md b/minimal-examples/ws-server/README.md index eb33c7a3c..b69c1d896 100644 --- a/minimal-examples/ws-server/README.md +++ b/minimal-examples/ws-server/README.md @@ -5,6 +5,7 @@ minimal-ws-server-echo|Simple ws server that listens and echos back anything cli minimal-ws-server-pmd-bulk|Simple ws server showing how to pass bulk data with permessage-deflate minimal-ws-server-pmd|Simple ws server with permessage-deflate support minimal-ws-server-ring|Like minimal-ws-server but holds the chat in a multi-tail ringbuffer +minimal-ws-server-threadpool|Demonstrates how to use a worker thread pool with lws minimal-ws-server-threads|Simple ws server where data is produced by different threads minimal-ws-server|Serves an index.html over http that opens a ws shared chat client in a browser diff --git a/minimal-examples/ws-server/minimal-ws-server-threadpool/CMakeLists.txt b/minimal-examples/ws-server/minimal-ws-server-threadpool/CMakeLists.txt new file mode 100644 index 000000000..951e9f642 --- /dev/null +++ b/minimal-examples/ws-server/minimal-ws-server-threadpool/CMakeLists.txt @@ -0,0 +1,92 @@ +cmake_minimum_required(VERSION 2.8) +include(CheckIncludeFile) +include(CheckCSourceCompiles) + +set(SAMP lws-minimal-ws-server-threadpool) +set(SRCS minimal-ws-server-threadpool.c) + +MACRO(require_pthreads result) + CHECK_INCLUDE_FILE(pthread.h LWS_HAVE_PTHREAD_H) + if (NOT LWS_HAVE_PTHREAD_H) + if (LWS_WITH_MINIMAL_EXAMPLES) + set(result 0) + else() + message(FATAL_ERROR "threading support requires pthreads") + endif() + endif() +ENDMACRO() + +# If we are being built as part of lws, confirm current build config supports +# reqconfig, else skip building ourselves. +# +# If we are being built externally, confirm installed lws was configured to +# support reqconfig, else error out with a helpful message about the problem. 
+# +MACRO(require_lws_config reqconfig _val result) + + if (DEFINED ${reqconfig}) + if (${reqconfig}) + set (rq 1) + else() + set (rq 0) + endif() + else() + set(rq 0) + endif() + + if (${_val} EQUAL ${rq}) + set(SAME 1) + else() + set(SAME 0) + endif() + + if (LWS_WITH_MINIMAL_EXAMPLES AND NOT ${SAME}) + if (${_val}) + message("${SAMP}: skipping as lws being built without ${reqconfig}") + else() + message("${SAMP}: skipping as lws built with ${reqconfig}") + endif() + set(${result} 0) + else() + if (LWS_WITH_MINIMAL_EXAMPLES) + set(MET ${SAME}) + else() + CHECK_C_SOURCE_COMPILES("#include \nint main(void) {\n#if defined(${reqconfig})\n return 0;\n#else\n fail;\n#endif\n return 0;\n}\n" HAS_${reqconfig}) + if (NOT DEFINED HAS_${reqconfig} OR NOT HAS_${reqconfig}) + set(HAS_${reqconfig} 0) + else() + set(HAS_${reqconfig} 1) + endif() + if ((HAS_${reqconfig} AND ${_val}) OR (NOT HAS_${reqconfig} AND NOT ${_val})) + set(MET 1) + else() + set(MET 0) + endif() + endif() + if (NOT MET) + if (${_val}) + message(FATAL_ERROR "This project requires lws must have been configured with ${reqconfig}") + else() + message(FATAL_ERROR "Lws configuration of ${reqconfig} is incompatible with this project") + endif() + endif() + + endif() +ENDMACRO() + +set(requirements 1) +require_pthreads(requirements) +require_lws_config(LWS_ROLE_WS 1 requirements) +require_lws_config(LWS_WITHOUT_SERVER 0 requirements) +require_lws_config(LWS_WITH_THREADPOOL 1 requirements) + +if (requirements) + add_executable(${SAMP} ${SRCS}) + + if (websockets_shared) + target_link_libraries(${SAMP} websockets_shared pthread) + add_dependencies(${SAMP} websockets_shared) + else() + target_link_libraries(${SAMP} websockets pthread) + endif() +endif() diff --git a/minimal-examples/ws-server/minimal-ws-server-threadpool/README.md b/minimal-examples/ws-server/minimal-ws-server-threadpool/README.md new file mode 100644 index 000000000..c8a91df40 --- /dev/null +++ b/minimal-examples/ws-server/minimal-ws-server-threadpool/README.md @@ -0,0 +1,26 @@ +# lws minimal ws server (threadpool) + +## build + +``` + $ cmake . && make +``` + +Pthreads is required on your system. + +This demonstrates how to cleanly assign tasks bound to a wsi to a thread pool, +with a queue if the pool is occupied. + +It creates a threadpool with 3 worker threads and a maxiumum queue size of 4. + +The web page at http://localhost:7681 then starts up 8 x ws connections. + +## usage + +``` + $ ./lws-minimal-ws-server-threadpool +[2018/03/13 13:09:52:2208] USER: LWS minimal ws server + threadpool | visit http://localhost:7681 +[2018/03/13 13:09:52:2365] NOTICE: Creating Vhost 'default' port 7681, 2 protocols, IPv6 off +``` + + diff --git a/minimal-examples/ws-server/minimal-ws-server-threadpool/minimal-ws-server-threadpool.c b/minimal-examples/ws-server/minimal-ws-server-threadpool/minimal-ws-server-threadpool.c new file mode 100644 index 000000000..3b4ad7a85 --- /dev/null +++ b/minimal-examples/ws-server/minimal-ws-server-threadpool/minimal-ws-server-threadpool.c @@ -0,0 +1,127 @@ +/* + * lws-minimal-ws-server=threadpool + * + * Copyright (C) 2018 Andy Green + * + * This file is made available under the Creative Commons CC0 1.0 + * Universal Public Domain Dedication. + * + * This demonstrates a minimal ws server that can cooperate with + * other threads cleanly. Two other threads are started, which fill + * a ringbuffer with strings at 10Hz. + * + * The actual work and thread spawning etc are done in the protocol + * implementation in protocol_lws_minimal.c. 
+ * + * To keep it simple, it serves stuff in the subdirectory "./mount-origin" of + * the directory it was started in. + * You can change that by changing mount.origin. + */ + +#include +#include +#include +#include + +#define LWS_PLUGIN_STATIC +#include "protocol_lws_minimal_threadpool.c" + +static struct lws_protocols protocols[] = { + { "http", lws_callback_http_dummy, 0, 0 }, + LWS_PLUGIN_PROTOCOL_MINIMAL, + { NULL, NULL, 0, 0 } /* terminator */ +}; + +static int interrupted; + +static const struct lws_http_mount mount = { + /* .mount_next */ NULL, /* linked-list "next" */ + /* .mountpoint */ "/", /* mountpoint URL */ + /* .origin */ "./mount-origin", /* serve from dir */ + /* .def */ "index.html", /* default filename */ + /* .protocol */ NULL, + /* .cgienv */ NULL, + /* .extra_mimetypes */ NULL, + /* .interpret */ NULL, + /* .cgi_timeout */ 0, + /* .cache_max_age */ 0, + /* .auth_mask */ 0, + /* .cache_reusable */ 0, + /* .cache_revalidate */ 0, + /* .cache_intermediaries */ 0, + /* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */ + /* .mountpoint_len */ 1, /* char count */ + /* .basic_auth_login_file */ NULL, +}; + +/* + * This demonstrates how to pass a pointer into a specific protocol handler + * running on a specific vhost. In this case, it's our default vhost and + * we pass the pvo named "config" with the value a const char * "myconfig". + * + * This is the preferred way to pass configuration into a specific vhost + + * protocol instance. + */ + +static const struct lws_protocol_vhost_options pvo_ops = { + NULL, + NULL, + "config", /* pvo name */ + (void *)"myconfig" /* pvo value */ +}; + +static const struct lws_protocol_vhost_options pvo = { + NULL, /* "next" pvo linked-list */ + &pvo_ops, /* "child" pvo linked-list */ + "lws-minimal", /* protocol name we belong to on this vhost */ + "" /* ignored */ +}; + +void sigint_handler(int sig) +{ + interrupted = 1; +} + +int main(int argc, const char **argv) +{ + struct lws_context_creation_info info; + struct lws_context *context; + const char *p; + int logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE + /* for LLL_ verbosity above NOTICE to be built into lws, + * lws must have been configured and built with + * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */ + /* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */ + /* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */ + /* | LLL_DEBUG */; + + signal(SIGINT, sigint_handler); + + if ((p = lws_cmdline_option(argc, argv, "-d"))) + logs = atoi(p); + + lws_set_log_level(logs, NULL); + lwsl_user("LWS minimal ws server + threadpool | visit http://localhost:7681\n"); + + memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ + info.port = 7681; + info.mounts = &mount; + info.protocols = protocols; + info.pvo = &pvo; /* per-vhost options */ + + context = lws_create_context(&info); + if (!context) { + lwsl_err("lws init failed\n"); + return 1; + } + + /* start the threads that create content */ + + while (!interrupted) + if (lws_service(context, 1000)) + interrupted = 1; + + lws_context_destroy(context); + + return 0; +} diff --git a/minimal-examples/ws-server/minimal-ws-server-threadpool/mount-origin/favicon.ico b/minimal-examples/ws-server/minimal-ws-server-threadpool/mount-origin/favicon.ico new file mode 100644 index 000000000..c0cc2e3df Binary files /dev/null and b/minimal-examples/ws-server/minimal-ws-server-threadpool/mount-origin/favicon.ico differ diff --git a/minimal-examples/ws-server/minimal-ws-server-threadpool/mount-origin/index.html 
b/minimal-examples/ws-server/minimal-ws-server-threadpool/mount-origin/index.html new file mode 100644 index 000000000..47fb96b7f --- /dev/null +++ b/minimal-examples/ws-server/minimal-ws-server-threadpool/mount-origin/index.html @@ -0,0 +1,97 @@ + + + +
+ + Minimal ws server threadpool example.
+ 8 x ws connections are opened back to the example server.
+ There are three threads in the pool to service them; the
+ remainder are queued until a thread in the pool is free.

+ The textarea shows the last 50 lines received. +
+
+
+ + + + + + diff --git a/minimal-examples/ws-server/minimal-ws-server-threadpool/mount-origin/libwebsockets.org-logo.png b/minimal-examples/ws-server/minimal-ws-server-threadpool/mount-origin/libwebsockets.org-logo.png new file mode 100644 index 000000000..2060a10c9 Binary files /dev/null and b/minimal-examples/ws-server/minimal-ws-server-threadpool/mount-origin/libwebsockets.org-logo.png differ diff --git a/minimal-examples/ws-server/minimal-ws-server-threadpool/protocol_lws_minimal_threadpool.c b/minimal-examples/ws-server/minimal-ws-server-threadpool/protocol_lws_minimal_threadpool.c new file mode 100644 index 000000000..c03888987 --- /dev/null +++ b/minimal-examples/ws-server/minimal-ws-server-threadpool/protocol_lws_minimal_threadpool.c @@ -0,0 +1,343 @@ +/* + * ws protocol handler plugin for "lws-minimal" demonstrating lws threadpool + * + * Copyright (C) 2010-2018 Andy Green + * + * This file is made available under the Creative Commons CC0 1.0 + * Universal Public Domain Dedication. + * + * The main reason some things are as they are is that the task lifecycle may + * be unrelated to the wsi lifecycle that queued that task. + * + * Consider the task may call an external library and run for 30s without + * "checking in" to see if it should stop. The wsi that started the task may + * have closed at any time before the 30s are up, with the browser window + * closing or whatever. + * + * So data shared between the asynchronous task and the wsi must have its + * lifecycle determined by the task, not the wsi. That means a separate struct + * that can be freed by the task. + * + * In the case the wsi outlives the task, the tasks do not get destroyed until + * the service thread has called lws_threadpool_task_status() on the completed + * task. So there is no danger of the shared task private data getting randomly + * freed. + */ + +#if !defined (LWS_PLUGIN_STATIC) +#define LWS_DLL +#define LWS_INTERNAL +#include +#endif + +#include + +struct per_vhost_data__minimal { + struct lws_threadpool *tp; + const char *config; +}; + +struct task_data { + char result[64]; + + uint64_t pos, end; +}; + +/* + * Create the private data for the task + * + * Notice we hand over responsibility for the cleanup and freeing of the + * allocated task_data to the threadpool, because the wsi it was originally + * bound to may close while the thread is still running. So we allocate + * something discrete for the task private data that can be definitively owned + * and freed by the threadpool, not the wsi... the pss won't do, as it only + * exists for the lifecycle of the wsi connection. + * + * When the task is created, we also tell it how to destroy the private data + * by giving it args.cleanup as cleanup_task_private_data() defined below. + */ + +static struct task_data * +create_task_private_data(void) +{ + struct task_data *priv = malloc(sizeof(*priv)); + + return priv; +} + +/* + * Destroy the private data for the task + * + * Notice the wsi the task was originally bound to may be long gone, in the + * case we are destroying the lws context and the thread was doing something + * for a long time without checking in. + */ +static void +cleanup_task_private_data(struct lws *wsi, void *user) +{ + struct task_data *priv = (struct task_data *)user; + + free(priv); +} + +/* + * This runs in its own thread, from the threadpool. + * + * The implementation behind this in lws uses pthreads, but no pthreadisms are + * required in the user code. 
+ * + * The example counts to 10M, "checking in" to see if it should stop after every + * 100K and pausing to sync with the service thread to send a ws message every + * 1M. It resumes after the service thread determines the wsi is writable and + * the LWS_CALLBACK_SERVER_WRITEABLE indicates the task thread can continue by + * calling lws_threadpool_task_sync(). + */ + +static enum lws_threadpool_task_return +task_function(void *user, enum lws_threadpool_task_status s) +{ + struct task_data *priv = (struct task_data *)user; + int budget = 100 * 1000; + + if (priv->pos == priv->end) + return LWS_TP_RETURN_FINISHED; + + /* + * Preferably replace this with ~100ms of your real task, so it + * can "check in" at short intervals to see if it has been asked to + * stop. + * + * You can just run tasks atomically here with the thread dedicated + * to it, but it will cause odd delays while shutting down etc and + * the task will run to completion even if the wsi that started it + * has since closed. + */ + + while (budget--) + priv->pos++; + + usleep(100000); + + if (!(priv->pos % (1000 * 1000))) { + lws_snprintf(priv->result + LWS_PRE, + sizeof(priv->result) - LWS_PRE, + "pos %llu", (unsigned long long)priv->pos); + + return LWS_TP_RETURN_SYNC; + } + + return LWS_TP_RETURN_CHECKING_IN; +} + +static int +callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len) +{ + struct per_vhost_data__minimal *vhd = + (struct per_vhost_data__minimal *) + lws_protocol_vh_priv_get(lws_get_vhost(wsi), + lws_get_protocol(wsi)); + const struct lws_protocol_vhost_options *pvo; + struct lws_threadpool_create_args cargs; + struct lws_threadpool_task_args args; + struct lws_threadpool_task *task; + struct task_data *priv; + int n, m, r = 0; + char name[32]; + void *_user; + + switch (reason) { + case LWS_CALLBACK_PROTOCOL_INIT: + /* create our per-vhost struct */ + vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), + lws_get_protocol(wsi), + sizeof(struct per_vhost_data__minimal)); + if (!vhd) + return 1; + + /* recover the pointer to the globals struct */ + pvo = lws_pvo_search( + (const struct lws_protocol_vhost_options *)in, + "config"); + if (!pvo || !pvo->value) { + lwsl_err("%s: Can't find \"config\" pvo\n", __func__); + return 1; + } + vhd->config = pvo->value; + + memset(&cargs, 0, sizeof(cargs)); + + cargs.max_queue_depth = 8; + cargs.threads = 3; + vhd->tp = lws_threadpool_create(lws_get_context(wsi), + &cargs, "%s", + lws_get_vhost_name(lws_get_vhost(wsi))); + if (!vhd->tp) + return 1; + + lws_timed_callback_vh_protocol(lws_get_vhost(wsi), + lws_get_protocol(wsi), + LWS_CALLBACK_USER, 1); + + break; + + case LWS_CALLBACK_PROTOCOL_DESTROY: + lws_threadpool_finish(vhd->tp); + lws_threadpool_destroy(vhd->tp); + break; + + case LWS_CALLBACK_USER: + + /* + * in debug mode, dump the threadpool stat to the logs once + * a second + */ + lws_threadpool_dump(vhd->tp); + lws_timed_callback_vh_protocol(lws_get_vhost(wsi), + lws_get_protocol(wsi), + LWS_CALLBACK_USER, 1); + break; + + case LWS_CALLBACK_ESTABLISHED: + + memset(&args, 0, sizeof(args)); + priv = args.user = create_task_private_data(); + if (!args.user) + return 1; + + priv->pos = 0; + priv->end = 10 * 1000 * 1000; + + /* queue the task... the task takes on responsibility for + * destroying args.user. 
pss->priv just has a copy of it */ + + args.wsi = wsi; + args.task = task_function; + args.cleanup = cleanup_task_private_data; + + lws_get_peer_simple(wsi, name, sizeof(name)); + + if (!lws_threadpool_enqueue(vhd->tp, &args, "ws %s", name)) { + lwsl_user("%s: Couldn't enqueue task\n", __func__); + cleanup_task_private_data(wsi, priv); + return 1; + } + + lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL, 30); + + /* + * so the asynchronous worker will let us know the next step + * by causing LWS_CALLBACK_SERVER_WRITEABLE + */ + + break; + + case LWS_CALLBACK_CLOSED: + break; + + case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: + lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi); + lws_threadpool_dequeue(wsi); + break; + + case LWS_CALLBACK_SERVER_WRITEABLE: + + /* + * even completed tasks wait in a queue until we call the + * below on them. Then they may destroy themselves and their + * args.user data (by calling the cleanup callback). + * + * If you need to get things from the still-valid private task + * data, copy it here before calling + * lws_threadpool_task_status() that may free the task and the + * private task data. + */ + + n = lws_threadpool_task_status_wsi(wsi, &task, &_user); + lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n", + __func__, n); + switch(n) { + + case LWS_TP_STATUS_FINISHED: + case LWS_TP_STATUS_STOPPED: + case LWS_TP_STATUS_QUEUED: + case LWS_TP_STATUS_RUNNING: + case LWS_TP_STATUS_STOPPING: + return 0; + + case LWS_TP_STATUS_SYNCING: + /* the task has paused for us to do something */ + break; + default: + return -1; + } + + priv = (struct task_data *)_user; + + lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL_TASK, 5); + + n = strlen(priv->result + LWS_PRE); + m = lws_write(wsi, (unsigned char *)priv->result + LWS_PRE, + n, LWS_WRITE_TEXT); + if (m < n) { + lwsl_err("ERROR %d writing to ws socket\n", m); + lws_threadpool_task_sync(task, 1); + return -1; + } + + /* + * service thread has done whatever it wanted to do with the + * data the task produced: if it's waiting to do more it can + * continue now. + */ + lws_threadpool_task_sync(task, 0); + break; + + default: + break; + } + + return r; +} + +#define LWS_PLUGIN_PROTOCOL_MINIMAL \ + { \ + "lws-minimal", \ + callback_minimal, \ + 0, \ + 128, \ + 0, NULL, 0 \ + } + +#if !defined (LWS_PLUGIN_STATIC) + +/* boilerplate needed if we are built as a dynamic plugin */ + +static const struct lws_protocols protocols[] = { + LWS_PLUGIN_PROTOCOL_MINIMAL +}; + +LWS_EXTERN LWS_VISIBLE int +init_protocol_minimal(struct lws_context *context, + struct lws_plugin_capability *c) +{ + if (c->api_magic != LWS_PLUGIN_API_MAGIC) { + lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC, + c->api_magic); + return 1; + } + + c->protocols = protocols; + c->count_protocols = LWS_ARRAY_SIZE(protocols); + c->extensions = NULL; + c->count_extensions = 0; + + return 0; +} + +LWS_EXTERN LWS_VISIBLE int +destroy_protocol_minimal(struct lws_context *context) +{ + return 0; +} +#endif
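
As a quick reference for the task-side half of the contract shown in protocol_lws_minimal_threadpool.c above, here is a minimal sketch of a task / cleanup callback pair. `struct my_task_data`, `do_unit_of_work()` and `have_result()` are hypothetical placeholders, not part of this patch; the `lws_threadpool` callback signatures and `LWS_TP_RETURN_*` values are the ones the patch introduces, and this assumes lws was built with `LWS_WITH_THREADPOOL=1`.

```
#include <libwebsockets.h>
#include <stdlib.h>

/*
 * Hypothetical per-task state: owned by the threadpool and freed only in the
 * cleanup callback, since the wsi that queued the task may close first.
 */
struct my_task_data {
	char buf[LWS_PRE + 64];
	int  done;
};

/* hypothetical stand-ins for real work, eg, a library call split into pieces */
static void do_unit_of_work(struct my_task_data *td) { (void)td; }
static int  have_result(struct my_task_data *td)     { (void)td; return 0; }

/* runs on a pool thread; signature follows lws_threadpool_task_args.task */
static enum lws_threadpool_task_return
my_task(void *user, enum lws_threadpool_task_status s)
{
	struct my_task_data *td = (struct my_task_data *)user;
	(void)s;

	if (td->done)
		return LWS_TP_RETURN_FINISHED; /* reaped at next WRITEABLE */

	do_unit_of_work(td);	/* keep each pass short (~100ms)... */

	if (have_result(td))
		/*
		 * the service thread lws_write()s td->buf + LWS_PRE from
		 * LWS_CALLBACK_SERVER_WRITEABLE, then calls
		 * lws_threadpool_task_sync(task, 0) so the worker resumes
		 */
		return LWS_TP_RETURN_SYNC;

	return LWS_TP_RETURN_CHECKING_IN; /* ...so stop requests stay timely */
}

/* called in lws service thread context; the queuing wsi may already be gone */
static void
my_task_cleanup(struct lws *wsi, void *user)
{
	(void)wsi;
	free(user);
}
```

These would be handed to the pool via `args.task`, `args.cleanup` and `args.user` exactly as `task_function()` and `cleanup_task_private_data()` are in the example above; the service-thread half of the contract is the `LWS_CALLBACK_SERVER_WRITEABLE` handling already shown in `callback_minimal()`.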