简介
moduo实现了通用业务线程池,其任务使用无参的函数对象,其任务队列使用的是有界的队列
结构
线程池
线程池属性
有线程池名name_
,任务队列的最大数maxQueueSize_
,线程池的运行状态running_
,工作线程启动时的初始化函数threadInitCallback_
void setMaxQueueSize(int maxSize)
{ maxQueueSize_ = maxSize;
}void setThreadInitCallback(const Task& cb)
{ threadInitCallback_ = cb;
}ThreadPool::ThreadPool(const string& nameArg): mutex_(),notEmpty_(mutex_),notFull_(mutex_),name_(nameArg),maxQueueSize_(0),running_(false)
{
}const string& name() const{ return name_; }
线程池的开启和结束
开启是start
,结束是stop
,向任务队列中提交任务是run
start
设置线程池的运行状态running_
为true,调整线程池的容量,并且创建指定numThreads
个线程,工作线程的执行函数为runInThread
,工作线程先执行线程初始化函数,然后从任务队列中取出任务执行
void ThreadPool::start(int numThreads)
{running_ = true;threads_.reserve(numThreads);for (int i = 0; i < numThreads; ++i){char id[32];snprintf(id, sizeof id, "%d", i+1);threads_.emplace_back(new muduo::Thread(std::bind(&ThreadPool::runInThread, this), name_+id));threads_[i]->start();}if (numThreads == 0 && threadInitCallback_){threadInitCallback_();}
}void ThreadPool::runInThread()
{try{if (threadInitCallback_){threadInitCallback_();}while (running_){Task task(take());if (task){task();}}}catch (const Exception& ex){fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());fprintf(stderr, "stack trace: %s\n", ex.stackTrace());abort();}catch (const std::exception& ex){fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());abort();}catch (...){fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());throw; // rethrow}
}ThreadPool::Task ThreadPool::take()
{MutexLockGuard lock(mutex_);// always use a while-loop, due to spurious wakeupwhile (queue_.empty() && running_){notEmpty_.wait();}Task task;if (!queue_.empty()){task = queue_.front();queue_.pop_front();if (maxQueueSize_ > 0){notFull_.notify();}}return task;
}
stop
设置线程池运行状态为false,同时唤醒队列的两个条件变量,等待所有工作线程执行完成
void ThreadPool::stop()
{{MutexLockGuard lock(mutex_);running_ = false;notEmpty_.notifyAll();notFull_.notifyAll();}for (auto& thr : threads_){thr->join();}
}
run
提交任务到工作队列中,如果队列满则等待直到队列有空闲空间,将任务放入工作队列后,唤醒工作线程
void ThreadPool::run(Task task)
{if (threads_.empty()){task();}else{MutexLockGuard lock(mutex_);while (isFull() && running_){notFull_.wait();}if (!running_) return;assert(!isFull());queue_.push_back(std::move(task));notEmpty_.notify();}
}
线程
线程初始化
设置状态变量,线程执行函数,线程默认名,默认是Thread
+numCreated_
Thread::Thread(ThreadFunc func, const string& n): started_(false),joined_(false),pthreadId_(0),tid_(0),func_(std::move(func)),name_(n),latch_(1)
{setDefaultName();
}void Thread::setDefaultName()
{int num = numCreated_.incrementAndGet();if (name_.empty()){char buf[32];snprintf(buf, sizeof buf, "Thread%d", num);name_ = buf;}
}
线程启动
ThreadData
作为线程数据传到创建线程时的参数中,主要包含线程执行函数,线程名,线程id以及创建线程与线程执行同步变量
启动线程的函数为startThread
,其会调用ThreadData
的runInThread
start
启动线程,修改线程状态started_
为true,通过latch_
等待线程启动完成
void Thread::start()
{assert(!started_);started_ = true;// FIXME: move(func_)detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)){started_ = false;delete data; // or no delete?LOG_SYSFATAL << "Failed in pthread_create";}else{latch_.wait();assert(tid_ > 0);}
}void* startThread(void* obj)
{ThreadData* data = static_cast<ThreadData*>(obj);data->runInThread();delete data;return NULL;
}struct ThreadData
{typedef muduo::Thread::ThreadFunc ThreadFunc;ThreadFunc func_;string name_;pid_t* tid_;CountDownLatch* latch_;ThreadData(ThreadFunc func,const string& name,pid_t* tid,CountDownLatch* latch): func_(std::move(func)),name_(name),tid_(tid),latch_(latch){ }void runInThread(){*tid_ = muduo::CurrentThread::tid();tid_ = NULL;latch_->countDown();latch_ = NULL;muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);try{func_();muduo::CurrentThread::t_threadName = "finished";}catch (const Exception& ex){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());fprintf(stderr, "stack trace: %s\n", ex.stackTrace());abort();}catch (const std::exception& ex){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());abort();}catch (...){muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());throw; // rethrow}}
};
线程结束
在线程启动后,没有调用join
默认是分离的线程
join
等待线程执行结束
Thread::~Thread()
{if (started_ && !joined_){pthread_detach(pthreadId_);}
}int Thread::join()
{assert(started_);assert(!joined_);joined_ = true;return pthread_join(pthreadId_, NULL);
}
线程属性
线程运行状态started_
,线程idtid_
,线程名name_
bool started() const
{ return started_;
}pid_t tid() const
{ return tid_;
}const string& name() const
{ return name_;
}