/*
 * libwebsockets - small server side websockets and web server implementation
 *
 * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#if !defined(_GNU_SOURCE)
#define _GNU_SOURCE
#endif

#include <pthread.h>

#include "private-lib-core.h"

#include <string.h>
#include <stdio.h>

struct lws_threadpool;

struct lws_threadpool_task {
	struct lws_threadpool_task *task_queue_next;

	struct lws_threadpool *tp;
	char name[32];
	struct lws_threadpool_task_args args;

	lws_usec_t created;
	lws_usec_t acquired;
	lws_usec_t done;
	lws_usec_t entered_state;

	lws_usec_t acc_running;
	lws_usec_t acc_syncing;

	pthread_cond_t wake_idle;

	enum lws_threadpool_task_status status;

	int late_sync_retries;

	char wanted_writeable_cb;
	char outlive;
};

struct lws_pool {
	struct lws_threadpool *tp;
	pthread_t thread;
	pthread_mutex_t lock; /* part of task wake_idle */
	struct lws_threadpool_task *task;
	lws_usec_t acquired;
	int worker_index;
};

struct lws_threadpool {
	pthread_mutex_t lock; /* protects all pool lists */
	pthread_cond_t wake_idle;
	struct lws_pool *pool_list;

	struct lws_context *context;
	struct lws_threadpool *tp_list; /* context list of threadpools */

	struct lws_threadpool_task *task_queue_head;
	struct lws_threadpool_task *task_done_head;

	char name[32];

	int threads_in_pool;
	int queue_depth;
	int done_queue_depth;
	int max_queue_depth;
	int running_tasks;

	unsigned int destroying:1;
};

static int
ms_delta(lws_usec_t now, lws_usec_t then)
{
	return (int)((now - then) / 1000);
}

static void
us_accrue(lws_usec_t *acc, lws_usec_t then)
{
	lws_usec_t now = lws_now_usecs();

	*acc += now - then;
}

static int
pc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us)
{
	lws_usec_t delta = (now - then) + 1;

	return (int)((us * 100) / delta);
}

static void
__lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
{
	lws_usec_t now = lws_now_usecs();
	char *end = buf + len - 1;
	int syncms = 0, runms = 0;

	if (!task->acquired) {
		buf += lws_snprintf(buf, end - buf,
				    "task: %s, QUEUED queued: %dms",
				    task->name, ms_delta(now, task->created));

		return;
	}

	if (task->acc_running)
		runms = task->acc_running;

	if (task->acc_syncing)
		syncms = task->acc_syncing;

	if (!task->done) {
		buf += lws_snprintf(buf, end - buf,
			"task: %s, ONGOING state %d (%dms) alive: %dms "
			"(queued %dms, acquired: %dms, "
			"run: %d%%, sync: %d%%)", task->name, task->status,
			ms_delta(now, task->entered_state),
			ms_delta(now, task->created),
			ms_delta(task->acquired, task->created),
			ms_delta(now, task->acquired),
			pc_delta(now, task->acquired, runms),
			pc_delta(now, task->acquired, syncms));

		return;
	}

	lws_snprintf(buf, end - buf,
		"task: %s, DONE state %d lived: %dms "
		"(queued %dms, on thread: %dms, "
		"ran: %d%%, synced: %d%%)", task->name, task->status,
		ms_delta(task->done, task->created),
		ms_delta(task->acquired, task->created),
		ms_delta(task->done, task->acquired),
		pc_delta(task->done, task->acquired, runms),
		pc_delta(task->done, task->acquired, syncms));
}

void
lws_threadpool_dump(struct lws_threadpool *tp)
{
#if defined(_DEBUG)
	struct lws_threadpool_task **c;
	char buf[160];
	int n, count;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__,
		    tp->name, tp->queue_depth, tp->running_tasks,
		    tp->done_queue_depth);

	count = 0;
	c = &tp->task_queue_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread("  - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->queue_depth)
		lwsl_err("%s: tp says queue depth %d, but actually %d\n",
			 __func__, tp->queue_depth, count);

	count = 0;
	for (n = 0; n < tp->threads_in_pool; n++) {
		struct lws_pool *pool = &tp->pool_list[n];
		struct lws_threadpool_task *task = pool->task;

		if (task) {
			__lws_threadpool_task_dump(task, buf, sizeof(buf));
			lwsl_thread("  - worker %d: %s\n", n, buf);
			count++;
		}
	}

	if (count != tp->running_tasks)
		lwsl_err("%s: tp says %d running_tasks, but actually %d\n",
			 __func__, tp->running_tasks, count);

	count = 0;
	c = &tp->task_done_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread("  - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->done_queue_depth)
		lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n",
			 __func__, tp->done_queue_depth, count);

	pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
#endif
}

