前言
本节内容我们主要介绍一下中间键nacos的客户端服务注册原理及其源码解读,便于我们理解nacos作为服务注册中心的具体实现。在springcloud的微服务体系中,nacos客户端的注册是通过使用spring的监听机制ApplicationListener实现的。学习本节内容,需要我们清楚springboot的启动过程。
正文
- nacos客户端服务注册原理说明
①在注册服务之前,开启一个服务线程,每隔5秒钟向nacos服务器上传一次心跳信息,访问地址为:/instance/beat,如果服务器还没有创建服务实例,那么先创建一个服务实例,并且更新服务最新的心跳时间,如果已经创建实例,则直接返回。
②通过轮询的方式访问nacos的服务地址(服务地址可能有多个),通过访问地址/nacos/v1/ns/instance去注册服务,直到注册成功,失败则抛出异常。
③服务端创建实例完成后,会创建一个定时任务来检查这个实例是否健康,如果心跳机制超过15秒,标记服务为不健康,超过30秒,这个实例会直接被删除。
- nacos客户端服务注册的机制
①在spring-cloud-alibaba-nacos-discovery-2.2.0.RELEASE.jar包的spring.factories配置文件中,存在nacos客户端注册的Bean(NacosServiceRegistryAutoConfiguration),该Bean中实现了nacos客户端注册的核心功能。
②nacos客户端服务注册的机制:nacos客户端使用springboot的event事件发布机制实现服务的注册。通过NacosAutoServiceRegistration类继承ApplicationListener监听器,当springboot容器启动后,会派发监听事件,触发监听器,从而执行nacos客户端的服务注册功能。
③在NacosAutoServiceRegistration类中,通过调用register()方法实现服务的注册
- nacos客户端服务注册的整体流程
①在NacosAutoServiceRegistration类的register()方法方法处打断点,启动服务,分析其服务注册原理
② 调用main方法启动服务
③刷新spring容器上下文
④发布容器监听器事件
⑤调用bind方法绑定事件
⑥调用start方法 ,开始执行注册服务的方法,并发布该事件,完成服务的最终注册
- nacos客户端服务注册的核心流程
①NacosServiceRegistry类的register()方法实现了服务注册的核心流程
public void register(Registration registration) {//1.判断初始化服务是否存在if (StringUtils.isEmpty(registration.getServiceId())) {log.warn("No service to register for nacos client...");return;}String serviceId = registration.getServiceId();String group = nacosDiscoveryProperties.getGroup();//2.根据nacos客户端配置信息封装服务实例对象Instance instance = getNacosInstanceFromRegistration(registration);try {//3.实现真正的服务注册逻辑namingService.registerInstance(serviceId, group, instance);log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,instance.getIp(), instance.getPort());}catch (Exception e) {4.服务注册失败的处理log.error("nacos registry, {} register failed...{},", serviceId,registration.toString(), e);// rethrow a RuntimeException if the registration is failed.// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132rethrowRuntimeException(e);} }
②NacosNamingService类中的registerInstance(String serviceName, String groupName, Instance instance)方法实现具体的注册逻辑:封装心跳包数据BeatInfo,使用addBeatInfo
()方法发送心跳包数据,通过registerService()方法实现服务注册
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {if (instance.isEphemeral()) {//1.设置心跳包参数BeatInfo beatInfo = new BeatInfo();//服务名称beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));//ip地址beatInfo.setIp(instance.getIp());//端口号beatInfo.setPort(instance.getPort());//集群名称beatInfo.setCluster(instance.getClusterName());//服务的访问权重beatInfo.setWeight(instance.getWeight());//服务的meta信息,包括自定义信息beatInfo.setMetadata(instance.getMetadata());beatInfo.setScheduled(false);//获取心跳包的发送间隔时间long instanceInterval = instance.getInstanceHeartBeatInterval();//设置心跳包时间,如果没有配置,则获取默认的时间beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);//2.发送服务心跳包beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);}//3.通过http轮询方式注册服务serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }
③在BeatReactor类中的addBeatInfo方法实现心跳包的发送,通过线程池实现心跳包的发送逻辑,period是发送心跳包的间隔。
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;//fix #1733if ((existBeat = dom2Beat.remove(key)) != null) {existBeat.setStopped(true);}dom2Beat.put(key, beatInfo);//通过线程池执行心跳包的发送executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); }
④BeatReactor类中的BeatTask任务类的线程run方法执行发送心跳包的逻辑,首先验证心跳包是否是停止状态,如果不是,开始发送心跳包,根据心跳包返回的时间计算下一次心跳包的发送时间nextTime,调用线程池执行下一次心跳包的发送。
public void run() {//1.如果心跳包是停止状态,直接退出if (beatInfo.isStopped()) {return;}//2.向nacos服务端发送心跳包long result = serverProxy.sendBeat(beatInfo);//3.获取下一个心跳包的发送时间long nextTime = result > 0 ? result : beatInfo.getPeriod();//4.发送下一个心跳包executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); }
⑤在NamingProxy类中,调用sendBeat方法,通过RPC远程调用,发送心跳包到/instance/beat地址
public long sendBeat(BeatInfo beatInfo) {try {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());}//构建心跳包请求参数Map<String, String> params = new HashMap<String, String>(4);params.put("beat", JSON.toJSONString(beatInfo));params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());//通过RPC远程调用发送心跳包String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);JSONObject jsonObject = JSON.parseObject(result);if (jsonObject != null) {return jsonObject.getLong("clientBeatInterval");}} catch (Exception e) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: " + JSON.toJSONString(beatInfo), e);}return 0L; }
⑥调用NamingProxy类中的reqAPI方法执行远程RPC调用,callServer方法通过HttpClient客户端工具发送http请求。心跳包通过轮询的方式发送到每一台nacos服务器上。
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {params.put(CommonParams.NAMESPACE_ID, getNamespaceId());if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {throw new IllegalArgumentException("no server available");}Exception exception = new Exception();//验证nacos服务器是否存在,存在则向nacos服务器发送心跳请求if (servers != null && !servers.isEmpty()) {//产生一个随机数,获取随机的一台nacos服务地址Random random = new Random(System.currentTimeMillis());int index = random.nextInt(servers.size());//轮询向nacos服务器发送心跳请求for (int i = 0; i < servers.size(); i++) {String server = servers.get(index);try {//发送心跳请求,并返回结果return callServer(api, params, server, method);} catch (NacosException e) {exception = e;NAMING_LOGGER.error("request {} failed.", server, e);} catch (Exception e) {exception = e;NAMING_LOGGER.error("request {} failed.", server, e);}index = (index + 1) % servers.size();}throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "+ exception.getMessage());}for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {try {return callServer(api, params, nacosDomain);} catch (Exception e) {exception = e;NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);}}throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "+ exception.getMessage());}
⑦NacosNamingService类中的registerInstance方法中代码段serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);实现服务的注册。具体的注册逻辑如下:封装客户端的基本信息,通过RPC远程调用nacos服务端,注册客户端服务。接口的调用方式和心跳包的方式一致,也是通过轮询调用。
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",namespaceId, serviceName, instance);//封装客户端服务注册信息数据final Map<String, String> params = new HashMap<String, String>(9);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put(CommonParams.GROUP_NAME, groupName);params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("weight", String.valueOf(instance.getWeight()));params.put("enable", String.valueOf(instance.isEnabled()));params.put("healthy", String.valueOf(instance.isHealthy()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));params.put("metadata", JSON.toJSONString(instance.getMetadata()));//内部通过轮询的方式,使用PRC远程服务调用,注册nacos客户端到服务器reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);}
结语
关于nacos客户端服务注册原理说明及源码解读到这里就结束了,我们下期见。。。。。。