基于kraft部署kafka集群

news/2024/12/28 21:02:57/

kafka_0">kafka介绍

Apache Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用。
在这里插入图片描述

基于kraft部署集群

集群节点规划

主机名IP 地址角色node-idOS
kafka01192.168.72.31broker/controller1Ubuntu22.04
kafka02192.168.72.32broker/controller2Ubuntu22.04
kafka03192.168.72.33broker/controller3Ubuntu22.04

准备3个节点,每个节点同时作为broker和controller运行。

集群架构如下:
在这里插入图片描述

基础环境配置

以下操作在所有节点执行。

配置主机名

hostnamectl set-hostname kafka01
hostnamectl set-hostname kafka02
hostnamectl set-hostname kafka03

配置hosts解析,提前部署zookeeper集群:

cat > /etc/hosts <<EOF
192.168.72.31 kafka01
192.168.72.32 kafka02
192.168.72.33 kafka03
EOF

安装java环境

apt update -y
apt install -y openjdk-21-jdk

创建Kafka用户

sudo groupadd kafka
sudo useradd -m -s /bin/bash -g kafka kafka
sudo passwd kafka

kafka_57">安装kafka

以下操作在所有节点执行。

wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -zxvf kafka_2.13-3.9.0.tgz -C /opt
ln -s /opt/kafka_2.13-3.9.0 /opt/kafka

配置环境变量

cat > /etc/profile.d/kafka.sh <<'EOF'
export KAFKA_HOME=/opt/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOF
source /etc/profile

修改配置文件

为broker设置角色,以便他们也可以成为控制器。使用配置属性文件应用broker配置,包括角色设置。broker配置根据角色有所不同。KRaft 提供了三个示例broker配置属性文件。

  • /opt/kafka/config/kraft/broker.properties 有一个broker角色的示例配置,如果正在配置broker节点,那么需要选择该配置文件
  • /opt/kafka/config/kraft/controller.properties 有一个控制器角色的示例配置,如果正在配置控制器节点,那么需要选择该配置文件
  • /opt/kafka/config/kraft/server.properties 有一个组合角色的示例配置,如果正在配置broker和控制器节点,那么需要选择该配置文件

可以根据这些示例属性文件来配置broker,本示例使用 server.properties 配置。

修改kafka01节点配置文件

cp /opt/kafka/config/kraft/server.properties{,.bak}
cat <<EOF > /opt/kafka/config/kraft/server.properties
process.roles=broker,controller
node.id=1
num.partitions=6
default.replication.factor=2
min.insync.replicas=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://kafka01:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
log.dirs=/data/kafka/kraft-combined-logs
log.retention.hours=168 
EOF

修改kafka02节点配置文件

cp /opt/kafka/config/kraft/server.properties{,.bak}
cat <<EOF > /opt/kafka/config/kraft/server.properties
process.roles=broker,controller
node.id=2
num.partitions=6
default.replication.factor=2
min.insync.replicas=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://kafka02:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
log.dirs=/data/kafka/kraft-combined-logs
log.retention.hours=168
EOF

修改kafka03节点配置文件

cp /opt/kafka/config/kraft/server.properties{,.bak}
cat <<EOF > /opt/kafka/config/kraft/server.properties
process.roles=broker,controller
node.id=3
num.partitions=6
default.replication.factor=2
min.insync.replicas=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
controller.quorum.voters=1@kafka01:9093,2@kafka02:9093,3@kafka03:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://kafka03:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
log.dirs=/data/kafka/kraft-combined-logs
log.retention.hours=168 
EOF