static void
state_transition(struct lws_threadpool_task *task,
		 enum lws_threadpool_task_status status)
{
	task->entered_state = lws_now_usecs();
	task->status = status;
}

static void
lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
{
	if (task->args.cleanup)
		task->args.cleanup(task->args.wsi, task->args.user);

	if (task->args.wsi)
		task->args.wsi->tp_task = NULL;

	lwsl_thread("%s: tp %p: cleaned finished task for wsi %p\n",
		    __func__, task->tp, task->args.wsi);

	lws_free(task);
}

static void
__lws_threadpool_reap(struct lws_threadpool_task *task)
{
	struct lws_threadpool_task **c, *t = NULL;
	struct lws_threadpool *tp = task->tp;

	/* remove the task from the done queue */

	c = &tp->task_done_head;

	while (*c) {
		if ((*c) == task) {
			t = *c;
			*c = t->task_queue_next;
			t->task_queue_next = NULL;
			tp->done_queue_depth--;

			lwsl_thread("%s: tp %s: reaped task wsi %p\n", __func__,
				   tp->name, task->args.wsi);

			break;
		}
		c = &(*c)->task_queue_next;
	}

	if (!t)
		lwsl_err("%s: task %p not in done queue\n", __func__, task);

	/* call the task's cleanup and delete the task itself */

	lws_threadpool_task_cleanup_destroy(task);
}

/*
 * this gets called from each tsi service context after the service was
 * cancelled... we need to ask for the writable callback from the matching
 * tsi context for any wsis bound to a worked thread that need it
 */

int
lws_threadpool_tsi_context(struct lws_context *context, int tsi)
{
	struct lws_threadpool_task **c, *task = NULL;
	struct lws_threadpool *tp;
	struct lws *wsi;

	lws_context_lock(context, __func__);

	tp = context->tp_list_head;
	while (tp) {
		int n;

		/* for the running (syncing...) tasks... */

		for (n = 0; n < tp->threads_in_pool; n++) {
			struct lws_pool *pool = &tp->pool_list[n];

			task = pool->task;
			if (!task)
				continue;

			wsi = task->args.wsi;
			if (!wsi || wsi->tsi != tsi ||
			    !task->wanted_writeable_cb)
				continue;

			task->wanted_writeable_cb = 0;
			lws_memory_barrier();

			/*
			 * finally... we can ask for the callback on
			 * writable from the correct service thread
			 * context
			 */

			lws_callback_on_writable(wsi);
		}

		/* for the done tasks... */

		c = &tp->task_done_head;

		while (*c) {
			task = *c;
			wsi = task->args.wsi;

			if (wsi && wsi->tsi == tsi &&
			    task->wanted_writeable_cb) {

				task->wanted_writeable_cb = 0;
				lws_memory_barrier();

				/*
				 * finally... we can ask for the callback on
				 * writable from the correct service thread
				 * context
				 */

				lws_callback_on_writable(wsi);
			}

			c = &task->task_queue_next;
		}

		tp = tp->tp_list;
	}

	lws_context_unlock(context);

	return 0;
}

static int
lws_threadpool_worker_sync(struct lws_pool *pool,
			   struct lws_threadpool_task *task)
{
	enum lws_threadpool_task_status temp;
	struct timespec abstime;
	struct lws *wsi;
	int tries = 15;

	/* block until writable acknowledges */
	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
	pthread_mutex_lock(&pool->lock); /* ======================= pool lock */

	lwsl_info("%s: %s: task %p (%s): syncing with wsi %p\n", __func__,
		    pool->tp->name, task, task->name, task->args.wsi);

