rocketmq基本架构

news/2025/1/19 7:40:22/

在这里插入图片描述

简介

Name server

  • 负责broker注册、心跳,路由等功能,类似Kafka的ZK
  • name server节点之间不互相通信,broker需要和所有name server进行通信。扩容name server需要重启broker,不然broker不会和name server建立连接
  • producer和consumer也都是和name server建立长连接,获取路由信息,拿到对应的broker信息,与broker建立长连接,然后发送/消费消息

路由发现

Pull的模式。当topic路由信息发生变化时,name server不回主动推送给客户端,而是客户端定时拉取。默认客户端每30秒会拉取一次最新的路由信息

扩展:

1)push模型:实时性好,但是需要维护一个长链接,消耗服务端资源。client数量不多,实时性要求高,server数据变化比较频繁的场景适合此种模式

2)pull模型:实时性差

3)long
polling模型:长轮询模式。客户端定时发送拉取请求,服务端会hold住连接一段时间,在此期间的数据变动通过此连接推送。超过hold时间后才断开连接。兼顾以上两种方式

Broker

  • broker每30s给name server发送一次心跳,name server每120s检查一次所有的broker心跳时间,超过阈值踢出broker
  • broker节点集群是主从集群,master负责处理读写请求,slave负责对master中的数据进行备份。master和slave有相同的broker name,但broker id不同,broker id为0的是master,非0的是slave。每个broker与name server集群中的所有节点建立长连接,定时注册topic信息到所有name server

源码分析

NameServer

NameServer的启动过程分析

NameServer服务器相关的源码在namesrv模块下,目录结构如下:
在这里插入图片描述

NamesrvStartup类就是Name Server服务器启动的启动类,NamesrvStartup类中有一个main启动类,main方法调用main0,main0主要流程代码
在这里插入图片描述

main0 方法的主要作用就是创建Name Server服务器的控制器,并且启动Name Server服务器的控制器。NamesrvController类的作用就是为Name Server服务的启动提供具体的逻辑实现,主要包括配置信息的加载、远程通信服务器的创建和加载、默认处理器的注册以及心跳检测机器监控Broker的健康状态等。Name Server服务器的控制器的创建方法为createNamesrvController方法,createNamesrvController方法的主要流程代码如下:

//代码位置:org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController
public static NamesrvController createNamesrvController(String[] args){//设置rocketMQ的版本信息,REMOTING_VERSION_KEY的值为:rocketmq.remoting.version,CURRENT_VERSION的值为:V4_7_0System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//构建命令行,添加帮助命令和Name Server的提示命令,将createNamesrvController方法的args参数进行解析//代码省略//nameServer 服务器配置类和netty 服务器配置类final NamesrvConfig namesrvConfig = new NamesrvConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();//设置netty服务器的监听端口nettyServerConfig.setListenPort(9876);// 判断上述构建的命令行是否有configFile(缩写为C)配置文件,如果有的话,则读取configFile配置文件的配置信息,// 并将转为NamesrvConfig和NettyServerConfig的配置信息// 代码省略// 如果构建的命令行存在字符'p',就打印所有的配置信息病区退出方法// 代码省略//首先将构建的命令行转换为Properties,然后将通过反射的方式将Properties的属性转换为namesrvConfig的配置项和配置值。MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);//打印nameServer 服务器配置类和 netty 服务器配置类的配置信息MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);//将namesrvConfig和nettyServerConfig作为参数创建nameServer 服务器的控制器final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);//将所有的配置保存在内存中(Properties)controller.getConfiguration().registerConfig(properties);return controller;
}

createNamesrvController方法主要做了几件事,读取和解析配置信息,包括Name Server服务的配置信息、Netty 服务器的配置信息、打印读取或者解析的配置信息、保存配置信息到本地文件中,以及根据namesrvConfig配置和nettyServerConfig配置作为参数创建nameServer 服务器的控制器。创建好Name server控制器以后,就可以启动它了。启动Name Server的方法的主流程如下:

//代码位置:org.apache.rocketmq.namesrv.NamesrvStartup#start
public static NamesrvController start(final NamesrvController controller){//初始化nameserver 服务器,如果初始化失败则退出boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}//添加关闭的钩子,进行内存清理、对象销毁等惭怍Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {controller.shutdown();return null;}}));//启动controller.start();
}

start方法没什么逻辑,主要作用就是进行初始化工作,然后进行启动Name Server控制器,接下来看看进行了哪些初始化工作以及如何启动Name Server的,初始化initialize方法的主要流程如下:

