RESP协议解析
具体的协议说明可以参考第一篇文章协议说明, 下面我们直接开始协议的解析
Reply
在 interface/resp/reply.go 中定义一个顶级的回复实体接口,后续所有的协议类都将实现这个接口
// Reply 用于回复数据
type Reply interface {//ToBytes 将数据转换为Byte数组ToBytes() []byte
}
1. 通用响应
在 redis 中有许多的通用处理协议,例如:错误、正常响应、整数、数组等常用的实体,我们可以直接提前定义好,后续在解析协议的时候出现错误就可以直接使用
reply.go
在 resp/reply/reply.go 创建一个回复的文件,当前go文件中我们定义协议相关的实体结构体,例如:IntReply、StatusReply 等结构体,都实现了接口 Reply 的 ToBytes() 方法
var (nullBulkReplyBytes = []byte("$-1")CRLF = "\r\n" //结尾符号
)// BulkReply 单个字符串的回复体
type BulkReply struct {//响应数据,如果要回复一个 "hello world" , $11\r\nhello world\r\nArg []byte
}// ToBytes 自定义实体
func (b *BulkReply) ToBytes() []byte {if len(b.Arg) == 0 {return nullBulkReplyBytes}return []byte(fmt.Sprintf("$%d%s%s%s", len(b.Arg), CRLF, string(b.Arg), CRLF))
}// MakeBulkReply 创建一个实体
func MakeBulkReply(arg []byte) *BulkReply {return &BulkReply{Arg: arg,}
}// MultiBulkReply 多个字符串的回复
type MultiBulkReply struct {Args [][]byte
}func (m *MultiBulkReply) ToBytes() []byte {argLen := len(m.Args)var buf bytes.Bufferbuf.WriteString(fmt.Sprintf("*%d%s", argLen, CRLF))for _, arg := range m.Args {if arg == nil {buf.WriteString(string(nullBulkReplyBytes) + CRLF)}buf.WriteString(fmt.Sprintf("$%d%s%s%s", len(arg), CRLF, string(arg), CRLF))}return buf.Bytes()
}// MakeMultiBulkReply 创建一个实体
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {return &MultiBulkReply{Args: args,}
}// StatusReply 状态回复
type StatusReply struct {Status string
}func (s *StatusReply) ToBytes() []byte {return []byte("+" + s.Status + CRLF)
}func MakeStatusReply(status string) *StatusReply {return &StatusReply{Status: status,}
}// IntReply 数字类型回复
type IntReply struct {Code int64
}func (i *IntReply) ToBytes() []byte {return []byte(":" + strconv.FormatInt(i.Code, 10) + CRLF)
}func MakeIntReply(code int64) *IntReply {return &IntReply{Code: code}
}// StandardErrReply 标准错误回复
type StandardErrReply struct {Status string
}func (s *StandardErrReply) ToBytes() []byte {return []byte("-" + s.Status + CRLF)
}func MakeStandardErrReply(status string) *StandardErrReply {return &StandardErrReply{Status: status}
}// IsErrReply 判断回复的数据是否是异常的信息
func IsErrReply(reply resp.Reply) bool {return reply.ToBytes()[0] == '-'
}// ErrorReply 错误回复,实现了两个接口
type ErrorReply interface {//Error 错误信息Error() string//ToBytes 转换字节数组ToBytes() []byte
}
error.go
在同目录下面我们创建一个错误回复的接口体定义文件,用于定义通用的错误接口体
/**通用异常信息
1. UnknownErrReply:未知错误
2. ArgNumErrReply:参数个数错误
3. SyntaxErrReply:语法错误
4. WrongTypeErrReply:错误类型
5. ProtocolErrReply:协议错误,不符合resp协议
*/// UnknownErrReply 未知错误
type UnknownErrReply struct{}var unKnownErrReply = []byte("-Err unknown\r\n")func (u *UnknownErrReply) Error() string {return "Err unknown"
}func (u *UnknownErrReply) ToBytes() []byte {return unKnownErrReply
}func MakeUnknownErrReply() *UnknownErrReply {return &UnknownErrReply{}
}// ArgNumErrReply 参数个数错误
type ArgNumErrReply struct {//Cmd 指令本身Cmd string
}func (a *ArgNumErrReply) Error() string {return fmt.Sprintf("Err wrong number of arguments for : %s, command", a.Cmd)
}func (a *ArgNumErrReply) ToBytes() []byte {return []byte(fmt.Sprintf("-Err wrong number of arguments for : %s, command\r\n", a.Cmd))
}func MakeArgNumErrReply(cmd string) *ArgNumErrReply {return &ArgNumErrReply{Cmd: cmd}
}// SyntaxErrReply 语法错误
type SyntaxErrReply struct{}var syntaxErrBytes = []byte("-Err syntax error\r\n")
var theSyntaxErrReply = new(SyntaxErrReply)func (s *SyntaxErrReply) Error() string {return "Err syntax error"
}func (s *SyntaxErrReply) ToBytes() []byte {return syntaxErrBytes
}func MakeSyntaxErrReply() *SyntaxErrReply {return &SyntaxErrReply{}
}// WrongTypeErrReply 语法错误
type WrongTypeErrReply struct{}var wrongTypeErrBytes = []byte("-wrong type operation against a key holding the wrong kind of value\r\n")
var theWrongTypeErrReply = new(WrongTypeErrReply)func (w *WrongTypeErrReply) Error() string {return "-Err wrong type"
}func (w *WrongTypeErrReply) ToBytes() []byte {return wrongTypeErrBytes
}func MakeWrongTypeErrReply() *WrongTypeErrReply {return &WrongTypeErrReply{}
}// ProtocolErrReply 协议错误
type ProtocolErrReply struct {Msg string
}func (p *ProtocolErrReply) Error() string {return fmt.Sprintf("Error protocol error: %s\r\n", p.Msg)
}func (p *ProtocolErrReply) ToBytes() []byte {return []byte(fmt.Sprintf("-Error protocol error: %s\r\n", p.Msg))
}
consts.go
这个文件中主要定义了一些固定的回复格式
/**用于保存一些固定的回复信息格式
1. PongReply:心跳回复
2. OkReply:ok回复
3. NullBulkReply:空字符串回复
4. EmptyMultiBulkReply:空数组回复
5. NoReply:无数据
*/// PongReply 心跳回复
type PongReply struct{}// pongBytes 创建常量字节数组
var pongBytes = []byte("+PONG\r\n")// thePongReply 直接创建一个常量,不用每次都创建新的对象
var thePongReply = new(PongReply)type OkReply struct{}var okBytes = []byte("+OK\r\n")
var theOkReply = new(OkReply)// NullBulkReply 空字符串的响应 -1代表空
type NullBulkReply struct{}var nullBulkBytes = []byte("$-1\r\n")
var theNullBulkReply = new(NullBulkReply)// EmptyMultiBulkReply 空数组回复
type EmptyMultiBulkReply struct{}var emptyMultiBulkBytes = []byte("*0\r\n")
var theEmptyMultiBulkReply = new(EmptyMultiBulkReply)// NoReply 空回复
type NoReply struct{}var noReplyBytes = []byte("")
var theNoReply = new(NoReply)// ToBytes 直接回复固定的结构数据
func (p *PongReply) ToBytes() []byte {return pongBytes
}// MakePongReply 创建reply,一般暴露一个make方法出去
func MakePongReply() *PongReply {return thePongReply
}func (o *OkReply) ToBytes() []byte {return okBytes
}func MakeOkReply() *OkReply {return theOkReply
}func (n *NullBulkReply) ToBytes() []byte {return nullBulkBytes
}func MakeNullBulkReply() *NullBulkReply {return theNullBulkReply
}func (e *EmptyMultiBulkReply) ToBytes() []byte {return emptyMultiBulkBytes
}func MakeEmptyMultiBulkReply() *EmptyMultiBulkReply {return theEmptyMultiBulkReply
}func (n *NoReply) ToBytes() []byte {return noReplyBytes
}func MakeNoReply() *NoReply {return theNoReply
}
2. 解析器
2.1 结构体
Payload
Payload 结构体主要保存我们将协议数据解析完成之后的存储,里面包含了 resp.Reply 和 error 错误信息,Data 为响应实体就是上面定义的所有的结构体对象
readState
解析器的状态里面包含了当前解析协议参数描述
字段说明
- readingMultiline:当前解析的数据是多行数据还是单行数据
- expectedArgsCount:记录解析命令的参数个数,如果是数组那么解析的就是多个,如果是单行解析的就是1
- msgType:数据包的类型。例如:*、+、$ 等数据包的类型,参考resp协议说明
- args:传递的参数本身,二维数据进行数据的存储,例如:set k v,就有三个组
- bulkLen:解析后续需要解析数据的字节长度
方法说明
- finished:当前解析器是否已经解析完成,主要是判断 args的长度和expectedArgsCount是否相同
- result:根据指令的类型保存结果返回出去,响应的是 resp.Reply 。注意我们对协议的解析都是操作的 resp.Reply 类型
- ParseStream:这是一个公共方法,供外部调用里面主要创建一个 管道(chan) 返回给外部,解析完成之后就会往管道里面传递
- parser0:解析方法,接受参数 io.Reader 和 *Payload 两个类型,io.Reader 在前面tcp服务器实现的时候说过,客户的连接也会实现这个接口
- readLine:每次读取一行,以 \n 结尾
- adjustType:根据协议的类型会返回对应的处理函数
- readBody:用于读取 $ 后续的真正的数据
以上就是大体的说明,实际解析都是按照 RESP 协议进行正常的解析,可以自行解析,下面是代码
/**
协议解析器
*/// Payload 数据实体
type Payload struct {//客户端回复给服务端的数据,为什么都用Reply格式,因为对服务端来说,也是客户端回复给服务端的数据Data resp.Reply//是否有错误Err error
}// readState 解析器的状态
type readState struct {//解析的是单行还是多行数据readingMultiline bool//记录解析命令的参数个数,如果是数组那么解析的就是多个,如果是单行解析的就是1expectedArgsCount int//数据包的类型。例如:*、+、$ 等数据包的类型,参考resp协议说明msgType byte//传递的参数本身,二维数据进行数据的存储,例如:set k v,就有三个组args [][]byte//解析后续需要解析数据的字节长度bulkLen int64
}// finished 当前解析器是否已经解析完成
func (r *readState) finished() bool {//解析出来的个数跟参数的个数一样return r.expectedArgsCount > 0 && len(r.args) == r.expectedArgsCount
}// result 根据指令的类型保存结果返回出去
func (r *readState) result() resp.Reply {switch r.msgType {case common.ASTERISK:return reply.MakeMultiBulkReply(r.args)case common.DOLLAR:return reply.MakeBulkReply(r.args[0])default:return nil}
}// parser0 解析数据 tcp发送的数据就是 io.Reader
func parser0(reader io.Reader, ch chan *Payload) {//就算出现错误了也不能跳出死循环defer func() {if err := recover(); err != nil {logger.Error(string(debug.Stack()))}}()bufReader := bufio.NewReader(reader)state := &readState{}var err errorvar msg []byte//死循环不停的读取数据for true {var ioErr bool/**msg:返回读取\n为分割的一行数据,ioErr:代表是否读取到异常,err:返回的异常例如:*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n 这样的数据,那么读取的就是 *3\r\n*/msg, ioErr, err = readLine(bufReader, state)if err != nil { //先读一行数据,判断是否是io错误if ioErr {//写入错误数据到管道中ch <- &Payload{Err: err,}close(ch)return}//普通的错误ch <- &Payload{Err: err,}state = &readState{}continue}/**判断是否是多行解析,默认第一次进来都是false,里面读取了之后将会将其状态进行改变例如:*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n第一次解析一行:*3\r\n 改变状态 readingMultiline 为多行状态,解析 3 表示数组有3个参数第二次循环解析一行:$3\r\n 表示后面是个字符串,并且第一次将 readingMultiline 的状态进行了改变所以走到 else中通过 readBody() 进行读取,修改 state.bulkLen说明后续的参数字节长度第三次循环解析一行:SET\r\n 解析的是参数,通过 readBody() 进行解析,将数据直接存入到 state.args[][] 数组中第四次循环解析一行:$3\r\n 继续解析后续参数的长度第五次循环解析一行:KEY\r\n 读取的参数存入到数组中第六次循环解析一行:$5\r\n 继续解析后续参数的长度为5第七次循环解析一行:VALUE\r\n 读取的参数存入到数组中*/if !state.readingMultiline {//判断数据第一个符号是什么类型first := msg[0]//通过 adjustType() 函数判断是什么类型的数据,然后调用返回的函数payload := adjustType(first)(msg, state, ch)//将响应的实体返回给管道中if payload != nil {ch <- payload}} else {//读取数据err := readBody(msg, state)if err != nil {//如果err不为空,返回一个协议错误ch <- &Payload{Err: errors.New("protocol error:" + string(msg)),}//重置状态器state = &readState{}continue}//判断是否读取完成了if state.finished() {ch <- &Payload{//包装一下Resp对象Data: state.result(),}state = &readState{}}}}
}//adjustType 判断类型返回对应类型处理的函数
func adjustType(first byte) func(msg []byte, state *readState, ch chan *Payload) *Payload {//返回一个默认的处理函数,打印一下错误信息resultFunc := func(msg []byte, state *readState, ch chan *Payload) *Payload {logger.Error("不支持当前类型:%s 的数据包处理", first)return nil}//先判断 * 号if first == common.ASTERISK {//协议解析错误resultFunc = func(msg []byte, state *readState, ch chan *Payload) *Payload {// * 号解析多行数据err := parseMultiBulkHeader(msg, state)//处理读取错误if err != nil {state = &readState{}return &Payload{Err: errors.New("protocol error:" + string(msg)),}}//如果解析的数组长度为空的if state.expectedArgsCount <= 0 {state = &readState{}return &Payload{//给redis的核心响应一个空的数组而不是给客户端返回Data: reply.MakeEmptyMultiBulkReply(),}}return nil}} else if first == common.DOLLAR { //解析 $resultFunc = func(msg []byte, state *readState, ch chan *Payload) *Payload {err := parseBulkHeader(msg, state)//处理读取错误if err != nil {state = &readState{}return &Payload{Err: errors.New("protocol error:" + string(msg)),}}//空字符串的响应 -1代表空if state.bulkLen == -1 {state = &readState{}return &Payload{//给redis的核心响应一个空的数组而不是给客户端返回Data: reply.MakeNullBulkReply(),}}return nil}} else { //解析 +或者-号resultFunc = func(msg []byte, state *readState, ch chan *Payload) *Payload {//解析单行result, err := parseSingleLine(msg)//将状态器清空一下state = &readState{}return &Payload{Data: result,Err: err,}}}return resultFunc
}// ParseStream 解析字节流的时候会返回一个管道,解析完成之后发送数据到管道中
func ParseStream(reader io.Reader) chan *Payload {//创建一个管道ch := make(chan *Payload)go parser0(reader, ch)//给redis核心响应一个ch管道,核心层只需要监听这个管道数据即可return ch
}// readLine 根据指示读取一行数据。例如:*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {/**1. 按照\r\n的正常情况进行分割(没有读取到$字符),因为 $ 是预设指令,指示后面需要读取几个字符2. 如果之前读取到 $字符,严格按照$后面跟随的字符个数进行读取,不能进行分行*/var msg []bytevar err errorif state.bulkLen == 0 { // 1.表示前面没有读取到 $等预设指令 按照\r\n切分msg, err = bufReader.ReadBytes('\n')if err != nil {return nil, true, err}//判断 \n切了之后 倒数第二个是否是 \r进行区分,就是协议格式不对if len(msg) == 0 || msg[len(msg)-2] != '\r' {return nil, false, errors.New("protocol error: " + string(msg))}} else {//2. 前面读取到了 $ 等预设字符, state.bulkLen + 2 需要将后面的\r\n也要读取进来msg = make([]byte, state.bulkLen+2)//将bufReader中根据数组的长度全部读取出来_, err := io.ReadFull(bufReader, msg)if err != nil {return nil, true, err}//判断是否是 \r\n进行区分,就是协议格式不对if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {return nil, false, errors.New("protocol error: " + string(msg))}//将预设的长度设置为0state.bulkLen = 0}return msg, false, nil
}// parseMultiBulkHeader 解析多行,例如:*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
func parseMultiBulkHeader(msg []byte, state *readState) error {var err errorvar expectedLine uint64//将 *3\r\n 中的3给切出来,获取到当前数据包的长度expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)if err != nil {return errors.New("protocol error: " + string(msg))}if expectedLine == 0 { //读取出来的是空的数据state.expectedArgsCount = 0return nil} else if expectedLine > 0 { //表示后续有数据的长度//解析的标志位,消息的类型state.msgType = msg[0]//现在正在读多行状态state.readingMultiline = true//设置后续参数的个数state.expectedArgsCount = int(expectedLine)//初始化数组的长度,二维数组进行存储state.args = make([][]byte, 0, expectedLine)return nil} else {return errors.New("protocol error: " + string(msg))}
}// parseBulkHeader 解析单行,例如:$4\r\n
func parseBulkHeader(msg []byte, state *readState) error {var err error//解析单行的数据长度例如:$4\r\n,这里解析出来就是4state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 32)if err != nil {return errors.New("protocol error: " + string(msg))}if state.bulkLen == -1 {return nil} else if state.bulkLen > 0 {//解析的标志位,消息的类型state.msgType = msg[0]//现在正在读多行状态state.readingMultiline = false//读取参数的行数为1行state.expectedArgsCount = 1//初始化数组的长度,单行的话那么后续的参数就为1state.args = make([][]byte, 0, 1)return nil} else {return errors.New("protocol error: " + string(msg))}
}// 解析客户端响应的 +OK,-err, :5\r\n 格式,这种就可以直接解析完
func parseSingleLine(msg []byte) (resp.Reply, error) {//将正文解析出来str := strings.TrimSuffix(string(msg), "\r\n")var result resp.Replyswitch msg[0] {case common.PLUS: //解析正确响应包result = reply.MakeStatusReply(str[1:])case common.DASH: //解析错误响应包result = reply.MakeStandardErrReply(str[1:])case common.COLON: //解析整数数据包val, err := strconv.ParseInt(str[1:], 10, 64)if err != nil {return nil, errors.New("protocol error: " + string(msg))}result = reply.MakeIntReply(val)}return result, nil
}/**
可能是以下两种情况:
1. $3\r\n
2. SET\r\n
*/
func readBody(msg []byte, state *readState) error {//这里面是将后续的 \r\n的分隔符截取了line := msg[0 : len(msg)-2]var err error// $3 解析$后面的3出来if line[0] == common.DOLLAR {//保存后续需要解析的字节长度到状态器里面state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)if err != nil {return errors.New("protocol error: " + string(msg))}//如果情况类型是 $0\r\nif state.bulkLen <= 0 {//空的长度state.args = append(state.args, []byte{})state.bulkLen = 0}} else {//走到这里数据就是 SETstate.args = append(state.args, line)}return nil
}