上篇简单看了下Nacos客户端在服务注册时做了什么。
本篇开始分析Nacos
在服务注册时,服务端的相关逻辑。
建议先阅读这篇文章:支持 gRPC 长链接,深度解读 Nacos 2.0 架构设计及新模型
回顾一下,上篇我们看了Nacos
在服务注册时,客户端的相关源码。Nacos2.X通过grpc
支持了长链接,那么客户端发起调用,肯定就有一个grpc的服务端在接收请求。那么就从这个grpc的相关代码看起~
grpc server
abstract class BaseRpcServer
是nacos-core
中一个抽象类,有一个@PostConstruct
修饰的start方法。
@PostConstructpublic void start() throws Exception {String serverName = getClass().getSimpleName();String tlsConfig = JacksonUtils.toJson(grpcServerConfig);Loggers.REMOTE.info("Nacos {} Rpc server starting at port {} and tls config:{}", serverName, getServicePort(), tlsConfig);//启动grpc服务端startServer();Loggers.REMOTE.info("Nacos {} Rpc server started at port {} and tls config:{}", serverName, getServicePort(), tlsConfig);//钩子函数:处理退出信号Runtime.getRuntime().addShutdownHook(new Thread(() -> {Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);try {BaseRpcServer.this.stopServer();Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);} catch (Exception e) {Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);}}));}
这个startServer()
是一个抽象方法,我们看下其实现。
/*** Start sever.** @throws Exception exception throw if start server fail.*/public abstract void startServer() throws Exception;
追踪代码发现,这个方法是当前类BaseRpcServer
的子类BaseGrpcServer
实现的
public abstract class BaseGrpcServer extends BaseRpcServer
看下startServer()
的代码:
@Overridepublic void startServer() throws Exception {final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();//注册服务addServices(handlerRegistry, new GrpcConnectionInterceptor());NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());if (grpcServerConfig.getEnableTls()) {if (grpcServerConfig.getCompatibility()) {builder.protocolNegotiator(new OptionalTlsProtocolNegotiator(getSslContextBuilder()));} else {builder.sslContext(getSslContextBuilder());}}server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry).compressorRegistry(CompressorRegistry.getDefaultInstance()).decompressorRegistry(DecompressorRegistry.getDefaultInstance()).addTransportFilter(new AddressTransportFilter(connectionManager)).keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS).keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS).permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS).build();//启动grpc的server服务server.start();}
研读框架源码时,先不要陷入细节当中,第一遍梳理清楚整个框架的即可,关于grpc-server
就先看到这里。
上文中我们提到了抽象类BaseRpcServer
,简单分析下这个类。
BaseRpcServer
BaseRpcServer
和BaseGrpcServer
都是抽象类,GrpcClusterServer
和GrpcSdkServer
都是抽象实现类,并且这两个实现类都有@Service
注解标注,那么就意味着这两个类会被注册为spring bean 。
上文我们提过BaseRpcServer.start()
有一个@PostConstruct
注解,那么也就意味着具体调用时使用了GrpcClusterServer
和GrpcSdkServer
的任何一个类,都会去调用BaseRpcServer.start()
方法去启动grpc-server
。
GrpcClusterServer
和GrpcSdkServer
的区别
从名字上可以看出,一个是Cluster服务调用,一个是SDK调用。
那么客户端注册使用的是哪个Server?
BaseRpcServer
中定义了一个获取端口偏移量的方法:
/*** the increase offset of nacos server port for rpc server port.** @return delta port offset of main port.*/public abstract int rpcPortOffset();
GrpcSdkServer
对此给出的实现是返回一个常量定义:
public static final Integer SDK_GRPC_PORT_DEFAULT_OFFSET = 1000;
GrpcClusterServer
给出的常量中定义的端口是CLUSTER_GRPC_PORT_DEFAULT_OFFSET = 1001
即然定义了端口,那么这个常量在创建这两个server的时候肯定会用到,我们通过这个常量去寻找到调用方,自然而然也就找到了server创建的逻辑,进而找到这两个server使用上的区别。
通过常量Constants.SDK_GRPC_PORT_DEFAULT_OFFSET
发现一个新的类GrpcSdkClient
中rpcPortOffset()
方法使用了这个常量。
通过观察构造方法的调用,我们找到了RpcClient的创建工厂RpcClientFactory
//本地缓存,存储client,key为clientNameprivate static final Map<String, RpcClient> CLIENT_MAP = new ConcurrentHashMap<>();public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,Integer threadPoolMaxSize, Map<String, String> labels, RpcClientTlsConfig tlsConfig) {if (!ConnectionType.GRPC.equals(connectionType)) {throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());}//如果当前clientName不存在,那么执行GrpcSdkClient的创建逻辑return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);try {return new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);} catch (Throwable throwable) {LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);throw throwable;}});}
那么再看下上边源码中createClient
的调用方是谁,查看代码得知分别有两个调用方,ClientWorker
和NamingGrpcClientProxy
,第二个调用方是不是有点熟悉?
对的,我们在上篇文章看客户端注册服务的代码时,看到过如下代码:
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}private NamingClientProxy getExecuteClientProxy(Instance instance) {return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
那么这个grpcClientProxy
是哪一个实现呢?
@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);clientProxy.registerService(serviceName, groupName, instance);}
再次观察创建服务实例的代码,我们可以看到在实际的注册服务时,使用了一个客户端代理clientProxy
来做处理,我们来看下这个代理的是怎么创建的。
private NamingClientProxy clientProxy;private void init(Properties properties) throws NacosException {final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);ValidatorUtils.checkInitParam(nacosClientProperties);this.namespace = InitUtils.initNamespaceForNaming(nacosClientProperties);InitUtils.initSerialization();InitUtils.initWebRootContext(nacosClientProperties);initLogName(nacosClientProperties);this.notifierEventScope = UUID.randomUUID().toString();this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);NotifyCenter.registerSubscriber(changeNotifier);this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);//一目了然this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);}
this.clientProxy = new NamingClientProxyDelegate
,可以看到这个代理实际上是一个创建了一个委托类,那继续看下这个委托类的构造方法。
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, NacosClientProperties properties,InstancesChangeNotifier changeNotifier) throws NacosException {this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,changeNotifier);this.serverListManager = new ServerListManager(properties, namespace);this.serviceInfoHolder = serviceInfoHolder;this.securityProxy = new SecurityProxy(this.serverListManager.getServerList(),NamingHttpClientManager.getInstance().getNacosRestTemplate());initSecurityProxy(properties);this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);//终于,我们找到了NamingGrpcClientProxythis.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,serviceInfoHolder);}
回过头来再次看NacosNamingServie
的registerInstance
方法。
@Overridepublic void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);clientProxy.registerService(serviceName, groupName, instance);}
我们已经知道了clientProxy
是NamingClientProxyDelegate
,那么就看下它是如何实现registerService
即可。
别着急,谜底马上揭开😎
@Overridepublic void registerService(String serviceName, String groupName, Instance instance) throws NacosException {getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);}private NamingClientProxy getExecuteClientProxy(Instance instance) {return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;}
instance.
封装了客户端要注册的服务,ephemeral
默认为true
。所以默认会选择grpcClientProxy
去注册服务。
this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,serviceInfoHolder);
回答我们自己提出的问题:GrpcClusterServer
和GrpcSdkServer
的区别?
后者是客户端注册使用的,那么前者肯定就是服务内部调用使用的了。
总结
梳理关键类和方法
- 客户端
- NacosDiscoveryAutoRegister
- 是一个
ApplicationListener
,监听WebServerInitializedEvent
事件后,注册当前实例
- 是一个
- NacosNamingService
- private void init(Properties properties) throws NacosException
- 初始化时,定义了创建客户端的代理委托类
clientProxy
为NamingClientProxyDelegate
- 初始化时,定义了创建客户端的代理委托类
- registerInstance
- 使用
clientProxy
注册服务
- 使用
- private void init(Properties properties) throws NacosException
- NamingClientProxyDelegate
- 构造方法注入:
grpcClientProxy = new NamingGrpcClientProxy
instance.isEphemeral() ? grpcClientProxy : httpClientProxy
- 构造方法注入:
- NamingGrpcClientProxy
registerService
requestToServer
this.rpcClient.request(request)
这里的rpcClient其实就是GrpcSdkClient
- NacosDiscoveryAutoRegister
- 服务端
- RpcClientFactory
createClient
:返回GrpcSdkClient
- GrpcSdkClient
- 未完待续…
- RpcClientFactory
本篇很大篇幅上讲了服务注册时,服务端grpc-server
相关的一些逻辑。从中可以看到服务端使用了很多委托类、代理类来抽象、封装相关业务逻辑,所以刚开始看框架源码如果一头雾水的时候,不要着急。抓大放小,先建立整体认知后,再回过头来深入细节。
下篇从源码上深入服务端在注册服务时的业务细节。
下班!🕶️