kafka集群部署

devtools/2024/9/24 20:27:09/

1. 系统环境

主机名

IP

角色

软件

kafka-1

192.168.183.155

kafka

jdk8/kafka3.7.0/zookeeper3.8.4/kafka-eagle3.0.1

kafka-2

192.168.183.156

kafka

jdk8/kafka3.7.0/zookeeper3.8.4

kafka-2

192.168.183.137

kafka

jdk8/kafka3.7.0/zookeeper3.8.4

2. 准备

# 关闭SELINUX
sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config
setenforce 0
# 关闭防火墙
systemctl stop firewalld
systemctl disable firewalld

3. 安装Java环境

下载地址:Java Archive Downloads - Java SE 8u211 and later

所有节点执行,将下载的安装包上传至服务器

tar -zxvf jdk-8u381-linux-x64.tar.gz
mv jdk1.8.0_381 /usr/local/java
vim /etc/profile
------------------------------------
export JAVA_HOME=/usr/local/java
export CLASSPATH=.:$JAVA_HOME/jre/lib:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin
------------------------------------
source /etc/profile
java -version

4. 安装zookeeper

下载地址:Apache ZooKeeper

所有节点执行,将下载的安装包上传到服务器

tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz
mv apache-zookeeper-3.8.4-bin /usr/local/zookeeper
mkdir -p /usr/local/zookeeper/data
cd /usr/local/zookeeper
cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg
---
# 心跳时间
tickTime=2000
# follow连接leader的初始化连接时间,表示tickTime的倍数
initLimit=10
# syncLimit配置表示leader与follower之间发送消息,请求和应答时间长度。如果followe在设置的时间内不能与leader进行通信,那么此follower将被丢弃,tickTime的倍数
syncLimit=5
# 客户端连接端口
clientPort=2181
# 节点数据存储目录,需要提前创建,注意myid添加,用于标识服务器节点
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/log
server.1=192.168.183.155:2888:3888
server.2=192.168.183.156:2888:3888
server.3=192.168.183.137:2888:3888---
# 修改每个节点zookeeper.properties文件一致 192.168.183.155的myid为1,以此类推
echo 1 > /usr/local/zookeeper/data/myid

启动zookeeper

./bin/zkServer.sh start

验证zookeeper

./bin/zkServer.sh ststus

5. 安装kafka

下载地址:Apache Kafka

所有节点执行,将下载的安装包上传至服务器

tar -zxvf kafka_2.12-3.7.0.tgz
mv kafka_2.12-3.7.0 /usr/local/kafka
cd /usr/local/kafka
cp  config/server.properties  config/server.properties.back# 配置server.propertie
vim config/server.properties
# 修改时去掉注释
------------------------------------
broker.id=155  # 修改 broker.id  id保持不一致
listeners = PLAINTEXT://192.168.183.155:9092  ##侦听端口 为各节点本机ip
advertised.listeners=PLAINTEXT://192.168.183.155:9092  ##节点侦听端口 为各节点本机ip
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka-logs     ## 日志目录
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.183.155:2181,192.168.183.156:2181,192.168.183.137:2181 ### zookeeper 地址
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true    ### 删除属性
-----------------------------------# 配置zookeeper.properties
cp  config/zookeeper.properties  config/zookeeper.properties.back
vim config/zookeeper.properties
-----------------------------------
# Zookeeper的数据存储路径与Zookeeper集群配置保持一致
dataDir=/usr/local/zookeeer/data
-----------------------------------# 配置consumer.properties
cp config/consumer.properties config/consumer.properties.back
vim config/consumer.properties
-----------------------------------
bootstrap.servers=192.168.183.155:9092,192.168.183.156:9092,192.168.183.137:9092
#配置Zookeeper地址
zookeeper.connect=192.168.183.155:2181,192.168.183.156:2181,192.168.183.137:2181
# consumer group id
group.id=test-consumer-group
-----------------------------------# 配置producer.properties
cp config/producer.properties  config/producer.properties.back
vim config/producer.properties
-----------------------------------
bootstrap.servers=192.168.183.155:9092,192.168.183.156:9092,192.168.183.137:9092
# 配置Zookeeper地址
zookeeper.connect=192.168.183.155:2181,192.168.183.156:2181,192.168.183.137:2181
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
-----------------------------------# 配置kafka-run-class.sh
cp bin/kafka-run-class.sh bin/kafka-run-class.sh.back
vim bin/kafka-run-class.sh
-----------------------------------
# 行首新增JAVA_HOME配置
export JAVA_HOME=/usr/local/java