	temp = task->status;
	state_transition(task, LWS_TP_STATUS_SYNCING);
	while (tries--) {
		wsi = task->args.wsi;

		/*
		 * if the wsi is no longer attached to this task, there is
		 * nothing we can sync to usefully.  Since the work wants to
		 * sync, it means we should react to the situation by telling
		 * the task it can't continue usefully by stopping it.
		 */

		if (!wsi) {
			lwsl_thread("%s: %s: task %p (%s): No longer bound to any "
				 "wsi to sync to\n", __func__, pool->tp->name,
				 task, task->name);

			state_transition(task, LWS_TP_STATUS_STOPPING);
			goto done;
		}

		/*
		 * So tries times this is the maximum time between SYNC asking
		 * for a callback on writable and actually getting it we are
		 * willing to sit still for.
		 *
		 * If it is exceeded, we will stop the task.
		 */
		abstime.tv_sec = time(NULL) + 2;
		abstime.tv_nsec = 0;

		task->wanted_writeable_cb = 1;
		lws_memory_barrier();

		/*
		 * This will cause lws_threadpool_tsi_context() to get called
		 * from each tsi service context, where we can safely ask for
		 * a callback on writeable on the wsi we are associated with.
		 */
		lws_cancel_service(lws_get_context(wsi));

		/*
		 * so the danger here is that we asked for a writable callback
		 * on the wsi, but for whatever reason, we are never going to
		 * get one.  To avoid deadlocking forever, we allow a set time
		 * for the sync to happen naturally, otherwise the cond wait
		 * times out and we stop the task.
		 */

		if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
					   &abstime) == ETIMEDOUT) {
			task->late_sync_retries++;
			if (!tries) {
				lwsl_err("%s: %s: task %p (%s): SYNC timed out "
					 "(associated wsi %p)\n",
					 __func__, pool->tp->name, task,
					 task->name, task->args.wsi);

				state_transition(task, LWS_TP_STATUS_STOPPING);
				goto done;
			}

			continue;
		} else
			break;
	}

	if (task->status == LWS_TP_STATUS_SYNCING)
		state_transition(task, temp);

	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);

done:
	pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */

	return 0;
}

