接上篇:RocketMQ(六):跟着官网学习敲代码
1 定时消息
在RocketMQ安装目录的bin目录下执行以下命令,创建定时主题:
sh mqadmin updateTopic -c DefaultCluster -t DelayTopic001 -o true -n 127.0.0.1:9876 -a +message.type=DELAY
1.1 发送同步、异步定时消息
public class DelayMessageTest {private static final String TOPIC_DELAY = "DelayTopic001";private static final String TAG = "TAG01";public static void main(String[] args) throws ClientException {//加载服务final ClientServiceProvider provider= ClientServiceProvider.loadService();//创建producerProducer producer = ProducerSingleton.getInstance(TOPIC_DELAY);//发送 同步定时消息//sendDelayMessage(provider,producer) ;//发送异步定时消息sentDelayMessageAsync(provider,producer);}public static void sendDelayMessage(ClientServiceProvider provider,Producer producer){for (int i = 0; i < 20; i++) {//消息体byte[] messageBody = ("孔乙己"+i).getBytes(StandardCharsets.UTF_8);//设置消息延迟时间-3秒钟Duration messageDelayTimes = Duration.ofSeconds(3);//设置消息属性Message message = provider.newMessageBuilder().setTag(TAG).setTopic(TOPIC_DELAY)//设置唯一索引键.setKeys("001").setBody(messageBody).setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTimes.toMillis()).build();try {SendReceipt sendReceipt = producer.send(message);log.info("发送定时消息成功,messageId = {}" ,sendReceipt.getMessageId());}catch (Throwable t){log.error("发送定时消息失败",t);}}}//发送异步定时消息public static void sentDelayMessageAsync(ClientServiceProvider provider,Producer producer){for (int i = 0; i < 20; i++) {//消息体byte[] messageBody = ("孔乙己"+i).getBytes(StandardCharsets.UTF_8);Duration messageDelayTimes = Duration.ofSeconds(3);//设置消息属性Message message = provider.newMessageBuilder().setTag(TAG).setTopic(TOPIC_DELAY)//设置唯一索引键.setKeys("001").setBody(messageBody).setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTimes.toMillis()).build();CompletableFuture<SendReceipt> future = producer.sendAsync(message);ExecutorService threadPool = Executors.newFixedThreadPool(6);future.whenCompleteAsync((sendReceipt,throwable) ->{if(null != throwable){log.info("消息发送失败");}log.info("消息发送成功,messageId = {}", sendReceipt.getMessageId());},threadPool);}}
}
1.2 分别使用SimpleConsumer和PushConsumer消费定时消息
package com.jay.demo01;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.com.google.errorprone.annotations.Var;import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;@Slf4j
public class PushConsumerAndSimpleConsumerTest {//接入点private static final String ENDPOINT = "x.x.x.x:8081";//普通消息消费者组名private static final String GROUPNAME = "groupName01";private static final String DEYALGROUPNAME = "delayGroupName001";//Tag标签private static final String TAG = "TAG01";//普通主题private static final String TOPIC = "topic001";private static final String TOPIC_DELAY = "DelayTopic001";//定时消息主题public static void main(String[] args) throws ClientException, IOException, InterruptedException {final ClientServiceProvider provider = ClientServiceProvider.loadService();//测试PushConsumer//testPushConsumer(provider);//测试SimpleConsumertestSimpleConsumer(provider);}public static void testPushConsumer(ClientServiceProvider provider)throws InterruptedException, IOException, ClientException{//定义ClientConfiguration,设置接入点ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT).build();//定义过滤表达式FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG);//定义PushConsumerlog.info("pushConsumer 开始接收信息");PushConsumer pushConsumer = provider.newPushConsumerBuilder()//设置配置信息.setClientConfiguration(clientConfiguration)//设置消费者组.setConsumerGroup(DEYALGROUPNAME)//设置主题表达式.setSubscriptionExpressions(Collections.singletonMap(TOPIC_DELAY,filterExpression))//设置消息监听器.setMessageListener(messageView -> {log.info("pushConsumer 接收消息成功,messageId = {}", messageView.getMessageId());return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);pushConsumer.close();}public static void testSimpleConsumer(ClientServiceProvider provider) throws ClientException {//1 客户端配置接入点ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT).build();// 2 配置 Tag标签过滤表达式FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG);//3 设置请求时间Duration duration = Duration.ofSeconds(30);//4 创建SimpleConsumerSimpleConsumer simpleConsumer = 
provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)//配置消费者组.setConsumerGroup(DEYALGROUPNAME)//配置等待时间.setAwaitDuration(duration)//为消费者设置订阅属性,包括主题和标签.setSubscriptionExpressions(Collections.singletonMap(TOPIC_DELAY, filterExpression)).build();//5 设置最多消息数量int maxMessageNum = 16;// 6 设置不可见时间Duration invisibleDuration = Duration.ofSeconds(15);//7 接收信息do{log.info("simpleConsumer 开始接收信息");final List<MessageView> messageViewList = simpleConsumer.receive(maxMessageNum,invisibleDuration);log.info("simpleConsumer 接收{} 消息",messageViewList.size());for(MessageView messageView : messageViewList){final MessageId messageId = messageView.getMessageId();try {simpleConsumer.ack(messageView);log.info("消息被成功消费,messageId = {}",messageId);}catch (Throwable t){log.error("消息消费失败,messageId = {}",messageId,t);}}}while (true);}public static void testPushConsumerByDelayMessage(ClientServiceProvider provider) throws ClientException, InterruptedException, IOException {//定义ClientConfiguration,设置接入点ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT).build();//定义过滤表达式FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG);PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(DEYALGROUPNAME).setSubscriptionExpressions(Collections.singletonMap(TOPIC_DELAY, filterExpression)).setMessageListener(new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {log.info("成功消费消息,messageId = {}" + messageView);return ConsumeResult.SUCCESS;}}).build();Thread.sleep(Long.MAX_VALUE);pushConsumer.close();}
}
2 顺序消息
在RocketMQ安装目录的bin目录下执行以下命令,创建顺序主题:
sh mqadmin updateTopic -c DefaultCluster -t FIFOTopic -o true -n 192.168.31.77:9876 -a +message.type=FIFO
2.1 发送同步、异步顺序消息
package com.jay.demo01;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
public class FIFOMessage {private static final String FIFO_DELAY = "FIFOTopic";private static final String TAG = "TAG01";public static void main(String[] args) throws ClientException {//加载服务final ClientServiceProvider provider= ClientServiceProvider.loadService();//创建producerProducer producer = ProducerSingleton.getInstance(FIFO_DELAY);//发送同步顺序消息//sendFIFOMessage(provider,producer) ;//发送异步顺序消息sentFIFOMessageAsync(provider,producer);}public static void sendFIFOMessage(ClientServiceProvider provider,Producer producer){for (int i = 0; i < 20; i++) {//消息体byte[] messageBody = ("孔乙己"+i).getBytes(StandardCharsets.UTF_8);//设置消息属性Message message = provider.newMessageBuilder().setTag(TAG).setTopic(FIFO_DELAY)//设置唯一索引键.setKeys("001").setMessageGroup("messageGroup001").setBody(messageBody).build();try {SendReceipt sendReceipt = producer.send(message);log.info("发送定时消息成功,messageId = {};messageBody = {}" ,sendReceipt.getMessageId(),messageBody.toString());}catch (Throwable t){log.error("发送定时消息失败",t);}}}public static void sentFIFOMessageAsync(ClientServiceProvider provider,Producer producer){for (int i = 0; i < 20; i++) {byte[] messageBody = ("孔乙己"+i).getBytes(StandardCharsets.UTF_8);Message message = provider.newMessageBuilder()//设置主题.setTopic(FIFO_DELAY)//设置标签.setTag(TAG)//设置唯一索引键.setKeys("001").setMessageGroup("messageGroup001")//设置消息体.setBody(messageBody).build();CompletableFuture<SendReceipt> future = producer.sendAsync(message);ExecutorService threadPool = Executors.newFixedThreadPool(5);future.whenCompleteAsync((sendReceipt,throwable) ->{if(null != throwable){log.info("消息发送失败");}log.info("发送定时消息成功,messageId = {};messageBody = {}" ,sendReceipt.getMessageId(),messageBody.toString());},threadPool);}}}
2.2 分别使用SimpleConsumer和PushConsumer消费顺序消息
package com.jay.demo01;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.com.google.errorprone.annotations.Var;import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;@Slf4j
public class PushConsumerAndSimpleConsumerTest {//接入点private static final String ENDPOINT = "192.168.31.77:8081";//普通消息消费者组名private static final String GROUPNAME = "groupName01";private static final String DEYALGROUPNAME = "delayGroupName001";//Tag标签private static final String TAG = "TAG01";//普通主题private static final String TOPIC = "topic001";private static final String TOPIC_DELAY = "DelayTopic001";private static final String TOPIC_FIFO = "FIFOTopic";private static final String FIFO_GROUP = "fifoGroup001";//定时消息主题public static void main(String[] args) throws ClientException, IOException, InterruptedException {final ClientServiceProvider provider = ClientServiceProvider.loadService();//测试PushConsumertestPushConsumer(provider);//测试SimpleConsumer//testSimpleConsumer(provider);}public static void testPushConsumer(ClientServiceProvider provider)throws InterruptedException, IOException, ClientException{//定义ClientConfiguration,设置接入点ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT).build();//定义过滤表达式FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG);//定义PushConsumerlog.info("pushConsumer 开始接收信息");PushConsumer pushConsumer = provider.newPushConsumerBuilder()//设置配置信息.setClientConfiguration(clientConfiguration)//设置消费者组.setConsumerGroup(FIFO_GROUP)//设置主题表达式.setSubscriptionExpressions(Collections.singletonMap(TOPIC_FIFO,filterExpression))//设置消息监听器.setMessageListener(messageView -> {log.info("消费顺序消息成功,messageId = {};messageBody = {}" ,messageView.getMessageId(),messageView.getBody().toString());return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);pushConsumer.close();}public static void testSimpleConsumer(ClientServiceProvider provider) throws ClientException {//1 客户端配置接入点ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT).build();// 2 配置 Tag标签过滤表达式FilterExpression filterExpression = new FilterExpression(TAG, 
FilterExpressionType.TAG);//3 设置请求时间Duration duration = Duration.ofSeconds(30);//4 创建SimpleConsumerSimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)//配置消费者组.setConsumerGroup(FIFO_GROUP)//配置等待时间.setAwaitDuration(duration)//为消费者设置订阅属性,包括主题和标签.setSubscriptionExpressions(Collections.singletonMap(TOPIC_FIFO, filterExpression)).build();//5 设置最多消息数量int maxMessageNum = 16;// 6 设置不可见时间Duration invisibleDuration = Duration.ofSeconds(15);//7 接收信息do{log.info("simpleConsumer 开始接收信息");final List<MessageView> messageViewList = simpleConsumer.receive(maxMessageNum,invisibleDuration);log.info("simpleConsumer 接收{} 消息",messageViewList.size());for(MessageView messageView : messageViewList){final MessageId messageId = messageView.getMessageId();try {simpleConsumer.ack(messageView);log.info("消费顺序消息成功,messageId = {};messageBody = {}" ,messageId,messageView.getBody().toString());}catch (Throwable t){log.error("消息消费失败,messageId = {}",messageId,t);}}}while (true);}public static void testPushConsumerByDelayMessage(ClientServiceProvider provider) throws ClientException, InterruptedException, IOException {//定义ClientConfiguration,设置接入点ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT).build();//定义过滤表达式FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG);PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(DEYALGROUPNAME).setSubscriptionExpressions(Collections.singletonMap(TOPIC_DELAY, filterExpression)).setMessageListener(new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {log.info("成功消费消息,messageId = {}" + messageView);return ConsumeResult.SUCCESS;}}).build();Thread.sleep(Long.MAX_VALUE);pushConsumer.close();}
}
3 事务消息
在RocketMQ安装目录的bin目录下执行以下命令,创建事务主题:
sh mqadmin updatetopic -n 192.168.31.77:9876 -t TransactionTopic001 -c DefaultCluster -a +message.type=TRANSACTION
3.1 发送同步事务消息
package com.jay.demo01;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.*;import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
public class TransactionMessage {private static final String TRANSACION_TOPIC = "TransactionTopic001";private static final String TAG = "TAG01";public static void main(String[] args) throws ClientException {ClientServiceProvider provider = ClientServiceProvider.loadService();//发送同步事务消息sendTransactionMessage(provider) ;}private static void sendTransactionMessage(ClientServiceProvider provider) throws ClientException {TransactionChecker checker = messageView -> {log.info("接收事务消息检查, message={}", messageView);return TransactionResolution.COMMIT;};Producer producer = ProducerSingleton.getTransactionalProducer(checker,TRANSACION_TOPIC);for (int i = 0; i < 20; i++) {Transaction transaction = producer.beginTransaction();//消息体byte[] messageBody = ("孔乙己"+i).getBytes(StandardCharsets.UTF_8);//设置消息属性Message message = provider.newMessageBuilder().setTag(TAG).setTopic(TRANSACION_TOPIC)//设置唯一索引键.setKeys("001").setBody(messageBody).build();try {SendReceipt sendReceipt = producer.send(message,transaction);log.info("发送事务消息成功,messageId = {};messageBody = {}" ,sendReceipt.getMessageId(),messageBody.toString());}catch (Throwable t){log.error("发送事务消息失败",t);}transaction.commit();}}
}
3.2 分别使用SimpleConsumer和PushConsumer消费事务消息
package com.jay.demo01;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.*;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.com.google.errorprone.annotations.Var;import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;@Slf4j
public class PushConsumerAndSimpleConsumerTest {//接入点private static final String ENDPOINT = "x.x.x.x:8081";//普通消息消费者组名private static final String GROUPNAME = "groupName01";private static final String DEYALGROUPNAME = "delayGroupName001";//Tag标签private static final String TAG = "TAG01";//普通主题private static final String TOPIC = "topic001";private static final String TOPIC_DELAY = "DelayTopic001";private static final String TOPIC_FIFO = "FIFOTopic";private static final String FIFO_GROUP = "fifoGroup001";private static final String TRANSACION_TOPIC = "TransactionTopic001";private static final String TRANSACION_GROUP = "transactionGroupName001";//定时消息主题public static void main(String[] args) throws ClientException, IOException, InterruptedException {final ClientServiceProvider provider = ClientServiceProvider.loadService();//测试PushConsumer//testPushConsumer(provider);//测试SimpleConsumertestSimpleConsumer(provider);}public static void testPushConsumer(ClientServiceProvider provider)throws InterruptedException, IOException, ClientException{//定义ClientConfiguration,设置接入点ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT).build();//定义过滤表达式FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG);//定义PushConsumerlog.info("pushConsumer 开始接收信息");PushConsumer pushConsumer = provider.newPushConsumerBuilder()//设置配置信息.setClientConfiguration(clientConfiguration)//设置消费者组.setConsumerGroup(TRANSACION_GROUP)//设置主题表达式.setSubscriptionExpressions(Collections.singletonMap(TRANSACION_TOPIC,filterExpression))//设置消息监听器.setMessageListener(messageView -> {log.info("消费事务消息成功,messageId = {};messageBody = {}" ,messageView.getMessageId(),messageView.getBody().toString());return ConsumeResult.SUCCESS;}).build();Thread.sleep(Long.MAX_VALUE);pushConsumer.close();}public static void testSimpleConsumer(ClientServiceProvider provider) throws ClientException {//1 客户端配置接入点ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder().setEndpoints(ENDPOINT).build();// 2 配置 Tag标签过滤表达式FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG);//3 设置请求时间Duration duration = Duration.ofSeconds(30);//4 创建SimpleConsumerSimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)//配置消费者组.setConsumerGroup(TRANSACION_GROUP)//配置等待时间.setAwaitDuration(duration)//为消费者设置订阅属性,包括主题和标签.setSubscriptionExpressions(Collections.singletonMap(TRANSACION_TOPIC, filterExpression)).build();//5 设置最多消息数量int maxMessageNum = 16;// 6 设置不可见时间Duration invisibleDuration = Duration.ofSeconds(15);//7 接收信息do{log.info("simpleConsumer 开始接收信息");final List<MessageView> messageViewList = simpleConsumer.receive(maxMessageNum,invisibleDuration);log.info("simpleConsumer 接收{} 消息",messageViewList.size());for(MessageView messageView : messageViewList){final MessageId messageId = messageView.getMessageId();try {simpleConsumer.ack(messageView);log.info("消费事务消息成功,messageId = {};messageBody = {}" ,messageId,messageView.getBody().toString());}catch (Throwable t){log.error("消息消费失败,messageId = {}",messageId,t);}}}while (true);}public static void testPushConsumerByDelayMessage(ClientServiceProvider provider) throws ClientException, InterruptedException, IOException {//定义ClientConfiguration,设置接入点ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT).build();//定义过滤表达式FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG);PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration).setConsumerGroup(DEYALGROUPNAME).setSubscriptionExpressions(Collections.singletonMap(TOPIC_DELAY, filterExpression)).setMessageListener(new MessageListener() {@Overridepublic ConsumeResult consume(MessageView messageView) {log.info("成功消费消息,messageId = {}" + messageView);return 
ConsumeResult.SUCCESS;}}).build();Thread.sleep(Long.MAX_VALUE);pushConsumer.close();}
}