时间堆
上面的两种定时器设计(时间堆)都依赖一个固定周期触发的tick信号。
设计定时器的另一种实现思路是直接将超时时间当作tick周期,具体操作是每次都取出所有定时器中超时时间最小的超时值作为一个tick(绝对超时时间最小的定时器)。一旦tick触发,超时时间最小的定时器必然到期。处理完已超时的定时器后,再从剩余的定时器中找出超时时间最小的一个,并将这个最小时间作为下一个tick,如此反复,就可以实现较为精确的定时。
最小堆很适合处理这种定时方案,将所有定时器按最小堆来组织,以便获取到当前的最小超时时间,sylar采取的即是这种方案。
定时器设计
sylar的定时器采用最小堆设计,所有定时器根据绝对的超时时间点进行排序,每次取出离当前时间最近的一个超时时间点,计算出超时需要等待的时间,然后等待超时。超时时间到后,获取当前的绝对时间点,然后把最小堆里超时时间点小于这个时间点的定时器都收集起来,执行它们的回调函数。
注意,在注册定时事件时,一般提供的是相对时间,比如相对当前时间3秒后执行。sylar会根据传入的相对时间和当前的绝对时间计算出定时器超时时的绝对时间点,然后根据这个绝对时间点对定时器进行最小堆排序。因为依赖的是系统绝对时间,所以需要考虑校时因素,这点会在后面讨论。
sylar定时器的超时等待基于epoll_wait,精度只支持毫秒级,因为epoll_wait的超时精度也只有毫秒级。
关于定时器和IO协程调度器的整合:
IO协程调度器的idle协程会在调度器空闲时阻塞在epoll_wait上,等待IO事件发生。在之前的代码里,epoll_wait具有固定的超时时间,这个值是5秒钟。加入定时器功能后,epoll_wait的超时时间改用当前定时器的最小超时时间来代替。epoll_wait返回后,根据当前的绝对时间把已超时的所有定时器收集起来,执行它们的回调函数。
注意:epoll_wait的返回并不一定是超时引起的,也有可能是IO事件唤醒的,所以在epoll_wait返回后不能立即当作定时器已经超时处理,而是要再判断一下定时器有没有超时(通过比较当前的绝对时间和定时器的绝对超时时间确定一个定时器到底有没有超时)。
定时器实现
sylar的定时器对应Timer类,这个类的成员变量包括定时器的绝对超时时间点,是否重复执行,回调函数,以及一个指向TimerManager的指针,提供cancel/reset/refresh方法用于操作定时器。构造Timer时可以传入超时时间,也可以直接传入一个绝对时间。Timer的构造函数被定义成私有方式,只能通过TimerManager类来创建Timer对象。除此外,Timer类还提供了一个仿函数Comparator,用于比较两个Timer对象,比较的依据是绝对超时时间。
class TimerManager;
/*** @brief 定时器*/
class Timer : public std::enable_shared_from_this<Timer> {
friend class TimerManager;
public:/// 定时器的智能指针类型typedef std::shared_ptr<Timer> ptr;/*** @brief 取消定时器*/bool cancel();/*** @brief 刷新设置定时器的执行时间*/bool refresh();/*** @brief 重置定时器时间* @param[in] ms 定时器执行间隔时间(毫秒)* @param[in] from_now 是否从当前时间开始计算*/bool reset(uint64_t ms, bool from_now);
private:/*** @brief 构造函数* @param[in] ms 定时器执行间隔时间* @param[in] cb 回调函数* @param[in] recurring 是否循环* @param[in] manager 定时器管理器*/Timer(uint64_t ms, std::function<void()> cb,bool recurring, TimerManager* manager);/*** @brief 构造函数* @param[in] next 执行的时间戳(毫秒)*/Timer(uint64_t next);
private:/// 是否循环定时器bool m_recurring = false;/// 执行周期uint64_t m_ms = 0;/// 精确的执行时间uint64_t m_next = 0;/// 回调函数std::function<void()> m_cb;/// 定时器管理器TimerManager* m_manager = nullptr;
private:/*** @brief 定时器比较仿函数*/struct Comparator {/*** @brief 比较定时器的智能指针的大小(按执行时间排序)* @param[in] lhs 定时器智能指针* @param[in] rhs 定时器智能指针*/bool operator()(const Timer::ptr& lhs, const Timer::ptr& rhs) const;};
};
operator:定时器比较仿函数
在比较过程中,首先检查了lhs和rhs是否为空指针,然后比较它们的m_next成员变量的值。如果m_next相等,则通过调用get()方法获取原始指针并进行比较。
最后返回lhs.get() < rhs.get()是为了确保在m_next相等的情况下,根据原始指针的大小进行排序。
bool Timer::Comparator::operator()(const Timer::ptr &lhs, const Timer::ptr &rhs) const {if (!lhs && !rhs) {return false;}if (!lhs) {return true;}if (!rhs) {return false;}if (lhs->m_next < rhs->m_next) {return true;}if (rhs->m_next < lhs->m_next) {return false;}return lhs.get() < rhs.get();
}
定时器构造函数:
1、根据相对超时时间创建定时器,需要将相对定时时间+当前绝对时间得到绝对定时时间
2、根据绝对超时时间创建定时器
Timer::Timer(uint64_t ms, std::function<void()> cb,bool recurring, TimerManager *manager): m_recurring(recurring), m_ms(ms), m_cb(cb), m_manager(manager) {m_next = sylar::GetElapsedMS() + m_ms;
}Timer::Timer(uint64_t next): m_next(next) {
}
定时器类(Timer)的三种操作:cancel()、refresh()和reset()
cancel()函数用于取消定时器。它首先获取写锁,然后检查回调函数(m_cb)是否存在。如果存在,将回调函数置为空指针,并从定时器管理器(m_manager)的定时器集合中删除当前对象。最后返回true 表示成功取消定时器。
refresh() 函数用于刷新定时器的下一次触发时间。它同样先获取写锁,然后检查回调函数是否存在。如果不存在,直接返回false。接着在定时器管理器的定时器集合中查找当前对象,如果找不到,也返回false。找到后,从集合中删除当前对象,计算下一次触发时间,并将当前对象重新插入到定时器集合中。最后返回 true 表示刷新成功。
reset() 函数用于重置定时器的触发间隔和起始时间。它接受两个参数:ms 表示新的触发间隔,from_now 表示是否从当前时间开始计算。如果新的触发间隔与原来的相同且 from_now 为 false,则直接返回 true。否则,获取写锁,检查回调函数是否存在。如果不存在,直接返回false。接着在定时器管理器的定时器集合中查找当前对象,如果找不到,也返回 false。找到后,从集合中删除当前对象,根据 from_now 的值计算新的起始时间,更新触发间隔和下一次触发时间,并将当前对象重新插入到定时器集合中。最后返回 true 表示重置成功。
bool Timer::cancel() {TimerManager::RWMutexType::WriteLock lock(m_manager->m_mutex);if (m_cb) {//将回调函数置为空指针,并从定时器管理器(m_manager)的定时器集合中删除当前对象m_cb = nullptr;auto it = m_manager->m_timers.find(shared_from_this());m_manager->m_timers.erase(it);return true;}return false;
}bool Timer::refresh() {TimerManager::RWMutexType::WriteLock lock(m_manager->m_mutex);if (!m_cb) {return false;}auto it = m_manager->m_timers.find(shared_from_this());if (it == m_manager->m_timers.end()) {return false;}m_manager->m_timers.erase(it);m_next = sylar::GetElapsedMS() + m_ms;m_manager->m_timers.insert(shared_from_this());return true;
}bool Timer::reset(uint64_t ms, bool from_now) {if (ms == m_ms && !from_now) {return true;}TimerManager::RWMutexType::WriteLock lock(m_manager->m_mutex);if (!m_cb) {return false;}auto it = m_manager->m_timers.find(shared_from_this());if (it == m_manager->m_timers.end()) {return false;}m_manager->m_timers.erase(it);uint64_t start = 0;if (from_now) {start = sylar::GetElapsedMS();} else {start = m_next - m_ms;}m_ms = ms;m_next = start + m_ms;m_manager->addTimer(shared_from_this(), lock);return true;
}
shared_from_this():share_ptr.获取this指针,并用share_ptr管理,然后返回share_ptr.
定时器管理类TimerManager
成员变量:
m_mutex:读写锁类型,用于保护定时器集合的访问。
m_timers:定时器集合,存储所有的定时器对象。
m_tickled:布尔值,表示是否触发了onTimerInsertedAtFront函数。
m_previouseTime:上次执行时间,记录上一次定时器执行的时间。
成员函数:
构造函数:初始化定时器管理器对象。
析构函数:释放定时器管理器对象的资源。
addTimer:添加定时器,接受定时器的执行间隔时间、回调函数和是否循环作为参数,返回一个定时器对象的指针。
addConditionTimer:添加条件定时器,与addTimer类似,但还接受一个条件作为参数。
getNextTimer:获取到最近一个定时器执行的时间间隔(毫秒)。
listExpiredCb:获取需要执行的定时器的回调函数列表。
hasTimer:判断是否有定时器存在。
onTimerInsertedAtFront:当有新的定时器插入到定时器的首部时执行的虚函数,需要在派生类中实现。
addTimer(Timer::ptr val, RWMutexType::WriteLock& lock):将定时器添加到管理器中,需要传入定时器对象和写锁。
detectClockRollover:检测服务器时间是否被调后了,接受当前时间(毫秒)作为参数。
TimerManager定时器管理器:
用于管理和调度定时器对象。它提供了添加定时器、获取下一个定时器执行时间、获取需要执行的定时器的回调函数列表等功能。
所有的Timer对象都由TimerManager类进行管理,TimerManager包含一个std::set类型的Timer集合(实现定时器的最小堆结构,因为set里的元素总是排序过的,使set根据绝对定时时间自定义排序),以便获取到当前的最小定时器。
TimerManager提供创建定时器,获取最近一个定时器的超时时间,以及获取全部已经超时的定时器回调函数的方法,并且提供了一个onTimerInsertedAtFront()方法,这是一个虚函数,由IOManager继承时实现,当新的定时器插入到Timer集合的首部时,TimerManager通过该方法来通知IOManager立刻更新当前的epoll_wait超时。TimerManager还负责检测是否发生了校时,由detectClockRollover方法实现。
/*** @brief 定时器管理器*/
class TimerManager {
friend class Timer;
public:/// 读写锁类型typedef RWMutex RWMutexType;/*** @brief 构造函数*/TimerManager();/*** @brief 析构函数*/virtual ~TimerManager();/*** @brief 添加定时器* @param[in] ms 定时器执行间隔时间* @param[in] cb 定时器回调函数* @param[in] recurring 是否循环定时器*/Timer::ptr addTimer(uint64_t ms, std::function<void()> cb,bool recurring = false);/*** @brief 添加条件定时器* @param[in] ms 定时器执行间隔时间* @param[in] cb 定时器回调函数* @param[in] weak_cond 条件* @param[in] recurring 是否循环*/Timer::ptr addConditionTimer(uint64_t ms, std::function<void()> cb,std::weak_ptr<void> weak_cond,bool recurring = false);/*** @brief 到最近一个定时器执行的时间间隔(毫秒)*/uint64_t getNextTimer();/*** @brief 获取需要执行的定时器的回调函数列表* @param[out] cbs 回调函数数组*/void listExpiredCb(std::vector<std::function<void()> >& cbs);/*** @brief 是否有定时器*/bool hasTimer();
protected:/*** @brief 当有新的定时器插入到定时器的首部,执行该函数*/virtual void onTimerInsertedAtFront() = 0;/*** @brief 将定时器添加到管理器中*/void addTimer(Timer::ptr val, RWMutexType::WriteLock& lock);
private:/*** @brief 检测服务器时间是否被调后了*/bool detectClockRollover(uint64_t now_ms);
private:/// MutexRWMutexType m_mutex;/// 定时器集合std::set<Timer::ptr, Timer::Comparator> m_timers;/// 是否触发onTimerInsertedAtFrontbool m_tickled = false;/// 上次执行时间uint64_t m_previouseTime = 0;
};
构造函数:
将上次执行时间m_previouseTime更新为当前绝对时间
TimerManager::TimerManager() {m_previouseTime = sylar::GetElapsedMS();
}TimerManager::~TimerManager() {
}
添加定时器addTimer函数:
参数:接受定时器的执行间隔时间(毫秒)、回调函数和是否循环作为参数。
实现:首先创建一个Timer对象,并将其指针存储在局部变量timer中。然后获取写锁,调用addTimer函数将定时器添加到管理器中。最后返回创建的定时器对象的指针。
Timer::ptr TimerManager::addTimer(uint64_t ms, std::function<void()> cb, bool recurring) {Timer::ptr timer(new Timer(ms, cb, recurring, this));RWMutexType::WriteLock lock(m_mutex);addTimer(timer, lock);return timer;
}
向定时器管理器中添加一个定时器:
它接受一个指向定时器的指针和一个写锁作为参数。首先,它将定时器插入到定时器列表中,并检查是否需要将该定时器插入到列表的前面。如果需要,它会调用onTimerInsertedAtFront()函数。最后,它会解锁写锁。
void TimerManager::addTimer(Timer::ptr val, RWMutexType::WriteLock &lock) {const auto it = m_timers.insert(val).first;const bool insertAtFront = (it == m_timers.begin()) && m_shouldTriggerAtFrontFuncFlag;if (insertAtFront) {m_shouldTriggerAtFrontFuncFlag = false;}lock.unlock();if (insertAtFront == true) {onTimerInsertedAtFront();}
}
检测弱指针是否仍然有效:
在定时器触发时,检查弱指针是否仍然有效,如果有效,则执行回调函数。这可以用于避免在回调函数中访问已经失效的对象。
static void OnTimer(std::weak_ptr<void> weak_cond, std::function<void()> cb) {const std::shared_ptr<void> tmp = weak_cond.lock();if (tmp != nullptr) {cb();}
}
addConditionTimer函数:
定时器触发时检查条件是否满足来执行回调函数
支持创建条件定时器,也就是在创建定时器时绑定一个变量,在定时器触发时判断一下该变量是否仍然有效,如果变量无效,那就取消触发
参数:接受定时器的执行间隔时间(毫秒)、回调函数、条件(弱指针)和是否循环作为参数。
实现:调用addTimer函数,将条件和回调函数绑定在一起,并将绑定后的函数作为回调函数传递给addTimer函数。最后返回创建的定时器对象的指针。
Timer::ptr TimerManager::addConditionTimer(uint64_t ms, std::function<void()> cb, std::weak_ptr<void> weak_cond, bool recurring) {return addTimer(ms, std::bind(&OnTimer, weak_cond, cb), recurring);
}
总结起来,addTimer函数用于添加普通的定时器,而addConditionTimer函数用于添加带有条件的定时器。在addConditionTimer函数中,通过std::bind将条件和回调函数绑定在一起,以便在定时器触发时检查条件是否满足。
获取下一个定时器的触发时间:
函数首先使用读写锁(RWMutexType)来保护对定时器列表(m_timers)的访问。然后,将标志位m_shouldTriggerAtFrontFuncFlag设置为true,表示需要触发定时器队列的头部。
接下来,函数检查定时器列表是否为空。如果为空,则返回一个特殊的值~0ull,表示没有可用的定时器。
如果定时器列表不为空,函数获取第一个定时器的指针,并将其存储在变量next中。然后,获取当前时间的毫秒数,存储在变量now_ms中。
接下来,函数比较当前时间和下一个定时器的触发时间。如果当前时间大于等于下一个定时器的触发时间,则返回0,表示立即触发定时器。否则,返回下一个定时器的触发时间减去当前时间的差值,表示还需要等待的时间。
uint64_t TimerManager::getNextTimer() {RWMutexType::ReadLock lock(m_mutex);m_shouldTriggerAtFrontFuncFlag = true;if (m_timers.empty()) {return ~0ull;}const Timer::ptr &next = *m_timers.begin();uint64_t now_ms = sylar::GetElapsedMS();if (now_ms >= next->m_next) {return 0;} else {return next->m_next - now_ms;}
}
列出过期定时器并执行其回调函数
列出所有已经过期的定时器,并将它们的回调函数存储在cbs向量中。它首先获取当前时间,然后通过读写锁保护的方式访问定时器列表。如果定时器列表为空或者第一个定时器的下一次触发时间大于当前时间,则直接返回。否则,它会遍历定时器列表,找到所有已经过期的定时器,并将它们的回调函数添加到cbs向量中。对于循环定时器,它会更新下一次触发时间并将其重新插入到定时器列表中;对于非循环定时器,它会将回调函数置为nullptr。
void TimerManager::listExpiredCb(std::vector<std::function<void()>> &cbs) {const uint64_t now_ms = sylar::GetElapsedMS(); // 获取当前时间(毫秒)std::vector<Timer::ptr> expired; // 存储已过期的定时器{RWMutexType::ReadLock lock(m_mutex); // 读取锁保护m_timersif (m_timers.empty()) {return; // 如果定时器列表为空,直接返回}}RWMutexType::WriteLock lock(m_mutex); // 写入锁保护m_timersif (m_timers.empty()) {return; // 再次检查定时器列表是否为空,避免在上一个锁的保护下发生变化}bool rollover = false;if (SYLAR_UNLIKELY(IsClockRollover(now_ms))) {rollover = true; // 检测时钟回滚情况}if (!rollover && ((*m_timers.begin())->m_next > now_ms)) {return; // 如果第一个定时器的下一次触发时间大于当前时间,直接返回}const Timer::ptr now_timer(new Timer(now_ms)); // 创建一个新的定时器对象,表示当前时间auto it = rollover ? m_timers.end() : m_timers.lower_bound(now_timer); // 根据是否发生时钟回滚选择起始迭代器位置while (it != m_timers.end() && (*it)->m_next == now_ms) {++it; // 找到下一个不匹配的定时器}expired.insert(expired.begin(), m_timers.begin(), it); // 将已过期的定时器插入到expired向量中m_timers.erase(m_timers.begin(), it); // 从定时器列表中移除已过期的定时器cbs.reserve(expired.size()); // 预留足够的空间来存储回调函数for (auto &timer : expired) {cbs.push_back(timer->m_cb); // 将定时器的回调函数添加到cbs向量中if (timer->m_recurring) {timer->m_next = now_ms + timer->m_ms; // 如果定时器是循环的,更新下一次触发时间m_timers.insert(timer); // 将定时器重新插入到定时器列表中} else {timer->m_cb = nullptr; // 如果定时器不是循环的,将回调函数置为nullptr}}
}
检测时钟回滚情况:
接受当前时间(毫秒)作为参数,并返回一个布尔值表示是否发生了时钟回滚。如果当前时间小于上一次记录的时间,并且当前时间小于上一次记录的时间减去一个小时(60分钟),则认为发生了时钟回滚。最后,它会更新上一次记录的时间。
bool TimerManager::IsClockRollover(uint64_t now_ms) {bool rollover = false;if (now_ms < m_previouseTime &&now_ms < (m_previouseTime - 60 * 60 * 1000)) {rollover = true;}m_previouseTime = now_ms;return rollover;
}
检查定时器管理器中是否有定时器:
使用读锁保护定时器列表,并返回一个布尔值表示定时器列表是否为空。
bool TimerManager::hasTimer() {RWMutexType::ReadLock lock(m_mutex);return !m_timers.empty();
}