k8s部署Kafka集群超详细讲解

ops/2024/10/24 1:08:27/

准备部署环境

Kubernetes集群信息

NAMEVERSION
k8s-masterv1.29.2
k8s-node01v1.29.2
k8s-node02v1.29.2

Kafka:3.7.1版本,apche版本
Zookeeper:3.6.3版本

准备StorageClass

# kubectl get sc 
NAME                PROVISIONER      RECLAIMPOLICY   VOLUMEBINDINGMODE   ALLOWVOLUMEEXPANSION   AGE
nfs-csi (default)   nfs.csi.k8s.io   Delete          Immediate           false                  51d

创建namespace

Kubectl create ns kafka

部署zookeeper集群

编写zookeeper集群部署zk.yaml文件

apiVersion: v1
kind: Service
metadata:name: zk-hsnamespace: kafkalabels:app: zk
spec:ports:- port: 2888name: server- port: 3888name: leader-election- port: 2181name: clientclusterIP: Noneselector:app: zk
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: zknamespace: kafka
spec:selector:matchLabels:app: zkserviceName: zk-hsreplicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: Paralleltemplate:metadata:labels:app: zkspec:containers:- name: zookeeperimagePullPolicy: Alawysimage: registry.cn-hangzhou.aliyuncs.com/aliyun_0612/zookeeper:3.6.3resources:requests:memory: "200Mi"cpu: "0.1"ports:- containerPort: 2181name: client- containerPort: 2888name: server- containerPort: 3888name: leader-electioncommand:- sh- -c- "start-zookeeper \--servers=3 \--data_dir=/var/lib/zookeeper/data \--data_log_dir=/var/lib/zookeeper/data/log \--conf_dir=/opt/zookeeper/conf \--client_port=2181 \--election_port=3888 \--server_port=2888 \--tick_time=2000 \--init_limit=10 \--sync_limit=5 \--heap=512M \--max_client_cnxns=60 \--snap_retain_count=3 \--purge_interval=12 \--max_session_timeout=40000 \--min_session_timeout=4000 \--log_level=INFO"readinessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5livenessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5volumeMounts:- name: datadirmountPath: /var/lib/zookeepersecurityContext:runAsUser: 0fsGroup: 0volumeClaimTemplates:- metadata:name: datadirspec:accessModes: [ "ReadWriteMany" ]storageClassName: nfs-csiresources:requests:storage: 2Gi

kafkakafkayaml_135">kafka集群部署kafka.yaml文件

