RabbitMQ是什么?RabbitMQ简介

embedded/2024/10/21 10:06:32/

一:技术背景

        假如我们有一个支付服务,支付服务的业务逻辑是:首先支付扣减余额,更新支付单状态,更新订单状态,发短信,给这个用户增加积分。在这个场景下,如果我们使用同步调用通信,那么调用支付这个接口是不是得等到给这个用户增加积分的业务执行完后,这个调用链条依次返回到支付接口时,支付接口的业务才算完成,可见这个情况明显很不合理。首先支付接口等待时间太长了,我支付业务的核心逻辑做完后,还要兼顾着非核心的业务,那如果有一个非核心业务down掉了,或者因为网络问题或者硬件问题或者代码问题使这个接口不能响应或者响应时间很长,那么下面的业务也会做不了,这条线程就出了大问题。堆积在tomcat内,持久占用tomcat的资源,再有用户下单,还是一样的占用tomcat的资源,现在一个服务失败了,业务中远程调用的接口及其的多,依次类推,就会造成服务级联失败,雪崩现象产生。假如我们来做限流,或者熔断,那肯定不行啊,这可是支付业务,非常重要,如果熔断,那么业务不可以,就可能造成数据不一致性等问题,这是非常严重的。

        所以同步通信在有些场景下使不适用的,我们应该把业务的核心逻辑来做同步通信,(也就是,我们必须要拿到业务处理完成返回的响应,从而必须做下一步的操作的这种场景下),在同步通信下,可做限流熔断等服务的保护操作;而非核心业务我们就用异步通信来做。非常常见的秒杀场景,它是一种流量激增的业务,激增的这段时间,qps及其高,如果采用同步调用,后果可想而知,轻则响应时间非常慢,用户体验差,重则服务器直接崩溃。异步调用就相当于把一个很长的链条拆开,留其重要的部分做同步调用,不太重要的做异步调用,就类似于把山峰削开,用来填山谷,让它变得平整。

二:安装部署

官网:https://www.rabbitmq.com/tutorials/tutorial-one-java

