客户端
关键属性
HostReactor
- Map<String, ScheduledFuture<?>> futureMap:缓存向服务端请求ServiceInfo的定时任务
- Map<String, ServiceInfo> serviceInfoMap:缓存从服务端获取的Service信息
- Map<String, Object> updatingMap:用来标记是否存在其他请求正在以相同的条件向服务端请求ServiceInfo
ServiceManager
- Map<String, Map<String, Service>> serviceMap:服务端缓存注册信息的Map:Map(namespace, Map(group::serviceName, Service)).
NacosNamingService.getAllInstances
获取所有的Instance集合
- boolean subscribe:true,请求结果需要进行缓存并设置定时任务更新缓存;false:直接从服务端查询并返回结果
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;if (subscribe) {//请求结果需要进行缓存并设置定时任务更新缓存serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));} else {// 直接向服务端请求并返回结果,客户端不做缓存,不设置定时任务serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));}List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<Instance>();}return list;}
HostReactor.getServiceInfoDirectlyFromServer
直接向服务端请求并返回结果,客户端不做缓存,不设置定时任务
/**
 * Queries the server once and returns the parsed result. Nothing is stored in the client cache
 * and no refresh task is scheduled.
 *
 * @param serviceName grouped service name
 * @param clusters    comma-separated cluster names
 * @return the parsed {@code ServiceInfo}, or {@code null} when the server response is empty
 * @throws NacosException if the request fails
 */
public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)
        throws NacosException {
    // udpPort 0 and healthyOnly=false are passed through to the list query unchanged.
    final String json = serverProxy.queryList(serviceName, clusters, 0, false);
    return StringUtils.isNotEmpty(json) ? JacksonUtils.toObj(json, ServiceInfo.class) : null;
}
HostReactor.getServiceInfo
根据serviceName和cluster集合获取实例集合,请求结果需要进行缓存并设置定时任务更新缓存
1.从客户端缓存(HostReactor.serviceInfoMap)中获取ServiceInfo
2.本地缓存中不存在对应的值,新建ServiceInfo实例并加入serviceInfoMap中,同时将serviceName加入updatingMap中并向服务端请求获取Instance集合并更新ServiceInfo,更新完成后从updatingMap中删除这个serviceName
3.本地缓存中存在对应的ServiceInfo,且updatingMap中有serviceName时,说明上一个请求正在请求服务端获取信息,调用wait方法,上一个请求更新完内存后会调用notifyAll方法
4.调用scheduleUpdateIfAbsent方法定时向服务端请求并更新serviceInfoMap的值
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());String key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);if (null == serviceObj) {serviceObj = new ServiceInfo(serviceName, clusters);serviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());updateServiceNow(serviceName, clusters);updatingMap.remove(serviceName);// 说明存在请求在向服务端请求Instance} else if (updatingMap.containsKey(serviceName)) {if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finishsynchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}// 添加定时任务,向服务端请求并更新serviceInfoMap的值scheduleUpdateIfAbsent(serviceName, clusters);return serviceInfoMap.get(serviceObj.getKey());}
HostReactor.updateServiceNow
// 从服务端获取信息并保存到客户端内存中
private void updateServiceNow(String serviceName, String clusters) {try {updateService(serviceName, clusters);} catch (NacosException e) {NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);}}// 从服务端获取信息并保存到客户端内存中public void updateService(String serviceName, String clusters) throws NacosException {ServiceInfo oldService = getServiceInfo0(serviceName, clusters);try {// 请求服务端获取信息String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);// 处理从服务端获得的消息(将结果保存到HostReactor.serviceInfoMap中)if (StringUtils.isNotEmpty(result)) {processServiceJson(result);}} finally {// 唤醒HostReactor.getServiceInfo中调用wait的线程if (oldService != null) {synchronized (oldService) {oldService.notifyAll();}}}}// 处理从服务端获取的消息// 1.先将json字符串转换为ServiceInfo// 2.之前在serviceInfoMap存在值,将新得到的ServiceInfo保存到serviceInfoMap中,并将现在的和之前的进行比较,如果内容发生变化,写日志,并发送InstancesChangeEvent事件// 3.之前在serviceInfoMap不存在值,将新得到的ServiceInfo保存到serviceInfoMap中,写日志,并发送InstancesChangeEvent事件// 4.返回第一步的ServiceInfopublic ServiceInfo processServiceJson(String json) {ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}ServiceInfo oldService = serviceInfoMap.get(serviceKey);if (pushEmptyProtection && !serviceInfo.validate()) {//empty or error push, just ignorereturn oldService;}boolean changed = false;if (oldService != null) {if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "+ serviceInfo.getLastRefTime());}serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());for (Instance host : oldService.getHosts()) {oldHostMap.put(host.toInetAddr(), host);}Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());for (Instance host : serviceInfo.getHosts()) 
{newHostMap.put(host.toInetAddr(), host);}Set<Instance> modHosts = new HashSet<Instance>();Set<Instance> newHosts = new HashSet<Instance>();Set<Instance> remvHosts = new HashSet<Instance>();List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(newHostMap.entrySet());for (Map.Entry<String, Instance> entry : newServiceHosts) {Instance host = entry.getValue();String key = entry.getKey();if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) {modHosts.add(host);continue;}if (!oldHostMap.containsKey(key)) {newHosts.add(host);}}for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {Instance host = entry.getValue();String key = entry.getKey();if (newHostMap.containsKey(key)) {continue;}if (!newHostMap.containsKey(key)) {remvHosts.add(host);}}if (newHosts.size() > 0) {changed = true;NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "+ JacksonUtils.toJson(newHosts));}if (remvHosts.size() > 0) {changed = true;NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "+ JacksonUtils.toJson(remvHosts));}if (modHosts.size() > 0) {changed = true;updateBeatInfo(modHosts);NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "+ JacksonUtils.toJson(modHosts));}serviceInfo.setJsonFromServer(json);if (changed) {NotifyCenter.publishEvent(new InstancesChangeEvent(this.notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));DiskCache.write(serviceInfo, cacheDir);}} else {changed = true;NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "+ JacksonUtils.toJson(serviceInfo.getHosts()));serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);NotifyCenter.publishEvent(new InstancesChangeEvent(this.notifierEventScope, serviceInfo.getName(), 
serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));serviceInfo.setJsonFromServer(json);DiskCache.write(serviceInfo, cacheDir);}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) {NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "+ JacksonUtils.toJson(serviceInfo.getHosts()));}return serviceInfo;}
NamingProxy.queryList
向服务端发送http请求来获取该请求条件下的Instance集合
- String serviceName:服务名称
- String clusters:用“,”隔开的cluster集合的字符串
- boolean healthyOnly:是否只返回健康的实例,默认为false
/**
 * Sends an HTTP GET to the server's instance-list endpoint and returns the raw response body.
 *
 * @param serviceName service name
 * @param clusters    comma-separated cluster names
 * @param udpPort     sent to the server as the {@code udpPort} parameter
 * @param healthyOnly sent to the server as the {@code healthyOnly} parameter
 * @return the response body as returned by {@code reqApi}
 * @throws NacosException if the request fails
 */
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {
    final Map<String, String> query = new HashMap<String, String>(8);
    query.put(CommonParams.NAMESPACE_ID, namespaceId);
    query.put(CommonParams.SERVICE_NAME, serviceName);
    query.put("clusters", clusters);
    query.put("udpPort", String.valueOf(udpPort));
    query.put("clientIP", NetUtils.localIP());
    query.put("healthyOnly", String.valueOf(healthyOnly));

    final String url = UtilAndComs.nacosUrlBase + "/instance/list";
    return reqApi(url, query, HttpMethod.GET);
}
服务端
InstanceController.list
/**
 * HTTP entry point for the instance-list query: reads the request parameters and delegates to
 * {@code doSrvIpxt}.
 *
 * <p>Only the service name is required; every other parameter falls back to a default.
 *
 * @param request incoming HTTP request
 * @return the JSON node produced by {@code doSrvIpxt}
 * @throws Exception if parameter extraction, validation or the query itself fails
 */
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    // Parameters are read in a fixed order so validation failures surface in the same order.
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
            Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    final String agent = WebUtils.getUserAgent(request);
    final String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    final String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);

    final String rawUdpPort = WebUtils.optional(request, "udpPort", "0");
    final int udpPort = Integer.parseInt(rawUdpPort);

    final String env = WebUtils.optional(request, "env", StringUtils.EMPTY);

    final String rawIsCheck = WebUtils.optional(request, "isCheck", "false");
    final boolean isCheck = Boolean.parseBoolean(rawIsCheck);

    final String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    final String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

    final String rawHealthyOnly = WebUtils.optional(request, "healthyOnly", "false");
    final boolean healthyOnly = Boolean.parseBoolean(rawHealthyOnly);

    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}
