1. 为什么需要消息队列?
1.1 异步处理
我们先来思考如何设计一个秒杀系统?
秒杀系统需要解决的核心问题是:如何利用有限的服务器资源,尽可能多地处理短时间内的海量请求。
一个秒杀请求包含了很多步骤,比如:
- 风险控制
- 库存锁定
- 生成订单
- 短信通知
- 更新统计数据
如果没有任何优化,正常的处理流程是:App 将请求发送给网关,依次调用上述 5 个流程,然后将结果返回给 App。
但是对于系统来说,能否秒杀成功,实际上只取决于 风险控制 和 库存锁定 这两个步骤。只要用户的秒杀请求通过风险控制,并且在服务端完成库存锁定,那就可以给用户返回秒杀结果了,至于后续的几个步骤,并不需要在这一次请求中处理完成,可以在完成前两个步骤后,直接返回给用户秒杀结果,然后把请求的数据放入消息队列中,由消息队列异步化地处理后续步骤。
通过使用消息队列,将五个步骤减少为两个步骤,这样做不仅响应速度快,并且在秒杀期间,可以将更多的服务器资源用来处理秒杀请求。
这就是消息队列的使用场景之一:实现服务的异步处理,这样做的好处是:
- 可以更快地返回结果
- 减少等待,自然实现了步骤之间的并发,提升系统总体性能
1.2 流量控制
那如何避免过多的请求压垮我们的秒杀系统呢?
设计思路是:使用消息队列隔离网关和后端服务,以达到流量控制和保护后端服务的目的。
整个秒杀流程变为:
- 网关在收到请求后,将请求放入请求消息队列
- 后端服务从请求消息队列中获取 App 请求,完成后续秒杀处理过程,然后返回结果
这样做,后端服务可以按照自己的最大处理能力,从消息队列中消费请求进行处理
更简单一点的,如果可以预估出秒杀服务的处理能力,可以用消息队列实现一个令牌桶,单位时间内只发放固定数量的令牌到令牌桶中,规定服务在处理请求之前必须先从令牌桶中拿出一个令牌,如果令牌桶中没有令牌,则拒绝请求。这样就保证单位时间内,能处理的请求不超过发放令牌的数量,起到了流量控制的作用。
1.3 服务解耦
核心模块与下游服务使用消息队列进行解耦,无论下游系统如何变化,核心模块都不需要改变
1.4 其他适用场景
- 作为发布/订阅系统实现一个微服务系统间的观察者模式
- 连接流计算任务和数据
- 用于将消息广播给大量接受者
2. 如何选择消息队列?
考量维度:可靠性、可拓展性、可运维性、支持集群、性能、功能、
2.1 RabbitMQ
- 优点:轻量级、容易部署和使用、支持多种客户端开发语言、支持灵活的路由配置
- 缺点:对消息堆积的支持并不好、性能和吞吐量较差、使用 Erlang 语言编写,比较难进行二次开发
2.2 RocketMQ
- 优点:性能好、稳定可靠、有活跃的中文社区、使用 Java 开发,容易进行二次开发、特点响应快
- 缺点:兼容性较差
2.3 Kafka
- 优点:兼容性极好、设计上大量使用了批量和异步的思想,有超高的性能、
- 缺点:由于 “先攒一波再一起处理” 的设计,时延较高,不太适合在线业务场景
2.4 总结
如果说,消息队列并不是你将要构建的主角之一,对消息队列的功能和性能都没有很高的要求,只需要一个开箱即用易于维护的产品,建议使用 RabbitMQ。
如果系统使用消息队列的主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,那 RocketMQ 的低延迟和金融级的稳定性是我们需要的。
如果需要的是处理海量的数据,像收集日志、监控信息或是前端的埋点这类数据,或是应用场景大量使用了大数据、流计算相关的开源产品,那 Kafka 是最适合的消息队列。
3. 不同消息队列的消息模型
3.1 主题与队列的区别?
3.1.1 队列模型
早期的消息队列,就是按照 “队列” 的数据结构来设计的,生产者发消息是入队操作,消费者收消息就是出队动作,服务单存放消息的容器就是 “队列”。
消费者之间是竞争关系,如果想要将一份消息分发给多个消费者,要求每个消费者都能收到全量的消息,单个队列就无法满足,一个比较笨的解决方法就是为每一个消费者创建一个消息队列,让生产者发送多份。
但是这样缺点比较明显:一是一份数据复制多份,浪费资源,二是生产者必须知道有多少个消费者,违背了 “解耦” 的设计初衷。
3.1.2 发布 - 订阅模型
为了解决队列模型的问题,演化出了这种模型。
在发布-订阅模型中,消息的发送者称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。
发布 - 订阅模型与队列模型之间最大的区别就是 一份消息数据能不能被消费多次的问题 。
3.2 RabbitMQ 的消息模型
RabbitMQ 是少数依然坚持使用队列模型的产品之一。在 RabbitMQ 中,Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange ,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。
同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。
3.3 RocketMQ 的消息模型
RocketMQ 使用的是标准的发布 - 订阅模型。
由于 “请求 - 确认” 机制的存在,在消费端为了保证消息的有序性,某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性这个原则,也就是说,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。
为了解决这个问题,RocketMQ 在主题下面增加了队列的概念,每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。
RocketMQ 中订阅者的概念是通过消费组(Consumer Group)来实现的,不同消费组订阅同一主题的时候,每个消费组都会消费一份完整的消息,不同消费组之间消费进度彼此不受影响。
消费组中包含多个消费者,同一组内的消费者之间是竞争关系,每个消费者负责消费组内的一部分消息。
因为消息会被不同消费组进行多次消费,所以消费完的消息并不会被立即删除,而是由 RocketMQ 为每一个消费组在每个队列上维护一个消费位置,这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。
3.4 Kafka 的消息模型
Kafka 的消息模型与 RocketMQ 是完全一致的,唯一的区别是:在 Kafka 中,队列对应的名称是 “分区(Partition)” ,其含义和功能没有任何区别。
4. 如何利用事务消息实现分布式事务?
4.1 分布式事务
首先来看什么是事务,事务就是如果我们需要对若干数据进行更新操作,为了保证这些数据的完整性和一致性,我们希望这些操作 要么都成功,要么都失败 。
一个严格意义上的事务实现,应该具有 4 个属性:原子性、一致性、隔离性、持久性,也就是 ACID 。
大部分的单体关系型数据库都完整的实现了 ACID,但是,对于分布式事务来讲,严格的实现 ACID 这四个特性几乎是不可能的,或者说实现的代价太大,大到我们无法接受。
所以,目前大家所说的分布式事务,更多情况下,是在分布式系统中事务的不完整实现,在实际应用中,比较常见的分布式事务实现有 2PC(二阶段提交)、TCC(Try-Confirm-Cancel)和事务消息,每一种实现都有其特定的使用场景,也有各自的问题,都不是完美的解决方案。
- 事务消息适用的场景主要是那些需要异步更新数据,并且对实时性要求不太高的场景。
4.2 消息队列如何实现分布式事务
Kafka 和 RocketMQ 都提供了事务相关功能。
首先,生产者在消息队列上开启一个事务,然后向消息服务器发送一个 “半消息”,所谓 “半消息” ,就是说该消息内容是完整的,但在事务提交之前,对消费者是不可见的。
半消息发送成功之后,生产者就可以执行本地事务了,根据本地事务执行成功与否决定提供或回滚消息,如果事务执行成功,那就提交事务消息,消费者就可以消费这条半消息了,如果事务执行失败,那就回滚事务,消费者系统就不会收到这条消息了。这样就基本实现了 “要么都成功,要么都失败” 的一致性要求。
其实在这个流程中还有一个问题未解决,那就是提交事务消息的时候也可能会失败,对于这个问题,RocketMQ 和 Kafka 给出了两种不同的解决方案。
Kafka 的解决方案比较简单粗暴,直接抛出异常,由用户决定怎样处理。可以在业务代码中反复重试提交,直到成功,也可以通过恢复生产者的状态进行补偿。
4.3 RocketMQ 的分布式事务实现
在 RocketMQ 中是通过增加了事务反查的机制来解决事务消息提交失败的问题。
如果生产者在提交或回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去生产者上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
5. 如何确保消息不会丢失?
5.1 检测消息丢失的方法
可以利用消息队列的有序性来验证是否有消息丢失。原理很简单,在 Producer 端,给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。
如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的。也就是说,收到的消息的序号必然是上一条消息的序号 + 1,如果检测到不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。
如果是分布式系统中则需要注意,像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。
如果系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。
Consumer 实例的数量最好和分区数量一致,做好 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。
5.2 确保消息可靠传递
一条消息从生产到消费完成这个过程,可以划分为三个阶段
- 生产阶段:消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
- 存储阶段:消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
- 消费阶段:Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。
5.2.1 生产阶段
在生产阶段,消息队列通过最常用的 请求确认机制 ,来保证消息的可靠性传递:当代码调用发送消息时,消息队列的客户端会把消息发送到 Broker ,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,就代表完成了一次正常消息的发送
只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有的消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。
所以,在编写代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。
5.2.2 存储阶段
一般来说,只要 Broker 正常运行,就不会出现消息丢失的问题,当然如果 Broker 出现了故障,仍然是有可能会丢失消息的。
如果对消息的可靠性要求极高,可以通过 配置 Broker 参数 来避免因为宕机而丢失消息。
对于单个节点的 Broker ,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。
如果是由多个节点组成的 Broker 集群,需要将 Broker 配置成:**至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。**这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker ,也不会发生消息丢失。
5.2.3 消费阶段
消费阶段采用和生产阶段类似的 请求确认机制 来保证消息的可靠传递,客户端从 Broker 拉去消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉取消息时还会返回同一条消息,确保消息不会在网络传输中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。
要注意:不要在收到消息后就立即发送消费确认,而是应当在执行完成所有消费业务逻辑之后,再发送消费确认。
6. 如何处理消费过程中的重复消息?
用幂等性解决重复消息问题
什么是幂等性,它本来是一个数学上的概念,如果函数 f(x) 满足 f(f(x)) = f(x) ,则函数满足幂等性。
这个概念被拓展到计算机领域,被用来描述一个操作、方法或者服务,一个幂等操作的特点是 其任意多次执行所产生的影响均与一次执行的影响相同 。
也就是说,一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。所以,对于幂等的方法,不用担心重复执行会对系统造成任何改变。
6.1 利用数据库的唯一约束实现幂等
- 使用数据库的唯一键或者创建唯一约束来保证只能有一条消息被成功消费,后续重复的插入操作会失败
- 或者使用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费
6.2 为更新的数据设置前置条件
- 给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
6.3 记录并检查操作
如果上面两种实现幂等的方法都不能适用,还有一种通用性最强,使用范围最广的实现幂等性方法:记录并检查操作
- 实现思路:在执行数据更新操作之前,先检查一下是否执行过这个更新操作
具体的实现方法为:在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
但是,在分布式系统中,此方法很难实现,问题有:
- 并不容易给消息指定一个全局唯一的 ID
- “检查消费状态,然后更新数据并且设置消费状态”,这三个操作必须作为一组操作,才能真正实现幂等。
7. 消息积压了该如何处理?
7.1 优化性能来避免消息积压
7.1.1 发送端性能优化
如果说,代码发送消息的性能上不去,需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。
对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。
7.1.2 消费端性能优化
使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。如果这种性能倒挂是暂时的,问题不大,如果消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障。
所以在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。
消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(队列)数量,确保 Consumer 的实例数和分区数量是相等的。
7.2 消息积压了该如何处理
如果系统在某一时刻,突然开始积压消息并且积压持续上涨,可能会是什么原因呢?
能导致积压突然增加,最粗粒度的原因只有两种:要么是发送变快了,要么是消费变慢了。
我们可以通过消息队列里内置的监控功能,去确定是哪种原因,如果是发送变快了,单位时间内发送的消息增多,比如说赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升性能,唯一的办法是通过扩容消费端实例来提升总体的消费能力。
如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
还有一种不太常见的情况是,通过监控发现,无论是发送消息的速度还是消费消息的速度都和原来没什么变化,这时候就需要检查一下消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。
如果监控到消费变慢了,就检查一下消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下消费进程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。
,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
还有一种不太常见的情况是,通过监控发现,无论是发送消息的速度还是消费消息的速度都和原来没什么变化,这时候就需要检查一下消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。
如果监控到消费变慢了,就检查一下消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下消费进程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。
《消息队列高手课》读书笔记