RocketMQ源码-NameServer架构设计及启动流程

news/2024/10/30 15:27:34/

本文我们来分析NameServer相关代码,在正式分析源码前,我们先来回忆下NameServer的功能:

NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:

  • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;

  • 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后ProducerConumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

1. 架构设计

Broker启动的时候会向所有的NameServer注册,生产者在发送消息时会先从NameServer中获取Broker消息服务器的地址列表,根据负载均衡算法选取一台Broker消息服务器发送消息。NameServer与每台Broker之间保持着长连接,并且每隔10秒会检查Broker是否存活,如果检测到Broker超过120秒未发送心跳,则从路由注册表中将该Broker移除。

但是路由的变化不会马上通知消息生产者,这是为了降低NameServe的复杂性,所以在RocketMQ中需要消息的发送端提供容错机制来保证消息发送的高可用性,这在后续关于RocketMQ消息发送的章节会介绍。

2. 启动流程源码分析

2.1 主方法:NamesrvStartup#main

NameServer位于RocketMq项目的namesrv模块下,主类是org.apache.rocketmq.namesrv.NamesrvStartup,代码如下:

public class NamesrvStartup {...public static void main(String[] args) {main0(args);}public static NamesrvController main0(String[] args) {try {// 创建 controllerNamesrvController controller = createNamesrvController(args);// 启动start(controller);String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;}...
}
复制代码

可以看到,main()方法里的代码还是相当简单的,主要包含了两个方法:

  • createNamesrvController(...):创建 controller
  • start(...):启动nameServer

接下来我们就来分析这两个方法了。

2.2 创建controllerNamesrvStartup#createNamesrvController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {// 省略解析命令行代码...// nameServer的相关配置final NamesrvConfig namesrvConfig = new NamesrvConfig();//  nettyServer的相关配置final NettyServerConfig nettyServerConfig = new NettyServerConfig();// 端口写死了。。。nettyServerConfig.setListenPort(9876);if (commandLine.hasOption('c')) {// 处理配置文件String file = commandLine.getOptionValue('c');if (file != null) {// 读取配置文件,并将其加载到 properties 中InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);// 将 properties 里的属性赋值到 namesrvConfig 与 nettyServerConfigMixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}// 处理 -p 参数,该参数用于打印nameServer、nettyServer配置,省略...// 将 commandLine 的所有配置设置到 namesrvConfig 中MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);// 检查环境变量:ROCKETMQ_HOMEif (null == namesrvConfig.getRocketmqHome()) {// 如果不设置 ROCKETMQ_HOME,就会在这里报错System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 省略日志配置...// 创建一个controllerfinal NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// 将当前 properties 合并到项目的配置中,并且当前 properties 会覆盖项目中的配置controller.getConfiguration().registerConfig(properties);return controller;
}
复制代码

这个方法有点长,不过所做的事就两件:

  1. 处理配置
  2. 创建NamesrvController实例

2.2.1 处理配置

咱们先简单地看下配置的处理。在我们启动项目中,可以使用-c /xxx/xxx.conf指定配置文件的位置,然后在createNamesrvController(...)方法中,通过如下代码

InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
复制代码

将配置文件的内容加载到properties对象中,然后调用MixAll.properties2Object(properties, namesrvConfig)方法将properties的属性赋值给namesrvConfig,``MixAll.properties2Object(...)`代码如下:

public static void properties2Object(final Properties p, final Object object) {Method[] methods = object.getClass().getMethods();for (Method method : methods) {String mn = method.getName();if (mn.startsWith("set")) {try {String tmp = mn.substring(4);String first = mn.substring(3, 4);// 首字母小写String key = first.toLowerCase() + tmp;// 从Properties中获取对应的值String property = p.getProperty(key);if (property != null) {// 获取值,并进行相应的类型转换Class<?>[] pt = method.getParameterTypes();if (pt != null && pt.length > 0) {String cn = pt[0].getSimpleName();Object arg = null;// 转换成intif (cn.equals("int") || cn.equals("Integer")) {arg = Integer.parseInt(property);// 其他类型如long,double,float,boolean都是这样转换的,这里就省略了    } else if (...) {...} else {continue;}// 反射调用method.invoke(object, arg);}}} catch (Throwable ignored) {}}}
}
复制代码

这个方法非常简单:

  1. 先获取到object中的所有setXxx(...)方法
  2. 得到setXxx(...)中的Xxx
  3. 首字母小写得到xxx
  4. properties获取xxx属性对应的值,并根据setXxx(...)方法的参数类型进行转换
  5. 反射调用setXxx(...)方法进行赋值

这里之后,namesrvConfignettyServerConfig就赋值成功了。

2.2.2 创建NamesrvController实例

我们再来看看createNamesrvController(...)方法的第二个重要功能:创建NamesrvController实例.

创建NamesrvController实例的代码如下:

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

我们直接进入NamesrvController的构造方法:

/*** 构造方法,一系列的赋值操作*/
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {this.namesrvConfig = namesrvConfig;this.nettyServerConfig = nettyServerConfig;this.kvConfigManager = new KVConfigManager(this);this.routeInfoManager = new RouteInfoManager();this.brokerHousekeepingService = new BrokerHousekeepingService(this);this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

构造方法里只是一系列的赋值操作,没做什么实质性的工作,就先不管了。

2.3 启动nameServerNamesrvStartup#start

让我们回到一开始的NamesrvStartup#main0方法,

public static NamesrvController main0(String[] args) {try {NamesrvController controller = createNamesrvController(args);start(controller);...} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}

接下来我们来看看start(controller)方法中做了什么,进入NamesrvStartup#start方法:

public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}// 初始化boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 关闭钩子,可以在关闭前进行一些操作Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {controller.shutdown();return null;}}));// 启动controller.start();return controller;
}

start(...)方法的逻辑也十分简洁,主要包含3个操作:

  1. 初始化,想必是做一些启动前的操作
  2. 添加关闭钩子,所谓的关闭钩子,可以理解为一个线程,可以用来监听jvm的关闭事件,在jvm真正关闭前,可以进行一些处理操作,这里的关闭前的处理操作就是controller.shutdown()方法所做的事了,所做的事也很容易想到,无非就是关闭线程池、关闭已经打开的资源等,这里我们就不深究了
  3. 启动操作,这应该就是真正启动nameServer服务了

接下来我们主要来探索初始化与启动操作流程。

2.3.1 初始化:NamesrvController#initialize

初始化的处理方法是NamesrvController#initialize,代码如下:

public boolean initialize() {// 加载 kv 配置this.kvConfigManager.load();// 创建 netty 远程服务this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);// netty 远程服务线程this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));// 注册,就是把 remotingExecutor 注册到 remotingServerthis.registerProcessor();// 开启定时任务,每隔10s扫描一次broker,移除不活跃的brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);// 省略打印kv配置的定时任务...// Tls安全传输,我们不关注if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {...}return true;
}

这个方法所做的事很明了,代码中都已经注释了,代码看着多,实际干的就两件事:

  1. 处理netty相关:创建远程服务与工作线程
  2. 开启定时任务:移除不活跃的broker

什么是NettyRemotingServer呢?在本文开篇介绍NamerServer的功能时,提到NameServer是一个简单的注册中心,这个NettyRemotingServer就是对外开放的入口,用来接收broker的注册消息的,当然还会处理一些其他消息,我们后面会分析到。

1. 创建NettyRemotingServer

我们先来看看NettyRemotingServer的创建过程:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());this.serverBootstrap = new ServerBootstrap();this.nettyServerConfig = nettyServerConfig;this.channelEventListener = channelEventListener;int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 创建 publicExecutorthis.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 判断是否使用 epollif (useEpoll()) {// bossthis.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});// workerthis.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {// 这里也是创建了两个线程...}// 加载ssl上下文loadSslContext();
}

整个方法下来,其实就是做了一些赋值操作,我们挑重点讲:

  1. serverBootstrap:熟悉netty的小伙伴应该对这个很熟悉了,这个就是netty服务端的启动类
  2. publicExecutor:这里创建了一个名为publicExecutor线程池,暂时并不知道这个线程有啥作用,先混个脸熟吧
  3. eventLoopGroupBosseventLoopGroupSelector线程组:熟悉netty的小伙伴应该对这两个线程很熟悉了,这就是netty用来处理连接事件与读写事件的线程了,eventLoopGroupBoss对应的是netty的boss线程组,eventLoopGroupSelector对应的是worker线程组

到这里,netty服务的准备工作本完成了。

2. 创建netty服务线程池

让我们再回到NamesrvController#initialize方法,NettyRemotingServer创建完成后,接着就是netty远程服务线程池了:

this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

创建完成线程池后,接着就是注册了,也就是registerProcessor方法所做的工作:

this.registerProcessor();

registerProcessor()中 ,会把当前的 NamesrvController 注册到 remotingServer中:

private void registerProcessor() {if (namesrvConfig.isClusterTest()) {this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),this.remotingExecutor);} else {// 注册操作this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);}
}

最终注册到为NettyRemotingServerdefaultRequestProcessor属性:

@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}

好了,到这里NettyRemotingServer相关的配置就准备完成了,这个过程中一共准备了4个线程池:

  1. publicExecutor:暂时不知道做啥的,后面遇到了再分析
  2. eventLoopGroupBoss:处理netty连接事件的线程组
  3. eventLoopGroupSelector:处理netty读写事件的线程池
  4. remotingExecutor:暂时不知道做啥的,后面遇到了再分析

3. 创建定时任务

准备完netty相关配置后,接着代码中启动了一个定时任务:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}
}, 5, 10, TimeUnit.SECONDS);

这个定时任务位于NamesrvController#initialize方法中,每10s执行一次,任务内容由RouteInfoManager#scanNotActiveBroker提供,它所做的主要工作是监听broker的上报信息,及时移除不活跃的broker,关于源码的具体分析,我们后面再详细分析。

2.3.2 启动:NamesrvController#start

分析完NamesrvController的初始化流程后,让我们回到NamesrvStartup#start方法:

public static NamesrvController start(final NamesrvController controller) throws Exception {...// 启动controller.start();return controller;
}

接下来,我们来看看NamesrvController的启动流程:

public void start() throws Exception {// 启动nettyServerthis.remotingServer.start();// 监听tls配置文件的变化,不关注if (this.fileWatchService != null) {this.fileWatchService.start();}
}

这个方法主要调用了NettyRemotingServer#start,我们跟进去:

public void start() {...ServerBootstrap childHandler =// 在 NettyRemotingServer#init 中准备的两个线程组this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 省略 option(...)与childOption(...)方法的配置...// 绑定ip与端口.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}...
}

这个方法中,主要处理了NettyRemotingServer的启动,关于其他一些操作并非我们关注的重点,就先忽略了。

可以看到,这个方法里就是处理了一个netty的启动流程,关于netty的相关操作,非本文重点,这里就不多作说明了。这里需要指出的是,在netty中,如果Channel是出现了连接/读/写等事件,这些事件会经过Pipeline上的ChannelHandler上进行流转,NettyRemotingServer添加的ChannelHandler如下:

ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);

这些ChannelHandler只要分为几类:

  1. handshakeHandler:处理握手操作,用来判断tls的开启状态
  2. encoder/NettyDecoder:处理报文的编解码操作
  3. IdleStateHandler:处理心跳
  4. connectionManageHandler:处理连接请求
  5. serverHandler:处理读写请求

这里我们重点关注的是serverHandler,这个ChannelHandler就是用来处理broker注册消息、producer/consumer获取topic消息的,这也是我们接下来要分析的重点。

执行完NamesrvController#startNameServer就可以对外提供连接服务了。

3. 总结

本文主要分析了NameServer的启动流程,整个启动流程分为3步:

  1. 创建controller:这一步主要是解析nameServer的配置并完成赋值操作
  2. 初始化controller:主要创建了NettyRemotingServer对象、netty服务线程池、定时任务
  3. 启动controller:就是启动netty 服务

 


http://www.ppmy.cn/news/21423.html

相关文章

Cesium 渐变长方体实现-Shader

position获取: 1.1 在cesium中,可通过vec4 p = czm_computePosition();获取 模型坐标中相对于眼睛的位置矩阵 1.2 vec4 eyePosition = czm_modelViewRelativeToEye * p; // position in eye coordinates 获取eyePosition 1.3 v_positionEC = czm_inverseModelView * eyePo…

用光盘怎样重装电脑系统

用光盘怎样重装电脑系统&#xff1f;重装系统&#xff0c;听起来好像很难的样子。其实没那么难&#xff0c;用光盘装还是比较容易的。下面一起看看如何用光盘重装系统吧。 工具/原料&#xff1a; 系统版本&#xff1a;win7 品牌型号&#xff1a;联想yoga13 方法/步骤&#xf…

【C++题解】NOIP2015提高组 - 跳石头

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 &#x1f4da;专栏地址&#xff1a;杂题讲解 &#x1f4dd;原题地址&#xff1a;跳石头 &#x1f4e3;专栏定位&#xff1a;在这里我将整理一些其他比赛或面试的题解~ ❤️如果有收获的话&…

RabbitMQ面试题

什么是 MQ MQ(message queue)&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO 先入先出&#xff0c;只不过队列中存放的内容是 message 而已。 还是一种跨进程的通信机制&#xff0c;用于上下游传递消息。 在互联网架构中&#xff0c;MQ 是一种非常常见的…

弱网测试利器-Charles工具实战

一&#xff1a;弱网测试要点 二&#xff1a;利用抓包工具charles进行弱网设置&#xff0c;适用PC端和移动端&#xff08;IOS&#xff0f;Android&#xff09; 1、以charles 4.5.6版本为例&#xff0c;打开Proxy->Throttle Settings 2、打开Throttle Settings&#xff0c;界…

【Linux】十分钟快速了解Linux常用指令(建议收藏)

目录&#x1f496;一. 关机指令01. shutdown02. halt03. reboot&#x1f496;二. 常用指令04. ls05. pwd06. cd07. touch08. mkdir09. rm10. man11. cp(复制)12. mv指令13. nano14. cat15. less16. head17. tail18. find19. grep20. zip/unzip21. tar&#x1f496;三、 日期指令…

重写QTableView类解决鼠标右击、单击、双击问题(附使用方法)

目录 一.重写响应事件 1.区分单击和右击 如何使用 2.区分单击和双击 3.其他修改 二.eventFilter截获事件&#xff08;待验证&#xff09; 界面上的QTableView在点击右键想出现右键事件的时候&#xff0c;同时把单击对应的槽函数执行了一遍&#xff0c;所以需要处理做一下…

二分算法学习

&#x1f33c; 扎着马尾的姑娘&#xff0c;笑容温柔很善良自在的少年 - 要不要买菜 - 单曲 - 网易云音乐 前言 本来打算做蓝桥杯2022&#xff23;&#xff0b;&#xff0b;A组省赛F题青蛙过河的,看到标签显示"二分",第一时间竟然想不到二分是什么,所以来学习下 目录…