diff --git a/include/libwebsockets/lws-threadpool.h b/include/libwebsockets/lws-threadpool.h index 5b6cf0800..a12c92d36 100644 --- a/include/libwebsockets/lws-threadpool.h +++ b/include/libwebsockets/lws-threadpool.h @@ -1,7 +1,7 @@ /* * libwebsockets - small server side websockets and web server implementation * - * Copyright (C) 2010 - 2019 Andy Green + * Copyright (C) 2010 - 2020 Andy Green * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to @@ -177,17 +177,21 @@ lws_threadpool_enqueue(struct lws_threadpool *tp, * This doesn't free the task. It only shortcuts it to state * LWS_TP_STATUS_STOPPED. lws_threadpool_task_status() must be performed on * the task separately once it is in LWS_TP_STATUS_STOPPED to free the task. + * + * DEPRECATED: You should use lws_threadpool_dequeue_task() with + * lws_threadpool_get_task_wsi() / _ss() if you know there can only be one task + * per connection, or call it via lws_threadpool_foreach_task_wsi() / _ss() to + * get the tasks bound to the connection. */ LWS_VISIBLE LWS_EXTERN int -lws_threadpool_dequeue(struct lws *wsi); +lws_threadpool_dequeue(struct lws *wsi) LWS_WARN_DEPRECATED; -#if defined(LWS_WITH_SECURE_STREAMS) LWS_VISIBLE LWS_EXTERN int -lws_threadpool_dequeue_ss(struct lws_ss_handle *ss); -#endif +lws_threadpool_dequeue_task(struct lws_threadpool_task *task); + /** - * lws_threadpool_task_status() - Dequeue or try to stop a running task + * lws_threadpool_task_status() - reap completed tasks * * \param wsi: the wsi to query the current task of * \param task: receives a pointer to the opaque task @@ -202,16 +206,18 @@ lws_threadpool_dequeue_ss(struct lws_ss_handle *ss); * * Its use is to make sure the service thread has seen the state of the task * before deleting it. + * + * DEPRECATED... use lws_threadpool_task_status() instead and get the task + * pointer from lws_threadpool_get_task_wsi() / _ss() if you know there can only + * be one, else call it via lws_threadpool_foreach_task_wsi() / _ss() */ LWS_VISIBLE LWS_EXTERN enum lws_threadpool_task_status lws_threadpool_task_status_wsi(struct lws *wsi, - struct lws_threadpool_task **task, void **user); + struct lws_threadpool_task **task, void **user) + LWS_WARN_DEPRECATED; -#if defined(LWS_WITH_SECURE_STREAMS) LWS_VISIBLE LWS_EXTERN enum lws_threadpool_task_status -lws_threadpool_task_status_ss(struct lws_ss_handle *ss, - struct lws_threadpool_task **task, void **user); -#endif +lws_threadpool_task_status(struct lws_threadpool_task *task, void **user); /** * lws_threadpool_task_sync() - Indicate to a stalled task it may continue @@ -244,4 +250,28 @@ lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop); LWS_VISIBLE LWS_EXTERN void lws_threadpool_dump(struct lws_threadpool *tp); + + + +LWS_VISIBLE LWS_EXTERN struct lws_threadpool_task * +lws_threadpool_get_task_wsi(struct lws *wsi); + +#if defined(LWS_WITH_SECURE_STREAMS) +LWS_VISIBLE LWS_EXTERN struct lws_threadpool_task * +lws_threadpool_get_task_ss(struct lws_ss_handle *ss); +#endif + + +LWS_VISIBLE LWS_EXTERN int +lws_threadpool_foreach_task_wsi(struct lws *wsi, void *user, + int (*cb)(struct lws_threadpool_task *task, + void *user)); + +#if defined(LWS_WITH_SECURE_STREAMS) +LWS_VISIBLE LWS_EXTERN int +lws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user, + int (*cb)(struct lws_threadpool_task *task, void *user)); +#endif + + //@} diff --git a/lib/core-net/private-lib-core-net.h b/lib/core-net/private-lib-core-net.h index 5e7dcbc24..b6540083e 100644 --- a/lib/core-net/private-lib-core-net.h +++ b/lib/core-net/private-lib-core-net.h @@ -710,7 +710,7 @@ struct lws { const lws_retry_bo_t *retry_policy; #if defined(LWS_WITH_THREADPOOL) - struct lws_threadpool_task *tp_task; + lws_dll2_owner_t tp_task_owner; /* struct lws_threadpool_task */ #endif #if defined(LWS_WITH_PEER_LIMITS) diff --git a/lib/misc/spawn.c b/lib/misc/spawn.c index 1a41eae77..fdc61cd2b 100644 --- a/lib/misc/spawn.c +++ b/lib/misc/spawn.c @@ -456,8 +456,8 @@ lws_spawn_piped(const struct lws_spawn_piped_info *i) * the original process temporarily and we are (ab)using its * identity during this pre-exec() time */ - close(lsp->pipe_fds[m][!(m == 0)]); #if !defined(LWS_HAVE_VFORK) || !defined(LWS_HAVE_EXECVPE) + close(lsp->pipe_fds[m][!(m == 0)]); close(lsp->pipe_fds[m][!!(m == 0)]); #endif } diff --git a/lib/misc/threadpool/threadpool.c b/lib/misc/threadpool/threadpool.c index 6a4a38bce..deab75717 100644 --- a/lib/misc/threadpool/threadpool.c +++ b/lib/misc/threadpool/threadpool.c @@ -36,59 +36,61 @@ struct lws_threadpool; struct lws_threadpool_task { - struct lws_threadpool_task *task_queue_next; + struct lws_threadpool_task *task_queue_next; - struct lws_threadpool *tp; - char name[32]; + struct lws_threadpool *tp; + char name[32]; struct lws_threadpool_task_args args; - lws_usec_t created; - lws_usec_t acquired; - lws_usec_t done; - lws_usec_t entered_state; + lws_dll2_t list; - lws_usec_t acc_running; - lws_usec_t acc_syncing; + lws_usec_t created; + lws_usec_t acquired; + lws_usec_t done; + lws_usec_t entered_state; - pthread_cond_t wake_idle; + lws_usec_t acc_running; + lws_usec_t acc_syncing; + + pthread_cond_t wake_idle; enum lws_threadpool_task_status status; - int late_sync_retries; + int late_sync_retries; - char wanted_writeable_cb; - char outlive; + char wanted_writeable_cb; + char outlive; }; struct lws_pool { - struct lws_threadpool *tp; - pthread_t thread; - pthread_mutex_t lock; /* part of task wake_idle */ - struct lws_threadpool_task *task; - lws_usec_t acquired; - int worker_index; + struct lws_threadpool *tp; + pthread_t thread; + pthread_mutex_t lock; /* part of task wake_idle */ + struct lws_threadpool_task *task; + lws_usec_t acquired; + int worker_index; }; struct lws_threadpool { - pthread_mutex_t lock; /* protects all pool lists */ - pthread_cond_t wake_idle; - struct lws_pool *pool_list; + pthread_mutex_t lock; /* protects all pool lists */ + pthread_cond_t wake_idle; + struct lws_pool *pool_list; - struct lws_context *context; - struct lws_threadpool *tp_list; /* context list of threadpools */ + struct lws_context *context; + struct lws_threadpool *tp_list; /* context list of threadpools */ - struct lws_threadpool_task *task_queue_head; - struct lws_threadpool_task *task_done_head; + struct lws_threadpool_task *task_queue_head; + struct lws_threadpool_task *task_done_head; - char name[32]; + char name[32]; - int threads_in_pool; - int queue_depth; - int done_queue_depth; - int max_queue_depth; - int running_tasks; + int threads_in_pool; + int queue_depth; + int done_queue_depth; + int max_queue_depth; + int running_tasks; - unsigned int destroying:1; + unsigned int destroying:1; }; static int @@ -248,8 +250,7 @@ lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task) if (task->args.cleanup) task->args.cleanup(task_to_wsi(task), task->args.user); - if (task_to_wsi(task)) - task_to_wsi(task)->tp_task = NULL; + lws_dll2_remove(&task->list); lwsl_thread("%s: tp %p: cleaned finished task for wsi %p\n", __func__, task->tp, task_to_wsi(task)); @@ -801,21 +802,16 @@ lws_threadpool_destroy(struct lws_threadpool *tp) } /* - * we want to stop and destroy the task and related priv. The wsi may no - * longer exist. + * We want to stop and destroy the tasks and related priv. */ int -lws_threadpool_dequeue(struct lws *wsi) +lws_threadpool_dequeue_task(struct lws_threadpool_task *task) { struct lws_threadpool *tp; - struct lws_threadpool_task **c, *task; + struct lws_threadpool_task **c; int n; - task = wsi->tp_task; - if (!task) - return 0; - tp = task->tp; pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ @@ -823,7 +819,7 @@ lws_threadpool_dequeue(struct lws *wsi) /* disconnect from wsi, and wsi from task */ - wsi->tp_task = NULL; + lws_dll2_remove(&task->list); task->args.wsi = NULL; #if defined(LWS_WITH_SECURE_STREAMS) task->args.ss = NULL; @@ -891,7 +887,7 @@ lws_threadpool_dequeue(struct lws *wsi) /* disconnect from wsi, and wsi from task */ - task->args.wsi->tp_task = NULL; + lws_dll2_remove(&task->list); task->args.wsi = NULL; #if defined(LWS_WITH_SECURE_STREAMS) task->args.ss = NULL; @@ -909,7 +905,7 @@ lws_threadpool_dequeue(struct lws *wsi) /* can't find it */ lwsl_notice("%s: tp %p: no task for wsi %p, decoupling\n", __func__, tp, task_to_wsi(task)); - task->args.wsi->tp_task = NULL; + lws_dll2_remove(&task->list); task->args.wsi = NULL; #if defined(LWS_WITH_SECURE_STREAMS) task->args.ss = NULL; @@ -922,16 +918,20 @@ bail: return 0; } -#if defined(LWS_WITH_SECURE_STREAMS) int -lws_threadpool_dequeue_ss(struct lws_ss_handle *ss) +lws_threadpool_dequeue(struct lws *wsi) /* deprecated */ { - if (ss) - return lws_threadpool_dequeue(ss->wsi); + struct lws_threadpool_task *task; - return 0; + if (!wsi->tp_task_owner.count) + return 0; + assert(wsi->tp_task_owner.count != 1); + + task = lws_container_of(wsi->tp_task_owner.head, + struct lws_threadpool_task, list); + + return lws_threadpool_dequeue_task(task); } -#endif struct lws_threadpool_task * lws_threadpool_enqueue(struct lws_threadpool *tp, @@ -996,10 +996,10 @@ lws_threadpool_enqueue(struct lws_threadpool *tp, #if defined(LWS_WITH_SECURE_STREAMS) if (args->ss) - args->ss->wsi->tp_task = task; + lws_dll2_add_tail(&task->list, &args->ss->wsi->tp_task_owner); else #endif - args->wsi->tp_task = task; + lws_dll2_add_tail(&task->list, &args->wsi->tp_task_owner); lwsl_thread("%s: tp %s: enqueued task %p (%s) for wsi %p, depth %d\n", __func__, tp->name, task, task->name, task_to_wsi(task), @@ -1019,31 +1019,23 @@ bail: /* this should be called from the service thread */ enum lws_threadpool_task_status -lws_threadpool_task_status_wsi(struct lws *wsi, - struct lws_threadpool_task **task, void **user) +lws_threadpool_task_status(struct lws_threadpool_task *task, void **user) { enum lws_threadpool_task_status status; - struct lws_threadpool *tp; + struct lws_threadpool *tp = task->tp; - *task = wsi->tp_task; - if (!*task) { - lwsl_notice("%s: wsi has NULL tp_task, ~=FINISHED\n", __func__); - return LWS_TP_STATUS_FINISHED; - } - - tp = (*task)->tp; - *user = (*task)->args.user; - status = (*task)->status; + *user = task->args.user; + status = task->status; if (status == LWS_TP_STATUS_FINISHED || status == LWS_TP_STATUS_STOPPED) { char buf[160]; pthread_mutex_lock(&tp->lock); /* ================ tpool lock */ - __lws_threadpool_task_dump(*task, buf, sizeof(buf)); + __lws_threadpool_task_dump(task, buf, sizeof(buf)); lwsl_thread("%s: %s: service thread REAPING: %s\n", __func__, tp->name, buf); - __lws_threadpool_reap(*task); + __lws_threadpool_reap(task); lws_memory_barrier(); pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */ } @@ -1051,17 +1043,26 @@ lws_threadpool_task_status_wsi(struct lws *wsi, return status; } -#if defined(LWS_WITH_SECURE_STREAMS) enum lws_threadpool_task_status -lws_threadpool_task_status_ss(struct lws_ss_handle *ss, - struct lws_threadpool_task **task, void **user) +lws_threadpool_task_status_wsi(struct lws *wsi, + struct lws_threadpool_task **_task, void **user) { - if (ss) - return lws_threadpool_task_status_wsi(ss->wsi, task, user); + struct lws_threadpool_task *task; - return -1; + if (!wsi->tp_task_owner.count) { + lwsl_notice("%s: wsi has no task, ~=FINISHED\n", __func__); + return LWS_TP_STATUS_FINISHED; + } + + assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */ + + task = lws_container_of(wsi->tp_task_owner.head, + struct lws_threadpool_task, list); + + *_task = task; + + return lws_threadpool_task_status(task, user); } -#endif void lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop) @@ -1073,3 +1074,66 @@ lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop) pthread_cond_signal(&task->wake_idle); } + +int +lws_threadpool_foreach_task_wsi(struct lws *wsi, void *user, + int (*cb)(struct lws_threadpool_task *task, + void *user)) +{ + struct lws_threadpool_task *task1; + + if (wsi->tp_task_owner.head == NULL) + return 0; + + task1 = lws_container_of(wsi->tp_task_owner.head, + struct lws_threadpool_task, list); + + pthread_mutex_lock(&task1->tp->lock); /* ================ tpool lock */ + + lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1, + wsi->tp_task_owner.head) { + struct lws_threadpool_task *task = lws_container_of(d, + struct lws_threadpool_task, list); + + if (cb(task, user)) { + pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */ + return 1; + } + + } lws_end_foreach_dll_safe(d, d1); + + pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */ + + return 0; +} + +#if defined(LWS_WITH_SECURE_STREAMS) +int +lws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user, + int (*cb)(struct lws_threadpool_task *task, + void *user)) +{ + if (!ss->wsi) + return 0; + + return lws_threadpool_foreach_task_wsi(ss->wsi, user, cb); +} +#endif + +struct lws_threadpool_task * +lws_threadpool_get_task_wsi(struct lws *wsi) +{ + if (wsi->tp_task_owner.head == NULL) + return NULL; + + return lws_container_of(wsi->tp_task_owner.head, + struct lws_threadpool_task, list); +} + +#if defined(LWS_WITH_SECURE_STREAMS) +struct lws_threadpool_task * +lws_threadpool_get_task_ss(struct lws_ss_handle *ss) +{ + return lws_threadpool_get_task_wsi(ss->wsi); +} +#endif diff --git a/minimal-examples/ws-server/minimal-ws-server-threadpool/protocol_lws_minimal_threadpool.c b/minimal-examples/ws-server/minimal-ws-server-threadpool/protocol_lws_minimal_threadpool.c index 9850072c4..55998a794 100644 --- a/minimal-examples/ws-server/minimal-ws-server-threadpool/protocol_lws_minimal_threadpool.c +++ b/minimal-examples/ws-server/minimal-ws-server-threadpool/protocol_lws_minimal_threadpool.c @@ -237,7 +237,7 @@ callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi); - lws_threadpool_dequeue(wsi); + lws_threadpool_dequeue_task(lws_threadpool_get_task_wsi(wsi)); break; case LWS_CALLBACK_SERVER_WRITEABLE: @@ -253,7 +253,8 @@ callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, * private task data. */ - n = lws_threadpool_task_status_wsi(wsi, &task, &_user); + task = lws_threadpool_get_task_wsi(wsi); + n = lws_threadpool_task_status(task, &_user); lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n", __func__, n); switch(n) {