diff --git a/include/libwebsockets/lws-threadpool.h b/include/libwebsockets/lws-threadpool.h index 94440d021..5b6cf0800 100644 --- a/include/libwebsockets/lws-threadpool.h +++ b/include/libwebsockets/lws-threadpool.h @@ -71,7 +71,11 @@ struct lws_threadpool_create_args { }; struct lws_threadpool_task_args { - struct lws *wsi; /**< user must set to wsi task is bound to */ +#if defined(LWS_WITH_SECURE_STREAMS) + struct lws_ss_handle *ss; /**< either wsi or ss must be set */ +#endif + struct lws *wsi; /**< either wsi or ss must be set */ + void *user; /**< user may set (user-private pointer) */ const char *name; /**< user may set to describe task */ char async_task; /**< set to allow the task to shrug off the loss @@ -177,6 +181,11 @@ lws_threadpool_enqueue(struct lws_threadpool *tp, LWS_VISIBLE LWS_EXTERN int lws_threadpool_dequeue(struct lws *wsi); +#if defined(LWS_WITH_SECURE_STREAMS) +LWS_VISIBLE LWS_EXTERN int +lws_threadpool_dequeue_ss(struct lws_ss_handle *ss); +#endif + /** * lws_threadpool_task_status() - Dequeue or try to stop a running task * @@ -198,6 +207,12 @@ LWS_VISIBLE LWS_EXTERN enum lws_threadpool_task_status lws_threadpool_task_status_wsi(struct lws *wsi, struct lws_threadpool_task **task, void **user); +#if defined(LWS_WITH_SECURE_STREAMS) +LWS_VISIBLE LWS_EXTERN enum lws_threadpool_task_status +lws_threadpool_task_status_ss(struct lws_ss_handle *ss, + struct lws_threadpool_task **task, void **user); +#endif + /** * lws_threadpool_task_sync() - Indicate to a stalled task it may continue * diff --git a/lib/misc/threadpool/threadpool.c b/lib/misc/threadpool/threadpool.c index 2a8bcbeaa..f12d980ce 100644 --- a/lib/misc/threadpool/threadpool.c +++ b/lib/misc/threadpool/threadpool.c @@ -1,7 +1,7 @@ /* * libwebsockets - small server side websockets and web server implementation * - * Copyright (C) 2010 - 2019 Andy Green + * Copyright (C) 2010 - 2020 Andy Green * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to @@ -232,17 +232,27 @@ state_transition(struct lws_threadpool_task *task, task->status = status; } +static struct lws * +task_to_wsi(struct lws_threadpool_task *task) +{ +#if defined(LWS_WITH_SECURE_STREAMS) + if (task->args.ss) + return task->args.ss->wsi; +#endif + return task->args.wsi; +} + static void lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task) { if (task->args.cleanup) - task->args.cleanup(task->args.wsi, task->args.user); + task->args.cleanup(task_to_wsi(task), task->args.user); - if (task->args.wsi) - task->args.wsi->tp_task = NULL; + if (task_to_wsi(task)) + task_to_wsi(task)->tp_task = NULL; lwsl_thread("%s: tp %p: cleaned finished task for wsi %p\n", - __func__, task->tp, task->args.wsi); + __func__, task->tp, task_to_wsi(task)); lws_free(task); } @@ -265,7 +275,7 @@ __lws_threadpool_reap(struct lws_threadpool_task *task) tp->done_queue_depth--; lwsl_thread("%s: tp %s: reaped task wsi %p\n", __func__, - tp->name, task->args.wsi); + tp->name, task_to_wsi(task)); break; } @@ -308,7 +318,7 @@ lws_threadpool_tsi_context(struct lws_context *context, int tsi) if (!task) continue; - wsi = task->args.wsi; + wsi = task_to_wsi(task); if (!wsi || wsi->tsi != tsi || !task->wanted_writeable_cb) continue; @@ -331,7 +341,7 @@ lws_threadpool_tsi_context(struct lws_context *context, int tsi) while (*c) { task = *c; - wsi = task->args.wsi; + wsi = task_to_wsi(task); if (wsi && wsi->tsi == tsi && task->wanted_writeable_cb) { @@ -373,12 +383,12 @@ lws_threadpool_worker_sync(struct lws_pool *pool, pthread_mutex_lock(&pool->lock); /* ======================= pool lock */ lwsl_info("%s: %s: task %p (%s): syncing with wsi %p\n", __func__, - pool->tp->name, task, task->name, task->args.wsi); + pool->tp->name, task, task->name, task_to_wsi(task)); temp = task->status; state_transition(task, LWS_TP_STATUS_SYNCING); while (tries--) { - wsi = task->args.wsi; + wsi = task_to_wsi(task); /* * if the wsi is no longer attached to this task, there is @@ -431,7 +441,7 @@ lws_threadpool_worker_sync(struct lws_pool *pool, lwsl_err("%s: %s: task %p (%s): SYNC timed out " "(associated wsi %p)\n", __func__, pool->tp->name, task, - task->name, task->args.wsi); + task->name, task_to_wsi(task)); state_transition(task, LWS_TP_STATUS_STOPPING); goto done; @@ -547,7 +557,7 @@ lws_threadpool_worker(void *d) lws_usec_t then; int n; - if (tp->destroying || !task->args.wsi) { + if (tp->destroying || !task_to_wsi(task)) { lwsl_info("%s: stopping on wsi gone\n", __func__); state_transition(task, LWS_TP_STATUS_STOPPING); } @@ -563,7 +573,7 @@ lws_threadpool_worker(void *d) /* if not destroying the tp, continue */ break; case LWS_TP_RETURN_SYNC: - if (!task->args.wsi) { + if (!task_to_wsi(task)) { lwsl_debug("%s: task that wants to " "outlive lost wsi asked " "to sync: bypassed\n", @@ -623,11 +633,11 @@ lws_threadpool_worker(void *d) /* signal the associated wsi to take a fresh look at * task status */ - if (pool->task->args.wsi) { + if (task_to_wsi(pool->task)) { task->wanted_writeable_cb = 1; lws_cancel_service( - lws_get_context(pool->task->args.wsi)); + lws_get_context(task_to_wsi(pool->task))); } } @@ -812,6 +822,9 @@ lws_threadpool_dequeue(struct lws *wsi) wsi->tp_task = NULL; task->args.wsi = NULL; +#if defined(LWS_WITH_SECURE_STREAMS) + task->args.ss = NULL; +#endif goto bail; } @@ -833,7 +846,7 @@ lws_threadpool_dequeue(struct lws *wsi) task->done = lws_now_usecs(); lwsl_debug("%s: tp %p: removed queued task wsi %p\n", - __func__, tp, task->args.wsi); + __func__, tp, task_to_wsi(task)); break; } @@ -877,11 +890,14 @@ lws_threadpool_dequeue(struct lws *wsi) task->args.wsi->tp_task = NULL; task->args.wsi = NULL; +#if defined(LWS_WITH_SECURE_STREAMS) + task->args.ss = NULL; +#endif pthread_mutex_unlock(&tp->pool_list[n].lock); lwsl_debug("%s: tp %p: request stop running task " - "for wsi %p\n", __func__, tp, task->args.wsi); + "for wsi %p\n", __func__, tp, task_to_wsi(task)); break; } @@ -889,9 +905,12 @@ lws_threadpool_dequeue(struct lws *wsi) if (n == tp->threads_in_pool) { /* can't find it */ lwsl_notice("%s: tp %p: no task for wsi %p, decoupling\n", - __func__, tp, task->args.wsi); + __func__, tp, task_to_wsi(task)); task->args.wsi->tp_task = NULL; task->args.wsi = NULL; +#if defined(LWS_WITH_SECURE_STREAMS) + task->args.ss = NULL; +#endif } bail: @@ -900,6 +919,17 @@ bail: return 0; } +#if defined(LWS_WITH_SECURE_STREAMS) +int +lws_threadpool_dequeue_ss(struct lws_ss_handle *ss) +{ + if (ss) + return lws_threadpool_dequeue(ss->wsi); + + return 0; +} +#endif + struct lws_threadpool_task * lws_threadpool_enqueue(struct lws_threadpool *tp, const struct lws_threadpool_task_args *args, @@ -911,6 +941,10 @@ lws_threadpool_enqueue(struct lws_threadpool *tp, if (tp->destroying) return NULL; +#if defined(LWS_WITH_SECURE_STREAMS) + assert(args->ss || args->wsi); +#endif + pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ /* @@ -957,10 +991,15 @@ lws_threadpool_enqueue(struct lws_threadpool *tp, * whatever reason can clean up) */ - args->wsi->tp_task = task; +#if defined(LWS_WITH_SECURE_STREAMS) + if (args->ss) + args->ss->wsi->tp_task = task; + else +#endif + args->wsi->tp_task = task; lwsl_thread("%s: tp %s: enqueued task %p (%s) for wsi %p, depth %d\n", - __func__, tp->name, task, task->name, args->wsi, + __func__, tp->name, task, task->name, task_to_wsi(task), tp->queue_depth); /* alert any idle thread there's something new on the task list */ @@ -1007,6 +1046,18 @@ lws_threadpool_task_status_wsi(struct lws *wsi, return status; } +#if defined(LWS_WITH_SECURE_STREAMS) +enum lws_threadpool_task_status +lws_threadpool_task_status_ss(struct lws_ss_handle *ss, + struct lws_threadpool_task **task, void **user) +{ + if (ss) + return lws_threadpool_task_status_wsi(ss->wsi, task, user); + + return -1; +} +#endif + void lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop) {