Reactor介绍,如何从简易版本的epoll修改成Reactor模型(demo版本代码+详细介绍)

embedded/2024/10/21 23:14:30/

目录

Reactor demo​​​​​​​

引入

比喻 

修改代码

connection

tcp_server

ET模式

主逻辑

处理事件

运行结果

代码

完善功能

读取数据

运行结果

​编辑

代码

处理数据

回指指针 

如何处理写事件

引入

循环内

处理对写事件的关心

异常处理

代码

server.hpp

server.cpp

运行结果


Reactor demo

引入

Reactor 模式是一种设计模式,广泛应用于处理高效的事件驱动编程和网络编程中

  • 用于管理和分发来自不同事件源的事件
  • 核心目的是优化系统在处理大量事件时的性能,特别是在 I/O 操作密集的环境中

Reactor 是一个半同步半异步模型 

  • reactor直译过来就是反应堆
  • 同步 -- 调用epoll接口等待的过程
  • 异步 -- 事件以回调方式进行处理

比喻 

回想起我们曾经玩过的打地鼠游戏:

  • 整个游戏界面就是reactor模型,操作的人就是多路转接方案,检测每个洞(连接)有没有地鼠出来(事件就绪),一旦出来就砸他(执行注册好的回调方法)
  • 这里我们写的代码算是同步,因为需要在内部处理事件
  • 如果要写异步,可以搞个线程池,将收到的数据直接push进任务队列中,交给线程池处理,我们只进行io,然后在内部搞个字段来接收线程池返回的结果

这里我们只是写一个demo版本的(当然,demo版本也很麻烦)

(大家如果遇到什么问题,可以评论交流)

修改代码

之前我们只调用一次read,无法确定是否读完了一份完整数据,并且只有读功能 -- epoll接口使用 -- 非阻塞式网络io(仅读事件)-CSDN博客

  • 在这份代码中,我们并没有保证读完所有数据,并且也不会有机会拿到完整数据,因为每次读取都会创建新的临时缓冲区
  • 所以,我们需要把没读完的数据临时缓存起来

因为应用层上存在大量连接,每个连接都对应一个套接字文件,这些连接都会遇到这个问题

  • 所以需要给每个文件都设置输入输出缓冲区,并定义结构来管理

这次主要有两个模块:

connection

对应我们上面说的,是对文件的管理结构,结合网络通信+epoll,可以确定里面的成员变量:

  • 要有每个连接对应的套接字fd
  • 缓冲区肯定输入输出都要有(这里我们就用string就行,虽然它并不适合处理二进制流,应该用vector,但vector会有很多拷贝,所以方便起见,还是用string)
  • 可以将处理读/写/异常事件就绪时的回调函数也放在里面,刚好可以实现自定义特定文件的处理方式 -- 这样可以使用类内的缓冲区,而不是在读数据时,调用公共函数,将数据添加到公共临时缓冲区中
  • 定义一个回指指针,指向tcp_server(按下不表,在后续说明)

tcp_server

是我们服务器的类

  • 肯定先要包括之前我们封装好的epoll接口和socket接口对象

需要管理多个连接,也就是需要一个结构来将多个connection对象组织起来

  • 使用unordered_map结构,建立fd->连接结构的映射关系
  • 每次将新的要关注的文件添加到connections中,一旦有文件上的事件就绪,就可以通过fd,找到处理事件的方法

我们目前将读事件分为两类,所以需要两种回调函数

  • 获取新连接 和 读取数据
  • 那么,最好是先定义出针对各种类型的处理函数,然后根据文件类型,手动设置好我们需要的方法
  • 因为我们是在服务器内部进行回调函数的设置,所以将回调函数定义在类内,使用会更方便

以上,我们可以定义一个函数来解决,总的来说分为两步:

  • 将需要关注的[新文件上的特定事件]添加进epoll模型中 (内核层)
  • 将[新文件+如何处理特定事件]添加进connections中 (用户层)

除此之外,我们可以直接在类内定义一个struct epoll_event数组,存放从内核捞取出的就绪事件,然后交给事件派发器

ET模式

保证服务器以ET模式工作,要设置相应的标志位:

以及,为了保证全部读取,需要将fd设置为非阻塞io方式 -- fcntl()

主逻辑

服务器不断循环,循环过程中派发事件

  • 然后在派发器逻辑中,每次获取一个就绪事件,分辨是哪个文件上的哪个事件就绪了,然后调用注册好的回调函数

