RocketMq实现单条发送,批量消费

news/2025/4/1 3:30:30/

1.传送门

安装RocketMq,RocketMq图形界面,spring boot集成RocketMq,参考以下三篇

本文安装版本是目前最新版RocketMq 5.2.0

安装RocketMq(服务器Mq配置外网IP)_rocket mq如何不使用ip连接-CSDN博客

RocketMq安装控制台图形界面_rocketmq图形化界面-CSDN博客

Spring Boot集成RocketMq(一看就会)_springboot集成rocketmq-CSDN博客

RocketMq实现单条发送,批量消费

2.pom依赖修改

boot集成RocketMq版本号是2.3.0(也就是原生的5.2.0版本),服务端同样也是最新的版本5.2.0

           <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.3.0</version>
            </dependency>

3.生产者

    @GetMapping("/sendManyMessage")
    public String sendManyMessage() {
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.put("message", "hello Word");
        int count = 320;
        for (int i = 0; i < count; i++) {
            rocketMQTemplate.convertAndSend(JmsConfig.TOPIC, hashMap.toString() + i);
            System.out.println("第 " + i + " 次发送成功");
        }
        return "发送Mq成功,共发送 " + count + "次";
    }

4.消费者编写


import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;import com.wlh.conf.JmsConfig;@Component
@RocketMQMessageListener(topic = JmsConfig.TOPIC, consumerGroup = "my-consumer-group")
public class Consumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(MessageExt message) {String formattedTime = DateTimeFormatter.ofPattern("HH:mm:ss:SSS").format(LocalDateTime.now());System.out.println(formattedTime + ": Mq消费成功,消费内容=====" + new String(message.getBody()));}// DefaultMessageListenerConcurrently中的consumeMessage中debug查看是否生效//maxTransferCountOnMessageInMemory=100;mq中设置最大拉取数量100@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {consumer.setPullInterval(5000);//消费者一次处理的消息的最大批次consumer.setConsumeMessageBatchMaxSize(100);//费者一次从消息队列中拉取的消息批次大小consumer.setPullBatchSize(100);consumer.setConsumeThreadMax(1);consumer.setConsumeThreadMin(1);System.out.println("myDefaultMQPushConsumer init");}
}

5.查看是否生效

在DefaultMessageListenerConcurrently类中的consumeMessage方法中debug查看参数msgs的大小即可知道有没有生效

6.注意

1.RocketMq默认最大是32,如果超过32 时,RocketMq服务端需要修改/conf/broker.conf 

maxTransferCountOnMessageInMemory=100

2.setConsumeMessageBatchMaxSize 这个是最大数量,不超过这个数量,不一定每次都一样的,而且跟生产速度和总条数有关系,比如我设置的每次消费100条,发了320条消息,但实际分四次消费的,每次消费80条,这是mq中有自己一套消费机制的

如果想要看效果的话,可以生产者、消费者分开写,生产者先完全发送消息后,再启动消费者


http://www.ppmy.cn/news/1409396.html

相关文章

python爬虫学习第十五天-------ajax的get和post请求

嗨嗨嗨&#xff01;兄弟姐妹大家好哇&#xff01;今天我们来学习ajax的get和post请求 一、了解ajax Ajax&#xff08;Asynchronous JavaScript and XML&#xff09;是一种在 Web 开发中用于创建交互式网页应用程序的技术。通过 Ajax&#xff0c;网页可以在不重新加载整个页面…

Linux 常用指令及其理论知识

个人主页&#xff1a;仍有未知等待探索-CSDN博客 专题分栏&#xff1a;http://t.csdnimg.cn/Tvyou 欢迎各位指教&#xff01;&#xff01;&#xff01; 目录 一、理论知识 二、基础指令 1、ls指令&#xff08;列出该目录下的所有子目录和文件&#xff09; 语法&#xff1a; …

俺们家Copilot和ChatGPT可不是一回事

俺们微软真的很想让大家使用俺们家的 Copilot 人工智能工具&#xff0c;不管大家是否愿意 。 一份新的报告显示&#xff0c;一些客户遇到了一个问题&#xff1a;它没有 ChatGPT 那么好用。但俺们微软认为&#xff0c;问题出在他们没能正确使用俺们家 Copilot 或不理解这两种产…

[xboard]real6410-3 S3C6410光盘资料与功能测试

文章目录 1 real6410官方资源1 官方镜像2 官方源码3 官方测试2 ok6410a资源官方镜像3 友善6410资源1 官方镜像4 友坚5 uboot学习参考1 real6410官方资源 1 官方镜像 2 官方源码 3 官方测试 2 ok6410a资源 官方镜像 http://bbs.witech.com.cn/thread-44722-1-1.h

MySQL-基本SQL语句编写:运算符练习

运算符练习 1.选择工资不在5000到12000的员工的姓名和工资 SELECT last_name,salary FROM employees #where salary not between 5000 and 12000; WHERE salary < 5000 OR salary > 12000;2.选择在20或50号部门工作的员工姓名和部门号 SELECT last_name,department_id…

Springboot 集成Rabbitmq之延时队列

1.首先确保已经引入了Spring AMQP和RabbitMQ的相关依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> 2. 创建一个普通队列并设置TTL&#x…

各主流电商数据采集机器人|电商数据采集API接口(淘宝/京东/1688)抓取效率提升100%

业务痛点 某电商代运营公司帮助新手电商在京东平台进行开店、创业&#xff0c;需要获取平台上大量的商品信息&#xff0c;以作为帮助客户分析选品趋势&#xff0c;爆款打造的依据。 商家或客服人员通常需要整理、分析的商品数据量巨大。客服每天需要频繁点击、下载大量商品信…

设计模式:抽象工厂

定义 抽象工厂模式是一种创建型设计模式&#xff0c;它提供了一个接口&#xff0c;用于创建一系列相关或相互依赖的对象&#xff0c;而无需指定它们具体的类。这种模式特别适用于处理产品族&#xff0c;但在不可能修改的情况下扩展产品族是困难的。 应用场景 抽象工厂模式通…