RocketMQTemplate 常用方法以及介绍
RocketMQTemplate是SpringBoot整合RocketMQ的核心类,它封装了RocketMQ的Java SDK,提供了一系列方便的方法用于发送消息。常用的方法如下:
-
同步发送消息:SendResult syncSend(String destination, Message<?> message) throws RocketMQException,该方法用于同步发送消息,destination表示消息发送目的地,可以是主题或者队列,message表示消息内容,返回SendResult对象表示发送结果。
-
异步发送消息:void asyncSend(String destination, Message<?> message, SendCallback sendCallback) throws RocketMQException,该方法用于异步发送消息,destination表示消息发送目的地,可以是主题或者队列,message表示消息内容,sendCallback表示发送成功或失败时的回调函数。
-
单向发送消息:void sendOneWay(String destination, Message<?> message) throws RocketMQException,该方法用于单向发送消息,即不关心发送结果,destination表示消息发送目的地,可以是主题或者队列,message表示消息内容。
-
发送顺序消息:SendResult send(String destination, Message<?> message, MessageQueueSelector selector, Object arg) throws RocketMQException,该方法用于发送顺序消息,destination表示消息发送目的地,可以是主题或者队列,message表示消息内容,selector表示选择队列的策略,arg表示传递的参数。
-
发送事务消息:TransactionSendResult sendMessageInTransaction(Message<?> msg, Object arg) throws RocketMQException,该方法用于发送事务消息,msg表示消息内容,arg表示传递的参数。
除了上述常用方法外,RocketMQTemplate还提供了其他一些方法,如批量发送消息、延迟发送消息等,可以根据实际需求选择使用。
介绍一下 convertAndSend() 方法
convertAndSend是RocketMQTemplate的一个重载方法,用于将Java对象转换为消息并发送。该方法的定义如下:
<T> SendResult convertAndSend(String destination, T message) throws RocketMQException;
其中,destination表示消息发送目的地,可以是主题或者队列,message表示Java对象,会自动将其转换为消息发送。在将Java对象转换为消息时,需要使用消息转换器,可以通过设置RocketMQTemplate的messageConverter属性来指定转换器,例如:
rocketMQTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
以上代码指定了使用Jackson2JsonMessageConverter作为消息转换器,将Java对象转换为JSON格式的消息发送。在调用convertAndSend方法时,只需要传入目的地和Java对象即可,例如:
User user = new User("Tom", 18);
rocketMQTemplate.convertAndSend("topic_test", user);
以上代码将Java对象user转换为消息并发送到主题topic_test中。
详细介绍一下 convertAndSend() 及其重载方法以及它是同步发送还是异步发送
convertAndSend方法还有另外两个重载方法,分别是:
<T> SendResult convertAndSend(String destination, T message, MessagePostProcessor messagePostProcessor) throws RocketMQException;
<T> void convertAndSend(String destination, T message, SendCallback sendCallback) throws RocketMQException;
第一个重载方法可以在发送消息前对消息进行一些处理,例如设置消息头、消息属性等,需要传入一个MessagePostProcessor对象。例如:
rocketMQTemplate.convertAndSend("topic_test", user, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws RocketMQException {// 设置消息属性message.putUserProperty("key", "value");return message;}
});
以上代码在发送消息前设置了消息属性,将会在消息发送时携带该属性。
第二个重载方法用于异步发送消息,会在发送消息后立即返回,不会等待发送结果。需要传入一个SendCallback对象,用于处理发送结果。例如:
rocketMQTemplate.convertAndSend("topic_test", user, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 发送成功System.out.println("发送成功,消息ID:" + sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {// 发送失败System.out.println("发送失败,原因:" + throwable.getMessage());}
});
以上代码异步发送消息,在发送成功或失败时分别打印消息ID和失败原因。可以看到,convertAndSend方法的重载方法既支持同步发送消息,也支持异步发送消息,具体使用哪种方式取决于传入的参数。
传入什么参数是同步发送什么是异步发送
在RocketMQTemplate中,convertAndSend方法有以下两种重载形式:
<T> SendResult convertAndSend(String destination, T message) throws RocketMQException;
<T> void convertAndSend(String destination, T message, SendCallback sendCallback) throws RocketMQException;
其中,第一种是同步发送消息,第二种是异步发送消息。
如果只调用convertAndSend方法,并且不传入SendCallback对象,则会使用同步方式发送消息。例如:
rocketMQTemplate.convertAndSend("topic_test", user);
以上代码会使用同步方式发送消息,并阻塞当前线程等待发送结果。如果发送成功,该方法会返回SendResult对象;如果发送失败,该方法会抛出RocketMQException异常。
如果需要使用异步方式发送消息,则需要调用第二种重载形式,并传入SendCallback对象。例如:
rocketMQTemplate.convertAndSend("topic_test", user, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 发送成功System.out.println("发送成功,消息ID:" + sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {// 发送失败System.out.println("发送失败,原因:" + throwable.getMessage());}
});
以上代码会使用异步方式发送消息,并立即返回,不会阻塞当前线程。当发送结果可用时,会调用SendCallback对象的onSuccess方法或者onException方法,分别处理发送成功和失败的情况。
convertAndSend(“topic_test”, user, new SendCallback() )为什么我的没有这个方法
convertAndSend方法的重载形式可能因为不同的版本而略有不同。如果您的版本没有convertAndSend(String destination, T message, SendCallback sendCallback)方法,可能是因为您使用的是较早的版本。
如果您需要在发送消息时使用异步方式,并且需要处理发送结果,可以考虑使用send(String destination, Message<?> message, SendCallback sendCallback)方法,该方法是RocketMQTemplate的另一个方法,用于发送org.apache.rocketmq.common.message.Message类型的消息。该方法的定义如下:
void send(String destination, Message<?> message, SendCallback sendCallback) throws RocketMQException;
其中,destination表示消息发送目的地,可以是主题或者队列,message表示RocketMQ消息。在调用该方法时,需要手动创建RocketMQ消息,并将Java对象转换为消息体,例如:
User user = new User("Tom", 18);
Message<User> message = MessageBuilder.withPayload(user).build();
rocketMQTemplate.send("topic_test", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 发送成功System.out.println("发送成功,消息ID:" + sendResult.getMsgId());}@Overridepublic void onException(Throwable throwable) {// 发送失败System.out.println("发送失败,原因:" + throwable.getMessage());}
});
以上代码手动创建了一个RocketMQ消息,并将Java对象user作为消息体,使用send方法发送消息,在发送成功或失败时分别打印消息ID和失败原因。可以看到,send方法支持异步发送消息,并且可以处理发送结果,是一个比较灵活的发送消息方式。