static void *
lws_threadpool_worker(void *d)
{
	struct lws_threadpool_task **c, **c2, *task;
	struct lws_pool *pool = d;
	struct lws_threadpool *tp = pool->tp;
	char buf[160];

	while (!tp->destroying) {

		/* we have no running task... wait and get one from the queue */

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		/*
		 * if there's no task already waiting in the queue, wait for
		 * the wake_idle condition to signal us that might have changed
		 */
		while (!tp->task_queue_head && !tp->destroying)
			pthread_cond_wait(&tp->wake_idle, &tp->lock);

		if (tp->destroying) {
			pthread_mutex_unlock(&tp->lock);  /* ------ tp unlock */
			continue;
		}

		c = &tp->task_queue_head;
		c2 = NULL;
		task = NULL;
		pool->task = NULL;

		/* look at the queue tail */
		while (*c) {
			c2 = c;
			c = &(*c)->task_queue_next;
		}

		/* is there a task at the queue tail? */
		if (c2 && *c2) {
			pool->task = task = *c2;
			task->acquired = pool->acquired = lws_now_usecs();
			/* remove it from the queue */
			*c2 = task->task_queue_next;
			task->task_queue_next = NULL;
			tp->queue_depth--;
			/* mark it as running */
			state_transition(task, LWS_TP_STATUS_RUNNING);
		}

		/* someone else got it first... wait and try again */
		if (!task) {
			pthread_mutex_unlock(&tp->lock);  /* ------ tp unlock */
			continue;
		}

		task->wanted_writeable_cb = 0;

		/* we have acquired a new task */

		__lws_threadpool_task_dump(task, buf, sizeof(buf));

		lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n",
			    __func__, tp->name, pool->worker_index, buf);
		tp->running_tasks++;

		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */

		/*
		 * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
		 * "resurface" periodically, and get called again with
		 * cont = 1 immediately to indicate it is picking up where it
		 * left off if the task is not being "stopped".
		 *
		 * This allows long tasks to respond to requests to stop in
		 * a clean and opaque way.
		 *
		 * 2) The task can return with LWS_TP_RETURN_SYNC to register
		 * a "callback on writable" request on the service thread and
		 * block until it hears back from the WRITABLE handler.
		 *
		 * This allows the work on the thread to be synchronized to the
		 * previous work being dispatched cleanly.
		 *
		 * 3) The task can return with LWS_TP_RETURN_FINISHED to
		 * indicate its work is completed nicely.
		 *
		 * 4) The task can return with LWS_TP_RETURN_STOPPED to indicate
		 * it stopped and cleaned up after incomplete work.
		 */

		do {
			lws_usec_t then;
			int n;

			if (tp->destroying || !task->args.wsi) {
				lwsl_info("%s: stopping on wsi gone\n", __func__);
				state_transition(task, LWS_TP_STATUS_STOPPING);
			}

			then = lws_now_usecs();
			n = task->args.task(task->args.user, task->status);
			lwsl_debug("   %d, status %d\n", n, task->status);
			us_accrue(&task->acc_running, then);
			if (n & LWS_TP_RETURN_FLAG_OUTLIVE)
				task->outlive = 1;
			switch (n & 7) {
			case LWS_TP_RETURN_CHECKING_IN:
				/* if not destroying the tp, continue */
				break;
			case LWS_TP_RETURN_SYNC:
				if (!task->args.wsi) {
					lwsl_debug("%s: task that wants to "
						    "outlive lost wsi asked "
						    "to sync: bypassed\n",
						    __func__);
					break;
				}
				/* block until writable acknowledges */
				then = lws_now_usecs();
				lws_threadpool_worker_sync(pool, task);
				us_accrue(&task->acc_syncing, then);
				break;
			case LWS_TP_RETURN_FINISHED:
				state_transition(task, LWS_TP_STATUS_FINISHED);
				break;
			case LWS_TP_RETURN_STOPPED:
				state_transition(task, LWS_TP_STATUS_STOPPED);
				break;
			}
		} while (task->status == LWS_TP_STATUS_RUNNING);

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		tp->running_tasks--;

		if (pool->task->status == LWS_TP_STATUS_STOPPING)
			state_transition(task, LWS_TP_STATUS_STOPPED);

		/* move the task to the done queue */

		pool->task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		tp->done_queue_depth++;
		pool->task->done = lws_now_usecs();

		if (!pool->task->args.wsi &&
		    (pool->task->status == LWS_TP_STATUS_STOPPED ||
		     pool->task->status == LWS_TP_STATUS_FINISHED)) {

			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
			lwsl_thread("%s: %s: worker %d REAPING: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/*
			 * there is no longer any wsi attached, so nothing is
			 * going to take care of reaping us.  So we must take
			 * care of it ourselves.
			 */
			__lws_threadpool_reap(pool->task);
		} else {

			__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
			lwsl_thread("%s: %s: worker %d DONE: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/* signal the associated wsi to take a fresh look at
			 * task status */

			if (pool->task->args.wsi) {
				task->wanted_writeable_cb = 1;

				lws_cancel_service(
					lws_get_context(pool->task->args.wsi));
			}
		}

		pool->task = NULL;
		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
	}

	/* threadpool is being destroyed */

	pthread_exit(NULL);

	return NULL;
}

struct lws_threadpool *
lws_threadpool_create(struct lws_context *context,
		      const struct lws_threadpool_create_args *args,
		      const char *format, ...)
{
	struct lws_threadpool *tp;
	va_list ap;
	int n;

	tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * args->threads),
			"threadpool alloc");
	if (!tp)
		return NULL;

	memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * args->threads));
	tp->pool_list = (struct lws_pool *)(tp + 1);
	tp->max_queue_depth = args->max_queue_depth;

	va_start(ap, format);
	n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap);
	va_end(ap);

	lws_context_lock(context, __func__);

	tp->context = context;
	tp->tp_list = context->tp_list_head;
	context->tp_list_head = tp;

	lws_context_unlock(context);

	pthread_mutex_init(&tp->lock, NULL);
	pthread_cond_init(&tp->wake_idle, NULL);

	for (n = 0; n < args->threads; n++) {
#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
		char name[16];
#endif
		tp->pool_list[n].tp = tp;
		tp->pool_list[n].worker_index = n;
		pthread_mutex_init(&tp->pool_list[n].lock, NULL);
		if (pthread_create(&tp->pool_list[n].thread, NULL,
				   lws_threadpool_worker, &tp->pool_list[n])) {
			lwsl_err("thread creation failed\n");
		} else {
#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
			lws_snprintf(name, sizeof(name), "%s-%d", tp->name, n);
			pthread_setname_np(tp->pool_list[n].thread, name);
#endif
			tp->threads_in_pool++;
		}
	}

	return tp;
}

