SpringCloud Alibaba】(十三)学习 RocketMQ 消息队列

news/2024/9/17 18:58:37/ 标签: spring cloud, 学习, rocketmq

目录

  • 1、MQ 使用场景与选型对比
    • 1.1、MQ 的使用场景
    • 1.2、引入 MQ 后的注意事项
    • 1.3、MQ 选型对比
  • 2、下载、安装 RocketMQ 及 RocketMQ 控制台
    • 2.1、下载安装 RocketMQ
    • 2.2、测试 RocketMQ 环境
    • 2.3、RocketMQ 控制台【图形化管理控制台】
      • 2.3.1、下载、安装
      • 2.3.2、验证 RocketMQ 控制台
  • 3、RocketMQ 快速入门
    • 3.1、导入 RocketMQ 依赖
    • 3.2、编写生产者代码
    • 3.3、编写消费者代码
    • 3.4、测试消息的生产与消费
  • 4、集成 RocketMQ
    • 4.1、用户微服务集成 RocketMQ
    • 4.2、订单微服务整合 RocketMQ
    • 4.3、测试集成的 RocketMQ

1、MQ 使用场景与选型对比

1.1、MQ 的使用场景

MQ 的英文全称是 Message Queue,翻译成中文就是消息队列,队列实现了先进先出(FIFO)的消息模型。通过消息队列,我们可以实现多个进程之间的通信,例如,可以实现多个微服务之间的消息通信。MQ 的最简模型就是生产者生产消息,将消息发送到 MQ,消息消费者订阅 MQ,消费消息

在这里插入图片描述

MQ的使用场景通常包含:异步解耦、流量削峰

1.2、引入 MQ 后的注意事项

引入MQ最大的优点就是异步解耦和流量削峰,但是引入 MQ 后也有很多需要注意的事项和问题,主要包括:系统的整体可用性降低、系统的复杂度变高、引入了消息一致性的问题

  • 系统的整体可用性降低:在对一个系统进行架构设计时,引入的外部依赖越多,系统的稳定性和可用性就会降低。系统中引入了MQ,部分业务就会出现强依赖MQ的现象,此时,如果MQ宕机,则部分业务就会变得不可用。所以,引入MQ时,我们就要考虑如何实现MQ的高可用。
  • 系统的复杂度变高:引入 MQ 后,会使之前的同步接口调用变成通过 MQ 的异步调用,在实际的开发过程中,异步调用会比同步调用复杂的多。并且异步调用出现问题后,重现问题,定位问题和解决问题都会比同步调用复杂的多。并且引入 MQ 后,还要考虑如何保证消息的顺序等问题
  • 消息一致性问题 :引入 MQ 后,不得不考虑的一个问题就是消息的一致性问题。这期间就要考虑如何保证消息不丢失,消息幂等和消息数据处理的幂等性问题

1.3、MQ 选型对比

目前,在行业内使用的比较多的 MQ 包含 RabbitMQ、Kafka 和 RocketMQ。这里,我将三者的对比简单整理了个表格,如下所示:

消息中间件(MQ)优点缺点使用场景
RabbitMQ功能全面、消息的可靠性比较高吞吐量低,消息大量积累会影响性能,使用的开发语言是 erlang,不好定制功能规模不大的场景
Kafka吞吐量最高,性能最好,集群模式下高可用功能上比较单一,会丢失部分数据日志分析,大数据场景
RocketMQ吞吐量高,性能高,可用性高,功能全面。使用 Java 语言开发,容易定制功能开源版不如阿里云上版文档比较简单几乎支持所有场景,包含大数据场景和业务场景

2、下载、安装 RocketMQ 及 RocketMQ 控制台

2.1、下载安装 RocketMQ

Apache RocketMQ开发者指南

Windows 部署 RocketMQ

RocketMQ 下载、安装过程:

  1. 下载、解压。下载地址:二进制版本 4.9.2 官方下载

