关闭超时订单
创建订单之后的一段时间内未完成支付而关闭订单的操作,该功能一般要求每笔订单的超时时间是一致的
TTL(Time To Live)存活时间,只能被设置为某个固定的值,不能更改,否则抛出异常
死信(当消息达到过期时间还没有被消费,那么该消息就“死了”)
消息变为死信的条件:
- 消息被拒绝,并且requeue=false
- 消息的过期时间到期了
- 队列达到最大长度
死信交换机:Dead-Letter-Exchange,简称 DLX
- 当消息在一个队列中变成死信之后,如果这个消息所在的队列设置了 x-dead-letter-exchange参数,那么它会被发送到 x-dead-letter-exchange对应值的交换机上,这个交换机就称之为死信交换机,与这个死信交换器绑定的队列就是死信队列。
- x-dead-letter-exchange:出现死信之后将死信重新发送到指定交换机
- x-dead-letter-routing-key:出现死信之后将死信重新按照指定的 routing-key 发送,如果不设置默认使用消息本身的 routing-key
- 生产者发送带有ttl的消息放入交换机路由到延时队列
- 延时队列绑定死信交换机与死信转发的routing-key
- 等延时队列中的消息达到延时时间之后变成死信转发到死信交换机并路由到死信队列中
- 最后供消费者消费
①、配置类
@Configuration
public class DelayQueueRabbitConfig{public static final String DLX_QUEUE = "queue.dlx";//死信队列public static final String DLX_EXCHANGE = "exchange.dlx";//死信交换机public static final String DLX_ROUTING_KEY = "routingkey.dlx";//死信队列与死信交换机绑定的routing-keypublic static final String ORDER_QUEUE = "queue.order";//订单的延时队列public static final String ORDER_EXCHANGE = "exchange.order";//订单交换机public static final String ORDER_ROUTING_KEY = "routingkey.order";//延时队列与订单交换机绑定的routing-key//定义死信队列@Beanpublic Queue dlxQueue(){return new Queue(DLX_queue,true);}//定义死信交换机@Beanpublic DirectExchange dlxExchange(){return new DirectExchange(DLX_EXCHANGE,true,false);}//死信交换机和死信队列绑定@BeanBinding bindingDLX(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}//订单延时队列//设置队列里的死信转发到的DLX名称,设置死信在转发时携带的routing-key名称@Beanpublic Queue orderQueue(){Map<String,Object> params = new HashMap<>();params.put("x-dead-letter-exchange", DLX_EXCHANGE);params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);return new Queue(ORDER_QUEUE, true, false, false, params);}//订单交换机@Beanpublic DirectExchange orderExchange(){return new DirectExchange(ORDER_EXCHANGE,true,false);}//把订单队列和订单交换机绑定一块@Beanpublic Binding orderBinding(){return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);}
}
②、发送消息
@RequestMapping("/order")
public class OrderSendMessageController{@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMessage")public String sendMessage(){String delayTime = "10000";rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, DelayQueueRabbitConfig.ORDER_ROUTING_KEY,"发送消息!",message->{message.getMessageProperties().setExpiration(delayTime);return message;});return "ok";}
}
③、消费消息
@Component
@RabbitListener(queues=DelayQueueRabbitConfig.DLX_QUEUE)//监控死信队列
public class OrderMQReciever{@RabbitHandlerpublic void process(String message){System.out.println("OrderMQReciever接收到的消息是:"+ message);}
}
测试:
通过调用接口,发现 10 秒之后才会消费消息
七天自动确认收货
签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家
这个过程持续七天,就是使用了消息中间件的延迟推送功能
传统解决方案:
- 使用 Redis 给订单设置过期时间,最后通过判断 Redis 中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道 Redis 都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力;
- 使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了 IO 次数,性能极低;
- 使用 JVM 原生的 DelayQueue ,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。
还是通过死信队列 + TTL过期时间来实现延迟队列:
①、在 RabbitMQ 3.6.x 开始,RabbitMQ 官方提供了延迟队列的插件
插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
下载放置到 RabbitMQ 根目录下的 plugins 下
②、创建交换机和消息队列
@Configuration
public class MQConfig{public static final String LAZY_EXCHANGE = "Ex.LazyExchange";public static final String LAZY_QUEUE = "MQ.LazyQueue";public static final String LAZY_KEY = "lazy.#";@Beanpublic TopicExchange lazyExchange(){//Map<String, Object> pros = new HashMap<>();//设置交换机支持延迟消息推送//pros.put("x-delayed-message", "topic");TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE,true,false,pros);exchange.setDelayed(true);//来开启延迟队列return exchange;}@Beanpublic Queue lazyQueue(){return new Queue(LAZY_QUEUE,true);}@Beanpublic Binding lazyBinding(){return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);}
}
③、发送方
@Component
public class MQSender{@Autowiredprivate RabbitTemplate rabbitTemplate;/**指定延迟推送时间传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的API 来设置延迟时间*/public void sendLazy(Object message){//confirmCallback returnCallback 代码省略rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(cofirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id+时间戳 全局唯一CorrelationData correlationData = new CorrelationData("12345678909" + new Date());rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot",new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//message.getMessageProperties().setHeader("x-delay", "6000");message.getMessageProperties().setDelay(6000);return message;}},correlationData);}
}
④、消费方
@Component
public class MQReceiver{@RabbitListener(queues = "MQ.lazyQueue")@RabbitHandlerpublic void onLazyMessage(Message msg, Channel channel)throws IOException{long deliveryTag = msg.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, true);System.out.println("lazy receive " + new String(msg.getBody()));}
}
⑤、测试
在 6 秒后收到了消息 “lazy receive hello spring boot:”
@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest{@Autowiredprivate MQSender mqSender;@Testpublic void sendLazy()throws Exception{String msg = "hello spring boot";mqSender.sendLay(msg + ":");}
}
RabbitMQ规范
1. 一个 RabbitMQ 应用里建立多个 vhost,去对应不同的开发项目
不同的 vhost 对应不同的项目,拥有自己的队列、绑定、交换器和权限控制
当在 RabbitMQ 中创建一个用户时,用户通常会被指派给至少一个 vhost,并且只能访问被指派 vhost 内的队列、交换器和绑定,vhost 之间是绝对隔离的。
现在的状况是大部分使用 RabbitMQ 的技术团队往往就使用默认的 vhost:“/”,如果多出一个项目了,就再去创建一个 RabbitMQ 的进程,这样做,非常浪费开发资源。
推荐一个项目对应一个 vhost
2. 不直接使用 RabbitMQ 自己的客户端
很多公司使用 RabbitMQ 都是直接使用 RabbitMQ 自己的 Java 版本客户端
由于 RabbitMQ 本身内在的复杂性和多样性,有很多技术细节需要独自处理,比如网络连接的处理,比如异常的处理,比如消息失败的处理等等等。这些,如果手头没有一套成熟的框架,那么很可能由于一些细节处理不到位,导致非常多的问题,这都是不必要的成本。
所以,要么使用一套已有的 RabbitMQ 客户端框架(比如 Spring 的 RabbitMQ 框架),要么自己封装出一套底层 RabbitMQ 客户端框架,而不是单独使用 RabbitMQ 的客户端
3. 无论如何消费者必须给回 ACK 响应
ACK 机制就是消费者从 RabbitMQ 收到消息并处理完成后,反馈给 RabbitMQ,然后 RabbitMQ 收到反馈后才将此消息从队列中删除。
由于 ACK 机制本身必须回复给 RabbitMQ,消息才会丢弃这个特点。对于何时给 ACK,我们做开发的时候一定要在开发项目前提前规划好、设计好。
使用 RabbitMQ 通常不想在收到消息就立即给回 ACK 的,也不会设置 autoACK 机制即消费端收到自动返回一个 ACK 响应。一般来讲,我们都会根据业务逻辑的不同,会在不同的位置手动返回 ACK。
这时候,就可能出现问题:当收到消息,有时候处理业务逻辑报错了,往往在处理完业务逻辑就会忽略 ACK,这会导致消息始终卡死在 queue 里……如果数量越来越多,后续处理非常麻烦。
4. 考虑设置 dead letter exchanges
有时候消息投递出错,并不总是在应用接收的时候出了问题,会有很多非应用的问题。比如:
- 消费端有问题,发出的消息被拒绝了。并且我们也设置了 requeue=false;
- 消息可能因为没有收到 ACK 超时被删除,或者消费端消费速度跟不上导致消息超时被删除;
- 消息数量超过了队列最大长度限制被抛弃;
- 消息总大小超过了队列消息总大小限制被抛弃。
对于这些问题,设置 dead letter exchanges 算是一个解决办法。
当消息一旦出现我上面列举出来的情况,就会被发送到我们设置的 dead letter exchanges。然后我们就可以对这些特殊情况的消息进行单独处理,这样的做法可以让我们的项目更健壮,更容易追踪问题。
5. 尽量使用 Direct Exchange
RabbitMQ 的Exchange 就是消息交换机,它指定消息按什么规则,路由到哪个队列。
- Direct:处理路由键,需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键为“green”,则只有路由键为“green”的消息才被转发,不会转发路由键为"red",只会转发路由键为“green”;
- Topic:将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”只能匹配一个词;
- Fanout:不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列上;
- Headers:不处理路由键,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到 RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。
在这四种类型里,Direct 类型的 Exchange 投递消息是最快的。其他的 Exchange,MQ 还得花时间计算投递的位置。
所以,能使用 Direct 类型的建议使用 Direct。
多表操作开启事务
①、MybatisPlusConfig配置类添加注解@EnableTransactionManagement
@Configuration
@MapperSCAN("com.michael.ssxy.*.mapper")
@EnableTransactionManagement
public class MybatisPlusConfig{//新的分页插件,一级和二级遵循mybatis规则//需要设置MybatisConfiguration#useDeprecatedExecutor@Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor(){}
}
②、业务方法添加@Transactional
//向两张表添加数据
@Transactional(rollbckFor={Exception.class})
public Long saveOrder(OrderSubmitVo orderParamVo,List<CartInfo> cartInfoList){}
通过RabbitMQ删除生成订单后的redis购物车数据
①、Rabbit工具类
public class RabbitService{@Autowiredprivate RabbitTemplate rabbitTemplate;/**exchange交换机routingKey路由message消息*/public boolean sendMessage(String exchange,String routingKey,ObjectMessage){rabbitTemplate.convertAndSend(exchange,routingKey,message);return true;}
}
②、service-order模块中service业务类
将订单id发送到MQ中
private RabbitService rabbitService;//方法内部
rabbitService.sendMessage(MqConst.EXCHANGE_ORDER_DIRECT,MqConst.ROUTING_DELETE_CART,orderParamVo.getUserId());
③、service-cart模块中创建receiver包
@Component
public class CartReceiver{@Autowiredprivate CartInfoService cartInfoService;/**userId从MQ中传过来的*/@RabbitListener(bindings = @QueueBinding(value=@Queue(value=MqConst.QUEUE_DELETE_CART,derable="true"),//持久化exchange=@Exchange(value=MqConst.EXCHANGE_ORDER_DIRECT),key={MqConst.ROUTING_DELETE_CART}))public void deleteCart(Long userId,Message message,Channel channel){if(userId != null){cartInfoService.deleteCartChecked(userId);}//手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}
业务实现类
@Override
public void deleteCartChecked(Long userId){//根据用户id删除购物车已选中的数据List<CartInfo> cartInfoList = this.getCartCheckedList(userId);//获取skuId集合List<Long> skuIdList = cartInfoList.stream().map(item -> item.getSkuId()).collect(Collectors.toList());//构建redis的keyString cartKey = this.getCartKey(userId);BoundHashOperations<String,String,CartInfo> hashOperation = redisTemplate.boundHashOps(cartKey);skuIdList.forEach(skuId -> {hashOperations.delete(skuId.toString())});
}