---
apiVersion: v1
kind: Service
metadata:name: kafka-svcnamespace: kafkalabels:app: kafka
spec:ports:- port: 9092name: serverclusterIP: Noneselector:app: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafkanamespace: kafka
spec:selector:matchLabels:app: kafkaserviceName: kafka-svcreplicas: 3template:metadata:labels:app: kafkaspec:containers:- name: k8s-kafkaimagePullPolicy: Always image: registry.cn-hangzhou.aliyuncs.com/aliyun_0612/kafka:3.7.1resources:requests:memory: "600Mi"cpu: 500mports:- containerPort: 9092name: servercommand:- sh- -c- "exec /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \--override listeners=PLAINTEXT://:9092 \--override zookeeper.connect=zk-hs.kafka.svc.cluster.local:2181 \--override log.dir=/var/lib/kafka \--override auto.create.topics.enable=true \--override auto.leader.rebalance.enable=true \--override background.threads=10 \--override compression.type=producer \--override delete.topic.enable=false \--override leader.imbalance.check.interval.seconds=300 \--override leader.imbalance.per.broker.percentage=10 \--override log.flush.interval.messages=9223372036854775807 \--override log.flush.offset.checkpoint.interval.ms=60000 \--override log.flush.scheduler.interval.ms=9223372036854775807 \--override log.retention.bytes=-1 \--override log.retention.hours=168 \--override log.roll.hours=168 \--override log.roll.jitter.hours=0 \--override log.segment.bytes=1073741824 \--override log.segment.delete.delay.ms=60000 \--override message.max.bytes=1000012 \--override min.insync.replicas=1 \--override num.io.threads=8 \--override num.network.threads=3 \--override num.recovery.threads.per.data.dir=1 \--override num.replica.fetchers=1 \--override offset.metadata.max.bytes=4096 \--override offsets.commit.required.acks=-1 \--override offsets.commit.timeout.ms=5000 \--override offsets.load.buffer.size=5242880 \--override offsets.retention.check.interval.ms=600000 \--override offsets.retention.minutes=1440 \--override offsets.topic.compression.codec=0 \--override offsets.topic.num.partitions=50 \--override offsets.topic.replication.factor=3 \--override offsets.topic.segment.bytes=104857600 \--override queued.max.requests=500 \--override quota.consumer.default=9223372036854775807 \--override quota.producer.default=9223372036854775807 \--override replica.fetch.min.bytes=1 \--override replica.fetch.wait.max.ms=500 \--override replica.high.watermark.checkpoint.interval.ms=5000 \--override replica.lag.time.max.ms=10000 \--override replica.socket.receive.buffer.bytes=65536 \--override replica.socket.timeout.ms=30000 \--override request.timeout.ms=30000 \--override socket.receive.buffer.bytes=102400 \--override socket.request.max.bytes=104857600 \--override socket.send.buffer.bytes=102400 \--override unclean.leader.election.enable=true \--override zookeeper.session.timeout.ms=6000 \--override zookeeper.set.acl=false \--override broker.id.generation.enable=true \--override connections.max.idle.ms=600000 \--override controlled.shutdown.enable=true \--override controlled.shutdown.max.retries=3 \--override controlled.shutdown.retry.backoff.ms=5000 \--override controller.socket.timeout.ms=30000 \--override default.replication.factor=1 \--override fetch.purgatory.purge.interval.requests=1000 \--override group.max.session.timeout.ms=300000 \--override group.min.session.timeout.ms=6000 \--override inter.broker.protocol.version=2.2.0 \--override log.cleaner.backoff.ms=15000 \--override log.cleaner.dedupe.buffer.size=134217728 \--override log.cleaner.delete.retention.ms=86400000 \--override log.cleaner.enable=true \--override log.cleaner.io.buffer.load.factor=0.9 \--override log.cleaner.io.buffer.size=524288 \--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \--override log.cleaner.min.cleanable.ratio=0.5 \--override log.cleaner.min.compaction.lag.ms=0 \--override log.cleaner.threads=1 \--override log.cleanup.policy=delete \--override log.index.interval.bytes=4096 \--override log.index.size.max.bytes=10485760 \--override log.message.timestamp.difference.max.ms=9223372036854775807 \--override log.message.timestamp.type=CreateTime \--override log.preallocate=false \--override log.retention.check.interval.ms=300000 \--override max.connections.per.ip=2147483647 \--override num.partitions=3 \--override producer.purgatory.purge.interval.requests=1000 \--override replica.fetch.backoff.ms=1000 \--override replica.fetch.max.bytes=1048576 \--override replica.fetch.response.max.bytes=10485760 \--override reserved.broker.max.id=1000 "env:- name: KAFKA_HEAP_OPTSvalue : "-Xmx512M -Xms512M"- name: KAFKA_OPTSvalue: "-Dlogging.level=INFO"volumeMounts:- name: datadirmountPath: /var/lib/kafkareadinessProbe:tcpSocket:port: 9092timeoutSeconds: 1initialDelaySeconds: 5securityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirspec:accessModes: [ "ReadWriteMany" ]storageClassName: nfs-csiresources:requests:storage: 10Gi

部署集群

部署集群

[root@k8s-master ~]# kubectl apply -f zk.yaml
[root@k8s-master ~]# kubectl apply -f kafka.yaml

检查状态
在这里插入图片描述
出现启动失败情况,解决不了可以下方评论,专业大神会提供解决方案

验证zk集群