参数说明

  • process.roles: 一个节点可以充当 代理 或 控制器 或 两者。本示例指明这个节点可以同时是一个 kafka 代理和一个 kraft 控制器节点。
  • node.id:作为集群中的节点 ID,识别这是哪个代理以及哪个 Kraft 控制器节点。
  • num.partitions:创建topic时默认分区数,建议broker节点的倍数
  • default.replication.factor:分区默认副本数,controller与broker 3节点混合部署时,由于只能故障一个节点,建议设为2,设为3依然只能故障一个broker。
  • min.insync.replicas:控制写入操作必须有至少多少个副本处于同步状态才能成功执行,通常建议设置为副本数 - 1。
  • offsets.topic.replication.factor:用于设置 Kafka 内部主题 __consumer_offsets 的副本因子。
  • transaction.state.log.replication.factor:决定 Kafka 内部主题 __transaction_state 的副本因子
  • transaction.state.log.min.isr:定义 __transaction_state 主题的最小同步副本数(ISR,In-Sync Replicas)。
  • controller.quorum.voters: 用于指示所有可用的 kraft 控制器。这里指明将有 3 个 kraft 控制器节点在端口 9093上运行。
  • listeners:在这里我们指明代理将使用 9092 端口,而 kraft 控制器将使用 19092 端口
  • advertised.listeners:用于配置 Kafka Broker 如何向客户端(生产者或消费者)暴露其地址和端口。
  • controller.listener.names:这里的控制器监听器名称设置为 CONTROLLER
  • listener.security.protocol.map:在这里添加连接安全详细信息
  • log.dirs:这是 Kafka 存储数据的日志目录
  • log.retention.hours:用于控制主题分区日志数据保留时间的配置参数,默认168小时(7天)

建立集群 ID

需要为新 Kafka 版本形成一个集群 ID,集群 ID 在集群中的所有节点之间相同,在第一个节点执行

/opt/kafka/bin/kafka-storage.sh random-uuid > /opt/kafka/config/kraft/cluster.id
scp /opt/kafka/config/kraft/cluster.id kafka02:/opt/kafka/config/kraft/
scp /opt/kafka/config/kraft/cluster.id kafka03:/opt/kafka/config/kraft/

使用集群 ID 建立存储,在所有节点执行

export kafka_cluster_id=$(cat /opt/kafka/config/kraft/cluster.id)
kafka-storage.sh format -t $kafka_cluster_id -c /opt/kafka/config/kraft/server.properties

修改目录权限

mkdir -p /opt/kafka /data/kafka
sudo chown -R kafka:kafka /opt/kafka*
sudo chown -R kafka:kafka /data/kafka

启动kafka服务

建立 Kafka 服务启动定义

