go-zero 的网关服务实际是个 go-zero 的 API 服务,也就是一个 http 服务,或者说 rest 服务。http 转 grpc 使用了开源的 grpcurl 库,当网关需要往 rpc 服务传递额外的数据,比如鉴权数据的时候,通过 http 的 header 进行:
// AuthMiddleware authenticates the incoming request against the auth service
// and, on success, forwards the authenticated user ID to the downstream rpc
// service before invoking next.
//
// The user ID travels as an HTTP header whose name starts with
// "Grpc-Metadata-"; the rpc service reads it from the incoming metadata under
// the key "gateway-myuid".
func AuthMiddleware(next http.HandlerFunc, w http.ResponseWriter, r *http.Request) {
	// Call the auth service.
	authResp, err := authClient.Authenticate(r.Context(), &authReq)
	if err != nil {
		// Never forward an unauthenticated request: authResp is not usable
		// when err is non-nil. The auth error is not echoed to the client.
		http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
		return
	}

	// Pass extra data to the rpc service.
	r.Header.Set("Grpc-Metadata-myuid", authResp.UserId)
	next.ServeHTTP(w, r)
}
rpc 服务端从 metadata 取出数据:
func (l *QueryUserLogic) QueryUser(in *user.UserReq) (*user.UserResp, error) {vals := metadata.ValueFromIncomingContext(l.ctx, "gateway-myuid")uid = vals[0]
}
这里有两个需要注意的地方:在网关侧设置的 header 名必须以“Grpc-Metadata-”打头,而 rpc 服务端读取 metadata 时使用的 key 必须以“gateway-”打头。这是 go-zero 的 gateway/internal/headerprocessor.go 写死的规则:
const (
	// metadataHeaderPrefix is the prefix an HTTP header name must carry on the
	// gateway side to be forwarded to the rpc service.
	metadataHeaderPrefix = "Grpc-Metadata-"
	// metadataPrefix replaces metadataHeaderPrefix in the forwarded key; it is
	// the prefix the rpc server sees.
	metadataPrefix = "gateway-"
)

// ProcessHeaders builds the headers for the gateway from HTTP headers.
//
// Only headers whose name starts with metadataHeaderPrefix are kept; the
// prefix is replaced with metadataPrefix and one "key:value" entry is emitted
// per header value. The prefix match is case-sensitive. The result is nil when
// no header matches, and the order of entries for different header names
// follows map iteration order, so it is not deterministic.
func ProcessHeaders(header http.Header) []string {
	var headers []string

	for k, v := range header {
		// Only headers starting with "Grpc-Metadata-" (set on the gateway
		// side) are forwarded.
		if !strings.HasPrefix(k, metadataHeaderPrefix) {
			// Every other header is dropped.
			continue
		}

		// Swap in the "gateway-" prefix, which is what the rpc server sees.
		key := fmt.Sprintf("%s%s", metadataPrefix, strings.TrimPrefix(k, metadataHeaderPrefix))
		for _, vv := range v {
			headers = append(headers, key+":"+vv)
		}
	}

	return headers
}
调用栈:
(dlv) bt
 0  0x00000000019da092 in github.com/zeromicro/go-zero/gateway/internal.ProcessHeaders
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/gateway/internal/headerprocessor.go:15
 1  0x00000000019dc40a in github.com/zeromicro/go-zero/gateway.(*Server).prepareMetadata
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/gateway/server.go:175
 2  0x00000000019dbf69 in github.com/zeromicro/go-zero/gateway.(*Server).buildHandler.func1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/gateway/server.go:132
 3  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
 4  0x00000000008ca162 in net/http.Handler.ServeHTTP-fm
    at <autogenerated>:1
 5  0x000000000195adc5 in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
 6  0x000000000195adc5 in gateway/middleware.AuthMiddleware // 网关代码
    at ./Getting-Started-with-Go-zero/gateway_login/gateway/middleware/login_and_auth.go:98
 7  0x000000000195a325 in gateway/middleware.LoginAndAuthMiddleware.func1
    at ./Getting-Started-with-Go-zero/gateway_login/gateway/middleware/login_and_auth.go:37
 8  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
 9  0x0000000001969169 in github.com/zeromicro/go-zero/rest/handler.GunzipHandler.func1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/rest/handler/gunziphandler.go:26