void
lws_threadpool_finish(struct lws_threadpool *tp)
{
	struct lws_threadpool_task **c, *task;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	/* nothing new can start, running jobs will abort as STOPPED and the
	 * pool threads will exit ASAP (they are joined in destroy) */
	tp->destroying = 1;

	/* stop everyone in the pending queue and move to the done queue */

	c = &tp->task_queue_head;
	while (*c) {
		task = *c;
		*c = task->task_queue_next;
		task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		state_transition(task, LWS_TP_STATUS_STOPPED);
		tp->queue_depth--;
		tp->done_queue_depth++;
		task->done = lws_now_usecs();

		c = &task->task_queue_next;
	}

	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	pthread_cond_broadcast(&tp->wake_idle);
}

void
lws_threadpool_destroy(struct lws_threadpool *tp)
{
	struct lws_threadpool_task *task, *next;
	struct lws_threadpool **ptp;
	void *retval;
	int n;

	/* remove us from the context list of threadpools */

	lws_context_lock(tp->context, __func__);

	ptp = &tp->context->tp_list_head;
	while (*ptp) {
		if (*ptp == tp) {
			*ptp = tp->tp_list;
			break;
		}
		ptp = &(*ptp)->tp_list;
	}

	lws_context_unlock(tp->context);


	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	tp->destroying = 1;
	pthread_cond_broadcast(&tp->wake_idle);
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	lws_threadpool_dump(tp);

	for (n = 0; n < tp->threads_in_pool; n++) {
		task = tp->pool_list[n].task;

		/* he could be sitting waiting for SYNC */

		if (task != NULL)
			pthread_cond_broadcast(&task->wake_idle);

		pthread_join(tp->pool_list[n].thread, &retval);
		pthread_mutex_destroy(&tp->pool_list[n].lock);
	}
	lwsl_info("%s: all threadpools exited\n", __func__);

	task = tp->task_done_head;
	while (task) {
		next = task->task_queue_next;
		lws_threadpool_task_cleanup_destroy(task);
		tp->done_queue_depth--;
		task = next;
	}

	pthread_mutex_destroy(&tp->lock);

	lws_free(tp);
}

/*
 * we want to stop and destroy the task and related priv.  The wsi may no
 * longer exist.
 */

