一、Work-stealing 算法历史
Work-stealing 是一种高效的任务调度算法,通常用于并行计算框架中,例如 Intel TBB 和 Java Fork/Join 框架。它的核心思想是每个线程都有一个任务队列,当某个线程完成自身任务时,它会尝试从其他线程的队列中窃取任务继续执行。
Working-Stealing算法最初是由Charles E. Leiserson、 Blaise Barney、 Daniel Cociorva 以及 Andrew Begel在2009年的论文“The implementation of work stealing in Intel Cilk”中提出的.
- 提出场景:随着计算机硬件技术的发展,多核处理器逐渐普及,并行计算成为提高计算性能的重要手段。在并行编程中,如何有效地调度任务以充分利用多核处理器的资源,同时减少任务之间的等待时间和处理器的空闲时间,成为一个重要的研究课题。在此背景下,Working-Stealing算法应运而生,为解决并行任务调度问题提供了一种有效的策略.
- 最初目的:
- 实现高效的负载平衡:在并行计算中,不同任务的执行时间可能存在差异,导致某些处理器可能在完成自己的任务后处于空闲状态,而其他处理器仍在忙碌。Working-Stealing算法通过允许空闲处理器从忙碌处理器的任务队列中窃取任务,可以自动地将工作负载均匀地分配到各个处理器上,充分利用所有处理器的计算资源,提高整体的计算效率.
- 减少同步开销:与传统的使用全局任务队列或集中式调度的方法相比,Working-Stealing算法为每个处理器或线程分配了独立的任务队列。这种分散式的任务管理方式减少了多个处理器同时访问共享资源时的竞争和同步开销,提高了系统的并行性和可扩展性.
- 提高数据局部性:在窃取任务时,处理器通常会从其他处理器的任务队列尾部窃取任务。由于任务队列中的任务往往具有一定的相关性或数据依赖性,这种窃取方式有利于提高数据的局部性,即窃取到的任务更有可能操作与当前处理器正在处理的数据相近的数据,从而减少了数据访问的延迟,进一步提高了性能.
- 适应动态变化的任务执行时间:在实际的并行计算中,任务的执行时间可能会因为各种因素而发生变化,例如数据的分布、缓存的命中率等。Working-Stealing算法能够根据任务的实际执行情况动态地调整任务的分配,及时地将任务从执行时间较长的处理器转移到空闲处理器上,从而更好地适应这种动态变化,保持系统的高性能运行.
二、Work-stealing 算法定义
-
工作窃取(Working - Stealing)算法的定义
- 基本概念:工作窃取是一种用于动态负载平衡的任务调度算法。在并行计算环境中,有多个线程(或处理器)可用于执行任务。每个线程都有自己的任务队列,当一个线程完成自己队列中的所有任务后,它会从其他线程的任务队列尾部“窃取”任务来执行。
- 任务分配方式:例如,假设有线程A、B和C,它们都有各自的任务队列。开始时,每个线程从自己的队列头部获取任务并执行。如果线程A提前完成了自己队列中的所有任务,而线程B和C还有任务未完成,那么线程A会尝试从线程B或C的任务队列尾部“拿走”一个任务来继续执行。这种从其他线程队列尾部窃取任务的方式是为了尽量减少对被窃取线程的干扰,因为一般情况下,线程会从队列头部获取任务执行,这样就可以避免与正在被窃取线程的任务执行产生冲突。
-
成为众多并行计算库核心算法的原因
- 动态负载平衡优势
- 自动适应负载变化:在并行计算中,任务的执行时间可能因各种因素而不同。有些任务可能很快完成,而有些任务可能由于数据依赖、计算复杂性等原因需要更长时间。工作窃取算法能够自动适应这种负载的动态变化。例如,在一个并行处理图像像素的任务中,部分像素区域可能因为颜色单一、处理简单而快速完成处理,而包含复杂纹理或对象的像素区域处理时间较长。采用工作窃取算法,处理简单区域的线程在完成自己的任务后,可以帮助处理复杂区域的线程,从而平衡各个线程的负载,提高整体的计算效率。
- 减少空闲时间:它有效地减少了线程的空闲时间。在传统的任务分配方式中,如果任务分配不均匀,可能会出现部分线程提前完成任务而闲置,而其他线程仍在忙碌的情况。工作窃取算法通过让空闲线程主动获取任务,使得所有线程都能尽可能地保持忙碌状态,充分利用了计算资源。比如在一个基于多核处理器的并行计算服务器上,工作窃取算法可以确保各个核心都能高效地参与计算,避免核心资源的浪费。
- 可扩展性良好
- 易于实现和集成
- 动态负载平衡优势
三、C++ 实现介绍
c++20实现如下:
实现代码
#include <iostream>
#include <vector>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <random>
#include <functional>
#include <optional>class WorkStealingQueue {
private:std::deque<std::function<void()>> tasks;mutable std::mutex mtx;public:// 推送任务到队列尾部void pushTask(std::function<void()> task) {std::lock_guard<std::mutex> lock(mtx);tasks.push_back(std::move(task));}// 弹出队列尾部的任务(本线程优先)std::optional<std::function<void()>> popTask() {std::lock_guard<std::mutex> lock(mtx);if (tasks.empty()) return std::nullopt;auto task = std::move(tasks.back());tasks.pop_back();return task;}// 从队列头部窃取任务(其他线程)std::optional<std::function<void()>> stealTask() {std::lock_guard<std::mutex> lock(mtx);if (tasks.empty()) return std::nullopt;auto task = std::move(tasks.front());tasks.pop_front();return task;}// 检查队列是否为空bool isEmpty() const {std::lock_guard<std::mutex> lock(mtx);return tasks.empty();}
};class WorkStealingScheduler {
private:std::vector<WorkStealingQueue> queues;std::vector<std::thread> workers;std::atomic<bool> stop{false};unsigned int threadCount;public:WorkStealingScheduler(unsigned int threadCount): queues(threadCount), threadCount(threadCount) {}~WorkStealingScheduler() {stop = true;for (auto& worker : workers) {if (worker.joinable()) worker.join();}}void start() {for (unsigned int i = 0; i < threadCount; ++i) {workers.emplace_back([this, i]() { workerThread(i); });}}void submitTask(unsigned int threadId, std::function<void()> task) {queues[threadId].pushTask(std::move(task));}private:void workerThread(unsigned int id) {std::mt19937 rng(id); // 用于随机选择其他线程队列std::uniform_int_distribution<unsigned int> dist(0, threadCount - 1);while (!stop) {// 尝试从自己的队列获取任务auto task = queues[id].popTask();if (!task) {// 如果没有任务,从其他线程的队列中窃取for (unsigned int attempt = 0; attempt < threadCount; ++attempt) {unsigned int victimId = dist(rng);if (victimId!= id) {task = queues[victimId].stealTask();if (task) break;}}}// 执行任务if (task) {(*task)();} else {// 如果没有任务可执行,线程暂时休眠一会儿std::this_thread::yield();}}}
};int main() {constexpr unsigned int numThreads = 4;WorkStealingScheduler scheduler(numThreads);scheduler.start();// 提交任务for (unsigned int i = 0; i < 20; ++i) {scheduler.submitTask(i % numThreads, [i]() {std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << std::endl;});}// 等待所有任务完成std::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}
代码说明
1. WorkStealingQueue
- 每个线程都有一个 WorkStealingQueue,用于存储自己的任务。
- 支持本线程的任务弹出 (popTask) 和其他线程的任务窃取 (stealTask)。
2. WorkStealingScheduler
- 包含多个 WorkStealingQueue,每个线程对应一个队列。
- 提供 submitTask 接口向特定线程的队列添加任务。
- 每个线程会从自己的队列中取任务执行,如果没有任务会随机尝试从其他队列窃取任务。
3. 随机窃取
- 使用 std::mt19937 生成随机数,选择其他线程的队列窃取任务。
4. 任务执行
- 每个任务是一个 std::function<void()>,它可以封装任意的可调用对象。
5. 停止逻辑
- 调用 ~WorkStealingScheduler() 会停止所有线程。
示例输出
运行程序后会看到类似的输出:
Task 0 executed by thread 140390394259200
Task 1 executed by thread 140390385866496
Task 2 executed by thread 140390377473792
Task 3 executed by thread 140390394259200
Task 4 executed by thread 140390385866496
...
任务由多个线程并行执行,如果某个线程空闲,它会从其他线程的队列中窃取任务。
改进方向
1. 负载均衡
增加任务优先级,改进任务窃取逻辑。
2. 更高效的数据结构
使用无锁队列替代互斥锁保护的双端队列。
3. 任务追踪
添加任务完成信号,确保所有任务执行完毕后主线程退出。
通过这些改进,可以进一步提升调度器的性能和功能。