Zinx框架学习 - 消息封装

news/2024/12/27 22:20:04/

Zinx - V0.5 消息封装

  • 之前我们使用Request来保存服务器的数据,很显然使用[]byte来接收数据,没有长度也没有消息类型,接下来就要针对这个消息进行封装

创建消息类型

定义一个基本的message包,会包含消息ID、数据、数据长度三个成员,并提供基本的setter和getter方法

imssage.go接口

package ziface//将请求的消息封装到一个Message中, 定义抽象的接口
type IMessage interface {//获取消息的IDGetMsgId() uint32//获取消息的长度GetMsgLen() uint32//获取消息的内容GetData() []byte//设置消息的IDSetMsgId(uint32)//设置消息的内容SetData([]byte)//设置消息的长度SetDataLen(uint32)
}

message.go实现类

package znettype Message struct {Id      uint32 //消息的IDDataLen uint32 //消息的长度Data    []byte //消息的内容
}//创建一个Message消息包
func NewMsgPackage(id uint32, data []byte) *Message {return &Message{Id:      id,DataLen: uint32(len(data)),Data:    data,}
}//获取消息的ID
func (m *Message) GetMsgId() uint32 {return m.Id
}//获取消息的长度
func (m *Message) GetMsgLen() uint32 {return m.DataLen
}//获取消息的内容
func (m *Message) GetData() []byte {return m.Data
}//设置消息的ID
func (m *Message) SetMsgId(id uint32) {m.Id = id
}//设置消息的内容
func (m *Message) SetData(data []byte) {m.Data = data
}//设置消息的长度
func (m *Message) SetDataLen(len uint32) {m.DataLen = len
}

消息的粘包

这里我们使用 TLV (Type-Len-Value) 封包格式解决TCP粘包问题,由于Zinx也是TCP流的形式传播数据,难免会出现消息1和消息2⼀同发送,那么zinx就需要有能⼒区分两个消息的边界,所以Zinx此时应该提供⼀个统⼀的拆包和封包的⽅法。

  • 封包:在发包之前打包成如上图这种格式的有head和body的两部分的包
  • 拆包:在收到数据的时候分两次进行读取,先读取固定长度的head部分,得到后续Data的长度,再根据DataLen读取之后的body。

封包拆包的实现

 需要注意的是,封包针对的是IMessage数据来封,返回的是二进制序列化后的结果,即把message结构体变成二进制序列化的数据,拆包针对的是二进制的数据流,得到的是IMessage数据,即把二进制序列化的数据变成message结构体

idatapack.go

package ziface//封包、拆包 模块
//直接面向TCP连接中的数据流, 用于处理TCP粘包问题
type IDataPack interface {//获取包的头的长度方法GetHeadLen() uint32//封包方法Pack(msg IMessage) ([]byte, error)//拆包方法Unpack([]byte) (IMessage, error)
}

datapack.go

package znetimport ("bytes""encoding/binary""errors""zinx/utils""zinx/ziface"
)//封包,拆包的具体模块
type DataPack struct{}//拆包封包实例的一个初始化方法
func NewDataPack() *DataPack {return &DataPack{}
}//获取包的头的长度方法
func (dp *DataPack) GetHeadLen() uint32 {//Datalen uint32(4字节) + ID uint32(4字节)return 8
}//封包方法
//|datelen|msgID|data|
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {//创建一个存放bytes字节的缓冲dataBuff := bytes.NewBuffer([]byte{})//将dataLen写进databuff中if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgLen()); err != nil {return nil, err}//将MsgId 写进databuff中if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {return nil, err}//将data数据 写进databuff中if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {return nil, err}return dataBuff.Bytes(), nil
}//拆包方法 (将包的Head信息都出来) 之后再根据head信息里的data的长度,再进行一次读
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {//创建一个从输入二进制数据的ioReaderdataBuff := bytes.NewReader(binaryData)//只解压head信息,得到datalen和MsgIDmsg := &Message{}//读dataLenif err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {return nil, err}//读MsgIDif err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {return nil, err}//判断datalen是否已经超出了我们允许的最大包长度if utils.GlobalObject.MaxPackageSize > 0 && msg.DataLen > utils.GlobalObject.MaxPackageSize {return nil, errors.New("too Large msg data recv!")}return msg, nil
}

注意这里的unpack()只读取了datalen和MsgID,并没有读取data

单元测试

  • 单元测试思路

模拟服务器创建socketTCP,并使用go协程承载从客户端读取数据、进行拆包处理的业务

