RocketMQTemplate 发送消息的高级用法

news/2025/1/16 1:02:54/

Apache RocketMQ 是一款强大的分布式消息中间件,与 Spring Boot 集成后,通过 RocketMQTemplate 可以实现在应用程序中方便地发送消息。在本文中,我们将深入探讨 RocketMQTemplate 的一些高级用法,以提供更灵活的消息发送和控制。

引言

消息中间件在现代分布式系统中起着至关重要的作用,能够解耦应用组件、提高系统可伸缩性。Apache RocketMQ 作为一款开源消息中间件,提供了高吞吐量、低延迟和高可靠性的特性。通过 Spring Boot 集成,我们可以使用 RocketMQTemplate 更方便地在应用程序中发送消息。

RocketMQTemplate 基础用法回顾

首先,我们回顾一下 RocketMQTemplate 的基础用法。它是 RocketMQ 提供的 Spring Boot 集成组件,封装了发送消息的逻辑,让消息发送变得简单。

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageSenderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String message) {rocketMQTemplate.convertAndSend(topic, message);}
}

在这个例子中,我们通过 convertAndSend 方法发送了一条消息到指定的主题。

RocketMQTemplate 高级用法

1. 发送延迟消息

RocketMQTemplate 支持发送延迟消息,通过设置 delayLevel 参数来指定消息的延迟级别。

rocketMQTemplate.syncSend("topic", MessageBuilder.withPayload("Delayed Message").build(), 3000, 3);

在这个例子中,我们发送了一条延迟 3000 毫秒的消息,延迟级别为 3。

2. 同步发送和异步发送

RocketMQTemplate 提供了同步和异步发送消息的方法,以满足不同的业务场景。

// 同步发送
SendResult syncResult = rocketMQTemplate.syncSend("topic", "Sync Message");// 异步发送
rocketMQTemplate.asyncSend("topic", "Async Message", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理成功回调}@Overridepublic void onException(Throwable e) {// 处理异常回调}
});

3. 单向发送

RocketMQTemplate 支持单向发送消息,即不等待发送结果的发送方式。

javaCopy code
rocketMQTemplate.sendOneWay("topic", "One-Way Message");

4. 消息队列选择器

消息队列选择器允许你根据一些逻辑,将消息发送到特定的消息队列,从而实现更精细的消息路由。在 RocketMQ 中,消息队列选择器是通过实现 MessageQueueSelector 接口来实现的。

代码示例
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.List;@Service
public class OrderMessageSenderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendOrderMessage(String topic, OrderMessage orderMessage, int orderId) {// 根据订单ID选择消息队列MessageQueueSelector selector = (mqs, msg, arg) -> {int orderIdToUse = (int) arg;int index = orderIdToUse % mqs.size();return mqs.get(index);};// 发送消息到特定的消息队列SendResult sendResult = rocketMQTemplate.send(topic, MessageBuilder.withPayload(orderMessage).build(), selector, orderId);// 处理发送结果System.out.println("Send Order Message Result: " + sendResult);}
}

在这个例子中,我们通过 RocketMQTemplate 发送了一个订单消息,并使用了 MessageQueueSelector 来选择消息队列。具体的选择逻辑通过 lambda 表达式实现,这里根据订单ID选择了特定的消息队列。

5. 事务消息

通过 RocketMQTemplate 支持发送事务消息,确保消息的可靠性传递。

// 发送事务消息
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("tx-group", "topic", MessageBuilder.withPayload("Transaction Message").build(), null);

你需要实现 TransactionListener 接口,并在事务消息中指定事务监听器。

总结

通过深入了解 RocketMQTemplate 的高级用法,我们可以更灵活地控制消息的发送和路由。从延迟消息到事务消息,RocketMQTemplate 提供了多种方式满足不同业务场景下的需求。

在实际应用中,要根据业务需求和性能考虑来选择使用哪种方式。通过合理使用 RocketMQTemplate,我们可以更好地利用 RocketMQ 提供的特性,构建高效可靠的分布式消息系统。


http://www.ppmy.cn/news/1258340.html

相关文章

3分钟在CentOS 7上离线安装Docker

在CentOS 7上离线安装Docker的详细步骤如下: 环境检查和准备 检查内核版本:Docker要求系统为64位且内核版本至少为3.10。使用命令uname -r查看内核版本。 检查CentOS版本:通过命令cat /etc/redhat-release查看版本信息。 更新yum包&#xff0…

在 CentOS 7 上使用 `redis` 用户安装 Redis 7.2.3 的完整步骤

在 CentOS 7 上使用 redis 用户安装 Redis 7.2.3 的完整步骤如下: 安装依赖:首先,您需要安装一些必要的软件包,以编译和运行 Redis。打开终端并执行以下命令: sudo yum install gcc make创建 Redis 用户:为…

大数据之HBase(二)

Master详细架构 位置:namenode实现类:HMaster组成 负载均衡器:通过meta了解region的分配,通过zk了解rs的启动情况,5分钟调控一次分配平衡元数据表管理器:管理自己的预写日志,如果宕机&#xff…

FlowJo软件的简单介绍 掌控流式细胞分析的科技巨匠 FlowJo10

FlowJo 10 for Mac是一款强大的流式细胞数据分析软件,具有以下功能: 数据导入与预处理:FlowJo 10可以轻松导入各种类型的流式细胞数据,并对数据进行预处理,包括去噪、背景校正等,以确保数据的准确性和可靠…

嵌入式中,为什么使用 cout 输出会导致段错误,而使用 printf 却不会

嵌入式中,为什么使用 cout 输出会导致段错误,而使用 printf 却不会 环境: 硬件:板子(ARMv7)、串口板、电源、电脑; 系统:Linux; 语言:C; 程序在…

postgresql-effective_cache_size参数详解

在 PostgreSQL 中,effective_cache_size 是一个配置参数,用于告诉查询规划器关于系统中可用缓存的估计信息。这个参数并不表示实际的内存量,而是用于告诉 PostgreSQL 查询规划器系统中可用的磁盘缓存和操作系统级别的文件系统缓存的大小。它用…

离散数学-函数

1、函数的概念 1&#xff09;函数定义 定义&#xff1a;设 x &#xff0c; y是集合&#xff0c;f是x到y的二元关系&#xff0c;若对每个x属于X&#xff0c;都有唯一的y属于Y&#xff0c;使得<x,y>属于f&#xff0c;则称f是x到y的函数或映射&#xff0c;记作&#xff1a…

redis整理

1. 数据类型 string , hash, 链表&#xff0c;Set, ZSet. string 底层是sds, sds与普通字符串的区别: a. sds存储了字符串长度&#xff0c;获取长度的时间复杂度为O(1); b. sds操作字符串会预先判断长度是否满足要求, 不会有字符串溢出的情况出现; c. 提前预分配, 惰性回收…