判断是否就绪:

  • 我们检测是否是读事件就绪,和epoll的工作模式无关,只检测EPOLLIN就行了

如果出现异常(EPOLLERR,EPOLLHUP),统一转化为读写问题(设置两个标记位)

  • 因为一旦出现异常,读写一定会受到影响,只要转化,就能在读写函数内部解决 ??

而且,在正式处理前,我们需要一个函数,来判断当前连接是否安全(是否是我们需要关注的文件),以及当前对应的处理函数是否被设置

处理事件

连接/数据到来时,我们无法确定只有一个连接/一份数据,并且这里是在ET模式下,必须要读取出所有连接

  • 所以,我们写一个循环来获取连接/读取数据,直到读完  

运行结果

我们从本地连接上该服务,会发现连接成功:

最后一条的警告,是我们在socket.hpp中封装的accept里显示的,因为此时是非阻塞式+底层没有数据,所以系统调用的accept走了返回值<0的情况,然后打印到日志上

  • 我们可以在内部添加判断语句,如果错误码=EAGAIN,就不打印日志

代码

#pragma once#include <memory>
#include <errno.h>
#include <string>
#include <functional>
#include <fcntl.h>#include "Log.hpp"
#include "socket.hpp"
#include "myepoll.hpp"#include <unordered_map>class connection;using func_t = std::function<void(std::shared_ptr<connection>)>;class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区
{int fd_;std::string in_buffer_;std::string out_buffer_;public:func_t read_cb_;func_t write_cb_;func_t except_cb_;// 回指指针
public:connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb): fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {}~connection() {}int get_fd(){return fd_;}private:
};class epoll_server
{static const int def_timeout = 1000;static const int def_num = 64;static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET;int port_;std::shared_ptr<MY_SOCKET> p_listen_sock_;std::shared_ptr<MY_EPOLL> p_epoll_;std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系struct epoll_event events_[def_num];public:epoll_server(int port): port_(port), p_listen_sock_(new MY_SOCKET), p_epoll_(new MY_EPOLL(def_timeout)) {}~epoll_server() {}void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb){// 添加到connections中 -- 用户层connections_.insert(std::make_pair(fd, std::make_shared<connection>(fd, read_cb, write_cb, except_cb)));// 添加到epoll模型 -- 内核p_epoll_->ctl(EPOLL_CTL_ADD, fd, event);}void init(){p_listen_sock_->Socket();set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式p_listen_sock_->Bind(port_);p_listen_sock_->Listen();// 添加监听套接字add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr);lg(DEBUG, "listen_socket add success\n");}void loop(){init();while (true){int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件for (int i = 0; i < n; ++i){Dispatcher(events_[i]); // 每次处理一个就绪事件}}}private:void accept(std::shared_ptr<connection> conn) // 处理连接事件{// 连接到来时,我们要循环处理,直到无数据while (true){std::string clientip;uint16_t clientport;int sock = p_listen_sock_->Accept(clientip, clientport);if (sock > 0){lg(INFO, "get a new connection ,fd : %d\n", sock);set_no_block(sock); // 设置为非阻塞式// 将新套接字添加进connections和epoll模型add_sock(sock, EVENT_IN,std::bind(&epoll_server::receiver, this, std::placeholders::_1), nullptr, nullptr);}else{// 如果底层无数据,也会错误返回,并设置错误码11if (errno == EAGAIN) // 无数据{break;}else if (errno == EINTR) // 被信号中断{continue;}else{lg(ERROR, "accept error\n");break;}}}}void receiver(std::shared_ptr<connection> conn){}void Dispatcher(struct epoll_event &sock){int fd = sock.data.fd; // 需要判断是否是我们关注的文件if (!is_safe(fd)){lg(DEBUG, "fd: %s is not safe\n", fd);return;}int event = sock.events;if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件{event |= EPOLLIN;event |= EPOLLOUT;}if ((event & EPOLLIN) && connections_[fd]->read_cb_) // 如果读取回调存在{connections_[fd]->read_cb_(connections_[fd]); // 调用读回调}if ((event & EPOLLOUT) && connections_[fd]->write_cb_) // 如果读取回调存在{connections_[fd]->write_cb_(connections_[fd]); // 调用读回调}}bool is_safe(int fd){auto it = connections_.find(fd);if (it != connections_.end()){return true;}else{return false;}}void set_no_block(int fd){int ret = fcntl(fd, F_GETFL);if (ret < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, ret | O_NONBLOCK);}
};