int
lws_threadpool_dequeue(struct lws *wsi)
{
	struct lws_threadpool *tp;
	struct lws_threadpool_task **c, *task;
	int n;

	task = wsi->tp_task;
	if (!task)
		return 0;

	tp = task->tp;
	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	if (task->outlive && !tp->destroying) {

		/* disconnect from wsi, and wsi from task */

		wsi->tp_task = NULL;
		task->args.wsi = NULL;

		goto bail;
	}


	c = &tp->task_queue_head;

	/* is he queued waiting for a chance to run?  Mark him as stopped and
	 * move him on to the done queue */

	while (*c) {
		if ((*c) == task) {
			*c = task->task_queue_next;
			task->task_queue_next = tp->task_done_head;
			tp->task_done_head = task;
			state_transition(task, LWS_TP_STATUS_STOPPED);
			tp->queue_depth--;
			tp->done_queue_depth++;
			task->done = lws_now_usecs();

			lwsl_debug("%s: tp %p: removed queued task wsi %p\n",
				    __func__, tp, task->args.wsi);

			break;
		}
		c = &(*c)->task_queue_next;
	}

	/* is he on the done queue? */

	c = &tp->task_done_head;
	while (*c) {
		if ((*c) == task) {
			*c = task->task_queue_next;
			task->task_queue_next = NULL;
			lws_threadpool_task_cleanup_destroy(task);
			tp->done_queue_depth--;
			goto bail;
		}
		c = &(*c)->task_queue_next;
	}

	/* he's not in the queue... is he already running on a thread? */

	for (n = 0; n < tp->threads_in_pool; n++) {
		if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
			continue;

		/*
		 * ensure we don't collide with tests or changes in the
		 * worker thread
		 */
		pthread_mutex_lock(&tp->pool_list[n].lock);

		/*
		 * mark him as having been requested to stop...
		 * the caller will hear about it in his service thread
		 * context as a request to close
		 */
		state_transition(task, LWS_TP_STATUS_STOPPING);

		/* disconnect from wsi, and wsi from task */

		task->args.wsi->tp_task = NULL;
		task->args.wsi = NULL;

		pthread_mutex_unlock(&tp->pool_list[n].lock);

		lwsl_debug("%s: tp %p: request stop running task "
			    "for wsi %p\n", __func__, tp, task->args.wsi);

		break;
	}

	if (n == tp->threads_in_pool) {
		/* can't find it */
		lwsl_notice("%s: tp %p: no task for wsi %p, decoupling\n",
			    __func__, tp, task->args.wsi);
		task->args.wsi->tp_task = NULL;
		task->args.wsi = NULL;
	}

bail:
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	return 0;
}

struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool *tp,
		       const struct lws_threadpool_task_args *args,
		       const char *format, ...)
{
	struct lws_threadpool_task *task = NULL;
	va_list ap;

	if (tp->destroying)
		return NULL;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	/*
	 * if there's room on the queue, the job always goes on the queue
	 * first, then any free thread may pick it up after the wake_idle
	 */

	if (tp->queue_depth == tp->max_queue_depth) {
		lwsl_notice("%s: queue reached limit %d\n", __func__,
			    tp->max_queue_depth);

		goto bail;
	}

	/*
	 * create the task object
	 */

	task = lws_malloc(sizeof(*task), __func__);
	if (!task)
		goto bail;

	memset(task, 0, sizeof(*task));
	pthread_cond_init(&task->wake_idle, NULL);
	task->args = *args;
	task->tp = tp;
	task->created = lws_now_usecs();

	va_start(ap, format);
	vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
	va_end(ap);

	/*
	 * add him on the tp task queue
	 */

	task->task_queue_next = tp->task_queue_head;
	state_transition(task, LWS_TP_STATUS_QUEUED);
	tp->task_queue_head = task;
	tp->queue_depth++;

	/*
	 * mark the wsi itself as depending on this tp (so wsi close for
	 * whatever reason can clean up)
	 */

	args->wsi->tp_task = task;

	lwsl_thread("%s: tp %s: enqueued task %p (%s) for wsi %p, depth %d\n",
		    __func__, tp->name, task, task->name, args->wsi,
		    tp->queue_depth);

	/* alert any idle thread there's something new on the task list */

	lws_memory_barrier();
	pthread_cond_signal(&tp->wake_idle);

bail:
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	return task;
}

/* this should be called from the service thread */

enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws *wsi,
			       struct lws_threadpool_task **task, void **user)
{
	enum lws_threadpool_task_status status;
	struct lws_threadpool *tp;

	*task = wsi->tp_task;
	if (!*task)
		return -1;

	tp = (*task)->tp;
	*user = (*task)->args.user;
	status = (*task)->status;

	if (status == LWS_TP_STATUS_FINISHED ||
	    status == LWS_TP_STATUS_STOPPED) {
		char buf[160];

		pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
		__lws_threadpool_task_dump(*task, buf, sizeof(buf));
		lwsl_thread("%s: %s: service thread REAPING: %s\n",
			    __func__, tp->name, buf);
		__lws_threadpool_reap(*task);
		lws_memory_barrier();
		pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
	}

	return status;
}

void
lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
{
	lwsl_debug("%s\n", __func__);

	if (stop)
		state_transition(task, LWS_TP_STATUS_STOPPING);

	pthread_cond_signal(&task->wake_idle);
}