探究C++20协程(5)——基于挂起实现无阻塞的定时器

devtools/2024/9/22 19:37:04/

实现目标

当用传统的线程 sleep 函数来让程序等待时,实际上是在阻塞当前线程。阻塞意味着这个线程在指定的时间(例如100毫秒)内无法执行任何其他任务。这种方式虽然简单,但效率低下,因为它导致CPU资源在等待期间未被充分利用。

协程提供了一种更加高效的方式来处理这种等待情况。它们在单个线程内部执行,并且能够在不阻塞线程的情况下挂起和恢复。当一个协程遇到需要等待的操作(如 sleep)时,它会挂起自身,而不会阻塞所在的线程。这使得线程可以转而去执行其他的协程。

这种挂起和恢复的过程是由协程调度器管理的。在协程 sleep 的这100毫秒期间,调度器可以安排其他协程运行,从而充分利用CPU资源。等到等待时间结束后,原来挂起的协程会被自动恢复执行。这样可以在不牺牲响应性的前提下,更有效地管理和利用系统资源。

实现效果如下,直接使用co_wait让协程无阻塞等待1s

Task<int, AsyncExecutor> simple_task2() {debug("task 2 start ...");using namespace std::chrono_literals;// 之前的写法,用 sleep_for 让当前线程睡眠 1 秒// std::this_thread::sleep_for(1s);// 等待 1 秒,注意 1s 是 chrono_literals 的字面值写法co_await 1s;debug("task 2 returns after 1s.");co_return 2;
}

这里的1s 实际上这是 C++ 11 对字面值的一种支持,本质上就是一个运算符重载,类型是 duration< long long >。除了秒以外,时间的单位也可以是毫秒、纳秒、分钟、小时等等,这些 C++ 11 的 duration 都已经提供了完善的支持,因此只要对 duration 做支持即可。

为 duration 实现 await_transform

template<typename ResultType, typename Executor>
struct TaskPromise {...template<typename _Rep, typename _Period>SleepAwaiter await_transform(std::chrono::duration<_Rep, _Period> &&duration) {return SleepAwaiter(&executor, std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());}...
}

需要在TaskPromise里引入了一个新的类型 SleepAwaiter,它的任务有两个:

  • 确保当前协程在若干毫秒之后恢复执行。
  • 确保当前协程恢复执行时要调度到对应的调度器上。
struct SleepAwaiter {explicit SleepAwaiter(AbstractExecutor *executor, long long duration) noexcept: _executor(executor), _duration(duration) {}bool await_ready() const { return false; }void await_suspend(std::coroutine_handle<> handle) const {// 自定义的延时执行工具类,全局只需要一个实例static Scheduler scheduler;scheduler.execute([this, handle]() {// _duration 毫秒之后执行下面的代码_executor->execute([handle]() {handle.resume();});}, _duration);}void await_resume() {}private:AbstractExecutor *_executor;long long _duration;
}

最重要的是await_suspend:这个函数会被调用来暂停协程。它使用一个静态的Scheduler对象,调度一个延迟任务。这个任务在指定的_duration毫秒后由执行器_executor恢复协程。

这当中最为关键的就是 Scheduler 的实现了,这个类实际上本身就是一个独立的定时任务调度器。

定时任务调度器 Scheduler

Scheduler是一个管理和执行定时任务的单例类。它可以安排任务在指定时间后执行。为了有效管理这些任务,使用优先级队列来存储待执行的任务,这些任务按照执行时间的优先级进行排序。

定时任务的描述类型

为了方便管理定时任务,我们需要定义一个类型 DelayedExecutable。这是一个用于描述定时任务的类。它包含一个将要执行的函数和该函数的计划执行时间(绝对时间)。该类通过以下方式计算任务的计划执行时间:

