Kafka集群创建

ops/2024/12/12 4:56:16/

上次集群忘了写文档,这次集群创建zk和kafka放在了一起,版本和生产一致,所以使用低版本
2.8.6

一、准备配置

1.1、配置env

$ cat /etc/profile.d/kafka.sh
# Java Environment
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=$PATH:$JAVA_HOME/bin
# Kafka  Environment
export KAFKA_HOME=/data/kafka
export PATH=$PATH:$KAFKA_HOME/bin
$ cat /etc/profile.d/zookeeper.sh 
# ZooKeeper Environment
export ZOOKEEPER_HOME=/data/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

1.2、启动文件

zk启动文件,forking是通过子进程去管理,适合守护进程

$ cat /etc/systemd/system/zookeeper.service
[Unit]
Description=zookeeper
After=syslog.target network.target[Service]
Type=forking
User=root
ExecStart=/data/zookeeper/bin/zkServer.sh start
ExecStop=/data/zookeeper/bin/zkServer.sh stop
Restart=on-failure
Restart=always
RestartSec=5[Install]
WantedBy=multi-user.target

kafka启动文件

$ cat /etc/systemd/system/kafka.service
[Unit]
Description=kafka
Requires=network.target remote-fs.target
After=network.target remote-fs.target[Service]
LimitNOFILE=infinity
LimitNPROC=infinity
Type=forking
ExecStart=/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
ExecStop=/data/kafka/bin/kafka-server-stop.sh
Restart=on-failure[Install]
WantedBy=multi-user.target

1.3、服务配置文件

$ cat /data/zookeeper/conf/zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
clientPort=2181
server.15=10.198.170.15:2888:3888
server.16=10.198.170.16:2888:3888
server.17=10.198.170.17:2888:3888
4lw.commands.whitelist=*

kafka_75">二、kafka集群初始化

由于我们要启用broker之间的加密通信,所以需要主节点这里先生成管理员用户密码

# kafka的时候第一台节点先不使用加密运行,然后生成SECRM加密
$ cat /data/kafka/config/server.properties 
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://10.198.170.15:9092
# 认证配置
#inter.broker.listener.name=SASL_PLAINTEXT
#listeners=SASL_PLAINTEXT://10.198.170.15:9092
#security.protocol=SASL_PLAINTEXT
#sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
#sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
# ACL配置
allow.everyone.if.no.acl.found=true
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
num.network.threads=5
num.io.threads=8
socket.send.buffer.bytes=10240000
socket.receive.buffer.bytes=10240000
socket.request.max.bytes=1048576000
log.dirs=/data/kafka/data
num.partitions=1
default.replication.factor=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.198.170.15:2181,10.198.170.16:2181,10.198.170.17:2181
zookeeper.connection.timeout.ms=12000
zookeeper.session.timeout.ms=12000
group.initial.rebalance.delay.ms=500
log.flush.interval.messages=10000
log.flush.interval.ms=1000
num.replica.fetchers=3
replica.fetch.min.bytes=1
replica.fetch.max.bytes = 104857600
unclean.leader.election.enable=false
auto.create.topics.enable = true
min.isync.replicas=2
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replica.lag.time.max.ms =5000
replica.fetch.wait.max.ms = 1000
log.message.timestamp.type=LogAppendTime
log.cleanup.policy = delete
log.roll.hours=168
broker.rack=kafka-rac1
message.max.bytes=10000000
request.timeout.ms=30000

启动kafka
systemctl start kafka

生成加密用户信息

$ kafka-configs.sh --bootstrap-server localhost:9092   --alter --add-config 'SCRAM-SHA-256=[password=gwz3CFEuec9cwzxnd]'   --entity-type users --entity-name admin
# 启动参数里面有这个文件
$ cat /data/kafka/config/kafka_server_scram_jaas.conf 
KafkaServer {org.apache.kafka.common.security.scram.ScramLoginModule requiredusername="admin"password="gwz3CFEuec9cwzxnd";
};

生成后注释listeners=PLANTEXT...,取消注释认证配置下面的5行

listeners=PLAINTEXT://10.198.170.15:9092
# 认证配置
#inter.broker.listener.name=SASL_PLAINTEXT
#listeners=SASL_PLAINTEXT://10.198.170.15:9092
#security.protocol=SASL_PLAINTEXT
#sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
#sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN

三、启动集群

3.1、修改启动参数

按需修改kafka启动参数
1)修改kafka启动脚本中的参数
2)在systemd启动脚本中添加env

3.2、


http://www.ppmy.cn/ops/141163.html

相关文章

利用PHP和GD库实现图片拼接的方法

利用PHP和GD库实现图片拼接的方法主要涉及到加载图片资源、创建目标画布、将图片资源绘制到目标画布上,并最终输出或保存拼接后的图片。以下是实现图片拼接的基本步骤: 加载图片资源: 使用imagecreatefromjpeg()、imagecreatefrompng()或ima…

day08 接口测试(4)知识点完结!!

【没有所谓的运气🍬,只有绝对的努力✊】 目录 1、postman读取外部数据文件(参数化) 1.1 数据文件简介 1.2 导入外部数据文件 1.2.1 csv文件 1.2.2 导入 json文件 1.3 读取数据文件数据 1.4 案例 1.5 生成测试报告 2、小…

selenium学习:等待方式

隐式等待 1.针对查找元素设置最大的超时时间 2.可以全局性的设置 3.不满足时,提示no such element driver.implicitly_wait(5) #对查找元素最大的超时时间,如果超过最大等待时间后,没有找到元素,则会报错:no such #e…

黑马点评项目笔记

代码仓库:https://gitcode.com/xu1feng/hm-dianpnig/overview,欢迎star~ 整体功能架构图 短信登录 导入黑马点评项目 首先,导入数据库SQL文件hmdp.sql。 其中的表有: tb_user:用户表tb_user_info:用户…

L22.【LeetCode笔记】相交链表(新版)

目录 1.题目 代码模板 2.分析 ​编辑 算法误区 正确方法1 但不能通过所有的测试用例 修改后 提交结果 正确方法2 节省代码的技巧 1.题目 https://leetcode.cn/problems/3u1WK4/description/ 给定两个单链表的头节点 headA 和 headB ,请找出并返回两个单…

【深入探讨PostgreSQL:彻底删除数据并释放索引空间】——让数据库空间管理更高效!

全文目录: 开篇语前言 🌟📜 目录1. DELETE真的删除了吗? 🤔2. 删除数据后如何释放索引空间? 📉2.1 VACUUM 🧹2.2 VACUUM FULL 🧹💯2.3 REINDEX 重新索引 &…

【Kubernetes理论篇】容器集群管理系统Kubernetes(K8S)

Kubernetes集群部署基本管理实战 这么好的机会,还在等什么! 01、Kubernetes 概述 K8S是什么 K8S 的全称为 Kubernetes (K12345678S),PS:“嘛,写全称也太累了吧,写”。不如整个缩写 K8s 作为缩写的结果…

从0到1实现项目Docker编排部署

在深入讨论 Docker 编排之前,首先让我们了解一下 Docker 技术本身。Docker 是一个开源平台,旨在帮助开发者自动化应用程序的部署、扩展和管理。自 2013 年推出以来,Docker 迅速发展成为现代软件开发和运维领域不可或缺的重要工具。 Docker 采…