10  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
11  0x000000000196b3bf in github.com/zeromicro/go-zero/rest/handler.MaxBytesHandler.func2.1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/rest/handler/maxbyteshandler.go:24
12  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
13  0x000000000196ba15 in github.com/zeromicro/go-zero/rest/handler.MetricHandler.func1.1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/rest/handler/metrichandler.go:21
14  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
15  0x000000000196c243 in github.com/zeromicro/go-zero/rest/handler.RecoverHandler.func1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/rest/handler/recoverhandler.go:21
16  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
17  0x000000000196d45c in github.com/zeromicro/go-zero/rest/handler.(*timeoutHandler).ServeHTTP.func1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/rest/handler/timeouthandler.go:82
18  0x0000000000471921 in runtime.goexit
    at /usr/local/go/src/runtime/asm_amd64.s:1598

(dlv) bt
 0  0x00000000019d636a in google.golang.org/grpc/metadata.NewOutgoingContext
    at ./go/pkg/mod/google.golang.org/grpc@v1.59.0/metadata/metadata.go:165
 1  0x00000000019d636a in github.com/fullstorydev/grpcurl.InvokeRPC
    at ./go/pkg/mod/github.com/fullstorydev/grpcurl@v1.8.9/invoke.go:136
 2  0x00000000019dc058 in github.com/zeromicro/go-zero/gateway.(*Server).buildHandler.func1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/gateway/server.go:132
 3  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
 4  0x00000000008ca162 in net/http.Handler.ServeHTTP-fm
    at <autogenerated>:1
 5  0x000000000195adc5 in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
 6  0x000000000195adc5 in gateway/middleware.AuthMiddleware // 网关代码
    at ./Getting-Started-with-Go-zero/gateway_login/gateway/middleware/login_and_auth.go:98
 7  0x000000000195a325 in gateway/middleware.LoginAndAuthMiddleware.func1
    at ./Getting-Started-with-Go-zero/gateway_login/gateway/middleware/login_and_auth.go:37
 8  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
 9  0x0000000001969169 in github.com/zeromicro/go-zero/rest/handler.GunzipHandler.func1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/rest/handler/gunziphandler.go:26
10  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
11  0x000000000196b3bf in github.com/zeromicro/go-zero/rest/handler.MaxBytesHandler.func2.1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/rest/handler/maxbyteshandler.go:24
12  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
13  0x000000000196ba15 in github.com/zeromicro/go-zero/rest/handler.MetricHandler.func1.1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/rest/handler/metrichandler.go:21
14  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
15  0x000000000196c243 in github.com/zeromicro/go-zero/rest/handler.RecoverHandler.func1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/rest/handler/recoverhandler.go:21
16  0x000000000089f52f in net/http.HandlerFunc.ServeHTTP
    at /usr/local/go/src/net/http/server.go:2122
17  0x000000000196d45c in github.com/zeromicro/go-zero/rest/handler.(*timeoutHandler).ServeHTTP.func1
    at ./go/pkg/mod/github.com/zeromicro/go-zero@v1.6.0/rest/handler/timeouthandler.go:82
18  0x0000000000471921 in runtime.goexit
    at /usr/local/go/src/runtime/asm_amd64.s:1598