进入zk pod中,查看zk集群是否正常

[root@k8s-master test]# kubectl exec -it -nkafka zk-0 bash

在这里插入图片描述
使用

[root@k8s-master test]# zkCli.sh

在这里插入图片描述
查看kafka brokers
在这里插入图片描述
通过验证,zk集群正常

验证Kafka集群

进入kafka pod节点中

[root@k8s-master ~]# kubectl exec -it -nkafka kafka-0 bash

在这里插入图片描述
创建测试topic

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic mytopic --replication-factor 3 --partitions 3

这里要使用绝对路径
在这里插入图片描述
使用describe 查看topic分区数以及副本数正常

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic mytopic

这里要使用绝对路径
在这里插入图片描述
至此,kafka集群在kubernetes上部署已经完成。
有什么问题可以下方评论。


http://www.ppmy.cn/ops/127963.html

相关文章

旅游攻略网站毕业设计计算机毕设基于SpringBootSSM框架

目录 1.摘要 2 引言 2.1 开发目标 2.2 项目内容 2.3 项目背景与目的 3. 技术选型 3.1 JAVA 简介 3.2 MySQL 介绍 ‌4. 功能描述与创新点 4.1 功能描述 ‌4.2创新点设计 4.3 功能图展示 5. 数据库设计 6. 项目任务与要求 1.摘要 本文旨在设计并实现一个功能全面、…

C++代码操作指令的定义

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、最简单的函数二、栈设计StackEntry 类std::vector 容器std::vector<StackEntry>的特点std::vector<StackEntry>的操作成员枚举 Type&#xff1…

RabbitMQ service is already present - only updating service parameters

Windows下卸载RabbitMQ之后,然后重新注册RabbitMQ服务的时候,报错以下信息: D:\software\rabbitmq-server-4.0.2\rabbitmq_server-4.0.2\sbin>D:\software\rabbitmq-server-4.0.2\rabbitmq_server-4.0.2\sbin\rabbitmq-service.bat install RabbitMQ service is already …

【Jmeter】jmeter运行时自动获取线程号几种方式

背景&#xff1a; 在模拟多线程场景时&#xff0c;使用序号可以标记并追踪当前执行请求的虚拟用户。 方法1&#xff1a;通过添加BeanShell前置处理器获取或BeanShell预处理程序获取线程号 获取函数&#xff1a;${__BeanShell(ctx.getThread().getThreadName())} Thread.slee…

②PROFINET转ModbusTCP, EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关

EtherCAT/Ethernet/IP/Profinet/ModbusTCP协议互转工业串口网关https://item.taobao.com/item.htm?ftt&id822721028899 协议转换通信网关 PROFINET 转 Modbus TCP &#xff08;接上一章&#xff09; 配置使用 与 PROFINET 主站进行组态说明 这里介绍与西门子 PLC 的…

Godot中类和静态类型

目录 类 关键字class_name 除了为类定义方法&#xff0c;我们也可以为类定义属性字段 实例释放前后的打印 Refcounted RefCounted维护了一个引用计数器 get_reference_count 类是引用类型数据 class关键字 静态类型 静态方法 静态方法只能访问静态变量 类 是面向…

“摄像机”跟随及攻击抖动实现

学习Unity的摄像机功能&#xff0c;可以帮助我们实现摄像机对人物的跟随移动&#xff0c;还可以使用这个工具自带的插件&#xff0c;摄像机震动&#xff0c;颤动&#xff0c;增强打击感&#xff1b; 首先来安装一下这个插件&#xff0c;window菜单--packageManage--左上角Unit…

数据结构 - 树,初探

树是一种非线性数据结构&#xff0c;是以分支关系定义的层次结构&#xff0c;因此形态上和自然界中的倒挂的树很像&#xff0c;而数据结构中树根向上树叶向下。 什么是树&#xff1f; 01定义 树是由n&#xff08;n>0&#xff09;个元素节点组成的有限集合&#xff0c;当n0时…