InstanceController.doSrvIpxt
在服务端中根据条件查询Instance集合,方法太长了,这里只显示部分关键代码
healthyOnly为true时:默认情况下只返回满足条件、健康且能接受请求的Instance;如果健康Instance的占比不高于Service.protectThreshold(默认值为0,即不存在健康状态的Instance时),则返回所有满足条件且能接受请求的Instance。
healthyOnly为false时:返回所有满足条件的能接受请求的Instance,不管健康状态。
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {ClientInfo clientInfo = new ClientInfo(agent);ObjectNode result = JacksonUtils.createEmptyJsonNode();// 根据namespaceId和serviceName从ServiceManager.serviceMap中获取ServiceService service = serviceManager.getService(namespaceId, serviceName);long cacheMillis = switchDomain.getDefaultCacheMillis();// 不存在Service时,返回给客户端空集合,下面if逻辑中省略了部分参数组装代码if (service == null) {result.replace("hosts", JacksonUtils.createEmptyArrayNode());return result;}List<Instance> srvedIPs;// 如果clusters为空,获取所有的Instance,否则获取clusters对应的Cluster集合里的所有InstancesrvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));// 使用Service中的selector对上面获得的Instance集合进行过滤if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {srvedIPs = service.getSelector().select(clientIP, srvedIPs);}// 不存在Instance时,返回给客户端空集合,下面if逻辑中省略了部分参数组装代码if (CollectionUtils.isEmpty(srvedIPs)) {result.set("hosts", JacksonUtils.createEmptyArrayNode());return result;}Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);ipMap.put(Boolean.TRUE, new ArrayList<>());ipMap.put(Boolean.FALSE, new ArrayList<>());// 将Instance根据健康状态进行分类for (Instance ip : srvedIPs) {ipMap.get(ip.isHealthy()).add(ip);}double threshold = service.getProtectThreshold();// 如果集合中健康状态的Instance的占比<=Serice.protectThreshold,会将不健康的Instance加入健康的集合中,即默认所有的Instance都是健康的,等效于healthyOnly为false,Serice.protectThreshold的默认值为0,即healthyOnly为true时,返回的所有Instance要么都是健康的,要么都是不健康的if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));ipMap.get(Boolean.FALSE).clear();}// 保存需要返回给客户端的Instance信息ArrayNode hosts = JacksonUtils.createEmptyArrayNode();for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {List<Instance> ips = entry.getValue();// 
healthyOnly=true,说明客户端只要健康状态的Instance,会过滤掉不健康的的Instance,即key为false的时不处理if (healthyOnly && !entry.getKey()) {continue;}for (Instance instance : ips) {// 如果Instance不能接收外部的请求,跳过当前Instanceif (!instance.isEnabled()) {continue;}ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();// 省略了将instance的内从组装到ipObj里的过程hosts.add(ipObj);}}result.replace("hosts", hosts);....return result;}