1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

threadpool: allow multiple threadpool tasks to bind to same wsi

This commit is contained in:
Andy Green 2020-04-04 10:47:11 +01:00
parent 4caeb56bec
commit fb3fd499e3
5 changed files with 185 additions and 90 deletions

View file

@ -1,7 +1,7 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
@ -177,17 +177,21 @@ lws_threadpool_enqueue(struct lws_threadpool *tp,
* This doesn't free the task. It only shortcuts it to state
* LWS_TP_STATUS_STOPPED. lws_threadpool_task_status() must be performed on
* the task separately once it is in LWS_TP_STATUS_STOPPED to free the task.
*
* DEPRECATED: You should use lws_threadpool_dequeue_task() with
* lws_threadpool_get_task_wsi() / _ss() if you know there can only be one task
* per connection, or call it via lws_threadpool_foreach_task_wsi() / _ss() to
* get the tasks bound to the connection.
*/
LWS_VISIBLE LWS_EXTERN int
lws_threadpool_dequeue(struct lws *wsi);
lws_threadpool_dequeue(struct lws *wsi) LWS_WARN_DEPRECATED;
#if defined(LWS_WITH_SECURE_STREAMS)
LWS_VISIBLE LWS_EXTERN int
lws_threadpool_dequeue_ss(struct lws_ss_handle *ss);
#endif
lws_threadpool_dequeue_task(struct lws_threadpool_task *task);
/**
* lws_threadpool_task_status() - Dequeue or try to stop a running task
* lws_threadpool_task_status() - reap completed tasks
*
* \param wsi: the wsi to query the current task of
* \param task: receives a pointer to the opaque task
@ -202,16 +206,18 @@ lws_threadpool_dequeue_ss(struct lws_ss_handle *ss);
*
* Its use is to make sure the service thread has seen the state of the task
* before deleting it.
*
* DEPRECATED... use lws_threadpool_task_status() instead and get the task
* pointer from lws_threadpool_get_task_wsi() / _ss() if you know there can only
* be one, else call it via lws_threadpool_foreach_task_wsi() / _ss()
*/
LWS_VISIBLE LWS_EXTERN enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws *wsi,
struct lws_threadpool_task **task, void **user);
struct lws_threadpool_task **task, void **user)
LWS_WARN_DEPRECATED;
#if defined(LWS_WITH_SECURE_STREAMS)
LWS_VISIBLE LWS_EXTERN enum lws_threadpool_task_status
lws_threadpool_task_status_ss(struct lws_ss_handle *ss,
struct lws_threadpool_task **task, void **user);
#endif
lws_threadpool_task_status(struct lws_threadpool_task *task, void **user);
/**
* lws_threadpool_task_sync() - Indicate to a stalled task it may continue
@ -244,4 +250,28 @@ lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop);
LWS_VISIBLE LWS_EXTERN void
lws_threadpool_dump(struct lws_threadpool *tp);
LWS_VISIBLE LWS_EXTERN struct lws_threadpool_task *
lws_threadpool_get_task_wsi(struct lws *wsi);
#if defined(LWS_WITH_SECURE_STREAMS)
LWS_VISIBLE LWS_EXTERN struct lws_threadpool_task *
lws_threadpool_get_task_ss(struct lws_ss_handle *ss);
#endif
LWS_VISIBLE LWS_EXTERN int
lws_threadpool_foreach_task_wsi(struct lws *wsi, void *user,
int (*cb)(struct lws_threadpool_task *task,
void *user));
#if defined(LWS_WITH_SECURE_STREAMS)
LWS_VISIBLE LWS_EXTERN int
lws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user,
int (*cb)(struct lws_threadpool_task *task, void *user));
#endif
//@}

View file

@ -710,7 +710,7 @@ struct lws {
const lws_retry_bo_t *retry_policy;
#if defined(LWS_WITH_THREADPOOL)
struct lws_threadpool_task *tp_task;
lws_dll2_owner_t tp_task_owner; /* struct lws_threadpool_task */
#endif
#if defined(LWS_WITH_PEER_LIMITS)

View file

@ -456,8 +456,8 @@ lws_spawn_piped(const struct lws_spawn_piped_info *i)
* the original process temporarily and we are (ab)using its
* identity during this pre-exec() time
*/
close(lsp->pipe_fds[m][!(m == 0)]);
#if !defined(LWS_HAVE_VFORK) || !defined(LWS_HAVE_EXECVPE)
close(lsp->pipe_fds[m][!(m == 0)]);
close(lsp->pipe_fds[m][!!(m == 0)]);
#endif
}

View file

