一.Nacos是什么?
Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。
Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。
来看一下Nacos的全景图
如图所示,他可以无缝的集成其他的中间件,如:
Spring Cloud、Apache Dubbo、Kubernetes等。
使用 Nacos 简化服务发现、配置管理、服务治理及管理的解决方案,让微服务的发现、管理、共享、组合更加容易。
二.Nacos架构图
1.服务 (Service)
服务是指一个或一组软件功能(例如特定信息的检索或一组操作的执行),其目的是不同的客户端可以为不同的目的重用(例如通过跨进程的网络调用)。Nacos 支持主流的服务生态,如 Kubernetes Service、gRPC|Dubbo RPC Service 或者 Spring Cloud RESTful Service.
2.服务注册中心 (Service Registry)
服务注册中心,它是服务,其实例及元数据的数据库。服务实例在启动时注册到服务注册表,并在关闭时注销。服务和路由器的客户端查询服务注册表以查找服务的可用实例。服务注册中心可能会调用服务实例的健康检查 API 来验证它是否能够处理请求。
3.服务元数据 (Service Metadata)
服务元数据是指包括服务端点(endpoints)、服务标签、服务版本号、服务实例权重、路由规则、安全策略等描述服务的数据
4.服务提供方 (Service Provider)
是指提供可复用和可调用服务的应用方
5.服务消费方 (Service Consumer)
是指会发起对某个服务调用的应用方
6.配置 (Configuration)
在系统开发过程中通常会将一些需要变更的参数、变量等从代码中分离出来独立管理,以独立的配置文件的形式存在。目的是让静态的系统工件或者交付物(如 WAR,JAR 包等)更好地和实际的物理运行环境进行适配。配置管理一般包含在系统部署的过程中,由系统管理员或者运维人员完成这个步骤。配置变更是调整系统运行时的行为的有效手段之一。
7.配置管理 (Configuration Management)
在数据中心中,系统中所有配置的编辑、存储、分发、变更管理、历史版本管理、变更审计等所有与配置相关的活动统称为配置管理。
8.名字服务 (Naming Service)
提供分布式系统中所有对象(Object)、实体(Entity)的“名字”到关联的元数据之间的映射管理服务,例如 ServiceName -> Endpoints Info, Distributed Lock Name -> Lock Owner/Status Info, DNS Domain Name -> IP List, 服务发现和 DNS 就是名字服务的2大场景。
9.配置服务 (Configuration Service)
在服务或者应用运行过程中,提供动态配置或者元数据以及配置管理的服务提供者。
这些都是在nacos官网的一些官方解释,更多的概念请访问其官网地址:
Nacos官网地址 https://nacos.io/en-us/
这是对看不懂英文文档的福利,很人性化的提供了一个中文文档,毕竟国内开源框架,不知道以后会不会像Dubbo和RocketMQ一样捐给Apache。
三.Nacos与其他注册中心对比
Nacos同时支持AP和CP模式,他根据服务注册选择临时和永久来决定走AP模式还是CP模式,他这里支持CP模式对于我的理解来说,应该是为了配置中心集群,因为nacos可以同时作为注册中心和配置中心,因为他的配置中心信息是保存在nacos里面的,假如因为nacos其中一台挂掉后,还没有同步配置信息,就可能发生配置不一致的情况, 配置中心的配置变更是服务端有监听器,配置中心发生配置变化,然后服务端会监听到配置发生变化,从而做出改变。
nacos异常情况 leader挂掉的情况:
1.不影响服务之间互相调用
2.不影响服务注册
3.不影响服务正常启动拉取配置文件
4.选举新leader差不多4,5秒钟
四.Nacos与其他配置中心对比
由于百度的Disconf不再维护,下面主要对比一下Spring Cloud Config、Apollo和Nacos
从配置中心角度来看,性能方面Nacos的读写性能最高,Apollo次之,Spring Cloud Config依赖Git场景不适合开放的大规模自动化运维API。功能方面Apollo最为完善,nacos具有Apollo大部分配置管理功能,而Spring Cloud Config不带运维管理界面,需要自行开发。Nacos的一大优势是整合了注册中心、配置中心功能,部署和操作相比Apollo都要直观简单,因此它简化了架构复杂度,并减轻运维及部署工作。
综合来看,Nacos的特点和优势还是比较明显的。
五.哪些企业在使用Naocs?
六.Nacos的应用实战
1.启动nacos服务端
首先从github上面下载下来一个nacos的源码,地址:https://github.com/alibaba/nacos/releases
这里以目前官网最新版本1.3.2为例,点击下方的assets,然后通过源码的方式去构建nacos,windows系统就下载zip文件,linux系统下载tar文件即可,我这里选择windows方式
下载完成之后解压,会解压成为一个maven项目
从nacos-1.3.2项目里面依次进入到distribution\target\nacos-server-1.3.2\nacos\bin目录里面,然后通过cmd命令直接启动,启动命令为startup.cmd -m standalone,standalone表示非集群启动,由于我这里没有部署nacos集群,所以就以此种方式启动。
看到successful即表示启动成功了
通过控制台还可以看到nacos的默认端口号为8848,不知道开发这个框架的人是不是收了8848钛金手机的广告费了
默认用户名密码都为nacos,直接登录即可
2.实现配置中心
首先在配置管理菜单里面的配置列表右上角,点击加号,去新建一个配置文件
在nacos中,通过dataId和group来确定到某个服务的配置文件,dataId的命名规则最好以服务名称为前缀,以文件格式为后缀,group的话自己定义一个即可。
配置文件创建完成之后,开始创建项目,因为稍后服务注册和服务发现要使用服务提供者和服务消费者体现的更明显,所以这里创建一个聚合工程。
在父工程里面添加springboot父工程的依赖,此处我使用的父版本为2.3.3。
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.3.RELEASE</version></parent>
添加是springboot启动器、nacos配置中心集成springcloud、nacos服务注册与发现集成springcloud等相关依赖。
<dependencyManagement><dependencies><!-- springboot-web启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.3.RELEASE</version></dependency><!-- springcloud整合nacos配置中心依赖 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId><version>2.1.0.RELEASE</version></dependency><!-- springcloud-nacos服务注册与发现依赖 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.1.0.RELEASE</version></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><optional>true</optional></dependency></dependencies></dependencyManagement>
创建服务提供者,添加相关依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- springcloud整合nacos配置中心依赖 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><!-- springcloud-nacos服务注册与发现依赖 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
添加springboot启动类,由于刚才在nacos控制台创建的是以properties为后缀的dataId,所以这里创建properties文件,但是由于是配置中心,所以我们要创建bootstrap开头的配置文件。
这里大概解释一下bootstrap和application两种配置文件开头的区别:Spring Cloud 构建于 Spring Boot 之上,在 Spring Boot 中有两种上下文,一种是 bootstrap, 另外一种是 application, bootstrap 是应用程序的父上下文,也就是说 bootstrap 加载优先于 applicaton。bootstrap 主要用于从额外的资源来加载配置信息,还可以在本地外部配置文件中解密属性。这两个上下文共用一个环境,它是任何Spring应用程序的外部属性的来源。bootstrap 里面的属性会优先加载,它们默认也不能被本地相同配置覆盖。
bootstrap 配置文件有以下几个应用场景:使用 Spring Cloud Config 配置中心时,这时需要在 bootstrap 配置文件中添加连接到配置中心的配置属性来加载外部配置中心的配置信息;一些固定的不能被覆盖的属性一些加密/解密的场景;
在properties配置文件中添加相关配置属性
spring.application.name=nacos-provider
server.port=6123#springcloud实现nacos配置中心地址
spring.cloud.nacos.config.server-addr=localhost:8848
#在 Nacos Spring Cloud 中,dataId 的完整格式如下:
#${prefix}-${spring.profiles.active}.${file-extension}
#prefix 默认为 spring.application.name 的值,也可以通过配置项 spring.cloud.nacos.config.prefix来配置。
#spring.profiles.active 即为当前环境对应的 profile
#当 spring.profiles.active 为空时,对应的连接符 - 也将不存在,dataId 的拼接格式变成 ${prefix}.${file-extension}
#springcloud配置nacos配置中心的dataId的前缀
spring.cloud.nacos.config.prefix=${spring.application.name}
#springcloud配置nacos配置中心dataId配置文件后缀格式
spring.cloud.nacos.config.file-extension=properties
#组名
spring.cloud.nacos.config.group=nacos-provider1-group
创建一个controller,在里面通过@Value注解注入远程配置的属性,在当前controller里面通过springcloud原生注解@RefreshScope实现自动刷新功能
@Slf4j
@RestController
@RequestMapping("/provider")
@RefreshScope
public class ProviderController {@Value(value = "${nacos-info:hello nacos.}")private String nacosInfo;@NacosInjectedprivate NamingService namingService;@GetMapping("get")public String get() {log.info("服务提供者1被调用了。。。");return nacosInfo;}
}
浏览器去访问当前controller的路径,可以看到已经返回了nacos配置中心配置的内容了,然后去随便更改的话,后台服务器读取到的值也是实时刷新的。
在nacos控制台配置列表上对有一个示例代码,对应的有原生api、springboot、springcloud方式去实现配置中心的,这里就不做过多的讲解,直接贴一个原生api的方式。
/*** @Author: tian* @Date: 2020/8/15 13:18* @Desc: 通过nacos的api去获取nacos服务端的配置内容*/
public class NacosApiTest {public static void main(String[] args) throws Exception{// 默认端口号 8848String serverAddr = "localhost";// dataIdString dataId = "example";// 分组String group = "DEFAULT_GROUP";Properties properties = new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);ConfigService configService = NacosFactory.createConfigService(properties);String content = configService.getConfig(dataId, group, 5000);System.out.println(content);// 监听,数据发生变化的时候触发configService.addListener(dataId, group, new Listener() {@Overridepublic void receiveConfigInfo(String configInfo) {System.out.println("recieve:" + configInfo);}@Overridepublic Executor getExecutor() {return null;}});boolean isPublishOk = configService.publishConfig(dataId, group, "content");System.out.println(isPublishOk);Thread.sleep(3000);content = configService.getConfig(dataId, group, 5000);System.out.println(content);// 删除boolean isRemoveOk = configService.removeConfig(dataId, group);System.out.println(isRemoveOk);Thread.sleep(3000);content = configService.getConfig(dataId, group, 5000);System.out.println(content);Thread.sleep(300000);}
}
3.服务注册与发现
nacos集成springcloud实现服务注册与发现的依赖
<!-- springcloud-nacos服务注册与发现依赖 -->
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
在配置文件中添加
spring.cloud.nacos.discovery.server-addr=localhost:8848
表示开启服务发现功能,在启动类添加@EnableDiscoveryClient注解表示为服务发现的客户端,在springcloud1.5.8.RELEASE对应Edgeware.RELEASE不用添加该注解也能实现服务注册与发现。
启动相关的项目之后就可以在服务列表看到我们的服务了。
六.自己怎么去实现一个配置中心的功能?
1.服务器端的配置保存(持久化)
此处实现常见的方案就是基于数据库去持久化到本地磁盘或者直接通过io去操作文件。
2.服务器端提供访问api
一种是基于rpc通信,底层也就是netty,另外一种是基于http,restful风格的接口。
3.数据变化之后如何通知到客户端
push(服务端主动推送到客户端)? 还是pull(客户端主动拉取)?
push的问题:push一定要维持一个连接,zookeeper里面有一个session manager会话管理与客户端保持一个连接,如果客户端非常多的情况下,那么服务端的资源消耗也比较大,还要考虑连接的有效性,所以还要开发一个连接管理器。当然也有好处,就是客户端可以实时监听到配置的变化。
在 Pull 模式下,客户端需要定时从服务端拉取一次数据,由于定时任务会存在一定的时间间隔,所以不能保证数据的实时性。并且在服务端配置长时间不更新的情况下,客户端的定时任务会做一些无效的 Pull。
4.客户端如何去获得远程服务的数据
同3所述
5.安全性
进入控制台需要用户角色登录
6. 刷盘(本地缓存)
客户端每次访问都要发起远程访问吗?
能不能像dubbo那样设计一个本地缓存,通过一个单独的任务去不断的访问服务端,然后缓存到本地。
客户端如何判断配置不一样?
根据md5,此处的md5不是加密的意思,为签名的意思,可以理解为version,比如会有文件的修改时间,数据的修改时间等,每次修改的时候会做一个记录,变更一下此值。
Nacos下的方式
Nacos 采用的是 Pull 模式,但并不是简单的 Pull,而是一种长轮训机制,它结合 Push 和 Pull 两者的优势。客户端采用长轮训的方式定时发起 Pull 请求,去检查服务端配置信息是否发生了变更,如果发生了变更,则客户端会根据变更的数据获得最新的配置。所谓的长轮训,是客户端发起轮训请求之后,服务端如果有配置发生变更,就直接返回。
如果客户端发起 Pull 请求后,发现服务端的配置和客户端的配置是保持一致的,那么服务端会先 “Hold” 住这个请求,也就是服务端拿到这个连接之后在指定的时间段内一直不返回结果,直到这段时间内配置发生变化,服务端会把原来 “Hold” 住的请求进行返回。Nacos 服务端收到请求之后,先检查配置是否发生了变更,如果没有,则设置一个定时任务,延期 29.5s 执行,并且把当前的客户端长轮训加入 allSubs 队列。这个时候有两种方式触发该链接结果的返回:
第一种是在等待 29.5s 后触发自动检查机制,这个时候不管配置有没有发生变化,都会把结果返回客户端。而 29.5s 就是这个长连接保持的时间。
第二种是在 29.5s 内任意一个时刻,通过 Nacos Dashboard 或者 API 的方式对配置进行了修改,这会触发一个事件机制,监听该事件的任务会遍历 allSubs 队列,找到发生变更的配置项对应的 ClientLongPolling 任务,将变更的数据通过该任务的连接进行返回,就完成一次 “推送” 操作。
这样既能够保证客户端实时感知配置的变化,也降低了服务端的压力。其中,这个长连接的回话超时时间默认是 30s。
基于这种思想大概有了一个流程
七.浅谈Nacos配置中心的源码
1.通过一个工厂创建一个服务配置对象,服务配置对象根据我们的dataId、group、连接超时时间去拿到我们的配置
ConfigService configService = NacosFactory.createConfigService(properties);
String content = configService.getConfig(dataId, group, 5000);// 工厂模式创建出来一个配置服务
public static ConfigService createConfigService(Properties properties) throws NacosException {return ConfigFactory.createConfigService(properties);
}
2.通过反射创建了一个NacosConfigService类,实例化完成之后强转为他的接口ConfigService然后返回出去。
public static ConfigService createConfigService(Properties properties) throws NacosException {try {Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");Constructor constructor = driverImplClass.getConstructor(Properties.class);ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);return vendorImpl;} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}}
3.去看NacosConfigService的构造方法
public NacosConfigService(Properties properties) throws NacosException {String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);if (StringUtils.isBlank(encodeTmp)) {encode = Constants.ENCODE;} else {encode = encodeTmp.trim();}initNamespace(properties);// 通过http的方式发送请求,装饰器模式 agent = new MetricsHttpAgent(new ServerHttpAgent(properties));agent.start();// ClientWorker顾名思义,码农,跟具体的工作有关,把代理传给了码农worker = new ClientWorker(agent, configFilterChainManager, properties);
}
4.再点到ClientWorker的构造方法,创建了三个线程池
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {this.agent = agent;this.configFilterChainManager = configFilterChainManager;// Initialize the timeout parameterinit(properties);// 第一个为定时任务的线程池,看线程的名字应该是工作的,setDaemon(true)设置为守护线程executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker." + agent.getName());t.setDaemon(true);return t;}});// 看到longPoll的名字我们大概猜想一下他应该是做长轮循的executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());t.setDaemon(true);return t;}});// 指定延时的线程池,当这个线程初始化之后会延时1毫秒执行,每次执行的间隔时间为10毫秒, 可以看到该任务里面有checkConfigInfo(),也就是去检查配置的意思executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {try {checkConfigInfo();} catch (Throwable e) {LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);}}}, 1L, 10L, TimeUnit.MILLISECONDS);
}
5.这里做了一个分批处理,防止大数据量,为了保证原子性,cacheMap使用了AtomicReference,里面是一个map。看人家的注释写的也很清楚了,map的key就是groupKey,value就是缓存的数据。然后去拿到配置文件里面的数量,这里做了个向上取整,点到ParamUtil.getPerTaskConfigSize(),可以看到默认值为3000,currentLongingTaskCount的值为0,也就是说你只要有配置文件的话就会进入该任务去不停的执行。这里面的executorService也就是前面构造出来的长轮循的任务,里面执行了一个长连接的线程。
public void checkConfigInfo() {// 分任务int listenerSize = cacheMap.get().size();// 向上取整为批数int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {// 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}
}
6.从构造方法点进来看到该类实现了Runnable接口,所以一定会执行run方法,首先看到里面有一个属性为taskId,也就是每一个任务的编号,他是用来把任务进行标记的,进行分批处理。该方法里面的getServerConfig方法为核心,会以get的方式,根据dataId、group为参数发送http请求,得到远程的配置内容,拿到以后首先放到CacheData中,再通过一个listener告知CacheData中的变化。
class LongPollingRunnable implements Runnable {private int taskId;public LongPollingRunnable(int taskId) {this.taskId = taskId;}@Overridepublic void run() {List<CacheData> cacheDatas = new ArrayList<CacheData>();List<String> inInitializingCacheList = new ArrayList<String>();try {// check failover configfor (CacheData cacheData : cacheMap.get().values()) {// 根据taskId进行分组 ,也就是只把属于当前任务的集合放到cacheDatas里面if (cacheData.getTaskId() == taskId) {cacheDatas.add(cacheData);try {// 检查本地checkLocalConfig(cacheData);// 如果使用了本地的配置,就去检查本地的if (cacheData.isUseLocalConfigInfo()) {cacheData.checkListenerMd5();}} catch (Exception e) {LOGGER.error("get local config info error", e);}}}// check server configList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);for (String groupKey : changedGroupKeys) {String[] key = GroupKey.parseKey(groupKey);String dataId = key[0];String group = key[1];String tenant = null;if (key.length == 3) {tenant = key[2];}try {// 得到远程配置信息之后,首先会放入到CacheData中,该方法其实就是根据dataId 和group为http请求的参数,以固定的url去得到配置内容String content = getServerConfig(dataId, group, tenant, 3000L);CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));cache.setContent(content);LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",agent.getName(), dataId, group, tenant, cache.getMd5(),ContentUtils.truncateContent(content));} catch (NacosException ioe) {String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",agent.getName(), dataId, group, tenant);LOGGER.error(message, ioe);}}for (CacheData cacheData : cacheDatas) {if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {cacheData.checkListenerMd5();cacheData.setInitializing(false);}}inInitializingCacheList.clear();executorService.execute(this);} catch (Throwable e) {// If the rotation training task is abnormal, the next execution time of the task will be punishedLOGGER.error("longPolling error : ", e);executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);}}}
7.接下来我们看看他是怎么检查本地的
private void checkLocalConfig(CacheData cacheData) {final String dataId = cacheData.dataId;final String group = cacheData.group;final String tenant = cacheData.tenant;// 通过这一行我们可以看出是一个本地的文件缓存File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);// 没有 -> 有// 没有使用本地缓存,但是文件存在,也就是要从本地文件读取放到cacheData里面去,cacheData是在内存中的,这种情况就是在nacos控制台添加了一个配置之后,缓存到本地了if (!cacheData.isUseLocalConfigInfo() && path.exists()) {String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);String md5 = MD5.getInstance().getMD5String(content);cacheData.setUseLocalConfigInfo(true);cacheData.setLocalConfigInfoVersion(path.lastModified());cacheData.setContent(content);LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));return;}// 有 -> 没有。不通知业务监听器,从server拿到配置后通知。// 如果使用了本地配置但是文件不存在,就要从远程去拿了,因为本地配置不一定是最新的,所以一定要从远程拿if (cacheData.isUseLocalConfigInfo() && !path.exists()) {cacheData.setUseLocalConfigInfo(false);LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),dataId, group, tenant);return;}// 有变更// 使用本地的配置文件也存在,并且缓存的更新时间和文件最后的更新时间不一致,这种情况也就是本地文件发生了变化,要把本地文件放到内存中if (cacheData.isUseLocalConfigInfo() && path.exists()&& cacheData.getLocalConfigInfoVersion() != path.lastModified()) {String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);String md5 = MD5.getInstance().getMD5String(content);cacheData.setUseLocalConfigInfo(true);cacheData.setLocalConfigInfoVersion(path.lastModified());cacheData.setContent(content);LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));}
}
8.检查md5,如果当前md5和上次不一致,就设置通知,这大概就是本地检查的一种机制。
void checkListenerMd5() {for (ManagerListenerWrap wrap : listeners) {if (!md5.equals(wrap.lastCallMd5)) {safeNotifyListener(dataId, group, content, md5, wrap);}}
}
safeNotifyListener()方法里面就是根据dataId,group以及md5调用listener的receiveConfigInfo()方法进行回调,然后把内容给输出出去。和原生api里面回调函数一样。
9.本地检查完之后,再回过头来去看远程检查,就是在6中的run方法里面,从该run方法中点到checkUpdateDataIds()方法,接着我们点到checkUpdateConfigStr(sb.toString(), isInitializingCacheList)方法中
/*** 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。*/List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {StringBuilder sb = new StringBuilder();for (CacheData cacheData : cacheDatas) {// 如果没有使用本地配置,就把dataId和group以及MD5为参数拼接到一起发送远程调用if (!cacheData.isUseLocalConfigInfo()) {sb.append(cacheData.dataId).append(WORD_SEPARATOR);sb.append(cacheData.group).append(WORD_SEPARATOR);if (StringUtils.isBlank(cacheData.tenant)) {sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);} else {sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);}if (cacheData.isInitializing()) {// cacheData 首次出现在cacheMap中&首次check更新inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));}}}boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);}
10.这里面看的就很清楚了,就是发送一个http请求,根据得到的正确状态码把返回的内容进行解析返回,最终得到的这个groupKey就是dataId+group+tenant
/*** 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。*/List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);List<String> headers = new ArrayList<String>(2);headers.add("Long-Pulling-Timeout");headers.add("" + timeout);// told server do not hang me up if new initializing cacheData added inif (isInitializingCacheList) {headers.add("Long-Pulling-Timeout-No-Hangup");headers.add("true");}if (StringUtils.isBlank(probeUpdateString)) {return Collections.emptyList();}try {HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,agent.getEncode(), timeout);// 判断是否请求成功if (HttpURLConnection.HTTP_OK == result.code) {setHealthServer(true);// 解析内容并返回return parseUpdateDataIdResponse(result.content);} else {setHealthServer(false);LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);}} catch (IOException e) {setHealthServer(false);LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);throw e;}return Collections.emptyList();}
11.把该结果返回到刚刚的run方法里面,然后进行for循环遍历,根据返回的结果再调用getServerConfig()方法
12.这个里面很简单,就是组装请求的参数,发送调用得到配置。
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)throws NacosException {if (StringUtils.isBlank(group)) {group = Constants.DEFAULT_GROUP;}HttpResult result = null;try {List<String> params = null;if (StringUtils.isBlank(tenant)) {params = Arrays.asList("dataId", dataId, "group", group);} else {params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);}// 发送get方式的http请求得到远程的配置内容result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);} catch (IOException e) {String message = String.format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),dataId, group, tenant);LOGGER.error(message, e);throw new NacosException(NacosException.SERVER_ERROR, e);}......
}
13.得到内容之后,会缓存到本地的内存cacheData中,更新完之后,还要检查是否有变化发生,如果有变化,有根据一个listener通知到客户端,和步骤8中的检查监听本地一样,到这里,远程拿配置的流程也大概结束了。
八.根据Nacos配置中心的实现
1.长轮循:客户端发送一个请求到服务端,服务端把这个请求给hold住,当数据发生变化的时候,服务端会把数据响应到客户端,也就是客户端从发送请求到服务端返回数据,会有一个hold状态;hold就是服务端没有给客户端返回数据。
2.服务端hold住请求之后,应该什么时候给出响应?
服务端的配置发生变化的时候;固定的时间去设置检查。
在ClientWorker里面有个checkUpdateDataIds()方法,从该方法
点进到checkUpdateConfigStr()方法,会有一个agent.httpPost()方法,
也就是基于http请求,此方法的最后一个参数为timeout,顾名思义,就是超时时间。
找到timeout在init()方法里面赋值的,此处NumberUtils.toInt()方法为读取配置文件的,
如果配置文件没配置的话就是用默认的,也就是第二个参数,为30000毫秒,
然后Math.max()方法为比较两个时间,如果NumberUtils.toInt()方法返回的时间大于后面的默认最小时间,就返回第一个参数,否则就是使用默认的最小时间,也就是10000毫秒。
3.大概画一下吧
4.不管我们是通过api还是dashboard去修改配置内容,可以再logs文件里面找到一个config-client-request.log文件
5.这个文件里面就是使用的长轮循一直请求服务端,第一个参数为时间,第二个参数为超时时间,但是这里确实29.5秒,下面会有讲到为什么
6.接下来去看一下他的请求,由之前的源码得知他的请求最终是以post的http请求发送的
// CONFIG_CONTROLLER_PATH这个变量为v1/cs/configsHttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,agent.getEncode(), timeout);
7.带着这个请求的路径我们去源码中找到这个请求的方法,在config模块里面的controller包下的ConfigController里面
8.前面的MD5处理,直接看最后一行doPollingConfig()方法,他对长轮循和短轮训都可以处理,由于我们发送的是一个长轮循,所以我们直接看if里面的addLongPollingClient()方法
/*** 轮询接口.*/public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {// 长轮循if (LongPollingService.isSupportLongPolling(request)) {longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);return HttpServletResponse.SC_OK + "";}// else兼容短轮训逻辑List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);// Compatible with short polling result.String oldResult = MD5Util.compareMd5OldResult(changedGroups);String newResult = MD5Util.compareMd5ResultString(changedGroups);String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);if (version == null) {version = "2.0.0";}int versionNum = Protocol.getVersionNumber(version);// 2.0.4 version,返回值放入header中if (versionNum < START_LONG_POLLING_VERSION_NUM) {response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);} else {request.setAttribute("content", newResult);}Loggers.AUTH.info("new content:" + newResult);// Disable cache.response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);return HttpServletResponse.SC_OK + "";}
9.再来看看addLongPollingClient()方法,他在LongPollingService这个类里面,这里面我们可以看到他会先拿到一个30秒的超时时间,然后会有0.5秒的延时处理,这就能解释为什么客户端响应超时时间是29.5+了。当然如果isFixedPolling=true 的情况下,不会提前返回响应根据客户端请求过来的md5和服务器端对应的group下对应内容的md5进行比较,如果不一致,则通过generateResponse 将结果返回
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {// 拿到一个timeout,默认为30sString str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);String tag = req.getHeader("Vipserver-Tag");// 0.5s的延迟int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.// 提前500ms返回,避免超时,这也解释了我们上面在日志文件中看到是29.5slong timeout = Math.max(10000, Long.parseLong(str) - delayTime);if (isFixedPolling()) {timeout = Math.max(10000, getFixedPollingInterval());// Do nothing but set fix polling timeout.} else {long start = System.currentTimeMillis();// 通过一个MD5比较,拿着服务端的MD5和我们发起请求的MD5比较List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);// 如果发现有改变,直接返回,不需要再等待29.5s,if (changedGroups.size() > 0) {generateResponse(req, rsp, changedGroups);// 写入到日志文件中LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,changedGroups.size());return;} // 得到远程的ipString ip = RequestUtil.getRemoteIp(req);// Must be called by http thread, or send response.// 一定要由http线程调用,否则离开后容器会立即发送响应final AsyncContext asyncContext = req.startAsync();// AsyncContext.setTimeout() is incorrect, Control by oneself// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制asyncContext.setTimeout(0L);// 如果配置文件没有发生变化,则通过scheduler.execute 启动了一个定时任务,将客户端的长轮询// 请求封装成一个叫 ClientLongPolling 的任务,交给 scheduler 去执行ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}
10.然后通过generatorResponse()方法把结果写出去
void generateResponse(HttpServletRequest request, HttpServletResponse response, List<String> changedGroups) {if (null == changedGroups) {return;}try {final String respString = MD5Util.compareMd5ResultString(changedGroups);// Disable cache.response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);response.getWriter().println(respString);} catch (Exception ex) {PULL_LOG.error(ex.toString(), ex);}}
11.接下来我们来分析一下,clientLongPolling到底做了什么操作。或者说我们可以先猜测一下应该会做什么事情
- 这个任务要阻塞29.5s才能执行,因为立马执行没有任何意义,毕竟前面已经执行过一次了
- 如果在29.5s+之内,数据发生变化,需要提前通知。需要有一种监控机制
基于这些猜想,我们可以看看它的实现过程
从代码粗粒度来看,它的实现似乎和我们的猜想一致,在run方法中,通过scheduler.schedule实现了一个定时任务,它的delay时间正好是前面计算的29.5s。在这个任务中,会通过MD5Util.compareMd5来进行计算
那另外一个,当数据发生变化以后,肯定不能等到29.5s之后才通知呀,那怎么办呢?我们发现有一个allSubs 的东西,它似乎和发布订阅有关系。那是不是有可能当前的clientLongPolling订阅了数据变化的事件呢?
public void run() {asyncTimeoutFuture = scheduler.schedule(new Runnable() {@Overridepublic void run() {try {getRetainIps().put(ClientLongPolling.this.ip,System.currentTimeMil·1·s());/*** 删除订阅关系*/allSubs.remove(ClientLongPolling.this);if (isFixedPolling()) {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime), "fix",RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()), "polling",clientMd5Map.size(), probeRequestSize);List<String> changedGroups = MD5Util.compareMd5((HttpServletRequest) asyncContext.getRequest(),(HttpServletResponse) asyncContext.getResponse(),clientMd5Map);if (changedGroups.size() > 0) {sendResponse(changedGroups);} else {sendResponse(null);}} else {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime), "timeout",RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),"polling", clientMd5Map.size(), probeRequestSize);sendResponse(null);}} catch (Throwable t) {LogUtil.defaultLog.error("long polling error:" + t.getMessage(),t.getCause());}}}, timeoutTime, TimeUnit.MILLISECONDS);allSubs.add(this);}
12.allSubs是一个队列,队列里面放了ClientLongPolling这个对象。这个队列似乎和配置变更有某种关联关系
/**
* 长轮询订阅关系
*/
final Queue<ClientLongPolling> allSubs;
allSubs.add(this);
13.在LongPollingService的无参构造方法中,通知中心注册了一个订阅,这个订阅为一个内部类,里面有个onEvent事件,判断事件类型是否为LocalDataChangeEvent,然后通过scheduler.execute执行DataChangeTask这个任务
public LongPollingService() {allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);// Register LocalDataChangeEvent to NotifyCenter.NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);// Register A Subscriber to subscribe LocalDataChangeEvent.NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if (isFixedPolling()) {// Ignore.} else {if (event instanceof LocalDataChangeEvent) {LocalDataChangeEvent evt = (LocalDataChangeEvent) event;ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));}}}@Overridepublic Class<? extends Event> subscribeType() {return LocalDataChangeEvent.class;}});}
14.DataChangeTask.run从名字可以看出来,这个是数据变化的任务,最让人兴奋的应该是,它里面有一个循环迭代器,从allSubs里面获得ClientLongPolling,最后通过clientSub.sendResponse把数据返回到客户端。所以,这也就能够理解为何数据变化能够实时触发更新了。
public void run() {try {ConfigService.getContentBetaMd5(groupKey);for (Iterator<ClientLongPolling> iter = allSubs.iterator();iter.hasNext(); ) {ClientLongPolling clientSub = iter.next();if (clientSub.clientMd5Map.containsKey(groupKey)) {// 如果beta发布且不在beta列表直接跳过if (isBeta && !betaIps.contains(clientSub.ip)) {continue;}// 如果tag发布且不在tag列表直接跳过if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {continue;}getRetainIps().put(clientSub.ip, System.currentTimeMillis());iter.remove(); // 删除订阅关系LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - changeTime), "in-advance",RequestUtil.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),"polling",clientSub.clientMd5Map.size(),clientSub.probeRequestSize, groupKey);clientSub.sendResponse(Arrays.asList(groupKey));}}} catch (Throwable t) {LogUtil.defaultLog.error("data change error:" + t.getMessage(),t.getCause());}}
15.接下来再看下数据变化的处理过程怎么触发事件的,我们在控制台更新一次配置,看一下请求的接口
16.可以看到还是在ConfigController里面,找到对应的方法,可以看到这个方法的参数就是如我们在控制台所配置的datId、group、内容、描述等参数,然后就是通过persistService调用其insertOrUpdate()方法更新数据,然后通过一个事件监听配置的变化,ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())),这里发布的事件是ConfigDataChangeEvent , 而LongPollingService感兴趣的事件是LocalDataChangeEvent。在Nacos中有一个DumpService,它会定时把变更后的数据dump到磁盘上,DumpService里面的init方法通过@PostConstruct注解被调用。然后在任务执行结束之后,会触发一个LocalDataChangeEvent 的事件,至此配置中心的数据变更和通知大概流程就结束了。
@PostMapping@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,@RequestParam(value = "appName", required = false) String appName,@RequestParam(value = "src_user", required = false) String srcUser,@RequestParam(value = "config_tags", required = false) String configTags,@RequestParam(value = "desc", required = false) String desc,@RequestParam(value = "use", required = false) String use,@RequestParam(value = "effect", required = false) String effect,@RequestParam(value = "type", required = false) String type,@RequestParam(value = "schema", required = false) String schema) throws NacosException {final String srcIp = RequestUtil.getRemoteIp(request);final String requestIpApp = RequestUtil.getAppName(request);// check tenantParamUtils.checkTenant(tenant);ParamUtils.checkParam(dataId, group, "datumId", content);ParamUtils.checkParam(tag);Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);ParamUtils.checkParam(configAdvanceInfo);if (AggrWhitelist.isAggrDataId(dataId)) {LOGGER.warn("[aggr-conflict] {} attemp to publish single data, {}, {}", RequestUtil.getRemoteIp(request),dataId, group);throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");}final Timestamp time = TimeUtils.getCurrentTime();String betaIps = request.getHeader("betaIps");ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);configInfo.setType(type);if (StringUtils.isBlank(betaIps)) {if (StringUtils.isBlank(tag)) {persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));} else {persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));}} else {// beta publishpersistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));}ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIp(),ConfigTraceService.PERSISTENCE_EVENT_PUB, content);return true;}
九.Nacos服务端的数据存储
1.在单机模式下,默认使用的是derby数据库存储的,在nacos的bin目录下可以看到有一个derby.log的文件
2.我们可以改为自定义的数据库配置,从上面的bin目录退一级然后进入到conf文件夹里面可以里面有一个nacos-mysql.sql文件,我们导入到自己的mysql数据库中
3.可以看到它里面有这些表
4.然后修改application.properties配置文件,改变数据库的配置为我们自己的mysql配置。
#*************** Config Module Related Configurations ***************#
### If use MySQL as datasource:
#数据库类型
spring.datasource.platform=mysql### Count of DB:
db.num=1### Connect URL of DB:
#msyql连接信息
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user=root
db.password=rootnacos.security.ignore.urls=/,/error,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/v1/auth/**,/v1/console/health/**,/actuator/**,/v1/console/server/**