  • 获取当前时间。
  • 根据提供的延迟(以毫秒为单位)计算出未来的绝对执行时间。
class DelayedExecutable {public:DelayedExecutable(std::function<void()> &&func, long long delay) : func(std::move(func)) {using namespace std;using namespace std::chrono;auto now = system_clock::now();// 当前的时间戳,单位毫秒auto current = duration_cast<milliseconds>(now.time_since_epoch()).count();// 计算出任务的计划执行时间scheduled_time = current + delay;}// 调用时,返回从当前时间还需要多少毫秒到任务执行时间long long delay() const {using namespace std;using namespace std::chrono;auto now = system_clock::now();auto current = duration_cast<milliseconds>(now.time_since_epoch()).count();return scheduled_time - current;}long long get_scheduled_time() const {return scheduled_time;}void operator()() {func();}private:long long scheduled_time;std::function<void()> func;
};

为了将 DelayedExecutable 存入优先级队列当中,我们还需要给它提给一个比较大小的类:

class DelayedExecutableCompare {public:bool operator()(DelayedExecutable &left, DelayedExecutable &right) {return left.get_scheduled_time() > right.get_scheduled_time();}
};

这个类就很简单了,直接将对 DelayedExecutable 的比较转换成对它们的执行时间的比较。使用这个类对 DelayedExecutable 进行排序时,会使得时间靠前的对象排到前面。

定时任务调度器

Scheduler类使用了多线程和条件变量来管理和执行任务。它的工作原理如下:

  • 维护一个优先级队列,队列中的任务按执行时间排序。
  • 使用一个工作线程来检查队列并执行任务。
  • 如果队列为空,工作线程会等待新任务的到来。
  • 如果当前时间未到队列头部任务的执行时间,工作线程将等待直到那个时间。
  • 当任务的执行时间到达,从队列中取出任务(恢复协程)并执行。
class Scheduler {private:std::condition_variable queue_condition;std::mutex queue_lock;// 注意这里改用优先级队列std::priority_queue<DelayedExecutable, std::vector<DelayedExecutable>, DelayedExecutableCompare> executable_queue;std::atomic<bool> is_active;std::thread work_thread;void run_loop() {while (is_active.load(std::memory_order_relaxed) || !executable_queue.empty()) {std::unique_lock lock(queue_lock);if (executable_queue.empty()) {queue_condition.wait(lock);if (executable_queue.empty()) {continue;}}// 从这里开始于 LooperExecutor 不同,这里需要判断优先级队头的任务,也就是最先要执行的任务是否需要立即执行auto executable = executable_queue.top();long long delay = executable.delay();if (delay > 0) {// 队头的任务还没到执行时间,等待 delay 毫秒auto status = queue_condition.wait_for(lock, std::chrono::milliseconds(delay));// 如果等待期间没有延时比 delay 更小的任务加入,这里就会返回 timeoutif (status != std::cv_status::timeout) {// 不是 timeout,需要重新计算队头的延时continue;}}executable_queue.pop();lock.unlock();executable();}}public:Scheduler() {... // 与 LooperExecutor 完全相同}~Scheduler() {... // 与 LooperExecutor 完全相同}void execute(std::function<void()> &&func, long long delay) {delay = delay < 0 ? 0 : delay;std::unique_lock lock(queue_lock);if (is_active.load(std::memory_order_relaxed)) {// 只有队列为空或者比当前队头任务的延时更小时,需要调用 notify_one// 其他情况只需要按顺序依次执行即可bool need_notify = executable_queue.empty() || executable_queue.top().delay() > delay;executable_queue.push(DelayedExecutable(std::move(func), delay));lock.unlock();if (need_notify) {queue_condition.notify_one();}}}void shutdown(bool wait_for_complete = true) {... // 与 LooperExecutor 完全相同}void join() {if (work_thread.joinable()) {work_thread.join();}}
};

关于阻塞的说明

虽然Scheduler的实现在等待任务执行时会阻塞一个线程,这种方式比阻塞多个线程要高效得多。在实际应用中,这可以显著减少系统资源的占用,提高线程的利用率。通过这种方式,即使有多个协程或任务需要延时执行,只需要阻塞一个专门的调度线程而不是每个协程对应的线程。

结果展示

在这里插入图片描述

任务开始执行

