1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

threadpool: enable to use secure streams as well as direct wsi

This extends threadpool slightly so it can bind enqueued tasks to a
secure streams handle as well as a straight wsi.

Either the .wsi must be set as before, or the .ss handle if you are
using secure streams, when enqueuing a task on the taskpool.

A couple of other helpers get ss-aware wrappers if LWS_WITH_SECURE_STREAMS

Although threadpool was originally designed for server (gitohashi)
actually it's also fine working with client wsi / Secure Streams,
if you have a situation a client connection is associated with heavy
processing.
This commit is contained in:
Andy Green 2020-03-16 13:58:07 +00:00
parent 99e6aff537
commit af20721500
2 changed files with 87 additions and 21 deletions

View file

@ -71,7 +71,11 @@ struct lws_threadpool_create_args {
};
struct lws_threadpool_task_args {
struct lws *wsi; /**< user must set to wsi task is bound to */
#if defined(LWS_WITH_SECURE_STREAMS)
struct lws_ss_handle *ss; /**< either wsi or ss must be set */
#endif
struct lws *wsi; /**< either wsi or ss must be set */
void *user; /**< user may set (user-private pointer) */
const char *name; /**< user may set to describe task */
char async_task; /**< set to allow the task to shrug off the loss
@ -177,6 +181,11 @@ lws_threadpool_enqueue(struct lws_threadpool *tp,
LWS_VISIBLE LWS_EXTERN int
lws_threadpool_dequeue(struct lws *wsi);
#if defined(LWS_WITH_SECURE_STREAMS)
LWS_VISIBLE LWS_EXTERN int
lws_threadpool_dequeue_ss(struct lws_ss_handle *ss);
#endif
/**
* lws_threadpool_task_status() - Dequeue or try to stop a running task
*
@ -198,6 +207,12 @@ LWS_VISIBLE LWS_EXTERN enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws *wsi,
struct lws_threadpool_task **task, void **user);
#if defined(LWS_WITH_SECURE_STREAMS)
LWS_VISIBLE LWS_EXTERN enum lws_threadpool_task_status
lws_threadpool_task_status_ss(struct lws_ss_handle *ss,
struct lws_threadpool_task **task, void **user);
#endif
/**
* lws_threadpool_task_sync() - Indicate to a stalled task it may continue
*

View file

@ -1,7 +1,7 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
@ -232,17 +232,27 @@ state_transition(struct lws_threadpool_task *task,
task->status = status;
}
static struct lws *
task_to_wsi(struct lws_threadpool_task *task)
{
#if defined(LWS_WITH_SECURE_STREAMS)
if (task->args.ss)
return task->args.ss->wsi;
#endif
return task->args.wsi;
}
static void
lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
{
if (task->args.cleanup)
task->args.cleanup(task->args.wsi, task->args.user);
task->args.cleanup(task_to_wsi(task), task->args.user);
if (task->args.wsi)
task->args.wsi->tp_task = NULL;
if (task_to_wsi(task))
task_to_wsi(task)->tp_task = NULL;
lwsl_thread("%s: tp %p: cleaned finished task for wsi %p\n",
__func__, task->tp, task->args.wsi);
__func__, task->tp, task_to_wsi(task));
lws_free(task);
}
@ -265,7 +275,7 @@ __lws_threadpool_reap(struct lws_threadpool_task *task)
tp->done_queue_depth--;
lwsl_thread("%s: tp %s: reaped task wsi %p\n", __func__,
tp->name, task->args.wsi);
tp->name, task_to_wsi(task));
break;
}
@ -308,7 +318,7 @@ lws_threadpool_tsi_context(struct lws_context *context, int tsi)
if (!task)
continue;
wsi = task->args.wsi;
wsi = task_to_wsi(task);
if (!wsi || wsi->tsi != tsi ||
!task->wanted_writeable_cb)
continue;
@ -331,7 +341,7 @@ lws_threadpool_tsi_context(struct lws_context *context, int tsi)
while (*c) {
task = *c;
wsi = task->args.wsi;
wsi = task_to_wsi(task);
if (wsi && wsi->tsi == tsi &&
task->wanted_writeable_cb) {
@ -373,12 +383,12 @@ lws_threadpool_worker_sync(struct lws_pool *pool,
pthread_mutex_lock(&pool->lock); /* ======================= pool lock */
lwsl_info("%s: %s: task %p (%s): syncing with wsi %p\n", __func__,
pool->tp->name, task, task->name, task->args.wsi);
pool->tp->name, task, task->name, task_to_wsi(task));
temp = task->status;
state_transition(task, LWS_TP_STATUS_SYNCING);
while (tries--) {
wsi = task->args.wsi;
wsi = task_to_wsi(task);
/*
* if the wsi is no longer attached to this task, there is
@ -431,7 +441,7 @@ lws_threadpool_worker_sync(struct lws_pool *pool,
lwsl_err("%s: %s: task %p (%s): SYNC timed out "
"(associated wsi %p)\n",
__func__, pool->tp->name, task,
task->name, task->args.wsi);
task->name, task_to_wsi(task));
state_transition(task, LWS_TP_STATUS_STOPPING);
goto done;
@ -547,7 +557,7 @@ lws_threadpool_worker(void *d)
lws_usec_t then;
int n;
if (tp->destroying || !task->args.wsi) {
if (tp->destroying || !task_to_wsi(task)) {
lwsl_info("%s: stopping on wsi gone\n", __func__);
state_transition(task, LWS_TP_STATUS_STOPPING);
}
@ -563,7 +573,7 @@ lws_threadpool_worker(void *d)
/* if not destroying the tp, continue */
break;
case LWS_TP_RETURN_SYNC:
if (!task->args.wsi) {
if (!task_to_wsi(task)) {
lwsl_debug("%s: task that wants to "
"outlive lost wsi asked "
"to sync: bypassed\n",
@ -623,11 +633,11 @@ lws_threadpool_worker(void *d)
/* signal the associated wsi to take a fresh look at
* task status */
if (pool->task->args.wsi) {
if (task_to_wsi(pool->task)) {
task->wanted_writeable_cb = 1;
lws_cancel_service(
lws_get_context(pool->task->args.wsi));
lws_get_context(task_to_wsi(pool->task)));
}
}
@ -812,6 +822,9 @@ lws_threadpool_dequeue(struct lws *wsi)
wsi->tp_task = NULL;
task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
task->args.ss = NULL;
#endif
goto bail;
}
@ -833,7 +846,7 @@ lws_threadpool_dequeue(struct lws *wsi)
task->done = lws_now_usecs();
lwsl_debug("%s: tp %p: removed queued task wsi %p\n",
__func__, tp, task->args.wsi);
__func__, tp, task_to_wsi(task));
break;
}
@ -877,11 +890,14 @@ lws_threadpool_dequeue(struct lws *wsi)
task->args.wsi->tp_task = NULL;
task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
task->args.ss = NULL;
#endif
pthread_mutex_unlock(&tp->pool_list[n].lock);
lwsl_debug("%s: tp %p: request stop running task "
"for wsi %p\n", __func__, tp, task->args.wsi);
"for wsi %p\n", __func__, tp, task_to_wsi(task));
break;
}
@ -889,9 +905,12 @@ lws_threadpool_dequeue(struct lws *wsi)
if (n == tp->threads_in_pool) {
/* can't find it */
lwsl_notice("%s: tp %p: no task for wsi %p, decoupling\n",
__func__, tp, task->args.wsi);
__func__, tp, task_to_wsi(task));
task->args.wsi->tp_task = NULL;
task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
task->args.ss = NULL;
#endif
}
bail:
@ -900,6 +919,17 @@ bail:
return 0;
}
#if defined(LWS_WITH_SECURE_STREAMS)
int
lws_threadpool_dequeue_ss(struct lws_ss_handle *ss)
{
if (ss)
return lws_threadpool_dequeue(ss->wsi);
return 0;
}
#endif
struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool *tp,
const struct lws_threadpool_task_args *args,
@ -911,6 +941,10 @@ lws_threadpool_enqueue(struct lws_threadpool *tp,
if (tp->destroying)
return NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
assert(args->ss || args->wsi);
#endif
pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
/*
@ -957,10 +991,15 @@ lws_threadpool_enqueue(struct lws_threadpool *tp,
* whatever reason can clean up)
*/
args->wsi->tp_task = task;
#if defined(LWS_WITH_SECURE_STREAMS)
if (args->ss)
args->ss->wsi->tp_task = task;
else
#endif
args->wsi->tp_task = task;
lwsl_thread("%s: tp %s: enqueued task %p (%s) for wsi %p, depth %d\n",
__func__, tp->name, task, task->name, args->wsi,
__func__, tp->name, task, task->name, task_to_wsi(task),
tp->queue_depth);
/* alert any idle thread there's something new on the task list */
@ -1007,6 +1046,18 @@ lws_threadpool_task_status_wsi(struct lws *wsi,
return status;
}
#if defined(LWS_WITH_SECURE_STREAMS)
enum lws_threadpool_task_status
lws_threadpool_task_status_ss(struct lws_ss_handle *ss,
struct lws_threadpool_task **task, void **user)
{
if (ss)
return lws_threadpool_task_status_wsi(ss->wsi, task, user);
return -1;
}
#endif
void
lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
{