[RFC PATCH 04/10] perf workqueue: add threadpool execute and wait functions
From: Riccardo Mancini <hidden>
Date: 2021-07-13 12:11:45
Also in:
lkml
Subsystem:
performance events subsystem, the rest · Maintainers:
Peter Zijlstra, Ingo Molnar, Arnaldo Carvalho de Melo, Namhyung Kim, Linus Torvalds
This patch adds:
 - execute_in_threadpool: assigns a task to all the threads of the pool,
   which execute it asynchronously with respect to the main thread.
 - wait_threadpool: waits for the task to complete on all threads.
 - threadpool_is_busy: checks whether the pool is currently executing a task.

stop_threadpool now waits for a running task to finish before stopping
the threads.

Furthermore, tests for these new functions are added.

This patch completes the threadpool.

Signed-off-by: Riccardo Mancini <redacted>
---
 tools/perf/tests/workqueue.c           |  86 ++++++++++++++++++++-
 tools/perf/util/workqueue/threadpool.c | 103 +++++++++++++++++++++++++
 tools/perf/util/workqueue/threadpool.h |   5 ++
 3 files changed, 193 insertions(+), 1 deletion(-)
diff --git a/tools/perf/tests/workqueue.c b/tools/perf/tests/workqueue.c
index be377e9897bab4e9..3c64db8203556847 100644
--- a/tools/perf/tests/workqueue.c
+++ b/tools/perf/tests/workqueue.c@@ -1,13 +1,59 @@ // SPDX-License-Identifier: GPL-2.0 +#include <stdlib.h> #include <linux/kernel.h> +#include <linux/zalloc.h> #include "tests.h" #include "util/debug.h" #include "util/workqueue/threadpool.h" +#define DUMMY_FACTOR 100000 +#define N_DUMMY_WORK_SIZES 7 + struct threadpool_test_args_t { int pool_size; }; +struct test_task { + struct task_struct task; + int n_threads; + int *array; +}; + +/** + * dummy_work - calculates DUMMY_FACTOR * (idx % N_DUMMY_WORK_SIZES) inefficiently + * + * This function uses modulus to create work items of different sizes. + */ +static void dummy_work(int idx) +{ + int prod = 0; + int k = idx % N_DUMMY_WORK_SIZES; + int i, j; + + for (i = 0; i < DUMMY_FACTOR; i++) + for (j = 0; j < k; j++) + prod ++; + + pr_debug3("dummy: %d * %d = %d\n", DUMMY_FACTOR, k, prod); +} + +static void test_task_fn1(int tidx, struct task_struct *task) +{ + struct test_task *mtask = container_of(task, struct test_task, task); + + dummy_work(tidx); + mtask->array[tidx] = tidx+1; +} + +static void test_task_fn2(int tidx, struct task_struct *task) +{ + struct test_task *mtask = container_of(task, struct test_task, task); + + dummy_work(tidx); + mtask->array[tidx] = tidx*2; +} + + static int __threadpool__prepare(struct threadpool_struct **pool, int pool_size) { int ret;
@@ -38,21 +84,59 @@ static int __threadpool__teardown(struct threadpool_struct *pool) return 0; } +static int __threadpool__exec_wait(struct threadpool_struct *pool, + struct task_struct *task) +{ + int ret; + + ret = execute_in_threadpool(pool, task); + TEST_ASSERT_VAL("threadpool execute failure", ret == 0); + TEST_ASSERT_VAL("threadpool is not executing", threadpool_is_busy(pool)); + + ret = wait_threadpool(pool); + TEST_ASSERT_VAL("threadpool wait failure", ret == 0); + TEST_ASSERT_VAL("waited threadpool is not ready", threadpool_is_ready(pool)); + + return 0; +} static int __test__threadpool(void *_args) { struct threadpool_test_args_t *args = _args; struct threadpool_struct *pool; - int ret; + int ret, i; + struct test_task task; + + task.task.fn = test_task_fn1; + task.n_threads = args->pool_size; + task.array = calloc(args->pool_size, sizeof(*task.array)); ret = __threadpool__prepare(&pool, args->pool_size); if (ret) return ret; + ret = __threadpool__exec_wait(pool, &task.task); + if (ret) + return ret; + + for (i = 0; i < args->pool_size; i++) + TEST_ASSERT_VAL("failed array check (1)", task.array[i] == i+1); + + task.task.fn = test_task_fn2; + + ret = __threadpool__exec_wait(pool, &task.task); + if (ret) + return ret; + + for (i = 0; i < args->pool_size; i++) + TEST_ASSERT_VAL("failed array check (2)", task.array[i] == 2*i); + ret = __threadpool__teardown(pool); if (ret) return ret; + free(task.array); + return 0; }
diff --git a/tools/perf/util/workqueue/threadpool.c b/tools/perf/util/workqueue/threadpool.c
index f4635ff782b9388e..720c7b2a562d6816 100644
--- a/tools/perf/util/workqueue/threadpool.c
+++ b/tools/perf/util/workqueue/threadpool.c@@ -21,6 +21,7 @@ static inline pid_t gettid(void) enum threadpool_status { THREADPOOL_STATUS__STOPPED, /* no threads */ THREADPOOL_STATUS__READY, /* threads are ready but idle */ + THREADPOOL_STATUS__BUSY, /* threads are busy */ THREADPOOL_STATUS__ERROR, /* errors */ THREADPOOL_STATUS__MAX };
@@ -164,6 +165,28 @@ static int terminate_thread(struct thread_struct *thread) return res; } +/** + * wake_thread - send wake msg to @thread + * + * This function does not wait for the thread to actually wake + * NB: call only from main thread! + */ +static int wake_thread(struct thread_struct *thread) +{ + int res; + enum thread_msg msg = THREAD_MSG__WAKE; + + res = write(thread->pipes.to[1], &msg, sizeof(msg)); + if (res < 0) { + pr_err("threadpool: error sending wake msg: %s\n", strerror(errno)); + return -1; + } + + pr_debug2("threadpool: sent wake msg %s to tid=%d\n", + thread_msg_tags[msg], thread->tid); + return 0; +} + /** * threadpool_thread - function running on thread *
@@ -207,6 +230,15 @@ static void *threadpool_thread(void *args) if (msg == THREAD_MSG__STOP) break; + + if (!thread->pool->current_task) { + pr_err("threadpool[%d]: received wake without task\n", + thread->tid); + break; + } + + pr_debug("threadpool[%d]: executing task\n", thread->tid); + thread->pool->current_task->fn(thread->idx, thread->pool->current_task); } pr_debug2("threadpool[%d]: exit\n", thread->tid);
@@ -383,11 +415,16 @@ int start_threadpool(struct threadpool_struct *pool) * stop_threadpool - stop all threads in the pool. * * This function blocks waiting for ack from all threads. + * If the pool was busy, it will first wait for the task to finish. */ int stop_threadpool(struct threadpool_struct *pool) { int t, ret, err = 0; + err = wait_threadpool(pool); + if (err) + return err; + if (pool->status != THREADPOOL_STATUS__READY) { pr_err("threadpool: stopping not ready pool\n"); return -1;
@@ -411,3 +448,69 @@ bool threadpool_is_ready(struct threadpool_struct *pool) { return pool->status == THREADPOOL_STATUS__READY; } + +/** + * execute_in_threadpool - execute @task on all threads of the @pool + * + * The task will run asynchronously wrt the main thread. + * The task can be waited with wait_threadpool. + * + * NB: make sure the pool is ready before calling this, since no queueing is + * performed. If you need queueing, have a look at the workqueue. + */ +int execute_in_threadpool(struct threadpool_struct *pool, struct task_struct *task) +{ + int t, err; + + WARN_ON(pool->status != THREADPOOL_STATUS__READY); + + pool->current_task = task; + + for (t = 0; t < pool->nr_threads; t++) { + err = wake_thread(&pool->threads[t]); + + if (err) { + pool->status = THREADPOOL_STATUS__ERROR; + return err; + } + } + + pool->status = THREADPOOL_STATUS__BUSY; + return 0; +} + +/** + * wait_threadpool - wait until all threads in @pool are done + * + * This function will wait for all threads to finish execution and send their + * ack message. + * + * NB: call only from main thread! + */ +int wait_threadpool(struct threadpool_struct *pool) +{ + int t, err = 0, ret; + + if (pool->status != THREADPOOL_STATUS__BUSY) + return 0; + + for (t = 0; t < pool->nr_threads; t++) { + ret = wait_thread(&pool->threads[t]); + if (ret) { + pool->status = THREADPOOL_STATUS__ERROR; + err = -1; + } + } + + pool->status = err ? THREADPOOL_STATUS__ERROR : THREADPOOL_STATUS__READY; + pool->current_task = NULL; + return err; +} + +/** + * threadpool_is_busy - check if the pool is busy + */ +int threadpool_is_busy(struct threadpool_struct *pool) +{ + return pool->status == THREADPOOL_STATUS__BUSY; +}
diff --git a/tools/perf/util/workqueue/threadpool.h b/tools/perf/util/workqueue/threadpool.h
index b62cad2b2c5dd331..dd9c2103ebe8d23b 100644
--- a/tools/perf/util/workqueue/threadpool.h
+++ b/tools/perf/util/workqueue/threadpool.h@@ -17,8 +17,13 @@ extern void destroy_threadpool(struct threadpool_struct *pool); extern int start_threadpool(struct threadpool_struct *pool); extern int stop_threadpool(struct threadpool_struct *pool); +extern int execute_in_threadpool(struct threadpool_struct *pool, + struct task_struct *task); +extern int wait_threadpool(struct threadpool_struct *pool); + extern int threadpool_size(struct threadpool_struct *pool); extern bool threadpool_is_ready(struct threadpool_struct *pool); +extern int threadpool_is_busy(struct threadpool_struct *pool); #endif /* __WORKQUEUE_THREADPOOL_H */
--
2.31.1