Zinx - V0.5 消息封装
- 之前我们使用Request来保存服务器的数据,很显然使用[]byte来接收数据,没有长度也没有消息类型,接下来就要针对这个消息进行封装
创建消息类型
定义一个基本的message包,会包含消息ID、数据、数据长度三个成员,并提供基本的setter和getter方法
imssage.go接口
package ziface//将请求的消息封装到一个Message中, 定义抽象的接口
type IMessage interface {//获取消息的IDGetMsgId() uint32//获取消息的长度GetMsgLen() uint32//获取消息的内容GetData() []byte//设置消息的IDSetMsgId(uint32)//设置消息的内容SetData([]byte)//设置消息的长度SetDataLen(uint32)
}
message.go实现类
package znettype Message struct {Id uint32 //消息的IDDataLen uint32 //消息的长度Data []byte //消息的内容
}//创建一个Message消息包
func NewMsgPackage(id uint32, data []byte) *Message {return &Message{Id: id,DataLen: uint32(len(data)),Data: data,}
}//获取消息的ID
func (m *Message) GetMsgId() uint32 {return m.Id
}//获取消息的长度
func (m *Message) GetMsgLen() uint32 {return m.DataLen
}//获取消息的内容
func (m *Message) GetData() []byte {return m.Data
}//设置消息的ID
func (m *Message) SetMsgId(id uint32) {m.Id = id
}//设置消息的内容
func (m *Message) SetData(data []byte) {m.Data = data
}//设置消息的长度
func (m *Message) SetDataLen(len uint32) {m.DataLen = len
}
消息的粘包
这里我们使用 TLV (Type-Len-Value) 封包格式解决TCP粘包问题,由于Zinx也是TCP流的形式传播数据,难免会出现消息1和消息2⼀同发送,那么zinx就需要有能⼒区分两个消息的边界,所以Zinx此时应该提供⼀个统⼀的拆包和封包的⽅法。
- 封包:在发包之前打包成如上图这种格式的有head和body的两部分的包
- 拆包:在收到数据的时候分两次进行读取,先读取固定长度的head部分,得到后续Data的长度,再根据DataLen读取之后的body。
封包拆包的实现
需要注意的是,封包针对的是IMessage数据来封,返回的是二进制序列化后的结果,即把message结构体变成二进制序列化的数据,拆包针对的是二进制的数据流,得到的是IMessage数据,即把二进制序列化的数据变成message结构体
idatapack.go
package ziface//封包、拆包 模块
//直接面向TCP连接中的数据流, 用于处理TCP粘包问题
type IDataPack interface {//获取包的头的长度方法GetHeadLen() uint32//封包方法Pack(msg IMessage) ([]byte, error)//拆包方法Unpack([]byte) (IMessage, error)
}
datapack.go
package znetimport ("bytes""encoding/binary""errors""zinx/utils""zinx/ziface"
)//封包,拆包的具体模块
type DataPack struct{}//拆包封包实例的一个初始化方法
func NewDataPack() *DataPack {return &DataPack{}
}//获取包的头的长度方法
func (dp *DataPack) GetHeadLen() uint32 {//Datalen uint32(4字节) + ID uint32(4字节)return 8
}//封包方法
//|datelen|msgID|data|
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {//创建一个存放bytes字节的缓冲dataBuff := bytes.NewBuffer([]byte{})//将dataLen写进databuff中if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgLen()); err != nil {return nil, err}//将MsgId 写进databuff中if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {return nil, err}//将data数据 写进databuff中if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {return nil, err}return dataBuff.Bytes(), nil
}//拆包方法 (将包的Head信息都出来) 之后再根据head信息里的data的长度,再进行一次读
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {//创建一个从输入二进制数据的ioReaderdataBuff := bytes.NewReader(binaryData)//只解压head信息,得到datalen和MsgIDmsg := &Message{}//读dataLenif err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {return nil, err}//读MsgIDif err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {return nil, err}//判断datalen是否已经超出了我们允许的最大包长度if utils.GlobalObject.MaxPackageSize > 0 && msg.DataLen > utils.GlobalObject.MaxPackageSize {return nil, errors.New("too Large msg data recv!")}return msg, nil
}
注意这里的unpack()只读取了datalen和MsgID,并没有读取data
单元测试
- 单元测试思路
模拟服务器创建socketTCP,并使用go协程承载从客户端读取数据、进行拆包处理的业务
//只是负责测试datapack拆包 封包的单元测试
func TestDataPack(t *testing.T) {//模拟的服务器//1 创建socketTCPlistenner, err := net.Listen("tcp", "127.0.0.1:7777")if err != nil {fmt.Println("server listen err: ", err)return}//创建一个go 承载 负责从客户端处理业务go func() {//2 从客户端读取数据,拆包处理for {conn, err := listenner.Accept()if err != nil {fmt.Println("server accept error", err)}go func(conn net.Conn) {//处理客户端的请求//------> 拆包的过程 <------//定义一个拆包的对象dpdp := NewDataPack()for {// 1第一次从conn读, 把包的head读出来headData := make([]byte, dp.GetHeadLen())_, err := io.ReadFull(conn, headData)if err != nil {fmt.Println("read head error")break}msgHead, err := dp.Unpack(headData)if err != nil {fmt.Println("server unpacke err ", err)return}if msgHead.GetMsgLen() > 0 {//msg是有数据的, 需要进行第二次读取//2 第二次从conn读, 根据head中的datalen 再读取data内容msg := msgHead.(*Message)msg.Data = make([]byte, msg.GetMsgLen())//根据datalen的长度再次从io流中读取_, err := io.ReadFull(conn, msg.Data)if err != nil {fmt.Println("server unpack data err: ", err)return}//完整的一个消息已经读取完毕fmt.Println("---> Recv MsgID: ", msg.Id, ", datalen = ", msg.DataLen, "data = ", string(msg.Data))}}}(conn)}}()
从客户端读取数据进行拆包处理的业务需要用for循环进行阻塞,等待客户端的连接,然后开启一个协程处理客户端请求并将conn作为形参传入,这个协程实际上是承载的拆包的业务,进行两次读取,第一次把包的head读出来,第二次根据head中的dataLen读取data内容。我们首先定义一个拆包对象,这个对象提供拆包方法,同时读包也是用一个for循环去读,第一次将包的head二进制流读出来后要将其封装到message中,即调用unpack()方法,返回一个只有dataLen和MsgID两个字段的结构体。此时就可以进行第二次读取了,我们想要将第二次读取的数据添加到结构体的data字段里,但需要注意的是此时的msgHead是imessage接口类型,没有data字段,所以我们需要进行类型断言将其转换为message结构体类型
模拟客户端启动go程序,将两个包组合发送模拟粘包
//模拟客户端conn, err := net.Dial("tcp", "127.0.0.1:7777")if err != nil {fmt.Println("client dial err: ", err)return}//创建一个封包对象 dpdp := NewDataPack()//模拟粘包过程,封装两个msg一同发送//封装第一个msg1包msg1 := &Message{Id: 1,DataLen: 4,Data: []byte{'z', 'i', 'n', 'x'},}sendData1, err := dp.Pack(msg1)if err != nil {fmt.Println("client pack msg1 error", err)return}//封装第二个msg2包msg2 := &Message{Id: 2,DataLen: 7,Data: []byte{'n', 'i', 'h', 'a', 'o', '!', '!'},}sendData2, err := dp.Pack(msg2)if err != nil {fmt.Println("client pack msg1 error", err)return}//将两个包粘在一起sendData1 = append(sendData1, sendData2...)//一次性发送给服务端conn.Write(sendData1)//客户端阻塞select {}
创建一个封包对象,封装两个message一同发送,发送完之后主进程不能结束,要等待客户端返回,否则发送完进程结束客户端就销毁了,我们使用select阻塞
消息封装集成到Zinx框架
- 将Message添加到Request属性中
package ziface//IReqeust接口:
//实际上是把客户端请求的链接信息, 和 请求的数据 包装到了一个Request中
type IRequest interface {//得到当前链接GetConnection() IConneciton//得到请求的消息数据GetData() []byte//得到请求的消息IDGetMsgID() uint32
}
package znetimport ("zinx/ziface"
)type Request struct {//已经和客户端建立好的链接conn ziface.IConneciton//客户端请求的数据msg ziface.IMessage
}//得到当前链接
func (r *Request) GetConnection() ziface.IConneciton {return r.conn
}//得到请求的消息数据
func (r *Request) GetData() []byte {return r.msg.GetData()
}func (r *Request) GetMsgID() uint32 {return r.msg.GetMsgId()
}
- 修改链接读取数据的机制,将之前的单纯的读取byte改成拆包形式的读取按照TLV形式读取
首先还是创建一个拆包解包对象,读取客户端的Msg Head二级制流8个字节,然后拆包,得到msgID和msgDatalen并放在msg消息中,最后根据dataLen再次读取Data,放在msg.Data中
//链接的读业务方法
func (c *Connection) StartReader() {fmt.Println(" Reader Goroutine is running...")defer fmt.Println("connID = ", c.ConnID, " Reader is exit, remote addr is ", c.RemoteAddr().String())defer c.Stop()for {//读取客户端的数据到buf中//buf := make([]byte, utils.GlobalObject.MaxPackageSize)//_, err := c.Conn.Read(buf)//if err != nil {// fmt.Println("recv buf err", err)// continue//}//创建一个拆包解包对象dp := NewDataPack()//读取客户端的Msg Head 二级制流 8个字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head error", err)break}//拆包,得到msgID 和 msgDatalen 放在msg消息中msg, err := dp.Unpack(headData)if err != nil {fmt.Println("unpack error", err)break}//根据dataLen 再次读取Data, 放在msg.Data中var data []byteif msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data error ", err)break}}msg.SetData(data)//得到当前conn数据的Request请求数据req := Request{conn: c,msg: msg,}//执行注册的路由方法go func(request ziface.IRequest) {c.Router.PreHandle(request)c.Router.Handle(request)c.Router.PostHandle(request)}(&req)//从路由中,找到注册绑定的Conn对应的router调用}
}
- 给链接提供一个发包机制: 将发送的消息进行打包,再发送
修改StartReader方法
//链接的读业务方法
func (c *Connection) StartReader() {fmt.Println(" Reader Goroutine is running...")defer fmt.Println("connID = ", c.ConnID, " Reader is exit, remote addr is ", c.RemoteAddr().String())defer c.Stop()for {//读取客户端的数据到buf中//buf := make([]byte, utils.GlobalObject.MaxPackageSize)//_, err := c.Conn.Read(buf)//if err != nil {// fmt.Println("recv buf err", err)// continue//}//创建一个拆包解包对象dp := NewDataPack()//读取客户端的Msg Head 二级制流 8个字节headData := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {fmt.Println("read msg head error", err)break}//拆包,得到msgID 和 msgDatalen 放在msg消息中msg, err := dp.Unpack(headData)if err != nil {fmt.Println("unpack error", err)break}//根据dataLen 再次读取Data, 放在msg.Data中var data []byteif msg.GetMsgLen() > 0 {data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {fmt.Println("read msg data error ", err)break}}msg.SetData(data)//得到当前conn数据的Request请求数据req := Request{conn: c,msg: msg,}//执行注册的路由方法go func(request ziface.IRequest) {c.Router.PreHandle(request)c.Router.Handle(request)c.Router.PostHandle(request)}(&req)//从路由中,找到注册绑定的Conn对应的router调用}
}
添加SendMsg方法
package zifaceimport "net"//定义链接模块的抽象层
type IConneciton interface {//启动链接 让当前的链接准备开始工作Start()//停止链接 结束当前链接的工作Stop()//获取当前链接的绑定socket connGetTCPConnection() *net.TCPConn//获取当前链接模块的链接IDGetConnID() uint32//获取远程客户端的 TCP状态 IP portRemoteAddr() net.Addr//发送数据, 将数据发送给远程的客户端SendMsg(msgId uint32, data []byte) error
}//定义一个处理链接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error
实现SendMsg方法
//提供一个SendMsg方法 将我们要发送给客户端的数据,先进行封包,再发送
func (c *Connection) SendMsg(msgId uint32, data []byte) error {if c.isClosed == true {return errors.New("Connection closed when send msg")}//将data进行封包 MsgDataLen|MsgID|Datadp := NewDataPack()//MsgDataLen|MsgID|DatabinaryMsg, err := dp.Pack(NewMsgPackage(msgId, data))if err != nil {fmt.Println("Pack error msg id = ", msgId)return errors.New("Pack error msg")}//将数据发送给客户端if _, err := c.Conn.Write(binaryMsg); err != nil {fmt.Println("Write msg id ", msgId, " error :", err)return errors.New("conn Write error")}return nil
}
Zinx框架开发
server.go
package mainimport ("fmt""zinx/ziface""zinx/znet"
)//基于Zinx框架来开发的 服务器端应用程序
//ping test 自定义路由
type PingRouter struct {znet.BaseRouter
}//Test Handle
func (this *PingRouter) Handle(request ziface.IRequest) {fmt.Println("Call Router Handle...")//先读取客户端的数据,再回写ping..ping...pingfmt.Println("recv from client: msgID = ", request.GetMsgID(),", data = ", string(request.GetData()))err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping"))if err != nil {fmt.Println(err)}
}func main() {//1 创建一个server句柄,使用Zinx的apis := znet.NewServer("[zinx V0.5]")//2 给当前zinx框架添加一个自定义的routers.AddRouter(&PingRouter{})//3 启动servers.Serve()
}
client.go
package mainimport ("fmt""io""net""time""zinx/znet"
)//模拟客户端
func main() {fmt.Println("client start...")time.Sleep(1 * time.Second)//1 直接链接远程服务器,得到一个conn链接conn, err := net.Dial("tcp", "127.0.0.1:8999")if err != nil {fmt.Println("client start err, exit!")return}for {//发送封包的message消息 MsgID:0dp := znet.NewDataPack()binaryMsg, err := dp.Pack(znet.NewMsgPackage(0, []byte("ZinxV0.5 client Test Message")))if err != nil {fmt.Println("Pack error:", err)return}if _, err := conn.Write(binaryMsg); err != nil {fmt.Println("write error", err)return}//服务器就应该给我们回复一个message数据, MsgID:1 pingpingping// 1 先读取流中的head部分 得到ID 和 dataLenbinaryHead := make([]byte, dp.GetHeadLen())if _, err := io.ReadFull(conn, binaryHead); err != nil {fmt.Println("read head error ", err)break}// 将二进制的head拆包到msg 结构体中msgHead, err := dp.Unpack(binaryHead)if err != nil {fmt.Println("client unpack msgHead error ", err)break}if msgHead.GetMsgLen() > 0 {// 2再根据DataLen进行第二次读取,将data读出来msg := msgHead.(*znet.Message)msg.Data = make([]byte, msg.GetMsgLen())if _, err := io.ReadFull(conn, msg.Data); err != nil {fmt.Println("read msg data error , ", err)return}fmt.Println("---> Recv Server Msg : ID = ", msg.Id, ", len = ", msg.DataLen, ", data = ", string(msg.Data))}//cpu阻塞time.Sleep(1 * time.Second)}
}