cat <<EOF > /etc/systemd/system/kafka.service
[Unit]
Description=Kafka
After=network.target[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
LimitNOFILE=10000000
LimitCORE=infinity
LimitNPROC=infinity
LimitMEMLOCK=infinity[Install]
WantedBy=multi-user.target
EOF

启动kafka服务

systemctl daemon-reload
systemctl enable --now kafka.service
systemctl status kafka.service 

查看集群状态

描述运行时状态

Kafka 提供工具来帮助您调试在 KRaft 模式下运行的集群。

可以使用kafka-metadata-quorum 工具描述集群元数据分区的运行时状态,并指定一个带有 --bootstrap-server 选项的 Kafka 代理或一个带有 --bootstrap-controller 选项的 KRaft 控制器。

例如,以下命令指定一个代理并显示元数据法定人数的摘要:

kafka-metadata-quorum.sh --bootstrap-server kafka01:9092 describe --status

示例输出如下

root@kafka01:~# kafka-metadata-quorum.sh --bootstrap-server kafka01:9092 describe --status
ClusterId:              ZgI_5pGNQcOgbMNT1Vqqxw
LeaderId:               2
LeaderEpoch:            77
HighWatermark:          1677
MaxFollowerLag:         0
MaxFollowerLagTimeMs:   350
CurrentVoters:          [{"id": 1, "directoryId": null, "endpoints": ["CONTROLLER://kafka01:9093"]}, {"id": 2, "directoryId": null, "endpoints": ["CONTROLLER://kafka02:9093"]}, {"id": 3, "directoryId": null, "endpoints": ["CONTROLLER://kafka03:9093"]}]
CurrentObservers:       []

该脚本列出了集群状态的基本信息。在显示的输出中,您可以看到节点 2 被选为领导者,所有三个节点 ([1,2,3]) 都在投票池中并同意该决定。

您可以使用 --bootstrap-controller 选项指定一个控制器。这在代理不可访问时非常有用。

kafka-metadata-quorum.sh --bootstrap-controller kafka01:9093 describe --status

调试日志段

kafka-dump-log 工具可用于调试集群元数据目录中的日志段和快照。该工具将扫描提供的文件并解码元数据记录。例如,以下命令解码并打印第一个日志段中的记录:

kafka-dump-log.sh --cluster-metadata-decoder --files \
/data/kafka/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log 

创建topic主题

通过运行创建一个名为 first-topic 的主题,将复制因子设置为 2 确保该主题将在至少两个节点上可用。

kafka-topics.sh --create --topic first-topic \
--bootstrap-server kafka01:9092 --replication-factor 2

然后,运行 kafka-topics.sh 脚本以查看分区在节点上的排列情况:

kafka-topics.sh --describe --bootstrap-server kafka01:9092 --topic first-topic

示例输出如下

root@kafka01:~# kafka-topics.sh --describe --bootstrap-server kafka01:9092 --topic first-topic
Topic: first-topic      TopicId: g_CJm0fRR_-ztyoLeNlg2g PartitionCount: 6       ReplicationFactor: 2    Configs: min.insync.replicas=1Topic: first-topic      Partition: 0    Leader: 3       Replicas: 3,1   Isr: 3,1        Elr:    LastKnownElr: Topic: first-topic      Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2        Elr:    LastKnownElr: Topic: first-topic      Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2,3        Elr:    LastKnownElr: Topic: first-topic      Partition: 3    Leader: 3       Replicas: 3,2   Isr: 3,2        Elr:    LastKnownElr: Topic: first-topic      Partition: 4    Leader: 2       Replicas: 2,1   Isr: 2,1        Elr:    LastKnownElr: Topic: first-topic      Partition: 5    Leader: 1       Replicas: 1,3   Isr: 1,3        Elr:    LastKnownElr: 
root@kafka01:~# 

可以看到每个分区都有其领导者、两个副本和两个同步副本集(ISR)。分区领导者是一个代理节点,负责将分区数据提供给客户端,而副本仅保留副本。如果副本节点在过去十秒内与领导者保持同步,则默认情况下被视为 ISR。此时间间隔可以根据每个主题进行配置。

生产消息

现在已经创建了一个主题,使用 kafka-console-producer.sh 脚本生成其消息。运行以下命令以启动生产者:

kafka-console-producer.sh --topic first-topic --bootstrap-server kafka01:9092

broker正在等待输入文本消息。输入 Hello World!并按 ENTER。提示将如下所示:

>Hello World!
>

生产者现在正在等待下一条消息,这意味着之前的消息已成功传递给 Kafka。可以输入任意数量的消息进行测试。要退出生产者,请按 CTRL+C

消费消息

需要一个消费者来从主题中读取消息。Kafka 提供了一个简单的消费者,名为 kafka-console-consumer.sh。通过运行以下命令来执行它:

kafka-console-consumer.sh --topic first-topic --from-beginning \
--bootstrap-server kafka01:9092

您将看到从主题中读取的消息:

Hello World!
...

模拟节点故障

在第三个 Kafka 节点上,通过运行以下命令停止服务:

sudo systemctl stop kafka

然后,通过运行来描述主题:

kafka-topics.sh --describe --bootstrap-server kafka01:9092 --topic first-topic

输出将类似于此:

root@kafka01:~# kafka-topics.sh --describe --bootstrap-server kafka01:9092 --topic first-topic
Topic: first-topic      TopicId: g_CJm0fRR_-ztyoLeNlg2g PartitionCount: 6       ReplicationFactor: 2    Configs: min.insync.replicas=1Topic: first-topic      Partition: 0    Leader: 1       Replicas: 3,1   Isr: 1  Elr:    LastKnownElr: Topic: first-topic      Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2        Elr:    LastKnownElr: Topic: first-topic      Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2  Elr:    LastKnownElr: Topic: first-topic      Partition: 3    Leader: 2       Replicas: 3,2   Isr: 2  Elr:    LastKnownElr: Topic: first-topic      Partition: 4    Leader: 2       Replicas: 2,1   Isr: 2,1        Elr:    LastKnownElr: Topic: first-topic      Partition: 5    Leader: 1       Replicas: 1,3   Isr: 1  Elr:    LastKnownElr: 
root@kafka01:~# 

之前所有在节点3上为leader角色的分区,已经将其他节点的follower分区切换为leader分区。尽管节点 3 被列为副本,但由于不可用,它在 ISR 集合中缺失。

一旦它重新加入集群,它将与其他节点同步并尝试恢复其之前的位置。

再试着阅读来自 first-topic 的消息:

kafka-console-consumer.sh --topic first-topic --from-beginning \
--bootstrap-server kafka01:9092

你会看到它们像往常一样可访问:

Hello World!
...

由于副本的存在,前两个节点接管并为消费者提供服务。现在可以在第三台服务器上启动 Kafka:

sudo systemctl start kafka

在这一步中,已经看到 Kafka 如何减轻集群中节点不可用的情况。

在节点之间迁移数据

在这一步中,您将学习如何在 Kafka 集群中迁移主题。当向现有集群添加带有主题的节点时,Kafka 不会自动将任何分区转移到该节点,这可能是您想要做的。这对于移除节点也很有用,因为现有的分区不会自动移动到剩余的节点。

Kafka 提供了一个名为kafka-reassign-partitions.sh的脚本,可以生成、执行和验证迁移计划。使用它来创建一个将first-topic的分区移动到前两个节点的计划。

首先,您需要定义哪些主题应该被移动。脚本接受一个包含主题定义的 JSON 文件,因此请创建并打开它以进行编辑:

vi topics-to-move.json

添加以下行:

topics-to-move.json

{"topics": [{"topic": "first-topic"}],"version": 1
}

topics下,您定义一个引用first-topic的对象。完成后,保存并关闭文件。

运行以下命令生成迁移计划,将 kafka01替换为指向其中一个 Kafka 节点的域名:

kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 \
--topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

您将 "1,2" 传递给 --broker-list,表示目标经纪人的 ID。

输出将类似于此:

Current partition replica assignment
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[1,2],"log_dirs":["any","any"]}]}

