linux_0">【linux】进程池
- 线程池:
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着 监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利 用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#include <functional>
namespace ThreadModule
{using func_t = std::function<void(std::string)>;class Thread{public:Thread(func_t func, std::string threadname = "none_name") // 为什么--常量引用和非常量引用的概念: _func(func), _threadname(threadname), _stop(true){}void Excute(){_func(_threadname);}static void *handler(void *args){Thread *self = static_cast<Thread*>(args);self->Excute();return nullptr;}bool Start(){int ret = pthread_create(&_tid, nullptr, handler, this);if (ret == 0){_stop = false;return true;}else{return false;}}void Join(){if (!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}~Thread() {}private:std::string _threadname;pthread_t _tid;func_t _func;bool _stop;};
}
#endif
#ifndef __THREAD_POOL_HPP__
#define __THREAD_POOL_HPP__
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#include <vector>
#include "log.hpp"
#include <queue>
#include "Thread.hpp"
#include <functional>
using namespace ThreadModule;
const int def_thread_sum = 3; // 默认线程数量template <typename T>
class thread_pool
{
public:void Lock_queue(){pthread_mutex_lock(&_mutex);}void UnLock_queue(){pthread_mutex_unlock(&_mutex);}void Wait_thread(){pthread_cond_wait(&_cond, &_mutex);}void Wake_up_thread(){pthread_cond_signal(&_cond);}void Wake_up_allthread(){pthread_cond_broadcast(&_cond);}public:thread_pool(int threadsum = def_thread_sum): _threadsum(threadsum), _isruning(false), _waitsum(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}void HandlerTask(std::string name)// 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用!{while (true){// 1. 保证队列安全Lock_queue();// 当任务队列为空,线程运行时// 2. 队列中不一定有数据while (_Task_queue.empty() && _isruning) // 当任务队列为空,有线程在运行,每个被唤醒的线程都要重新判断条件{_waitsum++; // 等待的线程数量加一Wait_thread(); // 将线程加入等待队列中,解锁--等待被唤醒--加锁_waitsum--;}// 2.1 如果线程池已经退出了 && 任务队列是空的if (_Task_queue.empty() && !_isruning){UnLock_queue();break;}// 2.2 如果线程池不退出 && 任务队列不是空的// 2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后在退出// 3. 一定有任务, 处理任务// 一定有任务T t = _Task_queue.front(); // 取出任务_Task_queue.pop();UnLock_queue();LOG(DEBUG, "%s get a task", name.c_str());//打印日志// 4. 处理任务,这个任务属于线程独占的任务t();// 执行任务LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.Result_tostring().c_str());}}void Init_thread_pool(){// 指向构建出所有的线程,并不启动for (int i = 0; i < _threadsum; i++){std::string name = "thread-" + std::to_string(i + 1);_thread_pool.emplace_back(std::bind(&thread_pool::HandlerTask, this, std::placeholders::_1), name);LOG(INFO, "init thread %s done", name.c_str());//打印日志}_isruning = true;}void Start_threads(){for (auto &thread : _thread_pool){thread.Start();}}bool Enqueue(T &in){bool ret = false;if (_isruning){Lock_queue();_Task_queue.push(in);if (_waitsum > 0) //等待队列中的线程数量大于0{Wake_up_thread();//唤醒等待队里中的一个线程}LOG(DEBUG, "enqueue task success");//打印日志ret = true;UnLock_queue();}return ret;}void WaitAllThread(){for (auto &thread : _thread_pool){thread.Join();//等待所有线程LOG(INFO, "%s is quit...", thread.name().c_str());//打印日志}}void Stop(){Lock_queue();_isruning = false;Wake_up_allthread();UnLock_queue();}~thread_pool(){pthread_mutex_destroy(&_mutex);//释放互斥量pthread_cond_destroy(&_cond);//释放条件变量}private:int _threadsum; //线程数量std::vector<Thread> _thread_pool;//线程池结构std::queue<T> _Task_queue;//任务队列pthread_mutex_t _mutex;//互斥量pthread_cond_t _cond;//条件变量bool _isruning;//正在运行的队列int _waitsum;//正在等待的线程的数量
};
#endif
#pragma once
#include <iostream>
#include<string>class Task
{
public:Task(int a, int b) : _a(a), _b(b){}void Excute(){_result = _a + _b;}void operator()(){_result = _a + _b;}std::string Result_tostring(){return std::to_string(_a)+" + " +std::to_string(_b)+" = "+std::to_string(_result);}std::string Debug_tostring(){return std::to_string(_a)+" + "+std::to_string(_b)+" = ?";}~Task(){}private:int _a;int _b;int _result;
};
#pragma once
#include <iostream>
#include <stdarg.h>
#include <fstream>
#include "LockGuard.hpp"
const static char *logname = "log.txt";//日志文件
bool g_save = false;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
enum level//日志的等级
{DEBUG = 0,INFO,WARNING,ERROR,FATAL
};
void save_file(const std::string &logname, std::string &massage)//保存日志到文件中
{std::ofstream infile("logname", std::ios::app);if (!infile.is_open()){return;}infile << massage << std::endl;infile.close();
}
// 获取日志等级
std::string get_level_string(int level)
{switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "None";}
}
// 获取时间字符串
std::string get_time_string()
{time_t cur_time = time(nullptr);if (cur_time == (time_t)-1){printf("Failed to get the current time.\n");return "None";}struct tm *formate_time = localtime(&cur_time);if (formate_time == nullptr){return "None";}char buffer[1024];snprintf(buffer, sizeof(buffer), "%d-%d-%d %d:%d:%d",formate_time->tm_year + 1900,formate_time->tm_mon + 1,formate_time->tm_mday,formate_time->tm_hour,formate_time->tm_min,formate_time->tm_sec);return buffer;
}
// 日志信息
void Log_inf(std::string filename, int line, bool is_save, int level, const char *format, ...)
{std::string levelstr = get_level_string(level);std::string time = get_time_string();pid_t selfid = getpid();char buffer[1024];va_list arg;va_start(arg, format);vsnprintf(buffer, sizeof(buffer), format, arg);va_end(arg);std::string massage = "[" + time + "]" + "[" + levelstr + "]" + "[" + std::to_string(selfid) + "]" + "[" + filename + "]" + "[" + std::to_string(line) + "]" + buffer + "\n";LockGuard lockguard(mutex); // RAIIif (is_save){// 保存到文件中save_file(logname, massage);}else{ //向屏幕中打印std::cout << massage;}
}// 定义宏
#define LOG(level, format, ...) \do \{ \Log_inf(__FILE__, __LINE__, g_save, level, format, ##__VA_ARGS__); \} while (0)
#define Enablefile() \do \{ \g_save = true; \} while (0)
#define EnableScrean() \do \{ \g_save = false; \} while (0)
#include <iostream>
#include "ThreadPool.hpp"
#include "Thread.hpp"
#include"Task.hpp"
#include<memory>
#include"log.hpp"
using namespace ThreadModule;
int main()
{Enablefile();std::unique_ptr<thread_pool<Task>> tp = std::make_unique<thread_pool<Task>>(5);tp->Init_thread_pool();tp->Start_threads();int cnt = 10;srand((signed)time(NULL));while(cnt--){int frist = rand()%10+1;usleep(1234);int second = rand()%10+1;Task t(frist,second);LOG(INFO,"main send Task is :%s", t.Debug_tostring().c_str());//打印日志tp->Enqueue(t);sleep(2);}tp->Stop();tp->WaitAllThread();LOG(INFO,"main thread is quit ... \n");//打印日志return 0;
}
[!IMPORTANT]
【Linux】线程篇完结!!,下一篇【Linux】网络篇