//代码位置:org.apache.rocketmq.namesrv.NamesrvStartup#initialize
public boolean initialize() {// key-value 配置加载this.kvConfigManager.load();// //创建netty远程服务器,用来进行网络传输以及通信this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);//远程服务器线程池this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));//注册处理器this.registerProcessor();//每10秒扫描不活跃的brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);//每10秒打印配置信息(key-value)this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);//省略部分代码return true;}

initialize方法的主要逻辑如下:

加载配置文件。读取文件名为"user.home/namesrv/kvConfig.json"(其中user.home为用户的目录),然后将读取的文件内容转为KVConfigSerializeWrapper类,最后将所有的key-value保存在如下map中:

//用来保存不同命名空间的key-value private final HashMap<String/* Namespace /,
HashMap<String/
Key /, String/ Value */>> configTable = new
HashMap<String, HashMap<String, String>>();

  • 创建Netty服务器。Name Server 用netty与生产者、消费者以及Boker进行通信。
  • 注册处理器。这里主要注册的是默认的处理器DefaultRequestProcessor,注册的逻辑主要是初始化DefaultRequestProcessor并保存着,待需要使用的时候直接使用。处理器的作用就是处理生产者、消费者以及Broker服务器的不同请求,比如获取生产者和消费者获取所有的路由信息,Broker服务器注册路由信息等。处理器DefaultRequestProcessor处理不同的请求将会在下面进行讲述。
  • 执行定时任务。主要有两个定时任务,一个是每十秒扫描不活跃的Broker。并且将过期的Broker清理掉。另外一个是每十秒打印key-valu的配置信息。

上面就是initialize方法的主要逻辑,特别需要注意每10秒扫描不活跃的broker的定时任务:

//NamesrvController.this.routeInfoManager.scanNotActiveBroker();
//代码位置:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBrokerpublic void scanNotActiveBroker() {//所有存活的BrokerIterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();//遍历Brokerwhile (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();//最后更新时间加上broker过期时间(120秒)小于当前时间,则关闭与broker的远程连接。并且将缓存在map中的broker信息删除if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();//将过期的Channel连接清理掉。以及删除缓存的Brokerthis.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}}

scanNotActiveBroker方法的逻辑主要是遍历缓存在brokerLiveTable的Broker,将Broker最后更新时间加上120秒的结果是否小于当前时间,如果小于当前时间,说明Broker已经过期,可能是已经下线了,所以可以清除Broker信息,并且关闭Name Server 服务器与Broker服务器连接,这样被清除的Broker就不会与Name Server服务器进行远程通信了。brokerLiveTable的结果如下:

//保存broker地址与broker存活信息的对应关系
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

brokerLiveTable缓存着以brokerAddr为key(Broker 地址),以BrokerLiveInfo为value的结果,BrokerLiveInfo是Broker存活对象,主要有如下几个属性:

class BrokerLiveInfo {//最后更新时间private long lastUpdateTimestamp;//版本信息private DataVersion dataVersion;//连接private Channel channel;//高可用服务器地址private String haServerAddr;//省略代码
}

从BrokerLiveInfo中删除了过期的Broker后,还需要做清理Name Server服务器与Broker服务器的连接,onChannelDestroy方法主要是清理缓存在如下map的信息:

////代码位置:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager//保存broker地址与broker存活信息的对应关系
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//保存broker地址与过滤服务器的对应关系,Filter Server 与消息过滤有关系
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//保存broker 名字与 broker元数据的关系
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//保存集群名字与集群下所有broker名字对应的关系
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//保存topic与topic下所有队列元数据的对应关系private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

在扫描过期的broker时,首先找到不活跃的broker,然后onChannelDestroy方法清理与该不活跃broker有关的缓存,清理的主要流程如下:

  • 清理不活跃的broker存活信息。首先遍历brokerLiveTable找到不活跃的broker,然后删除brokerLiveTable中的与该不活跃的broker有关的缓存信息。
  • 清理与消息过滤有关的缓存。找到不活跃的broker存活信息,删除filterServerTable中的与该broker地址有关的消息过滤的服务信息。
  • 清理与不活跃broker的元素居。brokerAddrTable保存着broker名字与broker元素居对应的信息,BrokerData类保存着cluster、brokerName、brokerId与broker name。遍历brokerAddrTable找到与该不活跃broker的名字相等的broker元素进行删除。
  • 清理集群下对应的不活跃broker名字。clusterAddrTable保存集群名字与集群下所有broker名字对应的关系,遍历clusterAddrTable的所有key,从clusterAddrTable中找到与不活跃broker名字相等的元素,然后删除。
  • 清理与该不活跃broker的topic对应队列数据。topicQueueTable保存topic与topic下所有队列元数据的对应关系,QueueData保存着brokerName、readQueueNums(可读队列数量)、writeQueueNums(可写队列数量)等。遍历topicQueueTable的key,找到与不活跃broker名字相同的QueueData进行删除。

初始化nameserver 服务器以后,接下来就可以启动nameserver 服务器:

//代码位置:org.apache.rocketmq.namesrv.NamesrvController#start
public void start() throws Exception {//启动远程服务器(netty 服务器)this.remotingServer.start();//启动文件监听线程if (this.fileWatchService != null) {this.fileWatchService.start();}
}

start方法做了两件事,第一件就是启动netty服务器,netty服务器主要负责与Broker、生产者与消费者之间的通信,处理Broker、生产者与消费者的不同请求。根据nettyConfig配置,设置启动的配置和各种处理器,然后采用netty服务器启动的模板启动服务器,具体的代码就不分析了,有兴趣的可以看看netty启动代码模板是怎么样的。第二件事就是启动文件监听线程,监听tts相关文件是否发生变化。

Name Server 服务器启动流程的源代码分析到此为止了,在这里总结下Name Server 服务器启动流程主要做了什么事:

  • 加载和读取配置。设置Name Server 服务器启动的配置NamesrvConfig和启动Netty服务器启动的配置NettyServerConfig。
  • 初始化相关的组件。netty服务类、远程服务线程池、处理器以及定时任务的初始化。
  • 启动Netty服务器。Netty服务器用来与broker、生产者、消费者进行通信、处理与它们之间的各种请求,并且对请求的响应结果进行处理。

Broker管理和路由信息的管理

Name Server 服务器的作用主要有两个:Broker管理和路由信息管理。

Broker管理

在上面分析的Name Server 服务器的启动过程中,也有一个与Broker管理相关的分析,那就是启动一个定时线程池每十秒去扫描不活跃的Broker。将不活跃的Broker清理掉。除了在Name Server 服务器启动时启动定时任务去扫描不活跃的Broker外,Name Server 服务器启动以后,通过netty服务器接收Broker、生产者、消费者的不同请求,将接收到请求会交给在Name Server服务器启动时注册的处理器DefaultRequestProcessor类的processRequest方法处理。processRequest方法根据请求的不同类型,将请求交给不同的方法进行处理。有关Broker管理的请求主要有注册Broker、注销Broker,processRequest方法处理注册Broker、注销Broker请求的代吗如下:

//代码位置:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequestpublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request)  {switch (request.getCode()) {//省略无关代码//注册Brokercase RequestCode.REGISTER_BROKER:Version brokerVersion = MQVersion.value2Version(request.getVersion());if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {return this.registerBrokerWithFilterServer(ctx, request);} else {return this.registerBroker(ctx, request);}//注销Brokercase RequestCode.UNREGISTER_BROKER:return this.unregisterBroker(ctx, request);//省略无关代码}}
Broker注册

Broker 服务器启动时,会向Name Server 服务器发送Broker 相关的信息,如集群的名字、Broker地址、Broker名字、topic相关信息等,注册Broker主要的代码比较长,接下来会分成好几部分进行讲解。如下:

//代码位置:org.apache.rocketmq.namesrv.processor.RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();this.lock.writeLock().lockInterruptibly();//根据集群的名字获取所有的broker名字Set<String> brokerNames = this.clusterAddrTable.get(clusterName);if (null == brokerNames) {brokerNames = new HashSet<String>();this.clusterAddrTable.put(clusterName, brokerNames);}//名字保存在broker名字中brokerNames.add(brokerName);//省略代码
}