修改配置文件 conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH# 自动创建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存储路径
storePathRootDir=D:/dev/rocketmq-4.9.2/data/dataDir
# commitLog路径
storePathCommitLog=D:/dev/rocketmq-4.9.2/data/dataDir/commitlog
# 消息队列存储路径
storePathConsumeQueue=D:/dev/rocketmq-4.9.2/data/dataDir/consumequeue
# 消息索引存储路径
storePathIndex=D:/dev/rocketmq-4.9.2/data/dataDir/index
# checkpoint文件路径
storeCheckpoint=D:/dev/rocketmq-4.9.2/data/dataDir/checkpoint
# abort文件存储路径
abortFile=D:/dev/rocketmq-4.9.2/data/dataDir/abort
  1. 配置 ROCKETMQ 环境变量:否则,启动报错

    • 先添加一个环境变量 ROCKETMQ_HOME
      在这里插入图片描述
    • 在 Path 中进行添加

在这里插入图片描述

  1. 内存分配设置【可选】

①:编辑 server 启动文件:bin/runserver.cmd

set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

在这里插入图片描述

②:编辑 broker 启动文件:bin/runbroker.cmd

set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx256m -Xmn128m"

在这里插入图片描述

  1. 修改日志存储默认路径:conf/logback_broker.xmlconf/logback_namesrv.xmlconf/logback_tools.xml

在这里插入图片描述

  1. 启动 NameServer

命令行执行:

mqnamesrv.cmd

打印出如下信息,说明 RocketMQ 的 NameServer 启动成功了:

在这里插入图片描述

  1. 启动 Broker

命令行执行:

mqbroker.cmd -n localhost:9876

打印出如下信息,说明 RocketMQ 的 Broker 服务启动成功了:

在这里插入图片描述

2.2、测试 RocketMQ 环境

RocketMQ 内置了大量的测试案例,并且这些测试案例可以通过 RocketMQ 的 bin 目录下的 tools.cmd 命令进行测试

1、启动生产者程序向 RocketMQ 发送消息

重新打开 cmd 命令行,进入 RocketMQ 的 bin 目录,在命令行输入如下命令调用 RocketMQ 自带的生产者程序向 RocketMQ 发送消息:

set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer

可以看到,执行完上述两条命令后,生产者程序开始向 RocketMQ 发送消息:

在这里插入图片描述

2、启动消费者程序消费 RocketMQ 中的消息

重新打开 cmd 命令行,进入 RocketMQ 的 bin 目录,在命令行输入如下命令调用 RocketMQ 自带的消费者程序消费 RocketMQ 中的消息

set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer

可以看到,执行完上述两条命令后,消费者程序开始消费 RocketMQ 中的消息:

在这里插入图片描述

2.3、RocketMQ 控制台【图形化管理控制台】

2.3.1、下载、安装

RocketMQ 控制台本质上是一个 SpringBoot 程序,启动后默认监听的端口是 8080。RocketMQ 的新版控制台已经从 RocketMQ 的 rocketmq-externals 项目中分离出来了。也就是说,新版的 RocketMQ 控制台已经从 https://github.com/apache/rocketmq-externals 链接所示的项目中分离出来,新版控制台的链接地址为:https://github.com/apache/rocketmq-dashboard

1、下载 RocketMQ 控制台源码
2、修改配置:src/main/resources/application.properties

  • 端口:7000
  • namesrvAddr:localhost:9876
  • dataPath:D:/dev/rocket-mq-master/data

在这里插入图片描述

3、打开 cmd 命令行,进入 RocketMQ 控制台源码的根目录,输入如下 Maven 命令开始编 RocketMQ 控制台的源码:

mvn clean install -Dmaven.test.skip=true

4、编译完成后,会在 RocketMQ 控制台源码的根目录下生成 target 目录,进入 target 目录,可以看到生成了 rocketmq-dashboard-1.0.1-SNAPSHOT.jar 文件,如下所示

在这里插入图片描述

5、重新打开 cmd 命令行,进入 rocketmq-dashboard-1.0.0.jar 文件所在的目录,在命令行直接输入如下命令启动 RocketMQ 控制台程序

java -jar rocketmq-console-ng-1.0.0.jar

2.3.2、验证 RocketMQ 控制台

在浏览器中输入 http://localhost:7000 后,出现如下画面说明 RocketMQ 启动成功【可切换语言】:

在这里插入图片描述

选择【Topic】菜单想后可以看到目前 RocketMQ 中存在一个名称为 TopicTest 的主题:

在这里插入图片描述

点击 TopicTest 主题的状态按钮,如下所示:

在这里插入图片描述

可以看到,正确显示出了 TopicTest 主题的消息队列信息,说明 RocketMQ 控制台启动成功了

