kafka小白基础知识

embedded/2025/3/6 11:56:28/

一、Kafka 入门

(一)Kafka 简介

        Kafka 是一个开源的分布式流处理平台,最初由 LinkedIn 开发,后来贡献给了 Apache 软件基金会。它被设计用于处理实时数据流,具有高吞吐量、可扩展性、持久性和容错性等特点。Kafka 主要用于构建实时数据管道和流式应用程序,如日志收集、消息系统、事件驱动架构等。

(二)核心概念

  1. 主题(Topic):Kafka 中的消息以主题为单位进行分类,类似于数据库中的表。生产者将消息发送到特定的主题,消费者从主题中读取消息。
  2. 分区(Partition):每个主题可以被划分为多个分区,分区是 Kafka 实现高吞吐量和可扩展性的关键。消息在分区内是有序的,但在整个主题中不一定有序。
  3. 生产者(Producer):负责将消息发送到 Kafka 主题的应用程序。生产者可以根据需要选择将消息发送到特定的分区。
  4. 消费者(Consumer):从 Kafka 主题中读取消息的应用程序。消费者可以以组的形式存在,同一个组内的消费者共同消费主题中的消息,每个分区只能被组内的一个消费者消费。
  5. 代理(Broker):Kafka 集群中的每个服务器节点称为代理。代理负责存储和管理分区的数据,并处理生产者和消费者的请求。

(三)安装与启动

1. 下载 Kafka

从 Apache Kafka 官方网站(Apache Kafka)下载最新版本的 Kafka。

2. 解压文件

bash

tar -zxvf kafka_2.13 - 3.4.0.tgz
cd kafka_2.13 - 3.4.0
3. 单机安装与启动
启动 ZooKeeper

Kafka 使用 ZooKeeper 来管理集群的元数据,首先启动 ZooKeeper:

bash

bin/zookeeper-server-start.sh config/zookeeper.properties
启动 Kafka 代理

bash

bin/kafka-server-start.sh config/server.properties
4. 集群安装与启动
环境准备

假设我们要搭建一个包含 3 个节点的 Kafka 集群,节点的 IP 地址分别为 192.168.111.2192.168.111.3192.168.111.4。在每个节点上都需要完成 Kafka 的下载和解压操作。

修改配置文件

在每个节点上修改 config/server.properties 文件:

  • 节点 1(192.168.111.2)

properties

broker.id=0
listeners=PLAINTEXT://192.168.111.2:9092
advertised.listeners=PLAINTEXT://192.168.111.2:9092
zookeeper.connect=192.168.111.2:2181,192.168.111.3:2181,192.168.111.4:2181
  • 节点 2(192.168.111.3)

properties

broker.id=1
listeners=PLAINTEXT://192.168.111.3:9092
advertised.listeners=PLAINTEXT://192.168.111.3:9092
zookeeper.connect=192.168.111.2:2181,192.168.111.3:2181,192.168.111.4:2181
  • 节点 3(192.168.111.4)

properties

broker.id=2
listeners=PLAINTEXT://192.168.111.4:9092
advertised.listeners=PLAINTEXT://192.168.111.4:9092
zookeeper.connect=192.168.111.2:2181,192.168.111.3:2181,192.168.111.4:2181

配置说明:

  • broker.id:每个 Kafka 代理的唯一标识符,不能重复。
  • listeners:代理监听的地址和端口。
  • advertised.listeners:对外公布的地址和端口,用于生产者和消费者连接。
  • zookeeper.connect:ZooKeeper 集群的连接地址。
启动 ZooKeeper 集群

在每个节点上启动 ZooKeeper:

bash

bin/zookeeper-server-start.sh config/zookeeper.properties
启动 Kafka 集群

在每个节点上启动 Kafka 代理:

bash

bin/kafka-server-start.sh config/server.properties

(四)创建主题

使用 Kafka 提供的命令行工具创建一个主题:

bash

bin/kafka-topics.sh --create --topic test_topic --bootstrap-server 192.168.111.2:9092 --partitions 3 --replication-factor 1
  • --topic:指定主题名称
  • --bootstrap-server:指定 Kafka 代理的地址
  • --partitions:指定主题的分区数
  • --replication-factor:指定分区的副本数

(五)发送和接收消息

