[muduo网络库]——muduo库TcpConnection类,万字总结(剖析muduo网络库核心部分、设计思想)

embedded/2024/10/19 5:28:42/

接着之前我们[muduo网络库]——muduo库Buffer类(剖析muduo网络库核心部分、设计思想),我们接下来继续看muduo库中的TcpConnection类。

TcpConnection类

TcpConnection类是muduo最核心的类,这个类主要封装了一个已建立的TCP连接,以及控制该TCP连接的方法(连接建立和关闭和销毁),以及该连接发生的各种事件(读/写/错误/连接)对应的处理函数,以及这个TCP连接的服务端和客户端的套接字地址信息等。

重要成员变量

EventLoop *loop_; //绝对不是baseloop,因为TcpConnetion都是在subloop中管理的
const std::string name_;
std::atomic_int state_;
bool reading_;std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;ConnectionCallback connectionCallback_; //有新连接时的回调
MessageCallback messageCallback_; //有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; //消息发送完成以后的回调
CloseCallback closeCallback_;
HighWaterMarkCallback highWaterMarkCallback_;size_t highWaterMark_;Buffer inputBuffer_; //接受数据的缓冲区
Buffer outputBuffer_; //发送数据的缓冲区
  • loop_ 该Tcp连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。
  • name_客户端的名字
  • state_ 客户端的状态,对应的有一个枚举类型,分别对应着已经断开连接,正在连接,已经连接,正在断开连接。
enum StateE {kDisconnected, kConnecting, kConnected, kDisconnecting};
  • reading_ 连接是否正在监听读事件
  • socket_ 连接套接字, 用于对连接进行底层操作
  • channel_ 通道, 用于绑定要监听的事件
  • localAddr_本地IP地址
  • peerAddr_对端IP地址
  • connectionCallback_messageCallback_writeCompleteCallback_highWaterMarkCallback_closeCallback_对应的连接建立/关闭后的处理函数,收到消息后的处理函数,消息发送完后的处理函数,高水位回调,连接关闭后的处理函数。
  • highWaterMark_ 因为发送数据,应用写得快,内核发送数据慢,需要把待发送的数据写入缓冲区,且设置了水位回调,防止发送太快
  • inputBuffer_outputBuffer_输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为Tcp发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到Tcp发送缓冲区,而是暂存在这个outputBuffer_中,等TCP发送缓冲区有空间了,触发可写事件了,再把outputBuffer_中的数据拷贝到Tcp发送缓冲区中。

重要成员函数

  • 构造函数给channel设置相应的回调函数
 //当通道有读事件时候在Channel::handleEvent()内调用:TcpConnection::handleRead()
channel_->setReadCallback(std::bind(&TcpConnection::handleRead,this,std::placeholders::_1));//当通道有写事件的时候在Channel::handleEven()内调用:TcpConnection::handleWrite()
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite,this));//当通道有关闭事件的时候在Channel::handleEvent()内调用:TcpConnection::handleClose()
channel_->setCloseCallback(std::bind(&TcpConnection::handleClose,this));//当通道有错误事件的时候在Channel::handleEvent()内调用:TcpConnection::handleError()
channel_->setErrorCallback(std::bind(&TcpConnection::handleError,this));LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(),sockfd);
//开启Tcp/Ip层的心跳包检测
socket_->setKeepAlive(true);
  • 一系列的获取loop_name_,地址,状态的函数。
EventLoop* getLoop() const { return loop_;}
const std::string& name() const { return name_;}
const InetAddress& localAddress() const { return localAddr_;}
const InetAddress& peerAddress() const { return peerAddr_; }
bool connected() const { return state_ == kConnected;}
  • 发送数据