registerBroker方法根据集群的名字获取该集群下所有的Broker名字的Set,如果不存在就创建并添加进clusterAddrTable中,clusterAddrTable保存着集群名字与该集群下所有的Broker名字对应关系,最后将broker名字保存在set中。

public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {//省略代码boolean registerFirst = false;//获取broker 元数据BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());this.brokerAddrTable.put(brokerName, brokerData);}//获取所有的broker地址Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();while (it.hasNext()) {Entry<Long, String> item = it.next();if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {it.remove();}}String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);registerFirst = registerFirst || (null == oldAddr);//省略代码}

上述代码主要做了两件事:

  • 缓存broker元数据信息。首先根据broker名字从brokerAddrTable中获取Broker元数据brokerData,如果brokerData不存在,说明是第一次注册,创建Broker元数据并添加进brokerAddrTable中,brokerAddrTable保存着Broker名字与Broker元数据对应的信息。
  • 从Broker元数据brokerData中获取该元数据中的所有Broker地址信息brokerAddrsMap。brokerAddrsMap保存着brokerId与所有Broker名字对应信息。遍历brokerAddrsMap中的所有broker地址,查找与参数brokerAddr相同但是与参数borkerId不同的进行删除,保证一个broker名字对应着BrokerId,最后将参数brokerId与参数brokerAddr保存到brokerData元数据的brokerAddrsMap中进行缓存。
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {//省略代码//如果topic的配置不空并且是broker masterif (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) {//如果topic配置改变或者是第一次注册if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {//获取所有的topic配置ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {//遍历topic配置,创建并更新队列元素for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());}}}}//省略代码}

如果参数topicConfigWrapper不等于空,并且brokerId等于0时,判断topic是否改变,如果topic改变或者是第一次注册,获取所有的topic配置,并创建和更新队列元数据。QueueData保存着队列元数据,如Broker名字、写队列数量、读队列数量,如果队列缓存中不存在该队列元数据,则添加,否则遍历缓存map找到该队列元数据进行删除,如果是新添加的则添加进队列缓存中。

public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {//省略代码//创建broker存活对象,并进行保存BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);}//如果过滤服务地址不为空,则缓存到filterServerTableif (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);} else {this.filterServerTable.put(brokerAddr, filterServerList);}}//如果不是broker master,获取高可用服务器地址以及master地址if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}return result;}

最后代码片段,主要做了三件事,首先创建了Broker存活对象BrokerLiveInfo,添加到brokerLiveTable中缓存,在Name Server 启动时,供定时线程任务每十秒进行扫描。以确保非正常的Broker被清理掉。然后是判断参数filterServerList是否为空,如果不为空,则添加到filterServerTable缓存,filterServerTable保存着与消息过滤相关的过滤服务。最后,判断该注册的Broker不是Broker master,则设置高可用服务器地址以及master地址。到此为止,Broker注册的代码就分析完成了,总而言之,Broker注册就是Broker将相关的元数据信息,如Broker名字,Broker地址、topic信息发送给Name Server服务器,Name Server接收到以后将这些元数据缓存起来,以供后续能够快速找到这些元数据,生产者和消费者也可以通过Name Server服务器获取到Broke相关的信息,这样,生产者和消费者就可以和Broker服务器进行通信了,生产者发送消息给Broker服务器,消费者从Broker服务器消费消息。

Broker注销

Broker注销的过程刚好跟Broker注册的过程相反,Broker注册是将Broker相关信息和Topic配置信息缓存起来,以供生产者和消费者使用。而Broker注销则是将Broker注销缓存的Broker信息从缓存中删除,Broker注销unregisterBroker方法主要代码流程如下:

//代码位置:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#unregisterBroker
public void unregisterBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) {this.lock.writeLock().lockInterruptibly();//将缓存的broker存活对象删除BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);//将所有的过滤服务删除this.filterServerTable.remove(brokerAddr);boolean removeBrokerName = false;//删除broker元数据if (null != brokerData) {String addr = brokerData.getBrokerAddrs().remove(brokerId);if (brokerData.getBrokerAddrs().isEmpty()) {this.brokerAddrTable.remove(brokerName);removeBrokerName = true;}}//如果删除broker元数据成功if (removeBrokerName) {Set<String> nameSet = this.clusterAddrTable.get(clusterName);if (nameSet != null) {boolean removed = nameSet.remove(brokerName);if (nameSet.isEmpty()) {this.clusterAddrTable.remove(clusterName);}}//根据brokerName删除topic配置信息this.removeTopicByBrokerName(brokerName);}this.lock.writeLock().unlock();
}

unregisterBroker方法的参数有集群名字、broker地址、broker名字、brokerId,主要逻辑为:

  • 根据broker地址删除broker存活对象。
  • 根据broker地址删除所有消息过滤服务。
  • 删除broker元数据。
  • 如果删除元数据成功,则根据集群名字删除该集群的所有broker名字,以及根据根据- brokerName删除topic配置信息。
路由信息的管理

处理器DefaultRequestProcessor类的processRequest方法除了处理Broker注册和Broker注销的请求外,还处路由信息管理有关的请求,接收到生产者和消费者的路由信息相关的请求,会交给处理器DefaultRequestProcessor类的processRequest方法处理,processRequest方法则会根据不同的请求类型将请求交给RouteInfoManager类的不同方法处理。RouteInfoManager类用map进行缓存路由相关信息,map如下:

//topic与队列数据对应映射关系
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//broker 名字与broker 元数据对应映射关系
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//保存cluster的所有broker name
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//broker 地址 与 BrokerLiveInfo存活对象的对应映射关系
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//broker 地址 的所有过滤服务
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

RouteInfoManager类利用上面几个map缓存了Broker信息,topic相关信息、集群信息、消息过滤服务信息等,如果这些缓存的信息有变化,就是网这些map新增或删除缓存。这就是Name Server服务的路由信息管理。processRequest方法是如何处理路由信息管理的,具体实现可以阅读具体的代码,无非就是将不同的请求委托给RouteInfoManager的不同方法,RouteInfoManager的不同实现了上面缓存信息的管理。

Broker

Broker 主要负责消息的存储,投递和查询以及保证服务的高可用。Broker负责接收生产者发送的消息并存储、同时为消费者消费消息提供支持。为了实现这些功能,Broker包含几个重要的子模块:

通信模块:负责处理来自客户端(生产者、消费者)的请求。
客户端管理模块:负责管理客户端(生产者、消费者)和维护消费者的Topic订阅信息。
存储模块:提供存储消息和查询消息的能力,方便Broker将消息存储到硬盘。
高可用服务(HA Service):提供数据冗余的能力,保证数据存储到多个服务器上,将Master Broker的数据同步到Slavew Broker上。
索引服务(Index service):对投递到Broker的消息建立索引,提供快速查询消息的能力。
在这里插入图片描述

broker启动过程分析

在Name Server启动以后,Broker就可以开始启动了,启动过程将所有路由信息都注册到Name server服务器上,生产者就可以发送消息到Broker,消费者也可以从Broker消费消息。接下来就来看看Broker的具体启动过程。

//源代码位置:org.apache.rocketmq.broker.BrokerStartup#main
public static void main(String[] args) {start(createBrokerController(args));
}

BrokerStartup类是Broker的启动类,在BrokerStartup类的main方法中,首先创建用createBrokerController方法创建Broker控制器(BrokerController类),Broker控制器主要负责Broker启动过程的具体的相关逻辑实现。创建好Broker 控制器以后,就可以启动Broker 控制器了,所以下面将从两个部分分析Broker的启动过程:

  • 创建Broker控制器
  • 初始化配置信息
  • 创建并初始化Broker控制
  • 注册Broker关闭的钩子
  • 启动Broker控制器
创建Broker控制器

Broker在启动的时候,会初始化一些配置,如Broker配置、netty服务端配置、netty客户端配置、消息存储配置,为Broker启动提供配置准备。

//源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerControllerpublic static BrokerController createBrokerController(String[] args) {/**省略代码注释:1、设置RocketMQ的版本2、设置netty接收和发送请求的buffer大小3、构建命令行:将命令行进行解析封装**///broker配置、netty服务端配置、netty客户端配置final BrokerConfig brokerConfig = new BrokerConfig();final NettyServerConfig nettyServerConfig = new NettyServerConfig();final NettyClientConfig nettyClientConfig = new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));//设置netty监听接口nettyServerConfig.setListenPort(10911);//消息存储配置final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();//如果broker的角色是slave,设置命中消息在内存的最大比例if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}//省略代码}

createBrokerController方法创建BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类,BrokerConfig类是Broker配置类。

  • BrokerConfig:属性主要包括Broker相关的配置属性,如Broker名字、Broker Id、Broker连接的Name server地址、集群名字等。
  • NettyServerConfig:Broker netty服务端配置类,Broker netty服务端主要用来接收客户端的请求,NettyServerConfig类主要属性包括监听接口、服务工作线程数、接收和发送请求的buffer大小等。
  • NettyClientConfig:netty客户端配置类,用于生产者、消费者这些客户端与Broker进行通信相关配置,配置属性主要包括客户端工作线程数、客户端回调线程数、连接超时时间、连接不活跃时间间隔、连接最大闲置时间等。
  • MessageStoreConfig:消息存储配置类,配置属性包括存储路径、commitlog文件存储目录、CommitLog文件的大小、CommitLog刷盘的时间间隔等。
初始化配置信息

创建完这些配置类以后,接下来会为这些配置类的一些配置属性设置值,先看看如下代码:

//源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerControllerpublic static BrokerController createBrokerController(String[] args) {//省略代码//如果命令中包含字母c,则读取配置文件,将配置文件的内容设置到配置类中if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {configFile = file;InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);//读取配置文件的中namesrv地址properties2SystemEnv(properties);//将配置文件中的配置项映射到配置类中去MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);//设置配置broker配置文件BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}//设置broker配置类MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);//省略代码}

上述主要的代码逻辑为如果命令行中存在命令参数为‘c’(c是configFile的缩写),那么就读取configFile文件的内容,将configFile配置文件的配置项映射到BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类中。接下来createBrokerController方法做一些判断必要配置的合法性,如下代码所示:

//源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {//省略代码//如果broker配置文件的rocketmqHome属性值为null,直接结束程序if (null == brokerConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}//如果name server服务器的地址不为nullString namesrvAddr = brokerConfig.getNamesrvAddr();if (null != namesrvAddr) {try {//namesrvAddr是以";"分割的多个地址String[] addrArray = namesrvAddr.split(";");//每个地址是ip:port的形式,检测下是否形如ip:port的形式for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",namesrvAddr);System.exit(-3);}}//设置BrokerId,broker master 的BrokerId设置为0,broker slave 设置为大于0的值switch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE://如果小于等于0,退出程序if (brokerConfig.getBrokerId() <= 0) {System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}//省略代码}

首先会判断下RocketmqHome的值是否为空,RocketmqHome是Borker相关配置保存的文件目录,如果为空则直接退出程序,启动Broker失败;然后判断下Name server 地址是否为空,如果不为空则解析以“;”分割的name server地址,检测下地址的合法性,如果不合法则直接退出程序;最后判断下Broker的角色,如果是master,BrokerId设置为0,如果是SLAVE,则BrokerId设置为大于0的数,否则直接退出程序,Broker启动失败。
createBrokerController方法进行必要配置参数的判断以后,将进行日志的设置、以及打印配置信息,主要代码如下:

//源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {//省略代码//注释:日志设置//printConfigItem 打印配置信息if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} else if (commandLine.hasOption('m')) {//printImportantConfig 打印重要配置信息InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}//打印配置信息log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);//代码省略}

createBrokerController方法的以上代码逻辑打印配置信息,先判断命令行参数是否包含字母‘p’(printConfigItem的缩写),如果包含字母‘p’,则打印配置信息,否则判断下命令行是否包含字母‘m’,则打印被@ImportantField注解的配置属性,也就是重要的配置属性。最后,不管命令行中是否存在字母‘p’或者字母‘m’,都打印配置信息。

以上就是初始化配置信息的全部代码,初始化配置信息主要是创建BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类,并为这些配置类设置配置的值,同时根据命令行参数判断打印配置信息。

初始化Broker控制器
//源代码位置:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {//省略代码//创建BrokerController(broker 控制器)final BrokerController controller = new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discard//将所有的配置信息保存在内存controller.getConfiguration().registerConfig(properties);//初始化broker控制器boolean initResult = controller.initialize();//如果初始化失败,则退出if (!initResult) {controller.shutdown();System.exit(-3);}//省略代码
}

创建并初始化Broker控制的代码比较简单,创建以配置类作为参数的BrokerController对象,并将所有的配置信息保存在内容中,方便在其他地方使用;创建完Broker控制器对象以后,对控制器进行初始化,当初始化失败以后,则直接退出程序。

initialize方法主要是加载一些保存在本地的一些配置数据,总结起来做了如下几方面的事情:

  • 加载topic配置、消费者位移数据、订阅组数据、消费者过滤配置
  • 创建消息相关的组件,并加载消息数据
  • 创建netty服务器
  • 创建一系列线程
  • 注册处理器
  • 启动一系列定时任务
  • 初始化事务组件
  • 初始化acl组件
  • 注册RpcHook
加载topic配置、消费者位移数据、订阅组数据、消费者过滤配置
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//加载topic配置 topics.jsonboolean result = this.topicConfigManager.load();//加载消费者位移数据 consumerOffset.jsonresult = result && this.consumerOffsetManager.load();//加载订阅组数据 subscriptionGroup.jsonresult = result && this.subscriptionGroupManager.load();//加载消费者过滤 consumerFilter.jsonresult = result && this.consumerFilterManager.load();//省略代码
}

