本文首发在这里
考虑这种常见情景:服务多开,正常连接采用轮询负载均衡,但若服务有状态,重连则需进入之前的服务
本文其实主要在讨论以下两篇官方文档
- gRPC naming and discovery
- Custom Load Balancing Policies
实现依赖即将废弃的resolver.Address.Metadata,其实也仅是想复用已有的官方代码
自定义负载均衡并未依照示例,官方可能也是想展示更多细节,提供的明显不是最简单的实现方式,毕竟都是支持Service Config集成自定义配置的,也是需要熟悉endpointsharding和pickfirst相关逻辑的,所以不太适合用来入门
反观官方源码中roundrobin,基于baseBalancer仅实现Picker
的方式,才真是将代码写在了刀刃上
服务启动clientv3.Put
,服务关闭clientv3.Delete
,创建租约并借助Lease.KeepAlive
确保服务异常退出时因未自动续期而删除
通过context.WithValue
携带ServiceID
实现选择指定服务的连接
其余看看便知的就不赘述啦
server.go
package mainimport ("context""flag""fmt""log""log/slog""net""os""os/signal""github.com/panshiqu/golang/discovery""github.com/panshiqu/golang/utils""google.golang.org/grpc"pb "google.golang.org/grpc/examples/features/proto/echo"
)var id = flag.Int("id", 1, "id")
var env = flag.String("env", "dev", "environment")
var host = flag.String("h", "127.0.0.1", "host")type echoServer struct {pb.UnimplementedEchoServer
}func (s *echoServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %d)", req.Message, *id)}, nil
}func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf("%s:0", *host))if err != nil {log.Fatal(utils.Wrap(err))}s := grpc.NewServer()pb.RegisterEchoServer(s, &echoServer{})addr := fmt.Sprintf("%s:%d", *host, lis.Addr().(*net.TCPAddr).Port)service, err := discovery.Register("http://127.0.0.1:2379", fmt.Sprintf("%s/game", *env), addr, *id)if err != nil {log.Fatal(utils.Wrap(err))}go func() {if err := s.Serve(lis); err != nil {slog.Error("serve", slog.Any("err", err))}}()c := make(chan os.Signal, 1)signal.Notify(c, os.Interrupt)sig := <-cslog.Info("notify", slog.Any("signal", sig))if err := service.Release(); err != nil {slog.Error("release", slog.Any("err", err))}s.GracefulStop()
}
client.go
package mainimport ("context""flag""fmt""log""time""github.com/panshiqu/golang/balancer"_ "github.com/panshiqu/golang/balancer""github.com/panshiqu/golang/utils"clientv3 "go.etcd.io/etcd/client/v3""go.etcd.io/etcd/client/v3/naming/resolver""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "google.golang.org/grpc/examples/features/proto/echo"
)var id = flag.Int("id", 0, "id")func main() {flag.Parse()cli, err := clientv3.NewFromURL("http://127.0.0.1:2379")if err != nil {log.Fatal(utils.Wrap(err))}etcdResolver, err := resolver.NewBuilder(cli)if err != nil {log.Fatal(utils.Wrap(err))}cc, err := grpc.NewClient("etcd:///discovery/dev/game", grpc.WithResolvers(etcdResolver), grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"custom_round_robin":{}}]}`), grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatal(utils.Wrap(err))}defer cc.Close()ec := pb.NewEchoClient(cc)ctx := context.Background()if *id != 0 {ctx = context.WithValue(ctx, balancer.ServiceID, fmt.Sprint(*id))}for {r, err := ec.UnaryEcho(ctx, &pb.EchoRequest{Message: "hi"})if err != nil {log.Fatal(utils.Wrap(err))}fmt.Println(r)time.Sleep(time.Second)}
}
终端依次执行以下命令试试
go run client.go
go run server.go -id=1
go run server.go -id=2
go run client.go -id=1
# stop server 2