4.6 事务消息
4.6.1 流程分析
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1)事务消息发送及提交
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2)事务补偿
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
3)事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- LocalTransactionState.CommitTransaction: 提交事务,它允许消费者消费此消息。
- LocalTransactionState.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- LocalTransactionState.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
4.6.1 发送事务消息
1) 创建事务消息生产者:TransactionMQProducer
使用 TransactionMQProducer
类创建生产者,并指定唯一的 ProducerGroup
,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。
java public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //创建事务监听器 TransactionListener transactionListener = new TransactionListenerImpl(); //创建消息生产者 TransactionMQProducer producer = new TransactionMQProducer("group6"); producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); //生产者设置监听器 producer.setTransactionListener(transactionListener); //启动消息生产者 producer.start(); String[] tags = new String[]{"TagA", "TagB", "TagC"}; for (int i = 0; i < 3; i++) { try { Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); TimeUnit.SECONDS.sleep(1); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } //producer.shutdown(); } }
2)实现事务的监听接口:TransactionListener
当发送半消息成功时,我们使用 executeLocalTransaction
方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation
方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
java public class TransactionListenerImpl implements TransactionListener { /** * 在该方法中执行本地事务 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("执行本地事务"); if (StringUtils.equals("TagA", msg.getTags())) { return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.equals("TagB", msg.getTags())) { return LocalTransactionState.ROLLBACK_MESSAGE; } else { return LocalTransactionState.UNKNOW; } } /** * 执行事务回查 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("MQ检查消息Tag【"+msg.getTags()+"】的本地事务执行结果"); return LocalTransactionState.COMMIT_MESSAGE; } }
4.6.2 使用限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener
类来修改这个行为。 - 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECKIMMUNITYTIMEINSECONDS 来改变这个限制,该参数优先于
transactionMsgTimeout
参数。 - 事务性消息可能不止一次被检查或消费。需要做幂等。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。