springboot 集成rabbitmq

news/2024/10/31 7:30:41/

1 导入maven依赖jar包

 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2 配置application.properties


#mq
spring.rabbitmq.host=192.168.43.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=my_vhost# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirm-type=correlated
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true

3 配置普通交换机,队列,消息生成者,消费者

配置交换机和队列进行绑定

package com.example.springbootdemo.rabbitmq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;@Component
@Slf4j
@Configuration
public class RabbitmqTest {public static final String STATION_EXCHANGE = "station-exchange";public static final String STATION_QUEUE_11 = "station-queue-11";public static final String STATION_QUEUE_197 = "station-queue-197";@Beanpublic Binding testBinging(TopicExchange exchange,Queue testQueue){return BindingBuilder.bind(testQueue).to(exchange).with("#");}@Beanpublic TopicExchange stationExchange(){return new TopicExchange(STATION_EXCHANGE,true,false,null);}@Beanpublic Queue stationQueue(){return new Queue(STATION_QUEUE_11,true,false,false,null);}@Beanpublic Queue stationQueue197(){return new Queue(STATION_QUEUE_197,true,false,false,null);}@Beanpublic Binding stationBinding(TopicExchange stationExchange,Queue stationQueue){return BindingBuilder.bind(stationQueue).to(stationExchange).with("11");}@Beanpublic Binding stationBinding2(TopicExchange stationExchange,Queue stationQueue197){return BindingBuilder.bind(stationQueue197).to(stationExchange).with("197");}
}

配置消费者

package com.example.springbootdemo.listener;import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class StationListener {@RabbitListener(queues = RabbitmqTest.STATION_QUEUE_11)public void station11(Message message){log.info("11监听到消息: {}",new String(message.getBody()));}@RabbitListener(queues = RabbitmqTest.STATION_QUEUE_197)public void station197(Message message){log.info("197监听到消息: {}",new String(message.getBody()));}
}

消息生产者

package com.example.springbootdemo.rabbitmq.production;import com.example.springbootdemo.config.rabbitmq.RabbitmqTemplateConfirm;
import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@Slf4j
public class TestMessageProduction {@Resourceprivate RabbitmqTemplateConfirm rabbitmqTemplateConfirm;public void testMessage(String key){log.info("消息发送确认");RabbitTemplate rabbitTemplate = rabbitmqTemplateConfirm.getRabbitTemplate();rabbitTemplate.convertAndSend(RabbitmqTest.STATION_EXCHANGE,key,"消息发送者确认");}
}

4 配置延时交换机和队列,生产者,消费者

配置延时交换机和队列