1. 启动生产者

bash

bin/kafka-console-producer.sh --topic test_topic --bootstrap-server 192.168.111.2:9092

在控制台输入消息,按回车键发送。

2. 启动消费者

bash

bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server 192.168.111.2:9092

--from-beginning 表示从主题的开头开始消费消息。

二、Kafka 的安全性

(一)身份验证

Kafka 支持多种身份验证机制,如 SSL/TLS、SASL(Simple Authentication and Security Layer)等。

1. SSL/TLS 身份验证
  • 配置步骤
    • 生成证书和密钥,包括 CA 证书、服务器证书和客户端证书。
    • 在 Kafka 代理的 server.properties 中配置 SSL 相关参数,如 listenersssl.keystore.locationssl.keystore.password 等。
    • 在生产者和消费者的配置中也需要配置相应的 SSL 参数,如 security.protocol=SSLssl.truststore.locationssl.truststore.password 等。
2. SASL 身份验证

SASL 提供了多种认证机制,如 PLAIN、GSSAPI(用于 Kerberos 认证)等。以 PLAIN 机制为例:

  • 配置步骤
    • 在 Kafka 代理的 server.properties 中配置 SASL 相关参数,如 listeners=SASL_PLAINTEXT://:9092sasl.enabled.mechanisms=PLAIN 等。
    • 创建用户和密码文件,并在 server.properties 中指定文件路径。
    • 生产者和消费者配置相应的 SASL 参数,如 security.protocol=SASL_PLAINTEXTsasl.mechanism=PLAIN 等。

(二)授权

Kafka 可以通过 ACL(Access Control List)来控制用户对主题、分区等资源的访问权限。

配置步骤
  • 启用 ACL:在 Kafka 代理的 server.properties 中设置 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  • 创建 ACL 规则:使用 Kafka 提供的命令行工具 kafka-acls.sh 来创建和管理 ACL 规则。例如,允许用户 user1 对主题 test_topic 进行读写操作:

bash

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.111.2:2181 --add --allow-principal User:user1 --operation Read --operation Write --topic test_topic

(三)数据加密

        Kafka 可以使用 SSL/TLS 对数据进行加密传输,确保数据在网络传输过程中的安全性。配置 SSL/TLS 加密的步骤与 SSL/TLS 身份验证类似,主要是在代理、生产者和消费者的配置中设置相关的 SSL 参数。

三、Kafka 的特性

(一)高吞吐量

        Kafka 的设计目标之一是实现高吞吐量的数据处理。它采用了批量处理、顺序读写磁盘、零拷贝等技术,能够在短时间内处理大量的消息。例如,在一个高并发的日志收集场景中,Kafka 可以轻松应对每秒数万条甚至更多的日志消息。

(二)可扩展性

        Kafka 可以通过增加代理节点来扩展集群的规模,以应对不断增长的数据量和并发请求。新的代理节点可以无缝加入集群,并且 Kafka 会自动进行分区的重新分配,确保数据的均匀分布。

(三)持久性和容错性

        Kafka 将消息持久化存储在磁盘上,并且支持消息的多副本复制。每个分区可以有多个副本,分布在不同的代理节点上。当某个代理节点出现故障时,其他副本可以继续提供服务,保证数据的可用性和持久性。

(四)分布式特性

        Kafka 是一个分布式系统,各个代理节点之间通过 ZooKeeper 进行协调和管理。生产者和消费者可以分布式部署,并行地进行消息的生产和消费,提高系统的整体性能和可靠性。

四、保障 Kafka 数据一致性

(一)副本机制

        Kafka 通过副本机制来保障数据的一致性和可用性。每个分区可以有多个副本,其中一个副本作为领导者(Leader),负责处理所有的读写请求;其他副本作为追随者(Follower),从领导者副本同步数据。

工作原理
  • 生产者将消息发送到领导者副本,领导者副本将消息写入本地日志,并向追随者副本进行同步。
  • 追随者副本从领导者副本拉取消息,并写入本地日志。当追随者副本确认收到消息后,会向领导者副本发送确认信息。
  • 领导者副本只有在收到大多数副本(超过半数)的确认信息后,才会向生产者返回消息写入成功的响应。