//只是负责测试datapack拆包 封包的单元测试
func TestDataPack(t *testing.T) {//模拟的服务器//1 创建socketTCPlistenner, err := net.Listen("tcp", "127.0.0.1:7777")if err != nil {fmt.Println("server listen err: ", err)return}//创建一个go 承载 负责从客户端处理业务go func() {//2 从客户端读取数据,拆包处理for {conn, err := listenner.Accept()if err != nil {fmt.Println("server accept error", err)}go func(conn net.Conn) {//处理客户端的请求//------> 拆包的过程 <------//定义一个拆包的对象dpdp := NewDataPack()for {// 1第一次从conn读, 把包的head读出来headData := make([]byte, dp.GetHeadLen())_, err := io.ReadFull(conn, headData)if err != nil {fmt.Println("read head error")break}msgHead, err := dp.Unpack(headData)if err != nil {fmt.Println("server unpacke err ", err)return}if msgHead.GetMsgLen() > 0 {//msg是有数据的, 需要进行第二次读取//2 第二次从conn读, 根据head中的datalen 再读取data内容msg := msgHead.(*Message)msg.Data = make([]byte, msg.GetMsgLen())//根据datalen的长度再次从io流中读取_, err := io.ReadFull(conn, msg.Data)if err != nil {fmt.Println("server unpack data err: ", err)return}//完整的一个消息已经读取完毕fmt.Println("---> Recv MsgID: ", msg.Id, ", datalen = ", msg.DataLen, "data = ", string(msg.Data))}}}(conn)}}()

从客户端读取数据进行拆包处理的业务需要用for循环进行阻塞,等待客户端的连接,然后开启一个协程处理客户端请求并将conn作为形参传入,这个协程实际上是承载的拆包的业务,进行两次读取,第一次把包的head读出来,第二次根据head中的dataLen读取data内容。我们首先定义一个拆包对象,这个对象提供拆包方法,同时读包也是用一个for循环去读,第一次将包的head二进制流读出来后要将其封装到message中,即调用unpack()方法,返回一个只有dataLen和MsgID两个字段的结构体。此时就可以进行第二次读取了,我们想要将第二次读取的数据添加到结构体的data字段里,但需要注意的是此时的msgHead是imessage接口类型,没有data字段,所以我们需要进行类型断言将其转换为message结构体类型

模拟客户端启动go程序,将两个包组合发送模拟粘包

//模拟客户端conn, err := net.Dial("tcp", "127.0.0.1:7777")if err != nil {fmt.Println("client dial err: ", err)return}//创建一个封包对象 dpdp := NewDataPack()//模拟粘包过程,封装两个msg一同发送//封装第一个msg1包msg1 := &Message{Id:      1,DataLen: 4,Data:    []byte{'z', 'i', 'n', 'x'},}sendData1, err := dp.Pack(msg1)if err != nil {fmt.Println("client pack msg1 error", err)return}//封装第二个msg2包msg2 := &Message{Id:      2,DataLen: 7,Data:    []byte{'n', 'i', 'h', 'a', 'o', '!', '!'},}sendData2, err := dp.Pack(msg2)if err != nil {fmt.Println("client pack msg1 error", err)return}//将两个包粘在一起sendData1 = append(sendData1, sendData2...)//一次性发送给服务端conn.Write(sendData1)//客户端阻塞select {}

创建一个封包对象,封装两个message一同发送,发送完之后主进程不能结束,要等待客户端返回,否则发送完进程结束客户端就销毁了,我们使用select阻塞

消息封装集成到Zinx框架

  •  将Message添加到Request属性中
package ziface//IReqeust接口:
//实际上是把客户端请求的链接信息, 和 请求的数据 包装到了一个Request中
type IRequest interface {//得到当前链接GetConnection() IConneciton//得到请求的消息数据GetData() []byte//得到请求的消息IDGetMsgID() uint32
}
package znetimport ("zinx/ziface"
)type Request struct {//已经和客户端建立好的链接conn ziface.IConneciton//客户端请求的数据msg ziface.IMessage
}//得到当前链接
func (r *Request) GetConnection() ziface.IConneciton {return r.conn
}//得到请求的消息数据
func (r *Request) GetData() []byte {return r.msg.GetData()
}func (r *Request) GetMsgID() uint32 {return r.msg.GetMsgId()
}
  • 修改链接读取数据的机制,将之前的单纯的读取byte改成拆包形式的读取按照TLV形式读取

首先还是创建一个拆包解包对象,读取客户端的Msg Head二级制流8个字节,然后拆包,得到msgID和msgDatalen并放在msg消息中,最后根据dataLen再次读取Data,放在msg.Data中

//链接的读业务方法
func (c *Connection) StartReader() {fmt.Println(" Reader Goroutine is running...")defer fmt.Println("connID = ", c.ConnID, " Reader is exit, remote addr is ", c.RemoteAddr().String())defer c.Stop()for {//读取客户端的数据到buf中//buf := make([]byte, utils.GlobalObject.MaxPackageSize)//_, err := c.Conn.Read(buf)//if err != nil {//	fmt.Println("recv buf err", err)//	continue//}//创建一个拆包解包对象dp := NewDataPack()//读取客户端的Msg Head 二级制流 8个字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head error", err)break}//拆包,得到msgID 和 msgDatalen 放在msg消息中msg, err := dp.Unpack(headData)if err != nil {fmt.Println("unpack error", err)break}//根据dataLen  再次读取Data, 放在msg.Data中var data []byteif msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data error ", err)break}}msg.SetData(data)//得到当前conn数据的Request请求数据req := Request{conn: c,msg:  msg,}//执行注册的路由方法go func(request ziface.IRequest) {c.Router.PreHandle(request)c.Router.Handle(request)c.Router.PostHandle(request)}(&req)//从路由中,找到注册绑定的Conn对应的router调用}
}
  • 给链接提供一个发包机制: 将发送的消息进行打包,再发送

修改StartReader方法

//链接的读业务方法
func (c *Connection) StartReader() {fmt.Println(" Reader Goroutine is running...")defer fmt.Println("connID = ", c.ConnID, " Reader is exit, remote addr is ", c.RemoteAddr().String())defer c.Stop()for {//读取客户端的数据到buf中//buf := make([]byte, utils.GlobalObject.MaxPackageSize)//_, err := c.Conn.Read(buf)//if err != nil {//	fmt.Println("recv buf err", err)//	continue//}//创建一个拆包解包对象dp := NewDataPack()//读取客户端的Msg Head 二级制流 8个字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head error", err)break}//拆包,得到msgID 和 msgDatalen 放在msg消息中msg, err := dp.Unpack(headData)if err != nil {fmt.Println("unpack error", err)break}//根据dataLen  再次读取Data, 放在msg.Data中var data []byteif msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data error ", err)break}}msg.SetData(data)//得到当前conn数据的Request请求数据req := Request{conn: c,msg:  msg,}//执行注册的路由方法go func(request ziface.IRequest) {c.Router.PreHandle(request)c.Router.Handle(request)c.Router.PostHandle(request)}(&req)//从路由中,找到注册绑定的Conn对应的router调用}
}

添加SendMsg方法

package zifaceimport "net"//定义链接模块的抽象层
type IConneciton interface {//启动链接 让当前的链接准备开始工作Start()//停止链接 结束当前链接的工作Stop()//获取当前链接的绑定socket connGetTCPConnection() *net.TCPConn//获取当前链接模块的链接IDGetConnID() uint32//获取远程客户端的 TCP状态 IP portRemoteAddr() net.Addr//发送数据, 将数据发送给远程的客户端SendMsg(msgId uint32, data []byte) error
}//定义一个处理链接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error

实现SendMsg方法

//提供一个SendMsg方法 将我们要发送给客户端的数据,先进行封包,再发送
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send msg")}//将data进行封包 MsgDataLen|MsgID|Datadp := NewDataPack()//MsgDataLen|MsgID|DatabinaryMsg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("Write msg id ", msgId, " error :", err)return errors.New("conn Write error")}return nil
}

Zinx框架开发

server.go

package mainimport ("fmt""zinx/ziface""zinx/znet"
)//基于Zinx框架来开发的 服务器端应用程序
//ping test 自定义路由
type PingRouter struct {znet.BaseRouter
}//Test Handle
func (this *PingRouter) Handle(request ziface.IRequest) {fmt.Println("Call Router Handle...")//先读取客户端的数据,再回写ping..ping...pingfmt.Println("recv from client: msgID = ", request.GetMsgID(),", data = ", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping"))if err != nil {fmt.Println(err)}
}func main() {//1 创建一个server句柄,使用Zinx的apis := znet.NewServer("[zinx V0.5]")//2 给当前zinx框架添加一个自定义的routers.AddRouter(&PingRouter{})//3 启动servers.Serve()
}

client.go

package mainimport ("fmt""io""net""time""zinx/znet"
)//模拟客户端
func main() {fmt.Println("client start...")time.Sleep(1 * time.Second)//1 直接链接远程服务器,得到一个conn链接conn, err := net.Dial("tcp", "127.0.0.1:8999")if err != nil {fmt.Println("client start err, exit!")return}for {//发送封包的message消息  MsgID:0dp := znet.NewDataPack()binaryMsg, err := dp.Pack(znet.NewMsgPackage(0, []byte("ZinxV0.5 client Test Message")))if err != nil {fmt.Println("Pack error:", err)return}if _, err := conn.Write(binaryMsg); err != nil {fmt.Println("write error", err)return}//服务器就应该给我们回复一个message数据, MsgID:1 pingpingping// 1 先读取流中的head部分 得到ID 和 dataLenbinaryHead := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(conn, binaryHead); err != nil {fmt.Println("read head error ", err)break}// 将二进制的head拆包到msg 结构体中msgHead, err := dp.Unpack(binaryHead)if err != nil {fmt.Println("client unpack msgHead error ", err)break}if msgHead.GetMsgLen() > 0 {// 2再根据DataLen进行第二次读取,将data读出来msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(conn, msg.Data); err != nil {fmt.Println("read msg data error , ", err)return}fmt.Println("---> Recv Server Msg : ID = ", msg.Id, ", len = ", msg.DataLen, ", data = ", string(msg.Data))}//cpu阻塞time.Sleep(1 * time.Second)}
}


http://www.ppmy.cn/news/191153.html

相关文章

CoreDX DDS应用开发指南(1)简介

1 简介 Twin Oaks Computing,Inc.设计、开发和提供了CoreDX DDS中间件。CoreDX DDS,是OMG数据分发服务(DDS)标准的高性能、微小型体积的实现。CoreDX DDS以数据为中心的发布-订阅(Data-Centric, Publish-Subscribe)消息传递基础设施提供了高吞吐量、低延迟的数据通信。 本…

20230603-周六随笔

周六闲来无事&#xff0c;给新电脑装下开发环境&#xff0c;记录一下遇到的问题 git下载代码报错 报错1&#xff1a;schannel: SEC_E_UNTRUSTED_ROOT (0x80090325)解决方法&#xff1a;执行git config --system http.sslbackend openssl命令 报错2&#xff1a;SSL certifica…

6年测试经验之谈,为什么要做自动化测试?

一、自动化测试 自动化测试是把以人为驱动的测试行为转化为机器执行的一种过程。 个人认为&#xff0c;只要能服务于测试工作&#xff0c;能够帮助我们提升工作效率的&#xff0c;不管是所谓的自动化工具&#xff0c;还是简单的SQL 脚本、批处理脚本&#xff0c;还是自己编写…

【SCADA】关于KingSCADA仿真驱动的应用

大家好&#xff0c;我是雷工&#xff01; 在有些时候我们需要用到虚拟仿真的数据&#xff0c;例如在效果演示时为了有良好的动态效果。在KingSCADA软件中可以通过Simulate驱动作为虚拟设备实现这一功能需求。 下面为大家演示该功能的应用&#xff1a; 一、KingIOServer工程设计…

早上用剃须刀刮胡子划破皮肤怎么办?男士们请看过来!

在日常生活中&#xff0c;男士必备的生活用品有哪些呢?我想最重要的应该就是——剃须刀&#xff0c;但是&#xff0c;很多粗心大意的男士有时就会不小心划破了皮肤&#xff0c;此时&#xff0c;我们有什么解决方法呢? 刮胡子不小心划破皮肤的解决方法 1、茶袋 可以使用冷掉的…

双刃剃须刀行业调研报告 - 市场现状分析与发展前景预测

双刃剃须刀市场的企业竞争态势 该报告涉及的主要国际市场参与者有Gillette、Edgewell、BIC、Supermax、Lord、Malhotra、Benxi Jincheng、SRBIL、Treet、Feather、Feintechnik、AccuTec Blades、Kaili Razor、Shanghai Cloud、Yingjili等。这些参与者的市场份额、收入、公司概况…

FreeRTOS多任务系统

FreeRTOS 文章目录 FreeRTOS1 单任务和多任务系统1.1 单任务系统1.2 多任务系统 2 FreeRTOS 任务状态3 FreeRTOS 任务优先级4 Free RTOS 任务调度方式4.1 抢占式调度4.2 时间片调度 5 FreeRTOS 任务控制块6 FreeRTOS 任务栈 1 单任务和多任务系统 1.1 单任务系统 单任务系统的…

2022年全球市场干湿两用电动剃须刀总体规模、主要生产商、主要地区、产品和应用细分研究报告

本文研究全球市场、主要地区和主要国家干湿两用电动剃须刀的销量、销售收入等&#xff0c;同时也重点分析全球范围内主要厂商&#xff08;品牌&#xff09;竞争态势&#xff0c;干湿两用电动剃须刀销量、价格、收入和市场份额等。针对过去五年&#xff08;2017-2021&#xff09…