load方法是抽象类ConfigManager的方法,该方法读取文件的内容解码成对应的配置对象,如果文件中的内容为空,就读取备份文件中的内容进行解码。读取的文件都是保存在user.home/store/config/下,user.home是用户目录,不同人的电脑user.home一般不同。topicConfigManager.load()读取topics.json文件,如果该文件的内容为空,那么就读取topics.json.bak文件内容,topics.json保存的是topic数据;同理,consumerOffsetManager.load()方法读取consumerOffset.json和consumerOffset.json.bak文件,保存的是消费者位移数据;subscriptionGroupManager.load()方法读取subscriptionGroup.json和subscriptionGroup.json.bak文件,保存订阅组数据(消费者分组数据)、consumerFilterManager.load()方法读取的是consumerFilter.json和consumerFilter.json.bak的内容,保存的是消费者过滤数据。

创建消息相关的组件,并加载消息数据
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码//如果上述都加载成功if (result) {try {//创建消息存储器this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,                                this.messageArrivingListener,this.brokerConfig);//如果开启了容灾、主从自动切换,添加DLedger角色改变处理器if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}//broker 相关统计this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load plugin//加载消息存储插件MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}//加载消息文件result = result && this.messageStore.load();//省略代码
}

如果加载topic配置、消费者位移数据、订阅组数据、消费者过滤配置成功以后,就创建消息相关的组件,并加载消息数据,这个过程创建了消息存储器、DLedger角色改变处理器、Broker统计相关组件以及消息存储插件,然后加载消息文件中的数据。接下来具体看看加载消息文件中的messageStore.load()方法:

//代码位置:org.apache.rocketmq.store.DefaultMessageStore#load
public boolean load() {boolean result = true;try {//判断abort是否存在boolean lastExitOK = !this.isTempFileExist();log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");//加载定时消费服务器if (null != scheduleMessageService) {//读取delayOffset.json文件result = result && this.scheduleMessageService.load();}// load Commit Log//加载 Commit log 文件result = result && this.commitLog.load();// load Consume Queue//加载消费者队列 文件consumequeueresult = result && this.loadConsumeQueue();if (result) {//加载检查点文件checkpointthis.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));//加载索引文件this.indexService.load(lastExitOK);//数据恢复this.recover(lastExitOK);log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());}} catch (Exception e) {log.error("load exception", e);result = false;}if (!result) {this.allocateMappedFileService.shutdown();}return result;
}

load方法主要逻辑就是加载各种数据文件,主要有以下几方面进行加载数据:

  • isTempFileExist方法判断abort是否存在,如果不存在,说明Broker是正常关闭的,否则就是异常关闭。
  • scheduleMessageService.load()方法读取user.home/store/config/下的delayOffset.json文件的内容,该文件内容保存延迟消息的位移数据。
  • commitLog.load()加载 CommitLog 文件, CommitLog 文件保存的是消息内容
  • loadConsumeQueue()方法加载consumequeue目录下的内容,ConsumeQueue(消息消费队列)是消费消息的索引,消费者通过ConsumeQueue可以快速找到查找待消费的消息,consumequeue目录下的文件组织方式是:topic/queueId/fileName,所以就可以快速找待消费的消息在哪一个Commit log 文件中。
  • indexService.load(lastExitOK)加载索引文件,加载的是user.home/store/index/目录下文件,文件名fileName是以创建时的时间戳命名的,所以可以通过时间区间来快速查询消息,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故底层实现为hash索引。
  • recover(lastExitOK)方法将CommitLog 文件的内容加载到内存中以及topic队列。
创建netty服务器
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码if (result) {//创建netty远程服务器this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);//省略代码}//省略代码
}

创建netty服务器的时候创建了两个,一个是普通的,一个是快速的,remotingServer用来与生产者、消费者进行通信。当isSendMessageWithVIPChannel=true的时候会选择port-2的fastRemotingServer进行的消息的处理,为了防止某些很重要的业务阻塞,就再开启了一个remotingServer进行处理,但是现在默认是不开启的,fastRemotingServer主要是为了兼容老版本的RocketMQ.。

创建一系列线程池
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//代码省略//发送消息线程池this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));//拉取消息线程池//this.pullMessageExecutor //回复消息线程池//this.replyMessageExecutor //查询消息线程池//this.queryMessageExecutor //broker 管理线程池//this.adminBrokerExecutor//客户端管理线程池//this.clientManageExecutor //心跳线程池//this.heartbeatExecutor //事务线程池// this.endTransactionExecutor //消费者管理线程池this.consumerManageExecutor =Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));//代码省略
}