void TcpConnection::send(const std::string &buf) //直接引用buffer
{if(state_ == kConnected){if(loop_->isInLoopThread()){//string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。sendInLoop(buf.c_str(),buf.size());}else{loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));}}
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{ssize_t nwrote = 0;size_t remaining = len; //未发送的数据bool faultError = false; //记录是否产生错误//之前调用过connection的shutdown 不能在发送了if(state_ == kDisconnected){LOG_ERROR("disconnected,give up writing!");return ;}//channel 第一次开始写数据,且缓冲区没有待发送数据if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0){nwrote = ::write(channel_->fd(),data,len);if(nwrote >= 0){remaining = len - nwrote;if(remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}}else{nwrote = 0;if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写{LOG_ERROR("TcpConnection::sendInLoop");if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET{faultError = true;}}}}if(!faultError && remaining > 0) {//目前发送缓冲区剩余的待发送数据的长度size_t oldlen = outputBuffer_.readableBytes();if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_){loop_->queueInLoop(std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining));}outputBuffer_.append((char*)data + nwrote,remaining);if(!channel_->isWriting()){channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout}}
}

1) 发送数据要发送的数据长度是len,如果在loop_在当前的线程里面,就调用sendInLoopsendInLoop内部实际上是调用了系统的write,如果一次性发送完了,就设置writeCompleteCallback_,表明不要再给channel设置epollout事件了
2)如果没有写完,先计算一下oldlen目前发送缓冲区剩余的待发送数据的长度。满足:

if(oldlen + remaining >= highWaterMark_ && oldlen < highWaterMark_&& highWaterMark_)

就会触发高水位回调
3)不满足以上的话,直接写入outputBuffer_
4) 剩余的数据保存到缓冲区当中,要给给channel注册epollout事件(切记,一定要注册channel的写事件,否则poller不会向channel通知epollout),这样poller发现tcp发送缓冲区有空间,会通知相应的socket-channel调用相应的writeCallback()回调方法,也就是调用TcpConnection::handleWrite,把发送缓冲区中数据全部发送出去。

  • 重中之重TcpConnection::handleRead负责处理Tcp连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中inputBuffer_,然后再调用connectionCallback_保存的连接建立后的处理函数。
void TcpConnection::handleRead(TimeStamp receiveTime)
{int savedErrno = 0;ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);if(n > 0){      messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);}else if(n==0) //客户端断开{handleClose();} else{errno = savedErrno;LOG_ERROR("TcpConnection::hanleRead");handleError();}}
  1. 关于readFd在Buffer类中我们已经剖析过了Buffer类,接着已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessageshared_from_this()获取了当前TcpConnection对象的智能指针.
  2. n=0,说明客户端断开了,调用连接关闭后的处理函数。
  3. n<0 出错了,调用错误处理回调
  • handleWrite( )负责处理Tcp连接的可写事件
void TcpConnection::handleWrite()
{if(channel_->isWriting()){int savedErrno = 0;ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);if(n > 0){outputBuffer_.retrieve(n); //处理了n个if(outputBuffer_.readableBytes() == 0) //发送完成{channel_->disableWriting(); //不可写了if(writeCompleteCallback_){//唤醒loop对应的thread线程,执行回调loop_->queueInLoop(std::bind(writeCompleteCallback_,shared_from_this()));}if(state_ == kDisconnecting){shutdownInLoop();// 在当前loop中删除TcpConnection}}}else{LOG_ERROR("TcpConnection::handleWrite");}}else{LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());}
}
  1. 如果可写,通过fd发送数据,直到发送完成
  2. 设置不可写,如果writeCompleteCallback_,唤醒loop对应的thread线程,执行回调
  3. 当前TCP正在断开连接,调用shutdownInLoop,在当前loop中删除TcpConnection
  • 处理Tcp连接关闭的事件handleClose()
void TcpConnection::handleClose()
{LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);setState(kDisconnected);channel_->disableAll();TcpConnectionPtr connPtr(shared_from_this());connectionCallback_(connPtr); //执行连接关闭的回调closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
}

处理逻辑就是将这个TcpConnection对象中的channel_从事件监听器中移除。然后调用connectionCallback_closeCallback_保存的回调函数。closeCallback_TcpServer::newConnection()为新连接新建TcpConnection时,已设为TcpServer::removeConnection(),而removeConnection()最终会调用TcpConnection::connectDestroyed()来销毁连接资源。