3、RocketMQ 快速入门

3.1、导入 RocketMQ 依赖

在用户微服务 shop-userpom.xml 中,添加 RocketMQ 相关的依赖,如下所示:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.3</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.5.2</version>
</dependency>

3.2、编写生产者代码

在用户微服务的 src/test/java 目录下新建 com.zzc.rocketmq.test包,在包下创建 RocketMQProducer 类,作为 RocketMQ 的生产者,代码如下所示:

public class RocketMQProducer {public static void main(String[] args) throws Exception {// 1.创建消息生产者producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("producerGroup");// 2.指定Nameserver地址producer.setNamesrvAddr("127.0.0.1:9876");// 3.启动producerproducer.start();// 4.构建消息对象Message message = new Message("bingheTopic", "bingheTag", "Hello RocketMQ".getBytes());System.out.println("生产者发出的消息为:" + JSONObject.toJSONString(message));// 5.发送消息并接收结果SendResult sendResult = producer.send(message);System.out.println("生产者收到的发送结果信息为:" + JSONObject.toJSONString(sendResult));// 6.关闭生产者producer.shutdown();}
}

3.3、编写消费者代码

com.zzc.rocketmq.test 包下新建 RocketMQConsumer 类,作为 RocketMQ 的消费者,代码如下所示:

public class RocketMQConsumer {public static void main(String[] args) throws Exception {// 1.创建消息消费者 consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");// 2.指定Nameserver地址consumer.setNamesrvAddr("127.0.0.1:9876");// 3.订阅 testTopic主题consumer.subscribe("testTopic", "*");// 4.设置消息监听,当收到消息时 RocketMQ 会回调消息监听consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {// 打印消息消费者收到的RocketMQ消息System.out.println("消费者收到的消息为:" + list);// 返回消息消费成功的标识return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5.启动消费者System.out.println("消费者启动成功");consumer.start();}
}

3.4、测试消息的生产与消费

1、为了便于观察,这里我们先启动消费者程序 RocketMQConsumer,启动 RocketMQConsumer后会在 IDEA 的控制台打印如下信息:

在这里插入图片描述

2、运行生产者程序 RocketMQProducer,运行后 RocketMQProducer 程序控制台会输出如下信息:

在这里插入图片描述

说明生产者程序 RocketMQProducer 成功将消息发送到 RocketMQ

3、接下来,再看下消费者程序 RocketMQConsumer 的控制台,如下所示:

在这里插入图片描述

说明生产者发送到 RocketMQ 的消息,被消费者成功消费到了

4、集成 RocketMQ

在项目中模拟一个用户成功下单后,为用户发送通知,通知用户下单成功的逻辑。

具体的流程:

下单成功后将订单的信息发送到 RocketMQ,然后用户微服务订阅 RocketMQ 的消息,接收到消息后进行打印