该脚本总共生成了两个计划,分别描述了当前和提议的分区布局。如果您需要稍后恢复更改,将提供第一个计划。请注意第二个计划,将其存储在一个名为 migration-plan.json 的单独文件中。创建并打开该文件进行编辑:

vi migration-plan.json

添加第二个执行计划:

migration-plan.json

{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}

保存并关闭文件。然后,运行以下命令来执行它:

kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 \
--reassignment-json-file migration-plan.json --execute

输出将是:

Current partition replica assignment{"version":1,"partitions":[{"topic":"first-topic","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},{"topic":"first-topic","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":2,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":3,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"first-topic","partition":4,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"first-topic","partition":5,"replicas":[2,1],"log_dirs":["any","any"]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-topic-0,first-topic-1,first-topic-2,first-topic-3,first-topic-4,first-topic-5

脚本指出迁移已经开始。要查看迁移的进度,请传入 --verify

kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 \
--reassignment-json-file migration-plan.json --verify

经过一段时间,输出将类似于此:

Status of partition reassignment:
Reassignment of partition first-topic-0 is completed.
Reassignment of partition first-topic-1 is completed.
Reassignment of partition first-topic-2 is completed.
Reassignment of partition first-topic-3 is completed.
Reassignment of partition first-topic-4 is completed.
Reassignment of partition first-topic-5 is completed.Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic first-topic

现在可以描述 first-topic 以验证代理 3 上没有分区:

kafka-topics.sh --describe --bootstrap-server kafka01:9092 --topic first-topic

输出将如下所示:

Topic: first-topic      TopicId: 4kVImoFNTQeyk3r2zQbdvw PartitionCount: 6       ReplicationFactor: 2    Configs: segment.bytes=1073741824Topic: first-topic      Partition: 0    Leader: 2       Replicas: 2,1   Isr: 1,2Topic: first-topic      Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2Topic: first-topic      Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1Topic: first-topic      Partition: 3    Leader: 1       Replicas: 1,2   Isr: 1,2Topic: first-topic      Partition: 4    Leader: 2       Replicas: 2,1   Isr: 2,1Topic: first-topic      Partition: 5    Leader: 1       Replicas: 1,2   Isr: 2,1

只有代理 12 作为副本和 ISR 存在,这意味着迁移成功。


http://www.ppmy.cn/news/1558881.html

相关文章

如何通过HTTP API分组检索Doc

本文介绍如何通过HTTP API在Collection中进行分组相似性检索。 前提条件 已创建Cluster&#xff1a;创建Cluster。 已获得API-KEY&#xff1a;API-KEY管理。 Method与URL HTTP POST https://{Endpoint}/v1/collections/{CollectionName}/query_group_by 使用示例 说明 需要…

MySQL 查询优化案例分享

在日常开发中&#xff0c;SQL 查询性能直接影响到系统的响应速度和用户体验。随着数据量的增长&#xff0c;慢查询可能成为系统的瓶颈。本文将通过实际案例&#xff0c;分享几种常见的 MySQL 查询优化方法&#xff0c;帮助开发者快速定位和优化慢查询&#xff0c;提升数据库性能…

React 第十九节 useLayoutEffect 用途使用技巧注意事项详解

1、概述 useLayoutEffect 是useEffect 的一个衍生版本&#xff0c;只是他们的执行时机不同 useLayoutEffect 用于在DOM更新执行完成之后&#xff0c;浏览器渲染绘制之前执行&#xff0c;这会阻塞浏览器的渲染&#xff1b; useEffect 的执行时机是在组件首次渲染和更新渲染之后…

Android APP 集成本地大模型 LLM

这篇文章介绍如何在Android项目中集成大模型LLM&#xff0c;并展示大模型 text-to-text 结果。主要依赖如下2项&#xff1a; MediaPipeGemma 2BMediaPipe Google 在2017年发布了 TensorFlow Lite&#xff0c; 它是一个用于在Mobile和IoT上进行 ML 推断的轻量级框架。主要用于将…

【每日学点鸿蒙知识】hap安装报错、APP转移账号、import本地文件、远程包构建问题、访问前端页面方法

1、HarmonyOS 打包生产的hap&#xff0c;安装报错&#xff0c;如何解决&#xff1f; 通过build打包的hap文件&#xff0c;通过hdc进行安装&#xff0c;提示错误&#xff0c;证书信息确认是release。如何解决&#xff1f; release证书无法用于安装。 2、HarmonyOS 普通账户下…

深入解析Android Framework中的android.location包:架构设计、设计模式与系统定制

深入解析Android Framework中的android.location包:架构设计、设计模式与系统定制 目录 引言android.location包概述核心类解析 LocationManagerLocationProviderLocationCriteriaGpsStatusGpsStatus.ListenerLocationListener位置服务的工作原理位置信息的获取与处理GPS状态…

Linux零基础速成篇一(理论+实操)

前言&#xff1a;本教程适合Linux零基础学习&#xff0c;也适合Linux期末考试的小伙伴&#xff0c;从头到尾理论与实操相结合&#xff0c;让你快速对Linux进行了解和掌握。 一、Linux概述 为什么要学习Linux操作系统&#xff1f; 完全免费-开源 任何用户均可下载使用 安全…

STM32 高级 谈一下IPV4/默认网关/子网掩码/DNS服务器/MAC

首先可以通过 winr->输入cmd->输入ipconfig 命令可以查看计算机的各种地址 IPV4&#xff1a;是互联网协议第 4 版&#xff08;Internet Protocol version 4&#xff09;所使用的地址。它是一个 32 位的二进制数字&#xff0c;通常被分为 4 个 8 位的部分&#xff…