Spring Boot RabbitMQ Source Code Analysis: The RabbitListener Annotation (Part 4)

news/2024/11/29 12:40:34/

Contents

      • 1.Introduction to the RabbitListener annotation
      • 2.EnableRabbit and RabbitBootstrapConfiguration
      • 3.RabbitListenerAnnotationBeanPostProcessor
      • 4.Parsing the RabbitListener annotation
      • 5.RabbitListenerEndpointRegistrar

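1.Introduction to the RabbitListener annotation

RabbitListener is one of the most frequently used annotations in Spring Boot RabbitMQ. Classes and methods annotated with RabbitListener are wrapped as a MessageListener and injected into a MessageListenerContainer.

  • When RabbitListener is placed on a method, that method is the listener for Rabbit messages.
  • When RabbitListener is placed on a class, it works together with the RabbitHandler annotation to dispatch messages of different types; the methods in the class annotated with RabbitHandler are the listeners for Rabbit messages.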
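
To make the class-level case more concrete, here is a minimal JDK-only sketch of dispatching by payload type. It does not use Spring AMQP: the Handler annotation, OrderListener, and PayloadDispatcher are hypothetical stand-ins for RabbitHandler, a class annotated with RabbitListener, and the framework's dispatch logic, which is considerably more involved in the real library.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @RabbitHandler (not the Spring AMQP annotation).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Handler {
}

// Hypothetical listener class: one handler method per payload type.
class OrderListener {
    @Handler
    public String onText(String payload) {
        return "text:" + payload;
    }

    @Handler
    public String onNumber(Integer payload) {
        return "number:" + payload;
    }
}

// Picks the @Handler method whose single parameter accepts the payload.
class PayloadDispatcher {
    static Object dispatch(Object listener, Object payload) {
        for (Method m : listener.getClass().getDeclaredMethods()) {
            if (m.isAnnotationPresent(Handler.class)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(payload)) {
                try {
                    m.setAccessible(true);
                    return m.invoke(listener, payload);
                } catch (ReflectiveOperationException ex) {
                    throw new IllegalStateException("Handler invocation failed", ex);
                }
            }
        }
        throw new IllegalArgumentException("No handler for " + payload.getClass().getName());
    }
}
```

Calling PayloadDispatcher.dispatch(new OrderListener(), "hello") reaches onText, while passing an Integer reaches onNumber. The listener class stays free of type checks because the choice of method is driven by the parameter type alone.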

2.EnableRabbit and RabbitBootstrapConfiguration

The auto-configuration class RabbitAutoConfiguration brings in EnableRabbit, and EnableRabbit in turn uses the Import annotation to pull in the configuration class RabbitBootstrapConfiguration.

public class RabbitBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(@Nullable AnnotationMetadata importingClassMetadata,
            BeanDefinitionRegistry registry) {
        if (!registry.containsBeanDefinition(
                RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
            registry.registerBeanDefinition(
                    RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
                    new RootBeanDefinition(RabbitListenerAnnotationBeanPostProcessor.class));
        }
        if (!registry.containsBeanDefinition(
                RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
            registry.registerBeanDefinition(
                    RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                    new RootBeanDefinition(RabbitListenerEndpointRegistry.class));
        }
    }
}

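This registers bean definitions for RabbitListenerAnnotationBeanPostProcessor and RabbitListenerEndpointRegistry in the IoC container, each only if a definition with that name is not already present.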

3.RabbitListenerAnnotationBeanPostProcessor

RabbitListenerAnnotationBeanPostProcessor implements the BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, and SmartInitializingSingleton interfaces. Ordered determines the processing order; BeanFactoryAware, BeanClassLoaderAware, and EnvironmentAware are mainly used to obtain the corresponding BeanFactory, BeanClassLoader, and Environment. The methods of interest here are those inherited from SmartInitializingSingleton and BeanPostProcessor.

public void afterSingletonsInstantiated() {
    this.registrar.setBeanFactory(this.beanFactory);

    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, RabbitListenerConfigurer> instances =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(RabbitListenerConfigurer.class);
        for (RabbitListenerConfigurer configurer : instances.values()) {
            configurer.configureRabbitListeners(this.registrar);
        }
    }

    if (this.registrar.getEndpointRegistry() == null) {
        if (this.endpointRegistry == null) {
            Assert.state(this.beanFactory != null,
                    "BeanFactory must be set to find endpoint registry by bean name");
            this.endpointRegistry = this.beanFactory.getBean(
                    RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                    RabbitListenerEndpointRegistry.class);
        }
        this.registrar.setEndpointRegistry(this.endpointRegistry);
    }

    if (this.containerFactoryBeanName != null) {
        this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
    }

    // Set the custom handler method factory once resolved by the configurer
    MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
    if (handlerMethodFactory != null) {
        this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
    }

    // Actually register all listeners
    this.registrar.afterPropertiesSet();

    // clear the cache - prototype beans will be re-cached.
    this.typeCache.clear();
}

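This is the initialization step: it initializes RabbitListenerAnnotationBeanPostProcessor (in particular its registrar field) based on any custom RabbitListenerConfigurer beans, and then triggers registration of all listeners through registrar.afterPropertiesSet().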

  • postProcessBeforeInitialization
  • postProcessAfterInitialization

@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    Class<?> targetClass = AopUtils.getTargetClass(bean);
    final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
    for (ListenerMethod lm : metadata.listenerMethods) {
        for (RabbitListener rabbitListener : lm.annotations) {
            processAmqpListener(rabbitListener, lm.method, bean, beanName);
        }
    }
    if (metadata.handlerMethods.length > 0) {
        processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
    }
    return bean;
}

postProcessAfterInitialization finds and parses the RabbitListener annotations through the following methods:

  • RabbitListenerAnnotationBeanPostProcessor#buildMetadata
  • RabbitListenerAnnotationBeanPostProcessor#processAmqpListener
  • RabbitListenerAnnotationBeanPostProcessor#processMultiMethodListeners

4.Parsing the RabbitListener annotation

RabbitListenerAnnotationBeanPostProcessor#buildMetadata

private TypeMetadata buildMetadata(Class<?> targetClass) {
    Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
    final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
    final List<ListenerMethod> methods = new ArrayList<>();
    final List<Method> multiMethods = new ArrayList<>();
    ReflectionUtils.doWithMethods(targetClass, method -> {
        Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
        if (listenerAnnotations.size() > 0) {
            methods.add(new ListenerMethod(method,
                    listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
        }
        if (hasClassLevelListeners) {
            RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
            if (rabbitHandler != null) {
                multiMethods.add(method);
            }
        }
    }, ReflectionUtils.USER_DECLARED_METHODS);
    if (methods.isEmpty() && multiMethods.isEmpty()) {
        return TypeMetadata.EMPTY;
    }
    return new TypeMetadata(
            methods.toArray(new ListenerMethod[methods.size()]),
            multiMethods.toArray(new Method[multiMethods.size()]),
            classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}

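RabbitListenerAnnotationBeanPostProcessor inspects every bean class: the RabbitListener annotations on the class, the RabbitHandler annotations on its methods, and the RabbitListener annotations on its methods are collected and wrapped in a TypeMetadata object. The result is cached per target class in typeCache, so each class is scanned only once.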
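
The scan-then-cache pattern can be sketched with plain reflection. This is a simplified model and not the Spring AMQP implementation: the Listener annotation, SampleBean, and MetadataCache are hypothetical names, only method-level annotations are collected, and the scans counter exists purely to make the caching visible.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for @RabbitListener (method level only).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Listener {
    String queue();
}

// Hypothetical bean with two annotated methods and one plain helper.
class SampleBean {
    @Listener(queue = "orders")
    public void onOrder(String body) {
    }

    @Listener(queue = "payments")
    public void onPayment(String body) {
    }

    public void helper() {
    }
}

// Scans a class for @Listener methods once and caches the result per class.
class MetadataCache {
    private final Map<Class<?>, List<String>> cache = new ConcurrentHashMap<>();
    final AtomicInteger scans = new AtomicInteger();

    List<String> queuesFor(Class<?> type) {
        return cache.computeIfAbsent(type, this::scan);
    }

    private List<String> scan(Class<?> type) {
        scans.incrementAndGet();
        List<String> queues = new ArrayList<>();
        for (Method method : type.getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener != null) {
                queues.add(listener.queue());
            }
        }
        // getDeclaredMethods() has no guaranteed order, so sort for stable output.
        Collections.sort(queues);
        return queues;
    }
}
```

Asking the cache for SampleBean twice runs the reflective scan only once, which is the same reason the post-processor keys its typeCache by target class.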

The annotations found by RabbitListenerAnnotationBeanPostProcessor#buildMetadata are wrapped in a TypeMetadata and then handed to processAmqpListener and processMultiMethodListeners respectively for processing.

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
    endpoint.setMethod(methodToUse);
    endpoint.setBeanFactory(this.beanFactory);
    endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
    String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");
    if (StringUtils.hasText(errorHandlerBeanName)) {
        endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
    }
    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}

private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
        Object bean, String beanName) {
    List<Method> checkedMethods = new ArrayList<Method>();
    for (Method method : multiMethods) {
        checkedMethods.add(checkProxy(method, bean));
    }
    for (RabbitListener classLevelListener : classLevelListeners) {
        MultiMethodRabbitListenerEndpoint endpoint = new MultiMethodRabbitListenerEndpoint(checkedMethods, bean);
        endpoint.setBeanFactory(this.beanFactory);
        processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
    }
}

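RabbitListenerAnnotationBeanPostProcessor#processAmqpListener handles methods annotated with RabbitListener.
RabbitListenerAnnotationBeanPostProcessor#processMultiMethodListeners handles the methods annotated with RabbitHandler inside a class annotated with RabbitListener.

Each of them creates its own endpoint object (MethodRabbitListenerEndpoint or MultiMethodRabbitListenerEndpoint), initializes the properties that differ between the two approaches, and then passes the endpoint to RabbitListenerAnnotationBeanPostProcessor#processListener for further processing.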

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
        Object adminTarget, String beanName) {
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(rabbitListener));
    endpoint.setQueueNames(resolveQueues(rabbitListener));
    endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
    String group = rabbitListener.group();
    if (StringUtils.hasText(group)) {
        Object resolvedGroup = resolveExpression(group);
        if (resolvedGroup instanceof String) {
            endpoint.setGroup((String) resolvedGroup);
        }
    }
    String autoStartup = rabbitListener.autoStartup();
    if (StringUtils.hasText(autoStartup)) {
        endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
    }

    endpoint.setExclusive(rabbitListener.exclusive());
    String priority = resolve(rabbitListener.priority());
    if (StringUtils.hasText(priority)) {
        try {
            endpoint.setPriority(Integer.valueOf(priority));
        }
        catch (NumberFormatException ex) {
            throw new BeanInitializationException("Invalid priority value for " +
                    rabbitListener + " (must be an integer)", ex);
        }
    }

    String rabbitAdmin = resolve(rabbitListener.admin());
    if (StringUtils.hasText(rabbitAdmin)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
        try {
            endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                    adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
                    rabbitAdmin + "' was found in the application context", ex);
        }
    }

    RabbitListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                    adminTarget + "] for bean " + beanName + ", no " +
                    RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
                    containerFactoryBeanName + "' was found in the application context", ex);
        }
    }

    this.registrar.registerEndpoint(endpoint, factory);
}

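The properties of the MethodRabbitListenerEndpoint are set and validated from the attributes of the RabbitListener annotation. Finally, RabbitListenerEndpointRegistrar#registerEndpoint registers the endpoint together with its RabbitListenerContainerFactory (which may be null at this point and resolved later).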

5.RabbitListenerEndpointRegistrar

@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}

protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            this.endpointRegistry.registerListenerContainer(
                    descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        this.startImmediately = true;  // trigger immediate startup
    }
}

RabbitListenerEndpointRegistrar#registerEndpoint

public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
    Assert.notNull(endpoint, "Endpoint must be set");
    Assert.hasText(endpoint.getId(), "Endpoint id must be set");
    // Factory may be null, we defer the resolution right before actually creating the container
    AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        if (this.startImmediately) { // Register and start immediately
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                    resolveContainerFactory(descriptor), true);
        }
        else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}

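Before afterPropertiesSet has run, registerEndpoint only queues the endpoint descriptor; afterwards, endpoints are registered and started immediately. In both cases the listener container itself is registered through RabbitListenerEndpointRegistry#registerListenerContainer.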
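
The deferred-versus-immediate behavior of the registrar can be modeled in a few lines of plain Java. This is a sketch under simplifying assumptions: SimpleRegistry and SimpleRegistrar are hypothetical stand-ins for RabbitListenerEndpointRegistry and RabbitListenerEndpointRegistrar, endpoints are reduced to their id strings, and container factories are left out entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for RabbitListenerEndpointRegistry.
class SimpleRegistry {
    // endpoint id -> whether it was registered with startImmediately = true
    final Map<String, Boolean> containers = new LinkedHashMap<>();

    void registerListenerContainer(String endpointId, boolean startImmediately) {
        containers.put(endpointId, startImmediately);
    }
}

// Hypothetical stand-in for RabbitListenerEndpointRegistrar.
class SimpleRegistrar {
    private final SimpleRegistry registry;
    private final List<String> pending = new ArrayList<>();
    private boolean startImmediately;

    SimpleRegistrar(SimpleRegistry registry) {
        this.registry = registry;
    }

    // Queues the endpoint until afterPropertiesSet() has run; registers directly afterwards.
    void registerEndpoint(String endpointId) {
        synchronized (pending) {
            if (startImmediately) {
                registry.registerListenerContainer(endpointId, true);
            } else {
                pending.add(endpointId);
            }
        }
    }

    // Flushes all queued endpoints and switches to immediate mode.
    void afterPropertiesSet() {
        synchronized (pending) {
            for (String endpointId : pending) {
                registry.registerListenerContainer(endpointId, false);
            }
            startImmediately = true;
        }
    }
}
```

An endpoint registered before afterPropertiesSet() does not reach the registry until the flush, while one registered afterwards appears at once. Queuing first lets every RabbitListenerConfigurer finish customizing the registrar before any container is created.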

RabbitListenerEndpointRegistry#registerListenerContainer

public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
        boolean startImmediately) {
    Assert.notNull(endpoint, "Endpoint must not be null");
    Assert.notNull(factory, "Factory must not be null");

    String id = endpoint.getId();
    Assert.hasText(id, "Endpoint id must not be empty");
    synchronized (this.listenerContainers) {
        Assert.state(!this.listenerContainers.containsKey(id),
                "Another endpoint is already registered with id '" + id + "'");
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
            List<MessageListenerContainer> containerGroup;
            if (this.applicationContext.containsBean(endpoint.getGroup())) {
                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
            }
            else {
                containerGroup = new ArrayList<MessageListenerContainer>();
                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
            }
            containerGroup.add(container);
        }
        if (startImmediately) {
            startIfNecessary(container);
        }
    }
}

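A listener container is created from the RabbitListenerEndpoint by the listener container factory. Endpoint ids must be unique, and containers whose endpoint declares a group are also collected into a list registered under the group name. The whole registration runs inside a synchronized block, so at most one endpoint can be registering at any time.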
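
The id uniqueness check and the grouping step can be illustrated with a small JDK-only model. ContainerRegistry is a hypothetical class, containers are represented by plain strings, and an in-memory map takes the place of the singleton beans that the real registry registers per group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the registry's bookkeeping.
class ContainerRegistry {
    private final Map<String, String> listenerContainers = new HashMap<>();
    private final Map<String, List<String>> groups = new HashMap<>();

    // Registers one container per endpoint id; a duplicate id is rejected.
    void register(String id, String group) {
        synchronized (listenerContainers) {
            if (listenerContainers.containsKey(id)) {
                throw new IllegalStateException(
                        "Another endpoint is already registered with id '" + id + "'");
            }
            String container = "container:" + id;
            listenerContainers.put(id, container);
            if (group != null && !group.isEmpty()) {
                // Create the group list on first use, then append the container.
                groups.computeIfAbsent(group, name -> new ArrayList<>()).add(container);
            }
        }
    }

    int size() {
        synchronized (listenerContainers) {
            return listenerContainers.size();
        }
    }

    List<String> group(String name) {
        synchronized (listenerContainers) {
            return new ArrayList<>(groups.getOrDefault(name, new ArrayList<>()));
        }
    }
}
```

Registering the same id twice fails with an IllegalStateException, mirroring the Assert.state check in the excerpt, and two endpoints that share a group end up in the same list.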

RabbitListenerEndpointRegistry#start

@Override
public void start() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
        startIfNecessary(listenerContainer);
    }
}

private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        listenerContainer.start();
    }
}

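For each container that is set to auto-start, or for every container once the context has been refreshed, this calls MessageListenerContainer#start, which starts the listener.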
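
The start condition can be modeled as follows. FakeContainer and StartupRegistry are hypothetical names, and onContextRefreshed() is an assumed stand-in for however the real registry sets its contextRefreshed flag, which the excerpt above does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MessageListenerContainer.
class FakeContainer {
    final boolean autoStartup;
    boolean running;

    FakeContainer(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    void start() {
        running = true;
    }
}

// Hypothetical model of the registry's start logic.
class StartupRegistry {
    private final List<FakeContainer> containers = new ArrayList<>();
    private boolean contextRefreshed;

    void add(FakeContainer container) {
        containers.add(container);
    }

    // Assumed hook: marks the application context as refreshed.
    void onContextRefreshed() {
        contextRefreshed = true;
    }

    void start() {
        for (FakeContainer container : containers) {
            startIfNecessary(container);
        }
    }

    // Same condition as in the excerpt: refreshed context OR auto-startup.
    private void startIfNecessary(FakeContainer container) {
        if (contextRefreshed || container.autoStartup) {
            container.start();
        }
    }
}
```

Before the refresh flag is set, start() only starts containers with autoStartup enabled; once it is set, a later start() starts the remaining ones as well.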

