前几天面试一个测试开发,问了Kafka的基础性知识,受试者没有回答出来,在项目中介绍有使用Kafka但是对kafka的基础知识掌握的却不牢固不全面,今天列出Kafka经常会问到的10个基础知识,供大家参考学习
一、你会在哪些场景选择使用Kafka?
1)日志信息收集记录
我个人接触的项目中,Kafka使用最多的场景,就是用它与FileBeats和ELK组成典型的日志收集、分析处理以及展示的框架
该图为FileBeats+Kafka+ELK集群架构
Kafka在框架中,作为消息缓冲队列
FileBeats先将数据传递给消息队列,Logstash server(二级Logstash)拉取消息队列中的数据,进行过滤和分析,然后将数据传递给Elasticsearch进行存储,最后,再由Kibana将日志和数据呈现给用户
由于引入了Kafka缓冲机制,即使远端Logstash server因故障停止运行,数据也不会丢失,可靠性得到了大大的提升
2)用户轨迹跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等操作,这些活动信息被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,当然也可以保存到数据库
3)运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
4)流式处理:比如spark streaming和storm
二、Kafka消息中心的作用有哪些?
消息中心,有以下几大作用:
- 消息通讯:可以作为基本的消息通讯,比如聊天室等工具的使用
- 异步处理 : 将一些实时性要求不是很强的业务异步处理,起到缓冲的作用,一定程度上也会避免因为有些消费者处理的太慢或者网络问题导致的通讯等待太久,因而导致的单个服务崩溃,甚至产生多个服务间的雪崩效应;
- 应用解耦 : 消息队列将消息生产者和消费者分离开来,可以实现应用解耦
- 流量削峰: 可以通过在应用前端采用消息队列来接收请求,可以达到削峰的目的:请求超过队列长度直接不处理,重定向至错误页面。类似于网关限流的作用 冗余存储:消息队列把数据进行持久化,直到它们已经被完全处理,通过这一方式规避了数据丢失风险
三、Kafka使用哪种方式消费消息,pull还是push?
Kafka的消费者使用pull(拉)的方式将消息从broker中拉下来
1 这样做的好处是:
1)Kafka可以根据consumer的消费能力以适当的速率消费消息
2)消费者可以控制自己的消费方式:可以使用批量消费,也可以选择逐条消费
3)消费者还可以选择不同的提交方式来实现不同的传输语义,要是使用了push的方式,就没有这些优点了
2 缺点是:
会出现一种情况:如果Kafka没有数据,消费者会专门有个线程去等待数据,可能会陷入循环等待中
3 Kafka如何避免这一缺点:
我们可以通过在拉请求中设置参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)来避免这一问题
4 关于push的方式的话,它的优点是:
1)相对于pull的方式来说,它不需要专门有一个消息去等待,而可能造成线程循环等待的问题
2)它的缺点是:
push(推)模式一般是会以同样的速率将消息推给消费者,很难适应消费速率不同的消费者,这样很容易造成有些消费能力比较低的consumer来不及处理消息,导致出现拒绝服务以及网络拥塞的情况
四、Kafka与Zookeeper是什么关系?
Kafka的数据会存储在zookeeper上。包括broker和消费者consumer的信息
其中,
broker信息:包含各个broker的服务器信息、Topic信息
消费者信息:主要存储每个消费者消费的topic的offset的值
五、Kafka的缓冲池满了怎么办?
无论消息是否被消费,kafka都会保留所有消息。而当消息的大小,大于设置的最大值log.retention.bytes(默认为1073741824)的值,也就是说这个缓冲池满了的时候,Kafka便会清除掉旧消息
那么它每次删除多少消息呢?
topic的分区partitions,被分为一个个小segment,按照segment为单位进行删除(segment的大小也可以进行配置,默认log.segment.bytes = 1024 * 1024 * 1024),由时间从远到近的顺序进行删除
此外,Kafka还支持基于时间策略进行删除数据,过期时间默认为:log.retention.hours=168
注意:因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关
六、Kafka数据传输的事务有几种?
有三种:
- 最多一次(<=1): 消息不会被重复发送,最多被传输一次,但也有可能一次不传输
- 最少一次(>=1):消息不会被漏发送,最少被传输一次,但也有可能被重复传输
- 精确的一次(Exactly once)(=1): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的 那么,每种传输,分别是怎样实现的呢?
- 最多一次:consumer先读消息,记录offset,最后再处理消息
这样,不可避免地存在一种可能:在记录offset之后,还没处理消息就出现故障了,新的consumer会继续从这个offset处理,那么就会出现有些消息永远不会被处理。那么这种机制,就是消息最多被处理一次
最少一次:consumer可以先读取消息,处理消息,最后记录offset
七、Kafka在什么情况下会出现消息丢失?
以下几个阶段,都有可能会出现消息丢失的情况
1)消息发送的时候,如果发送出去以后,消息可能因为网络问题并没有发送成功
2)消息消费的时候,消费者在消费消息的时候,若还未做处理的时候,服务挂了,那这个消息不就丢失了
3)分区中的leader所在的broker挂了之后
我们知道,Kafka的Topic中的分区Partition是leader与follower的主从机制,发送消息与消费消息都直接面向leader分区,并不与follower交互,follower则会去leader中拉取消息,进行消息的备份,这样保证了一定的可靠性
但是,当leader副本所在的broker突然挂掉,那么就要从follower中选举一个leader,但leader的数据在挂掉之前并没有同步到follower的这部分消息肯定就会丢失掉
八、Kafka如何避免消息重复消费(保证消息的幂等性)
关于此问题,可以从以下方面回答:
(1)在Kafka中,每个消费者都必须加入至少一个消费者组。同一个消费者组内的消费者可以共享消费者的负载。
因此,如果一个消息被消费组中的任何一个消费者消费了,那么其他消费者就不会再收到这个消息了
(2)消费者可以通过手动提交消费位移来控制消息的消费情况。通过手动提交位移,消费者可以跟踪自己已经消费的消息,确保不会重复消费同一消息
(3)客户端自己可以做一些幂等机制,防止消息的重复消费。
针对重复消费的问题,一般可以根据业务情况处理:
1> 可以建立一个业务相关的本地表,使用主键或者唯一索引,针对每次消费之后已提交的数据记录消息的唯一id,等下次消息消费时判断是否消费过
2> 或者使用Redis缓存针对已经消费提交的消息进行标记,等下次消息消费进行校验
九、kafka如何解决消息丢失问题
保证消息不丢失,可以从以下三个方面考虑:
(1)生产者不丢消息,即:确保生产的消息能到达存储端
针对kafka而言,使用调用回调的 producer.send(msg,callback)方法,保证消息同步发送到存储端,
同时可以根据回调方法返回的异常信息针对消息发送失败问题进行处理,
如果是因为那些瞬时错误,Producer重试就可以了;如果是消息不合规造成的,那么调整消息格式后再次发送。
(2)Broker存储端不丢数据:确保消息持久化到磁盘中
例如:Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。如果此时其他的 follower 刚好有些数据没有同步,
结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,便会导致数据丢失
解决方案主要有两种:
1> 持久化存储:Kafka使用持久化存储来存储消息。这意味着消息在写入Kafka时将被写入磁盘,这种方式可以防止消息因为节点容机而丢失
2> ISR复制机制:Kafka使用ISR机制来确保消息不会丢失,Kafka使用复制机制来保证数据的可靠性。每个分区都有多个副本,副本可以分布在不同的节点上。
当一个节点宕机时,其他节点上的副本仍然可以提供服务,保证消息不丢失
服务端也可以设置如下参数:
1> topic 设置 replication.factor 参数值大于 1,要求每个 partition 必须有至少 2 个副本
2> 在 Kafka 服务端设置 min.insync.replicas 参数值大于 1,这个是要求一个 leader 至少有一个 follower 还跟自己保持联系
3> 在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
4> 在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试
(3)消费者不丢数据:确保消息消费完成再提交
也就是说一个线程消费到了这个消息,然后消费者那边自动提交了 offset,会让 Kafka 以为此线程已经消费好了这个消息,
但实际情况是该线程准备处理此消息时,就挂掉了,结果就导致这条消息丢失了。
解决方案:
针对此种情况,只需要关闭kafka的自动提交 offset设置,即:将enable.auto.commit参数值设置为false,然后在处理完消息之后再手动提交 offset
十、如何处理消息积压问题
消息积压有如下几种情况:
(1)消息堆积在MQ中几个小时没有处理情况
这种情况一般只能临时扩容,大致思路如下:
1> 先修复 consumer 消费者的问题,以确保其恢复消费速度,然后将现有consumer 都停掉。
2> 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue数量。
3> 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
4> 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。
这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
5> 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
(2)优化一下消费的逻辑,比如之前是一条一条消息消费处理的话,我们可以确认是不是可以优化为批量处理消息。
如果还是慢,我们可以考虑水平扩容,增加 Topic 的队列数和消费组机器的数量,提升整体消费能力。
看完上述Kafka的基础知识,是不是Kafka再也卡不住你了^_^