在文件 zrpc/internal/clientinterceptors/tracinginterceptor.go 中调用了 metadata.NewOutgoingContext:
func startSpan(ctx context.Context, method, target string) (context.Context, trace.Span) {md, ok := metadata.FromOutgoingContext(ctx)if !ok {md = metadata.MD{}}tr := otel.Tracer(ztrace.TraceName)name, attr := ztrace.SpanInfo(method, target)ctx, span := tr.Start(ctx, name, trace.WithSpanKind(trace.SpanKindClient),trace.WithAttributes(attr...))ztrace.Inject(ctx, otel.GetTextMapPropagator(), &md)ctx = metadata.NewOutgoingContext(ctx, md)return ctx, span
}// UnaryTracingInterceptor returns a grpc.UnaryClientInterceptor for opentelemetry.
func UnaryTracingInterceptor(ctx context.Context, method string, req, reply any,cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {ctx, span := startSpan(ctx, method, cc.Target())defer span.End()ztrace.MessageSent.Event(ctx, 1, req)err := invoker(ctx, method, req, reply, cc, opts...)ztrace.MessageReceived.Event(ctx, 1, reply)if err != nil {s, ok := status.FromError(err)if ok {span.SetStatus(codes.Error, s.Message())span.SetAttributes(ztrace.StatusCodeAttr(s.Code()))} else {span.SetStatus(codes.Error, err.Error())}return err}span.SetAttributes(ztrace.StatusCodeAttr(gcodes.OK))return nil
}
拦截器:
./zrpc/internal/rpcserver.go: interceptors = append(interceptors, serverinterceptors.UnaryTracingInterceptor)
./zrpc/internal/client.go: interceptors = append(interceptors, clientinterceptors.UnaryTracingInterceptor)
服务端代码:
//zrpc/internal/rpcserver.go
func (s *rpcServer) buildUnaryInterceptors() []grpc.UnaryServerInterceptor {var interceptors []grpc.UnaryServerInterceptorif s.middlewares.Trace {interceptors = append(interceptors, serverinterceptors.UnaryTracingInterceptor)}if s.middlewares.Recover {interceptors = append(interceptors, serverinterceptors.UnaryRecoverInterceptor)}if s.middlewares.Stat {interceptors = append(interceptors,serverinterceptors.UnaryStatInterceptor(s.metrics, s.middlewares.StatConf))}if s.middlewares.Prometheus {interceptors = append(interceptors, serverinterceptors.UnaryPrometheusInterceptor)}if s.middlewares.Breaker {interceptors = append(interceptors, serverinterceptors.UnaryBreakerInterceptor)}return append(interceptors, s.unaryInterceptors...)
}func (s *rpcServer) Start(register RegisterFn) error {lis, err := net.Listen("tcp", s.address)if err != nil {return err}unaryInterceptorOption := grpc.ChainUnaryInterceptor(s.buildUnaryInterceptors()...)streamInterceptorOption := grpc.ChainStreamInterceptor(s.buildStreamInterceptors()...)options := append(s.options, unaryInterceptorOption, streamInterceptorOption)server := grpc.NewServer(options...)register(server)// register the health check serviceif s.health != nil {grpc_health_v1.RegisterHealthServer(server, s.health)s.health.Resume()}s.healthManager.MarkReady()health.AddProbe(s.healthManager)// we need to make sure all others are wrapped up,// so we do graceful stop at shutdown phase instead of wrap up phasewaitForCalled := proc.AddShutdownListener(func() {if s.health != nil {s.health.Shutdown()}server.GracefulStop()})defer waitForCalled()return server.Serve(lis)
}
客户端代码:
//zrpc/internal/client.go
func (c *client) buildUnaryInterceptors(timeout time.Duration) []grpc.UnaryClientInterceptor {var interceptors []grpc.UnaryClientInterceptorif c.middlewares.Trace {interceptors = append(interceptors, clientinterceptors.UnaryTracingInterceptor)}if c.middlewares.Duration {interceptors = append(interceptors, clientinterceptors.DurationInterceptor)}if c.middlewares.Prometheus {interceptors = append(interceptors, clientinterceptors.PrometheusInterceptor)}if c.middlewares.Breaker {interceptors = append(interceptors, clientinterceptors.BreakerInterceptor)}if c.middlewares.Timeout {interceptors = append(interceptors, clientinterceptors.TimeoutInterceptor(timeout))}return interceptors
}func (c *client) buildDialOptions(opts ...ClientOption) []grpc.DialOption {var cliOpts ClientOptionsfor _, opt := range opts {opt(&cliOpts)}var options []grpc.DialOptionif !cliOpts.Secure {options = append([]grpc.DialOption(nil),grpc.WithTransportCredentials(insecure.NewCredentials()))}if !cliOpts.NonBlock {options = append(options, grpc.WithBlock())}options = append(options,grpc.WithChainUnaryInterceptor(c.buildUnaryInterceptors(cliOpts.Timeout)...),grpc.WithChainStreamInterceptor(c.buildStreamInterceptors()...),)return append(options, cliOpts.DialOptions...)
}func (c *client) dial(server string, opts ...ClientOption) error {options := c.buildDialOptions(opts...)timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)defer cancel()conn, err := grpc.DialContext(timeCtx, server, options...)if err != nil {service := serverif errors.Is(err, context.DeadlineExceeded) {pos := strings.LastIndexByte(server, separator)// len(server) - 1 is the index of last charif 0 < pos && pos < len(server)-1 {service = server[pos+1:]}}return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is already started",server, err.Error(), service)}c.conn = connreturn nil
}// NewClient returns a Client.
func NewClient(target string, middlewares ClientMiddlewaresConf, opts ...ClientOption) (Client, error) {cli := client{middlewares: middlewares,}svcCfg := fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, p2c.Name)balancerOpt := WithDialOption(grpc.WithDefaultServiceConfig(svcCfg))opts = append([]ClientOption{balancerOpt}, opts...)if err := cli.dial(target, opts...); err != nil {return nil, err}return &cli, nil
}