void TcpConnection::connectDestroyed()
{if(state_ == kConnected){setState(kDisconnected);channel_->disableAll(); //把channel所有感兴趣的事件,从poller中del掉connectionCallback_(shared_from_this());}channel_->remove();//把channel从poller中删除掉
}

只有处于已连接状态(kConnected)的tcp连接, 才需要先更新状态, 关闭通道事件监听。

  • 错误处理回调
void TcpConnection::handleError()
{int optval;socklen_t optlen = sizeof optval;int err = 0;if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen) < 0){err = errno;        }else{err = optval;}LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n",name_.c_str(),err);
}

getsockopt()函数用于获取任意类型、任意状态套接口的选项当前值,并把结果存入optval,最后输出错误日志。

  • 关闭连接
 //关闭连接
void TcpConnection::shutdown()
{if(state_ == kConnected){setState(kDisconnecting);loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop,this));}
}void TcpConnection::shutdownInLoop()
{if(!channel_->isWriting()) //说明当前outputBuffer中的数据已经全部发送完成{socket_->shutdowmWrite(); // 关闭写端}
}

注意: 为什么是关闭了写端呢?在TcpConnection::shutdownInLoop()中,我们会发现它调用了Socket的shutdowmWrite,这里并没有使用close,陈硕大佬原话是这样的:Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。因为TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。
Muduo把“主动关闭连接”这件事分成两步来做,如果要主动关闭连接,它先关闭本地的“写”端,等对方关闭之后,再关闭本地“读”端。
另外如果当前outputbuffer里面还有数据尚未发出的话,Muduo也不会立刻调用shutwownWrite,而是等到数据发送完毕再shutdown,可以避免对方漏收数据。

关闭连接事件很重要,涉及到TcpConnection和Channel的生命周期以及是否能合理销毁,用了智能指针来管理和控制生命周期。下面我们就来分析一下断开流程中**TcpConnection的引用计数问题**:

1.首先连接到来创建TcpConnection,并存入容器。引用计数+1 总数:1
2.客户端断开连接,在Channel的handleEvent函数中会将Channel中的TcpConnection弱指针提升,引用计数+1 总数:2
3.触发HandleRead ,可读字节0,进而触发HandleClose,HandleClose函数中栈上的TcpConnectionPtr guardThis会继续将引用计数+1 总数:3
4.触发HandleClose的回调函数 在TcpServer::removeConnection结束后(回归主线程队列),释放HandleClose的栈指针,以及Channel里提升的指针引用计数-2 总数:1
5.主线程执行回调removeConnectionInLoop,在函数内部将tcpconnection从TcpServer中保存连接容器中erase掉。但在removeConnectionInLoop结尾用conn为参数构造了bind。引用计数不变 总数:1
6.回归次线程处理connectDestroyed事件,结束完释放参数传递的最后一个shard_ptr,释放TcpConnection。引用计数-1 总数:0

  • 建立连接
void TcpConnection::connectEstablished()
{setState(kConnected);channel_->tie(shared_from_this());channel_->enableReading(); //向poller注册channel的epollin事件//新连接建立 执行回调connectionCallback_(shared_from_this());
}
思考一下:什么时候调用WriteCompleCallback?什么时候调用HighWaterMarkCallback?
  • 如果发送缓存区被清空,就调用WriteCompleCallback。TcpConnection有两处可能触发此回调。
  1. TcpConnection::sendInLoop()。
  2. TcpConnection::handleWrite()。
  • 如果输出缓冲的长度超过用户指定大小,就会触发回调HighWaterMarkCallback(只在上升沿触发一次)。
    在非阻塞的发送数据情况下,假设Server发给Client数据流,为防止Server发过来的数据撑爆Client的输出缓冲区,一种做法是在Client的HighWaterMarkCallback中停止读取Server的数据,而在Client的WriteCompleteCallback中恢复读取Server的数据
需要注意的是: TcpConnection类是唯一默认用shared_ptr来管理的类,唯一继承自enable_shared_from_this的类。这是因为其生命周期模糊:可能在连接断开时,还有其他地方持有它的引用,贸然delete会造成空悬指针。只有确保其他地方没有持有该对象的引用的时候,才能安全地销毁对象。

代码地址:https://github.com/Cheeron955/mymuduo/tree/master

