前言
上节内容我们通过分析nacos客户端源码,了解了nacos客户端是如何向服务端注册服务和发送心跳包的,本节内容话接上一节内容,我们通过分析nacos服务的源码,查看服务端是如何处理客户端注册时候的心跳包的。关于nacos服务端的源码,下载地址为:GitHub - alibaba/nacos: an easy-to-use dynamic service discovery, configuration and service management platform for building cloud native applications.
正文
①找到nacos服务端处理心跳包的接口InstanceController类
接口地址:/nacos/v1/ns/instance/beat
②在InstanceController类中的beat方法实现了心跳包的处理逻辑
心跳包的整体处理流程说明:
@CanDistro@PutMapping("/beat")@Secured(action = ActionTypes.WRITE)public ObjectNode beat(HttpServletRequest request) throws Exception {ObjectNode result = JacksonUtils.createEmptyJsonNode();//1.设置默认心跳间隔时间result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());//2.获取心跳包数据String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);RsInfo clientBeat = null;if (StringUtils.isNotBlank(beat)) {//3.解析心跳包数据clientBeat = JacksonUtils.toObj(beat, RsInfo.class);}//4.获取集群名称、IP地址、和端口String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));if (clientBeat != null) {if (StringUtils.isNotBlank(clientBeat.getCluster())) {clusterName = clientBeat.getCluster();} else {// fix #2533clientBeat.setCluster(clusterName);}ip = clientBeat.getIp();port = clientBeat.getPort();}//5.获取客户端服务命名空间和服务名称String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);//6.检查服务名称规范,不符合要求抛出异常NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,serviceName, namespaceId);BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();builder.setRequest(request);//7.处理心跳包数据int resultCode = getInstanceOperator().handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);//8.返回处理结果result.put(CommonParams.CODE, resultCode);result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());return result;}
③在InstanceOperatorClientImpl类中handleBeat方法是对心跳包的具体处理流程
handleBeat方法执行流程的说明:
public int handleBeat(String namespaceId, String serviceName, String ip, int port, String cluster,RsInfo clientBeat, BeatInfoInstanceBuilder builder) throws NacosException {//1.根据服务命名空间和服务名称获取服务实例对象Service service = getService(namespaceId, serviceName, true);//2.根据ip和端口号获取客户端IDString clientId = IpPortBasedClient.getClientId(ip + InternetAddressUtil.IP_PORT_SPLITER + port, true);//3.查询注册的客户端IpPortBasedClient client = (IpPortBasedClient) clientManager.getClient(clientId);//4.如果客户端不存在或者客户端服务实例还未发布,注册客户端实例,否则跳过该操作if (null == client || !client.getAllPublishedService().contains(service)) {if (null == clientBeat) {//4.1心跳包不存在,直接返回不存在的提示码return NamingResponseCode.RESOURCE_NOT_FOUND;}//4.2根据心跳包和服务名称构建Instance实例对象Instance instance = builder.setBeatInfo(clientBeat).setServiceName(serviceName).build();//4.3注册Instance实例对象,该方式中存在关于client的实例对象的注册registerInstance(namespaceId, serviceName, instance);//4.4再次获取客户端实例client = (IpPortBasedClient) clientManager.getClient(clientId);}//5.验证服务实例对象是否存在,不存在则抛出服务不存在的异常if (!ServiceManager.getInstance().containSingleton(service)) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}//6.心跳包不存在,则根据传入参数封装客户端心跳包数据if (null == clientBeat) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(cluster);clientBeat.setServiceName(serviceName);}//7.服务健康检查,更新服务的心跳时间,如果服务的健康状态是false,则更新为true,表明服务实例是健康状态ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client);HealthCheckReactor.scheduleNow(beatProcessor);//8.更新客户端时间client.setLastUpdatedTime();return NamingResponseCode.OK;}
registerInstance(namespaceId, serviceName, instance)方法实现了客户端服务和客户端实例的注册
下面的方法实现了服务的心跳时间更新和健康状态更新
ClientBeatProcessorV2 beatProcessor = new ClientBeatProcessorV2(namespaceId, clientBeat, client); HealthCheckReactor.scheduleNow(beatProcessor);
④ InstanceOperatorClientImpl类中的registerInstance方法实现了IpPortBasedClient客户端和Instance服务实例的注册
IpPortBasedClient客户端和Instance服务实例的注册的源码说明:public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {//1.检查服务实例是否合法NamingUtils.checkInstanceIsLegal(instance);//2.获取IpPortBasedClient客户端idboolean ephemeral = instance.isEphemeral();String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);//3.创建IpPortBasedClient客户端createIpPortClientIfAbsent(clientId);//4.获取服务实例Service service = getService(namespaceId, serviceName, ephemeral);//5.创建Instance实例clientOperationService.registerInstance(service, instance, clientId);}
⑤分析IpPortBasedClient客户端实例创建的方法createIpPortClientIfAbsent(clientId);
- IpPortBasedClient客户端存储在一个ConcurrentMap集合中
private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();- 判断IpPortBasedClient客户端是否存在,不存在则创建该客户端
private void createIpPortClientIfAbsent(String clientId) {//1.判断IpPortBasedClient客户端是否存在if (!clientManager.contains(clientId)) {ClientAttributes clientAttributes;if (ClientAttributesFilter.threadLocalClientAttributes.get() != null) {clientAttributes = ClientAttributesFilter.threadLocalClientAttributes.get();} else {clientAttributes = new ClientAttributes();}//2.客户端不存在,创建IpPortBasedClient客户端clientManager.clientConnected(clientId, clientAttributes);}}
-调用clientConnected方法,使用客户端工厂类EphemeralIpPortClientFactory创建一个客户端IpPortBasedClient
public boolean clientConnected(String clientId, ClientAttributes attributes) {//使用客户端工厂工具创建一个客户端return clientConnected(clientFactory.newClient(clientId, attributes)); }
-调用clientConnected同名方法,将IpPortBasedClient客户端存储于ConcurrentMap中,并执行init初始化方法
public boolean clientConnected(final Client client) {//将IpPortBasedClient客户端存储到clients的ConcurrentMap中clients.computeIfAbsent(client.getClientId(), s -> {Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;//调用IpPortBasedClient的init方法,实现初始化方法ipPortBasedClient.init();return ipPortBasedClient;});return true; }
-在init方法中,实现了注册服务心跳包检查和健康检查
public void init() {if (ephemeral) {//心跳包检查beatCheckTask = new ClientBeatCheckTaskV2(this);HealthCheckReactor.scheduleCheck(beatCheckTask);} else {//健康检查healthCheckTaskV2 = new HealthCheckTaskV2(this);HealthCheckReactor.scheduleCheck(healthCheckTaskV2);} }
- 心跳包检查,在类ClientBeatCheckTaskV2中的doHealthCheck()方法实现心跳包的健康检查
-在doInterceptor方法中的passIntercept方法实现了客户端检查、健康检查、服务实例检查
-这里我们以服务实例检查为例,ExpiredInstanceChecker中的doCheck方法实现服务实例的过期检查,如果过期则删除实例,默认超过30秒删除实例
public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();//如果过期则删除服务注册实例if (expireInstance && isExpireInstance(service, instance)) {deleteIp(client, service, instance);} }
- 在UnhealthyInstanceChecker中的doCheck方法实现服务实例的健康检查,默认时间超过15秒则视实例为不健康
public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {//判断实例是否健康if (instance.isHealthy() && isUnhealthy(service, instance)) {//如果实例不健康则将实例状态置为falsechangeHealthyStatus(client, service, instance);} }
⑥服务实例Instance注册 registerInstance
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {//1.检查服务实例是否合法NamingUtils.checkInstanceIsLegal(instance);//2.获取并创建单实例的服务Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM,String.format("Current service %s is persistent service, can't register ephemeral instance.",singleton.getGroupedServiceName()));}//3.获取客户端,如果不合法则直接返回Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//4.获取实例发布信息InstancePublishInfo instanceInfo = getPublishInfo(instance);//5.更新客户端信息client.addServiceInstance(singleton, instanceInfo);client.setLastUpdatedTime();client.recalculateRevision();//6.发布客户端服务注册事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false)); }
结语
至此,关于nacos服务端服务注册心跳包(/nacos/v1/ns/instance/beat)源码解读到这里就结束了,下期见。。。。。。