创建的线程池对象有发送消息线程池、拉取消息线程池、回复消息线程池、查询消息线程池、broker 管理线程池、客户端管理线程池、心跳线程池、事务线程池、消费者管理线程池。

注册请求处理器
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码//注册处理器this.registerProcessor();//省略代码
}

registerProcessor()方法如下:

//源代码位置:org.apache.rocketmq.broker.BrokerController#registerProcessor
public void registerProcessor() {//发送消息处理器SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);//远程服务注册发送消息处理器this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);//注册拉消息处理器//注册回复消息处理器//注册查询消息处理器//注册客户端管理处理器//注册消费者管理处理器//注册事务处理器//注册broker处理器AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}

registerProcessor方法注册了发送消息处理器、远程服务注册发送消息处理器、拉消息处理器、回复消息处理器、查询消息处理器、客户端管理处理器、消费者管理处理器、事务处理器、broker处理器。registerProcessor注册方法也很简单,就是以RequestCode作为key,以Pair<处理器,线程池>作为Value保存在名字为processorTable的HashMap中。每个请求都是在线程池中处理的,这样可以提高处理请求的性能。对于每个传入的请求,根据RequestCode就可以在processorTable查找处理器来处理请求。每个处理器都有有一个processRequest方法进行处理请求。

启动一系列定时任务

Broker初始化方法initialize中,会启动一系列的后台定时线程任务,这些后台任务包括都是由scheduledExecutorService线程池执行的,scheduledExecutorService是单线程线程池( Executors.newSingleThreadScheduledExecutor()),只用单线程线程池执行后台定时任务有一个好处就是减少线程过多,反而导致线程为了抢占CPU加剧了竞争。这一些后台定时线程任务如下:

  • 每24小时打印昨天产生了多少消息,消费了多少消息
  • 每五秒保存消费者位移到文件中
  • 每10秒保存消费者过滤到文件中
  • 每3分钟定时检测消费的进度
  • 每秒打印队列的大小以及队列头部元素存在的时间
  • 每分钟打印已存储在CommitLog中但尚未分派到消费队列的字节数
  • 每两分钟定时获取获取name server 地址
  • 每分钟定时打印slave 数据同步落后多少
//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码final long period = 1000 * 60 * 60 * 24;//每24小时打印昨天产生了多少消息,消费了多少消息this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);//省略代码
}