muduoTcpConnectionchannelTcpConnectionChannelPollerEPollPollerEventLoopAcceptorSocketBufferTcpConnectionCurrentThreadDefaultPollerLoggerTimeStamp_344">好了~ 有关于muduo库的TcpConnection类的细节就到此结束了,再次强调一点,当channel有相应的事件的时候,会调用TcpConnection对应的回调。到此为止,我们介绍了Channel,Poller,EPollPoller,EventLoop,Acceptor,Socket,Buffer,TcpConnection八大类,还有单独的CurrentThread,DefaultPoller,接下来我们会剖析一下剩下的,比如Logger,TimeStamp等比较简单的,然后最后在从连接建立,断开,发送消息等过程在进行一个总结剖析,希望大家多多支持,我们下一节见~~

http://www.ppmy.cn/embedded/40934.html

相关文章

如何使用 iOS系统恢复软件修复 iPhone 问题

苹果公司向世界推出了他们可以拥有的最智能的手机。但即使是 iPhone 也无法避免智能手机常见的损坏和问题。您将熟悉最常见的问题。屏幕黑屏或卡在 Apple 徽标上&#xff1b;冻结或卡在恢复模式的 iPhone。但这样的问题不胜枚举&#xff0c;每天都有 iOS 用户在他们的设备中遇到…

大模型LLM之SFT微调总结

一. SFT微调是什么 在大模型的加持下现有的语义理解系统的效果有一个质的飞跃&#xff1b;相对于之前的有监督的Pre-Train模型&#xff1b;大模型在某些特定的任务中碾压式的超过传统nlp效果&#xff1b;由于常见的大模型参数量巨大&#xff1b;在实际工作中很难直接对大模型训…

K8S集群Etcd数据备份/恢复

前言&#xff1a; kubernetes使用etcd数据库实时存储集群中的数据&#xff0c;安全起见&#xff0c;一定要备份。 一、安装etcdctl 1、查看使用Etcd版本 rootmaster:~# cat /etc/kubernetes/manifests/etcd.yaml | grep image: | awk {print $2} registry.aliyuncs.com/goo…

Python sqlite3库 实现 数据库基础及应用-数据维护:请编写一个具有新增和查询两种功能的程序

目录 【第12次课】实验十数据库基础及应用2-数据维护 程序代码&#xff1a; 程序运行举例: 【第12次课】实验十数据库基础及应用2-数据维护 数据库文件有两张表&#xff0c;其中“成绩表”有三个字段:学号、课程名、成绩。请编写一个具有 新增和查询两种功能的程序: (1)当输…

M-有效算法

在赛场上&#xff0c;脑子就两个字“二分”&#xff0c;一点思路都没&#xff0c;完全不知道二分谁&#xff0c;怎么二分&#xff0c;从哪入手。隐隐约约也知道要变换公式&#xff0c;可惜没坚持这个想法。脑子里全是把k分离出来&#xff0c;赛后看了题解才知道&#xff0c;应该…

windows 10安装 docker desktop

升级 windows 10 windows 10 升级到 20H2&#xff0c;如 20H2 19045.4291。 注意&#xff1a;需返回更新&#xff0c;重启计算机&#xff0c;确保更新完整。 bios 开启虚拟化 开启cpu虚拟化功能。 windows 启用功能 启用hyper-v 启用 wsl 安装 wsl https://learn.microso…

CSS常用滤镜效果

CSS 提供了多种滤镜效果&#xff0c;可以通过 filter 属性应用于 HTML 元素。以下是一些常用的 CSS 滤镜效果&#xff1a; 一、灰度 (Grayscale) 将图像转换为灰度图像。值在 0%&#xff08;原始图像&#xff09;和 100%&#xff08;完全灰度&#xff09;之间。 filter: gra…

SpringMVC 中的常用注解和用法

Component&#xff1a;通用的组件注解&#xff0c;标识一个类为 Spring 组件&#xff0c;会被自动扫描并创建 Bean。(工具类&#xff09;Repository&#xff1a;表示持久层的注解&#xff0c;用于标识数据访问组件。&#xff08;和数据交互&#xff09;Service&#xff1a;表示…