文章目录
- 前言
- 代码
- 1. Socket
- 1.1 Socket
- 1.2 SSocket
- 1.3 CSocket
- 2. Poller
- 3. Channel
- 4. Acceptor
- 5. Time
- 5.1 TimerTask
- 5.1 TimerWheel
- 6. Buffer
- 7. Any
- 8. Connection
- 9. Loop
- 9.1 EventLoop
- 9.2 Thread
- 10. TcpServer
- 尾序
前言
在上一篇文章当中对本项目的框架做了一个整体的介绍(感兴趣的读者可从本专栏中进行查看),不过并没有进行实现和详细地说明,那么本篇文章将从代码的角度呈现一个更为完整的服务器模块,同时结合上篇文章对服务器模块有一个更加深刻的理解。话不多说,不过还是要说明一点,本项目是一个基于C++代码实现的,如果对C++的基本语法和面向对象的思想,以及系统IO不了解的话,慎入!
代码
Gitee链接:https://gitee.com/shunhua-xiangyang/practical-projects/tree/master/imitate_mudo_concurrent_server/source/Server
说明:如下代码更准确的说应该是伪代码,目的是方便大家阅读,其中省略了一些不必要的代码,因此能看可能不能跑^ _ ^ ,如果需要能跑的可自行在Gitee中进行查看,这样写的目的是避免自己偷懒水博客,不当CV工程师,将思路更好地理顺进而呈现出来^ _ ^。
1. Socket
先交代两个杂七杂八的事情,即信号的设置,所需的头文件。
- 头文件
//套接字相关的定义和函数
#include <sys/types.h>
#include <sys/socket.h>
//Internet地址族的相关结构
#include <netinet/in.h>
//地址转换
#include <arpa/inet.h>
//文件描述符的操作
#include <unistd.h>
#include <fcntl.h>
- 忽略管道破裂信号,避免服务进程异常退出。
//防止进程异常退出
struct Attribution
{Attribution(){//忽略管道破裂信号。signal(SIGPIPE,SIG_IGN);}
};
//在main启动之前进行初始化,类似于饿汉模式。
Attribution sets;
1.1 Socket
Socket子模块,完成简单的网络套接字的创建和关闭。
class Socket
{
public:Socket(const std::string& ip = "",uint16_t port = -1,int fd = -1):_ip(ip),_port(port),_fd(fd){}//创建套接字void Create(){_fd = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);//AF_INET,使用的ipv4的网络类型。//SOCK_STREAM,套接字类型,即连接,可靠,全双工,字节流的套接字。//IPPROTO_TCP,传输层协议,TCP。}//关闭套接字void Close(int fd){if(fd >= 0){close(fd);}}void Close(){Close(_fd);_fd = -1;}//获取文件描述符int GetFd(){return _fd;}//设置套接字模式为非阻塞,方便提高IO通信的效率void SetNoBlock(int fd){int flag = fcntl(fd,F_GETFL);if(fcntl(fd,F_SETFL,flag | O_NONBLOCK) < 0){//出错return;}}//网络序列信息的转换sockaddr_in MesToInet(){sockaddr_in meg;socklen_t len = sizeof(meg);memset(&meg,0,sizeof(meg));//网络协议meg.sin_family = AF_INET;//端口号meg.sin_port = htons(_port);//ip地址if(!inet_aton(_ip.c_str(),&(meg.sin_addr))){//出错}return meg;}//网络序列转换为当前主机的字节序列std::pair<uint16_t,std::string> InetToMes(sockaddr_in meg){uint16_t port = ntohs(meg.sin_port);std::string ip_str = inet_ntoa(meg.sin_addr);return std::make_pair(port,ip_str); }
private:int _fd;uint16_t _port;std::string _ip;
};
1.2 SSocket
SSocket子模块,服务端监听以及获取连接。
//默认端口号,根据自己的服务器的开放端口自行设置。
const static int default_port = 8000;
struct SSocket : public Socket
{
public://服务端启动时要绑定端口SSocket(const std::string& ip = "0.0.0.0",uint16_t port = default_port):Socket(ip,port){Bind();Listen();}
public://绑定void Bind(){//创建套接字。Create();//设置端口号复用ReusePort();//设置地址复用ReuseAddr();//获取网络信息sockaddr_in sver_msg = MesToInet();socklen_t len = sizeof(sver_msg);//绑定,0 表示成功。if(!bind(GetFd(),(sockaddr*)&sver_msg,len)){//绑定成功}}//监听void Listen(){if(!listen(GetFd(),backlog)){//监听成功}}//接收连接int Accept(){//获取客户端的ip地址和端口号。sockaddr_in cmsg;memset(&cmsg,0,sizeof(cmsg));socklen_t len = sizeof(cmsg);//获取套接字int client_fd = accept(GetFd(),(sockaddr*)&cmsg,&len);return client_fd;}
private:void ReusePort(){int opt = 1; //表示开启。int ret = setsockopt(GetFd(),SOL_SOCKET,SO_REUSEPORT,&opt,sizeof(opt));}void ReuseAddr(){int opt = 1; //表示开启。int ret = setsockopt(GetFd(),SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));}const static int backlog = 8;
};
1.3 CSocket
CSocket子模块,客户端进行连接以及读写数据。
struct CSocket : public Socket
{
public://客户端连接服务端锁定唯一一个进程时,需要ip地址和端口号。CSocket(const std::string& ip,uint16_t port = default_port):Socket(ip,port){}//发起连接bool Connect(){//创建套接字。Create();//获取套接字int fd = GetFd();//获取网络信息sockaddr_in sver_msg = MesToInet();socklen_t len = sizeof(sver_msg);//发起连接int ret = connect(fd,(sockaddr*)&sver_msg,len);if(-1 == ret){//连接失败return false;}return true;}//发送信息ssize_t Send(int fd,const void* buff,uint64_t len,int flag = 0){ssize_t n = send(fd,buff,len,flag);if(-1 == n){if(errno == EINTR || errno == EAGAIN){//阻断或者无数据可读//说明:EWOULDBLOCK == EAGAIN;return 0;}}return n;}//接收信息ssize_t Recv(int fd,void* buff, size_t len,int flag = 0){ssize_t n = recv(fd,buff,len,flag);if(-1 == n){if(errno == EINTR || errno == EAGAIN){//阻断或者无数据可读//说明:EWOULDBLOCK == EAGAIN;return 0;}return -1;}return n;}//依据于上面两个核心函数实现的函数重载,目的是方便调用,很简单说明一下,,不实现了。。ssize_t Send(int fd,const std::string& str);ssize_t Send(const std::string& str);ssize_t RecvNoBlock(void* buff,size_t len);//传入flag设置为MSG_DONTWAIT//...size_t Recv(int fd,std::string& str,size_t len);size_t Recv(std::string& str,size_t len);ssize_t SendNoBlock(int fd,void* buff, size_t len);//同上//...
};
SCSocket子模块,服务端接收的客户端的连接以及读写数据。
//服务端接收连接的套接字
struct SCSocket : public Socket
{SCSocket(int fd):Socket(ip,port,fd){}//发送信息ssize_t Send(int fd,const void* buff,uint64_t len,int flag = 0){ssize_t n = send(fd,buff,len,flag);if(-1 == n){if(errno == EINTR || errno == EAGAIN){//阻断或者无数据可读//说明:EWOULDBLOCK == EAGAIN;return 0;}else{return -1;}}return n;}//接收信息ssize_t Recv(int fd,void* buff, size_t len,int flag = 0){ssize_t n = recv(fd,buff,len,flag);if(-1 == n){if(errno == EINTR || errno == EAGAIN){//阻断或者无数据可读//说明:EWOULDBLOCK == EAGAIN;return 0;}else{return -1;}}return n;}//依据于上面两个核心函数实现的函数重载,目的是方便调用,很简单说明一下,,不实现了。。ssize_t Send(int fd,const std::string& str);ssize_t Send(const std::string& str);ssize_t RecvNoBlock(void* buff,size_t len);//传入flag设置为MSG_DONTWAIT//...size_t Recv(int fd,std::string& str,size_t len);size_t Recv(std::string& str,size_t len);ssize_t SendNoBlock(int fd,void* buff, size_t len);//同上//...
};
补充:相关的接口的详细说明可使用man命令进行查看,或者翻博主写的【Linux进阶之路】Socket —— “UDP“ && “TCP“以及【Linux进阶之路】高级IO,电脑浏览器可按下Ctrl + F可快速查阅,因此就不在本文中赘述了。
2. Poller
#pragma once
//容器
#include<unordered_map>
#include<vector>
//智能指针
#include<memory>
//文件IO
#include<unistd.h>
//epoll
#include<sys/epoll.h>
class Poller
{
public: Poller(){_efd = epoll_create(DefaultSize);memset(_evs,0,sizeof(_evs));}//更新void UpDate(Channel* cel){if(Is_ADD(cel)){UpDate(cel,EPOLL_CTL_MOD);}else{//添加到哈希表中_chs.insert({cel->Fd(),cel->Get()});UpDate(cel,EPOLL_CTL_ADD);}}//移除void Remove(Channel* cel){auto it = _chs.find(cel->Fd());if(it != _chs.end()){//将其从epoll中删除UpDate(cel,EPOLL_CTL_DEL);_chs.erase(it);}}//获取活跃连接std::vector<std::shared_ptr<Channel>> Wait(){int n = epoll_wait(_efd,_evs,MAX_EPOLLEVENTS,-1);if(-1 == n){if(errno == EINTR || errno == EPIPE){//阻断或者非阻塞return {};}}std::vector<std::shared_ptr<Channel>> actives;for(int i = 0; i < n; i++){int fd = _evs[i].data.fd;//就绪的事件int event = _evs[i].events;if(_chs.count(fd)){//设置就绪的事件_chs[fd]->SetRevent(event);actives.push_back(_chs[fd]);}}//右值拷贝return actives;}
private://直接进行监控并更新操作void UpDate(Channel* cel,int opt){struct epoll_event event;//初始化memset(&event,0,sizeof(event));event.data.fd = cel->Fd(); event.events = cel->GetMoniter();//要监控的事件epoll_event* eptr = (opt == EPOLL_CTL_DEL) ? nullptr : &event;//设置进内核的红黑树结构if(-1 == epoll_ctl(_efd,opt,fd,eptr)){//错误}}//检测是否被添加bool Is_ADD(Channel* cel){return _chs.count(cel->Fd());}
private:enum{MAX_EPOLLEVENTS = 1024,DefaultSize = 1};int _efd;struct epoll_event _evs[MAX_EPOLLEVENTS];using cel_t = std::shared_ptr<Channel>;std::unordered_map<int,cel_t> _chs;
};
说明:在博主的此篇文章中实现过类似的——【Linux进阶之路】高级IO,接口有着详细介绍,此处是一个更加完善,更加贴合实际的版本。
3. Channel
- 说明: Channel在实际使用的时候一般来说是在堆上开辟的,即new出一个Channel对象,因此使用shared_ptr进行管理,但是在使用的过程无法通过原始指针获取到shared_ptr对象,进而再度构造,导致段错误的现象,为了避免这种现象C++使用enable_shared_from_this模版来避免这个问题,原理是内部保存一个weak_ptr,下面给出一段样例代码,方便大家理解。
#include <iostream>
#include <memory>class MyClass : public std::enable_shared_from_this<MyClass> {
public:void show() {std::cout << "MyClass instance at " << this << std::endl;}std::shared_ptr<MyClass> getSharedPtr() {return shared_from_this();}
};
int main()
{std::shared_ptr<MyClass> sptr1(new MyClass);std::shared_ptr<MyClass> sptr2(sptr1.get());//errorstd::shared_ptr<MyClass> sptr3 = sptr1->getSharedPtr();std::cout << sptr2.use_count() << std::endl;//1std::cout << sptr3.use_count() << std::endl;//2//最后会出现Segmentation fault (core dumped)现象,因为sptr2进行构造导致引用计数不正确,最终导致资源多释放了一回。return 0;
}
#pragma once
//bind
#include<functional>
//智能指针
#include<memory>
class EventLoop;
class Channel:public std::enable_shared_from_this<Channel>
{using evt_cb_t = std::function<void()>;
public://构造Channel(EventLoop* loop,int fd):_loop(loop),_fd(fd),_revents(0),_mevents(0){}//获取智能指针std::shared_ptr<Channel> Get(){return shared_from_this();}//设置回调void SetRead(const evt_cb_t& rd){_rd = rd;}void SetWrite(const evt_cb_t& wt){_wt = wt;}void SetError(const evt_cb_t& err){_er = err;}void SetClose(const evt_cb_t& clo){_clo = clo;}void SetNor(const evt_cb_t& nor){_nor = nor;}//清理回调与监控信息void Clear(){_rd = _wt = _clo = _er = _nor = nullptr;_mevents = _revents = 0;_fd = -1;}//取消void RemoveRead(){_rd = nullptr;}void RemoveWrite(){_wt = nullptr;}//处理void Handle(){//算术 位运算 关系 按位 逻辑 赋值//异常if(_er != nullptr && (_revents & EPOLLERR)){_er();} //连接关闭else if(_clo != nullptr && (_revents & EPOLLHUP)){_clo();} else {//读if(_rd != nullptr && (IsRead()) && ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))){//连接活跃,更新活跃时间Flush();_rd();}//写else if(_wt != nullptr && (IsWrite()) &&(_revents & EPOLLOUT)){_wt();}} }//打开监控,epoll内核结构也要进行打开void OpenWrite(){_mevents |= (EPOLLOUT),Update(); }void OpenRead(){_mevents |= EPOLLIN,Update(); }//关闭,epoll内核结构也要进行关闭。void CloseWrite(){_mevents &= (~EPOLLOUT),Update();}void CloseRead(){_mevents &= ~EPOLLIN,Update();}void CloseAll(){_mevents = 0,Update()}//检测bool IsRead(){return _mevents & EPOLLIN;}bool IsWrite(){return _mevents & EPOLLOUT;}//描述符int Fd(){return _fd;};//获取监控事件int GetMoniter(){return _mevents;}//设置已经就绪的事件void SetRevent(uint32_t revs){_revents = revs;}//更新内核结构的事件结构:EventLoop -> Poller -> Update / Removevoid Update();void Remove();
private://刷新一下事件bool Flush(){if(_nor != nullptr) _nor();return true;}EventLoop* _loop; //绑定EventLoop即绑定线程。int _fd;uint32_t _mevents; //预期监控事件uint32_t _revents; //实际就绪事件//回调函数evt_cb_t _rd; //读evt_cb_t _wt; //写evt_cb_t _er; //错误evt_cb_t _clo;//关闭evt_cb_t _nor;//任意事件
};
4. Acceptor
#pragma once
//bind
#include<functional>
//智能指针
#include<memory>
class Acceptor
{using apt_cb = std::function<void(int)>; //连接处理回调
public://port:8000是博主的服务器设置的默认端口,自己还需查看开放端口进行设置。Acceptor(EventLoop* loop,int port = 8000):_loop(loop),_lskt("0.0.0.0",port),_cel(new Channel(loop,_lskt.GetFd())){//绑定读事件回调_cel->SetRead(std::bind(&Acceptor::ReadHander,this));}//设置连接处理函数void SetAcceptCb(const apt_cb& aptcb){_accept_cb = aptcb;//只有连接处理回调设置了才能打开读事件,否则会导致连接没有被正常管理,进而连接异常。_cel->OpenRead();}
private://获取与处理连接void ReadHander(){int cfd = _lskt.Accept();if(_accept_cb) _accept_cb(cfd);}
private:EventLoop* _loop;SSocket _lskt;//服务器进行监听的套接字std::shared_ptr<Channel> _cel;apt_cb _accept_cb = nullptr;
};
5. Time
5.1 TimerTask
定时任务模块,即完成对任务的执行,取消,资源的清理功能,是一个由shared_ptr管理的资源对象。
using TaskFunc = std::function<void()>;
using DelFunc = std::function<void()>;
class TimerTask
{
public:TimerTask(uint64_t id,uint32_t delay,TaskFunc cb):_id(id),_timeout(delay),_cancel(true),_task_cb(cb){}void SetDel(DelFunc cb){_del_cb = cb;}void Cancel(){_cancel = false;}uint32_t GetDelay(){return _timeout;}~TimerTask(){if(true == _cancel){_task_cb();}_del_cb();}
private:uint64_t _id; //设置id方便进行查找uint32_t _timeout;//设置超时时间bool _cancel;//是否取消定时任务TaskFunc _task_cb;//任务执行的回调DelFunc _del_cb;//资源释放的回调,从TimerWheel的结构中移除对应的信息。
};
5.1 TimerWheel
时间轮模块,即实现定时功能,进而完成对任务的添加,刷新,取消。
- 技术核心:时间轮 + 智能指针,即vector<shared_ptr<TimerTask>>。
- 原理:在本项目中时间轮设置为60秒,即一个容量为60的vector,当任务活跃时重新计算定时任务在vector中的位置,运行过程中每一秒种定时刷新一次时间轮,同时清空里面的定时任务,最终当shared_ptr的引用计数为0时,调用TimerTask的析构函数,执行定时任务。
- 系统调用接口:timerfd,获取当前距离上一次刷新的秒数。
#include <sys/timerfd.h>
int timerfd_create(int clockid, int flags);
/*
参数:
1.设置的时间标准,例如CLOCK_REALTIME为实时时钟,易收到系统时间更改的影响,CLOCK_MONOTONIC为单调时钟,不易受到系统时间更改的影响
比较适合测量时间的间隔,还有CLOCK_PROCESS_CPUTIME_ID,CLOCK_THREAD_CPUTIME_ID分别用于测量进程和线程的CPU时间。
2.设置属性,一般设置为0。
返回值:
1.成功返回创建的文件描述符timer_fd。
2.失败返回-1,并设置合适的错误码。
*/
int timerfd_settime(int fd, int flags,const struct itimerspec *new_value,struct itimerspec *old_value);
/*
参数:
1.创建的timer_fd
2.设置属性,一般设置为0。
3.输入型参数,用于设置间隔时间对象。
4.输出型参数,用于获取上次设置的间隔时间对象。
返回值:
1.成功返回创建的文件描述符。
2.失败返回-1,并设置合适的错误码。
*/
- 拓展:本项目还可以设置一个24小时60分钟60秒的时间轮,思路一致,感兴趣的可自行思考实现或者与博主私信交流。
//整形相关的类型
#include<cstdint>
//bind
#include<functional>
//智能指针
#include<memory>
//容器
#include<vector>
#include<unordered_map>
//文件描述符
#include<unistd.h>
#include<sys/timerfd.h> //timerfd
class EventLoop;
//根据TimerTask任务,封装一个时间轮
class TimerWheel
{
public:TimerWheel(EventLoop *loop):_capacity(60),_tick(0),_wheel(_capacity),_tfd(CreateTFd()),_loop(loop),_tcel(new Channel(_loop,_tfd)){_tcel->SetRead(std::bind(&TimerWheel::OnTime,this));_tcel->OpenRead();}//说明://1.任务统一是在线程中运行的,为了防止多线程对连接操作导致的线程安全,放到EventLoop中串行化执行可避免此问题。//2.在实际执行的就绪任务队列时可能由于前面的任务没有及时地执行导致刷新不及时或者没有释放,因此解决此类问题可将时间轮放到与接收连接类似执行// 轻量化任务的线程中或者自己再创建一个线程专门执行此任务。//添加任务void TimerAdd(uint64_t id,uint32_t delay,TaskFunc cb);//取消任务void CancelTask(uint64_t id);//刷新任务void TimerRefresh(uint64_t id);//判断任务bool HasTimer(uint64_t id){return _hash.count(id);}
private://InLoop系列主要是将任务放到指定的线程的任务池中,方便其执行。//添加void TimerAddInLoop(uint64_t id,uint32_t delay,TaskFunc cb){//将任务new出来赋值给 TimerPtrTimerPtr task(new TimerTask(id,delay,cb));//设置资源释放函数task->SetDel(std::bind(&TimerWheel::SourceClear,this,id));//将任务添加到哈希表中_hash.insert({id,task});//将任务添加到tick指向的数组当中int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(task);}//取消void CancelTaskInLoop(uint64_t id){auto it = _hash.find(id);if(it != _hash.end()){TimerPtr task = it->second.lock();if(task.get() != nullptr){task->Cancel();}else{_hash.erase(id);}}}//刷新void TimerRefreshInLoop(uint64_t id){auto it = _hash.find(id);if(it != _hash.end()){//获取对象并进行拷贝TimerPtr task = it->second.lock();int pos = (task->GetDelay() + _tick) % _capacity;_wheel[pos].push_back(task);return;}}//资源清理void SourceClear(u_int64_t id){auto it = _hash.find(id);if(it == _hash.end()){//说明不存在return;}else{_hash.erase(it);}}//timerfd系列//创建timerfdstatic int CreateTFd(){int tfd = timerfd_create(CLOCK_MONOTONIC,0); //计时标准为相对时间//CLOCK_REALTIMEif(tfd < 0){//创建失败}struct itimerspec new_time;memset(&new_time, 0, sizeof(new_time));//到期时间为1s后new_time.it_value.tv_sec = 1;//每次时间间隔为1snew_time.it_interval.tv_sec = 1;int ret = timerfd_settime(tfd,0,&new_time,nullptr);return tfd;}//获取到期次数int TimerRead(){uint64_t val = 0;int ret = read(_tfd,&val,sizeof(val));return val;}//更新表盘void TimerUpdate(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear();//清理调用析构的同时,执行任务或者将引用计数减1。} //定时刷新void OnTime(){//可能没有及时地刷新,导致times不为1int times = TimerRead();for(int i = 0; i < times; i++){TimerUpdate();}}using TimerPtr = std::shared_ptr<TimerTask>;using ValPtr = std::weak_ptr<TimerTask>;uint32_t _capacity; //设置时间轮的最大超时时间,也就是表盘。uint32_t _tick; //秒针的实时走向std::vector<std::vector<TimerPtr>> _wheel; //设置为秒级的时间轮//因为要维护一个非活跃连接的更新,所以要建立一个哈希表方便进行查找std::unordered_map<uint64_t,ValPtr> _hash;int _tfd; //timerfd,便于计时从而定时更新。EventLoop* _loop;//回调指针,所有的定时任务都是要在EventLoop下进行的。std::shared_ptr<Channel> _tcel;//timewheel任务对象
};
6. Buffer
思路图解
- 读偏移向左移动时,即数据写入Buffer时空间不够,将可读数据拷贝到开头,即紫色部分,然后扩容至所需大小;读偏移向右移动时,即从数据Buffer中读取出来,需注意读偏移不能超过写偏移的位置。
- 写偏移向左移动与读偏移向左移动向左移动的情况相同,写偏移向有移动,即向Buffer写入数据。
涉及接口
- copy
#include <algorithm>
template <class InputIt, class OutputIt>
OutputIt copy(InputIt first, InputIt last, OutputIt d_first);
//将[first,last)位置的数据拷贝到以d_first为起始位置的地方,一般来说迭代器类型为char*,即拷贝字符串或者按字节拷贝。
- memchr
#include <string.h>
void *memchr(const void *s, int c, size_t n);
//在以s位置为起点向后查找长度为n,看是否存在字符c。
//读写
#include<unistd.h>
class Buffer
{
public://构造Buffer():_rd_idx(0),_wd_idx(0){}//读void Read(void* buffer,uint64_t len){_Read(buffer,len);MoveReadIdx(len);}//读len长度的stringstd::string Read(uint64_t len){std::string str;str.resize(len);Read(&str[0],len);return str;}//读一行std::string GetLine(){ char* crlf = (char*)memchr(begin() + _rd_idx,'\n',ReadableSize());if(nullptr == crlf) return "";uint64_t len = crlf - (begin() + CurRpos()) + (uint64_t)1;std::string res = Read(len); return res;}//所有数据读出std::string Read(){return Read(ReadableSize());}//移动读偏移void MoveReadIdx(uint64_t len){if(len <= ReadableSize()){_rd_idx += len;}}//写void Write(const void *buffer,uint64_t len){_Write(buffer,len);MoveWriteIdx(len);}void Write(const std::string& str){Write(&str[0],str.size());}//移动写偏移void MoveWriteIdx(uint64_t len){if(len <= TailLdleSize()){_wd_idx += len;}}//获取到可读位置char *rdpos(){return &_buff[CurRpos()];}//获取可读大小uint64_t ReadableSize(){return _wd_idx - _rd_idx;}uint64_t Size(){return ReadableSize();}//复位void Clear(){_wd_idx = _rd_idx = 0;}//检测是否为空bool Empty(){return 0 == ReadableSize();}
private:void _Read(void* buffer,uint64_t len){if(len <= ReadableSize()){//无法检测传入的指针是否可靠。char* beg = begin() + _rd_idx;char* end = beg + len;if(buffer){std::copy(beg,end,(char*)buffer);}}}void _Write(const void* buffer,uint64_t len){//确保有足够的位置写EnsureWriteSpace(len);const char* beg = static_cast<const char*>(buffer);const char* end = beg + len;std::copy(beg,end,begin() + CurWpos());}//获取读写位置uint64_t CurRpos(){return _rd_idx;}uint64_t CurWpos(){return _wd_idx;}//获取空闲位置uint64_t TailLdleSize(){return _buff.size() - _wd_idx;}uint64_t HeadLdleSize(){return _rd_idx;}//扩容,保证有足够的长度放进去数据void EnsureWriteSpace(uint64_t len){//空闲空间足够if(len <= TailLdleSize() + HeadLdleSize()){//移动int sz = ReadableSize();std::copy(_buff.begin() + _rd_idx,_buff.begin() + _wd_idx, begin());_rd_idx = 0;_wd_idx = sz;}else{_buff.resize(_wd_idx + len);}}char* begin(){return &_buff[0];}
private:std::vector<char> _buff; //缓存区uint64_t _rd_idx; //读偏移uint64_t _wd_idx; //写偏移
};
7. Any
class Any final
{
public://默认构造Any():_cot(nullptr){}//拷贝构造Any(const Any& val):_cot(nullptr == val._cot ? nullptr : val._cot->clone()){}//模版构造template<class T>Any(const T& val):_cot(new placeholer<T>(val)){}//析构~Any(){delete _cot;};//赋值Any& operator =(const Any& val){Any(val).swap(*this);return *this;}//模版赋值template<class T>Any& operator=(const T& val){Any(val).swap(*this);return *this;}//交换Any& swap(Any& val){std::swap(_cot,val._cot);return *this;}//取值函数template<class T>T* get(){if(typeid(T) != _cot->get_type()){//错误}using holer_ptr = placeholer<T>*;holer_ptr to_child = (holer_ptr)(_cot);if(nullptr == to_child){//错误 }return &(to_child->_val);}private://父类,接口继承,多态封装struct holder{//析构函数,虚函数virtual ~holder(){};//克隆函数,此处为纯虚函数,目的是子类强制重写virtual holder* clone() = 0;//获取类型,方便之后的检测virtual const std::type_info& get_type() = 0;};//子类,模版实现template<class T>struct placeholer : public holder{placeholer(const T& val):_val(val){}//重写函数holder* clone(){return new placeholer(_val);};const std::type_info& get_type(){return typeid(T);};T _val;};//父类指针,指向holder * _cot;
};
- Any类在本项目的用处为当做应用层协议的上下文和进行丝滑地切换应用层协议。
- 实现的核心思想为多态,模版,封装类,即父类为抽象类,模版的子类强制重写父类的接口,在Any类中使用父类指针成员进行封装通过向下转换完成对任意成员的赋值,拷贝,获取功能。
说明:Any类的实现是参考C++17中的any类进行实现的,感兴趣可自行查看文档——https://c-cpp.com/cpp/utility/any.html
8. Connection
//bind
#include<functional>
//智能指针
#include<memory>
//定义状态基,用于判断连接状况,分别为连接关闭,连接待关闭,连接待建立,连接已建立。
typedef enum{DISCONNECTED,DISCONNECTING,CONNECTING,CONNECTED}ConSta;
class Connection;
//管理连接的智能指针类型
using ConPtr = std::shared_ptr<Connection>;
//连接初始化
using con_cb_t = std::function<void(const ConPtr&)>;
//消息处理
using msg_cb_t = std::function<void(const ConPtr&,Buffer*)>;
//关闭连接
using clo_cb_t = std::function<void(const ConPtr&)>;
//记录上下文处理过程
using txt_cb_t = std::function<void(const ConPtr&)>;
//服务器组件清理
using svr_cb_t = std::function<void(const ConPtr&)>;
//任意事件回调
using evt_cb_t = std::function<void(const ConPtr&)>;class Connection : public std::enable_shared_from_this<Connection>
{
public://获取信息接口,比如文件描述符,连接ID,获取上下文,判断连接状态int Fd(){return _fd;}int Id(){return _conid;}Any* GetTxt(){return &_text;}bool IsConnect(){return _const == CONNECTED;}//回调函数的设置 void SetMsgCb(msg_cb_t msg){_msg = msg;}void SetConCb(con_cb_t con){_con = con;}void SetTxtCb(txt_cb_t txt){_txt = txt;}void SetCloCb(clo_cb_t clo){_clo = clo;}void SetEvtCb(evt_cb_t evt){_evt = evt;}void SetSvrCb(svr_cb_t svr){_svr_clo = svr;}//设置上下文void SetAnyCb(const Any& text){_text = text;} //构造函数Connection(EventLoop* loop,int id,int sockfd):_conid(id),_timid(id)\,_fd(sockfd),_const(CONNECTING),_loop(loop),_cel(new Channel(loop,sockfd)),_skt(sockfd){//设置读,写,关闭,错误,任意事件,但是不打开因为连接还未初始化完毕 _cel->SetRead(std::bind(&Connection::ReadHder,this));_cel->SetWrite(std::bind(&Connection::WriteHder,this));_cel->SetClose(std::bind(&Connection::CloseHder,this));_cel->SetError(std::bind(&Connection::ErrorHder,this));_cel->SetNor(std::bind(&Connection::EventHder,this));}
private://事件就绪自动执行,外部不可见。//Channel类绑定的事件回调,要绑定到Channel内部,比如读,写,关闭,错误,任意事件。void ReadHder(){char buff[65536] = {0};int ret = _skt.RecvNoBlock(buff,sizeof(buff) - 1);if(ret < 0){//进行连接关闭的预处理工作return ShutDown();}else if(ret > 0){//读取到缓存区中_inbuf.Write(buff,ret);//调用信息处理函数ConPtr con = shared_from_this();_msg(con,&_inbuf);}}void WriteHder(){int ret = _skt.SendNoBlock(_outbuf.rdpos(),_outbuf.ReadableSize());if(ret < 0){if(_inbuf.ReadableSize() > 0){//消息处理回调,由程序员自行绑定设置_msg(shared_from_this(),&_inbuf);}return ShutDown();}//注意:将读偏移进行移动。_outbuf.MoveReadIdx(ret);//防止写busy提高效率。if(0 == _outbuf.ReadableSize()){_cel->CloseWrite();} //连接处于关闭状态且输出缓存区的读大小为0,则进行释放if(_const == DISCONNECTING && _outbuf.ReadableSize() == 0){return Realease();}}void EventHder(){//可能会刷新非活跃连接if(_entimrse == true){_loop->TimerRefresh(_timid);}if(_evt)_evt(shared_from_this());}void ErrorHder(){CloseHder();}void CloseHder(){//连接处理完毕进行关闭ShutDown();}
public://数据发送void Send(void* buff,int len){_loop->Run(std::bind(&Connection::SendInLoop,this,buff,len));}void Send(std::string msg){Send((void*)msg.c_str(),msg.size());}//连接建立初始化,由连接的监听线程完成。void Establish(){if(_const == CONNECTING){if(_con != nullptr){_con(shared_from_this());}//连接建立之后才能进行读事件监控_const = CONNECTED;_cel->OpenRead();}}//连接关闭的预处理行为void ShutDown(){_loop->Run(std::bind(&Connection::ShutDownInLoop,this));}//释放连接void Realease(){_loop->Run(std::bind(&Connection::RealeaseInLoop,this));}//用户切换应用层的处理函数。void SwitchTrs(con_cb_t con,msg_cb_t msg,clo_cb_t clo,txt_cb_t txt){_loop->AssertIsIn();_loop->Run(std::bind(&Connection::SwitchTrsInLoop,this,con,msg,clo,txt));}//非活跃连接的关闭和启动void CloTimRse(){_loop->Run(std::bind(&Connection::CloTimRseInLoop,this));}void TimoutRse(int sec){_loop->Run(std::bind(&Connection::TimoutRseInLoop,this,sec));}
private://内部执行,外部不可见,在EventLoop绑定的线程中执行的函数,防止执行流错乱,任务最终都会被Push到指定线程池中运行。//将数据推送到应用层缓冲区void SendInLoop(void* bufer,int len){if(_const == CONNECTED){_outbuf.Write(bufer,len);if(_cel->IsWrite() == false){_cel->OpenWrite();} }}//连接关闭void ShutDownInLoop(){//ReadHder与WriteHder,CloseHder调用此函数,因此连接必须处于就绪状态。if(_const != CONNECTED){return;} //将消息处理完毕if(_inbuf.ReadableSize() > 0 && _msg) {_msg(shared_from_this(),&_inbuf);}if(_outbuf.ReadableSize() > 0 && false == _cel->IsWrite()){_const = DISCONNECTING;_cel->OpenWrite();//WriteHder处理完调用Realease}//消息处理完毕之后调用Realeaseelse{_const = DISCONNECTING;Realease();}}//连接释放void RealeaseInLoop(){//此函数由ShutDownInLoop,WriteHder调用,因此必须处于连接的待释放状态if(_const != DISCONNECTING) return;ConPtr con = shared_from_this();//用户的关闭连接对于信息处理的回调if(_clo){ _clo(con);//此项目中最终设置的clo回调为空。}//服务器组件内部信息清理if(_svr_clo) _svr_clo(con); //非活跃关闭if(_entimrse) CloTimRse();//内部信息处理_cel->Remove();//移除Poller内核结构_cel->Clear();//清理属性信息。_skt.Close();//关闭套接字_const = DISCONNECTED;}//用于切换应用层协议void SwitchTrsInLoop(con_cb_t con,msg_cb_t msg,clo_cb_t clo,txt_cb_t txt){_con = con;_msg = msg;_clo = clo;_txt = txt;}//非活跃连接的关闭和启动void CloTimRseInLoop(){if(_entimrse){_entimrse = false;_loop->TimerCancelTask(_timid);}}void TimoutRseInLoop(int sec){_entimrse = true;if(_loop->HasTimer(_timid)){_loop->TimerRefresh(_timid);}else{_loop->TimerAdd(_timid,sec,std::bind(&Connection::CloseHder,this));}}
private:int _conid;//连接IDint _timid;//定时任务ID与连接ID一致。int _fd;bool _entimrse;//是否打开非活跃连接的释放销毁ConSta _const;//连接状态//封装类SCSocket _skt; //服务端接收的客户端的信息Any _text;//上下文,用于切换应用层协议。EventLoop* _loop;//线程绑定std::shared_ptr<Channel> _cel; //Channel对象用于管理事件监控信息。//应用层的输入输出缓存区Buffer _inbuf;Buffer _outbuf;//连接的处理方法con_cb_t _con = nullptr;//连接的初始化msg_cb_t _msg = nullptr;//消息处理clo_cb_t _clo = nullptr;//应用层对连接的关闭处理txt_cb_t _txt = nullptr; //记录协议上下文的处理过程。svr_cb_t _svr_clo = nullptr; //清理组件内关于连接的信息。evt_cb_t _evt = nullptr;//对标Channel中的_nor
};
说明:
- Connection的回调是用户自行设置的,面向上层,内部Channel的回调是通过Connection内部的接口设置的,面向下层。
- 每个连接都有自己的应用层协议,即对应的回调函数,相应协议的上下文,可随时进行切换。
- 上层连接需要及时地将内核的数据读取,需要接收缓存区;当发送数据内核的缓存区满时,剩下的数据需要存储,需要发送缓冲区。
- 用户调用的发送接口并不是真正的发送,而是将数据写入到发送缓存区,打开读事件监控,通过下层自动发送。
- 事件循环,即Loop与线程绑定,Connection内部存放EventLoop*指针绑定线程,也就是说Connection是与线程绑定的,因此当执行函数时需要将任务放到绑定线程的任务池中,而不能让其它的线程去执行,因此设置Inloop系列的函数。
9. Loop
9.1 EventLoop
说明:
- eventfd,文件描述符,通常用来记录事件发生的次数,此项目用来缓解线程阻塞等待问题。
- 相应的系统调用接口如下。
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
/*
参数:1.初始值,此项目中设置为1,其它项目可按场景设置。2.文件描述符属性设置,如EFD_CLOEXEC,执行 exec 调用时,子进程对应fd应自动关闭。EFD_NONBLOCK,fd设置为非阻塞模式
返回值:1.成功,返回创建的fd。2.失败,返回-1,设置errno。
*/
//bind
#include<functional>
//容器
#include<vector>
//线程
#include<thread>
//锁
#include<mutex>
//智能指针
#include<memory>
//eventfd
#include<sys/eventfd.h>
//事件处理模块,一个线程一个EventLoop
class EventLoop
{using Handler = std::function<void()>;
public:EventLoop():_tid(std::this_thread::get_id()),_evfd(EventFdCreate()),_ev_cl(new Channel(this,_evfd)),_twl(this){//设置读回调,并打开监控。_ev_cl->SetRead(std::bind(&EventLoop::EventRead,this));_ev_cl->OpenRead();}void Start(){while(1){//1.执行监控(wait)std::vector<std::shared_ptr<Channel>> alreadys = _poller.Wait();//2.就绪事件处理(获取IO,处理新到来的任务)for(auto& handler : alreadys) Push(std::bind(&Channel::Handle,handler.get()));//3.执行任务(先处理队列中的任务)RunAll();}}//线程与任务池相关的函数void Push(const Handler& hder)//将执行对象压入到任务池中{//{}锁定临界区,同时说明RAII锁的作用范围{std::unique_lock<std::mutex> lock(_mtx);_tasks.push_back(hder);}//写入一次,触发事件,然后wait就会马上就绪,往下执行,运行任务。EventWeak();}void RunAll()//运行任务池中所有的对象{std::vector<Handler> tsks;{std::unique_lock<std::mutex> lock(_mtx);tsks.swap(_tasks);}for(auto& t : tsks){t();} }void Run(const Handler& hder)//执行传入的线程处理对象或者将其放入到线程中{if(IsIn()){hder();} else{Push(hder);} }bool IsIn()//判断当前线程与预期线程是否一致{return std::this_thread::get_id() == _tid;} //Poller接口的封装void UpDate(Channel* cel)//更新监控对象{_poller.UpDate(cel);}void ReMove(Channel* cel)//移除监控对象{_poller.Remove(cel);}//TimerWheel接口的封装void TimerAdd(uint64_t id,uint32_t delay,TaskFunc cb)//添加定时任务{_twl.TimerAdd(id,delay,cb);}void TimerRefresh(uint64_t id)//刷新定时任务{_twl.TimerRefresh(id);}void TimerCancelTask(uint64_t id)//取消定时任务{_twl.CancelTask(id);}bool HasTimer(uint64_t id)//判断定时任务{return _twl.HasTimer(id);}
private://eventfd的实现接口static int EventFdCreate(){int cnt = 0;int efd = eventfd(cnt,EFD_CLOEXEC | EFD_NONBLOCK);//EFD_CLOEXEC,执行 exec 调用时,子进程对应fd应自动关闭。//EFD_NONBLOCK,fd设置为非阻塞模式return efd;}void EventRead(){uint64_t val = 0;//清空读事件int ret = read(_evfd,&val,sizeof(val));if(-1 == ret){if(errno == EINTR || errno == EAGAIN){return;}}}void EventWeak(){//触发读就绪事件uint64_t val = 0;int ret = write(_evfd,&val,sizeof(val));if(-1 == ret){if(errno == EINTR || errno == EAGAIN){return;}}}
private:int _evfd;//文件描述符管理的一把计数器,用于减缓epoll导致的阻塞。std::shared_ptr<Channel> _ev_cl;//_evfd的事件管理对象std::thread::id _tid; //判断运行的线程是否与预期的线程保持一致std::vector<Handler> _tasks; //任务池,存放任务的执行对象std::mutex _mtx; //互斥锁保证线程安全。Poller _poller; //进行对事件的监控。 TimerWheel _twl;//时间轮,管理定时任务
};//关联实现:
//Channel
void Channel::Update(){_loop->UpDate(this);}
void Channel::Remove(){_loop->ReMove(this);}
//TimerWhell
void TimerWheel::TimerAdd(uint64_t id,uint32_t delay,TaskFunc cb)
{_loop->Run(std::bind(&TimerWheel::TimerAddInLoop,this,id,delay,cb));
}
//刷新任务
void TimerWheel::TimerRefresh(uint64_t id)
{_loop->Run(std::bind(&TimerWheel::TimerRefreshInLoop,this,id));
}
//取消任务
void TimerWheel::CancelTask(uint64_t id)
{_loop->Run(std::bind(&TimerWheel::CancelTaskInLoop,this,id));
}
- 事件循环Eventloop主要是通过绑定线程,让线程不断执行任务池中的任务,除此之外此类还完成对Poller,即对事件监控的类和TimerWheel,即管理定时任务的类,两者的整合。
- 通过thread_id使得别的线程进入Eventloop时无法完成对任务的执行,而是将任务推送到任务池当中,且通过互斥锁使得多线程保持互斥,同时在Push到任务池之后,可通过EventWeak使得正在Start中阻塞的线程立马往下执行,及时执行任务池中的任务。
9.2 Thread
#pragma once
//条件变量
#include<condition_variable>
class LoopThread
{
public:LoopThread():_loop(nullptr),_thd(std::bind(&LoopThread::ThreadEntry,this)){}//用于实际分配任务,保证同步。EventLoop* GetLoop(){EventLoop* loop = nullptr;{std::unique_lock<std::mutex> lk(_mtx);_cv.wait(lk,[&](){return _loop != nullptr;}); //为假进入条件变量进行等待。loop = _loop;}return loop;}
private:void ThreadEntry(){EventLoop loop;{std::unique_lock<std::mutex> lk(_mtx);_loop = &loop;}//唤醒所有线程,让其获取锁,进而获取loop的地址。_cv.notify_all();loop.Start();}
private:std::mutex _mtx;std::condition_variable _cv;EventLoop* _loop;std::thread _thd;
};
class LoopThreadPool
{
public://构造LoopThreadPool(EventLoop* loop):_tcnt(0),_tnxt_id(0),_baseloop(loop){}//析构~LoopThreadPool(){for(int i = 0; i < _tcnt; i++){delete _ths[i];}}//初始化设置子线程数量void Init(int thread_cnt = 0){_tcnt = thread_cnt;if(_tcnt > 0){_loops.resize(_tcnt);_ths.resize(_tcnt);for(int i = 0; i < _tcnt; i++){_ths[i] = new LoopThread();_loops[i] = _ths[i]->GetLoop();}}}//用于给连接分配线程,因为EventLoop与线程绑定,因此分配EventLoop,即分配线程。EventLoop *GetNextLoop(){if(0 == _tcnt){return _baseloop;}_tnxt_id = (_tnxt_id + 1) % _tcnt;return _loops[_tnxt_id];}
private:int _tcnt;int _tnxt_id;EventLoop * _baseloop;std::vector<EventLoop*> _loops;std::vector<LoopThread*> _ths;
};
- LoopThread为一个封装的线程,在调用构造函数时启动,其中EventLoop是在线程执行时初始化进行赋值的,因此其它线程获取EventLoop 必须在线程初始化之后,因此封装了锁和条件变量保证同步和互斥。EventLoop是在线程内部创建的,线程通过EventLoop进而死循环地执行任务池中的任务。
- LoopThreadPool是封装的一个线程池,用于给线程按照轮转的方式分配连接。
10. TcpServer
class TcpServer
{
public://初始化和启动TcpServer(int port = 8000):_nxt_id(0),_tim_out(0),_enrsetim(false),_pool(&_baseloop),_aptor(&_baseloop,port){}void Start(int thread_cnt = 0){_aptor.SetAcceptCb(std::bind(&TcpServer::ConClient,this,std::placeholders::_1));_pool.Init(thread_cnt);_baseloop.Start();}
private://用于Acceptor进行绑定连接处理函数void ConClient(int cfd){//将新连接放到Channel进行监控int newconid = _nxt_id++;ConPtr con(new Connection(_pool.GetNextLoop(),newconid,cfd));//设置回调函数con->SetConCb(_con);con->SetCloCb(_clo);con->SetMsgCb(_msg);con->SetEvtCb(_evt);//组件内清理回调con->SetSvrCb(std::bind(&TcpServer::ConRemove,this,std::placeholders::_1));con->SetTxtCb(_txt);//启动非活跃销毁if(true == _enrsetim) con->TimoutRse(_tim_out);//初始化连接con->Establish();//插入hash表管理最后进行释放_cons.insert({newconid,con});}//移除组件内信息,Connection内部的SetSvrCb绑定,在Realease进行调用。void ConRemove(const ConPtr& con){_baseloop.Run(std::bind(&TcpServer::ConRemoveInLoop,this,con));}void ConRemoveInLoop(const ConPtr& con){int id = con->Id();auto it = _cons.find(id);if(it != _cons.end()){_cons.erase(it);}}public://定时任务void EnRseTimout(int tim)//启动非活跃销毁{_enrsetim = true; _tim_out = tim;}void AddTimer(int delay,const TaskFunc& func)//添加定时任务{_baseloop.TimerAdd(_nxt_id,delay,func);//TimerAdd保证在同一个线程下串行化运行。}
private://分配的连接和非活跃idint _nxt_id;//超时处理时间int _tim_out;bool _enrsetim;//线程EventLoop _baseloop;LoopThreadPool _pool;//监听套接字Acceptor _aptor;//连接处理std::unordered_map<uint64_t,ConPtr> _cons;
public://设置回调函数void SetAllCb(msg_cb_t msg = nullptr,\con_cb_t con = nullptr,clo_cb_t clo = nullptr,\evt_cb_t evt = nullptr,txt_cb_t txt = nullptr){SetMsgCb(msg);SetConCb(con);SetCloCb(clo);SetEvtCb(evt);SetTxtCb(txt);}void SetMsgCb(msg_cb_t msg){_msg = msg;}void SetConCb(con_cb_t con){_con = con;}void SetTxtCb(txt_cb_t txt){_txt = txt;}void SetCloCb(clo_cb_t clo){_clo = clo;}void SetEvtCb(evt_cb_t evt){_evt = evt;}private://回调con_cb_t _con = nullptr; //用户设置的连接初始化回调msg_cb_t _msg = nullptr; //用户设置的消息处理回调clo_cb_t _clo = nullptr; //用户设置的关闭事件回调txt_cb_t _txt = nullptr; //记录协议上下文,本项目的上层协议没有用到。evt_cb_t _evt = nullptr; //用户设置的任意事件回调
};
TcpServer是一个整合之前所有实现模块的一个综合模块,内部独立实现连接的初始化和移除功能,分别绑定到Acceptor和Connection内部;整合Loop(内部有多线程,定时任务Time模块)进而提供多线程和定时任务的功能;向上还提供设置各种回调函数,本项目中在实现上层的Http协议时只用到了消息处理回调和连接初始化回调,其它应用层可根据需要自行设置。
尾序
在实现地过程中,感觉就像包包子一样,不断地将底层的接口这样"馅",通过封装,继承,多态思想这张"皮",最终呈现出一个完整的"包子",即呈现给上层的对象。对此模块的实现可以更好地体会到多线程的工作方式和执行逻辑,同时可以对系统IO有了更加深刻的理解。希望写这篇文章对各位也能有所收获,我是舜华,期待与你的每一次相遇!
我们下篇再见了!