RabbitMQ 见解一

devtools/2024/9/19 19:18:50/ 标签: java-rabbitmq, rabbitmq, java

 1.初识MQ

1.1.同步和异步通讯

微服务间通讯有同步和异步两种方式:

同步通讯:就像打电话,需要实时响应。

异步通讯:就像发邮件,不需要马上回复。

两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。

1.1.1.同步通讯

Feign调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题:

同步调用的优点:

- 时效性较强,可以立即得到结果

同步调用的问题:

- 耦合度高

- 性能和吞吐能力下降

- 有额外的资源消耗

- 有级联失败问题

1.1.2.异步通讯

异步调用则可以避免上述问题:

        我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响 应的库存并准备发货。

在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件 (event),事件中带上订单id。 订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务 即可。

为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布 者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。

Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线 就像协议一样,让服务间的通讯变得标准和可控。

好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速
  • 故障隔离:服务没有直接调用,不存在级联失败问题
  • 调用间没有阻塞,不会造成无效的资源占用
  • 耦合度极低,每个服务都可以灵活插拔,可替换
  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事 件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理
  • 需要依赖于Broker的可靠、安全、性能

1.2.技术对比

MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的 Broker。 比较常见的MQ实现: ActiveMQ RabbitMQ RocketMQ Kafka

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP, SMTP,STOMPOpenWire,STOMP, REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

2.了解MQ

2.1 MQ的基本结构:

RabbitMQ中的一些角色:

  • publisher:生产者
  • consumer:消费者
  • exchange个:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

2.2.RabbitMQ消息模型

2.3 简单队列模式

简单队列模式的模型图:

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  • publisher:消息发布者,将消息发送到队列
  • queue queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息

2.4 基本消息 队列建立流程

基本消息队列的消息发送流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

基本消息队列的消息接收流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定

3.SpringAMQP

       SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来 非常方便。 SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

3.1SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

3.2利用SpringAMQP实现基础消息队列功能

SpringAMQP如何发送消息

  1. 引入amqp的starter依赖
  2. 配置RabbitMQ地址
  3. 利用RabbitTemplate的convertAndSend方法

流程如下:

  1. 在父工程中引入spring-amqp的依赖
  2. java"><!--AMQP依赖,包含RabbitMQ-->
    <dependency>  
    <groupId>org.springframework.boot</groupId>   
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  3. 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
  4. 在consumer服务中编写消费逻辑,绑定simple.queue这个队列

SpringAMQP如何接收消息?

  1. 引入amqp的starter依赖
  2. 配置RabbitMQ地址
  3. 定义类,添加@Component注解
  4. 类中声明方法,添加@RabbitListener注解,方法参数就时消息
  5. 注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

3.3Work Queue 工作队列

Work queue,工作队列,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量
java">spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.4 发布( Publish )、订阅( Subscribe )模式

发布订阅的模型如图:

在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机) Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息, 例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange 的类型。
  • Exchange有以下3种类型:
  1. Fanout:广播,将消息交给所有绑定到交换机的队列
  2. Direct:定向,把消息交给符合指定routing key 的队列
  3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  • Queue:消息队列也与以前一样,接收消息、缓存消息。

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑 定,或者没有符合路由规则的队列,那么消息会丢失!

3.4.1 Fanout:广播

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

3.4.1.1声明队列和交换机 Spring提供了一个接口Exchange,来表示所有不同类型的交换机:

3.4.1.2 FanoutExchange要点

