目录
一、 简介
1.1、概念
1.2、核心成员
1.3、特点
二、安装
2.1、zookeeper
2.2、上传解压重命名
2.3、修改配置文件
2.4、启动
2.5、一个启停脚本
三、使用
3.1、主题命令行操作
3.1.1查看操作主题命令参数
3.1.2创建 first topic
3.1.3查看当前服务器中的所有 topic
3.1.4查看主题的详情
3.1.5修改分区数
3.2、生产者命令行操作
3.2.1查看操作生产者命令参数
3.2.2发送消息
3.3、消费者命令行操作
3.3.1查看操作消费者命令参数
3.3.2消费主题中的数据
3.3.3把主题中所有的数据都读取出来(包括历史数据)
一、 简介
1.1、概念
Kafka 是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能的数据管道、流分析、数据集成和关键任务应用。作为一个事件流平台,Kafka 不仅处理消息,还处理事件流,即连续的数据流。它支持事件驱动的应用程序,通过事件流来触发业务逻辑。Kafka 提供了高吞吐量的数据传输能力,能够处理大量的数据流,同时通过多副本和持久化机制确保数据的可靠性和持久性。
1.2、核心成员
1)Producer:消息生产者,就是向 Kafka broker 发消息的客户端。
2)Consumer:消息消费者,向 Kafka broker 取消息的客户端。
3)Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消 费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4)Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。
6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
7)Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower。
8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
9)Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。
1.3、特点
1)高吞吐量:能够每秒处理数十万条消息,通过批量发送和异步处理机制进一步提高了系统的吞吐量。其次,Kafka 设计为低延迟的消息系统,适用于实时数据处理,从生产者发送消息到消费者接收到消息的时间非常短。
2)可扩展性:可以通过增加更多的 Broker 节点来实现水平扩展,并且可以在运行时动态调整集群的大小,无需停机。持久性和可靠性也是 Kafka 的核心优势之一,通过多副本机制和持久化存储,确保数据的持久性和可靠性,即使在 Broker 失效后也能恢复。
3) 支持数据分区和负载均衡:每个 Topic 可以划分为多个 Partition,每个 Partition 是一个有序的消息队列,通过多个 Partition 实现生产者和消费者的负载均衡,提高系统的整体性能。此外,Kafka 保证了消息的顺序性,在一个 Partition 内,消息是按顺序存储和消费的,通过合理的分区策略,可以实现全局顺序性。
4)提供灵活的消费模型:消费者可以加入消费组,共同消费一个 Topic 的消息,实现负载均衡和故障恢复。消费者可以从任意偏移量开始消费消息,支持消息的重播和重新处理。强大的生态系统也是 Kafka 的一大优势,内置的 Kafka Streams 框架用于构建实时数据处理应用程序,Kafka Connect 提供丰富的连接器,用于将 Kafka 与其他数据系统集成,Kafka Manager 则用于监控和管理 Kafka 集群。
5)多种身份验证机制:如 SASL/PLAIN、SASL/SCRAM、SSL 等,并提供细粒度的访问控制,限制不同用户的操作权限,同时支持数据传输的加密,确保数据的安全性。容错性方面,Kafka 具有自动故障恢复机制,当某个 Broker 失效时,会自动选举新的 Leader,确保系统的高可用性,并支持跨数据中心的部署,实现地理上的冗余和容错。
二、安装
2.1、zookeeper
kafka通过zk进行集群管理,并且Kafka集群的broker信息、topic和partition的状态信息都会存储在zk节点上,当有变化时,zk的信息也会更新,zk还负责kafka中分区leader的选举。
所以安装Kafka之前要检查一下zookeeper是否安装,若没有安装可点击下面这篇文章进行安装:
Zookeeper的安装与使用
2.2、上传解压重命名
上传:将压缩包上传到 /opt/moudles 下
解压
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/installs/
重命名
cd /opt/installs
2.3、修改配置文件
先进入 /opt/installs/kafka3/config 下
cd /opt/installs/kafka3/config
vi server.properties
共有三处需要修改,以下是修改之后的
broker.id=0
log.dirs=/opt/installs/kafka3/datas
zookeeper.connect=bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka #这个需要配置了映射,若是没有配置则需要配置一下 /etc/hosts
环境变量
vi /etc/profile
将下面的复制进去:
export KAFKA_HOME=/opt/installs/kafka3
export PATH=$PATH:$KAFKA_HOME/bin
在外面刷新一下:source /etc/profile
将这些配置分发到另外两台机器上:
scp /opt/installs/kafka3 root@"你另外两台机器的IP地址":/opt/installs
profile文件也要分发
2.4、启动
使用启动脚本 zk.sh 启动 zk 启动Kafka之前必须启动zookeeper
依次在三台机器上启动kafka:
在kafka3下:
bin/kafka-server-start.sh -daemon config/server.properties
关闭kafka命令: bin/kafka-server-stop.sh 三台机器上依次运行
2.5、一个启停脚本
在/usr/local/sbin 目录下创建文件 kf.sh 脚本文件
vim kf.sh
#! /bin/bash
case $1 in
"start"){
for i in bigdata01 bigdata02 bigdata03
do
echo " --------启动 $i Kafka-------"
ssh $i "source /etc/profile; /opt/installs/kafka3/bin/kafka-server-start.sh -daemon /opt/installs/kafka3/config/server.properties"
done
};;
"stop"){
for i in bigdata01 bigdata02 bigdata03
do
echo " --------停止 $i Kafka-------"
ssh $i "source /etc/profile; /opt/installs/kafka3/bin/kafka-server-stop.sh"
done
};;
esac
添加权限 :chmod u+x kf.sh
使用:开启 kf.sh start 停止 kf.sh stop 在第一个节点上运行即可
三、使用
3.1、主题命令行操作
3.1.1查看操作主题命令参数
bin/kafka-topics.sh
3.1.2创建 first topic
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic second
选项说明:
--topic 定义 topic 名
--replication-factor 定义副本数
--partitions 定义分区数
3.1.3查看当前服务器中的所有 topic
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --list
3.1.4查看主题的详情
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic second
3.1.5修改分区数
注意:分区数只能增加,不能减少
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 3
3.2、生产者命令行操作
3.2.1查看操作生产者命令参数
bin/kafka-console-producer.sh
3.2.2发送消息
bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic first
3.3、消费者命令行操作
3.3.1查看操作消费者命令参数
bin/kafka-console-consumer.sh
3.3.2消费主题中的数据
bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic first
生产者发送消息,消费者消费,这边发那边立马就能收到
3.3.3把主题中所有的数据都读取出来(包括历史数据)
bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --from-beginning --topic first