1 导入maven依赖jar包
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2 配置application.properties
#mq
spring.rabbitmq.host=192.168.43.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=my_vhost
# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirm-type=correlated
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
3 配置普通交换机,队列,消息生产者,消费者
配置交换机和队列进行绑定
package com.example.springbootdemo.rabbitmq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;@Component
@Slf4j
@Configuration
// Declares the "station" topic exchange, its two durable queues and the bindings between them.
public class RabbitmqTest {

    /** Name of the topic exchange that station messages are published to. */
    public static final String STATION_EXCHANGE = "station-exchange";

    /** Name of the queue bound to the station exchange with routing key {@code "11"}. */
    public static final String STATION_QUEUE_11 = "station-queue-11";

    /** Name of the queue bound to the station exchange with routing key {@code "197"}. */
    public static final String STATION_QUEUE_197 = "station-queue-197";

    /**
     * Binds {@code testQueue} to {@code exchange} with the catch-all topic pattern {@code "#"}.
     *
     * <p>NOTE(review): no bean named {@code testQueue} is declared in this class, while two
     * {@link Queue} beans are. Unless a {@code testQueue} bean is defined elsewhere in the
     * project, injection of this parameter is presumably ambiguous -- confirm.
     *
     * @param exchange  the topic exchange to bind to
     * @param testQueue the queue to bind
     * @return the binding declaration
     */
    @Bean
    public Binding testBinging(TopicExchange exchange, Queue testQueue) {
        return BindingBuilder.bind(testQueue).to(exchange).with("#");
    }

    /**
     * Declares the station topic exchange as durable and not auto-deleted.
     *
     * @return the exchange declaration
     */
    @Bean
    public TopicExchange stationExchange() {
        return new TopicExchange(STATION_EXCHANGE, true, false, null);
    }

    /**
     * Declares {@link #STATION_QUEUE_11} as durable, non-exclusive and not auto-deleted.
     *
     * @return the queue declaration
     */
    @Bean
    public Queue stationQueue() {
        return new Queue(STATION_QUEUE_11, true, false, false, null);
    }

    /**
     * Declares {@link #STATION_QUEUE_197} as durable, non-exclusive and not auto-deleted.
     *
     * @return the queue declaration
     */
    @Bean
    public Queue stationQueue197() {
        return new Queue(STATION_QUEUE_197, true, false, false, null);
    }

    /**
     * Routes messages with routing key {@code "11"} from the station exchange to
     * {@link #STATION_QUEUE_11}.
     *
     * @param stationExchange the station topic exchange
     * @param stationQueue    the queue declared by {@link #stationQueue()}
     * @return the binding declaration
     */
    @Bean
    public Binding stationBinding(TopicExchange stationExchange, Queue stationQueue) {
        return BindingBuilder.bind(stationQueue).to(stationExchange).with("11");
    }

    /**
     * Routes messages with routing key {@code "197"} from the station exchange to
     * {@link #STATION_QUEUE_197}.
     *
     * @param stationExchange the station topic exchange
     * @param stationQueue197 the queue declared by {@link #stationQueue197()}
     * @return the binding declaration
     */
    @Bean
    public Binding stationBinding2(TopicExchange stationExchange, Queue stationQueue197) {
        return BindingBuilder.bind(stationQueue197).to(stationExchange).with("197");
    }
}
配置消费者
package com.example.springbootdemo.listener;import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
// Logs every message delivered to the two station queues.
public class StationListener {

    /**
     * Logs the body of each message received from {@link RabbitmqTest#STATION_QUEUE_11}.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(queues = RabbitmqTest.STATION_QUEUE_11)
    public void station11(Message message) {
        log.info("11监听到消息: {}", decodeBody(message));
    }

    /**
     * Logs the body of each message received from {@link RabbitmqTest#STATION_QUEUE_197}.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(queues = RabbitmqTest.STATION_QUEUE_197)
    public void station197(Message message) {
        log.info("197监听到消息: {}", decodeBody(message));
    }

    /**
     * Decodes the message body as UTF-8 text.
     *
     * <p>The charset is given explicitly so that non-ASCII payloads do not depend on the
     * JVM's platform default charset.
     */
    private static String decodeBody(Message message) {
        return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }
}
消息生产者
package com.example.springbootdemo.rabbitmq.production;import com.example.springbootdemo.config.rabbitmq.RabbitmqTemplateConfirm;
import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@Slf4j
// Sends a test message to the station exchange through the confirm-enabled template.
public class TestMessageProduction {

    /** Supplies the RabbitTemplate that has confirm/return callbacks registered. */
    @Resource
    private RabbitmqTemplateConfirm rabbitmqTemplateConfirm;