package com.example.springbootdemo.rabbitmq.config;import com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class PayMessageMq {public static final String ONLINE_PAY_EXCHANGE = "online-pay-exchange";public static final String ONLINE_PAY_QUEUE = "online-pay-queue";public static final String REFUND_EXCHANGE = "refund-exchange";public static final String REFUND_QUEUE = "refund-queue";@Beanpublic CustomExchange onlinePayExchange(){Map<String, Object> args = Maps.newHashMap();//自定义交换机的类型,指定分发方式args.put("x-delayed-type", "topic");//此处type指定为延迟交换机return new CustomExchange(ONLINE_PAY_EXCHANGE, "x-delayed-message", true, false, args);}@Beanpublic Queue onlinePayQueue(){return new Queue(ONLINE_PAY_QUEUE,true,false,false);}@Beanpublic Binding onlineBinding(Queue onlinePayQueue, CustomExchange onlinePayExchange){return BindingBuilder.bind(onlinePayQueue).to(onlinePayExchange).with("#").noargs();}@Beanpublic CustomExchange refundExchange(){Map<String,Object> args = Maps.newHashMap();args.put("x-delayed-type", "topic");return new CustomExchange(REFUND_EXCHANGE, "x-delayed-message", true, false, args);}@Beanpublic Queue refundQueue(){return new Queue(REFUND_QUEUE, true,false,false);}@Beanpublic Binding refundBinding(CustomExchange refundExchange,Queue refundQueue){return BindingBuilder.bind(refundQueue).to(refundExchange).with("#").noargs();}
}

配置消费者

package com.example.springbootdemo.listener;import com.example.springbootdemo.application.pay.OnlinePay;
import com.example.springbootdemo.application.pay.ali.response.QueryAliPayResponse;
import com.example.springbootdemo.entity.Order;
import com.example.springbootdemo.entity.PaymentRecord;
import com.example.springbootdemo.entity.dict.PaymentRecordEnumStatus;
import com.example.springbootdemo.rabbitmq.config.PayMessageMq;import com.example.springbootdemo.rabbitmq.entity.QueryPayMessageEvent;
import com.example.springbootdemo.rabbitmq.entity.RefundMessageEvent;
import com.example.springbootdemo.rabbitmq.production.MessageProduction;
import com.example.springbootdemo.service.PaymentRecordService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.util.Objects;@Slf4j
@Component
public class OnlinePayListener {@Resourceprivate OnlinePay onlinePay;@Resourceprivate ObjectMapper objectMapper;@Resourceprivate PaymentRecordService paymentRecordService;@Resourceprivate MessageProduction messageProduction;@RabbitListener(queues = PayMessageMq.ONLINE_PAY_QUEUE)public void queryPay(@Payload QueryPayMessageEvent event) throws JsonProcessingException {if(Objects.nonNull(event) && event.getQueryNum() <= 16) {log.info("查询支付结果orderCod: {}", event.getOrderCode());String payStatus = onlinePay.queryPayStatus(Order.builder().code(event.getOrderCode()).build());QueryAliPayResponse queryAliPayResponse = objectMapper.readValue(payStatus, QueryAliPayResponse.class);QueryAliPayResponse.AlipayTradeQueryResponse alipayTradeQueryResponse = queryAliPayResponse.getAlipay_trade_query_response();if(Objects.nonNull(alipayTradeQueryResponse) && alipayTradeQueryResponse.getCode().equals("10000") && alipayTradeQueryResponse.getTrade_status().equals("TRADE_SUCCESS")){PaymentRecord paymentRecord = paymentRecordService.lambdaQuery().eq(PaymentRecord::getOrderCode, event.getOrderCode()).one();paymentRecord.setTradeNo(alipayTradeQueryResponse.getTrade_no());paymentRecord.setStatus(PaymentRecordEnumStatus.SUCCESS);paymentRecordService.updateById(paymentRecord);}else {event.setQueryNum(event.getQueryNum()+1);messageProduction.queryPayRecord(event,59*1000);}log.info("支付结果: {}", payStatus);}}public void queryRefund(@Payload RefundMessageEvent event){if(Objects.nonNull(event) && event.getQueryNum() <= 16) {log.info("查询退款结果orderCode: {}", event.getOrderCode());String queryRefund = onlinePay.queryRefund(event.getOrderCode(), event.getOutRequestNo(),event.getTradeNo());System.out.println(queryRefund);}else {}}
}

生产者

package com.example.springbootdemo.rabbitmq.production;import com.example.springbootdemo.rabbitmq.config.PayMessageMq;
import com.example.springbootdemo.rabbitmq.entity.QueryPayMessageEvent;
import com.example.springbootdemo.rabbitmq.entity.RefundMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Slf4j
@Component
public class MessageProduction {@Resourceprivate RabbitTemplate rabbitTemplate;public void queryPayRecord(QueryPayMessageEvent event,Integer delayTime){log.info("发送查询支付结果事件:订单号: {} ",event.getOrderCode());rabbitTemplate.convertAndSend(PayMessageMq.ONLINE_PAY_EXCHANGE,"pay",event,correlationData->{correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});log.info("发送查询支付结果事件完成");}public void queryRefund(RefundMessageEvent event,Integer delayTime){log.info("发送查询退款事件: {}",event.getOrderCode());rabbitTemplate.convertAndSend(PayMessageMq.REFUND_EXCHANGE,"hello",event,corn->{corn.getMessageProperties().setDelay(delayTime);return corn;});log.info("发送查询退款事件完成");}
}

5 配置某些交换机,队列进行消息发布的确认

  • 在application.properties配置:

发送者开启 confirm 确认机制

spring.rabbitmq.publisher-confirm-type=correlated

发送者开启 return 确认机制

spring.rabbitmq.publisher-returns=true

  • 代码配置如下
package com.example.springbootdemo.config.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Objects;/****发送者开启 confirm 确认机制*/
@Slf4j
@Component
public class RabbitmqTemplateConfirm implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.error("confirm==>发送到broker失败\r\n" +"correlationData={}\r\n" + "ack={}\r\n" + "cause={}",correlationData, ack, cause);} else {log.info("confirm==>发送到broker成功\r\n" +"correlationData={}\r\n" + "ack={}\r\n" + "cause={}",correlationData, ack, cause);}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText,String exchange, String routingKey) {log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +"replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",message, replyCode, replyText, exchange, routingKey);}private RabbitTemplate rabbitTemplate;/* 连接工厂 */@Resourceprivate ConnectionFactory connectionFactory;public RabbitTemplate getRabbitTemplate(){if(Objects.isNull(rabbitTemplate)){rabbitTemplate=new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}return rabbitTemplate;}
}
package com.example.springbootdemo.rabbitmq.production;import com.example.springbootdemo.config.rabbitmq.RabbitmqTemplateConfirm;
import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@Slf4j
public class TestMessageProduction {@Resourceprivate RabbitmqTemplateConfirm rabbitmqTemplateConfirm;public void testMessage(String key){log.info("消息发送确认");RabbitTemplate rabbitTemplate = rabbitmqTemplateConfirm.getRabbitTemplate();rabbitTemplate.convertAndSend(RabbitmqTest.STATION_EXCHANGE,key,"消息发送者确认");}
}