(二)ISR(In-Sync Replicas)机制

        ISR 是指与领导者副本保持同步的追随者副本集合。Kafka 通过 ISR 机制来确保数据的一致性和可用性。

工作原理
  • 领导者副本会定期检查追随者副本的同步状态,如果某个追随者副本的同步延迟超过一定阈值,领导者副本会将其从 ISR 中移除。
  • 只有当消息被写入 ISR 中的所有副本后,才被认为是已提交的消息。消费者只能消费已提交的消息,从而保证了数据的一致性。

(三)acks 参数

生产者在发送消息时,可以通过设置 acks 参数来控制消息的确认机制,从而影响数据的一致性。

参数取值及含义
  • acks=0:生产者发送消息后,不需要等待任何确认信息,直接认为消息发送成功。这种方式吞吐量最高,但可能会丢失消息,数据一致性最差。
  • acks=1:生产者发送消息后,只需要等待领导者副本确认收到消息,就认为消息发送成功。这种方式在一定程度上保证了数据的一致性,但如果领导者副本在消息同步给追随者副本之前出现故障,可能会丢失消息。
  • acks=all 或 acks=-1:生产者发送消息后,需要等待 ISR 中的所有副本都确认收到消息,才认为消息发送成功。这种方式数据一致性最高,但吞吐量相对较低。

http://www.ppmy.cn/embedded/170471.html

相关文章

学网络安全报班可靠吗?

在当今社会,网络安全已经成为我们工作和生活中不可忽视的重要部分,而且市场上各大企业对网络安全人才的需求量非常之大,因此网络安全培训班应运而生,那么学网络安全报培训班靠谱吗?这是很多小伙伴都关心的问题,我们来…

双链路提升网络传输的可靠性扩展可用带宽

为了提升网络传输的可靠性或增加网络可用带宽, 通常使用双链路冗余备份或者双链路聚合的方式。 本文介绍几种双链路网络通信的案例。 5GWiFi冗余传输 双Socket绑定不同网络接口:通过Android的ConnectivityManager绑定5G蜂窝网络和WiFi的Socket连接&…

JavaEE基础之- ajax

1. 初始AJAX(熟悉) 1.1. AJAX介绍 AJAX 异步 JavaScript 和 XML。 AJAX 是一种用于创建快速动态网页的技术。 js jQuery 通过在后台与服务器进行少量数据交换,AJAX 可以使网页实现异步更新。这意味着可以在不重新加载整个网页的情况下,对网页的某部分…

网络安全的八大机制

文章目录 第一章 网络安全概述与环境配置第二章 网络安全协议基础第四章 网络扫描与网络监听第五章 网络入侵第六章 网络后门与网络隐身第八章 操作系统安全基础第九章 密码学与信息加密第十章 防火墙与入侵检测第十一章 IP安全和WEB安全 第一章 网络安全 概述与环境配置 1.狭…

MySQL官网驱动下载(jar包驱动和ODBC驱动)【详细教程】

1.打开MySQL的官网,选择下载(Download) MySQL[这里是图片001]https://www.mysql.com/cn/ 2.往下划点击MySQL Community(GPL)Downloads 3.要下载MySQL的jar包的选择Connector/J 4.进入后,根据自己的需求选择相应的版本 5.下载完成后,进行解压…

ASP .NET Core 学习(.NET9)Serilog日志整合

Serilog 是一个功能强大的 .NET 日志库,以其简洁的配置和灵活的输出方式而受到开发者喜爱。支持多种日志输出目标(如控制台、文件、数据库等),并且可以通过结构化日志的方式记录丰富的上下文信息,便于后续的日志分析和…

解决redis lettuce连接池经常出现连接拒绝(Connection refused)问题

一.软件环境 windows10、11系统、springboot2.x、redis 6 7 linux(centos)系统没有出现这问题,如果你是linux系统碰到的,本文也有一定大参考价值。 根本思路就是:tcp/ip连接的保活(keepalive)。 二.问题描述 在spr…

【FAQ】HarmonyOS SDK 闭源开放能力 —Map Kit(5)

1.问题描述: 提供两套标准方案,可根据体验需求选择: 1.地图Picker(地点详情) 用户体验:①展示地图 ②标记地点 ③用户选择已安装地图应用 接入文档:https://developer.huawei.com/consumer/cn/doc/harmonyos-guide…