1.传送门
安装RocketMq,RocketMq图形界面,spring boot集成RocketMq,参考以下三篇
本文安装版本是目前最新版RocketMq 5.2.0
安装RocketMq(服务器Mq配置外网IP)_rocket mq如何不使用ip连接-CSDN博客
RocketMq安装控制台图形界面_rocketmq图形化界面-CSDN博客
Spring Boot集成RocketMq(一看就会)_springboot集成rocketmq-CSDN博客
RocketMq实现单条发送,批量消费
2.pom依赖修改
boot集成RocketMq版本号是2.3.0(也就是原生的5.2.0版本),服务端同样也是最新的版本5.2.0
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
3.生产者
@GetMapping("/sendManyMessage")
public String sendManyMessage() {
HashMap<String, Object> hashMap = new HashMap<>();
hashMap.put("message", "hello Word");
int count = 320;
for (int i = 0; i < count; i++) {
rocketMQTemplate.convertAndSend(JmsConfig.TOPIC, hashMap.toString() + i);
System.out.println("第 " + i + " 次发送成功");
}
return "发送Mq成功,共发送 " + count + "次";
}
4.消费者编写
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import com.wlh.conf.JmsConfig;@Component
@RocketMQMessageListener(topic = JmsConfig.TOPIC, consumerGroup = "my-consumer-group")
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(MessageExt message) {String formattedTime = DateTimeFormatter.ofPattern("HH:mm:ss:SSS").format(LocalDateTime.now());System.out.println(formattedTime + ": Mq消费成功,消费内容=====" + new String(message.getBody()));}// DefaultMessageListenerConcurrently中的consumeMessage中debug查看是否生效//maxTransferCountOnMessageInMemory=100;mq中设置最大拉取数量100@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {consumer.setPullInterval(5000);//消费者一次处理的消息的最大批次consumer.setConsumeMessageBatchMaxSize(100);//费者一次从消息队列中拉取的消息批次大小consumer.setPullBatchSize(100);consumer.setConsumeThreadMax(1);consumer.setConsumeThreadMin(1);System.out.println("myDefaultMQPushConsumer init");}
}
5.查看是否生效
在DefaultMessageListenerConcurrently类中的consumeMessage方法中debug查看参数msgs的大小即可知道有没有生效
6.注意
1.RocketMq默认最大是32,如果超过32 时,RocketMq服务端需要修改/conf/broker.conf
maxTransferCountOnMessageInMemory=100
2.setConsumeMessageBatchMaxSize 这个是最大数量,不超过这个数量,不一定每次都一样的,而且跟生产速度和总条数有关系,比如我设置的每次消费100条,发了320条消息,但实际分四次消费的,每次消费80条,这是mq中有自己一套消费机制的
如果想要看效果的话,可以生产者、消费者分开写,生产者先完全发送消息后,再启动消费者