完善功能

读取数据

完善普通文件的回调函数(监听套接字只需要处理读事件,但通信用的套接字需要三种事件都处理)

我们先写好读取数据的函数:

  • 循环读取至读完全部数据
  • 读取一段就放入输入缓冲区中(服务器不应该关心数据格式/是否是一份完整数据,只要把数据全拿到手就行)
  • 因为我们是非阻塞式,所以一旦读取完毕,会返回错误,我们需要将返回值<0的情况分类: 读完数据 / 因异常信号中断读取 / 真的出错
  • 然后,我们在真的出错时,调用异常处理函数
  • 同理,在对方关闭连接时,也需要进入异常处理阶段

这里为了日志好看,可以在connection结构中增加两个字段 -- ip和port

运行结果

可以看到,随着我们的输入,打印出的[输入缓冲区的内容]变得更多:

代码

​
#pragma once#include <memory>
#include <errno.h>
#include <string>
#include <functional>
#include <fcntl.h>#include "Log.hpp"
#include "socket.hpp"
#include "myepoll.hpp"#include <unordered_map>class connection;using func_t = std::function<void(std::shared_ptr<connection>)>;class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区
{int fd_;std::string in_buffer_;std::string out_buffer_;public:func_t read_cb_;func_t write_cb_;func_t except_cb_;// 方便日志打印std::string ip_;uint16_t port_;// 回指指针
public:connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb): fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {}~connection() {}int get_fd(){return fd_;}void append(const std::string &str){in_buffer_ += str;}std::string& inbuffer(){return in_buffer_;}private:
};class epoll_server
{static const int def_timeout = 1000;static const int def_num = 64;static const int def_buffsize = 128;static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET;int port_;std::shared_ptr<MY_SOCKET> p_listen_sock_;std::shared_ptr<MY_EPOLL> p_epoll_;std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系struct epoll_event events_[def_num];public:epoll_server(int port): port_(port), p_listen_sock_(new MY_SOCKET), p_epoll_(new MY_EPOLL(def_timeout)) {}~epoll_server() {}void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb){// 添加到connections中 -- 用户层connections_.insert(std::make_pair(fd, std::make_shared<connection>(fd, read_cb, write_cb, except_cb)));// 添加到epoll模型 -- 内核p_epoll_->ctl(EPOLL_CTL_ADD, fd, event);}void init(){p_listen_sock_->Socket();set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式p_listen_sock_->Bind(port_);p_listen_sock_->Listen();// 添加监听套接字add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr);lg(DEBUG, "listen_socket add success\n");}void loop(){init();while (true){int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件for (int i = 0; i < n; ++i){Dispatcher(events_[i]); // 每次处理一个就绪事件}}}private:void accept(std::shared_ptr<connection> conn) // 处理连接事件{// 连接到来时,我们要循环处理,直到无数据while (true){std::string clientip;uint16_t clientport;int sock = p_listen_sock_->Accept(clientip, clientport);if (sock > 0){lg(DEBUG, "get a new client, get info-> [%s:%d], sockfd : %d", clientip.c_str(), clientport, sock);set_no_block(sock); // 设置为非阻塞式// 将新套接字添加进connections和epoll模型add_sock(sock, EVENT_IN,std::bind(&epoll_server::receiver, this, std::placeholders::_1),std::bind(&epoll_server::sender, this, std::placeholders::_1),std::bind(&epoll_server::excepter, this, std::placeholders::_1));}else{// 如果底层无数据,也会错误返回,并设置错误码11if (errno == EAGAIN) // 无数据{break;}else if (errno == EINTR) // 被信号中断{continue;}else{lg(ERROR, "accept error\n");break;}}}}void receiver(std::shared_ptr<connection> conn){while (true) // 读取至底层无数据{char buffer[def_buffsize];int n = read(conn->get_fd(), buffer, sizeof(buffer) - 1);if (n > 0) // 还没读完{buffer[n] = 0;conn->append(buffer);}else if (n == 0) // 对方关闭连接{lg(INFO, "sockfd: %d, client info %s:%d quit...", conn->get_fd(), conn->ip_.c_str(), conn->port_);conn->except_cb_(conn); // 关注异常事件return;}else // 出错/读完{if (errno == EAGAIN) // 读完全部数据{break;}else if (errno == EINTR){continue;}else // 真的出错{conn->except_cb_(conn); // 关注异常事件return;}}}}void sender(std::shared_ptr<connection> conn) {}void excepter(std::shared_ptr<connection> conn) {}void Dispatcher(struct epoll_event &sock){int fd = sock.data.fd; // 需要判断是否是我们关注的文件if (!is_safe(fd)){lg(DEBUG, "fd: %s is not safe\n", fd);return;}int event = sock.events;if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件{event |= EPOLLIN;event |= EPOLLOUT;}if ((event & EPOLLIN) && connections_[fd]->read_cb_) // 如果读回调存在{connections_[fd]->read_cb_(connections_[fd]); // 调用读回调}if ((event & EPOLLOUT) && connections_[fd]->write_cb_) // 如果写回调存在{connections_[fd]->write_cb_(connections_[fd]); // 调用写回调}print(connections_[fd]);}bool is_safe(int fd){auto it = connections_.find(fd);if (it != connections_.end()){return true;}else{return false;}}void set_no_block(int fd){int ret = fcntl(fd, F_GETFL);if (ret < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, ret | O_NONBLOCK);}void print(std::shared_ptr<connection> conn){std::cout << "fd: " << conn->get_fd() << " , ";std::cout << "in_buffer: " << conn->inbuffer().c_str() << std::endl;}
};​