    /**
     * Publishes a fixed test payload to {@link RabbitmqTest#STATION_EXCHANGE}.
     *
     * @param key the routing key to publish with
     */
    public void testMessage(String key) {
        log.info("消息发送确认");
        RabbitTemplate rabbitTemplate = rabbitmqTemplateConfirm.getRabbitTemplate();
        rabbitTemplate.convertAndSend(RabbitmqTest.STATION_EXCHANGE, key, "消息发送者确认");
    }
}
4 配置延时交换机和队列,生产者,消费者
配置延时交换机和队列
package com.example.springbootdemo.rabbitmq.config;import com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class PayMessageMq {public static final String ONLINE_PAY_EXCHANGE = "online-pay-exchange";public static final String ONLINE_PAY_QUEUE = "online-pay-queue";public static final String REFUND_EXCHANGE = "refund-exchange";public static final String REFUND_QUEUE = "refund-queue";@Beanpublic CustomExchange onlinePayExchange(){Map<String, Object> args = Maps.newHashMap();//自定义交换机的类型,指定分发方式args.put("x-delayed-type", "topic");//此处type指定为延迟交换机return new CustomExchange(ONLINE_PAY_EXCHANGE, "x-delayed-message", true, false, args);}@Beanpublic Queue onlinePayQueue(){return new Queue(ONLINE_PAY_QUEUE,true,false,false);}@Beanpublic Binding onlineBinding(Queue onlinePayQueue, CustomExchange onlinePayExchange){return BindingBuilder.bind(onlinePayQueue).to(onlinePayExchange).with("#").noargs();}@Beanpublic CustomExchange refundExchange(){Map<String,Object> args = Maps.newHashMap();args.put("x-delayed-type", "topic");return new CustomExchange(REFUND_EXCHANGE, "x-delayed-message", true, false, args);}@Beanpublic Queue refundQueue(){return new Queue(REFUND_QUEUE, true,false,false);}@Beanpublic Binding refundBinding(CustomExchange refundExchange,Queue refundQueue){return BindingBuilder.bind(refundQueue).to(refundExchange).with("#").noargs();}
}
配置消费者
package com.example.springbootdemo.listener;import com.example.springbootdemo.application.pay.OnlinePay;
import com.example.springbootdemo.application.pay.ali.response.QueryAliPayResponse;
import com.example.springbootdemo.entity.Order;
import com.example.springbootdemo.entity.PaymentRecord;
import com.example.springbootdemo.entity.dict.PaymentRecordEnumStatus;
import com.example.springbootdemo.rabbitmq.config.PayMessageMq;import com.example.springbootdemo.rabbitmq.entity.QueryPayMessageEvent;
import com.example.springbootdemo.rabbitmq.entity.RefundMessageEvent;
import com.example.springbootdemo.rabbitmq.production.MessageProduction;
import com.example.springbootdemo.service.PaymentRecordService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.Resource;
import java.util.Objects;@Slf4j
@Component
public class OnlinePayListener {@Resourceprivate OnlinePay onlinePay;@Resourceprivate ObjectMapper objectMapper;@Resourceprivate PaymentRecordService paymentRecordService;@Resourceprivate MessageProduction messageProduction;@RabbitListener(queues = PayMessageMq.ONLINE_PAY_QUEUE)public void queryPay(@Payload QueryPayMessageEvent event) throws JsonProcessingException {if(Objects.nonNull(event) && event.getQueryNum() <= 16) {log.info("查询支付结果orderCod: {}", event.getOrderCode());String payStatus = onlinePay.queryPayStatus(Order.builder().code(event.getOrderCode()).build());QueryAliPayResponse queryAliPayResponse = objectMapper.readValue(payStatus, QueryAliPayResponse.class);QueryAliPayResponse.AlipayTradeQueryResponse alipayTradeQueryResponse = queryAliPayResponse.getAlipay_trade_query_response();if(Objects.nonNull(alipayTradeQueryResponse) && alipayTradeQueryResponse.getCode().equals("10000") && alipayTradeQueryResponse.getTrade_status().equals("TRADE_SUCCESS")){PaymentRecord paymentRecord = paymentRecordService.lambdaQuery().eq(PaymentRecord::getOrderCode, event.getOrderCode()).one();paymentRecord.setTradeNo(alipayTradeQueryResponse.getTrade_no());paymentRecord.setStatus(PaymentRecordEnumStatus.SUCCESS);paymentRecordService.updateById(paymentRecord);}else {event.setQueryNum(event.getQueryNum()+1);messageProduction.queryPayRecord(event,59*1000);}log.info("支付结果: {}", payStatus);}}public void queryRefund(@Payload RefundMessageEvent event){if(Objects.nonNull(event) && event.getQueryNum() <= 16) {log.info("查询退款结果orderCode: {}", event.getOrderCode());String queryRefund = onlinePay.queryRefund(event.getOrderCode(), event.getOutRequestNo(),event.getTradeNo());System.out.println(queryRefund);}else {}}
}
生产者
package com.example.springbootdemo.rabbitmq.production;import com.example.springbootdemo.rabbitmq.config.PayMessageMq;
import com.example.springbootdemo.rabbitmq.entity.QueryPayMessageEvent;
import com.example.springbootdemo.rabbitmq.entity.RefundMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Slf4j
@Component
// Publishes delayed payment-status and refund query events.
public class MessageProduction {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a payment-status query event to {@link PayMessageMq#ONLINE_PAY_EXCHANGE}.
     *
     * @param event     the event to publish
     * @param delayTime the value set as the message's delay property
     */
    public void queryPayRecord(QueryPayMessageEvent event, Integer delayTime) {
        log.info("发送查询支付结果事件:订单号: {} ", event.getOrderCode());
        sendDelayed(PayMessageMq.ONLINE_PAY_EXCHANGE, "pay", event, delayTime);
        log.info("发送查询支付结果事件完成");
    }

    /**
     * Publishes a refund query event to {@link PayMessageMq#REFUND_EXCHANGE}.
     *
     * @param event     the event to publish
     * @param delayTime the value set as the message's delay property
     */
    public void queryRefund(RefundMessageEvent event, Integer delayTime) {
        log.info("发送查询退款事件: {}", event.getOrderCode());
        sendDelayed(PayMessageMq.REFUND_EXCHANGE, "hello", event, delayTime);
        log.info("发送查询退款事件完成");
    }

    /** Converts and sends {@code payload}, setting the delay property on the outgoing message. */
    private void sendDelayed(String exchange, String routingKey, Object payload, Integer delayTime) {
        // The lambda is a message post-processor: it receives the converted Message itself.
        rabbitTemplate.convertAndSend(exchange, routingKey, payload, message -> {
            message.getMessageProperties().setDelay(delayTime);
            return message;
        });
    }
}
5 配置某些交换机,队列进行消息发布的确认
- 在application.properties配置:
# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirm-type=correlated
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
- 代码配置如下
package com.example.springbootdemo.config.rabbitmq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Objects;/****发送者开启 confirm 确认机制*/
@Slf4j
@Component
public class RabbitmqTemplateConfirm implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.error("confirm==>发送到broker失败\r\n" +"correlationData={}\r\n" + "ack={}\r\n" + "cause={}",correlationData, ack, cause);} else {log.info("confirm==>发送到broker成功\r\n" +"correlationData={}\r\n" + "ack={}\r\n" + "cause={}",correlationData, ack, cause);}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText,String exchange, String routingKey) {log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +"replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",message, replyCode, replyText, exchange, routingKey);}private RabbitTemplate rabbitTemplate;/* 连接工厂 */@Resourceprivate ConnectionFactory connectionFactory;public RabbitTemplate getRabbitTemplate(){if(Objects.isNull(rabbitTemplate)){rabbitTemplate=new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}return rabbitTemplate;}
}
package com.example.springbootdemo.rabbitmq.production;import com.example.springbootdemo.config.rabbitmq.RabbitmqTemplateConfirm;
import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
@Slf4j
// Sends a test message to the station exchange through the confirm-enabled template.
public class TestMessageProduction {

