SpringCloud学习-实用篇04

news/2024/12/22 8:51:35/

以下内容的代码可见:SpringCloud_learn/day04


1.初始MQ


同步通讯和异步通讯

微服务间通讯有同步和异步两种方式,同步通讯就像打电话需要实时响应,异步通讯就像发邮件不需要马上回复。两种方式各有优劣,比如打电话能立即得到响应,但不能跟多人同时通话,而发送邮件可同时与多个人收发,但往往响应会有延迟

  • 同步调用:

    • Feign调用属于同步方式,虽然调用可以实时得到结果,但存在下面问题:

    在这里插入图片描述

    • 优点和缺点:

      • 时效性较强,可以立即得到结果

      • 耦合度高

      • 性能和吞吐能力下降

      • 有额外的资源消耗

      • 有级联失败问题

  • 异步调用:

    • 常见实现即事件驱动模式。以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,并且调用物流服务从仓库分配响应的库存并准备发货
      • 在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),且该事件带上订单id
      • 订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可
      • 为解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有中间人(Broker)
        • 发布者发布事件到Broker,不关心谁来订阅事件,而订阅者从Broker订阅事件,不关心谁发来的消息
        • 它就像数据总线,所有的服务要接收数据和发送数据都发到这个总线上,该总线就像协议一样让服务间的通讯变得标准和可控

    在这里插入图片描述

    • 优缺点:
      • 吞吐量提升:无需等待订阅者处理完成,响应更快速(支付服务发布后无需等待其他服务,可以去发布其他事件)
      • 故障隔离:服务没有直接调用,不存在级联失败问题(其他服务挂了也不会影响支付服务继续工作)
      • 调用间没有阻塞,不会造成无效的资源占用
      • 耦合度极低,每个服务都可以灵活插拔,可替换(其他服务可以通过取消/增加订阅来决定是否继续处理事件)
      • 流量削峰:不管发布事件的流量波动多大都由Broker接收,订阅者可以按照自己的速度去处理事件
      • 架构复杂,业务没有明显的流程线,不好管理
      • 需要依赖于Broker的可靠、安全、性能

    在这里插入图片描述

tips:

  • 大多数情况下对并发没有很大要求,但对时效性要求高,所以使用同步;如果不需要立即知道结果,同时对并发、吞吐量要求较高,且需要解除服务间的耦合关系,则使用异步

什么是MQ

  • MQ:消息队列(MessageQueue),即存放消息的队列,也就是事件驱动架构中的Broker

  • 比较常见的MQ实现:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
  • 使用场景:

    • 追求可用性:KafkaRocketMQRabbitMQ
  • 追求可靠性:RabbitMQRocketMQ

    • 追求吞吐能力:RocketMQKafka
  • 追求消息低延迟:RabbitMQKafka


2.RabbitMQ快速入门


RabbitMQ概述

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,安装步骤如下(以Ccentos7安装RabbitMQ为例):

  • 在线拉取:
docker pull rabbitmq:3-management
  • 安装:
docker run \-e RABBITMQ_DEFAULT_USER=xxx \-e RABBITMQ_DEFAULT_PASS=xxx \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

RabbitMQ的结构和概念如图所示,其中角色有以下5个:

  • publisher:生产者
  • consumer:消费者
  • exchange:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchangequeue等资源,即逻辑分组

在这里插入图片描述

tips:

  • 消息一旦消费就会从队列删除,因为RabbitMQ没有消息回溯功能

RabbitMQ消息模型

RabbitMQ官方提供了五个不同的示例,对应了不同的消息模型:

  • 基本消息队列(Basic Queue):

在这里插入图片描述

  • 工作消息队列(WorkQueue):

在这里插入图片描述

  • 发布订阅(Publish、Subscribe),又根据交换机类型分为以下三种:

    • Fanout Exchange:广播

    在这里插入图片描述

    • Direct Exchange:路由

    在这里插入图片描述

    • Topic Exchange:主题

    在这里插入图片描述


入门案例

基本消息队列的消息发送流程:具体代码见PublisherTest

  • 建立connection
  • 创建channel
  • 利用channel声明队列
  • 利用channel向队列发送消息

基本消息队列的消息接收流程:具体代码见ConsumerTest

  • 建立connection
  • 创建channel
  • 利用channel声明队列
  • 定义consumer的消费行为handleDelivery()
  • 利用channel将消费者与队列绑定