交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding
3.4.2 Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息 被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey (路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey 。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列 的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息

案例需求如下: 1. 利用@RabbitListener声明Exchange、Queue、RoutingKey 2. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2 3. 在publisher中编写测试方法,向itcast. direct发送消息

基于注解声明队列和交换机:

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

java">@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

 消息发送:在publisher服务的SpringAmqpTest类中添加测试方法:

java">@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

3.4.2.1Direct交换机要点

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

基于@RabbitListener注解声明队列和交换机有哪些常见注解?

  • @Queue
  • @Exchange

3.4.3 Topic

3.4.3.1.说明

Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不 过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

# :匹配一个或多个词

* :匹配不多不少恰好1个词

举例: item.# :能够匹配 item.spu.insert 或者 item.spu

item.* :只能匹配 item.spu

注:

- Queue1:绑定的是`china.#` ,因此凡是以 `china.`开头的`routing key` 都会被匹配到。包括china.news和china.weather

- Queue2:绑定的是`#.news` ,因此凡是以 `.news`结尾的 `routing key` 都会被匹配。包括china.news和japan.news

案例需求: 实现思路如下: 1. 并利用@RabbitListener声明Exchange、Queue、RoutingKey 2. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2 3. 在publisher中编写测试方法,向itcast. topic发送消息

3.4.3.1消息发送 ,在publisher服务的SpringAmqpTest类中添加测试方法:

java">/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "国足进入十六强!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

3.4.3.2消息接收,在consumer服务的SpringRabbitListener中添加方法:

java">@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

3.4.4 消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题: 1.数据体积过大 2.有安全漏洞 3.可读性差

3.4.4.1 配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式 来做序列化和反序列化。

在publisher和consumer两个服务中都引入依赖:

java"><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

配置消息转换器。

在启动类中添加一个Bean即可:

java">@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

http://www.ppmy.cn/devtools/52352.html

相关文章

MongoDB 自动增长

MongoDB 自动增长 MongoDB 是一个流行的 NoSQL 数据库&#xff0c;以其灵活的数据模型和强大的查询语言而闻名。在关系型数据库中&#xff0c;我们通常使用自动增长的整数作为主键&#xff0c;以确保唯一性。然而&#xff0c;MongoDB 的文档模型与此略有不同。MongoDB 使用 _i…

Html_Css问答集(3)

17、label for与label进行包裹两种形式有什么区别&#xff1f; label for形式适用于只有一个表单控件需要与label 标签关联的情况。例如&#xff0c;在下面的HTML代码中&#xff0c;label标签的for属性与input标签的id属性关联&#xff0c;从而将label标签与input标签关联起来。…

[absl_py][python]absl_py所有whl文件下载地址汇总

absl_py是Google开发并维护的一个Python软件包&#xff0c;它是C Abseil库的Python版本。该库旨在提供一系列高质量的、跨平台的实用工具&#xff0c;帮助开发者构建大规模软件项目。以下是关于absl_py的详细介绍&#xff1a; 功能概述&#xff1a; 字符串处理&#xff1a;abs…

【MySQL】分库分表

https://www.bilibili.com/video/BV1Kr4y1i7ru/?p163​ https://blog.csdn.net/qq_47959003/article/details/126058710 随着互联网及移动互联网的发展&#xff0c;应用系统的数据量也是成指数式增长&#xff0c;若采用单数据库进行数据存储&#xff0c;存在以下性能瓶颈&…

Elasticsearch-ES查询单字段去重

ES 语句 整体数据 GET wkl_test/_search {"query": {"match_all": {}} }结果&#xff1a; {"took" : 123,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0…

官方解决 maven 上传jar包到中央仓库401

目录 目录背景&#xff1a;报错现象一句话解决一句话原因问题描述前车之鉴&#xff1a;尝试无果的记录尝试1&#xff1a;询问 GPT 未果尝试2&#xff1a;搜索引擎未果 正确原因分析&#xff1a;佐证 解决方案&#xff08;三步完成&#xff09;&#xff1a;1. 打开 sonatype2. 生…

Java -jar 运行 报 MalformedInputException: Input length = 1

Intellij IDEA 中运行正常&#xff0c;linux 运行正常&#xff0c; cmd 下运行 报&#xff1a;MalformedInputException: Input length 1 微服务项目&#xff0c;在Nacos中做了配置&#xff0c;在引用 Nacos中配置时&#xff0c;编码问题&#xff0c;导致的错误 org.yaml.sna…

Linux_理解程序地址空间和页表

目录 1、进程地址空间示意图 2、验证进程地址空间的结构 3、验证进程地址空间是虚拟地址 4、页表-虚拟地址与物理地址 5、什么是进程地址空间 6、进程地址空间和页表的存在意义 6.1 原因一&#xff08;效率性&#xff09; 6.2 原因二&#xff08;安全性&#xff09; …

图书管理系统代码(Java)

1、运行演示 QQ2024528-205028-HD 详细讲解在这篇博客&#xff1a;JavaSE&#xff1a;图书管理系统-CSDN博客 2、所建的包 3、Java代码 3.1 book包 3.1.1 Book类代码 package book;/*** Created with IntelliJ IDEA.* Description:* User: dings* Date: 2024-05-13* Time:…

web前端培训生:深入探索与技能进阶之路

web前端培训生&#xff1a;深入探索与技能进阶之路 作为web前端培训生&#xff0c;你正站在一个充满挑战与机遇的起点。在这个日新月异的行业中&#xff0c;掌握扎实的前端技能和不断学习的态度将是你脱颖而出的关键。本文将分四个方面、五个方面、六个方面和七个方面&#xf…

【Python入门与进阶】Python中从字符串中获取子串

在Python中&#xff0c;可以使用多种方法从字符串中获取子字符串。 以下是一些常用的方法和示例&#xff1a; 1. 使用切片&#xff08;Slicing&#xff09; 切片是获取子字符串最常用的方法。切片语法为str[start:end:step]&#xff0c;其中start是起始索引&#xff0c;end是…

C++中的代理模式

目录 代理模式&#xff08;Proxy Pattern&#xff09; 实际应用 虚拟代理 远程代理 访问控制代理 总结 代理模式&#xff08;Proxy Pattern&#xff09; 代理模式是一种结构型设计模式&#xff0c;它提供了一个代理对象&#xff0c;充当了被访问对象的接口&#xff0c;使…

【Ubuntu通用压力测试】Ubuntu16.04 CPU压力测试

​ 使用 stress 对CPU进行压力测试 我也是一个ubuntu初学者&#xff0c;分享是Linux的优良美德。写的不好请大佬不要喷&#xff0c;多谢支持。 sudo apt-get update 日常先更新再安装东西不容易出错 sudo apt-get upgrade -y 继续升级一波 sudo apt-get install -y linux-to…

QT TCP服务器和客户端示例程序

下面是一个简单的 Qt TCP 服务器和客户端示例&#xff0c;演示了如何使用 vSetDriver、vSetListener 和 vTcpServerStart 函数。假设 vSetDriver 和 vSetListener 是你定义的自定义函数。 TCP 服务器部分 tcpserver.h #ifndef TCPSERVER_H #define TCPSERVER_H#include <QT…

vue脚手架 笔记07

01 手动搭建vuex vuex叫做全局状态管理仓库 vuex的搭建: 1.下载vuex3.x版本 npm i vuex3.0.0 --save 2. 在src/store下面创建index.js 3.引入vue和vuex import Vue from vue import Vuex from vuex Vue.use(Vuex) 4.export default new Vuex.Store({ 五大属性 state mutatio…

apache poi excel export

apache poi excel export 水一篇 凑个数&#xff08;啊 水文章好羞耻啊&#xff01;请原谅我私密马赛&#xff01;&#xff09; 1 ExcelColumn Data Builder ToString NoArgsConstructor AllArgsConstructor public class ExcelColumn implements Serializable {Serialprivat…

齐普夫定律在循环神经网络中的语言模型的应用

目录 齐普夫定律解释公式解释图与公式的关系代码与图的分析结论 使用对数表达方式的原因1. 线性化非线性关系2. 方便数据可视化和分析3. 降低数值范围4. 方便参数估计公式详细解释结论 来自&#xff1a;https://zh-v2.d2l.ai/chapter_recurrent-neural-networks/language-model…

JAVA 策略模式使用spring托管其实现类

上一篇的“JAVA 替代SWITCH 枚举值 CASE 的 策略模式 ” 中使用new关键字实例化一个类时&#xff0c;这个类的实例不由Spring容器管理&#xff0c;因此Autowired注解在这个实例中不会生效。Spring的依赖注入功能仅对其管理的bean有效&#xff0c;即那些通过Spring容器创建和管理…

安卓实现圆形按钮轮廓以及解决无法更改按钮颜色的问题

1.实现按钮轮廓 在drawable文件新建xml文件 <shape xmlns:android"http://schemas.android.com/apk/res/android"<!--实现圆形-->android:shape"oval"><!--指定内部的填充色--><solid android:color"#FFFFFF"/><!-…

Mysql知识点

1. 二叉树 1.1 什么是B-树 B树又名平衡多路查找树&#xff08;也把B树称为B-树&#xff09;查找路径不只两个&#xff0c;不同于常见的二叉树&#xff0c;它是一种多叉树&#xff0c;我们常见的使用场景一般是在数据库索引技术里&#xff0c;大量使用者B树和B树的数据结构。 …