参考博客:https://blog.csdn.net/weixin_51322383/article/details/130474753
https://zhuanlan.zhihu.com/p/721880618
阻塞队列blockqueue
1、阻塞队列的设计流程是什么样的
它的底层是用deque进行管理的
阻塞队列主要是围绕着生产者消费者模式进行多线程的同步和互斥管理
初始化阶段:
-
队列的最大容量(capacity_),防止队列无限增长。
-
关闭标志(isClose_),标记队列是否已关闭,以便终止等待操作。
-
互斥锁(mtx_)用于同步对队列的访问,确保线程安全。
-
条件变量(condConsumer_ 和 condProducer_)用于控制生产者和消费者线程的阻塞与唤醒。
生产者插入数据:
push_back 和 push_front 函数用于插入数据。
-
检查队列是否已满:如果队列已满,生产者线程通过 condProducer_.wait(locker) 进入 等待状态,直到队列有空位(由消费者线程消费掉数据)。
-
插入数据:如果队列未满,生产者会将数据插入队列的尾部或头部。
-
唤醒消费者:插入数据后,通过 condConsumer_.notify_one() 唤醒一个等待的消费者线程,告知它队列中有新的数据可以消费。
消费者获取数据:
pop 和 pop 带超时 的函数用于从队列获取数据。
-
检查队列是否为空:如果队列为空,消费者线程通过 condConsumer_.wait(locker) 进入 等待状态,直到生产者插入数据。
-
取出数据:如果队列有数据,消费者会从队列中取出一个元素。
-
唤醒生产者:消费者取出数据后,通过 condProducer_.notify_one() 唤醒一个等待的生产者线程,告知它队列中有空位可以插入数据。
关闭队列:
Close:当队列需要关闭时,调用 Close 方法来:
-
清空队列中的所有元素。
-
设置 isClose_ 标志,标识队列已关闭。
-
唤醒所有正在等待的生产者和消费者线程,确保它们能够终止或做出响应。
检查队列的一些状态:
-
提供了一些辅助函数来检查队列的状态:
-
empty:检查队列是否为空。
-
full:检查队列是否已满。
-
size:获取当前队列中元素的数量。
-
capacity:获取队列的最大容量。
-
2、为什么这么设置阻塞队列呢?
(正常的一些容器队列它没有办法就是在两个线程情况下进行同步阻塞这种接口,它不能解决多线程环境下的一些同步问题,所以需要设置一个阻塞队列)
设置阻塞队列是为了让日志系统在异步写入的时候进行使用,
主要是为了解决多线程环境中的生产者与消费者之间的同步问题,
具体来说就是首先第一个点保证生产者与消费者同步之间的关系 :生产者不会在队列满的时候继续生产,消费者不会在队列空时继续消费。
第二点保证多线程访问共享资源的同步 为了避免数据竞争和不一致的队列状态,所有涉及队列操作的代码段都需要加锁,确保同一时刻只有一个线程可以对队列进行修改或者访问。
3、为什么阻塞队列每个操作都要先上个锁?
确保了线程安全
多线程访问共享资源的问题:
-
数据竞争(Race Condition):多个线程同时修改队列的状态或数据,可能导致队列处于不一致的状态。例如,两个线程同时尝试修改队列的头部或尾部,可能导致数据丢失或损坏。
-
资源冲突:多个线程同时访问队列,可能会破坏队列的结构或导致其他线程无法正确地读写数据。
-
插入操作( push_back 、 push_front ):插入数据可能会改变队列的状态(例如队列大小或数据顺序)。如果没有加锁保护,当多个线程同时插入数据时,会导致队列状态不一致或数据丢失。
-
删除操作( pop ):删除数据会修改队列的内容,若在没有加锁的情况下删除数据,可能会发生“删除了但其他线程还访问到它”的问题。
-
查询操作(如 size 、 empty ):虽然这些操作不直接修改队列中的数据,但它们依然需要确保读取队列状态时是安全的。如果没有加锁,可能会在读取队列状态时,另一个线程修改队列,导致读取到不一致的状态。
-
列清空操作( clear 、 Close ):清空队列涉及到删除所有元素,这个操作必须是原子性的。为了防止其他线程在清空过程中继续操作队列,必须加锁,确保清空操作不会和其他线程的插入或删除操作发生冲突。
-
mutex (互斥锁):用于确保在同一时刻只有一个线程可以访问队列的资源,防止数据竞争。
-
condition_variable (条件变量):用于实现线程的等待与唤醒机制,例如当队列满时,生产者线程需要等待消费者线程消费数据,这时生产者线程就会被阻塞,直到消费者通知它继续工作。
每个操作都需要加锁,是为了在多线程并发操作时保护队列的状态和数据,确保操作的原子性,防止由于并发问题导致队列的状态不一致或者数据损坏。通过合理的使用锁和条件变量,阻塞队列能够在多线程环境中安全、高效地工作。
日志系统
1、日志系统的运行流程?
使用单例模式(局部静态变量法)这种方法不需要加锁和解锁 获取实例
通过实例调用Instance()->init()函数完成初始化,若设置阻塞队列大小大于0则选择异步日志,等于0则选择同步日志,更新isaysnc变量。
通过实例调用write_log()函数写日志,首先根据当前时刻创建日志(前缀为时间,后缀为.log,并更新日期today和当前行数linecount。)
在write_log()函数内部,通过isasync变量判断写日志的方法:如果是异步,工作线程将要写的内容放在阻塞队列中,由写线程在阻塞队列中取出数据,然后写入日志:如果是同步,直接写入日志文件。
2、说一下你的日志的运行机制?(简化版)
使用单例模式创建日志系统,对服务器运行状态、错误信息和访问数据进行记录,该系统可以实现按天分类,超行分类功能,可以根据实际情况分别使用同步和异步写入两种方式。
其中异步写入方式,将生产者-消费者模型封装为阻塞队列,创建一个写线程,工作线程将要写的内容push进队列,写线程从队列中取出内容,写入日志文件。
3、为什么要异步?和同步的区别是什么?
**写入日志时会产生比较多的系统调用,若是某条日志信息过大,会阻塞日志系统,造成系统瓶颈。**异步方式采用 生产者-消费者模型,具有较高的并发能力。
生产者-消费者模型,并发编程中的经典模型。
以多线程为例,为了实现线程间数据同步,生产者线程与消费者线程共享一个缓冲区,其中生产者线程往缓冲区中push消息,消费者线程从缓冲区中pop消息。
阻塞队列,将生产者-消费者模型进行封装,使用循环数组实现队列,作为两者共享的缓冲区。
异步日志, **将所写的日志内容先存入阻塞队列,写线程从阻塞队列中取出内容,写入日志。**可以提高系统的并发性能。
同步日志,日志写入函数与工作线程串行执行,由于涉及到I/O操作,当单条日志比较大的时候,同步模式会阻塞整个处理流程,服务器所能处理的并发能力将有所下降,尤其是在峰值的时候,写日志可能成为系统的瓶颈。
写入方式通过初始化时 是否设置队列大小(表示在队列中可以放几条数据)来判断,若队列大小为0,则为同步,否则为异步。
若异步,则将日志信息加入阻塞队列,同步则加锁向文件中写。
blockqueue.h
# ifndef BLOCKQUEUE_H
# define BLOCKQUEUE_H#include <deque>
#include <condition_variable>
#include <mutex>
#include <sys/time.h>
using namespace std;template<typename T>
class BlockQueue {
public:explicit BlockQueue(size_t maxsize = 1000);//构造函数~BlockQueue();//析构函数bool empty();//检查队列是否为空bool full();//检查队列是否已满void push_back(const T& item);//将一个元素插入队列的尾部void push_front(const T& item); //将一个元素插入队列的头部bool pop(T& item); // 弹出的任务放入itembool pop(T& item, int timeout); // 等待时间void clear();//清空队列中所有的元素T front();//获取队列中的第一个元素(但不移除)T back();//获取队列中的最后一个元素(但不移除)size_t capacity();//获取队列的最大容量size_t size();//获取当前队列中元素的数量void flush();//手动唤醒一个正在等待的消费者线程void Close();//清空队列private:deque<T> deq_; // 底层数据结构mutex mtx_; // 锁bool isClose_; // 关闭标志size_t capacity_; // 容量condition_variable condConsumer_; // 消费者条件变量condition_variable condProducer_; // 生产者条件变量
};template<typename T>
BlockQueue<T>::BlockQueue(size_t maxsize) : capacity_(maxsize) {assert(maxsize > 0);//确保队列的最大容量不是0或者负值isClose_ = false;
}template<typename T>
BlockQueue<T>::~BlockQueue() {Close();
}template<typename T>
void BlockQueue<T>::Close() {// lock_guard<mutex> locker(mtx_); // 操控队列之前,都需要上锁// deq_.clear(); // 清空队列clear();isClose_ = true;condConsumer_.notify_all();condProducer_.notify_all();
}template<typename T>
void BlockQueue<T>::clear() {lock_guard<mutex> locker(mtx_);deq_.clear();
}template<typename T>
bool BlockQueue<T>::empty() {lock_guard<mutex> locker(mtx_);return deq_.empty();
}template<typename T>
bool BlockQueue<T>::full() {lock_guard<mutex> locker(mtx_);return deq_.size() >= capacity_;
}template<typename T>
void BlockQueue<T>::push_back(const T& item) {// 注意,条件变量需要搭配unique_lockunique_lock<mutex> locker(mtx_); while(deq_.size() >= capacity_) { // 队列满了,需要等待 while是为了防止虚假唤醒condProducer_.wait(locker); // 首先释放锁,然后进入阻塞状态,//暂停生产,等待消费者唤醒生产条件变量}deq_.push_back(item);condConsumer_.notify_one(); // 唤醒消费者
}template<typename T>
void BlockQueue<T>::push_front(const T& item) {unique_lock<mutex> locker(mtx_);while(deq_.size() >= capacity_) { // 队列满了,需要等待condProducer_.wait(locker); // 暂停生产,等待消费者唤醒生产条件变量 挂起当前的线程 并同时释放互斥锁}deq_.push_front(item);condConsumer_.notify_one(); // 唤醒消费者
}template<typename T>
bool BlockQueue<T>::pop(T& item) {unique_lock<mutex> locker(mtx_);while(deq_.empty()) {condConsumer_.wait(locker); // 队列空了,需要等待}item = deq_.front();deq_.pop_front();condProducer_.notify_one(); // 唤醒生产者return true;
}template<typename T>
bool BlockQueue<T>::pop(T &item, int timeout) {unique_lock<std::mutex> locker(mtx_);while(deq_.empty()){if(condConsumer_.wait_for(locker, std::chrono::seconds(timeout)) == std::cv_status::timeout){return false;}if(isClose_){return false;}}item = deq_.front();deq_.pop_front();condProducer_.notify_one();return true;
}template<typename T>
T BlockQueue<T>::front() {lock_guard<std::mutex> locker(mtx_);return deq_.front();
}template<typename T>
T BlockQueue<T>::back() {lock_guard<std::mutex> locker(mtx_);return deq_.back();
}template<typename T>
size_t BlockQueue<T>::capacity() {lock_guard<std::mutex> locker(mtx_);return capacity_;
}template<typename T>
size_t BlockQueue<T>::size() {lock_guard<std::mutex> locker(mtx_);return deq_.size();
}// 唤醒消费者
template<typename T>
void BlockQueue<T>::flush() {condConsumer_.notify_one();
}
# endif
log.h
#ifndef LOG_H
#define LOG_H#include <mutex>
#include <string>
#include <thread>
#include <sys/time.h>
#include <string.h>
#include <stdarg.h> // vastart va_end
#include <assert.h>
#include <sys/stat.h> // mkdir
#include "blockqueue.h"
#include "../buffer/buffer.h"class Log {
public:// 初始化日志实例(阻塞队列最大容量、日志保存路径、日志文件后缀)void init(int level, const char* path = "./log", const char* suffix =".log",int maxQueueCapacity = 1024);static Log* Instance(); //单例模式static void FlushLogThread(); //异步写日志的入口 异步写日志公有方法,调用私有方法asyncWritevoid write(int level, const char *format,...); //写入一条日志信息 将输出内容按照标准格式整理void flush();//强制刷新日志缓冲区 将内容写入磁盘 //刷新int GetLevel(); //获取当前的日志的等级void SetLevel(int level);//设置日志输出的最低等级bool IsOpen() { return isOpen_; } // 检查日志系统是否已经初始化并打开。private:Log(); // 日志系统的构造函数 因为是单例模式 void AppendLogLevelTitle_(int level);//根据日志等级添加日志头部信息。virtual ~Log();//日志系统的析构函数 因为是单例模式void AsyncWrite_(); // 异步模式下从阻塞队列中取出队列并写入文件 异步写日志方法private:static const int LOG_PATH_LEN = 256; // 日志文件最长文件名static const int LOG_NAME_LEN = 256; // 日志最长名字static const int MAX_LINES = 50000; // 单个日志文件内的最长日志条数const char* path_; //路径名const char* suffix_; //后缀名int MAX_LINES_; // 最大日志行数int lineCount_; //日志行数记录int toDay_; //按当天日期区分文件bool isOpen_; Buffer buff_; // 输出的内容,缓冲区int level_; // 日志等级bool isAsync_; // 是否开启异步日志FILE* fp_; //打开log的文件指针std::unique_ptr<BlockQueue<std::string>> deque_; //阻塞队列std::unique_ptr<std::thread> writeThread_; //写线程的指针std::mutex mtx_; //同步日志必需的互斥量
};//宏定义封装了write和flush接口
#define LOG_BASE(level, format, ...) do {Log* log = Log::Instance();if (log->IsOpen() && log->GetLevel() <= level) {log->write(level, format, ##__VA_ARGS__); log->flush();}
} while(0);// 四个宏定义,主要用于不同类型的日志输出,也是外部使用日志的接口
// ...表示可变参数,__VA_ARGS__就是将...的值复制到这里
// 前面加上##的作用是:当可变参数的个数为0时,这里的##可以把把前面多余的","去掉,否则会编译出错。
//直接调用这些宏定义 就是调用了这些日志中的write和flush接口了
#define LOG_DEBUG(format, ...) do {LOG_BASE(0, format, ##__VA_ARGS__)} while(0);
#define LOG_INFO(format, ...) do {LOG_BASE(1, format, ##__VA_ARGS__)} while(0);
#define LOG_WARN(format, ...) do {LOG_BASE(2, format, ##__VA_ARGS__)} while(0);
#define LOG_ERROR(format, ...) do {LOG_BASE(3, format, ##__VA_ARGS__)} while(0);#endif //LOG_H
log.cpp
#include "log.h"// 构造函数
Log::Log() {fp_ = nullptr;deque_ = nullptr;writeThread_ = nullptr;lineCount_ = 0;toDay_ = 0;isAsync_ = false;
}Log::~Log() {while(!deque_->empty()) {deque_->flush(); // 唤醒消费者,处理掉剩下的任务}deque_->Close(); // 关闭队列writeThread_->join(); // 等待当前线程完成手中的任务if(fp_) { // 冲洗文件缓冲区,关闭文件描述符lock_guard<mutex> locker(mtx_);flush(); // 清空缓冲区中的数据fclose(fp_); // 关闭日志文件}
}// 唤醒阻塞队列消费者,开始写日志
void Log::flush() {if(isAsync_) { // 只有异步日志才会用到dequedeque_->flush();//这里是唤醒消费者线程}fflush(fp_); // 清空输入缓冲区
}// 懒汉模式 局部静态变量法(这种方法不需要加锁和解锁操作)
Log* Log::Instance() {static Log log;return &log;
}// 异步日志的写线程函数//异步模式只有两个线程 一个主线程 一个写线程
void Log::FlushLogThread() {Log::Instance()->AsyncWrite_();
}// 写线程真正的执行函数
void Log::AsyncWrite_() {string str = "";while(deque_->pop(str)) {lock_guard<mutex> locker(mtx_);fputs(str.c_str(), fp_);}
}// 初始化日志实例
void Log::init(int level, const char* path, const char* suffix, int maxQueCapacity) {isOpen_ = true;level_ = level;path_ = path;suffix_ = suffix;if(maxQueCapacity) { // 异步方式这里支持同步和异步的转换isAsync_ = true;if(!deque_) { // 为空则创建一个unique_ptr<BlockQueue<std::string>> newQue(new BlockQueue<std::string>);// 因为unique_ptr不支持普通的拷贝或赋值操作,所以采用move// 将动态申请的内存权给deque,newDeque被释放deque_ = move(newQue); // 左值变右值,掏空newDequeunique_ptr<thread> newThread(new thread(FlushLogThread));//创建专门写日志的线程writeThread_ = move(newThread);//保存线程实例}} else {isAsync_ = false;}lineCount_ = 0;//确认日志文件名time_t timer = time(nullptr);//获取当前的时间戳struct tm *sysTime = localtime(&timer);//转化为本地时间struct tm t = *sysTime;//拷贝时间信息path_ = path;suffix_ = suffix;char fileName[LOG_NAME_LEN] = {0};snprintf(fileName, LOG_NAME_LEN - 1, "%s/%04d_%02d_%02d%s", path_, t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, suffix_);toDay_ = t.tm_mday;//创建或打开日志文件{lock_guard<mutex> locker(mtx_);//加锁保证线程安全buff_.RetrieveAll(); //清空日志缓冲区if(fp_) { // 重新打开flush(); //刷新缓冲区fclose(fp_); //关闭文件}fp_ = fopen(fileName, "a"); // 打开文件读取并附加写入if(fp_ == nullptr) { //如果文件打开失败。创建目录后重试mkdir(path_, 0777); //创建路径权限为最大fp_ = fopen(fileName, "a"); // 生成目录文件(最大权限)}assert(fp_ != nullptr); //确保文件成功打开}
}void Log::write(int level, const char *format, ...) {//获取当前时间struct timeval now = {0, 0};gettimeofday(&now, nullptr);//时间戳获取time_t tSec = now.tv_sec; //秒部分struct tm *sysTime = localtime(&tSec); //转换为本地时间struct tm t = *sysTime; //拷贝时间va_list vaList; // 可变参数列表// 日志日期 日志行数 如果不是今天或行数超了//判断是否需要切换日志文件if (toDay_ != t.tm_mday || (lineCount_ && (lineCount_ % MAX_LINES == 0))){unique_lock<mutex> locker(mtx_);//加锁保护日志文件操作locker.unlock();//提前解锁 保护临界区char newFile[LOG_NAME_LEN];char tail[36] = {0};snprintf(tail, 36, "%04d_%02d_%02d", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday);if (toDay_ != t.tm_mday) // 时间不匹配,则替换为最新的日志文件名{snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s%s", path_, tail, suffix_);toDay_ = t.tm_mday;//更新日志日期lineCount_ = 0;//重置行计数器}else {//如果行数达到上限,创建新的分文件snprintf(newFile, LOG_NAME_LEN - 72, "%s/%s-%d%s", path_, tail, (lineCount_ / MAX_LINES), suffix_);}locker.lock();flush();//刷新当前文件缓冲区fclose(fp_);//关闭当前文件fp_ = fopen(newFile, "a");//打开新文件assert(fp_ != nullptr);//确保文件打开成功}// 在buffer内生成一条对应的日志信息{ //格式化日志内容unique_lock<mutex> locker(mtx_);//加锁保护日志生成和写入lineCount_++; //增加行计数//写入时间信息int n = snprintf(buff_.BeginWrite(), 128, "%d-%02d-%02d %02d:%02d:%02d.%06ld ",t.tm_year + 1900, t.tm_mon + 1, t.tm_mday,t.tm_hour, t.tm_min, t.tm_sec, now.tv_usec);buff_.HasWritten(n);//更新缓冲区写入位置AppendLogLevelTitle_(level); //写入日志等级//写入日志内容va_start(vaList, format);//初始化可变参数列表int m = vsnprintf(buff_.BeginWrite(), buff_.WritableBytes(), format, vaList);//格式化日志内容va_end(vaList);//清理可变参数列表buff_.HasWritten(m); //更新缓冲区写入位置buff_.Append("
", 2); //添加换行符和字符串解释符//写入日志 异步或者同步if(isAsync_ && deque_ && !deque_->full()) { // 异步方式(加入阻塞队列中,等待写线程读取日志信息)deque_->push_back(buff_.RetrieveAllToStr());} else { // 同步方式(直接向文件中写入日志信息)fputs(buff_.Peek(), fp_); // 同步就直接写入文件}buff_.RetrieveAll(); // 清空buff}
}// 添加日志等级 //在write函数中已经被调用
void Log::AppendLogLevelTitle_(int level) {switch(level) {case 0:buff_.Append("[debug]: ", 9);break;case 1:buff_.Append("[info] : ", 9);break;case 2:buff_.Append("[warn] : ", 9);break;case 3:buff_.Append("[error]: ", 9);break;default:buff_.Append("[info] : ", 9);break;}
}//像这个不需要调用都是在宏定义里面
int Log::GetLevel() {lock_guard<mutex> locker(mtx_);return level_;
}//这个也只有在测试的时候才会调用
void Log::SetLevel(int level) {lock_guard<mutex> locker(mtx_);level_ = level;
}