3.SpringAMQP

  • SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便
  • SpringAMQP提供了三个功能:
    • 自动声明队列、交换机及其绑定关系
    • 基于注解的监听器模式,异步接收消息
    • 封装了RabbitTemplate工具,用于发送消息

tips:

  • Advanced Message Queuing Protocol是用于在应用程序之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求
  • SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现

Basic Queue-简单队列模型

  • 在父工程mq-demo中引入依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 消息发送:
# 添加MQ的连接信息
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: xxxx  # rabbitMQ的ip地址port: 5672  # 端口username: xxxxpassword: xxxxvirtual-host: /
// 实现消息发送
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName, message);}
}
  • 消息接收:完成下面配置后运行consumer服务即可
# 添加MQ的连接信息
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: xxxx  # rabbitMQ的ip地址port: 5672  # 端口username: xxxxpassword: xxxxvirtual-host: /
// consumer服务中新建一个类,编写消费逻辑
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")  // 声明监听的队列public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");}
}

Work Queue-工作队列

Work Queues也被称为任务模型(Task Queues),就是让多个消费者绑定到一个队列,共同消费队列中的消息。它可提高消息处理速度,避免队列消息堆积

  • 消息发送:循环发送,模拟大量消息堆积现象
// 一共发送50条消息
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
  • 消息接收:模拟多个消费者绑定同一个队列
# 如果不加下面配置,消费者1很快完成了自己的25条消息,而消费者2却在缓慢的处理自己的25条消息,即消息是平均分配给每个消费者,没有考虑到消费者的处理能力,这是不合理的
spring:rabbitmq:listener:simple:prefetch: 1  # 每次只能获取一条消息,处理完成才能获取下一个消息(消费慢的就不会和消费快的预取一样的消息数)
// 最终结果是消费者1处理了40条,消费者2处理了10条
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

Publish/Subscribe-发布/订阅

发布订阅模式与之前的区别是允许将同一消息发送给多个消费者,实现方式是加入了交换机(exchange)。常见exchange类型包括:

  • Publisher:要发送消息的程序,但不再发送到队列中,而是发给交换机
  • Exchange:一方面接收生产者发送的消息,另一方面知道如何处理消息,例如递交给某个特别队列、递交给所有队列或是将消息丢弃。到底如何操作取决于Exchange的类型
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
  • Consumer:订阅队列,与以前一样
  • Queue:接收消息、缓存消息,与以前一样

tips:

  • Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,则消息会丢失

Fanout

在广播模式下,消息发送注意事项如下:

  • 可有多个队列
  • 每个队列都要绑定到Exchange
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  • 交换机把消息发送给绑定过的所有队列,订阅队列的消费者都能拿到消息

具体实现过程如下:

  • 声明队列和交换机:
@Configuration
public class FanoutConfig {// 声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}// 声明第一个队列fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 声明第二个队列fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}// 绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
  • 消息发送:
@Test
public void testSendFanoutExchange() {// 交换机名称String exchangeName = "itcast.fanout";// 消息String message = "hello, every one!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "", message);
}
  • 消息接收:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
}

Direct