6 对某些队列进行手动ack

设置消费端手动 ack none不确认 auto自动确认 manual手动确认

spring.rabbitmq.listener.simple.acknowledge-mode=manual
该配置是全局的,如果只对某些队列进行手动需要进行如下配置

package com.example.springbootdemo.config.rabbitmq;import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/**** rabbitmq对单个队列消费这设置手动ack*/
@Slf4j
@Configuration
public class RabbitmqConsumerAck {@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(RabbitmqTest.STATION_QUEUE_197);container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {log.info(RabbitmqTest.STATION_QUEUE_197 + "get msg:" +new String(message.getBody()));if(message.getMessageProperties().getHeaders().get("error") == null){// 消息手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info("消息确认");}else {// 消息重新回到队列//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);// 拒绝消息(删除)channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);log.info("消息拒绝");}});return container;}
}

7 rabbitmq其他配置


##发送重试配置
#启用发送重试
#spring.rabbitmq.template.retry.enabled=true
#最大重试次数
#spring.rabbitmq.template.retry.max-attempts=5
#第一次和第二次尝试发布或传递消息之间的间隔
#spring.rabbitmq.template.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数 步长
#spring.rabbitmq.template.retry.multiplier=2
#最大重试时间间隔
#spring.rabbitmq.template.retry.max-interval=10000ms

http://www.ppmy.cn/news/21362.html

相关文章

这就是传说中超难的N皇后?——详细图解!

✔️本文主题&#xff1a;回溯算法之N皇后 算法 ✔️题目链接&#xff1a;N皇后 详解N皇后一、前言二、题目信息三、解题思路四、参考代码五、结语一、前言 大家好久不见&#xff0c;今天我们一起来学习一道很经典、也很有难度的一道题目——N皇后 二、题目信息 按照国际象棋…

Git提交后代码后修改commit信息

文章目录1. $ git rebase -i HEAD~n2. 执行后显示近n次commit信息3.执行 git commit --amend后会跳出编辑器4.执行$ git rebase --continue修改最近n次提交1. $ git rebase -i HEAD~n 例如&#xff1a;要修改近三次提交&#xff0c;git rebase -i HEAD~3 2. 执行后显示近n次c…

【计网】ip地址和子网掩码

ip和子网掩码两者基本作用ip地址ip地址范围ip地址组成和子网掩码子网掩码为什么需要子网掩码总结两者基本作用 ip地址为接入网络的节点的地址&#xff0c;分为网络号和主机号。 子网掩码则是用来定义/判断ip地址的网络号和主机号。 ip地址 ip地址范围 用我们最常见的ip地址…

Java版数据结构与算法笔记

文章目录一、数据结构与算法概述及题目1、数据结构和算法的关系2、线性结构与非线性结构Ⅰ-线性结构Ⅱ-非线性结构3、经典面试题Ⅰ-字符串匹配问题&#xff1a;Ⅱ-汉诺塔游戏Ⅲ-八皇后问题:Ⅳ-马踏棋盘算法4、几个实际编程中遇到的问题Ⅰ-字符串替换问题Ⅱ-一个五子棋程序Ⅲ-约…

【算法】【位运算模块】使用位运算完成整数的加减乘除

目录前言问题介绍解决方案代码编写java语言版本c语言版本c语言版本思考感悟写在最后前言 当前所有算法都使用测试用例运行过&#xff0c;但是不保证100%的测试用例&#xff0c;如果存在问题务必联系批评指正~ 在此感谢左大神让我对算法有了新的感悟认识&#xff01; 问题介绍 …

感谢第三弹 | 开启地铁国产化浪潮 GBASE获多方城市“地下动脉”肯定

岁末年初&#xff0c;GBASE收到了来自深圳地铁、高新现代智能系统股份有限公司、深圳达实智能股份有限公司等客户及合作伙伴发来的荣誉证书及感谢信。作为亲密无间的战友&#xff0c;GBASE携手高新现代、达实智能在深圳地铁CLC、ACC、AFC多个条线项目中通力合作&#xff0c;助力…

Cesium 内置变量、常量、函数-Shader

内置uniform 内置uniform主要置于AutomaticUniforms类里面,该类私有未开放文档。 czm_backgroundColor代表当前场景背景颜色的自动GLSL制服。 例: // GLSL声明 统一vec4 czm_backgroundColor; //示例:如果给定颜色的RGB与背景颜色匹配,则将其反转。 vec4 AdjustColorForCo…

【关于Linux中----信号】

文章目录一、信号入门1.1 信号概念1.2 用 kill-l命令查看信号列表1.3 信号处理常见方式预览二、产生信号2.1 通过终端按键产生信号2.2 由于程序中存在异常产生信号2.3 系统接口调用产生信号2.4 软件条件产生信号三、阻塞信号3.1 信号相关常见概念补充3.2 在内核中的表示3.3 sig…