kafka安装及配置

news/2025/2/21 18:45:30/

1. 下载

下载地址:Apache Kafka

我这里下载的是 3.2.1 版本。

2. 上传并解压

上传到 linux 下的 /home/software/ 目录下,然后解压 kafka_2.13-3.2.1.tgz 包到/usr/local/

cd /home/software
tar -zxvf kafka_2.13-3.2.1.tgz -C /usr/local  # -C 选项的作用是:指定需要解压到的目录。
# 重命名
cd /usr/local
mv kafka_2.13-3.2.1 kafka-3.2.1

 3. 修改kafka配置文件

  vim /usr/local/kafka-3.2.1/config/server.properties

修改内容:

  broker.id=0  # broker的id,每个broker的id必须不一样port=9092    # 服务端口host.name=192.168.11.221  # 主机地址advertised.host.name=192.168.11.221  # 备用主机地址log.dirs=/usr/local/kafka-3.2.1/kafka-logs  # kafka存储消息(log日志数据)的目录num.partitions=5 # 创建topic时默认的分区数量zookeeper.connect=192.168.11.221:2181,192.168.11.222:2181,192.168.11.223:2181  # zookeeper地址

配置文件中的一些配置项解释: 

  • zookeeper.connect :  ZooKeeper服务地址<ip:port>, 多个zk节点用逗号隔开。
  • listeners :用的比较少,表示客户端要连接的broker入口地址列表
  • broker.id :  kafka 节点的标示,每个节点必须不一样
  • log.dir 和 log.dirs  :kafka存储消息(log日志数据)的目录,log.dir配置单个目录,log.dirs可以配置多个目录
  • message.max.bytes:用来指定broker能够接受的单个消息最大值,默认1M左右
  • group.initial.rebalance.delay.ms :这个参数的主要效果就是让 coordinator(调度器) 推迟空消费组接收到成员加入请求后本应立即开启的 rebalance 。在实际使用时,假设你预估你的所有 consumer 组成员加入需要在10s内完成,那么你就可以设置该参数=10000,即表示10s之后重新分配消费者 consumer。

4. 创建kafka存储消息(log日志数据)的目录

由于配置文件里配置的 log.dirs=/usr/local/kafka-3.2.1/kafka-logs,所以要创建一个该目录:

mkdir /usr/local/kafka-3.2.1/kafka-logs

5. 启动kafka

 以配置文件的方式启动,后面的 & 表示后台启动。

/usr/local/kafka-3.2.1/bin/kafka-server-start.sh /usr/local/kafka-3.2.1/config/server.properties &

关闭命令:

/usr/local/kafka-3.2.1/bin/kafka-server-stop.sh

使用 jps 命令检查 kafka 是否启动成功,如下所示则是启动成功:

如果想要搭建集群的话,需要对于其他的虚拟机节点也按照上述方法执行安装,只是其中配置文件里的  broker.id=0 的值需要修改一下,每个节点必须保证不一样。

6. 安装kafka manager可视化管控台

(1)kafka manager 下载

下载地址:kafka-manage-2.0.0.2

把 kafka manager 的压缩包上传到 192.168.31.102 虚拟机上

 (2)解压zip文件

unzip kafka-manager-2.0.0.2.zip -d /usr/local/

(3)修改配置文件:

 vim /usr/local/kafka-manager-2.0.0.2/conf/application.conf

 修改内容:

kafka-manager.zkhosts="192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181"

(4)启动kafka manager 控制台

  /usr/local/kafka-manager-2.0.0.2/bin/kafka-manager &

如果提示权限不够,可以使用 chmod kafka-manager 修改权限 。

 (5)浏览器访问控制台:默认端口号是9000

  http://192.168.31.102:9000/

添加Cluster集群:

 添加 topic :

7. 操作:

(1)通过控制台创建了一个topic为"topic-test" 2个分区 1个副本

(2)消费发送与接收验证

在 192.168.31.101 节点上打开两个终端界面分别用于执行消息的发送和接收。

启动发送消息的脚本

cd /usr/local/kafka-3.2.1/bin
./kafka-console-producer.sh --broker-list 192.168.31.101:9092 --topic topic-test## --broker-list 192.168.31.101 指的是 kafka broker 的地址列表##  --topic topic-test 指的是把消息发送到 topic-test 主题