Direct Exchange会将接收到的消息根据规则路由到指定的Queue(因此称为路由模式):

  • 每一个QueueExchange不能是任意绑定,需要设置一个BindingKey
  • 消息的发送方向Exchange发送消息时必须指定消息的RoutingKey
  • ``Exchange不再把消息交给每一个绑定的队列,而是将消息路由到BindingKey与消息RoutingKey`一致的队列

具体实现过程如下:

  • 基于注解声明队列和交换机:基于@Bean的方式在FanoutConfig中声明比较麻烦,可以采用注解在SpringRabbitListener中声明
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
  • 消息发送:
@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "hello, red!";// 发送消息// 此时两个消费者都会接收到各自队列direct.queue1和direct.queue2的消息rabbitTemplate.convertAndSend(exchangeName, "red", message);  
}

tips:

  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Topic

Topic ExchangeDirect Exchange相比可根据RoutingKey把消息路由到不同的队列,只不过Topic Exchange可让队列在绑定Routing key 的时候使用通配符:

  • Routingkey 一般由一或多个单词组成,多个单词之间以.分割

  • 通配符规则:

    #:匹配一个或多个词,比如item.#能匹配item.spu.insertitem.spu

    *:恰好匹配一个词,比如item.*只能匹配item.spu

具体实现过程如下:

  • 基于注解声明队列和交换机:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),// 凡是以china.开头的routing key都会被匹配到key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),// 凡是以.news结尾的routing key都会被匹配key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
  • 消息发送:
@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "今天天气不错,我的心情好极了!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}

消息转换器

Spring会把发送的消息序列化为字节发送给MQ,接收消息的时候还会把字节反序列化为Java对象。但默认情况下采用的序列化方式是JDK序列化,该方式存在以下问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

因此可以使用JSON方式来做序列化和反序列化,具体步骤如下:

  • publisherconsumer两个服务中都引入依赖:在父工程中引入即可
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
  • publisherconsumer两个服务的启动类中声明MessageConverter
@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}
  • 消息发送:
@Test
public void testSendMap() throws InterruptedException {Map<String,Object> msg = new HashMap<>();msg.put("name", "Jack");msg.put("age", 21);rabbitTemplate.convertAndSend("simple.queue","", msg);
}
  • 消息接收:
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg){System.out.println("接收到object.queue的消息:" + msg);
}

tips:

  • 注意发送方与接收方必须使用相同的MessageConverter

参考

黑马程序员SpringCloud框架P61-P76

RabbitMQ官方文档

SpringAmqp官方地址



http://www.ppmy.cn/news/75271.html

相关文章

Kali-linux分析密码

在实现密码破解之前&#xff0c;介绍一下如何分析密码。分析密码的目的是&#xff0c;通过从目标系统、组织中收集信息来获得一个较小的密码字典。本节将介绍使用Ettercap工具或MSFCONSOLE来分析密码。 8.2.1 Ettercap工具 Ettercap是Linux下一个强大的欺骗工具&#xff0c;也…

08 集合框架1

什么是数据结构? 存储数据,组织数据的方法,就是对数据做增删改查的操作 常见的数据结构有哪些?各自的优缺点是什么? 数组:擅长修改 查找操作,不擅长增加 删除操作 链表:有单项链表和双向链表,擅长增加和删除操作,不擅长修改和查找的操作 队列:擅长操作头和尾,先进先出,…

LeetCode94. 二叉树的中序遍历(递归与非递归)

写在前面&#xff1a; 题目链接&#xff1a;添加链接描述 编程语言&#xff1a;c 题目难度&#xff1a;简单 一、题目描述 给定一个二叉树的根节点 root &#xff0c;返回 它的 中序 遍历 。 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,3,2] 示例 2&#xff1a;…

使用Amazon EC2实例部署三个项目

在部署这三个项目时&#xff0c;以下是一种可能的思路&#xff1a; 1. **配置服务器环境&#xff1a;**确保你的服务器已经安装了适当的操作系统&#xff08;例如Linux&#xff09;和所需的软件&#xff08;如Python、Node.js等&#xff09;。 2. **设置域名和端口&#xff1a;…

【图论】想越狱的小衫

题目描述 这次小杉来到了经典美剧《越狱》的场景里……他被抓起来了&#xff08;-.-干嘛幻想这么郁闷的场景……&#xff09;。 小杉身为新一代的Scofield&#xff0c;在挖了半个月之后终于挖通牢房里的地道。 在地道里&#xff0c;无数的管道路线困惑了他。 小杉看了看自己…

游戏洞察丨自来水还是井水,后流量时代的私域挑战

流量生意本质上是买卖用户浏览时间的生意&#xff0c;如果用户增长到顶&#xff0c;那就意味着供给到顶。对比 2021 年&#xff0c;2022 年的游戏出海在谷歌和 Facebook 上投入的广告成本几乎翻了一倍。新晋“渠道王者”TikTok 逐渐走进大家的视野。该现象背后的原因在于&#…

MySQL数据库最常见的6种故障的排除方法

MySQL数据库最常见的6中故障的排除方法 1.MySQL无法启动 2.MySQL连接不上 3.MySQL打开文件失败 4.MySQL挂起&#xff08;hung&#xff09; 5.MySQL崩溃&#xff08;crash&#xff09; 6.忘记用户密码 1.MySQL无法启动 1.无法访问系统资源 2.参数设置错误 无法访问系统…

ffmpeg命令行工具源码之结构体分析1-命令行参数(未完结,持续更新)

前言 ffmpeg作为多媒体文件转换工具&#xff0c;至少需要有一个要转换的输入文件信息&#xff08;不仅仅是普通文件&#xff0c;还可以是摄像头设备&#xff0c;网络流等&#xff09;&#xff0c;和通常至少需要一个输出格式的文件&#xff08;输出文件不仅仅指普通的文件&…