spring-boot-rabbitmq: Message Middleware Integration
Preface: RabbitMQ exchange types
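1. Direct exchange
- The producer must publish each message with a routing key, and each queue must be bound to the exchange with a binding key. A message is routed to a queue only when the routing key and the binding key are exactly equal.
- A direct exchange is often used to hand tasks to multiple workers. In a log-processing system, for example, one worker handles error-level logs and another handles info-level logs: the producer sends each message with the matching routing key, and each worker's queue is bound with that same key, so it receives only its own messages. When several workers consume the same queue, messages are distributed among them round-robin.
2. Fanout exchange
- Unlike a direct exchange, a fanout exchange has no routing rules: it ignores the routing key and delivers every message to all queues bound to it.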
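3. Topic exchange
- The routing key consists of one or more words separated by dots (.), up to 255 bytes in total; binding keys may additionally contain wildcards.
- The wildcard * matches exactly one word.
- The wildcard # matches zero or more words.
- A message is routed to a queue only if the queue's binding key matches the message's routing key.
- Depending on how routing keys and binding keys match, a message may reach one queue, several queues, or no queue at all, in which case it is lost.
- A queue bound with the binding key # receives every message, so the exchange behaves like a fanout exchange for that queue.
- A queue bound with a binding key that contains neither # nor * receives only exact matches, so the exchange behaves like a direct exchange for that queue.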
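The wildcard rules above can be illustrated with a small in-memory matcher. This is only a sketch of the matching semantics, not RabbitMQ's actual implementation; the class name TopicMatcher and its methods are made up for this example.

```java
final class TopicMatcher {

    /** Returns true if the routing key satisfies the binding key pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern used up: every word must be consumed too
        }
        if (p[i].equals("#")) {
            // '#' either matches zero words (skip it) or absorbs one more word (stay on it)
            return match(p, i + 1, w, j) || (j < w.length && match(p, i, w, j + 1));
        }
        if (j == w.length) {
            return false; // pattern still expects a word, but none is left
        }
        // '*' matches any single word; a literal must be equal
        return (p[i].equals("*") || p[i].equals(w[j])) && match(p, i + 1, w, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("log.*", "log.error"));    // one word after "log"
        System.out.println(matches("log.*", "log.error.db")); // two words after "log"
        System.out.println(matches("log.#", "log"));          // zero words after "log"
    }
}
```

A binding key without wildcards degenerates to an equality check, which is the direct-exchange behavior described in the last bullet.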
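4. RabbitMQ has five messaging modes
- Without a declared exchange (messages go through the default exchange)
  - Simple mode [point-to-point]
  - Work mode [one producer, several consumers competing for messages]
- With a declared exchange
  - Direct exchange [messages are routed to different queues by routing key]
  - Fanout exchange [publish/subscribe: the producer's message is delivered to every subscribed queue]
  - Topic exchange [messages are routed by topic pattern; a generalization of the direct exchange]
5. Message acknowledgement
- No acknowledgement
- Automatic acknowledgement
- Manual acknowledgement
6. RabbitMQ can also be used for RPC-style request/reply calls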
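I. Quick Start
- A system usually both sends and receives messages; this example keeps the sender and the receiver in one project.
- This example uses a minimal configuration; the appendix lists the detailed configuration parameters.
1. Add the dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. Common configuration properties
- listener.simple.concurrency and listener.simple.max-concurrency set the minimum and maximum number of concurrent consumer threads.
- The properties under rabbitmq.template.retry configure producer-side retry; those under listener.simple.retry configure consumer-side retry.
- publisher-confirms triggers a callback once the message has reached the exchange; publisher-returns triggers a callback when the exchange cannot route the message to a queue. Since Spring Boot 2.2, publisher-confirms is deprecated in favor of publisher-confirm-type.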
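spring:
  application:
    name: rabbitmq-provider
  # RabbitMQ configuration
  rabbitmq:
    # Broker addresses; separate multiple addresses with commas
    addresses: 192.168.1.82:5762,192.168.1.82:5763,192.168.1.82:5764
    connection-timeout: 15000 # connection timeout in milliseconds
    virtual-host: / # virtual host
    username: guest
    password: guest
    # Producer settings
    publisher-confirm-type: simple # confirm mode: none, simple or correlated
    # publisher-confirms: true # pre-2.2 equivalent of publisher-confirm-type; the callback fires on both success and failure
    publisher-returns: true # callback for exchange-to-queue routing; fires only when routing fails
    template:
      # true: if the exchange finds no queue matching the routing key, the return callback is triggered
      # false: the exchange silently drops the message
      mandatory: true
      # Producer retry
      retry:
        enabled: true # enable send retry
        max-attempts: 3 # maximum number of attempts, default 3
        initial-interval: 1000 # interval between attempts in milliseconds, default 1000
    # Consumer listener settings
    listener:
      simple:
        concurrency: 2 # minimum number of consumer threads per listener container
        max-concurrency: 10 # maximum number of consumer threads per listener container
        acknowledge-mode: manual # auto, manual or none
        prefetch: 5 # maximum number of unacknowledged messages per consumer
        # Consumer retry
        retry:
          enabled: true # enable consume retry
          max-attempts: 3 # maximum number of attempts, default 3
          initial-interval: 1000 # interval between attempts in milliseconds, default 1000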
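3. Common configuration class
Queues can be created in the RabbitMQ management UI or declared in code; creating them in the management UI is generally recommended.
You typically need an exchange name, a queue name and a routing key.
- DirectExchange(String name, boolean durable, boolean autoDelete): direct exchange
- FanoutExchange(String name, boolean durable, boolean autoDelete): fanout exchange
- TopicExchange(String name, boolean durable, boolean autoDelete): topic exchange
- Queue(String name, boolean durable, boolean exclusive, boolean autoDelete): queue
- Binding: the link between a queue and an exchange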
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // Serialize message bodies as JSON
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /** Direct exchange configuration */
    public static class DirectExchangeDemoConfiguration {

        @Bean
        public Queue demo01Queue() {
            return new Queue(Demo01Message.QUEUE, // queue name
                    true,   // durable
                    false,  // exclusive
                    false); // autoDelete
        }

        @Bean
        public DirectExchange demo01Exchange() {
            return new DirectExchange(Demo01Message.EXCHANGE,
                    true,   // durable
                    false); // autoDelete
        }

        // Binds Demo01Message.QUEUE to Demo01Message.EXCHANGE with Demo01Message.ROUTING_KEY
        @Bean
        public Binding demo01Binding() {
            return BindingBuilder.bind(demo01Queue()).to(demo01Exchange()).with(Demo01Message.ROUTING_KEY);
        }
    }

    /** Topic exchange configuration */
    public static class TopicExchangeDemoConfiguration {

        @Bean
        public Queue demo02Queue() {
            return new Queue(Demo02Message.QUEUE, true, false, false); // name, durable, exclusive, autoDelete
        }

        @Bean
        public TopicExchange demo02Exchange() {
            return new TopicExchange(Demo02Message.EXCHANGE, true, false); // name, durable, autoDelete
        }

        // Binds Demo02Message.QUEUE to Demo02Message.EXCHANGE with Demo02Message.ROUTING_KEY
        @Bean
        public Binding demo02Binding() {
            return BindingBuilder.bind(demo02Queue()).to(demo02Exchange()).with(Demo02Message.ROUTING_KEY);
        }
    }

    /** Fanout exchange configuration: two queues bound to one exchange, no routing key */
    public static class FanoutExchangeDemoConfiguration {

        @Bean
        public Queue demo03QueueA() {
            return new Queue(Demo03Message.QUEUE_A, true, false, false);
        }

        @Bean
        public Queue demo03QueueB() {
            return new Queue(Demo03Message.QUEUE_B, true, false, false);
        }

        @Bean
        public FanoutExchange demo03Exchange() {
            return new FanoutExchange(Demo03Message.EXCHANGE, true, false);
        }

        @Bean
        public Binding demo03BindingA() {
            return BindingBuilder.bind(demo03QueueA()).to(demo03Exchange());
        }

        @Bean
        public Binding demo03BindingB() {
            return BindingBuilder.bind(demo03QueueB()).to(demo03Exchange());
        }
    }

    /** Headers exchange configuration */
    public static class HeadersExchangeDemoConfiguration {

        @Bean
        public Queue demo04Queue() {
            return new Queue(Demo04Message.QUEUE, true, false, false);
        }

        @Bean
        public HeadersExchange demo04Exchange() {
            return new HeadersExchange(Demo04Message.EXCHANGE, true, false);
        }

        // Binds Demo04Message.QUEUE to Demo04Message.EXCHANGE;
        // a message matches when its header HEADER_KEY equals HEADER_VALUE
        @Bean
        public Binding demo04Binding() {
            return BindingBuilder.bind(demo04Queue()).to(demo04Exchange())
                    .where(Demo04Message.HEADER_KEY).matches(Demo04Message.HEADER_VALUE);
        }
    }
}
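4. Message producer and consumer classes
As a rule, use one producer class per kind of message so that each class has a single responsibility.
Define exchange, routing key and queue names as enums or constants.
Note that the producers and consumers below correspond to each other one-to-one by exchange type.
II. Message producer
- rabbitTemplate#convertAndSend covers every form of message sending.
- rabbitTemplate.setConfirmCallback registers the confirm callback.
- The direct exchange is the most commonly used type.
RabbitMQ producer examples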
// A collection of producer snippets. The queue and exchange name constants are assumed to be defined elsewhere.
// Message refers to org.springframework.amqp.core.Message unless fully qualified.
@Slf4j
@Component
public class RabbitMqProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Point-to-point mode
    public void send(String msg) {
        amqpTemplate.convertAndSend(SIMPLE_QUEUE_NAME, msg);
    }

    // Work mode
    public void work() throws InterruptedException {
        String msg = "work-mode message ";
        for (int i = 0; i < 10; i++) {
            amqpTemplate.convertAndSend(WORKER_QUEUE_NAME, msg + i);
            Thread.sleep(5000);
        }
    }

    // Synchronous send through a direct exchange
    public void sendDirect(Object message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME,
                Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData);
        log.info("Sent message to RabbitMQ, content: {}", message);
    }

    // Asynchronous send: @Async moves the blocking send to another thread
    @Async
    public ListenableFuture<Void> asyncSend(Integer id) {
        try {
            // Message properties
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setMessageId(UUID.randomUUID().toString());
            messageProperties.setContentType("text/plain");
            messageProperties.setContentEncoding("utf-8");
            // The body carries the id as text
            Message message = new Message(String.valueOf(id).getBytes(StandardCharsets.UTF_8), messageProperties);
            rabbitTemplate.convertAndSend(Demo01Message.EXCHANGE, Demo01Message.ROUTING_KEY, message);
            return AsyncResult.forValue(null); // successful future
        } catch (Throwable ex) {
            return AsyncResult.forExecutionException(ex); // failed future
        }
    }

    // Fanout exchange: the routing key is ignored, so it is left empty
    public void sendFanout(Object message) {
        log.info("[Producer] sending to fanout exchange, content: {}", message);
        rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE_NAME, "", message);
    }

    // Messages that carry correlation data
    public void sendCorrelated() {
        for (int i = 1; i <= 3; i++) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            log.info("[Producer] message id = {}", correlationData.getId());
            String msg = "hello confirm message" + i;
            log.info("[Producer] message = {}", msg);
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, msg, correlationData);
        }
    }

    // Confirm callback used by the topic example below
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         * @param ack   whether the broker accepted (persisted) the message
         * @param cause failure details, if any
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData == null ? null : correlationData.getId();
            System.err.println("ACK result: " + ack + ", correlationData: " + id);
        }
    };

    // Topic exchange
    public void sendTopic(Object message, Map<String, Object> properties) {
        // Build the message
        MessageHeaders mhs = new MessageHeaders(properties);
        org.springframework.messaging.Message<?> msg = MessageBuilder.createMessage(message, mhs);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // Business-unique id
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // Post-processor invoked just before the message is sent
        MessagePostProcessor mpp = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.err.println("---> post to do: " + message);
                return message;
            }
        };
        // Arguments: exchange, routing key, payload, post-processor, correlation data
        rabbitTemplate.convertAndSend("exchange-1", "route.01", msg, mpp, correlationData);
    }

    // Headers exchange
    public void syncSend(Integer id, String headerValue) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader(Demo04Message.HEADER_KEY, headerValue); // header used for matching
        Demo04Message payload = new Demo04Message();
        payload.setId(id);
        Message message = rabbitTemplate.getMessageConverter().toMessage(payload, messageProperties);
        // A headers exchange does not use the routing key
        rabbitTemplate.send(Demo04Message.EXCHANGE, null, message);
    }
}
@Data
public class Demo07Message implements Serializable {
    public static final String QUEUE = "QUEUE_DEMO_07"; // normal queue
    public static final String DEAD_QUEUE = "DEAD_QUEUE_DEMO_07"; // dead-letter queue
    public static final String EXCHANGE = "EXCHANGE_DEMO_07";
    public static final String ROUTING_KEY = "ROUTING_KEY_07"; // normal routing key
    public static final String DEAD_ROUTING_KEY = "DEAD_ROUTING_KEY_07"; // dead-letter routing key
    private Integer id;
    private String body;
}
6. Headers exchange
A headers exchange does not route by matching the routing key against the binding key. Instead, it matches on the headers property of the published message.
// HEADER_KEY is defined in the message class
public class Demo04Message implements Serializable {
    public static final String QUEUE = "QUEUE_DEMO_04_A";
    public static final String EXCHANGE = "EXCHANGE_DEMO_04";
    public static final String HEADER_KEY = "color";
    public static final String HEADER_VALUE = "red";
    /** Message number */
    private Integer id;
    // ... set/get/toString methods omitted
}
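To make the header-matching rule concrete, here is a simplified in-memory sketch. It assumes the standard x-match binding argument (all, the default, or any) and skips arguments whose names start with x-; the class name HeadersMatcher is invented for this illustration and the code is not the broker's implementation.

```java
import java.util.Map;
import java.util.Objects;

final class HeadersMatcher {

    /** Returns true if the message headers satisfy the binding arguments. */
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        boolean any = "any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // control arguments such as x-match are not compared
            }
            boolean hit = Objects.equals(arg.getValue(), messageHeaders.get(arg.getKey()));
            if (any && hit) {
                return true;  // "any": one matching pair is enough
            }
            if (!any && !hit) {
                return false; // "all": one mismatch rejects the message
            }
        }
        return !any; // "all": nothing failed; "any": nothing matched
    }

    public static void main(String[] args) {
        Map<String, Object> binding = Map.of("color", "red");
        System.out.println(matches(binding, Map.of("color", "red", "size", "L")));
        System.out.println(matches(binding, Map.of("color", "blue")));
    }
}
```

With the binding from Demo04Message (color = red), only messages whose color header is red would reach QUEUE_DEMO_04_A.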
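III. Message consumer
- @RabbitHandler can only be placed on a method; it marks a message-handling method inside a class annotated with @RabbitListener.
- @RabbitListener can be placed on a method or on a class; it is usually placed on a method.
- queues specifies the queue names.
- concurrency sets the number of concurrent consumers.
Consumer examples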
// A collection of consumer snippets
@Slf4j
@Component
public class RabbitMQConsumerDemo {

    // Simple mode: the annotation declares the spring.simple.queue queue if it does not exist
    @RabbitListener(queuesToDeclare = @Queue("spring.simple.queue"))
    public void listenSimple(String msg) {
        System.out.println("simple queue received: " + msg);
    }

    // Work mode: two consumers share spring.work.queue and compete for its messages
    @RabbitListener(queuesToDeclare = @Queue("spring.work.queue"))
    public void listenWork1(String msg) {
        System.out.println("work consumer 1 received: " + msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("spring.work.queue"))
    public void listenWork2(String msg) {
        System.out.println("work consumer 2 received: " + msg);
    }

    // Direct exchange example with manual acknowledgement.
    // The ".direct" names and the key are illustrative; the original snippet reused the fanout names.
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "customer.code.wechatCallback.direct", autoDelete = "false"),
            exchange = @Exchange(value = "code.wechatCallback.direct", type = ExchangeTypes.DIRECT),
            key = "code.wechatCallback"), concurrency = "2")
    public void receiveMessage01(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Simulate a NullPointerException so that the catch branch runs
            String string = null;
            string.length();
            // Consumer-side flow control
            channel.basicQos(3, false);
            log.info("[Consumer01] received message >>> {}", msg);
            // Acknowledge only this delivery
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.info("[Consumer01] message was already redelivered, rejecting it: {}", msg);
                // Reject without requeueing
                channel.basicReject(deliveryTag, false);
            } else {
                log.info("[Consumer01] returning message to the queue for another attempt: {}", msg);
                // requeue = true puts the message back on the queue
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }

    // Fanout exchange example
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "customer.code.wechatCallback", autoDelete = "false"),
            exchange = @Exchange(value = "code.wechatCallback", type = ExchangeTypes.FANOUT)), concurrency = "1")
    public void receiveMessage(Object message) {
        log.info("Receiver got a message from [queue one], content: {}", message);
    }

    // Topic exchange example
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", durable = "true", autoDelete = "false"),
            exchange = @Exchange(name = "exchange-1", durable = "true", type = ExchangeTypes.TOPIC,
                    ignoreDeclarationExceptions = "true"),
            key = "route.*"))
    public void onMessage(org.springframework.messaging.Message<?> message, Channel channel, String body) throws Exception {
        // 1. Business processing
        System.err.println("consumed message: " + message.getPayload());
        // 2. Manual acknowledgement
        Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }

    // Headers exchange example: class-level @RabbitListener with a @RabbitHandler method
    @Component
    @RabbitListener(queues = Demo04Message.QUEUE)
    public static class Demo04Consumer {
        private final Logger logger = LoggerFactory.getLogger(getClass());

        @RabbitHandler
        public void onMessage(Demo04Message message) {
            logger.info("[onMessage][thread id: {} message: {}]", Thread.currentThread().getId(), message);
        }
    }
}
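2. Retry mechanism [consumer]
- Approach one: use configuration properties (see the common configuration properties above). It is simple, but gives little control over the timing of retries.
- Approach two: track retry counts in persistent storage such as MySQL or Redis. It allows full control over the interval between retries, but is more complex.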
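Approach two can be sketched as a per-message attempt counter combined with a custom list of delays. In the sketch below an in-memory map stands in for MySQL or Redis, and RetryPolicy with its method names is hypothetical, not part of Spring AMQP.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RetryPolicy {

    private final long[] delaysMillis;
    // Stand-in for a persistent store (MySQL/Redis) keyed by message id
    private final Map<String, Integer> attempts = new ConcurrentHashMap<>();

    RetryPolicy(long... delaysMillis) {
        this.delaysMillis = delaysMillis.clone();
    }

    /** Records a failed attempt; returns the delay before the next retry, or -1 when retries are exhausted. */
    long onFailure(String messageId) {
        int failures = attempts.merge(messageId, 1, Integer::sum);
        return failures <= delaysMillis.length ? delaysMillis[failures - 1] : -1;
    }

    /** Clears the counter once the message has been processed successfully. */
    void onSuccess(String messageId) {
        attempts.remove(messageId);
    }

    public static void main(String[] args) {
        RetryPolicy policy = new RetryPolicy(1_000, 5_000, 30_000);
        System.out.println(policy.onFailure("msg-1")); // delay before the first retry
    }
}
```

Because the delays are plain data, each retry can wait a different amount of time, which the property-based approach does not allow as freely.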
V. Callbacks [producer]
(1) Approach one: a custom callback handler class
@Slf4j
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** @PostConstruct runs after dependency injection is complete and is used here for initialization. */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this); // register the ConfirmCallback
        rabbitTemplate.setReturnCallback(this);  // register the ReturnCallback
    }

    /**
     * Not invoked when the exchange routes the message to a queue;
     * invoked when the exchange fails to route the message to any queue.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage callback >>> {}, replyCode: {}, replyText: {}, exchange: {}, routingKey: {}",
                new String(message.getBody(), StandardCharsets.UTF_8), replyCode, replyText, exchange, routingKey);
    }

    /**
     * isSendSuccess is false and error holds the reason when the message did not reach the exchange;
     * isSendSuccess is true when the message reached the exchange.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
        // correlationData is null when the message was sent without it
        log.info("confirm callback >>> message id: {}", correlationData == null ? null : correlationData.getId());
        if (isSendSuccess) {
            log.info("confirm callback >>> message reached the exchange");
        } else {
            log.info("confirm callback >>> message failed to reach the exchange, cause: [{}]", error);
        }
    }
}
@Slf4j
public class CallbackConfirmDemo {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private CustomConfirmAndReturnCallback confirmCallback;

    // Topic message that uses the custom confirm callback
    public void sendMessage1(Object message, Map<String, Object> properties) throws Exception {
        // Build the message
        MessageHeaders mhs = new MessageHeaders(properties);
        org.springframework.messaging.Message<?> msg = MessageBuilder.createMessage(message, mhs);
        // A RabbitTemplate accepts only one ConfirmCallback, so always pass the same instance
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // Business-unique id
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // Post-processor invoked just before the message is sent
        MessagePostProcessor mpp = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.err.println("---> post to do: " + message);
                return message;
            }
        };
        rabbitTemplate.convertAndSend("exchange-1", "springboot.rabbit", msg, mpp, correlationData);
    }

    // Synchronous confirm: send, then block until the broker confirms
    public void syncSend(Integer id) {
        Demo13Message message = new Demo13Message();
        message.setId(id);
        rabbitTemplate.invoke(new RabbitOperations.OperationsCallback<Object>() {
            @Override
            public Object doInRabbit(RabbitOperations operations) {
                operations.convertAndSend(Demo13Message.EXCHANGE, Demo13Message.ROUTING_KEY, message);
                log.info("[doInRabbit][message sent]");
                operations.waitForConfirms(0); // timeout; 0 waits indefinitely
                log.info("[doInRabbit][confirm wait finished]");
                return null;
            }
        }, new ConfirmCallback() { // com.rabbitmq.client.ConfirmCallback, invoked on ack
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                log.info("[handle][confirm succeeded]");
            }
        }, new ConfirmCallback() { // invoked on nack
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                log.info("[handle][confirm failed]");
            }
        });
    }

    // Callbacks written as lambdas. Registering them on every send fails once a different
    // ConfirmCallback is already set, so in practice register them once at startup.
    public void sendMessage(Object message) {
        // Broker confirm callback
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String id = correlationData == null ? null : correlationData.getId();
            if (ack) {
                log.info("message {} confirmed", id);
            } else {
                log.info("message {} not confirmed, cause: {}", id, cause);
            }
        });
        // Return callback for unroutable messages
        rabbitTemplate.setReturnCallback((returned, replyCode, replyText, exchange, routingKey) ->
                log.info("message {} could not be routed, reply code {}, reason {}, exchange {}, routing key {}",
                        returned, replyCode, replyText, exchange, routingKey));
        rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE_NAME, "", message);
    }
}
(2) Approach two: anonymous inner classes or lambdas, as in the syncSend and sendMessage methods of the class above
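VI. Batch production and consumption
Approach one: add the batch production and consumption properties to application.yaml.
Approach two: configure a BatchingRabbitTemplate and a SimpleRabbitListenerContainerFactory by hand.
Approach three: for batch consumption, set concurrency = "2" on the annotation to consume batches concurrently.
Note that batch consumption and batch production do not have to be used together; choose each according to your needs.
1. Add to the configuration class
- BatchingRabbitTemplate
- SimpleRabbitListenerContainerFactory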
@Configuration
public class RabbitConfig {

    // Batching producer template
    @Bean
    public BatchingRabbitTemplate batchRabbitTemplate(ConnectionFactory connectionFactory) {
        // Batching strategy: a batch is sent as soon as any one of the three thresholds is reached
        int batchSize = 16384;      // maximum number of messages in a batch
        int bufferLimit = 33554432; // maximum size of a batch in bytes (32 MB)
        int timeout = 30000;        // maximum time to wait for a batch to fill, in milliseconds
        BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(batchSize, bufferLimit, timeout);
        // Scheduler that fires the timeout-based send
        TaskScheduler taskScheduler = new ConcurrentTaskScheduler();
        BatchingRabbitTemplate batchTemplate = new BatchingRabbitTemplate(batchingStrategy, taskScheduler);
        batchTemplate.setConnectionFactory(connectionFactory);
        return batchTemplate;
    }

    // Batch listener container factory
    @Bean(name = "consumerBatchContainerFactory")
    public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        // Additional properties for batch consumption
        factory.setBatchListener(true);
        factory.setBatchSize(10);
        factory.setReceiveTimeout(30 * 1000L);
        factory.setConsumerBatchEnabled(true);
        return factory;
    }
}
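The way the count and size thresholds of a batching strategy interact can be shown with a small stand-alone sketch. SimpleBatcher below is a hypothetical illustration, not Spring AMQP's SimpleBatchingStrategy; the timeout is represented only by the drain method, which a timer would call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SimpleBatcher {

    private final int batchSize;   // maximum number of messages per batch
    private final int bufferLimit; // maximum number of bytes per batch
    private final List<byte[]> pending = new ArrayList<>();
    private int pendingBytes;

    SimpleBatcher(int batchSize, int bufferLimit) {
        this.batchSize = batchSize;
        this.bufferLimit = bufferLimit;
    }

    /** Buffers a message; returns the batch to send once a threshold is reached, otherwise an empty list. */
    List<byte[]> add(byte[] body) {
        pending.add(body);
        pendingBytes += body.length;
        if (pending.size() >= batchSize || pendingBytes >= bufferLimit) {
            return drain();
        }
        return Collections.emptyList();
    }

    /** Returns and clears whatever is buffered; a timeout timer would call this. */
    List<byte[]> drain() {
        List<byte[]> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        return batch;
    }

    public static void main(String[] args) {
        SimpleBatcher batcher = new SimpleBatcher(3, 1024);
        batcher.add("a".getBytes());
        batcher.add("b".getBytes());
        System.out.println(batcher.add("c".getBytes()).size()); // third message completes the batch
    }
}
```

With values as large as those in the configuration above, the 30-second timeout is likely to be the threshold that fires first under light traffic, so messages can wait that long before being sent.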
2. Batch production
- BatchingRabbitTemplate
@Component
public class Demo06Producer {

    @Autowired
    private BatchingRabbitTemplate batchingRabbitTemplate;

    public void syncSend(Integer id) {
        // Create the Demo05Message
        Demo05Message message = new Demo05Message();
        message.setId(id);
        // The template buffers the message and sends it as part of a batch
        batchingRabbitTemplate.convertAndSend(Demo05Message.EXCHANGE, Demo05Message.ROUTING_KEY, message);
    }
}
3. Batch consumption
- containerFactory
@Component
@RabbitListener(queues = Demo05Message.QUEUE, concurrency = "2", containerFactory = "consumerBatchContainerFactory")
public class Demo05Consumer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @RabbitHandler
    public void onMessage(List<Demo05Message> messages) {
        logger.info("[onMessage][thread id: {} message count: {}]", Thread.currentThread().getId(), messages.size());
    }
}
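VII. Delay queue
Approach one: create the delay queue in the RabbitMQ management console.
Approach two: declare the delay queue in code.
1. Configure the delay queue
// Create the queue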
public class DelayQueueDemo {

    @Bean
    public Queue demo08Queue() {
        return QueueBuilder.durable(Demo08Message.QUEUE) // durable queue
                .exclusive()      // mark the queue as exclusive
                .autoDelete()     // mark the queue as auto-delete
                .ttl(10 * 1000)   // default message TTL for this queue: 10 seconds
                .deadLetterExchange(Demo08Message.EXCHANGE) // expired messages are republished to this exchange
                .deadLetterRoutingKey(Demo08Message.DELAY_ROUTING_KEY)
                .build();
    }
}

public class DelayProducerDemo {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void syncSend(Integer id, Integer delay) {
        // Create the Demo08Message
        Demo08Message message = new Demo08Message();
        message.setId(id);
        rabbitTemplate.convertAndSend(Demo08Message.EXCHANGE, Demo08Message.ROUTING_KEY, message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Per-message TTL in milliseconds; the expiration property is a String in the API
                if (delay != null && delay > 0) {
                    message.getMessageProperties().setExpiration(String.valueOf(delay));
                }
                return message;
            }
        });
    }
}
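The queue above sets a 10-second TTL while the producer may also set a per-message expiration. When both are present, RabbitMQ applies the lower of the two values. The helper below is a hypothetical illustration of that rule, not a Spring or RabbitMQ API.

```java
final class DelayTtl {

    /**
     * Returns the TTL that takes effect for a message, in milliseconds.
     * A null argument means "not set"; the result is null when neither TTL is set.
     */
    static Long effectiveTtlMillis(Long queueTtlMillis, Long messageTtlMillis) {
        if (queueTtlMillis == null) {
            return messageTtlMillis;
        }
        if (messageTtlMillis == null) {
            return queueTtlMillis;
        }
        return Math.min(queueTtlMillis, messageTtlMillis); // the lower value wins
    }

    public static void main(String[] args) {
        // Queue TTL of 10 s combined with a requested delay of 60 s
        System.out.println(effectiveTtlMillis(10_000L, 60_000L));
    }
}
```

For the configuration above this means a requested delay longer than 10 seconds has no effect; to support longer delays the queue TTL would have to be raised or removed.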
VIII. Handling consumer exceptions
1. Custom exception handlers
@Component("rabbitListenerErrorHandler")
public class RabbitListenerErrorHandlerImpl implements RabbitListenerErrorHandler {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
                              ListenerExecutionFailedException exception) {
        // Log the failure
        logger.error("[handleError][amqpMessage:[{}] message:[{}]]", amqpMessage, message, exception);
        // Rethrow so that the container still sees the failure
        throw exception;
    }
}

@Component
public class RabbitLoggingErrorHandler implements ErrorHandler {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    // Registers itself as the container-level error handler
    public RabbitLoggingErrorHandler(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory) {
        rabbitListenerContainerFactory.setErrorHandler(this);
    }

    @Override
    public void handleError(Throwable t) {
        logger.error("[handleError][exception occurred]", t);
    }
}
2. Attach the error handler to the consumer
@Component
@RabbitListener(queues = Demo16Message.QUEUE, errorHandler = "rabbitListenerErrorHandler")
//@RabbitListener(queues = Demo15Message.QUEUE)
public class Demo16Consumer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @RabbitHandler
    public void onMessage(Demo16Message message) {
        logger.info("[onMessage][thread id: {} message: {}]", Thread.currentThread().getId(), message);
        // Simulate a consumption failure
        throw new RuntimeException("simulated failure");
    }
}
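IX. Broadcast consumption and cluster consumption
In RabbitMQ, when several consumers subscribe to the same queue, each message is consumed by exactly one of them. This property is the basis for cluster consumption. For broadcast consumption, where every consumer must see every message, give each consumer its own queue bound to the same exchange, so that each one receives the full set of messages. RabbitMQ also supports auto-delete queues, so a consumer's private queue can be removed when the consumer shuts down. A broadcast consumer can be implemented as follows: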
@Component
@RabbitListener(bindings = @QueueBinding(
        // Each consumer instance gets its own queue: the SpEL expression appends a random UUID to the name
        value = @Queue(name = BroadcastMessage.QUEUE + "-" + "#{T(java.util.UUID).randomUUID()}", autoDelete = "true"),
        exchange = @Exchange(name = BroadcastMessage.EXCHANGE, type = ExchangeTypes.TOPIC, declare = "false")))
public class BroadcastConsumer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @RabbitHandler
    public void onMessage(BroadcastMessage message) {
        logger.info("[onMessage][thread id: {} message: {}]", Thread.currentThread().getId(), message);
    }
}
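X. Handling dead-letter queues
- First, bind the dead-letter queue in the same way as in the common configuration class.
- Approach one: let the configuration class create the dead-letter exchange and queue and bind them, and attach the dead-letter settings to the normal queue as a Map of arguments.
- Approach two: add the dead-letter settings to the queue by hand in the RabbitMQ management console.
- If many messages pile up in the dead-letter queue, process them with a listener on that queue.
1. Define the dead-letter bindings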
// dlxExchangeName, dlxQueueName, dlxRoutingKey and queueName are constants assumed to be defined elsewhere
public class DirectExchangeDemoConfiguration {

    // Dead-letter exchange
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchangeName);
    }

    // Dead-letter queue
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueueName);
    }

    // Bind the dead-letter queue to the dead-letter exchange
    @Bean
    public Binding dlxBinding(Queue dlxQueue, DirectExchange dlxExchange) {
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
    }

    // Normal queue; its arguments point at the dead-letter exchange
    @Bean
    public Queue queue() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", dlxExchangeName);  // dead-letter exchange for this queue
        params.put("x-dead-letter-routing-key", dlxRoutingKey); // routing key used for dead-lettered messages
        return QueueBuilder.durable(queueName).withArguments(params).build();
    }

    // Direct exchange for normal traffic
    @Bean
    public DirectExchange demo01Exchange() {
        return new DirectExchange(Demo01Message.EXCHANGE,
                true,   // durable
                false); // autoDelete
    }

    // Bind the normal queue to Demo01Message.EXCHANGE with Demo01Message.ROUTING_KEY
    @Bean
    public Binding demo01Binding() {
        return BindingBuilder.bind(queue()).to(demo01Exchange()).with(Demo01Message.ROUTING_KEY);
    }
}
2. Define the dead-letter queue consumer
@Component
@RabbitListener(queues = Demo07Message.DEAD_QUEUE)
public class Demo07DeadConsumer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @RabbitHandler
    public void onMessage(Demo07Message message) {
        logger.info("[onMessage][dead-letter queue][thread id: {} message: {}]", Thread.currentThread().getId(), message);
    }
}
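XI. Ordered queues
- Ordinary ordered messages: the producer sends related messages to the same queue.
- Strictly ordered messages: on top of ordinary ordering, the consumer also consumes strictly in order.
1. Define several queues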
// Demo10Message.java
public class Demo10Message implements Serializable {
    private static final String QUEUE_BASE = "QUEUE_DEMO_10-";
    public static final String QUEUE_0 = QUEUE_BASE + "0";
    public static final String QUEUE_1 = QUEUE_BASE + "1";
    public static final String QUEUE_2 = QUEUE_BASE + "2";
    public static final String QUEUE_3 = QUEUE_BASE + "3";
    public static final int QUEUE_COUNT = 4;
    public static final String EXCHANGE = "EXCHANGE_DEMO_10";
    /** Message number */
    private Integer id;
    // ... set/get/toString methods omitted
}
2. Bind the ordered queues
public class DirectExchangeDemoConfiguration {

    // Queues
    @Bean
    public Queue demo10Queue0() { return new Queue(Demo10Message.QUEUE_0); }
    @Bean
    public Queue demo10Queue1() { return new Queue(Demo10Message.QUEUE_1); }
    @Bean
    public Queue demo10Queue2() { return new Queue(Demo10Message.QUEUE_2); }
    @Bean
    public Queue demo10Queue3() { return new Queue(Demo10Message.QUEUE_3); }

    // Direct exchange
    @Bean
    public DirectExchange demo10Exchange() {
        return new DirectExchange(Demo10Message.EXCHANGE,
                true,   // durable
                false); // autoDelete
    }

    // Bindings: queue N is bound with routing key "N"
    @Bean
    public Binding demo10Binding0() { return BindingBuilder.bind(demo10Queue0()).to(demo10Exchange()).with("0"); }
    @Bean
    public Binding demo10Binding1() { return BindingBuilder.bind(demo10Queue1()).to(demo10Exchange()).with("1"); }
    @Bean
    public Binding demo10Binding2() { return BindingBuilder.bind(demo10Queue2()).to(demo10Exchange()).with("2"); }
    @Bean
    public Binding demo10Binding3() { return BindingBuilder.bind(demo10Queue3()).to(demo10Exchange()).with("3"); }
}
3. Send ordered messages
// Demo10Producer.java
@Component
public class Demo10Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void syncSend(Integer id) {
        // Create the Demo10Message
        Demo10Message message = new Demo10Message();
        message.setId(id);
        // Messages with the same id always get the same routing key, and therefore the same queue
        rabbitTemplate.convertAndSend(Demo10Message.EXCHANGE, this.getRoutingKey(id), message);
    }

    private String getRoutingKey(Integer id) {
        return String.valueOf(id % Demo10Message.QUEUE_COUNT);
    }
}
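One detail of getRoutingKey is worth checking: in Java the % operator keeps the sign of the dividend, so a negative id produces a negative routing key that matches none of the bindings "0" to "3". The sketch below shows a variant based on Math.floorMod; the class name OrderedRouting is made up for this example, and whether negative ids can occur depends on the application.

```java
final class OrderedRouting {

    static final int QUEUE_COUNT = 4;

    /** Maps an id to one of the routing keys "0".."3", including for negative ids. */
    static String routingKey(int id) {
        return String.valueOf(Math.floorMod(id, QUEUE_COUNT));
    }

    public static void main(String[] args) {
        System.out.println(-1 % QUEUE_COUNT); // plain remainder of a negative id
        System.out.println(routingKey(-1));   // floorMod result for the same id
    }
}
```

If ids are guaranteed to be non-negative, both forms give the same result and the original method can stay as it is.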
4. Consume ordered messages
// Demo10Consumer.java
@Component
@RabbitListener(queues = Demo10Message.QUEUE_0)
@RabbitListener(queues = Demo10Message.QUEUE_1)
@RabbitListener(queues = Demo10Message.QUEUE_2)
@RabbitListener(queues = Demo10Message.QUEUE_3)
public class Demo10Consumer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    // Message here is org.springframework.messaging.Message
    @RabbitHandler(isDefault = true)
    public void onMessage(Message<Demo10Message> message) {
        logger.info("[onMessage][thread id: {} queue: {} message id: {}]", Thread.currentThread().getId(),
                getQueue(message), message.getPayload().getId());
    }

    // Reads the name of the queue the message was consumed from
    private static String getQueue(Message<Demo10Message> message) {
        return message.getHeaders().get("amqp_consumerQueue", String.class);
    }
}
XII. Transactional messages
1. Create the transactional message configuration
// RabbitConfig.java
@Configuration
@EnableTransactionManagement
public class RabbitConfig {

    /** Direct exchange configuration */
    public static class DirectExchangeDemoConfiguration {

        @Bean
        public Queue demo11Queue() {
            return new Queue(Demo11Message.QUEUE, // queue name
                    true,   // durable
                    false,  // exclusive
                    false); // autoDelete
        }

        @Bean
        public DirectExchange demo11Exchange() {
            return new DirectExchange(Demo11Message.EXCHANGE,
                    true,   // durable
                    false); // autoDelete
        }

        // Binds Demo11Message.QUEUE to Demo11Message.EXCHANGE with Demo11Message.ROUTING_KEY
        @Bean
        public Binding demo11Binding() {
            return BindingBuilder.bind(demo11Queue()).to(demo11Exchange()).with(Demo11Message.ROUTING_KEY);
        }
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory, RabbitTemplate rabbitTemplate) {
        // <Y> Make the RabbitTemplate use transacted channels
        rabbitTemplate.setChannelTransacted(true);
        return new RabbitTransactionManager(connectionFactory);
    }
}
2. Produce a transactional message
- Note the @Transactional annotation.
@Component
public class Demo11Producer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Transactional
    public void syncSend(Integer id) throws InterruptedException {
        // Create the Demo11Message
        Demo11Message message = new Demo11Message();
        message.setId(id);
        rabbitTemplate.convertAndSend(Demo11Message.EXCHANGE, Demo11Message.ROUTING_KEY, message);
        logger.info("[syncSend][message id: [{}] sent]", id);
        // <X> Wait; the message becomes visible to consumers only after the transaction commits
        Thread.sleep(10 * 1000L);
    }
}
3. Test sending and consuming the transactional message
// Demo11ProducerTest.java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo11ProducerTest {

    @Autowired
    private Demo11Producer producer;

    @Test
    public void testSyncSend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        producer.syncSend(id);
        // Block so that the consumer has time to receive the message
        new CountDownLatch(1).await();
    }
}
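XIII. Message confirmation
1. Update the configuration file
# Consumer side: manual acknowledgement
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Producer side: synchronous confirm
spring.rabbitmq.publisher-confirm-type=simple
# Producer side: asynchronous confirm (an alternative to simple; set only one of the two)
#spring.rabbitmq.publisher-confirm-type=correlated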
2. Consumer acknowledgement
- channel.basicAck
@Component
@RabbitListener(queues = Demo12Message.QUEUE)
public class Demo12Consumer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @RabbitHandler
    public void onMessage(Demo12Message message, Channel channel,
                          @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        logger.info("[onMessage][thread id: {} message: {}]", Thread.currentThread().getId(), message);
        // Acknowledge only messages with an odd id; messages with an even id stay unacknowledged
        if (message.getId() % 2 == 1) {
            // The second argument, multiple, allows acknowledgements to be batched to reduce network traffic:
            // 1. multiple = true acknowledges every delivery whose tag is less than or equal to deliveryTag
            // 2. multiple = false acknowledges only the delivery with this deliveryTag
            channel.basicAck(deliveryTag, false);
        }
    }
}
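The effect of the multiple flag can be modelled with a sorted set of outstanding delivery tags. This is a sketch of the semantics described in the comments above, not client library code; UnackedTags is a name invented for the example.

```java
import java.util.TreeSet;

final class UnackedTags {

    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Records a delivery that has not been acknowledged yet. */
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Mirrors basicAck(deliveryTag, multiple). */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headSet(deliveryTag, true).clear(); // every tag <= deliveryTag
        } else {
            unacked.remove(deliveryTag);                // only this tag
        }
    }

    int pending() {
        return unacked.size();
    }

    public static void main(String[] args) {
        UnackedTags tags = new UnackedTags();
        for (long tag = 1; tag <= 5; tag++) {
            tags.delivered(tag);
        }
        tags.ack(4, true);
        System.out.println(tags.pending()); // deliveries still waiting for an ack
    }
}
```

Batching acknowledgements with multiple = true saves round trips, at the cost of redelivering more messages if the consumer fails before the batched ack is sent.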
3. Publisher confirmation
// Demo13Producer.java
@Component
public class Demo13Producer {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void syncSend(Integer id) {
        // Create the Demo13Message
        Demo13Message message = new Demo13Message();
        message.setId(id);
        // Send and wait for the confirm on the same channel
        rabbitTemplate.invoke(new RabbitOperations.OperationsCallback<Object>() {
            @Override
            public Object doInRabbit(RabbitOperations operations) {
                operations.convertAndSend(Demo13Message.EXCHANGE, Demo13Message.ROUTING_KEY, message);
                logger.info("[doInRabbit][message sent]");
                operations.waitForConfirms(0); // timeout; 0 waits indefinitely
                logger.info("[doInRabbit][confirm wait finished]");
                return null;
            }
        }, new ConfirmCallback() { // invoked on ack
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                logger.info("[handle][confirm succeeded]");
            }
        }, new ConfirmCallback() { // invoked on nack
            @Override
            public void handle(long deliveryTag, boolean multiple) throws IOException {
                logger.info("[handle][confirm failed]");
            }
        });
    }
}
4. Define the asynchronous confirm callback class (setConfirmCallback)
- rabbitTemplate.setConfirmCallback registers a global asynchronous confirm callback.
@Component
public class RabbitProducerConfirmCallback implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    public RabbitProducerConfirmCallback(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("[confirm][confirm succeeded, correlationData: {}]", correlationData);
        } else {
            logger.error("[confirm][confirm failed, correlationData: {} cause: {}]", correlationData, cause);
        }
    }
}
5. Return callback (ReturnCallback)
// RabbitProducerReturnCallback.java
@Component
public class RabbitProducerReturnCallback implements RabbitTemplate.ReturnCallback {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    public RabbitProducerReturnCallback(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setReturnCallback(this);
    }

    // Invoked when the exchange could not route the message to any queue
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.error("[returnedMessage][message: [{}] replyCode: [{}] replyText: [{}] exchange: [{}] routingKey: [{}]]",
                message, replyCode, replyText, exchange, routingKey);
    }
}
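A high-availability and retry scheme for RabbitMQ messages in practice
- 1. Record every failed message in an MQ failure table.
- 2. Use xxl-job to scan the table on a schedule and retry the records that have not yet exceeded their maximum retry count.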
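One pass of such a scheduled scan might look like the sketch below. It works on an in-memory list instead of a database table and does not use xxl-job; RetryScan, RetryRecord and their fields are hypothetical names chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class RetryScan {

    /** In-memory stand-in for one row of the failure table. */
    static final class RetryRecord {
        final String messageId;
        final int maxRetryTimes;
        int retriedTimes;
        boolean done;

        RetryRecord(String messageId, int maxRetryTimes) {
            this.messageId = messageId;
            this.maxRetryTimes = maxRetryTimes;
        }
    }

    /**
     * Retries every record that is not done and still has attempts left.
     * Returns the ids of records that used up their last attempt in this pass.
     */
    static List<String> scanOnce(List<RetryRecord> records, Predicate<RetryRecord> handler) {
        List<String> exhausted = new ArrayList<>();
        for (RetryRecord record : records) {
            if (record.done || record.retriedTimes >= record.maxRetryTimes) {
                continue; // finished or out of attempts: skip
            }
            record.retriedTimes++;
            boolean ok;
            try {
                ok = handler.test(record);
            } catch (RuntimeException e) {
                ok = false; // a thrown exception counts as a failed attempt
            }
            if (ok) {
                record.done = true;
            } else if (record.retriedTimes >= record.maxRetryTimes) {
                exhausted.add(record.messageId); // candidate for alerting or manual handling
            }
        }
        return exhausted;
    }

    public static void main(String[] args) {
        List<RetryRecord> records = new ArrayList<>();
        records.add(new RetryRecord("m-1", 3));
        System.out.println(scanOnce(records, r -> true)); // handler succeeds, nothing is exhausted
    }
}
```

Returning the exhausted ids separately makes it easy to raise an alert for messages that will never be retried again.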
I. Message consumption
1. Create an abstract template class
@Slf4j
public abstract class AbstractConsumer {

    /** Static nested class that holds the names of the listened queues */
    static class queueName {
        public static final String ORDER_CANCEL_QUEUE = "queue.assets_center.unification.order_cancel";
        public static final String HOUR_EXCHANGE_REFUND_QUEUE = "queue.assets_center.unification.convert_in_wandou_packet_revert";
    }

    @Resource
    private IMqConsumeService mqConsumeService;
    @Autowired
    private RetryMessageMapper retryMessageMapper;
    @Autowired
    private DingDingHelper dingDingHelper;

    private static final String MQ_HEADER_REF = "spring_returned_message_correlation";

    /** Entry point for message consumption */
    @SneakyThrows
    protected void doConsume(Channel channel, Message message, String body) {
        String messageId = (String) message.getMessageProperties().getHeaders().get(MQ_HEADER_REF);
        try {
            // 1. Add the trace id
            MDC.put("X-TRACE-ID", messageId);
            log.info("[{}] consuming message: messageId: {}, body: {}", getConsumeType().getMessage(), messageId, body);
            // 2. Save the consume log and check idempotency
            boolean check = this.saveConsumeLogAndJudgeIdempotent(channel, message, body);
            if (check) {
                // 3. Business processing
                this.bizHandle(body);
                // 4. Manual ack
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception exception) {
            log.error("Message consumption failed, messageId = " + messageId, exception);
            // 1. Send an alert
            dingDingHelper.sendAlert(getConsumeType().getMessage() + " - message consumption failed", "message body: " + body);
            // 2. Record the failure so that the scheduled retry can pick it up
            RetryMessage retryMessage = new RetryMessage(getQueueEnum(), getTimedMaxRetryTimes(),
                    getTimedRecompenseBizMethod(), messageId, body, exception);
            retryMessageMapper.insert(retryMessage);
            // 3. Nack without requeueing
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } finally {
            MDC.remove("X-TRACE-ID");
        }
    }

    /** Saves the consume log and checks idempotency; returns false for a duplicate message */
    @SneakyThrows
    protected boolean saveConsumeLogAndJudgeIdempotent(Channel channel, Message message, String body) {
        String messageId = (String) message.getMessageProperties().getHeaders().get(MQ_HEADER_REF);
        // Idempotency check: insert reports whether this message was seen before
        MqConsume mqConsume = new MqConsume(getConsumeType(), getQueueEnum().getQueueName(), messageId, body);
        Boolean repeat = mqConsumeService.insert(mqConsume);
        if (Boolean.TRUE.equals(repeat)) {
            // Duplicate: ack and skip the business processing
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return false;
        }
        return true;
    }

    /** Business processing, implemented by subclasses */
    protected abstract void bizHandle(String body);

    /** Returns the queue this consumer listens to */
    protected abstract ListennedQueueEnum getQueueEnum();

    /** Returns the business type of the consumption */
    protected abstract MqConsumeTypeEnums getConsumeType();

    /** Scheduled compensation method */
    public void timedRecompense(String body) {
        bizHandle(body);
    }

    /** Maximum number of scheduled compensation attempts, 3 by default */
    protected Integer getTimedMaxRetryTimes() {
        return 3;
    }

    /**
     * Name of the scheduled compensation method.
     * Format: Spring bean name + "." + name of the compensation method
     */
    protected String getTimedRecompenseBizMethod() {
        Component component = AnnotationUtils.findAnnotation(this.getClass(), Component.class);
        String beanName = StrUtil.isBlank(component.value())
                ? StrUtil.lowerFirst(this.getClass().getSimpleName()) : component.value();
        return beanName.concat(".").concat("timedRecompense");
    }
}
2. Listening for and consuming messages
- Extend AbstractConsumer and implement its abstract methods
@Slf4j
@Component
public class AssertsConvertOrderCancelConsumer extends AbstractConsumer {

    @Value("${klzz.internal.config.appId}")
    private String localAppId;

    /** Listens for order cancellation messages. */
    @RabbitListener(queues = queueName.ORDER_CANCEL_QUEUE)
    public void messageListener(Channel channel, Message message, String body) {
        // Delegate to the doConsume entry point of the parent class
        super.doConsume(channel, message, body);
    }

    /** Business handling implemented by the subclass. */
    @Override
    public void bizHandle(String body) {
        // TODO the actual message handling
    }

    @Override
    protected ListennedQueueEnum getQueueEnum() {
        return ListennedQueueEnum.ORDER_CANCEL;
    }

    @Override
    protected MqConsumeTypeEnums getConsumeType() {
        return MqConsumeTypeEnums.ASSETS_CONVERT_ORDER_CANCEL;
    }
}
3. Define the queue configuration enum
@Getter
@AllArgsConstructor
public enum ListennedQueueEnum {

    ORDER_CANCEL("order_cancel", "exchange.trade_order", "queue.assets_center.unification.order_cancel"),
    HOUR_EXCHANGE_REFUND("hour_exchange_refund", "exchange.order", "queue.assets_center.unification.convert_in_wandou_packet_revert");

    private String routeKey;
    private String exchangeKey;
    private String queueName;
}
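II. Message persistence
- Received messages are persisted so that a message can be traced and deduplicated after it has been consumed.
- Make sure the mq_body column is long enough for the message body. If saving the log row fails, the message itself fails to be processed.
1. Message log table
- Note: msg_id carries a unique index, which is what enforces the duplicate check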
CREATE TABLE `mq_consume` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'auto-increment ID',
    `type` int(11) NOT NULL DEFAULT '0' COMMENT 'consume type',
    `msg_id` varchar(64) NOT NULL DEFAULT '' COMMENT 'message id',
    `title` varchar(32) NOT NULL DEFAULT '' COMMENT 'title',
    `queue_name` varchar(64) NOT NULL DEFAULT '' COMMENT 'queue name',
    `mq_body` varchar(3000) NOT NULL DEFAULT '' COMMENT 'message body',
    `remark` varchar(256) NOT NULL DEFAULT '' COMMENT 'remark',
    `app_id` varchar(64) NOT NULL DEFAULT '' COMMENT 'appId',
    `is_deleted` tinyint(1) NOT NULL DEFAULT '1' COMMENT 'deletion status, 1: normal, 2: deleted',
    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
    `version` int(5) NOT NULL DEFAULT '1' COMMENT 'version number',
    `status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '1: success, 2: failure',
    PRIMARY KEY (`id`),
    UNIQUE KEY `unix_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='MQ message consume table';
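The unique index on msg_id is what turns a plain insert into an idempotency check: the first insert for a message id succeeds and any later one is rejected as a duplicate. The sketch below imitates that behavior in memory so the idea can be tried without a database. The class name IdempotentConsumeLog is hypothetical, and the map only stands in for the table; it is not how the article's service is implemented.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for mq_consume; the map key plays the role of the unique msg_id index. */
final class IdempotentConsumeLog {

    private final ConcurrentMap<String, String> rows = new ConcurrentHashMap<>();

    /**
     * Records the message and reports whether it was a duplicate.
     * Returns true when msgId was already present, false when the row was newly stored.
     */
    boolean insert(String msgId, String body) {
        // putIfAbsent is atomic, like an insert guarded by a unique index
        return rows.putIfAbsent(msgId, body) != null;
    }

    /** Number of distinct message ids stored so far. */
    int size() {
        return rows.size();
    }
}
```

A consumer would call insert before its business logic and skip the handling (while still acking) when it returns true.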
2. Message persistence bean
@Data
@NoArgsConstructor // lets the ORM instantiate the bean when mapping query results
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MqConsume implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.AUTO)
    private Long id;
    private Integer type;
    private String msgId;
    private String title;
    private String queueName;
    private String mqBody;
    private String remark;
    private String appId;
    private Integer isDeleted; // deletion status, 1: normal, 2: deleted
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
    private Integer version;
    private Integer status; // 1: success, 2: failure

    public MqConsume(MqConsumeTypeEnums consumeType, String queueName, String messageId, String body) {
        this.type = consumeType.getCode();
        this.title = consumeType.getMessage();
        this.msgId = messageId;
        this.mqBody = body;
        this.queueName = queueName;
    }
}
3. Service and mapper for MQ messages
// MQ message service interface
public interface IMqConsumeService extends IService<MqConsume> {

    /** Inserts the consume log; returns true when the message id already exists. */
    Boolean insert(MqConsume mqConsume);
}

// MQ message service implementation
@Slf4j
@Service("mqConsumeService")
public class MqConsumeServiceImpl extends ServiceImpl<MqConsumeMapper, MqConsume> implements IMqConsumeService {

    @Resource
    private MqConsumeMapper mqConsumeMapper;

    @Override
    public Boolean insert(MqConsume mqConsume) {
        Boolean repeat = Boolean.FALSE;
        try {
            mqConsumeMapper.insert(mqConsume);
        } catch (DuplicateKeyException e) {
            // The unique index on msg_id rejected the row: this message was already consumed
            repeat = Boolean.TRUE;
            log.error("Insert failed on duplicate key={}", e.getMessage());
        }
        return repeat;
    }

    public MqConsume getByMessageId(String messageId) {
        LambdaQueryWrapper<MqConsume> queryWrapper = new QueryWrapper<MqConsume>().lambda()
                .eq(MqConsume::getMsgId, messageId)
                .eq(MqConsume::getIsDeleted, 1);
        return baseMapper.selectOne(queryWrapper);
    }
}

// DAO
public interface MqConsumeMapper extends BaseMapper<MqConsume> {
}
III. Message retry mechanism
1. Retry message persistence bean
@Data
@NoArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ApiModel(value = "retryMessage object", description = "message retry table")
public class RetryMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    private Long id;
    @ApiModelProperty(value = "message id")
    private String messageId;
    @ApiModelProperty(value = "1: consume failed, 2: consume succeeded, 3: retry limit reached, 4: executing")
    private Integer consumerStatus;
    @ApiModelProperty(value = "exchange name")
    private String exchange;
    @ApiModelProperty(value = "routing key")
    private String routingKey;
    @ApiModelProperty(value = "queue name")
    private String queue;
    @ApiModelProperty(value = "message body, JSON")
    private String msgBody;
    @ApiModelProperty(value = "number of retries so far")
    private Integer retryNum;
    @ApiModelProperty(value = "maximum number of retries")
    private Integer maxRetryNum;
    @ApiModelProperty(value = "handler name")
    private String handleName;
    @ApiModelProperty(value = "error message")
    private String errorMsg;
    private Date createTime;
    private Date updateTime;

    /** Builds a retry record for a message whose consumption just failed. */
    public RetryMessage(ListennedQueueEnum queueEnum, Integer maxRetryNum, String handleName,
                        String messageId, String body, Exception e) {
        this.messageId = messageId;
        this.consumerStatus = 1;
        this.routingKey = queueEnum.getRouteKey();
        this.exchange = queueEnum.getExchangeKey();
        this.queue = queueEnum.getQueueName();
        this.msgBody = body;
        this.retryNum = 0;
        this.maxRetryNum = maxRetryNum;
        this.handleName = handleName;
        // Truncate long error messages to 800 characters
        String errorMessage = Optional.ofNullable(e.getMessage()).orElse("");
        errorMessage = errorMessage.length() > 800 ? errorMessage.substring(0, 800) + "..." : errorMessage;
        this.errorMsg = errorMessage;
        this.createTime = new Date();
        this.updateTime = new Date();
    }
}
2. Retry message mapper
public interface RetryMessageMapper extends BaseMapper<RetryMessage> {

    @Select("SELECT * from retry_message WHERE consumer_status = #{consumerStatus} order by id asc limit #{beginSize}, #{size}")
    List<RetryMessage> getConsumerList(@Param("consumerStatus") Integer consumerStatus,
                                       @Param("beginSize") Integer beginSize,
                                       @Param("size") Integer size);
}
3. Create the scheduled retry job
@Slf4j
@Component
@JobHandler(value = "retryMessageJobHandler")
public class RetryMessageJobHandler extends IJobHandler {

    @Resource
    private RetryMessageMapper retryMessageMapper;
    @Resource
    private SpringReflectionUtil springReflectionUtil;
    @Resource
    private DingDingHelper dingDingHelper;

    @Trace
    @Override
    public ReturnT<String> execute(String s) throws Exception {
        // Job parameter format: "<offset>,<pageSize>"
        String[] split = s.split(",");
        Integer beginSize = Integer.valueOf(split[0]);
        Integer size = Integer.valueOf(split[1]);
        List<RetryMessage> retryMessages = retryMessageMapper.getConsumerList(1, beginSize, size);
        if (CollectionUtils.isEmpty(retryMessages)) {
            return ReturnT.SUCCESS;
        }
        for (RetryMessage retryMessage : retryMessages) {
            try {
                // Skip exhausted records before claiming them, so they are never left in the "executing" state
                if (retryMessage.getRetryNum() >= retryMessage.getMaxRetryNum()) {
                    continue;
                }
                // Claim the record by switching it from "failed" to "executing"; this keeps the job idempotent
                UpdateWrapper<RetryMessage> updateWrapper = new UpdateWrapper<>();
                updateWrapper.eq("id", retryMessage.getId());
                updateWrapper.eq("consumer_status", ConsumerSatusEnums.CONSUME_FAIL.getCode());
                retryMessage.setConsumerStatus(ConsumerSatusEnums.CONSUME_EXEC.getCode());
                int updateResult = retryMessageMapper.update(retryMessage, updateWrapper);
                // Nothing updated: another run is already handling this record
                if (updateResult < 1) {
                    continue;
                }
                // The handler name has the form "beanName.methodName"
                String[] handleNameSplit = retryMessage.getHandleName().split("\\.");
                String serviceName = handleNameSplit[0];
                String methodName = handleNameSplit[1];
                Object[] params = {retryMessage.getMsgBody()};
                springReflectionUtil.springInvokeMethod(serviceName, methodName, params);
                // Success: update the retry count and status
                successUpdate(retryMessage);
            } catch (Exception e) {
                // Failure: update the retry count and status
                failUpdate(retryMessage);
                log.error("RetryMessageJobHandler failed={}, retryMessage= {}", e.getMessage(), JSONUtil.toJsonStr(retryMessage));
            }
        }
        return ReturnT.SUCCESS;
    }

    /** Records a failed retry. */
    private void failUpdate(RetryMessage retryMessage) {
        RetryMessage update = new RetryMessage();
        update.setRetryNum(retryMessage.getRetryNum() + 1);
        update.setId(retryMessage.getId());
        // Switch from "executing" back to "failed"
        update.setConsumerStatus(ConsumerSatusEnums.CONSUME_FAIL.getCode());
        // Check whether the retry limit has been reached
        if (update.getRetryNum() >= retryMessage.getMaxRetryNum()) {
            update.setConsumerStatus(ConsumerSatusEnums.CONSUME_MAX_LIMIT.getCode());
            // Alert via DingTalk so that someone handles it manually
            String title = "Compensation retries exhausted, manual intervention required";
            String context = "Message=" + JSONUtil.toJsonStr(retryMessage);
            dingDingHelper.sendAlert(title, context);
        }
        retryMessageMapper.updateById(update);
    }

    /** Records a successful retry. */
    private void successUpdate(RetryMessage retryMessage) {
        RetryMessage update = new RetryMessage();
        update.setRetryNum(retryMessage.getRetryNum() + 1);
        update.setId(retryMessage.getId());
        update.setConsumerStatus(ConsumerSatusEnums.CONSUME_SUCCESS.getCode());
        retryMessageMapper.updateById(update);
    }
}
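The retry job is easiest to follow as a small state machine over consumer_status: a record is claimed by moving it from 1 (consume failed) to 4 (executing), and then ends up as 2 (succeeded), back at 1, or at 3 (retry limit reached). The following is a minimal in-memory sketch of those transitions, using the status codes from the RetryMessage bean. The class RetryRecordSketch and its compare-and-set claim are illustrative assumptions standing in for the conditional SQL update, not the job's actual code.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** In-memory model of one retry_message row and its status transitions. */
final class RetryRecordSketch {

    static final int FAIL = 1;
    static final int SUCCESS = 2;
    static final int MAX_LIMIT = 3;
    static final int EXEC = 4;

    private final AtomicInteger status = new AtomicInteger(FAIL);
    private final int maxRetry;
    private int retryNum;

    RetryRecordSketch(int maxRetry) {
        this.maxRetry = maxRetry;
    }

    /**
     * Tries to claim the record (FAIL to EXEC), runs the task and stores the outcome.
     * Returns false when the record could not be claimed, for example because it is
     * already executing, already succeeded, or has hit the retry limit.
     */
    boolean attempt(Runnable task) {
        if (!status.compareAndSet(FAIL, EXEC)) {
            return false;
        }
        retryNum++;
        try {
            task.run();
            status.set(SUCCESS);
        } catch (RuntimeException e) {
            // Back to FAIL for another round, or MAX_LIMIT once the retries are used up
            status.set(retryNum >= maxRetry ? MAX_LIMIT : FAIL);
        }
        return true;
    }

    int status() {
        return status.get();
    }

    int retryNum() {
        return retryNum;
    }
}
```

The claim step matters because two job runs may load the same row; only the one whose conditional update succeeds goes on to invoke the handler.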
Spring reflection utility class
@Component
public class SpringReflectionUtil {

    @Resource
    private ApplicationContext applicationContext;

    public Object springInvokeMethod(String serviceName, String methodName, Object[] params) {
        Object service = applicationContext.getBean(serviceName);
        Class<?>[] paramClass = null;
        if (params != null) {
            paramClass = new Class<?>[params.length];
            for (int index = 0; index < params.length; index++) {
                paramClass[index] = params[index].getClass();
            }
        }
        // Look up the method to execute
        Method method = ReflectionUtils.findMethod(service.getClass(), methodName, paramClass);
        if (method == null) {
            throw new IllegalArgumentException("No method " + methodName + " on bean " + serviceName);
        }
        // Invoke it
        return ReflectionUtils.invokeMethod(method, service, params);
    }
}
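The handler name stored with each retry record has the form "beanName.methodName", and the job resolves it at run time through reflection. To show just that dispatch step without a Spring context, here is a plain-Java sketch in which a map stands in for the ApplicationContext. HandlerDispatchSketch and DemoConsumer are hypothetical names used only for illustration.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Resolves "beanName.methodName" and invokes the method with the message body. */
final class HandlerDispatchSketch {

    // Stand-in for the Spring ApplicationContext: bean name to bean instance
    private final Map<String, Object> beans = new HashMap<>();

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    Object invoke(String handleName, String body) {
        String[] parts = handleName.split("\\.");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected beanName.methodName but got " + handleName);
        }
        Object bean = beans.get(parts[0]);
        if (bean == null) {
            throw new IllegalArgumentException("Unknown bean " + parts[0]);
        }
        try {
            // Only public methods taking a single String are considered in this sketch
            Method method = bean.getClass().getMethod(parts[1], String.class);
            return method.invoke(bean, body);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke " + handleName, e);
        }
    }
}

/** Sample bean with a compensation method, mirroring timedRecompense(String). */
final class DemoConsumer {
    public String timedRecompense(String body) {
        return "handled:" + body;
    }
}
```

Failing fast on an unknown bean or a malformed handler name keeps a bad record from surfacing as an obscure NullPointerException inside the job.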
IV. Sending RabbitMQ messages
1. MQ message manager
@Slf4j
@Component
public class RabbitMQManager {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Sends a message whose body is already a JSON string. */
    public void sendMessage(QueueEnum queueEnum, String jsonStr) {
        doSend(queueEnum, jsonStr, () -> jsonStr);
    }

    /**
     * Sends an object. cn.vipthink.assets.config.MqConfig already configures JSON conversion,
     * so pass the object directly. Passing a String serializes it twice and leaves extra escape
     * characters in the body. MqConfig strips those escapes on the consuming side, so our own
     * projects receive the message correctly, but other consumers usually have no such configuration.
     */
    public void sendMessage(QueueEnum queueEnum, Object object) {
        doSend(queueEnum, object, () -> JSONUtil.toJsonStr(object));
    }

    /** Shared send logic: sends the payload and logs exchange, routing key, body, error and elapsed time. */
    private void doSend(QueueEnum queueEnum, Object payload, Supplier<String> bodyForLog) {
        String exchange = queueEnum.getExchange();
        String routeKey = queueEnum.getRouteKey();
        String error = "success";
        long timeMillis = System.currentTimeMillis();
        try {
            rabbitTemplate.convertAndSend(exchange, routeKey, payload);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            error = e.getMessage();
        } finally {
            String summary = "Rabbit MQ message sent:\n" +
                    "exchange\t: " + exchange + "\n" +
                    "routing key\t: " + routeKey + "\n" +
                    "body\t: " + bodyForLog.get() + "\n" +
                    "error\t: " + error + "\n" +
                    "elapsed\t: " + (System.currentTimeMillis() - timeMillis) + "ms\n";
            log.info(summary);
        }
    }
}
2. Sending a message with the manager
// Declare the rabbitMQManager
@Resource
private RabbitMQManager rabbitMQManager;

// Call sendMessage inside a method to send a message
rabbitMQManager.sendMessage(QueueEnum.OPEN_HOUR_NOTIFY, messageNotifyRequestDTO.getMessageBody().toString());
3. Define the enum for outgoing queues; note that it is separate from the enum used for listening
@Getter
@AllArgsConstructor
public enum QueueEnum {

    /** MQ notification for opening a packet */
    OPEN_HOUR_NOTIFY("exchange.unification_assets", "open_hour_notify", "queue.yx-poster.unification_assets.direct.open_hour_notify");

    private final String exchange;
    private final String routeKey;
    private final String name;
}
V. A general solution for reliable message delivery: the starter
1. Add dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.vipthink.infra</groupId>
    <artifactId>mq-client-starter</artifactId>
    <version>1.1.3</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spring.boot.version>2.1.17.RELEASE</spring.boot.version>
    </properties>
    <!-- A BOM with scope "import" is only valid inside dependencyManagement -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <!-- The explicit versions below override the versions managed by the BOM -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>5.3.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <version>2.1.20.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
            <version>2.3.12.RELEASE</version>
        </dependency>
    </dependencies>
    <distributionManagement>
        <repository>
            <id>nexus-releases</id>
            <name>Nexus Nexus Repository</name>
            <url>http://nexus-op.vipthink.cn/repository/maven-releases</url>
        </repository>
        <snapshotRepository>
            <id>nexus-snapshots</id>
            <name>local Nexus Repository</name>
            <url>http://nexus-op.vipthink.cn/repository/maven-snapshots</url>
        </snapshotRepository>
    </distributionManagement>
</project>
2. Configuration management
(1) Project configuration properties
spring.rabbitmq.addresses=xxx
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
spring.rabbitmq.port=5672
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
(2) Configuration properties class
@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "tx.message")
public class AppProperties {
    private Integer taskInterval = 60;
    private Integer backoffDelay = 30;
    private Integer backoffMultiplier = 2;
    private Integer maxRetry = 3;
    private Integer querySize = 100;
    private String dingTalkTitle;
    private String dingTalkWebhook;
}
(3) Auto-configuration class
@Configuration
@EnableConfigurationProperties({AppProperties.class})
public class AppConfiguration {

    @Autowired
    private AppProperties appProperties;

    @Bean
    public MessageService messageService(JdbcTemplate jdbcTemplate) {
        MessageService messageService = new MessageService();
        messageService.setJdbcTemplate(jdbcTemplate);
        return messageService;
    }

    @Bean
    public MessageCallback messageCallback(MessageService messageService, RabbitTemplate rabbitTemplate) {
        MessageCallback messageCallback = new MessageCallback();
        messageCallback.setMessageService(messageService);
        messageCallback.setDingTalkUtil(this.dingTalkUtil());
        // Register the callback for publisher confirms
        rabbitTemplate.setConfirmCallback(messageCallback);
        return messageCallback;
    }

    @Bean
    public MessageSender messageSender(MessageService messageService, RabbitTemplate rabbitTemplate) {
        MessageSenderImpl messageSenderImpl = new MessageSenderImpl();
        messageSenderImpl.setMessageService(messageService);
        messageSenderImpl.setRabbitTemplate(rabbitTemplate);
        messageSenderImpl.setBackoffDelay(this.appProperties.getBackoffDelay());
        messageSenderImpl.setBackoffMultiplier(this.appProperties.getBackoffMultiplier());
        messageSenderImpl.setMaxRetry(this.appProperties.getMaxRetry());
        return messageSenderImpl;
    }

    @Bean(initMethod = "execute")
    public MessageTask messageTask(MessageService messageService, RabbitTemplate rabbitTemplate,
                                   RedisTemplate redisTemplate) {
        MessageTask messageTask = new MessageTask();
        messageTask.setMessageService(messageService);
        messageTask.setRabbitTemplate(rabbitTemplate);
        messageTask.setRedisTemplate(redisTemplate);
        messageTask.setTaskInterval(this.appProperties.getTaskInterval());
        messageTask.setQuerySize(this.appProperties.getQuerySize());
        return messageTask;
    }

    @Bean
    public OkHttpClient okHttpClient() {
        return new OkHttpClient.Builder().build();
    }

    // Creates the DingTalkUtil used for alerts
    @Bean
    public DingTalkUtil dingTalkUtil() {
        DingTalkUtil dingTalkUtil = new DingTalkUtil();
        dingTalkUtil.setClient(this.okHttpClient());
        dingTalkUtil.setUrl(this.appProperties.getDingTalkWebhook());
        dingTalkUtil.setTitle(this.appProperties.getDingTalkTitle());
        return dingTalkUtil;
    }
}
(4) Register the auto-configuration in the META-INF/spring.factories file
org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.vipthink.infra.mq.AppConfiguration
3. Persisting messages
(1) The TxMessageDO persistence class and its SQL
CREATE TABLE `tx_message` (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id VARCHAR(64) NOT NULL COMMENT 'message ID',
    send_times TINYINT NOT NULL DEFAULT 0 COMMENT 'number of sends so far',
    max_retry_times TINYINT NOT NULL DEFAULT 5 COMMENT 'maximum number of retries',
    exchange_name VARCHAR(255) NOT NULL COMMENT 'exchange name',
    routing_key VARCHAR(255) COMMENT 'routing key',
    content TEXT COMMENT 'message content',
    next_schedule_time DATETIME NOT NULL COMMENT 'next schedule time',
    status VARCHAR(16) NOT NULL DEFAULT 'INIT' COMMENT 'message status, INIT: in progress, SUCCESS: succeeded, FAIL: failed',
    backoff_delay BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT 'initial backoff delay in seconds',
    backoff_multiplier TINYINT NOT NULL DEFAULT 2 COMMENT 'backoff multiplier',
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    deleted TINYINT NOT NULL DEFAULT 0,
    INDEX idx_create_time (create_time),
    INDEX idx_next_schedule_time (next_schedule_time),
    UNIQUE INDEX idx_message_id (message_id)
) COMMENT 'local message table';
@Data
@Builder
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor // required by @Builder once a no-args constructor is declared
public class TxMessageDO {
    private Long id;
    private String messageId;
    private Integer sendTimes;
    private Integer maxRetryTimes;
    private String exchangeName;
    private String routingKey;
    private String content;
    private LocalDateTime nextScheduleTime;
    private String status;
    private Integer backoffDelay;
    private Integer backoffMultiplier;
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
    private Integer deleted;
}
(2) DAO and service class
public class MessageService {

    public static final String TABLE_NAME = "tx_message";

    private JdbcTemplate jdbcTemplate;

    @Autowired
    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    // Insert into the message table
    @Transactional(rollbackFor = {Exception.class})
    public int insert(final TxMessageDO txMessageDO) {
        String insertSql = "INSERT INTO tx_message(message_id,max_retry_times,exchange_name,routing_key,content,next_schedule_time,backoff_delay,backoff_multiplier) VALUES (?,?,?,?,?,?,?,?)";
        return this.jdbcTemplate.update(insertSql, pstmt -> {
            pstmt.setString(1, txMessageDO.getMessageId());
            pstmt.setInt(2, txMessageDO.getMaxRetryTimes());
            pstmt.setString(3, txMessageDO.getExchangeName());
            pstmt.setString(4, txMessageDO.getRoutingKey());
            pstmt.setString(5, txMessageDO.getContent());
            pstmt.setObject(6, txMessageDO.getNextScheduleTime());
            pstmt.setInt(7, txMessageDO.getBackoffDelay());
            pstmt.setInt(8, txMessageDO.getBackoffMultiplier());
        });
    }

    // Look up a TxMessageDO by message ID; returns null when no row matches
    public TxMessageDO getByMessageId(String messageId) {
        String sql = "SELECT id, message_id, send_times, max_retry_times, exchange_name, routing_key, next_schedule_time, status, backoff_delay, backoff_multiplier FROM tx_message WHERE message_id = ?";
        RowMapper<TxMessageDO> rowMapper = new BeanPropertyRowMapper<>(TxMessageDO.class);
        // query() is used instead of queryForObject(), which throws when the result is empty
        List<TxMessageDO> rows = this.jdbcTemplate.query(sql, rowMapper, messageId);
        return rows.isEmpty() ? null : rows.get(0);
    }

    // List messages that are still pending or failed and are due for another send
    public List<TxMessageDO> listFail(String startTime, String endTime, Integer size) {
        String sql = "SELECT id, message_id, send_times, max_retry_times, exchange_name, routing_key, next_schedule_time, status, backoff_delay, backoff_multiplier,content FROM tx_message WHERE send_times<= max_retry_times AND status IN( 'INIT','FAIL') AND next_schedule_time BETWEEN ? AND ? ORDER BY ID DESC LIMIT ? ";
        RowMapper<TxMessageDO> rowMapper = new BeanPropertyRowMapper<>(TxMessageDO.class);
        return this.jdbcTemplate.query(sql, rowMapper, startTime, endTime, size);
    }

    // Update send count, next schedule time and status by primary key
    public int updateById(final TxMessageDO txMessageDO) {
        String sql = "UPDATE tx_message SET send_times=? ,next_schedule_time=? , status =? WHERE id =? ";
        return this.jdbcTemplate.update(sql, pstmt -> {
            pstmt.setInt(1, txMessageDO.getSendTimes());
            pstmt.setObject(2, txMessageDO.getNextScheduleTime());
            pstmt.setString(3, txMessageDO.getStatus());
            pstmt.setLong(4, txMessageDO.getId());
        });
    }
}
(3) Message status enum
public enum MessageStatus {
    INIT,
    SUCCESS,
    FAIL
}
4. Publisher confirm callback
public class MessageCallback implements ConfirmCallback {

    private static final Logger log = LoggerFactory.getLogger(MessageCallback.class);

    private MessageService messageService;
    private DingTalkUtil dingTalkUtil;

    public void setMessageService(MessageService messageService) {
        this.messageService = messageService;
    }

    public void setDingTalkUtil(DingTalkUtil dingTalkUtil) {
        this.dingTalkUtil = dingTalkUtil;
    }

    // Publisher confirm: called once the broker has acked or nacked the message
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (correlationData == null || !StringUtils.hasText(correlationData.getId())) {
            return;
        }
        log.info("MessageId:{},ack:{},cause:{}", correlationData.getId(), ack, cause);
        TxMessageDO messageDO = this.messageService.getByMessageId(correlationData.getId());
        if (messageDO == null) {
            return;
        }
        int sendTimes = messageDO.getSendTimes() + 1;
        // Push the next schedule time back by sendTimes * backoffDelay * backoffMultiplier seconds
        long delaySeconds = (long) sendTimes * messageDO.getBackoffDelay() * messageDO.getBackoffMultiplier();
        TxMessageDO txMessageDO = new TxMessageDO()
                .setId(messageDO.getId())
                .setMessageId(messageDO.getMessageId())
                .setStatus(ack ? MessageStatus.SUCCESS.name() : MessageStatus.FAIL.name())
                .setSendTimes(sendTimes)
                .setNextScheduleTime(messageDO.getNextScheduleTime().plusSeconds(delaySeconds));
        this.messageService.updateById(txMessageDO);
        // Alert once a nacked message has used up its retries
        if (sendTimes > messageDO.getMaxRetryTimes() && !ack) {
            this.dingTalkUtil.send("Message ID: " + txMessageDO.getMessageId()
                    + " reached the maximum number of sends: " + messageDO.getMaxRetryTimes());
        }
    }
}
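Each row in tx_message carries an initial backoff delay in seconds and a backoff multiplier. One common way to combine the two is exponential backoff, where the wait grows as delay × multiplier^(attempt − 1). The sketch below shows that calculation in isolation. It is an assumption about how such values are typically used, not a confirmed description of the starter's formula, and BackoffSketch is a hypothetical helper.

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Exponential backoff: delay * multiplier^(sendTimes - 1). */
final class BackoffSketch {

    private BackoffSketch() {
    }

    /** Wait before the next send, given how many sends have already happened (1-based). */
    static Duration delayFor(int sendTimes, long backoffDelaySeconds, int backoffMultiplier) {
        if (sendTimes < 1) {
            throw new IllegalArgumentException("sendTimes must be at least 1");
        }
        long factor = 1;
        for (int i = 1; i < sendTimes; i++) {
            // multiplyExact throws on overflow instead of silently wrapping around
            factor = Math.multiplyExact(factor, (long) backoffMultiplier);
        }
        return Duration.ofSeconds(Math.multiplyExact(backoffDelaySeconds, factor));
    }

    /** Next schedule time, counted from the given reference time. */
    static LocalDateTime nextSchedule(LocalDateTime from, int sendTimes,
                                      long backoffDelaySeconds, int backoffMultiplier) {
        return from.plus(delayFor(sendTimes, backoffDelaySeconds, backoffMultiplier));
    }
}
```

With the defaults from AppProperties (backoffDelay = 30, backoffMultiplier = 2), the waits after the first three sends would be 30, 60 and 120 seconds.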
5. Sending messages
(1) Message sender interface
public interface MessageSender {

    // Send a message
    void send(Message message);

    // Send a delayed message
    void sendDelay(Message message, LocalDateTime delayTime);
}
(2) Message sender implementation
public class MessageSenderImpl implements MessageSender {

    private RabbitTemplate rabbitTemplate;
    private MessageService messageService;
    // Maximum number of retries
    private Integer maxRetry;
    // Initial backoff delay in seconds
    private Integer backoffDelay;
    private Integer backoffMultiplier;

    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void setMessageService(MessageService messageService) {
        this.messageService = messageService;
    }

    public void setMaxRetry(Integer maxRetry) {
        this.maxRetry = maxRetry;
    }

    public void setBackoffDelay(Integer backoffDelay) {
        this.backoffDelay = backoffDelay;
    }

    public void setBackoffMultiplier(Integer backoffMultiplier) {
        this.backoffMultiplier = backoffMultiplier;
    }

    // Send a message: persist it first, then publish it
    @Override
    public void send(final Message message) {
        this.check(message);
        TxMessageDO txMessageDO = this.convert(message);
        this.messageService.insert(txMessageDO);
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            // No transaction in progress: publish right away
            this.sendMsg(message);
        } else {
            // Inside a transaction: publish only after it has committed
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    MessageSenderImpl.this.sendMsg(message);
                }
            });
        }
    }

    // Publish to RabbitMQ, carrying the message ID as correlation data for the confirm callback
    private void sendMsg(Message message) {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(message.getMessageId());
        this.rabbitTemplate.convertAndSend(message.getExchange(), message.getRoutingKey(),
                message.getMessage(), correlationData);
    }

    // Send a delayed message: only the row is stored here, with delayTime as its next schedule time
    @Override
    public void sendDelay(Message message, LocalDateTime delayTime) {
        this.check(message);
        Assert.notNull(delayTime, "delayTime can not be null");
        TxMessageDO txMessageDO = this.convert(message);
        txMessageDO.setNextScheduleTime(delayTime);
        this.messageService.insert(txMessageDO);
    }

    // Convert the message into its persistence object
    private TxMessageDO convert(Message message) {
        return new TxMessageDO()
                .setMessageId(message.getMessageId())
                .setContent(message.getMessage())
                .setExchangeName(message.getExchange())
                .setRoutingKey(message.getRoutingKey())
                .setBackoffDelay(this.backoffDelay)
                .setBackoffMultiplier(this.backoffMultiplier)
                .setMaxRetryTimes(this.maxRetry)
                .setNextScheduleTime(LocalDateTime.now());
    }

    // Validate the required fields
    private void check(Message message) {
        Assert.notNull(message, "message can not be null");
        Assert.notNull(message.getMessageId(), "messageId can not be null");
        Assert.notNull(message.getMessage(), "message can not be null");
        Assert.notNull(message.getExchange(), "exchange can not be null");
        Assert.notNull(message.getRoutingKey(), "routingKey can not be null");
    }
}
6. AOP aspect for message listeners
@Aspect
public class MessageListnerAspct {

    @Pointcut("execution(* cn.vipthink.orderfeign.*Feign*.*(..)) ")
    public void oldOrderFeignAspect() {
    }

    @Around("oldOrderFeignAspect()")
    public Object oldOrderFeignAspect(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        Object[] args = proceedingJoinPoint.getArgs();
        // Find the Channel argument, if any
        Object channelObj = Arrays.stream(args).filter(s -> s instanceof Channel).findAny().orElse(null);
        // Find the Message argument, if any
        Object messageObj = Arrays.stream(args).filter(s -> s instanceof Message).findAny().orElse(null);
        if (channelObj == null || messageObj == null) {
            return proceedingJoinPoint.proceed();
        }
        Message message = (Message) messageObj;
        String envIdHeader = message.getMessageProperties().getHeader("envId");
        String envId = System.getenv("envId");
        // Consume when the message was published from the same environment
        if (StringUtils.hasText(envIdHeader) && envIdHeader.equals(envId)) {
            return proceedingJoinPoint.proceed();
        }
        // Consume when the message has already been redelivered once
        if (Boolean.TRUE.equals(message.getMessageProperties().getRedelivered())) {
            return proceedingJoinPoint.proceed();
        }
        // Otherwise ask the broker to requeue unacknowledged messages so another consumer can take them
        ((Channel) channelObj).basicRecover(true);
        return null;
    }
}
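The aspect boils down to a three-way decision based on the envId header of the message, the envId of the local process, and the redelivered flag. Pulling that decision into a pure function makes it easy to reason about and to test. EnvRoutingSketch below is a hypothetical helper written for illustration; it mirrors the branches of the aspect but is not part of the starter.

```java
/** Pure decision logic for environment-aware consumption. */
final class EnvRoutingSketch {

    enum Decision { PROCEED, REQUEUE }

    private EnvRoutingSketch() {
    }

    /**
     * PROCEED when the message comes from the same environment or has already been
     * redelivered; REQUEUE otherwise, so that another consumer gets a chance.
     */
    static Decision decide(String headerEnvId, String localEnvId, boolean redelivered) {
        boolean sameEnv = headerEnvId != null
                && !headerEnvId.trim().isEmpty()
                && headerEnvId.equals(localEnvId);
        return (sameEnv || redelivered) ? Decision.PROCEED : Decision.REQUEUE;
    }
}
```

The redelivered branch acts as a safety valve: a message that no matching environment picks up is consumed on its second delivery instead of bouncing forever.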
7. Package the starter and use it as a dependency
(1) Add the starter dependency
<dependency>
    <groupId>cn.vipthink.infra</groupId>
    <artifactId>mq-client-starter</artifactId>
    <version>1.1.3</version>
</dependency>
(2) Add the configuration
spring.rabbitmq.addresses=xxx
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxx
spring.rabbitmq.port=5672
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
(3) Send with MessageSender
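@Autowired
private MessageSender messageSender;

// Populate messageId, message, exchange and routingKey before sending; send() rejects null values
Message message = new Message();
messageSender.send(message);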
Code reference: https://www.iocoder.cn/Spring-Boot/RabbitMQ/?github