在多线程编程中,定时器是一个非常重要的功能,它能够让我们在特定的时间点执行特定的任务。本文将介绍一个基于libuv库实现的C++定时器管理器——TimerManager,它通过创建多个工作线程,每个线程运行一个uv loop来高效地管理定时器任务。
1.TimerManager概述
TimerManager是一个用于管理定时器的类,它在初始化时会根据设置的线程数thread_num创建相应数量的工作线程。每个工作线程中都运行着一个uv loop,这些uv loop负责处理定时器任务。TimerManager提供了线程安全的接口,允许用户在不同线程中提交定时器任务、停止或等待定时器管理器。
2.核心组件
2.1 TimerManager类
TimerManager类是整个定时器管理器的核心,它包含以下几个关键部分:
线程数设置:通过thread_num成员变量设置工作线程的数量。
定时器回调类型:定义了timer_callback_t类型,这是一个函数对象,用于处理定时器到期时的回调操作。
定时器请求结构:TimerRequest结构体封装了定时器的相关信息,如超时时间、重复间隔和回调函数。
工作线程结构:WorkThread结构体代表每个工作线程,包含线程对象、请求队列、异步句柄等。
状态管理:通过status_原子变量管理定时器管理器的状态,如初始状态、启动状态、停止状态等。
2.2 工作线程
每个工作线程都运行在一个独立的uv loop中,它们通过WorkThreadMain函数启动。在该函数中,会初始化uv loop,并创建一个异步句柄async_handle用于接收来自其他线程的定时器请求。工作线程会不断运行uv loop,处理定时器任务,直到收到停止或等待的指令。
2.3 定时器任务处理
当用户通过StartTimer方法提交一个定时器任务时,TimerManager会根据round-robin算法选择一个工作线程,并将定时器请求添加到该线程的请求队列中。工作线程在收到请求后,会根据请求的细节创建一个uv_timer_t对象,并启动定时器。当定时器到期时,会调用相应的回调函数。
3. 关键功能
3.1 启动定时器管理器
通过调用Start方法,TimerManager会根据设置的线程数创建相应数量的工作线程,并启动每个线程中的uv loop。在启动过程中,会使用std::promise和std::future机制确保所有工作线程都已成功启动并准备好接收定时器请求。
3.2 提交定时器任务
StartTimer方法允许用户提交定时器任务。用户需要提供超时时间、重复间隔和回调函数。TimerManager会根据round-robin算法选择一个工作线程,并将定时器请求添加到该线程的请求队列中。工作线程在收到请求后,会创建uv_timer_t对象并启动定时器。
3.3 停止和等待定时器管理器
Stop方法用于停止定时器管理器,它会通知所有工作线程停止运行,并等待所有已启动的定时器任务完成。Join方法则会等待所有定时器任务主动停止后才返回。
libuvC_25">4.基于libuv实现的C++定时器管理器
...struct TimerManager {size_t thread_num = 1;public:// TimerCallbackReturnEnum 这个名字不会被使用, 所以随便起起可以enum TimerCallbackReturnEnum : unsigned int {/// 该标志设置则表明主动停止 timer.kStopTimer = 0x01,/// 若觉得 callback 执行时间可能超过了 1ms, 则应该设置该标志.kMoreThan1MS = 0x02};/*** timer callback 类型.** timer callback 的调用形式如下:** param: status; 若为 0, 则表明当前定时器到期了; 若不为 0, 则表明由于出现了某些错误才会被调用, 此后* 当前回调不会再调用.** return; callback 的返回值会被当作位掩码来对待, 参见 TimerCallbackReturnEnum.* unsigned int (unsigned int status);*/using timer_callback_t = std::function<unsigned int(unsigned int)>;public:void Start();~TimerManager() noexcept;public:/* 只有这里的接口才是线程安全的, 即可以在不同线程, 在同一个 TimerManager 实例上调用 Stop(), Join(), 或者* StartTimer(). 但是不能在线程 A 上针对同一个 TimerManager 实例(下称 T )调用 Start(), 在线程 B 上调用* T.StartTimer().*//*** 停止 TimerManager.** 此时尚未被处理的 timer 不会被处理, 即直接以错误状态来调用 timer callback. 此时会一直等待已经处理的 timer* 主动停止才会返回.*/void Stop() {DoStopOrJoin(TimerManagerStatus::kStop);return ;}/*** 停止 TimerManager.** 此时会一直等待所有的 timer 主动停止才会返回.*/void Join() {DoStopOrJoin(TimerManagerStatus::kJoin);return ;}/*** 启动一个定时器.** 若 timeout 为 0, 则会立即调用 cb; 否则会在 timeout ms 之后调用 cb; 在第一次调用 cb 之后, 若 repeat* 为 0, 则停止当前定时器; 否则之后会每 repeat ms 之后调用一次 cb.*/void StartTimer(uint64_t timeout, uint64_t repeat, const timer_callback_t &cb);/* 本来这些都是 private 就行了.** 但是我想重载个 operator<<(ostream &out, TimerManagerStatus); 本来是把这个重载当作是 static member, 然* 后编译报错. 貌似只能作为 non-member, 这样子的话, TimerManagerStatus 也就必须得是 public 了.*/
public:enum class TimerManagerStatus : unsigned int {kInitial = 0,kStarted,kStop,kJoin};struct TimerRequest {uint64_t timeout;uint64_t repeat;timer_callback_t cb;public:TimerRequest(uint64_t timeout_arg, uint64_t repeat_arg, const timer_callback_t &cb_arg):timeout(timeout_arg),repeat(repeat_arg),cb(cb_arg) {}unsigned int Call(unsigned int status) noexcept {return cb(status);}};struct WorkThread {bool started = false;std::thread thread;// 不变量 31: 加锁顺序, 先 vec_mux 再 handle_mux.std::mutex vec_mux;/* request_vec 的内存是由 work thread 来分配.** 不变量 4: 对于其他线程而言, 其检测到若 request_vec 为 nullptr, 则表明对应的 work thread 不再工作,* 此时不能往 request_vec 中加入请求. 反之, 则表明 work thread 正常工作, 此时可以压入元素.*/std::unique_ptr<std::vector<std::unique_ptr<TimerRequest>>> request_vec;std::shared_mutex handle_mux;/* 不变量 3: 若 async_handle != nullptr, 则表明 async_handle 指向着的 uv_async_t 已经被初始化, 此时* 对其调用 uv_async_send() 不会触发 SIGSEGV.** 其实这里可以使用读写锁, 因为 uv_async_send() 是线程安全的, 但是 uv_close(), uv_async_init() 这些* 并不是. 也即在执行 uv_async_send() 之前加读锁, 其他操作加写锁.*/uv_async_t *async_handle = nullptr;public:void AsyncSend() noexcept {handle_mux.lock_shared();if (async_handle) {uv_async_send(async_handle); // 当 send() 失败了怎么办???}handle_mux.unlock_shared();return ;}/*** 将 timer_req 表示的请求追加到当前 work thread 中.** 若抛出异常, 则表明追加失败, 此时 timer_req 引用的对象没有任何变化. 若未抛出异常, 则根据 timer_req* 是否为空来判断请求是否成功追加, 即当不为空时, 表明请求成功追加到当前 work thread 中.*/void AddRequest(std::unique_ptr<TimerRequest> &timer_req);};private:std::atomic<TimerManagerStatus> status_{TimerManagerStatus::kInitial}; // lock-freestd::atomic_uint seq_num{0};std::unique_ptr<std::vector<WorkThread>> work_threads_;private:TimerManagerStatus GetStatus() noexcept {return status_.load(std::memory_order_relaxed);}void SetStatus(TimerManagerStatus status) noexcept {status_.store(status, std::memory_order_relaxed);return ;}void JoinAllThread() noexcept {for (WorkThread &work_thread : *work_threads_) {if (!work_thread.started)continue ;work_thread.thread.join(); // join() 理论上不会抛出异常的.}}void DoStopOrJoin(TimerManagerStatus op);private:static void WorkThreadMain(TimerManager *timer_manager, size_t idx, std::promise<void> *p) noexcept;static void OnAsyncHandle(uv_async_t* handle) noexcept;
};inline std::ostream& operator<<(std::ostream &out, TimerManager::TimerManagerStatus status) {out << static_cast<unsigned int>(status);return out;
}
4. 1示例代码
以下是一个使用TimerManager的示例代码:
...struct TimerCB {int timer_id = 0;int times = 0;int total_times = 5;public:TimerCB(int id, int total_times_arg) noexcept :timer_id(id),total_times(total_times_arg) {LOG(INFO) << "id: " << timer_id << "; times: " << times << "; total_times: " << total_times;}unsigned int operator()(unsigned int status) noexcept {LOG(INFO) << "id: " << timer_id << "; times: " << times << "; total_times: " << total_times<< "; status: " << status;if (++times >= total_times)return TimerManager::kStopTimer;return 0;}
};int main(int argc, char **argv) {google::SetUsageMessage("TimerManager Test");google::SetVersionString("1.0.0");google::ParseCommandLineFlags(&argc, &argv, false);google::InitGoogleLogging(argv[0]);google::InstallFailureSignalHandler();TimerManager timer_manager;timer_manager.thread_num = FLAGS_work_thread_num;timer_manager.Start();timer_manager.StartTimer(0, 0, TimerCB(1, 3));timer_manager.StartTimer(3000, 0, TimerCB(2, 3));timer_manager.StartTimer(0, 2000, TimerCB(3, 3));timer_manager.StartTimer(1000, 2000, TimerCB(4, 3));timer_manager.Join();return 0;
}
If you need the complete source code, please add the WeChat number (c17865354792)
在该示例中,我们首先初始化TimerManager,设置工作线程数,并启动定时器管理器。然后,我们提交了几个定时器任务,每个任务都有不同的超时时间和重复间隔。最后,我们调用Join方法等待所有定时器任务完成。
总结
TimerManager是一个基于libuv实现的高效C++定时器管理器。它通过创建多个工作线程,每个线程运行一个uv loop,实现了对定时器任务的高效管理。TimerManager提供了线程安全的接口,方便用户在多线程环境中使用。通过合理地使用TimerManager,我们可以在C++程序中轻松地实现定时任务的功能。
Welcome to follow WeChat official account【程序猿编码】