启动

cd /usr/local/kafka/
# 启动(每个节点)
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
bin/kafka-server-start.sh -daemon config/server.properties
# 停止(每个节点)
bin/kafka-server-stop.sh config/server.properties

测试,在集群内任意节点执行都可以

export IP=192.168.183.155:9092,192.168.183.156:9092,192.168.183.137:9092
# 创建topic
bin/kafka-topics.sh --create --bootstrap-server $IP --replication-factor 1 --partitions 1 --topic test
# 查看topic
bin/kafka-topics.sh --describe --bootstrap-server $IP --topic test

生产者生成数据

bin/kafka-console-producer.sh --broker-list $IP --topic test

消费者消费数据

bin/kafka-console-consumer.sh --bootstrap-server $IP --topic test --from-beginning
# --from-beginning 从头开始消费,不加该参数则从最新的消息开始消费,之前的丢弃
# --bootstrap-server 将在kafka集群上创建一个名称为“__consumer_offsets”的topic,50个分区,1个副本,用于存放消费者偏移量

删除topic

bin/kafka-topics.sh --delete --bootstrap-server $IP --topic test

6. 安装kafka Eagle

安装mysql5.7

wget  https://dev.mysql.com/get/mysql80-community-release-el7-7.noarch.rpm
yum -y install mysql80-community-release-el7-7.noarch.rpm
yum-config-manager --disable mysql80-community
yum-config-manager --enable mysql57-community
yum clean all && yum makecache
yum -y  install mysql-community-server
systemctl start mysqld
grep 'temporary password' /var/log/mysqld.log
MySQL > ALTER USER 'root'@'localhost' IDENTIFIED BY 'YIERSAN123pp@';
MySQL > GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'YIERSAN123pp@' WITH GRANT OPTION;   #5.7
MySQL > FLUSH PRIVILEGES;
MySQL > create database ke;

安装kafka Eagle

下载地址:EFAK

tar -zxvf kafka-eagle-bin-3.0.1.tar.gz
cd kafka-eagle-bin-3.0.1
tar -zxvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1 /usr/local/kafka-eagle
cd /usr/local/kafka-eagle# 配置环境变量 
vim/etc/profile
-----------------------------------
export KE_HOME=/usr/local/kafka-eagle
export PATH=$KE_HOME/bin:$PATH
-----------------------------------
source /etc/profilevim conf/system-config.properties
-----------------------------------
# 配置Zookeeper地址
# 注释cluster2
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.183.155:2181,192.168.183.156:2181,192.168.183.137:2181# 配置MySQL
# kafka mysql jdbc driver address
######################################
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://192.168.183.155:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=YIERSAN123pp@

配置文件详解

# 此处设置zookeeper集群的客户端连接地址
efak.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=tdn1:2181,tdn2:2181,tdn3:2181
cluster2.zk.list=xdn1:2181,xdn2:2181,xdn3:2181
# 添加zookeeper acl
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
# Kafka broker nodes online list
cluster1.efak.broker.size=10
cluster2.efak.broker.size=20
# Zookeeper 集群允许连接到的客户端数量
# 如果启用分布式模式,则可以将值设置为4或8
kafka.zk.limit.size=8
# EFAK webui端口访问地址
efak.webui.port=8048
######################################
# EFAK 启用分布式
######################################
efak.distributed.enable=false
# master工作节点将状态设置为master,其他节点将状态设为slave
efak.cluster.mode.status=slave
# efak服务器地址
efak.worknode.master.host=localhost
efak.worknode.port=8085
# Kafka offset storage -- 存储在Kafka集群中,如果存储在zookeeper中,则不能使用此选项
cluster1.efak.offset.storage=kafka
cluster2.efak.offset.storage=kafka
# 是否启用Kafka性能监控图
efak.metrics.charts=false
# EFAK数据保存时间,默认30天
efak.metrics.retain=30
# 如果偏移量超出范围,请启用此属性--仅适用于kafka-sql
efak.sql.fix.error=false
efak.sql.topic.records.max=5000
# Delete kafka topic token -- 设置为删除主题令牌,以便管理员有权删除
efak.topic.token=keadmin
# Kafka sasl authenticate
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
# 如果未设置,则该值可以为空
cluster1.efak.sasl.client.id=
# 添加kafka集群cgroups
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=kafka_ads01,kafka_ads02
cluster2.efak.sasl.enable=true
cluster2.efak.sasl.protocol=SASL_PLAINTEXT
cluster2.efak.sasl.mechanism=PLAIN
cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
cluster2.efak.sasl.client.id=
cluster2.efak.sasl.cgroup.enable=false
cluster2.efak.sasl.cgroup.topics=kafka_ads03,kafka_ads04
# 使用sqlite存储数据,与MySQL二选一
efak.driver=org.sqlite.JDBC
# '/hadoop/kafka-eagle/db' 路径必须存在
efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
efak.username=root
efak.password=smartloli
# 设置MySQL地址
#efak.driver=com.mysql.jdbc.Driver
#efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#efak.username=root
#efak.password=smartloli