启动接收消息的脚本

cd /usr/local/kafka-3.2.1/bin
./kafka-console-consumer.sh --bootstrap-server 192.168.31.101:9092 --topic topic-test

截图如下: 

8. 使用 java 代码连接 kafka 节点失败

(1)报错

java端代码启动生产者或者消费者时,报错如下:

 java代码是在我的windows电脑中,而 kafka 是安装在CentOS虚拟机中,虚拟机的 hostname 是 master,报错提示是连接到 master 节点失败,也就是java代码连不上虚拟机中的kafka服务。

(2)解决方案:

修改 kafka 节点上的 配置文件:

  vim /usr/local/kafka-3.2.1/config/server.properties

修改内容为:在配置文件中加入下面一行代码(原来这行代码是被注释掉的):

listeners=PLAINTEXT://192.168.31.101:9092  # 其中的ip地址修改成你的 kafka 节点的ip


http://www.ppmy.cn/news/66955.html

相关文章

Mysql 存储过程+触发器+存储函数+游标

视图&#xff08;view&#xff09; 虚拟存在的表&#xff0c;不保存查询结构&#xff0c;只保存查询的sql逻辑 语法 存储过程 实现定义并存储在数据库的一段sql语句的集合&#xff0c;可减少网络交互&#xff0c;提高性能&#xff0c;代码复用,内容包括&#xff1a;变量&am…

Ubuntu常用终端操作

终端快捷键 打开 Ctrlaltt:打开终端&#xff08;默认路径为家目录&#xff09; Ctrlshiftn&#xff1a;打开终端&#xff08;与当前终端处于同一路径下&#xff09; Ctrlshiftt:打开终端&#xff08;在大终端下面创建小终端&#xff09; alt数字 关闭 exitCtrld 窗口切换 …

CMAKE介绍和使用

CMake是一个跨平台的安装&#xff08;编译&#xff09;工具&#xff0c;可以用简单的语句来描述所有平台的安装(编译过程)。 在 linux 平台下使用 CMake 生成 Makefile 并编译的流程如下&#xff1a; 写 CMake 配置文件 CMakeLists.txt 。执行命令 cmake PATH 或者 ccmake PA…

Linux知识点 -- 常见指令及权限理解

Linux知识点 – 常见指令及权限理解 文章目录 Linux知识点 -- 常见指令及权限理解一、Linux下基本指令1.ls指令 - 列文件或目录信息2.pwd命令 - 显示用户当前所在目录3.cd指令 - 改变工作目录4.touch指令 - 更改文件时间或新建文件5.mkdir指令 - 创建目录 / tree - 以树状形式显…

非法捕捞识别预警系统 yolov7

非法捕捞识别预警系统通过yolov7网络模型AI视频分析技术&#xff0c;非法捕捞识别预警系统模型算法能够对河道湖泊画面场景中出现的非法捕捞行为进行7*24小时不间断智能检测识别实时告警通知相关人员及时处理。Yolo算法采用一个单独的CNN模型实现end-to-end的目标检测&#xff…

Android 中你碰不到但是很重要的类之ActivityThread

作者&#xff1a;Drummor 通过本文能了解一下内容 1、和系统进程打交道的桥头堡 应用进程起来之后ART(Android Runtime)第一站就是ActivityThread&#xff0c;代码层面上就是ActivityThread的main()方法&#xff0c;是不是很熟悉&#xff0c;爷青回啊&#xff0c;这不就是java…

【Linux】基本权限

&#x1f601;作者&#xff1a;日出等日落 &#x1f514;专栏&#xff1a;Linux 任何值得到达的地方&#xff0c;都没有捷径。 目录 Linux权限: 权限的概念&#xff1a; Linux上面的用户分类&#xff1a; Linux权限管理 文件访问者的分类&#xff08;人&#xff09; 文件…

(一)ArcGIS空间数据的转换与处理——投影变换

ArcGIS空间数据的转换与处理——投影变换 原始数据往往由于在数据结构、数据组织、数据表达等方面与用户需求不一致而要进行转换与处理。本节主要介绍 ArGIS 中数据的投影变换内容。 目录 ArcGIS空间数据的转换与处理——投影变换 1.概述2.定义投影3.投影变换3.1栅格数据的投…