处理数据

虽然我们存入了数据,但我们还没有处理数据

处理数据应该交给用户来决定(也就是使用回调函数,在参数中传入connection对象即可,里面包含该文件读到的所有数据),因为这部分属于应用层的事情

  • 检测数据是否完整(协议定制,序列化/反序列化)
  • 如果包含一份完整数据,进行处理(具体业务处理)

我们将之前写过的网络计算机代码拿过来用 -- 网络计算器(使用json序列化/反序列化,条件编译,注意点),json介绍+语法介绍_json序列化和反序列化工具-CSDN博客

网络计算器代码编写+注意点(序列化,反序列化,报头封装和解包,服务端和客户端,计算),客户端和服务端数据传递流程图,守护进程化+日志重定向到文件_计算器封装-CSDN博客

  • 直接定义一个函数,使用自定义协议来处理数据(将输入缓冲区中的数据做处理,然后把结果写回输出缓冲区)
  • 然后在实例化服务器时,将该函数作为实参传进去
  • 这样,就可以在读取完数据之后进行处理,并且发送 (注意,发送数据应该由服务器来做,也就需要我们的回指指针发挥用处 -- 回指服务器,然后由服务器调用发送函数)
回指指针 

因为connection和epoll_server会互相引用

  • server里保存了全部connection结构的指针,用于管理
  • 而connection里也需要引用epoll_server,来使用里面的函数
  • 这样就造成了循环引用问题

所以,需要我们将connection里存放的回指指针的类型定义成weak_ptr,当需要使用时再转换为shared_ptr

  • 而我们传参时,必须通过shared_ptr来转换为weak_ptr,所以我们调用shared_from_this()来返回当前对象的shared_ptr
  • 而调用该函数的前提是,让需要使用的类继承enable_shared_from_this这个模板类

如何处理写事件

引入

因为写事件关注的是发送缓冲区是否有空间

  • 缓冲区经常都是有空间的,所以写事件经常会就绪
  • 而一旦事件就绪,wait就会返回
  • 但我们通常真正关心的是"是否有数据可以发送",而不是"是否有空间"
  • 所以写事件,要按需设置是否关心 -- 代码体现

对于读事件来说,我们设置常关心

  • 因为读事件看的就是有无数据
循环内

那我们该如何写入呢?

  • 直接调用写函数(send/write)
  • 并且要像读数据一样,需要把输出缓冲区内的数据全部写进文件的发送缓冲区中才行
  • 所以要循环写入

并且,和读取不一样,需要我们手动删除输出缓冲区中的数据

  • 因为读取是从内核读到用户层,内核会自动帮我们删除已读出的数据

接下来,说说函数返回值的问题:

  • 如果返回值>0,说明此时成功写入了数据,需要我们删除已经写入的数据(如果已经将数据全部写完了,退出循环)
  • 返回值为0,说明此时缓冲区内没有数据,压根没有写入数据,直接返回
  • 注意这里[退出循环]和[直接返回]的区别,因为我们要在循环结束后,处理对写事件的关心

