Golang RPC实现-day02

devtools/2024/9/23 6:28:16/

导航

  • Golang RPC实现
    • 一、客户端异步并发多个请求
      • 1、 客户端结构体
      • 2、 一个客户端,异步发送多个请求,使用`call`结构体代表客户端的每次请求
      • 3、客户端并发多个请求
      • 4、客户端接收请求

Golang RPC实现

  • day01 我们实现了简单的服务端客户端
  • 我们简单总结一下day01的模式。
  • 服务端按顺序处理客户端过来的请求,按顺序响应客户端的请求。
  • 客户端同步的方式发送请求,不能并发发出请求。
  • 那么我们day02干的事情就是,让客户端异步并发的发出请求(请求顺序变得随机),服务端依然是按请求顺序进行处理,处理完某一个请求就返回,可以不按请求的顺序响应数据,但是响应数据是要上锁的,否则会发生响应数据并发安全问题。
  • 主要逻辑是修改了客户端的代码,服务端和day01没有变化

一、客户端异步并发多个请求

1、 客户端结构体

type Client struct {cc       codec.Codec//编码方式opt      *Option//发出请求的第一个包,用来协商后续包的格式和编码方式sending  sync.Mutex // 当一个请求正在发送时,不可以转头去执行别的请求header   codec.Header // 请求头内容mu       sync.Mutex // protect followingseq      uint64 //记录该客户端一次请求连接的序号,pending  map[uint64]*Call//通过seq快速找到客户端的某个请求closing  bool // user has called Closeshutdown bool // server has told us to stop
}

2、 一个客户端,异步发送多个请求,使用call结构体代表客户端的每次请求

type Call struct {Seq           uint64	//当前请求的序号,唯一标识一个请求ServiceMethod string      // format "<service>.<method>" 此次请求的服务和方法Args          interface{} // arguments to the function 请求函数的参数Reply         interface{} // reply from the function 服务端函数的响应数据Error         error       // if error occurs, it will be set //发生错误时的信息Done          chan *Call  // Strobes when call is complete.完成一次请求通过chan来通知
}

3、客户端并发多个请求

  • 主函数逻辑
func main() {log.SetFlags(0)addr := make(chan string)go startServer(addr)client, _ := geerpc.Dial("tcp", <-addr)defer func() { _ = client.Close() }()time.Sleep(time.Second)// send request & receive responsevar wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func(i int) {//go 实现异步非阻塞发送多个请求defer wg.Done()args := fmt.Sprintf("geerpc req %d", i)//一次请求携带的数据var reply stringif err := client.Call("Foo.Sum", args, &reply); err != nil {//call发出一次请求,&reply,传的是引用,如果有响应,就能接收到log.Fatal("call Foo.Sum error:", err)}log.Println("reply:", reply)}(i)}wg.Wait()
}
  • Call 准备发出一次请求
// Call invokes the named function, waits for it to complete,
// and returns its error status.
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done//阻塞等待此次请求的channel,直到服务端处理并响应才返回return call.Error
}
  • 绑定数据到请求中