7. 开启Kafka JMX监控

vim /usr/local/kafka/bin/kafka-server-start.sh
# 开启Kafka JMX监控
# 修改Kafka各节点JMX启动配置,开启监控功能
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-server -Xmx1G -Xms1G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70 "export JMX_PORT="9999"
fi

8. 启动kafka Eagle

# 重启kafka
cd /usr/local/kafka-eagle/bin
./ke.sh start


http://www.ppmy.cn/devtools/8738.html

相关文章

Java中stream()的使用

文章目录 一.stream()是什么流的主要特点 二.stream的使用步骤步骤: 三.常见的中间操作和终端操作中间操作终端操作 四.示例 一.stream()是什么 Stream API 提供了一种高级的抽象,使我们可以将集合转换成一种流式的处理模式,从而能大链式地执…

简化安卓操作:利用ADB命令返回主页、首页等操作

介绍: 在日常使用安卓设备时,经常需要返回到主页或者首页。虽然通过手动点击设备上的按钮可以轻松实现,但对于一些需要频繁进行此操作的场景,这种方式可能显得有些繁琐。幸运的是,利用ADB(Android Debug Br…

Mini-Gemini: 探索多模态视觉语言模型的新境界

一、背景 在数字化时代,人工智能的发展正以前所未有的速度推进。特别是在多模态学习领域,结合视觉和语言的能力已成为研究的热点。最近,一篇名为“Mini-Gemini: Mining the Potential of Multi-modality Vision Language Models”的文章在arX…

【QT教程】QT6图形渲染与OpenGL编程

QT6图形渲染与OpenGL编程 使用AI技术辅助生成 QT界面美化视频课程 QT性能优化视频课程 QT原理与源码分析视频课程 QT QML C扩展开发视频课程 免费QT视频课程 您可以看免费1000个QT技术视频 免费QT视频课程 QT统计图和QT数据可视化视频免费看 免费QT视频课程 QT性能优化视频免…

橡胶衬板在化工领域中的应用

橡胶衬板在化工领域中的应用 橡胶衬板,一种以橡胶为主要材料制成的防护层,因其独特的物理和化学性质,在化工领域中得到了广泛的应用。橡胶衬板具有优良的耐腐蚀性、耐磨损性、抗冲击性和密封性,使其在化工设备的防护、密封和连接…

Java反射笔记(自用)

文章目录 一.java程序编译运行过程1. Java源代码2. 编译器3. JVM可执行的字节码4. JVM中的解释器5. 机器可执行的二进制机器码6. 程序运行 二.什么是反射1.反射与字节码的关系2.Class对象工作流程 三.反射的使用1. 获取Class对象2. 创建对象实例3. 访问字段4. 调用方法5. 操作数…

项目实践 | 如何监控Java线程池

ThreadPoolExecutor能实时获取线程池的当前活动线程数、正在排队中的任务数、已经执行完成的线程数、总任务数等。 总任务数 排队任务数 活动线程数 执行完成的线程数。 下面给出一个线程池使用示例,及教你获取线程池状态。 private static ExecutorService es …

公网IP地址如何申请SSL证书?有免费的IP ssl吗?

如果用户没有域名或只有公网IP地址或者不方便使用域名,IP地址ssl证书这一特殊的证书可以为IP地址实现HTTPS的安全保护,提高网站数据传输的安全性。 IP地址申请SSL证书的基本步骤 IP ssl证书下载---注册填写230916https://www.joyssl.com/certificate/sel…