发送出错,分几种情况(和处理读数据一样)

  • 底层缓冲区没有空间了,返回EWOULDBLOCK(=EAGIN=11) 
  • 被信号中断
  • 真的出错
处理对写事件的关心

出循环后分为两种情况:

如果outbuffer里还有数据没写完 -- 设置对写事件的关心 

  • 因为此时受限于底层的缓冲区空间,所以需要关注写事件
  • 一旦发送缓冲区有空间了,就会通知我们,然后回调我们的写处理函数,继续发送数据

outbuffer里的数据已经被写完了 -- 取消对写事件的关心

  • 数据已经写完了,即使有空间也不需要,所以不用关注

以上可以自定义一个使能事件的函数,可以自主决定是否开启读/写事件

  • 是否开启 -- bool类型字段
  • 然后在内部调用epoll_ctl函数,来修改特定文件对事件的关注

异常处理

一旦走到异常处理的函数中,一定是出问题了

  • 那就直接移除epoll中对该文件上事件的关心
  • 关闭这个连接
  • 从自定义的连接管理结构中移除

代码

server.hpp
#pragma once#include <memory>
#include <errno.h>
#include <string>
#include <functional>
#include <fcntl.h>
#include <unordered_map>#include "Log.hpp"
#include "socket.hpp"
#include "myepoll.hpp"class connection;
class epoll_server;using func_t = std::function<void(std::shared_ptr<connection>)>;class connection // 每个文件都对应一个连接,拥有自己的输入输出缓冲区
{int fd_;std::string in_buffer_;std::string out_buffer_;public:func_t read_cb_;func_t write_cb_;func_t except_cb_;// 方便日志打印std::string ip_;uint16_t port_;// 使用 weak_ptr 防止循环引用std::weak_ptr<epoll_server> p_svr_;public:connection(int sockfd, func_t read_cb, func_t write_cb, func_t except_cb): fd_(sockfd), read_cb_(read_cb), write_cb_(write_cb), except_cb_(except_cb) {}~connection() {}void set_p_svr(std::weak_ptr<epoll_server> ptr){p_svr_ = ptr;}int get_fd() { return fd_; }void in_append(const std::string &str) { in_buffer_ += str; }void out_append(const std::string &str) { out_buffer_ += str; }std::string &inbuffer() { return in_buffer_; }std::string &outbuffer() { return out_buffer_; }
};class epoll_server : public std::enable_shared_from_this<epoll_server>, public no_copy
{static const int def_timeout = 1000;static const int def_num = 64;static const int def_buffsize = 128;static const uint32_t EVENT_IN = EPOLLIN | EPOLLET; // 设置ET模式static const uint32_t EVENT_OUT = EPOLLOUT | EPOLLET;int port_;func_t handle_;std::shared_ptr<MY_SOCKET> p_listen_sock_;std::shared_ptr<MY_EPOLL> p_epoll_;std::unordered_map<int, std::shared_ptr<connection>> connections_; // 建立fd->连接对象的映射关系struct epoll_event events_[def_num];public:epoll_server(int port, func_t handle): port_(port), handle_(handle), p_listen_sock_(new MY_SOCKET()), p_epoll_(new MY_EPOLL(def_timeout)) {}~epoll_server() {}void add_sock(int fd, uint32_t event, func_t read_cb, func_t write_cb, func_t except_cb, const std::string &ip = "0.0.0.0", uint16_t port = 0){std::shared_ptr<connection> new_connection(new connection(fd, read_cb, write_cb, except_cb));new_connection->set_p_svr(shared_from_this()); // shared_from_this(): 返回当前对象的shared_ptr,要确保epoll_server已经以shared_ptr的形式存在(主函数中以shared_ptr形式实例化对象)new_connection->ip_ = ip;new_connection->port_ = port;connections_.insert(std::make_pair(fd, new_connection));p_epoll_->ctl(EPOLL_CTL_ADD, fd, event);}void loop(){init();while (true){int n = p_epoll_->wait(events_, def_num); // 等待并获取就绪事件for (int i = 0; i < n; ++i){Dispatcher(events_[i]); // 每次处理一个就绪事件}}}void receiver(std::shared_ptr<connection> conn){int fd = conn->get_fd();while (true) // 读取至底层无数据{char buffer[def_buffsize];memset(buffer, 0, sizeof(buffer));int n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0) // 还没读完{conn->in_append(buffer);}else if (n == 0) // 对方关闭连接{lg(INFO, "sockfd: %d, client info %s:%d quit", fd, conn->ip_.c_str(), conn->port_);conn->except_cb_(conn); // 关注异常事件return;}else // 出错/读完{if (errno == EAGAIN) // 读完全部数据{break;}else if (errno == EINTR){continue;}else // 真的出错{lg(WARNING, "sockfd: %d, client info %s:%d error", fd, conn->ip_.c_str(), conn->port_);conn->except_cb_(conn); // 关注异常事件return;}}}// 读完了数据,就该处理了,但不一定包含了一份完整报文handle_(conn);}void excepter(std::shared_ptr<connection> conn){int fd = conn->get_fd();lg(WARNING, "Excepter hander sockfd: %d, client info %s:%d excepter handler", fd, conn->ip_.c_str(), conn->port_);p_epoll_->ctl(EPOLL_CTL_DEL, fd, 0);close(fd);lg(DEBUG, "close %d done\n", fd);connections_.erase(fd);lg(DEBUG, "remove %d from _connections done\n", fd);}void sender(std::shared_ptr<connection> conn){auto &buffer = conn->outbuffer();int fd = conn->get_fd();while (true){ssize_t n = write(fd, buffer.c_str(), buffer.size()); // 将输出缓冲区的内容写入内核if (n > 0)                                            // 写入一定数据{buffer.erase(0, n);if (buffer.empty()) // 数据写完了{break;}}else if (n == 0) // 没有数据可写{return;}else{if (errno == EAGAIN){break;}else if (errno == EINTR){continue;}else{lg(WARNING, "sockfd: %d, client info %s:%d send error...", conn->get_fd(), conn->ip_.c_str(), conn->port_);conn->except_cb_(conn);return;}}}// 判断接下来是否需要关注写事件if (buffer.empty()){enable_event(fd, true, false);}else{enable_event(fd, true, true);}}private:void init(){p_listen_sock_->Socket();set_no_block(p_listen_sock_->get_fd()); // 设置为非阻塞式p_listen_sock_->Bind(port_);p_listen_sock_->Listen();// 添加监听套接字add_sock(p_listen_sock_->get_fd(), EVENT_IN, std::bind(&epoll_server::accept, this, std::placeholders::_1), nullptr, nullptr);lg(DEBUG, "listen_socket add success\n");}void accept(std::shared_ptr<connection> conn) // 处理连接事件{while (true){std::string clientip;uint16_t clientport;int sock = p_listen_sock_->Accept(clientip, clientport);if (sock > 0){lg(DEBUG, "get a new client, get info-> [%s:%d], sockfd : %d", clientip.c_str(), clientport, sock);set_no_block(sock); // 设置为非阻塞式// 将新套接字添加进connections和epoll模型add_sock(sock, EVENT_IN,std::bind(&epoll_server::receiver, this, std::placeholders::_1),std::bind(&epoll_server::excepter, this, std::placeholders::_1),std::bind(&epoll_server::excepter, this, std::placeholders::_1),clientip, clientport);}else{if (errno == EAGAIN) // 无数据{break;}else if (errno == EINTR) // 被信号中断{continue;}else{lg(ERROR, "accept error\n");break;}}}}void Dispatcher(struct epoll_event &sock){int fd = sock.data.fd; // 需要判断是否是我们关注的文件if (!is_safe(fd)){lg(DEBUG, "fd: %d is not safe\n", fd);return;}auto conn = connections_[fd];if (!conn)return;int event = sock.events;if ((event & EPOLLHUP) || (event & EPOLLERR)) // 异常事件转换为读写事件{event |= EPOLLIN;event |= EPOLLOUT;}if ((event & EPOLLIN) && conn->read_cb_) // 如果读回调存在{conn->read_cb_(conn); // 调用读回调}if ((event & EPOLLOUT) && conn->write_cb_) // 如果写回调存在{conn->write_cb_(conn); // 调用写回调}}bool is_safe(int fd){return connections_.find(fd) != connections_.end(); // 是否在connections结构中存在}void set_no_block(int fd){int ret = fcntl(fd, F_GETFL);if (ret < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, ret | O_NONBLOCK);}void enable_event(int fd, bool f_read, bool f_write){if (fd < 0){lg(ERROR, "Invalid file descriptor: %d", fd);return;}uint32_t event = 0;event |= ((f_read ? EPOLLIN : 0) | (f_write ? EPOLLOUT : 0) | EPOLLET);p_epoll_->ctl(EPOLL_CTL_MOD, fd, event);}
};
server.cpp
#include "server.hpp"
#include "cal.hpp"void def_handle(std::weak_ptr<connection> conne)
{if (conne.expired())return;auto conn = conne.lock();calculate Cal;std::string str = Cal.cal(conn->inbuffer()); // 处理数据,得到结果if (str.empty()){return;}//lg(DEBUG, "get data: %s\n", str.c_str());conn->out_append(str); // 添加到输出缓冲区//lg(DEBUG, "out_append success\n");// 写入auto server = conn->p_svr_.lock(); // weak_ptr不拥有对象的所有权,需要转换为shared_ptrserver->sender(conn);             // 需要让服务器调用写处理函数,后续让服务器擦屁股(也许没有写入全部数据)//lg(DEBUG, "sender success\n");
}int main()
{std::shared_ptr<epoll_server> epoll_svr(new epoll_server(8080, def_handle));epoll_svr->loop();return 0;
}

