目录
kafka%E7%AE%80%E4%BB%8B-toc" style="margin-left:40px;">一、kafka简介
1. 主要特点
2.组件介绍
3.消息中间件的对比
二、环境准备
1.Java环境
2.Zookeeper环境
3.硬件环境集群
三、Zookeeper的集群部署
1.下载zookeeper
2.部署zookeeper集群
(1)node1节点服务器
(2)node2节点服务器和node3 节点服务器重复上述node1节点安装zookeeper步骤
(3)启动node1、node2、node3节点的zookeeper
kafka%E9%9B%86%E7%BE%A4-toc" style="margin-left:40px;">编辑 四、搭建kafka集群
kafka%E5%AE%89%E8%A3%85%E5%8C%85-toc" style="margin-left:80px;">1.下载kafka安装包
kafka%E9%9B%86%E7%BE%A4-toc" style="margin-left:80px;">2.部署kafka集群
(1)node1节点部署
kafka%E6%AD%A5%E9%AA%A4-toc" style="margin-left:120px;">(2)node2节点服务器、node3 节点服务器重复上述node1节点安装kafka步骤
kafka%E9%9B%86%E7%BE%A4-toc" style="margin-left:120px;">(3)启动kafka集群
(4)测试集群
kafka%E7%AE%80%E4%BB%8B">一、kafka简介
Apache Kafka 是一个开源的分布式事件流平台,最初由LinkedIn开发,并于2011年贡献给了Apache软件基金会。Kafka被设计为一个高吞吐量、可扩展且容错的消息系统,它能够处理大量的实时数据流。Kafka广泛应用于构建实时数据管道和流应用。
1. 主要特点
(1)发布/订阅消息模型:Kafka支持传统的发布/订阅模式,允许多个消费者订阅并处理相同的消息流。
(2)持久化存储:消息在Kafka中被持久化到磁盘,并且可以根据配置保留一段时间(例如7天)。这使得Kafka可以作为可靠的存储系统来使用。
(3)高吞吐量:Kafka设计用于支持每秒百万级别的消息吞吐量,适用于大规模的数据处理场景。
(4)水平扩展性:可以通过增加更多的broker节点来轻松扩展Kafka集群,以处理更大的流量。
(5)多租户能力:支持多个生产者和消费者共享同一集群,同时保持隔离性和安全性。
(6)零拷贝原理:使用操作系统级别的零拷贝技术来优化网络传输效率,减少CPU使用率。
(7)分区机制:每个topic可以分成多个partition,这些partition可以分布在不同的broker上,从而实现负载均衡和故障转移。
(8)消费组:一组消费者可以组成一个消费者组,共同消费一个topic下的不同partition,确保每个partition只被组内的一个消费者消费。
2.组件介绍
Broker:Kafka集群中的单个服务器,负责存储消息和处理客户端请求。
Topic:特定类型的消息流,是消息的分类或主题。
Partition:每个topic可以分为多个partition,提高并行处理能力和吞吐量。
Producer:向Kafka发送消息的应用程序。
Consumer:从Kafka读取消息的应用程序。
Consumer Group:一组消费者的集合,它们共同消费一个topic的所有消息。
Offset:每个partition中的消息都有一个唯一的序列号,称为offset。
Zookeeper:虽然不是Kafka的核心组件,但早期版本的Kafka依赖Zookeeper来管理集群元数据。新版本中已经内置了替代方案,减少了对Zookeeper的依赖。
3.消息中间件的对比
二、环境准备
1.Java环境
需要Java 8或更高版本(JDK)。Kafka是用Scala编写的,并且运行在JVM之上。
安装步骤参考:Linux环境下离线安装jdk1.8(内置最新的jdk安装包x64)_linux 离线安装jdk1.8-CSDN博客
2.Zookeeper环境
Kafka使用Zookeeper来管理集群元数据。
3.硬件环境集群
采用三台服务器部署kafka和zookeeper集群。
node1 | node2 | node3 |
Zookeeper | Zookeeper | Zookeeper |
Kafka | Kafka | Kafka |
三、Zookeeper的集群部署
1.下载zookeeper
地址:Apache ZooKeeper
2.部署zookeeper集群
(1)node1节点服务器
下载zookeeper包之后上传服务器
# 1.解压 zookeeper
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz# 2.移动到/usr/local/目录下方便管理
mv apache-zookeeper-3.8.4-bin /usr/local/zookeeper-3.8.4# 3.切换到/usr/local/zookeeper-3.8.4/conf 配置目录下
cp zoo_sample.cfg zoo.cfg# 4.编辑配置文件
vim zoo.cfg
zoo.cfg文件新增如下内容: 数据目录和日志目录需要提前创建
说明:server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
节点标记
myid配置:在/data/zookeeper/data设置myid, 这个myid的数字跟配置文件里面的server id对应
echo 1 > /data/zookeeper/data/myid
配置环境变量
# 打开环境变量的文件
vim /etc/profile# 文件末尾添加如下内容
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.8.4
export PATH=$ZOOKEEPER_HOME/bin:$PATH# 保存并退出 刷新环境变量
source /etc/profile
到此node1节点服务器zookeeper部署完成
(2)node2节点服务器和node3 节点服务器重复上述node1节点安装zookeeper步骤
注意:记得修改节点标记 myid 改为 2 和 3,若没改启动时会报错。
(3)启动node1、node2、node3节点的zookeeper
# 启动三台机器zookeeper
zkServer.sh start# 查看集群状态
zkServer.sh status# 停止命令
zkServer.sh stop
从下面的截图就能看出来 node1节点为主节点,其他两个是从节点。
kafka%E9%9B%86%E7%BE%A4"> 四、搭建kafka集群
kafka%E5%AE%89%E8%A3%85%E5%8C%85">1.下载kafka安装包
地址:Apache Kafka
kafka%E9%9B%86%E7%BE%A4">2.部署kafka集群
(1)node1节点部署
# 1.将下载好的kafka包上传服务器
# 2.解压
tar -zxvf kafka_2.12-3.8.0.tgz# 3.将解压后的目录移动到 /usr/local/下
mv kafka_2.12-3.8.0 /usr/local/kafka_2.12-3.8.0# 4.切换到目录下 编辑kafka配置文件
cd /usr/local/kafka_2.12-3.8.0/config
vim server.properties
主要修改如下所示几个配置信息
#--------------------------------配置文件-------------------------------------------
# broker的全局唯一编号,不能重复,多个节点需要修改 0、1、2
broker.id=0
# 监听
listeners=PLAINTEXT://:9092 #开启此项
# 日志目录
log.dirs=/data/kafka/kafka_logs #修改日志目录(需提前创建)
# 配置zookeeper的连接(如果不是本机,需要该为ip或主机名)多个地址 用,隔开
zookeeper.connect=cong11:2181,cong12:2181,cong13:2181
#--------------------------------配置文件-------------------------------------------
配置环境变量
# 打开环境变量的文件
vim /etc/profile# 文件末尾添加如下内容
export KAFKA_HOME=/usr/local/kafka_2.12-3.8.0
export PATH=$KAFKA_HOME/bin:$PATH# 保存并退出 刷新环境变量
source /etc/profile
kafka%E6%AD%A5%E9%AA%A4">(2)node2节点服务器、node3 节点服务器重复上述node1节点安装kafka步骤
或者 你直接将节点1的kafka目录分别复制到节点2和节点3上,修改内容如下
node2节点:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://ip:9092
node3节点
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://ip:9092
在添加环境变量 刷新环境变量。
kafka%E9%9B%86%E7%BE%A4">(3)启动kafka集群
# 后台启动服务(3台都启动)
kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.8.0/config/server.properties
可以通过 jps 或者 ps -ef|grep kafka 查看进程
(4)测试集群
1)创建一个测试主题(node1)
kafka-topics.sh --create --topic test --partitions 3 --replication-factor 3 --bootstrap-server ip1:9092
- --replication-factor:指定副本数量
- --partitions:指定分区数量
- --topic:主题名称
注意:在node1节点上创建之后,会同步到node2、node3节点主题信息。说明你集群搭建成功。
2)查看刚刚创建的主题信息
kafka-topics.sh --describe --topic test --bootstrap-server IP1:9092
说明
Partition: 0
:分区编号为0。Leader: 0
:当前分区的leader broker是broker 0。Replicas: 0,2,1
:该分区的所有副本分布在broker 0、broker 2和broker 1上。Isr: 0,2,1
:与leader保持同步的副本列表(In-Sync Replicas),即broker 0、broker 2和broker 1。Elr: N/A
:选举中的leader(Elected Leader Replica),这里没有显示。LastKnownElr: N/A
:上次已知的选举中的leader,这里也没有显示。
3)查看所有的topic命令和查看指定的topic命令
# 列出指定的topic
kafka-topics.sh --bootstrap-server ip:9092 --list --topic test# 列出所有的topic
kafka-topics.sh --bootstrap-server ip:9092 --list
4)在node1上创建生产者(producer)
kafka-console-producer.sh --broker-list ip:9092 --topic test
随便写一些测试消息
5) 在node2上测试消费消息
kafka-console-consumer.sh --bootstrap-server IP1:9092,IP2:9092,IP3:9092 --topic test --from-beginning
-
指定Kafka集群的多个broker地址。这里列出了三个broker:--bootstrap-server IP1:9092,IP2:9092,IP3:9092
:JClouds:9092
,Book:9092
, 和Client01:9092
。这些broker地址用于初始化连接。Kafka客户端会使用这些地址来发现整个集群的其他broker。 -
从主题的最早可用偏移量(offset)开始消费消息。如果没有这个选项,消费者将从最新的偏移量开始消费。--from-beginning
:
6)删除topic
kafka-topics.sh --delete --bootstrap-server IP1:9092 --topic test
参考:Kafka 集群部署_kafka集群安装部署-CSDN博客