// Go invokes the function asynchronously.
// It returns the Call structure representing the invocation.
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {if done == nil {done = make(chan *Call, 10)} else if cap(done) == 0 {log.Panic("rpc client: done channel is unbuffered")}call := &Call{ServiceMethod: serviceMethod,//此次请求的服务和方法Args:          args,//此次请求的参数Reply:         reply,//此处是引用类型,暂时还没有数据,等服务端响应就有数据了Done:          done,//绑定此次请求的响应channel,服务端响应后就往对应的channel发一条数据}client.send(call)return call
}
  • 发送请求到服务端
func (client *Client) send(call *Call) {// make sure that the client will send a complete requestclient.sending.Lock()defer client.sending.Unlock()// register this call.seq, err := client.registerCall(call)//注册这次call,把这次的请求ID注册到客户端中。。。if err != nil {call.Error = errcall.done()return}// prepare request headerclient.header.ServiceMethod = call.ServiceMethodclient.header.Seq = seqclient.header.Error = ""// encode and send the requestif err := client.cc.Write(&client.header, call.Args); err != nil {//发送请求头和请求参数call := client.removeCall(seq)// call may be nil, it usually means that Write partially failed,// client has received the response and handledif call != nil {call.Error = errcall.done()}}
}

4、客户端接收请求

func (client *Client) receive() {var err errorfor err == nil {var h codec.Headerif err = client.cc.ReadHeader(&h); err != nil { 接收请求是一个个来,当连接关闭时,此处会报错,退出整个客户端break}call := client.removeCall(h.Seq)//通过Seq唯一标识符删除一个请求switch {case call == nil:// it usually means that Write partially failed// and call was already removed.err = client.cc.ReadBody(nil)case h.Error != "":call.Error = fmt.Errorf(h.Error)err = client.cc.ReadBody(nil)call.done()default:err = client.cc.ReadBody(call.Reply)if err != nil {call.Error = errors.New("reading body " + err.Error())}call.done()//向通道发送一条消息,客户端等待的这个call可以推出了}}// error occurs, so terminateCalls pending callsclient.terminateCalls(err)//关闭所有请求
}

在这里插入图片描述


http://www.ppmy.cn/devtools/42100.html

相关文章

【专用】C# ArrayList的用法总结

System.Collections.ArrayList类是一个特殊的数组。通过添加和删除元素&#xff0c;就可以动态改变数组的长度。 一、优点 1. 支持自动改变大小的功能 2. 可以灵活的插入元素 3. 可以灵活的删除元素 4. 可以灵活访问元素 二、局限性 跟一般的数组比起来&#xff0c;速度…

软考--试题六--享元模式(Flyweight)

享元模式(Flyweight) 意图 运用共享技术有效地支持大量细粒度的对象(将对象进行细分) 结构 适用性 1、一个应用程序使用了大量的对象 2、完全由于使用大量的对象&#xff0c;造成很大的存储开销 3、对象的大多数状态都快变为外部状态 4、如果删除对象的外部状态(易变)&…

阅读笔记——《代码整洁之道》ch3

引言 clean-code ch3阅读笔记 短小 函数的第一规则是要短小&#xff0c;一般来说不要一个函数体不要超过半个屏幕。 只做一件事情 函数应该做一件事。做好这件事情。只做一件事。 编写函数毕竟是为了把大一些的概念拆分为另一抽象层上的一系列步骤。只做一件事的函数无法…

在win10折腾Flowise:部署和尝试

Flowise 是一种低代码/无代码拖放工具&#xff0c;旨在让人们轻松可视化和构建 LLM 应用程序。 本地部署 操作系统&#xff1a; win10 由于网络、操作系统等各种未知问题&#xff0c;使用npm install -g flowise的方式&#xff0c;尝试了很多次&#xff0c;都没有部署成功&am…

二.使用PgAdmin连接Postgresql

二.使用PgAdmin连接Postgresql PostgreSQL是一种开源的对象关系型数据库管理系统(ORDBMS),它支持大部分SQL标准并提供了许多高级功能,例如事务、外键、视图、触发器等。PostgreSQL由PostgreSQL全球开发组维护和开发,它是一种高度可扩展的数据库系统,可以在各种操作系统…

专业的服贸会服务团队-媒体邀约宣传

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 2024服贸会开展在即&#xff0c;许多企业都做好了的参展的准备&#xff0c;北京麦塔文化提供专业的展览展会服务&#xff0c;下面做个简单介绍。 、会场搭建团队&#xff1a; 负责整个活…

基于光纤技术的新能源汽车电池安全监测--FOM²系统

为什么要进行动力电池包的温度监测&#xff1f; 新能源电动汽车的动力电池包的工作温度&#xff0c;不仅会影响电池包性能&#xff0c;而且直接关系到车辆安全。时有发生的新能源汽车电池包起火事件&#xff0c;对电池包、冷却系统以及电池管理系统&#xff08;BMS&#xff09…

解决电脑睡眠后,主机ping不通VMware虚拟机

文章目录 问题解决方法方法一方法二注意 问题 原因&#xff1a;电脑休眠一段时间&#xff0c;再次打开电脑就ping不通VMware虚拟机。 解决方法 方法一 重启电脑即可&#xff0c;凡是遇到电脑有毛病&#xff0c;重启能解决90%问题。但是重启电脑比较慢&#xff0c;而且重启…