前言
之前的一篇文章讲述了future的优缺点,以及future的组合性,其中也讲述了构建任务DAG一些问题,同时给出了比较好的方案则是Executor。
Executor还未进入标准(C++23),Executor拥有惰性构建及良好的抽象模型来构建任务DAG,libunifex则给出了相当具有标准的实现,我们也借助libunifex的简短的代码来看下构建任务DAG的便利性。
Executor
在讲述例子之前,我们先来了解下Executor的一些概念
Executor有sender和receiver的概念,sender则是当任务构建的存放对象,使用sync_wait等函数去执行构建的任务(sender)时,会创建一个receiver来和sender进行连接,同时连接过程也是sender的拆解和receiver构建的过程。这里sender拆解是因为构建任务的过程中,一个任务就是一个sender,为了避免类型擦除,后一个任务就会拥有前一个任务创建的sender,举例来说,via() -> then -> then, 这样的一个流程时,构建的sender则为,then_sender<then_sender<just_sender>>,但是实际运行时开始则是运行最里边sender,那么就会进行拆解构建receiver,即为via_receiver<then_receiver<then_receiver>>,把这个receiver拿去一层一层的运行即可。
理解起来可能不那么明朗,使用libunifex的例子来看下。
libunifex的例子
#include <unifex/scheduler_concepts.hpp>
#include <unifex/single_thread_context.hpp>
#include <unifex/sync_wait.hpp>
#include <unifex/then.hpp>
#include <unifex/via.hpp>
#include <unifex/just.hpp>#include <iostream>using namespace unifex;int main() {single_thread_context context;auto sender = via(context.get_scheduler(), just(1)) | then([](int i) {return i + 45;});auto ret = sync_wait_r<int>(std::move(sender));std::cout << "ret is: " << *ret << std::endl;return 0;
}
sender构造
这是一个完整的例子来表述任务的依赖关系,首先看到我们初始化变量single_thread_context是一个调度器的context,libunifex是以conetxt内部来封装调度器,便于隐藏cpu或者gpu等调度运行细节。
我们就先来了解下single_thread_context的一些重要的部分:
namespace _single_thread {
class context {manual_event_loop loop_;std::thread thread_;public:context() : loop_(), thread_([this] { loop_.run(); }) {}~context() {loop_.stop();thread_.join();}auto get_scheduler() noexcept {return loop_.get_scheduler();}std::thread::id get_thread_id() const noexcept {return thread_.get_id();}
};
} // namespace _single_threadusing single_thread_context = _single_thread::context;
最后一行可以看到single_thread_context就是context这个类,包含一个线程成员和manual_event_loop成员,构造时就开启一个线程运行loop的run函数。调度器也就是loop的调度器。这么看来主要的核心代码还是在manual_event_loop中实现。但是可以知道context的主要作用是提供一个get_scheduler的抽象接口,便于能够使用真正的调度器。
继续来看下manual_event_loop类:
class context {public:class scheduler {class schedule_task {// ...template <typename Receiver>operation<Receiver> connect(Receiver&& receiver) const {return operation<Receiver>{(Receiver &&) receiver, loop_};}explicit schedule_task(context* loop) noexcept: loop_(loop){}context* const loop_;};explicit scheduler(context* loop) noexcept : loop_(loop) {}public:schedule_task schedule() const noexcept {return schedule_task{loop_};}private:context* loop_;};scheduler get_scheduler() {return scheduler{this};}void run();void stop();void enqueue(task_base* task);
};using manual_event_loop = _manual_event_loop::context;
这里的最后一行代码也可以看出来manual_event_loop就是_manual_event_loop下的context。context中要表述的东西有点多,先大致讲解下流程,后边我们一步一步分析代码就明朗了。首先可以看到也是我们知道的有一个调度器scheduler类,而scheduler中又包含schedule_task,在调用get_scheduler时返回scheduler的对象,某一时刻需要执行调度任务时会调用scheduler类中schedule()函数返回schedule_task,并使用schedule_task调用connect函数得到operation这个对象,operation继承自task_base,那就能知道operation本身可以归属为一个task,然后会调用enqueue将operation放到manual_event_loo也就是_manual_event_loop下的context的任务队列中,run函数检查到有任务则会直接运行该任务。知道别处调用stop就会推出这个调度器的运行。
通过以上讲述,大家可能大概明白调度器的一个运行逻辑,我们也先回到开头的例子中进行分析。
auto sender = via(context.get_scheduler(), just(1)) | then([](int i) {return i + 45;
});
来看这句代码,使用竖线来表述运行的前后关系,这也是c++20中一个调用的特性,写成我们熟知的形式就是:
then(via(context.get_scheduler(), just(1)), [](int i){return i + 45;
});
如此便可以知道,首先调用context.get_scheduler()和just传递给via,via计算出结果再传递给then。那我们首先看你just(1)
是做了什么?
namespace _just_cpo {inline const struct just_fn {template <typename... Values>constexpr auto operator()(Values&&... values) const {return _just::sender<Values...>{std::in_place, (Values&&)values...};}} just{};
} // namespace _just_cpo
using _just_cpo::just;
这里使用了cpo的一种技术手段来实现just的函数调用,cpo这里我们不展开来讲,这里我们只需要知道just函数最终会调用到operator()中,然后可以知道该函数仅仅是构造了一个_just::sender并将我们的传递的参数1保存下来。看到这里我们已经学会了一个sender,也就是最简单just_sender,那就是明白这个just主要作用就是构造一个sender并保存参数,已便于给后边then的形参使用。
继续看下via的调用:
namespace _via_cpo {inline const struct _fn {template (typename Scheduler, typename Sender)auto operator()(Scheduler&& sched, Sender&& send) const {return _via::sender<Sender, schedule_result_t<Scheduler>>{(Sender&&) send,schedule(sched)};}} via{};
} // namespace _via_cpousing _via_cpo::via;
代码中同理可知,调用operator()函数并传递Scheduler和Sender参数,同样也是会构造一个_via::sender,会将sender和schedule(sched)保存,这里看到会调用到schedule函数,同样使用cpo的手段调用到我们的manual_event_loop的Scheduler中的schedule函数。上文我们可以知道schedule函数会返回schedule_task对象。那也就是说这里_via::sender会保存sender和schedule_task,不过在_via::sender中名称有一点变化,传递进来的sender对象称为前驱Predecessor,schedule_task被称为后继Successor。此时我们构造出来的完整的类型就是:_via::sender<_just::sender<Values…>, _manual_event_loop::context::scheduler::schedule_task>。
然后继续看下then函数的调用:
template(typename Sender, typename Func)
auto operator()(Sender&& predecessor, Func&& func) const {return _then::sender<Sender, Func>{(Sender &&) predecessor, (Func &&) func};
}
这里除了使用cpo,同时还使用了tag_invoke的技术手段,cpo或者tag_invoke可以帮助找到实际的调用的函数是哪个(候选的函数名字基本一致),同样我们也不展开tag_invoke.
我们仅仅是告知大家会调用到这个函数,相信大家也猜到了这里也是会构造一个sender出来(_then::sender),会保存我们刚刚生成_via::sender对象以及自己带的函数对象。
小结
到这里我们就完成sender的构造:_then::sender<_via::sender<_just::sender<Values…>, _manual_event_loop::context::scheduler::schedule_task>, Func>。哇,好长。要注意的是这个sender不仅保存了完整的类型,同时也会将相应的对象保存下来,一层一层进行了包装。就是这样就没有了所谓的类型擦除。
receiver构造
继续来看例子:
auto ret = sync_wait_r<int>(std::move(sender));
通过sync_wait_r来对sender开始进行任务的执行,使用sync_wait_r获取到最终执行完成的结果。看下sync_wait_r的实现细节。
namespace _sync_wait_r_cpo {template <typename Result>struct _fn {template(typename Sender)(requires sender<Sender>)decltype(auto) operator()(Sender&& sender) const {using Result2 = non_void_t<wrap_reference_t<decay_rvalue_t<Result>>>;return _sync_wait::_impl<Result2>((Sender&&) sender);}};
} // namespace _sync_wait_r_cpo
代码很简单,仅仅是直接去调用_sync_wait::_impl函数:
template <typename Result, typename Sender>
std::optional<Result> _impl(Sender&& sender) {using promise_t = _sync_wait::promise<Result>;promise_t promise;manual_event_loop ctx;// Store state for the operation on the stack.auto operation = connect((Sender&&)sender,_sync_wait::receiver_t<Result>{promise, ctx});start(operation);ctx.run();switch (promise.state_) {case promise_t::state::done:return std::nullopt;case promise_t::state::value:return std::move(promise.value_).get();case promise_t::state::error:std::rethrow_exception(promise.exception_.get());default:std::terminate();}
}
impl函数通过connect函数对sender和receiver进行了绑定,返回了operation对象,对operation执行start,使用manual_event_loop对象的run函数对主线程进行阻塞直到所有的任务完成,最后会得到结果返回。那么先看关键的connect函数做了什么
首先要知道传递给connect的是_then::sender和临时构造了一个_sync_wait::receiver。
template(typename Sender, typename Receiver)friend auto tag_invoke(tag_t<unifex::connect>, Sender&& s, Receiver&& r)-> connect_result_t<member_t<Sender, Predecessor>, receiver_t<remove_cvref_t<Receiver>>> {return unifex::connect(static_cast<Sender&&>(s).pred_,receiver_t<remove_cvref_t<Receiver>>{static_cast<Sender&&>(s).func_,static_cast<Receiver&&>(r)});}
调用connect时会调用到_then::sender的tag_invoke中,函数中还是会继续调用connect函数,不过这一次的sender参数就是_then::sender的pred_成员,也就是_via::sender,同时还会构造一个_then中receiver_t,我们称他为then::receiver,这里then::receiver会保存_then::sender的func和传递进来的Receiver,也就是_sync_wait::receiver,那我们看下函数内部connect的参数的类型是什么:connect(_via::sender<…>, _then::receiver<Func, _sync_wait::receiver>).
接下来就会_via::sender的connect函数:
template <typename Receiver>auto connect(Receiver&& receiver) && {return unifex::connect(static_cast<Predecessor&&>(pred_),predecessor_receiver<Successor, Receiver>{static_cast<Successor&&>(succ_),static_cast<Receiver&&>(receiver)});}
像前边一样,_via::sender会取出来自己的pred作为现在connect的sender,自己的succ和传递进来的receiver包装成一个全新的receiver,也就是:
connect(_just::sender<Values…>, _via::receiver<_manual_event_loop::context::scheduler::schedule_task, _then::receiver<Func, _sync_wait::receiver>>)
接下来就会调用到_just::sender的tag_invoke函数中:
template(typename This, typename Receiver)auto tag_invoke(tag_t<connect>, This&& that, Receiver&& r)-> operation<Receiver, Values...> {return {static_cast<This&&>(that).values_, static_cast<Receiver&&>(r)};}
这里会构造一个operation对象,使用自己的values和_via::receiver作为参数,那么这个operation就返回给最初的调用地方,同时receiver也构造完成。
然后我们回到_sync_wait::_impl函数:
std::optional<Result> _impl(Sender&& sender) {// ...// Store state for the operation on the stack.auto operation = connect((Sender&&)sender,_sync_wait::receiver_t<Result>{promise, ctx});start(operation);// ...
}
我们就知道了这里的operation就是_just::sender构造的_just::operation, 接下来就调用start函数并传入调用。
任务的执行
然后我们发现会调用到_just::operation的start函数:
template <typename Receiver, typename... Values>
using operation = typename _op<remove_cvref_t<Receiver>, Values...>::type;template <typename Receiver, typename... Values>
struct _op<Receiver, Values...>::type {std::tuple<Values...> values_;Receiver receiver_;void start() & noexcept {std::apply([&](Values&&... values) {unifex::set_value((Receiver &&) receiver_, (Values &&) values...);},std::move(values_));}
};
可以看到start函数中就是使用apply函数针对receiver进行set_value。
接下来就会调用到_via::receiver的set_value函数
template <typename... Values>void set_value(Values&&... values) && noexcept {submit((Successor &&) successor_,value_receiver<Receiver, Values...>{{(Values &&) values...}, (Receiver &&) receiver_});}
set_value会调用summit函数,同时会将_via::receiver的成员successor_和receiver_作为参数传递,successor_是_manual_event_loop::context::scheduler::schedule_task, receiver_则是_then::receiver<Func, _sync_wait::receiver>,同样也会将receiver_成员和values包装成一个value_receiver,value_receiver就变成了value_receiver<Values, _then::receiver<Func, sync_wait::receiver>>,然后就去调用submit函数。
submit比较特殊不是直接去调用schedule_task的submit函数,而是会先构造一个submit::operation<schedule_task, value_receiver<…>>,然后调用start函数。
在submit::operation的构造函数中会调用submit::operatio的sender(schedule_task)的connect函数赋值给inner
template <typename Sender, typename Receiver>
class _op<Sender, Receiver>::type {template <typename Receiver2>explicit type(Sender&& sender, Receiver2&& receiver): receiver_((Receiver2 &&) receiver), inner_(unifex::connect((Sender &&) sender, wrapped_receiver{this})){}void start() & noexcept {unifex::start(inner_);}
};
先来看connect会调用schedule_task的connect函数,我们称这里wrapped_receiver为submit_receiver,那么传给connect函数的receiver就是submit_receiver<value_receiver<Values, _then::receiver<Func, _sync_wait::receiver>>>
template <typename Receiver>
operation<Receiver> connect(Receiver&& receiver) const {return operation<Receiver>{(Receiver &&) receiver, loop_};
}
那么我们可以看到把调度器给从sender拆解下来了,并且剩余的部分又包装了一个receiver,其实也就是这里才算完整的receiver构建完成。
继续我们schedule_task的connect函数,会构造一个_manual_event_loop::operation对象,传入了loop_对象,也就是调度器context对象。
在然后就开始从submit的start函数调用到_manual_event_loop::operation的start函数:
template <typename Receiver>
inline void _op<Receiver>::type::start() noexcept {loop_->enqueue(this);
}
终于到这里了,把_manual_event_loop::operation放到了调度器任务队列了。因为我们在构造conetext那里就会执行run函数,run函数就是从任务队列中取任务运行。
void context::run() {std::unique_lock lock{mutex_};while (true) {while (head_ == nullptr) {if (stop_) return;cv_.wait(lock);}auto* task = head_;head_ = task->next_;if (head_ == nullptr) {tail_ = nullptr;}lock.unlock();task->execute();lock.lock();}
}
这里看到会执行_manual_event_loop::operation的execute,最终会调用到execute_impl函数:
void execute_impl(task_base* t) noexcept {auto& self = *static_cast<type*>(t);if constexpr (is_stop_never_possible_v<stop_token_type>) {unifex::set_value(std::move(self.receiver_));} else {// ...}}
然后大家也可以猜到了,就是调用receiver的set_value函数了,我们又要回到_submit::receiver的中了。
template(typename... Values)
void set_value(Values&&... values) && noexcept {auto allocator = get_allocator(get_receiver());unifex::set_value(std::move(get_receiver()), (Values &&) values...);destroy(std::move(allocator));
}
这里的Values就是没有值,然后会调用_submit::receiver的receiver_成员的set_value函数,也就是value_receiver<Values, _then::receiver<Func, _sync_wait::receiver>>的函数:
void set_value() noexcept {std::apply([&](Values && ... values) noexcept {unifex::set_value(std::forward<Receiver>(receiver_), (Values &&) values...);},std::move(values_));
}
接着就调用_then::receiver的set_value函数:
template <typename... Values>
void set_value(Values&&... values) && noexcept {using result_type = std::invoke_result_t<Func, Values...>;unifex::set_value((Receiver &&) receiver_,std::invoke((Func &&) func_, (Values &&) values...));
}
这里只留下了一些关键代码方便理解,首先调用func函数并将返回值传递给下一个receiver的set_value函数。
然后就是_sync_wait::receiver的set_value函数:
template <typename... Values>
void set_value(Values&&... values) && noexcept {unifex::activate_union_member(promise_.value_, (Values&&)values...);promise_.state_ = promise<T>::state::value;signal_complete();
}void signal_complete() noexcept {ctx_.stop();
}
这里就是把then执行完的赋给promise_,并且更新完成的状态。这里的ctx就是最开始sync_wait_r函数中manual_event_loop对象,调用stop告知sync_wait_r函数中阻塞全部任务已完成可以结束程序了。
用一张图完整的来描述sender和receiver构建的整个过程:
总结
本篇文章通过阅读libunifex的源码,带着大家了解下c++的executor的任务构建流程及前后依赖的任务执行过程,尽管executor还尚未进入标准,libunifex已经算比较好的诠释了exeutor,当然除了我们上边讲解的简单的前后依赖的任务,还有并行的任务流,libunifex使用when_all来实现。除了单线程single_thread_context的context,libunifex还提供了线程池的context来供使用。
文中主要侧重了解大概的流程而忽略了一些实现细节,比如说cpo,tag_invoke,一些concept等等。只能大家自己去学习了,可以参考ref的链接
ref
https://zhuanlan.zhihu.com/p/395250667
https://mp.weixin.qq.com/s/oKCtKZq1R5PkVTSJxEhLfQ
https://blog.csdn.net/QcloudCommunity/article/details/125611481 系列
https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p1341r0.pdf