🌻🌻 目录
- 一、Kafka Broker
- 1.1 Kafka Broker工作流程
- 1.1.1 Zookeeper 存储的Kafka信息
- 1.1.2 Kafka Broker 总体工作流程
- 1.1.3 Broker 重要参数
- 1.2 生产经验——节点服役和退役
- 1.2.1 服役新节点
- 1.2.2 退役旧节点
- 1.3 Kafka 副本
- 1.3.1 副本基本信息
- 1.3.2 Leader 选举流程
- 1.3.3 Leader 和 Follower 故障处理细节
- 1.3.4 分区副本分配
- 1.3.5 生产经验——手动调整分区副本存储
- 1.3.6 生产经验——Leader Partition负载平衡
- 1.3.7 生产经验——增加副本因子
- 1.4 文件存储
- 1.4.1 文件存储机制
- 1.4.2 文件清理策略
- 1.5 高效读写数据
一、Kafka Broker
1.1 Kafka Broker工作流程
1.1.1 Zookeeper 存储的Kafka信息
(1)启动Zookeeper客户端。
cd /usr/local/zookeeper/bin/ls./zkCli.sh
(2)通过ls命令可以查看kafka相关信息。
ls /ls /kafka/ls /kafka/brokers/ls /kafka/brokers/ids
zookeerper 可视化工具 PrettyZoo:
- 下载
- 使用参考
①
②
1.1.2 Kafka Broker 总体工作流程
1)模拟Kafka上下线,Zookeeper中数据变化
(1)查看/kafka/brokers/ids路径上的节点。
./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties./kafka-server-stop.sh
1.1.3 Broker 重要参数
1.2 生产经验——节点服役和退役
1.2.1 服役新节点
1)新节点准备
(1)关闭linux-102(已经装了hadoop,jdk,zookeeper,kafka
),并右键执行克隆操作,linux-103,linux-104(注:之前若有直接可以删掉重新克隆)
(2)开启linux-103,linux-104 并修改IP地址。
①
②
③
④ 修改ip
,将192.168.10.102
修改为 192.168.10.103
,保存,并重启网络
⑤ 修改 主机名
⑥ 重启进行远程连接
⑦ 删除日志,并分别修改linux-103与linux-102 的节点 id 和服务器 ip,并开启集群配置
开启集群配置(linux-102,linux-103,linux-104都需修改)
(3) 先启动linux-102,再启动linux-103,再次启动 linux-104(脚本启动后期更新
)
cd /usr/local/zookeeper/bin/./zkServer.sh start./zkServer.sh statuscd /usr/local/kafka/bin./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
(4) 查看 linux-102 上有哪些主题,并分配了分区(生产环境建议至少分两个副区,如果不小心删除一个,则还有一个类似备份的
)
./kafka-topics.sh --bootstrap-server linux-102:9092 --list./kafka-topics.sh --bootstrap-server linux-102:9092 --topic first --describe
思考:如何将
linux-102
上面的一些分到linux-103
达到负载均衡呢?看下面 2)
2)执行负载均衡操作
(1)创建一个要均衡的主题(在 linux-102
服务器的 kafka
下面创建)。
cd /usr/local/kafkavi topics-to-move.json
{"topics": [{"topic": "first"}],"version": 1
}
(2)生成一个负载均衡的计划(在 linux-102
服务器的 kafka
下面生成)。
# 0,1,2 代表三台服务器
bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
遇到错误
:java.io.IOException: Unable to read file topics-to-move.json
解决:
直接切换到kafka 根目录进行生成即可:
(3)创建副本存储计划(所有副本存储在broker2
、broker3
、broker4
)。
vi increase-replication-factor.json
输入如下内容:
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":3,"replicas":[2],"log_dirs":["any"]}]}
(4)执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --execute
(5)验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --verify
上述操作都是在linux-102
上操作的。
1.2.2 退役旧节点
1)执行负载均衡操作
先按照退役一台节点,
生成执行计划
,然后按照服役时操作流程执行负载均衡
。
(1)创建一个要均衡的主题。
与上面(1)创建一个要均衡的主题(在 linux-102
服务器的 kafka
下面创建)。一样无需再创建。
(2)创建执行计划。
#上面是分配到了三台机器 2,3,4,现在移除 4即服务器 linux-104
bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "2,3" --generate
(3)创建副本存储计划(所有副本存储在broker2
、broker3
)。
vi increase-replication-factor.json
删除前面的,将上面复制的粘贴里面保存即可。
粘贴上面复制的如下内容:
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"first","partition":3,"replicas":[2],"log_dirs":["any"]}]}
(4)执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --execute
(5)验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server linux-102:9092 --reassignment-json-file increase-replication-factor.json --verify
2)执行停止命令
在linux-104
上执行停止命令即可
./kafka-server-stop.sh
1.3 Kafka 副本
下面的所有后期会详细更新(可跳过直接看 👉👉
大数据技术之Kafka3.x(4)
)
1.3.1 副本基本信息
- (1)Kafka 副本作用:
提高数据可靠性
。- (2)Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
- (3)Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。
(4)Kafka分区中的所有副本统称为AR(Assigned Repllicas)。
AR = ISR + OSR
ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。
OSR,表示Follower与Leader副本同步时,延迟过多的副本。
1.3.2 Leader 选举流程
(
跳过后期更新
)
Kafka 集群中有一个
broker
的Controller
会被选举为Controller Leader
,负责管理集群 broker 的上下线,所有topic
的分区副本分配和Leader选举等工作。
Controller的信息同步工作是依赖于Zookeeper的。
开启
linux-102,linux-103,linux-104
,进行如下操作:
(1)创建一个新的topic,4个分区,4个副本(因为我是单节点,所以设置了 1)
bin/kafka-topics.sh --bootstrap-server linux-102:9092 --create --topic Daniel1 --partitions 1 --replication-factor 1
(2)查看Leader分布情况
bin/kafka-topics.sh --bootstrap-server linux-102:9092 --describe --topic Daniel1
(3)停止掉linux-103
的kafka进程,并查看Leader分区情况
(4)停止掉linux-104
的kafka进程,并查看Leader
分区情况
(5)启动linux-104
的kafka进程,并查看Leader
分区情况
(6)启动linux-104
的kafka进程,并查看Leader
分区情况
(7)停止掉linux-103
的kafka进程,并查看Leader
分区情况
1.3.3 Leader 和 Follower 故障处理细节
1.3.4 分区副本分配
(
跳过后期更新
)
1)创建16分区,3个副本
(1)创建一个新的topic,名称为second。
bin/kafka-topics.sh --bootstrap-server linux-102:9092 --create --topic second --partitions 16 --replication-factor 3
(2)查看分区和副本情况。
1.3.5 生产经验——手动调整分区副本存储
手动调整分区副本存储的步骤如下:
(1)创建一个新的topic,名称为three。
(2)查看分区副本存储情况。
(3)创建副本存储计划(所有副本都指定存储在broker0、broker1中)。
(4)执行副本存储计划。
(5)验证副本存储计划。
(6)查看分区副本存储情况。
1.3.6 生产经验——Leader Partition负载平衡
1.3.7 生产经验——增加副本因子
在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。
1)创建topic
2)手动增加副本存储
(1)创建副本存储计划(所有副本都指定存储在broker0、broker1、broker2中)。
(2)执行副本存储计划。
1.4 文件存储
1.4.1 文件存储机制
1)Topic数据的存储机制
2)思考:Topic数据到底存储在什么位置?
(1)启动生产者,并发送消息。
(2)查看linux-102(或者linux-103、linux-104)的/usr/local/kafka/datas/first-1(first-0、first-2)路径上的文件。
(3)直接查看log日志,发现是乱码。
(4)通过工具查看index和log信息。
3)index文件和log文件详解
说明:日志存储参数配置
1.4.2 文件清理策略
Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。
Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。
- log.retention.hours,最低优先级小时,默认7天。
- log.retention.minutes,分钟。
- log.retention.ms,最高优先级毫秒。
- log.retention.check.interval.ms,负责设置检查周期,默认5分钟。
那么日志一旦超过了设置的时间,怎么处理呢?
Kafka中提供的日志清理策略有delete和compact两种。
1)delete日志删除:将过期数据删除
- log.cleanup.policy = delete 所有数据启用删除策略
(1)基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳。
(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。
log.retention.bytes,默认等于-1,表示无穷大。
思考:如果一个segment中有一部分数据过期,一部分没有过期,怎么处理?
2)compact日志压缩
1.5 高效读写数据
- 1)Kafka本身是分布式集群,可以采用分区技术,并行度高
- 2)读数据采用稀疏索引,可以快速定位要消费的数据
- 3)顺序写磁盘
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
4)页缓存 + 零拷贝技术