    /** Supplies the RabbitTemplate that has confirm/return callbacks registered. */
    @Resource
    private RabbitmqTemplateConfirm rabbitmqTemplateConfirm;

    /**
     * Publishes a fixed test payload to {@link RabbitmqTest#STATION_EXCHANGE}.
     *
     * @param key the routing key to publish with
     */
    public void testMessage(String key) {
        log.info("消息发送确认");
        RabbitTemplate rabbitTemplate = rabbitmqTemplateConfirm.getRabbitTemplate();
        rabbitTemplate.convertAndSend(RabbitmqTest.STATION_EXCHANGE, key, "消息发送者确认");
    }
}
6 对某些队列进行手动ack
# 设置消费端 ack 模式:none 不确认,auto 自动确认,manual 手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
该配置是全局的,如果只对某些队列进行手动 ack,需要进行如下配置
package com.example.springbootdemo.config.rabbitmq;import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Configures manual acknowledgement for the consumer of a single RabbitMQ queue.
 */
@Slf4j
@Configuration
public class RabbitmqConsumerAck {@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(RabbitmqTest.STATION_QUEUE_197);container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {log.info(RabbitmqTest.STATION_QUEUE_197 + "get msg:" +new String(message.getBody()));if(message.getMessageProperties().getHeaders().get("error") == null){// 消息手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info("消息确认");}else {// 消息重新回到队列//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);// 拒绝消息(删除)channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);log.info("消息拒绝");}});return container;}
}
7 rabbitmq其他配置
##发送重试配置
#启用发送重试
#spring.rabbitmq.template.retry.enabled=true
#最大重试次数
#spring.rabbitmq.template.retry.max-attempts=5
#第一次和第二次尝试发布或传递消息之间的间隔
#spring.rabbitmq.template.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数 步长
#spring.rabbitmq.template.retry.multiplier=2
#最大重试时间间隔
#spring.rabbitmq.template.retry.max-interval=10000ms