  • 用户微服务:消费者
  • 订单微服务:生产者

4.1、用户微服务集成 RocketMQ

1、在用户微服务 shop-user 导入了 RocketMQ 的依赖

2、在用户微服务 shop-userapplication.yml 文件中添加如下 RocketMQ 的配置

rocketmq:name-server: 127.0.0.1:9876

3、在用户微服务 shop-user 中创建 com.zzc.user.mq 包,在包下创建 RocketConsumeListener,实现 org.apache.rocketmq.spring.core.RocketMQListener 接口,具体代码如下所示:

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "user-group", topic = "order-topic")
public class RocketConsumeListener implements RocketMQListener<Order> {@Overridepublic void onMessage(Order order) {log.info("用户微服务收到了订单信息:{}", JSONObject.toJSONString(order));}
}

@RocketMQMessageListener 注解,表示当前类是一个 RocketMQ 的消费者,在@RocketMQMessageListener 注解中配置了消费者组为 user-group,主题为 order-topic

4.2、订单微服务整合 RocketMQ

1、在订单微服务 shop-orderpom.xml 文件中添加 RocketMQ 的依赖

2、在订单微服务 shop-orderapplication.yml 文件中添加如下配置:

rocketmq:name-server: 127.0.0.1:9876producer:group: order-group

3、修改 OrderServiceImpl

@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {// ...@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Override@Transactional(rollbackFor = Exception.class)public void saveOrder(OrderParamVo orderParamVo) {// ...log.info("库存扣减成功");// 发送消息rocketMQTemplate.convertAndSend("order-topic", order);}
}

4.3、测试集成的 RocketMQ

1、分别启动 Nacos,Sentinel,ZipKin 和 RocketMQ

2、分别启动用户微服务、商品微服务、订单微服务和网关服务

3、在浏览器中输入localhost:10001/server-order/order/submit_order?userId=1001&productId=1001&count=1

4、查看用户微服务 shop-user 的控制台,发现会输出订单的信息,如下所示:

在这里插入图片描述


http://www.ppmy.cn/news/1520816.html

相关文章

day-49 使数组中所有元素相等的最小操作数

思路 第一个数和最后一个数要变为一致&#xff0c;需要操作n-1次&#xff0c;然后第二个数和倒数第二个数要操作n-3次 解题过程 以此类推即可得出答案 Code class Solution {public int minOperations(int n) {int ans0;int t(n-1);while(t>0){anst;t-2;}return ans;} }作…

String核心设计模式——建造者模式

目录 建造者模式 优点 缺点 使用场景 结构 步骤 1 Item.java Packing.java 步骤 2 Wrapper.java Bottle.java 步骤 3 Burger.java ColdDrink.java 步骤 4 VegBurger.java ChickenBurger.java Coke.java Pepsi.java 步骤 5 Meal.java 步骤 6 MealBuilder…

Proteus 仿真设计:开启电子工程创新之门

摘要&#xff1a; 本文详细介绍了 Proteus 仿真软件在电子工程领域的广泛应用。从 Proteus 的功能特点、安装与使用方法入手&#xff0c;深入探讨了其在电路设计、单片机系统仿真、PCB 设计等方面的强大优势。通过具体的案例分析&#xff0c;展示了如何利用 Proteus 进行高效的…

CSP-CCF ★201512-2 消除类游戏★

一、问题描述 问题描述 消除类游戏是深受大众欢迎的一种游戏&#xff0c;游戏在一个包含有n行m列的游戏棋盘上进行&#xff0c;棋盘的每一行每一列的方格上放着一个有颜色的棋子&#xff0c;当一行或一列上有连续三个或更多的相同颜色的棋子时&#xff0c;这些棋子都被消除。当…

Spring的核心设计模式——工厂模式

目录 工厂模式 工厂模式的类型 优点 缺点 使用场景 结构 实现 步骤 1 步骤 2 步骤 3 步骤 4 步骤 5 工厂模式 工厂模式&#xff08;Factory Pattern&#xff09;是 Java 中最常用的设计模式之一&#xff0c;它提供了一种创建对象的方式&#xff0c;使得创建对象的…

Scratch教师节:给老师的一封信

小虎鲸Scratch资源站-免费Scratch作品源码,素材,教程分享平台! 【Scratch教师节特别献礼】—— 给老师的一封信&#xff1a;编程之光&#xff0c;照亮梦想之路 在这个金秋送爽、硕果累累的季节里&#xff0c;我们迎来了一个特别而温馨的日子——教师节。在这个充满感激与敬意的…

无人机纪录片航拍认知

写在前面 博文内容为纪录片航拍简单认知&#xff1a;纪录片 航拍镜头&#xff0c;航拍流程&#xff0c;航拍环境条件注意事项介绍航拍学习书籍推荐《无人机商业航拍教程》读书笔记整理&#xff0c;适合小白认知理解不足小伙伴帮忙指正 &#x1f603;,生活加油 99%的焦虑都来自于…

使用模板:封装栈、队列

栈 #include <iostream>using namespace std;//封装栈 template<typename T> class myStack { private:T *data;int top-1; //记录栈顶元素下标int size; //栈的大小 public:myStack():size(10){data new T[size];top-1;}myStack(int size){data…

Axure RP实战:打造高效文字点选验证码

Axure RP实战&#xff1a;打造高效文字点选验证码 前言 在数字时代&#xff0c;网络安全和用户体验是设计在线表单时的两大关键考量。 验证码作为一种验证用户身份的技术&#xff0c;已经从简单的数字和字母组合&#xff0c;发展到了更为复杂和用户友好的形式。 今天&#…

实用的4大网站建设模板资源网站

WP模板牛 (wpniu.com ) WP模板牛是一个综合性的WordPress建站模板分享网站&#xff0c;提供中文WP模板、外贸WP模板、免费WP模板等100多个WordPress模板。使用这些模板可以快速搭建起属于自己的WordPress网站。 模板之家 (mymoban.com ) 模板之家是一个提供各种网站模板资源…

设计者模式之访问者模式

1.访问者模式概述 允许你在不改变对象结构中的元素类的前提下&#xff0c;向对象结构中的元素增加新的行为。这种模式将数据结构与作用于结构上的操作分离&#xff0c;从而使得我们可以独立地改变数据结构和操作。 2.访问者模式的组成部分 1&#xff09;Visitor&#xff08;访…

【C++】STL—vector的使用

目录 前言vector的常见构造迭代器的使用vector空间增长问题vector的增删查改尾插和尾删findinsert和erase 前言 vector是表示可变大小数组的序列容器。它就像数组一样&#xff0c;采用连续的存储空间来存储元素&#xff0c;且它的大小可以动态改变。并且vector在访问元素时也非…

【护网相关知识】

文章目录 一、什么是防火墙1.WAF2.IDS入侵检测系统3.IPS入侵防御系统4.安全工具5.OSI参考模型6.常见的网络攻击方式7.OWSPTOP10常见漏洞及基本原理8.常见漏洞及其基本原理9.流量分析10.域名系统11.渗透测试报告编写及安全事件处置 一、什么是防火墙 二代防火墙 三代防火墙部署…

十五章 为xxl-job这个项目创建一个 helm

为xxl-job这个项目创建一个 helm 1. 创建 Helm Chart 在当前目录下创建一个名为 xxl-job-admin 的 Helm Chart&#xff1a; helm create xxl-job-admin这会生成一个包含 Chart.yaml、values.yaml、templates 等文件夹的目录。 2. 自定义 Helm Chart 将之前提供的 values.y…

橘子学ES实战操作之管道类型Ingest pipelines的基本使用

简介 我们在使用ES的时候&#xff0c;经常的用法就是把其他数据源比如Mysql的数据灌到ES中。 借用ES的一些功能来提供数据的全文检索以及聚合分析之类的功能。 在这个灌数据的过程中&#xff0c;我们经常会对数据做一些治理&#xff0c;类似ETL的能力。然后把治理后的数据写入…

查看 mysql 密码过期策略,设置永不过期

SELECT user, host, password_expired, password_lifetime FROM mysql.user WHERE user username;如果 password_expired 为 Y&#xff0c;则密码已过期。字段 password_lifetime 存储密码的寿命&#xff0c;以天为单位&#xff0c;这个可以用来作为是否到期的依据。 修改密码…

【APP自动化】Appium 环境搭建

1 基础环境 安装 node.js (1) 安装node.js 安装的是10版本&#xff0c;node-v10.16.0-x64&#xff0c;node.js安装比较简单&#xff0c;直接采用默认选项即可&#xff0c;路径的话&#xff0c;可以自己更改下。 (2) 添加Path环境变量 (3) 验证node.js是否安装成功 可以在CMD…

力扣122-买卖股票的最佳时机 II(java详细题解)

题目链接&#xff1a;122. 买卖股票的最佳时机 II - 力扣&#xff08;LeetCode&#xff09; 前情提要&#xff1a; 因为本人最近都来刷贪心类的题目所以该题就默认用贪心方法来做。 贪心方法&#xff1a;局部最优推出全局最优。 如果一个题你觉得可以用局部最优推出全局最优…

数据库课程设计mysql

1. 简介 1.1. 概述 MySQL是一个开源的关系型数据库管理系统(RDBMS),它使用了Structured Query Language(SQL)来操作和查询数据。MySQL由瑞典MySQL AB公司开发,后来被Sun微系统收购,最后由甲骨文公司接管。MySQL支持多种平台,如Windows、Linux、Unix等,并且在Web应用…

pikachu文件包含漏洞靶场

漏洞基础 程序在引用文件的时&#xff0c;引用的文件名存在用户可控的情况&#xff0c;传入的文件名没有经过合理的校验或校验不严&#xff0c;从而操作了预想之外的文件&#xff0c;就有可能导致文件泄漏和恶意的代码注入&#xff1b; 本地文件包含 指通过相对路径/绝对路…