@ -36,59 +36,61 @@
struct lws_threadpool;
struct lws_threadpool_task {
struct lws_threadpool_task *task_queue_next;
struct lws_threadpool_task *task_queue_next;
struct lws_threadpool *tp;
char name[32];
struct lws_threadpool *tp;
char name[32];
struct lws_threadpool_task_args args;
lws_usec_t created;
lws_usec_t acquired;
lws_usec_t done;
lws_usec_t entered_state;
lws_dll2_t list;
lws_usec_t acc_running;
lws_usec_t acc_syncing;
lws_usec_t created;
lws_usec_t acquired;
lws_usec_t done;
lws_usec_t entered_state;
pthread_cond_t wake_idle;
lws_usec_t acc_running;
lws_usec_t acc_syncing;
pthread_cond_t wake_idle;
enum lws_threadpool_task_status status;
int late_sync_retries;
int late_sync_retries;
char wanted_writeable_cb;
char outlive;
char wanted_writeable_cb;
char outlive;
};
struct lws_pool {
struct lws_threadpool *tp;
pthread_t thread;
pthread_mutex_t lock; /* part of task wake_idle */
struct lws_threadpool_task *task;
lws_usec_t acquired;
int worker_index;
struct lws_threadpool *tp;
pthread_t thread;
pthread_mutex_t lock; /* part of task wake_idle */
struct lws_threadpool_task *task;
lws_usec_t acquired;
int worker_index;
};
struct lws_threadpool {
pthread_mutex_t lock; /* protects all pool lists */
pthread_cond_t wake_idle;
struct lws_pool *pool_list;
pthread_mutex_t lock; /* protects all pool lists */
pthread_cond_t wake_idle;
struct lws_pool *pool_list;
struct lws_context *context;
struct lws_threadpool *tp_list; /* context list of threadpools */
struct lws_context *context;
struct lws_threadpool *tp_list; /* context list of threadpools */
struct lws_threadpool_task *task_queue_head;
struct lws_threadpool_task *task_done_head;
struct lws_threadpool_task *task_queue_head;
struct lws_threadpool_task *task_done_head;
char name[32];
char name[32];
int threads_in_pool;
int queue_depth;
int done_queue_depth;
int max_queue_depth;
int running_tasks;
int threads_in_pool;
int queue_depth;
int done_queue_depth;
int max_queue_depth;
int running_tasks;
unsigned int destroying:1;
unsigned int destroying:1;
};
static int
@ -248,8 +250,7 @@ lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
if (task->args.cleanup)
task->args.cleanup(task_to_wsi(task), task->args.user);
if (task_to_wsi(task))
task_to_wsi(task)->tp_task = NULL;
lws_dll2_remove(&task->list);
lwsl_thread("%s: tp %p: cleaned finished task for wsi %p\n",
__func__, task->tp, task_to_wsi(task));
@ -801,21 +802,16 @@ lws_threadpool_destroy(struct lws_threadpool *tp)
}
/*
* we want to stop and destroy the task and related priv. The wsi may no
* longer exist.
* We want to stop and destroy the tasks and related priv.
*/
int
lws_threadpool_dequeue(struct lws *wsi)
lws_threadpool_dequeue_task(struct lws_threadpool_task *task)
{
struct lws_threadpool *tp;
struct lws_threadpool_task **c, *task;
struct lws_threadpool_task **c;
int n;
task = wsi->tp_task;
if (!task)
return 0;
tp = task->tp;
pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
@ -823,7 +819,7 @@ lws_threadpool_dequeue(struct lws *wsi)
/* disconnect from wsi, and wsi from task */
wsi->tp_task = NULL;
lws_dll2_remove(&task->list);
task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
task->args.ss = NULL;
@ -891,7 +887,7 @@ lws_threadpool_dequeue(struct lws *wsi)
/* disconnect from wsi, and wsi from task */
task->args.wsi->tp_task = NULL;
lws_dll2_remove(&task->list);
task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
task->args.ss = NULL;
@ -909,7 +905,7 @@ lws_threadpool_dequeue(struct lws *wsi)
/* can't find it */
lwsl_notice("%s: tp %p: no task for wsi %p, decoupling\n",
__func__, tp, task_to_wsi(task));
task->args.wsi->tp_task = NULL;
lws_dll2_remove(&task->list);
task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
task->args.ss = NULL;
@ -922,16 +918,20 @@ bail:
return 0;
}
#if defined(LWS_WITH_SECURE_STREAMS)
int
lws_threadpool_dequeue_ss(struct lws_ss_handle *ss)
lws_threadpool_dequeue(struct lws *wsi) /* deprecated */
{
if (ss)
return lws_threadpool_dequeue(ss->wsi);
struct lws_threadpool_task *task;
return 0;
if (!wsi->tp_task_owner.count)
return 0;
assert(wsi->tp_task_owner.count != 1);
task = lws_container_of(wsi->tp_task_owner.head,
struct lws_threadpool_task, list);
return lws_threadpool_dequeue_task(task);
}
#endif
struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool *tp,
@ -996,10 +996,10 @@ lws_threadpool_enqueue(struct lws_threadpool *tp,
#if defined(LWS_WITH_SECURE_STREAMS)
if (args->ss)
args->ss->wsi->tp_task = task;
lws_dll2_add_tail(&task->list, &args->ss->wsi->tp_task_owner);
else
#endif
args->wsi->tp_task = task;
lws_dll2_add_tail(&task->list, &args->wsi->tp_task_owner);
lwsl_thread("%s: tp %s: enqueued task %p (%s) for wsi %p, depth %d\n",
__func__, tp->name, task, task->name, task_to_wsi(task),
@ -1019,31 +1019,23 @@ bail:
/* this should be called from the service thread */
enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws *wsi,
struct lws_threadpool_task **task, void **user)
lws_threadpool_task_status(struct lws_threadpool_task *task, void **user)
{
enum lws_threadpool_task_status status;
struct lws_threadpool *tp;
struct lws_threadpool *tp = task->tp;
*task = wsi->tp_task;
if (!*task) {
lwsl_notice("%s: wsi has NULL tp_task, ~=FINISHED\n", __func__);
return LWS_TP_STATUS_FINISHED;
}
tp = (*task)->tp;
*user = (*task)->args.user;
status = (*task)->status;
*user = task->args.user;
status = task->status;
if (status == LWS_TP_STATUS_FINISHED ||
status == LWS_TP_STATUS_STOPPED) {
char buf[160];
pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
__lws_threadpool_task_dump(*task, buf, sizeof(buf));
__lws_threadpool_task_dump(task, buf, sizeof(buf));
lwsl_thread("%s: %s: service thread REAPING: %s\n",
__func__, tp->name, buf);
__lws_threadpool_reap(*task);
__lws_threadpool_reap(task);
lws_memory_barrier();
pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
}
@ -1051,17 +1043,26 @@ lws_threadpool_task_status_wsi(struct lws *wsi,
return status;
}
#if defined(LWS_WITH_SECURE_STREAMS)
enum lws_threadpool_task_status
lws_threadpool_task_status_ss(struct lws_ss_handle *ss,
struct lws_threadpool_task **task, void **user)
lws_threadpool_task_status_wsi(struct lws *wsi,
struct lws_threadpool_task **_task, void **user)
{
if (ss)
return lws_threadpool_task_status_wsi(ss->wsi, task, user);
struct lws_threadpool_task *task;
return -1;
if (!wsi->tp_task_owner.count) {
lwsl_notice("%s: wsi has no task, ~=FINISHED\n", __func__);
return LWS_TP_STATUS_FINISHED;
}
assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */
task = lws_container_of(wsi->tp_task_owner.head,
struct lws_threadpool_task, list);
*_task = task;
return lws_threadpool_task_status(task, user);
}
#endif
void
lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
@ -1073,3 +1074,66 @@ lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
pthread_cond_signal(&task->wake_idle);
}
int
lws_threadpool_foreach_task_wsi(struct lws *wsi, void *user,
int (*cb)(struct lws_threadpool_task *task,
void *user))
{
struct lws_threadpool_task *task1;
if (wsi->tp_task_owner.head == NULL)
return 0;
task1 = lws_container_of(wsi->tp_task_owner.head,
struct lws_threadpool_task, list);
pthread_mutex_lock(&task1->tp->lock); /* ================ tpool lock */
lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
wsi->tp_task_owner.head) {
struct lws_threadpool_task *task = lws_container_of(d,
struct lws_threadpool_task, list);
if (cb(task, user)) {
pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */
return 1;
}
} lws_end_foreach_dll_safe(d, d1);
pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */
return 0;
}
#if defined(LWS_WITH_SECURE_STREAMS)
int
lws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user,
int (*cb)(struct lws_threadpool_task *task,
void *user))
{
if (!ss->wsi)
return 0;
return lws_threadpool_foreach_task_wsi(ss->wsi, user, cb);
}
#endif
struct lws_threadpool_task *
lws_threadpool_get_task_wsi(struct lws *wsi)
{
if (wsi->tp_task_owner.head == NULL)
return NULL;
return lws_container_of(wsi->tp_task_owner.head,
struct lws_threadpool_task, list);
}
#if defined(LWS_WITH_SECURE_STREAMS)
struct lws_threadpool_task *
lws_threadpool_get_task_ss(struct lws_ss_handle *ss)
{
return lws_threadpool_get_task_wsi(ss->wsi);
}
#endif

View file

@ -237,7 +237,7 @@ callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL:
lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi);
lws_threadpool_dequeue(wsi);
lws_threadpool_dequeue_task(lws_threadpool_get_task_wsi(wsi));
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
@ -253,7 +253,8 @@ callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
* private task data.
*/
n = lws_threadpool_task_status_wsi(wsi, &task, &_user);
task = lws_threadpool_get_task_wsi(wsi);
n = lws_threadpool_task_status(task, &_user);
lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n",
__func__, n);
switch(n) {