  • 51:59.226:任务(ID:7900)开始执行。
  • 51:59.329:100毫秒后,任务(ID:7900)记录了一个时间点,显示从开始到现在大约过去了103毫秒。
    第二任务开始
  • 51:59.331:另一个任务(ID:38916)开始执行。
  • 52:00.339:大约一秒后,任务(ID:38916)完成。时间点从开始(51:59.331)到结束(52:00.339)约为1.008秒,符合预期的一秒内。
    任务返回结果
  • 52:00.340:任务(ID:7900)从任务2获得结果(返回值:2)。
    第三个时间点和第三任务开始
  • 52:00.854:500毫秒后,任务(ID:7900)记录另一个时间点,从51:59.329到52:00.854约为1.525秒。
  • 52:00.855:接着,第三个任务(ID:38248)开始。
    第三任务结束
  • 52:02.869:第三个任务(ID:23916)完成,历时约2秒,从52:00.855到52:02.869。
    任务返回结果并结束
  • 52:02.872:任务(ID:7900)从任务3获得结果(返回值:3)。
  • 52:02.873:任务(ID:7900)完成,总结果是6。
  • 52:02.874:几乎同时,从另一个获取点(ID:8524)也报告任务完成,结果同样是6。
    结束运行循环
  • 52:02.874:任务(ID:7900)退出运行循环。
  • 52:02.876:另一个任务(ID:11984)也退出运行循环。

通过上述分析,我们可以看到每个任务的开始和结束时间记录非常明确,并且与预期执行时间相匹配:

100毫秒的任务实际耗时约103毫秒。
1秒的任务实际耗时约1.008秒。
500毫秒后记录的时间与预期相符。
2秒的任务实际耗时约2.014秒。

完整代码

#define __cpp_lib_coroutine
#define  _CRT_SECURE_NO_WARNINGS
#include <coroutine>
#include <exception>
#include <iostream>
#include <thread>
#include <functional>
#include <mutex>
#include <list>
#include <optional>
#include <cassert>
#include <queue>
#include <future>
#include <chrono>
#include <ctime>
using namespace std;
void print_time() {// 获取当前时间点auto now = std::chrono::system_clock::now();// 转换为time_t类型auto now_c = std::chrono::system_clock::to_time_t(now);// 获取本地时间auto local_time = std::localtime(&now_c);// 输出当前的分钟和秒(不输出年、月、日和小时)std::cout << "Current time: ";std::cout << std::setfill('0') << std::setw(2) << local_time->tm_min << ":";  // 分钟std::cout << std::setfill('0') << std::setw(2) << local_time->tm_sec;        // 秒// 处理毫秒部分auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch());auto sec_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::seconds(now_c));std::cout << "." << std::setfill('0') << std::setw(3) << (now_ms.count() % 1000) << std::endl;  // 毫秒
}
void debug(const std::string& s) {print_time();printf(" %d %s\n",std::this_thread::get_id(), s.c_str());
}void debug(const std::string& s, int x) {print_time();printf("%d %s %d\n", std::this_thread::get_id(), s.c_str(), x);
}
// 定时器
class DelayedExecutable {
public:DelayedExecutable(std::function<void()>&& func, long long delay) : func(std::move(func)) {using namespace std;using namespace std::chrono;auto now = system_clock::now();auto current = duration_cast<milliseconds>(now.time_since_epoch()).count();scheduled_time = current + delay;}long long delay() const {using namespace std;using namespace std::chrono;auto now = system_clock::now();auto current = duration_cast<milliseconds>(now.time_since_epoch()).count();return scheduled_time - current;}long long get_scheduled_time() const {return scheduled_time;}void operator()() {func();}private:long long scheduled_time;std::function<void()> func;
};class DelayedExecutableCompare {
public:bool operator()(DelayedExecutable& left, DelayedExecutable& right) {return left.get_scheduled_time() > right.get_scheduled_time();}
};class Scheduler {
private:std::condition_variable queue_condition;std::mutex queue_lock;std::priority_queue<DelayedExecutable, std::vector<DelayedExecutable>, DelayedExecutableCompare> executable_queue;std::atomic<bool> is_active;std::thread work_thread;void run_loop() {while (is_active.load(std::memory_order_relaxed) || !executable_queue.empty()) {std::unique_lock lock(queue_lock);if (executable_queue.empty()) {queue_condition.wait(lock);if (executable_queue.empty()) {continue;}}auto executable = executable_queue.top();long long delay = executable.delay();if (delay > 0) {auto status = queue_condition.wait_for(lock, std::chrono::milliseconds(delay));if (status != std::cv_status::timeout) {// a new executable should be executed before.continue;}}executable_queue.pop();lock.unlock();executable();}debug("run_loop exit.");}
public:Scheduler() {is_active.store(true, std::memory_order_relaxed);work_thread = std::thread(&Scheduler::run_loop, this);}~Scheduler() {shutdown(false);join();}void execute(std::function<void()>&& func, long long delay) {delay = delay < 0 ? 0 : delay;std::unique_lock lock(queue_lock);if (is_active.load(std::memory_order_relaxed)) {bool need_notify = executable_queue.empty() || executable_queue.top().delay() > delay;executable_queue.push(DelayedExecutable(std::move(func), delay));lock.unlock();if (need_notify) {queue_condition.notify_one();}}}void shutdown(bool wait_for_complete = true) {is_active.store(false, std::memory_order_relaxed);if (!wait_for_complete) {// clear queue.std::unique_lock lock(queue_lock);decltype(executable_queue) empty_queue;std::swap(executable_queue, empty_queue);lock.unlock();}queue_condition.notify_all();}void join() {if (work_thread.joinable()) {work_thread.join();}}
};// 调度器
class AbstractExecutor {
public:virtual void execute(std::function<void()>&& func) = 0;
};class NoopExecutor : public AbstractExecutor {
public:void execute(std::function<void()>&& func) override {func();}
};class NewThreadExecutor : public AbstractExecutor {
public:void execute(std::function<void()>&& func) override {std::thread(func).detach();}
};class AsyncExecutor : public AbstractExecutor {
public:void execute(std::function<void()>&& func) override {auto future = std::async(func);}
};class LooperExecutor : public AbstractExecutor {
private:std::condition_variable queue_condition;std::mutex queue_lock;std::queue<std::function<void()>> executable_queue;std::atomic<bool> is_active;std::thread work_thread;void run_loop() {while (is_active.load(std::memory_order_relaxed) || !executable_queue.empty()) {std::unique_lock lock(queue_lock);if (executable_queue.empty()) {queue_condition.wait(lock);if (executable_queue.empty()) {continue;}}auto func = executable_queue.front();executable_queue.pop();lock.unlock();func();}debug("run_loop exit.");}public:LooperExecutor() {is_active.store(true, std::memory_order_relaxed);work_thread = std::thread(&LooperExecutor::run_loop, this);}~LooperExecutor() {shutdown(false);if (work_thread.joinable()) {work_thread.join();}}void execute(std::function<void()>&& func) override {std::unique_lock lock(queue_lock);if (is_active.load(std::memory_order_relaxed)) {executable_queue.push(func);lock.unlock();queue_condition.notify_one();}}void shutdown(bool wait_for_complete = true) {is_active.store(false, std::memory_order_relaxed);if (!wait_for_complete) {// clear queue.std::unique_lock lock(queue_lock);decltype(executable_queue) empty_queue;std::swap(executable_queue, empty_queue);lock.unlock();}queue_condition.notify_all();}
};template<typename T>
struct Result
{explicit Result() = default;explicit Result(T&& value) : _value(value) {}explicit Result(std::exception_ptr&& exception_ptr) : _exception_ptr(exception_ptr) {}T get_or_throw() {if (_exception_ptr) {std::rethrow_exception(_exception_ptr);}return _value;}
private:T _value;std::exception_ptr _exception_ptr;
};
// 用于协程initial_suspend()时直接将运行逻辑切入调度器的等待体
struct DispatchAwaiter {explicit DispatchAwaiter(AbstractExecutor* executor) noexcept: _executor(executor) {}bool await_ready() const { return false; }void await_suspend(std::coroutine_handle<> handle) const {_executor->execute([handle]() {handle.resume();});}void await_resume() {}private:AbstractExecutor* _executor;
};//对于时间的等待体
struct SleepAwaiter {explicit SleepAwaiter(AbstractExecutor* executor, long long duration) noexcept: _executor(executor), _duration(duration) {}bool await_ready() const { return false; }void await_suspend(std::coroutine_handle<> handle) const {static Scheduler scheduler;scheduler.execute([this, handle]() {_executor->execute([handle]() {handle.resume();});}, _duration);}void await_resume() {}private:AbstractExecutor* _executor;long long _duration;
};// 前向声明
template<typename ResultType, typename Executor>
struct Task;template<typename Result, typename Executor>
struct TaskAwaiter {explicit TaskAwaiter(AbstractExecutor* executor, Task<Result, Executor>&& task) noexcept: _executor(executor), task(std::move(task)) {}TaskAwaiter(TaskAwaiter&& completion) noexcept: _executor(completion._executor), task(std::exchange(completion.task, {})) {}TaskAwaiter(TaskAwaiter&) = delete;TaskAwaiter& operator=(TaskAwaiter&) = delete;constexpr bool await_ready() const noexcept {return false;}// 在这里增加了调度器的运行void await_suspend(std::coroutine_handle<> handle) noexcept {task.finally([handle, this]() {_executor->execute([handle]() {handle.resume();});});}Result await_resume() noexcept {return task.get_result();}private:Task<Result, Executor> task;AbstractExecutor* _executor;
};// 对应修改增加调度器的传入
template<typename ResultType,typename Executor>
struct TaskPromise {//此时调度器将开始调度,执行的逻辑DispatchAwaiter initial_suspend() { return DispatchAwaiter(&executor); }std::suspend_always final_suspend() noexcept { return {}; }Task<ResultType, Executor> get_return_object() {return Task{ std::coroutine_handle<TaskPromise>::from_promise(*this) };}//在这里返回等待器对象时需要将调度器的指针带上template<typename _ResultType, typename _Executor>TaskAwaiter<_ResultType, _Executor> await_transform(Task<_ResultType, _Executor>&& task) {return TaskAwaiter<_ResultType, _Executor>(&executor, std::move(task));}template<typename _Rep, typename _Period>SleepAwaiter await_transform(std::chrono::duration<_Rep, _Period>&& duration) {return SleepAwaiter(&executor, std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());}void unhandled_exception() {std::lock_guard lock(completion_lock);result = Result<ResultType>(std::current_exception());completion.notify_all();notify_callbacks();}void return_value(ResultType value) {std::lock_guard lock(completion_lock);result = Result<ResultType>(std::move(value));completion.notify_all();notify_callbacks();}ResultType get_result() {std::unique_lock lock(completion_lock);if (!result.has_value()) {completion.wait(lock);}return result->get_or_throw();}void on_completed(std::function<void(Result<ResultType>)>&& func) {std::unique_lock lock(completion_lock);if (result.has_value()) {auto value = result.value();lock.unlock();func(value);}else {completion_callbacks.push_back(func);}}private:std::optional<Result<ResultType>> result;Executor executor;std::mutex completion_lock;std::condition_variable completion;std::list<std::function<void(Result<ResultType>)>> completion_callbacks;void notify_callbacks() {auto value = result.value();for (auto& callback : completion_callbacks) {callback(value);}completion_callbacks.clear();}};template<typename ResultType,typename Executor = NewThreadExecutor>
struct Task {using promise_type = TaskPromise<ResultType, Executor>;ResultType get_result() {return handle.promise().get_result();}Task& then(std::function<void(ResultType)>&& func) {handle.promise().on_completed([func](auto result) {try {func(result.get_or_throw());}catch (std::exception& e) {// ignore.}});return *this;}Task& catching(std::function<void(std::exception&)>&& func) {handle.promise().on_completed([func](auto result) {try {result.get_or_throw();}catch (std::exception& e) {func(e);}});return *this;}Task& finally(std::function<void()>&& func) {handle.promise().on_completed([func](auto result) { func(); });return *this;}explicit Task(std::coroutine_handle<promise_type> handle) noexcept : handle(handle) {}Task(Task&& task) noexcept : handle(std::exchange(task.handle, {})) {}Task(Task&) = delete;Task& operator=(Task&) = delete;~Task() {if (handle) handle.destroy();}private:std::coroutine_handle<promise_type> handle;
};Task<int, AsyncExecutor> simple_task2() {debug("task 2 start ...");using namespace std::chrono_literals;co_await 1s;debug("task 2 returns after 1s.");co_return 2;
}Task<int, NewThreadExecutor> simple_task3() {debug("in task 3 start ...");using namespace std::chrono_literals;co_await 2s;debug("task 3 returns after 2s.");co_return 3;
}Task<int, LooperExecutor> simple_task() {debug("task start ...");using namespace std::chrono_literals;co_await 100ms;debug("after 100ms ...");auto result2 = co_await simple_task2();debug("returns from task2: ", result2);co_await 500ms;debug("after 500ms ...");auto result3 = co_await simple_task3();debug("returns from task3: ", result3);co_return 1 + result2 + result3;
}int main() {auto simpleTask = simple_task();simpleTask.then([](int i) {debug("simple task end: ", i);}).catching([](std::exception& e) {//debug("error occurred", e.what());});try {auto i = simpleTask.get_result();debug("simple task end from get: ", i);}catch (std::exception& e) {//debug("error: ", e.what());}return 0;
}

http://www.ppmy.cn/devtools/10409.html

相关文章

指针专题(4)【qsort函数的概念和使用】

1.前言 上节我们学习了指针的相关内容&#xff0c;本节我们在有指针的基础的条件下学习一下指针的运用&#xff0c;那么废话不多说&#xff0c;我们正式进入今天的学习 2.回调函数 我们既然已经学习了指针的相关基础&#xff0c;那么我们此时就可以用指针来实现回调函数 而回…

ruoyi-cloud-plus添加一个不要认证的公开新页面

文章目录 一、前端1. 组件创建2. src/router/index.ts3. src/permission.ts 二、后端1. 设计思想2. ruoyi-gateway.yml3. 开发Controller 版本RuoYiCloudPlusv2.1.2plus-uiVue3 ts 以新增一个公开的课程搜索页面为例。 一、前端 1. 组件创建 在view目录下创建一个页面的vue…

华媒舍:百度竞价排名如何提升点击率

在网络推广中&#xff0c;提升点击率是十分重要的。运用百度搜索引擎广告是一种常用的提升点击率的形式。而百度竞价推广是搜索引擎所提供的一种付费流量方法&#xff0c;根据提高网站在搜索结果中的排名&#xff0c;可以有效提升点击率。下面我们就详细介绍如何运用百度竞价推…

720云手机电动云台全新上市,让手机能自动拍摄亿万像素VR全景

2024年3月&#xff0c;720云正式发布手机拍摄专业级VR全景的云台——720云手机电动云台(LG-05)&#xff0c;这款云台极大程度上把拍摄过程简化到极致。云台电机搭载先进的2.4G/蓝牙双模无线遥控功能和4500mAh电池&#xff0c;保证了无需连续充电的长时间使用。全自动拍摄模式和…

JS - 以工厂模式和原型模式方式建造对象、JS的垃级回收机制、数组的使用

创建对象的方式 使用工厂方法来建造对象 在JS中我们可以通过以下方式进行创建对象&#xff1a; var obj {name:"孙悟空",age:18,gender:"男",sayName:function(){alert(this.name);}};var obj2 {name:"猪八戒",age:28,gender:"男",…

OpenHarmony 视图加载——ImageViewZoom

简介 ImageViewZoom 支持加载 Resource 或 PixelMap 图片&#xff0c;支持设置图像显示类型功能&#xff0c;支持缩放功能&#xff0c;支持平移功能&#xff0c;双击放大功能&#xff0c;可以监听图片大小&#xff0c;资源变化事件&#xff0c;支持清除显示图片功能。 效果展示…

[生活][杂项] 如何正确打开编织袋

编织袋打开的正确姿势 面对单线分离右边的线头&#xff0c;然后依次拉开即可

Stabble Diffusion 本地部署教程详解

引言 随着人工智能技术的快速发展&#xff0c;文本到图像&#xff08;Text-to-Image, T2I&#xff09;模型已经成为研究和应用的热点。其中&#xff0c;Stable Diffusion 是一款开源的 T2I 模型&#xff0c;以其出色的图像质量和生成能力而受到广泛关注10。然而&#xff0c;要…