每24小时打印昨天产生了多少消息,消费了多少消息的定时任务比较简单,就是将昨天消息的生产和消费的数量统计出来,然后把这两个指标打印出来。

//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码//每五秒保存消费者位移this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);//每10秒保存消费者过滤this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);//省略代码
}

每五秒保存消费者位移和每10秒保存消费者过滤定时任务都是保存在文件中,每五秒保存消费者位移定时任务将消费者位移保存在consumerOffset.json文件中,每10秒保存消费者过滤定时任务将消费者过滤保存在consumerFilter.json文件中。

//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//省略代码//每3分钟定时检测消费的进度this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);//省略代码
}

每3分钟定时检测消费进度的定时任务的作用是检测消费者的消费进度,当消费者消费消息的进度落后大于配置的最大落后阈值时,就停止消费者消费,具体的实现看protectBroker的源码:

//源代码位置:org.apache.rocketmq.broker.BrokerController#protectBroker
public void protectBroker() {
//是否开启慢消费检测开关,默认未开启
if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
//遍历统计项
final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<String, MomentStatsItem> next = it.next();
final long fallBehindBytes = next.getValue().getValue().get();
//消费者消费消息的进度落后消费者落后阈值
if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
final String[] split = next.getValue().getStatsKey().split(“@”);
final String group = split[2];
LOG_PROTECTION.info(“[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it”, group, fallBehindBytes);
//设置消费者消费的标志,关闭消费
this.subscriptionGroupManager.disableConsume(group);
}
}
}
}
protectBroker方法首先判别是否开启慢消费检测开关,如果开启了,就进行遍历统计项,判断消费者消费消息的进度落后消费者落后阈值的时候,就停止该消费者停止消费来保护broker,如果消费者消费比较慢,那么在Broker的消费会越来越多,积压在Broker上,所以停止慢消费者消费消息,让其他消费者消费,减少消息的积压。

//源代码位置:org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//代码省略//每秒打印队列的大小以及队列头部元素存在的时间this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);//代码省略
}

每秒打印队列的大小以及队列头部元素存在的时间定时任务,会打印发送消息线程池队列、拉取消息线程池队列、查询消息线程池队列、结束事务线程池队列的大小,以及打印队列头部元素存在的时间,这个时间等于当前时间减去头部元素创建的时间,就是该元素创建到现在已经花费了多长时间。具体的代码如下:

//源代码位置:org.apache.rocketmq.broker.BrokerController#headSlowTimeMills
public long headSlowTimeMills(BlockingQueue<Runnable> q) {long slowTimeMills = 0;//队列的头final Runnable peek = q.peek();if (peek != null) {RequestTask rt = BrokerFastFailure.castRunnable(peek);//当前时间减去创建时间slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();}if (slowTimeMills < 0) {slowTimeMills = 0;}return slowTimeMills;
}
初始化事务消息
//源码位置:org.apache.rocketmq.broker.BrokerController#initialTransaction
private void initialTransaction() {//加载transactionalMessageService,利用spithis.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);if (null == this.transactionalMessageService) {this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());}//创建transactionalMessage检查监听器this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);if (null == this.transactionalMessageCheckListener) {this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());}this.transactionalMessageCheckListener.setBrokerController(this);//创建事务消息检查服务this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);}

initialTransaction方法主要创建与事务消息相关的类,创建transactionalMessageService(事务消息服务)、transactionalMessageCheckListener(事务消息检查监听器)、transactionalMessageCheckService(事务消息检查服务)。transactionalMessageService用于处理事务消息,transactionalMessageCheckListener主要用来回查消息监听,transactionalMessageCheckService用于检查超时的 Half 消息是否需要回查。RocketMQ发送事务消息是将消费先写入到事务相关的topic的中,这个消息就称为半消息,当本地事务成功执行,那么半消息会还原为原来的消息,然后再进行保存。initialTransaction在创建transactionalMessageService和transactionalMessageCheckListener都使用了ServiceProvider.loadClass方法,这个方法就是采用SPI原理,SPI原理就是利用反射加载META-INF/service目录下的某个接口的所有实现,只要实现接口,然后META-INF/service目录下添加文件名为全类名的文件,这样SPI就可以加载具体的实现类,具有可拓展性。

初始化acl组件
//源码位置:org.apache.rocketmq.broker.BrokerController#initialAcl
private void initialAcl() {if (!this.brokerConfig.isAclEnable()) {log.info("The broker dose not enable acl");return;}//利用SPI加载权限相关的校验器List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);if (accessValidators == null || accessValidators.isEmpty()) {log.info("The broker dose not load the AccessValidator");return;}//将所有的权限校验器进行缓存以及注册for (AccessValidator accessValidator: accessValidators) {final AccessValidator validator = accessValidator;accessValidatorMap.put(validator.getClass(),validator);this.registerServerRPCHook(new RPCHook() {@Overridepublic void doBeforeRequest(String remoteAddr, RemotingCommand request) {//Do not catch the exceptionvalidator.validate(validator.parse(request, remoteAddr));}@Overridepublic void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {}});}}

initialAcl方法主要是加载权限相关校验器,RocketMQ的相关的管理的权限验证和安全就交给这里的加载的校验器了。initialAcl方法也利用SPI原理加载接口的具体实现类,将所有加载的校验器缓存在map中,然后再注册RPC钩子,在请求之前调用校验器的validate的方法。

注册RpcHook
//源码位置:org.apache.rocketmq.broker.BrokerController#initialRpcHooks
private void initialRpcHooks() {//利用SPI加载钩子List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);if (rpcHooks == null || rpcHooks.isEmpty()) {return;}//注册钩子for (RPCHook rpcHook: rpcHooks) {this.registerServerRPCHook(rpcHook);}
}

initialRpcHooks方法加RPC钩子,利用SPI原理加载具体的钩子实现,然后将所有的钩子进行注册,钩子的注册是将钩子保存在List中。

以上分析就是创建Broker控制器的全过程,这个过程首先进行一些必要的初始化配置,如Broker配置、网络通信Neety配置以及存储相关配置等。然后在创建并初始化Broker控制器,创建并初始化Broker控制器的过程中,又进行了多个步骤,如加载topic配置、消费者位移数据、启动一系列后台定时任务、创建事务消息相关组件等。

Broker控制器的启动

//源码位置:org.apache.rocketmq.broker.BrokerController#start
public static BrokerController start(BrokerController controller) {try {//Broker控制器启动controller.start();//打印Broker成功的消息String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();if (null != controller.getBrokerConfig().getNamesrvAddr()) {tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();}log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}

controller.start()方法主要是启动各种组件:

  • 启动消息消息存储器
  • netty服务的启动
  • 文件监听器启动
  • broker 对外api启动
  • 长轮询拉取消息服务启动
  • 客户端长连接服务启动
  • 过滤服务管理启动
  • broker 相关统计启动
  • broker 快速失败启动
//源码位置:org.apache.rocketmq.broker.BrokerController#start
public void start() throws Exception {if (this.messageStore != null) {//启动消息消息存储this.messageStore.start();}if (this.remotingServer != null) {//netty服务的启动this.remotingServer.start();}if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}//文件改变监听启动if (this.fileWatchService != null) {this.fileWatchService.start();}//broker 对外api启动if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}//保持长轮询请求的服务启动if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}//客户端长连接服务启动if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}//过滤服务管理启动if (this.filterServerManager != null) {this.filterServerManager.start();}//如果没有采用主从切换(多副本)if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}//定时注册brokerthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);//broker 相关统计启动if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}//broker 快速失败启动if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}}

启动过程还有很多细节没有分析,放到下个文章吧吧吧吧吧

文章大量参考:https://www.zhihu.com/column/c_1437729921845690368


http://www.ppmy.cn/news/1564347.html

相关文章

【深度学习】Java DL4J 2024年度技术总结

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…

SpringMVC (2)

目录 1. RequestMapping 注解介绍 2. RequestMapping 使用 3. RequestMapping与请求方式 3.1 RequestMapping 支持Get和Post类型的请求 3.2 RequestMapping 指定接收某种请求 3.3 GetMapping和PostMapping 4. 传参 4.1 通过查询字符串传参 4.2 在 Body 中传参 4.2.1 …

[深度学习]神经网络线性回归简易实例

线性回归简易实例 文章目录 线性回归简易实例导入模块所模拟的模型生成数据获取数据定义模型定义LOSS使用模型拟合出真实参数实现梯度下降函数&#xff0c;用于更新参数训练函数 完整代码 导入模块 import torch import matplotlib.pyplot as plt #画图import random #随机tor…

RabbitMQ 进阶

文章目录 一、发送者的可靠性 1.1 生产者重试机制&#xff1a;1.2 生产者确认机制&#xff1a; 1.2.1 开启生产者确认&#xff1a;1.2.2 定义 ReturnCallback&#xff1a;1.2.3 定义 ConfirmCallback&#xff1a; 二、MQ 的可靠性 2.1 数据持久化&#xff1a; 2.1.1 交换机持…

ASP.NET Core 中的 JWT 鉴权实现

在当今的软件开发中&#xff0c;安全性和用户认证是至关重要的方面。JSON Web Token&#xff08;JWT&#xff09;作为一种流行的身份验证机制&#xff0c;因其简洁性和无状态特性而被广泛应用于各种应用中&#xff0c;尤其是在 ASP.NET Core 项目里。本文将详细介绍如何在 ASP.…

Python爬虫---中国大学MOOC爬取数据(文中有数据集)

1、内容简介 本文为大二在校学生所做&#xff0c;内容为爬取中国大学Mooc网站的课程分类数据、课程数据、评论数据。数据集大佬们需要拿走。主要是希望大佬们能指正代码问题。 2、数据集 课程评论数据集&#xff0c;343525条&#xff08;包括评论id、评论时间、发送评论用户…

Android 导出CSV文件乱码问题处理

最近有一个需求&#xff0c;需要在Android端导出CSV文件&#xff0c;自测是用的WPS&#xff0c;没啥问题。可到了测试那边&#xff0c;用Excel直接打开就是乱码&#xff0c;需要在Excel数据里面用【从文件/CSV】打开。这样就显示非常的不方便。 解决办法&#xff1a; public …

python中元类相关的问答题

元类&#xff08;metaclass&#xff09;是 Python 面试中的高级主题&#xff0c;主要考察候选人对类和对象底层机制的理解。以下是常见的 Python 元类面试题及详细解答&#xff1a; 1. 什么是元类&#xff1f; 回答&#xff1a; 元类是用于创建类的类。普通类的实例是对象&am…