基于docker安装(前提你已经在官网下载好了rabbitmq

2-1:加载rabbitmq.tar

docker load -i mq.tar

2-2: 运行rabbitmq

docker run \-e RABBITMQ_DEFAULT_USER=rabbitmq \-e RABBITMQ_DEFAULT_PASS=rabbitmq \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network zzb-network\-d \rabbitmq:3.8-management

三:初识RabbltMQ

整体架构:

  • virtual-host:虚拟主机,起到数据隔离的作用
  • publisher:消息发送者
  • consumer:消息的消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息

注意:交换机只能路由消息,无法存储消息;交换机只会路由消息给其绑定的队列,因此队列必须与交换机绑定

2-3 Java简单示例 (基于AMQP协议)

引入amqp-starter依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置MQ地址,在发送者服务和接收者服务的application.yml中添加配置(最好配置在nacos中,方便管理):

spring:rabbitmq:host: 192.168.203.130 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hellomq # 虚拟主机username: zzb # 用户名password: 123456 # 密码

发送者:

@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void send(){//队列名称String queue = "simple.queue";//消息String message = "hello RabbitMQ";//发送消息rabbitTemplate.convertAndSend(queue, message);
}

接受者

// 把接受消息的类的对象放进ioc容器
@Component
@Slf4j
public class Listener {@RabbitListener(queues = "simple.queue")public void receive(String message) {log.info("接收到消息:{}", message);}
}

三:SpringAMQP

3-1:Work queues,任务模型:

        Work queues,就是多个消费者监听同一个队列。

        在实际开发中,并不是在一个服务里编写多个消费者监听同一个队列,而是部署多个实例,形成集群,监听那个队列。

        1:Work queues的特点

        队列中的一条消息只会被一个消费者监听执行,如果有其他的消费者,是拿不到这个消息的;基于这个特点,如果有其他消费者,其他消费者就会监听执行这个队列后面的消息,所以对于一个队列中所有的消息,多消费者是可以更快的处理这些消息,所以加消费者是在高并发场景下一个不错的处理方案。

        我们要知道,部署多个实例,每个机器的性能可能有好有坏,如果有台机器性能比较差一些,那它处理消息的速度肯定比不过其他的消费者,那队列如何分配消息给消费者呢?答案默认是轮询,每个消费者处理一个消息。显然这个方案和实际情况背道而驰,所以我们需要配置,让每一个消费者只能处理一条消息,等当前消息处理完毕后,才能获取下一条消息。这样完美解决了消息积压问题。在application.yaml中配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

        3-2:交换机

        交换机的作用主要是接受发送者发送的消息,并将消息路由到预期绑定的队列,常见的交换机类型有三种:

3-2-1:Fauout:广播rabbitTemplate.convertAndSend(x,x,x)三个参数

每个监听不同队列的消费者都可以接受到来自同一个交换机转发来的消息。

3-2-2:Direct:定向

Direct Exchange会将接受到的消息根据规则路由到指定的queue,因此称为定向路由

  • 每一个queue都会erexchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
3-2-3:Topic:话题

TopicExchange也是基于RoutingKey做消息路由,然是routingKey通常是多个单词的组合,并且以   .  分割,比如                 user.#                  item.#                     #.cn 

BindingKey可以使用通配符:#代表0个或者多个单词,*代表一个单词

        3-3 声明队列交换机

1. 基于Bean声明队列交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队别,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

fanout示例:

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("a.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

direct示例:

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("a.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

可以看到,声明direct交换机与队列绑定关系的代码显得非常臃肿,所以一般我们使用注解来声明

//在监听消息时,指定绑定关系
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "a.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "a.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

3-4 消息转换器

发送消息时,默认的把消息转换是jdk来做的,把对象序列化为字节进行传输,这种方式不推荐

建议采用别的消息转换器,如json序列化

步骤一:publisherconsumer两个服务中都引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

步骤二:配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}


http://www.ppmy.cn/embedded/118097.html

相关文章

【ShuQiHere】深入理解二叉搜索树(Binary Search Tree, BST):结构、操作与代码实现

【ShuQiHere】 &#x1f333; 引言 在数据结构的世界里&#xff0c;二叉搜索树&#xff08;Binary Search Tree, BST&#xff09; 是一种非常重要且常见的结构。它广泛应用于数据库系统、文件系统、网络路由表和搜索引擎中。通过二叉搜索树&#xff0c;我们可以高效地进行查找…

探索 ShellGPT:终端中的 AI 助手

文章目录 探索 ShellGPT&#xff1a;终端中的 AI 助手背景介绍ShellGPT 是什么&#xff1f;如何安装 ShellGPT&#xff1f;简单的库函数使用方法场景应用常见问题及解决方案总结 探索 ShellGPT&#xff1a;终端中的 AI 助手 背景介绍 在当今快速发展的技术领域&#xff0c;命…

Java | Leetcode Java题解之第436题寻找右区间

题目&#xff1a; 题解&#xff1a; class Solution {public int[] findRightInterval(int[][] intervals) {int n intervals.length;int[][] startIntervals new int[n][2];int[][] endIntervals new int[n][2];for (int i 0; i < n; i) {startIntervals[i][0] inter…

JavaEE:探索网络世界的魅力——玩转UDP编程

文章目录 UDPUDP的特点UDP协议端格式校验和前置知识校验和具体是如何工作的? UDP UDP的特点 UDP传输的过程类似于寄信. 无连接: 知道对端的IP和端口号就直接进行传输,不需要建立连接.不可靠: 没有确认机制,没有重传机制,如果因为网络故障导致该段无法到达对方,UDP协议也不会…

UDP Socket聊天室(Java)

UDP聊天室&#xff1a;循环的发送字 通过while循环&#xff0c;文字一直可以发送 dp.getData()是获取DatagramPacket中存储的数据的字节数组。 发送端&#xff1a; package TseUDP;import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.Inet…

气压高度加误差的两种方法(直接添加 vs 换算到气压误差),附MATLAB程序

在已知高度真实值时,如果需要计算此高度下的气压计误差,可考虑本文所述的两种方法 气压高度 气压与高度之间的关系可以用大气压的垂直变化来描述。随着高度的增加,气压通常会下降。这是因为空气的密度在高度增加时减少,导致上方空气柱对下方空气施加的压力减小。 主要关系…

速记篇 |TCP/IP五层模型怎么背,OSI七层模型怎么背?

背景 记忆TCP/IP五层模型和OSI七层模型可以通过理解每一层的功能、作用以及它们之间的逻辑关系来进行。下面分别给出这两个模型的记忆方法和要点&#xff1a; TCP/IP五层模型 TCP/IP五层模型是一个简化的模型&#xff0c;从下到上依次为&#xff1a; 1.物理层&#xff08;Physi…

【IDEA】tomcat中war exploded加载慢

参考:Tomcat部署时war和war exploded区别以及平时踩得坑 参考:Tomcat启动war包卡死 启动慢 idea配置tomcat中war和war exploded的区别 虽然做了以下配置,但是感觉效果不太明显 [2024-09-25 11:47:59,212] 工件 ahb-service:war exploded: 正在部署工件,请稍候… [2024-09-…