其他代码在压缩包里

运行结果

我们把网络计算器的客户端也拿过来 -- 网络计算器代码编写+注意点(序列化,反序列化,报头封装和解包,服务端和客户端,计算),客户端和服务端数据传递流程图,守护进程化+日志重定向到文件_计算器封装-CSDN博客

直接做测试:


http://www.ppmy.cn/embedded/118209.html

相关文章

Windows系统的Tomcat日志路径配置

文章目录 引言I Windows系统的Tomcat日志路径配置配置常规日志路径访问日志路径配置,修改server.xmlII 日志文件切割:以分隔割tomcat 的 catalina.out 文件为例子通过Linux系统自带的切割工具logrotate来进行切割引言 需求:C盘空间不足,处理日志文件,tomcat日志迁移到D盘…

JS封装函数转换时间案例

<script>let time prompt("请输入需要转换的秒数总数")function getTime(x) {let h parseInt(x / 60 / 60 % 60)let m parseInt(x / 60 % 60)let s parseInt(x % 60)h h > 10 ? h : 0 hm m > 10 ? m : 0 ms s > 10 ? s : 0 sreturn [h, …

Maven配置、下载教程(非常详细)

maven下载地址 Maven – 发行说明 – Maven 3.6.1 (apache.org) 1.配置settings.xml 下载完maven之后&#xff0c;保存在电脑中并解压 打开maven文件 -->conf-->settings.xml 使用记事本方式打开 打开之后找到这个地方&#xff0c;在电脑中创建一个文件夹(repository)…

SpringBoot使用validation进行自参数校验

一&#xff1a;介绍 在 SpringBoot 项目开发中&#xff0c;很多与数据库交互的参数需要校验数据正确性。很多小伙伴会把参数判断写进代码里&#xff0c;但是这种写法往往会有低可读性以及多处使用的时候&#xff0c;需要变更验证规则时&#xff0c;不易于维护等缺点。今天给大家…

数据结构:线性表

1、线性表概述 1.1线性表的定义 线性表&#xff08;list&#xff09;&#xff1a;零个或多个数据元素的有限序列。 简单地来说&#xff0c;我们可以用下面这张图来描述一个线性表&#xff1a; 1.2 线性表的存储结构 1.2.1顺序存储结构——顺序表 顺序表是将数据全部存储到…

Selenium爬虫-获取天气并使用MapReduce分析温度最高的那一天

文章目录 Selenium爬虫pom.xmlWeatherScraper.javaweatherScraper.java结果使用MapReduce进行分析最高哪一天气温最高map部分reduce部分main部分输出结果 Selenium爬虫 pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"ht…

C++中的stoi()与to_string():

stoi()函数 stoi()函数是C11标准中引入的一个新函数&#xff0c;用于将字符串转换为整数。它的名称是“string to integer”的缩写&#xff0c;意为“字符串到整数”的转换。 stoi()函数的原型如下&#xff1a; int stoi(const string& str, size_t* pos 0, int base 10…

Vue.js与Flask/Django后端配合

Vue.js与Flask/Django后端配合 在现代Web开发领域&#xff0c;前后端分离已成为一种流行的架构模式。Vue.js作为一款轻量级、高性能的前端框架&#xff0c;与Flask或Django这样的后端框架相结合&#xff0c;可以构建出强大且可扩展的Web应用。本文将详细介绍如何将Vue.js与Fla…