kafka安装及收发消息

ops/2024/9/24 7:25:40/

kafka需要与zookeeper配合使用,但是从2.8版本kafka引入kraft,也就是说在2.8后,zookeeper和kraft都可以管理kafka集群,这里我们依然采用zookeeper来配合kafka
1、首先我们下载zookeeper
下载地址为
https://zookeeper.apache.org/releases.html#download
(选择版本为3.7.2)
然后上传到服务器,解压后如下

drwxr-xr-x. 2 limuzi limuzi  4096 Oct  6  2023 bin
drwxr-xr-x. 2 limuzi limuzi    92 May 12 01:33 conf
drwxr-xr-x. 5 limuzi limuzi  4096 Oct  6  2023 docs
drwxrwxr-x. 2 limuzi limuzi  4096 May  9 06:04 lib
-rw-r--r--. 1 limuzi limuzi 11358 Oct  6  2023 LICENSE.txt
drwxrwxr-x. 2 limuzi limuzi    74 May  9 06:05 logs
-rw-r--r--. 1 limuzi limuzi  2084 Oct  6  2023 NOTICE.txt
-rw-r--r--. 1 limuzi limuzi  2214 Oct  6  2023 README.md
-rw-r--r--. 1 limuzi limuzi  3570 Oct  6  2023 README_packaging.md

进入到conf目录下,执行如下命令,copy一个zk的配置文件

cp zoo_sample.cfg zoo.cfg

进入到zoo.cfg的配置文件中,修改相关参数

dataDir=/home/limuzi/software/zookeeper/zookeeper_data

如上所示修改zk的数据存储位置

clientPort=2181

zk的启动端口为2181,这里我们不做修改
2、下载3.5版本的kafka
下载地址为:
https://kafka.apache.org/downloads
如图所示:
在这里插入图片描述
2.13是scala语言的版本,整个大版本是3.5的
(因为kafka的producer 和 consumer是用java语言写的,broker是用scala语言写的)
把压缩包上传到服务器解压,进入解压后的config文件,修改以下参数,

#设置broker的消息存储时间为180s,180s后消息删除
log.retention.ms=180000
#设置zk的连接地址,我这里是zk的单节点,不是集群,且启动端口是默认的2181
zookeeper.connect=localhost:2181
#listeners 是 Kafka 集群中用于接受客户端连接的网络监听器配置参数。通过设置 listeners 参数,你可以指定 Kafka 服务器监听客户端连接的网络地址和端口
#在 Kafka 的 server.properties 配置文件中,listeners 参数的默认值为 PLAINTEXT://hostname:9092,
#其中 hostname 是 Kafka 服务器的主机名或 IP 地址。这意味着 Kafka 服务器将在默认端口 9092 上监听来自客户端的连接请求
#这里要设置,要不然在java客户端连接不上kafka
listeners=PLAINTEXT://192.168.47.145:9092
#当前broker的id,这里我们是测试单节点,所以不用修改,
#如果是kafka集群(假如有三台服务器),那么分别需要修改为0,1,2
broker.id=0
#kafka文件存储的位置,整个文件里面存储有log文件和index文件,关于这两个文件的作用后续会说明
log.dirs=/home/limuzi/software/kafka_2.13-3.5.0/kafka_data

3、启动zk和kafka
首先启动zk,进入到zk安装的bin目录下,执行如下命令

./zkServer.sh start

查看zk是否启动成功

./zkServer.sh status

如果 ZooKeeper 服务已经成功启动,应该会显示类似于 Mode: standalone 的信息
启动kafka
进入到kafka安装目录的bin目录下

./kafka-server-start.sh ../config/server.properties &

加上 & 表示后台启动
4、在控制台测试收发消息
首先用命令创建topic,进入到安装目录bin目录下执行如下脚本,创建了一个topic=test,分区为1,副本数为1

./bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

开启生产者:进入到bin目录下,执行如下命令

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

然后在控制台可以输入消息
开启消费者:新开一个窗口,进入到bin目录下,执行如下命令

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

在控制台可以看到刚刚生产者发送的消息


http://www.ppmy.cn/ops/40270.html

相关文章

Python numpy np.clip() 将数组中的元素限制在指定的最小值和最大值之间

🍉 CSDN 叶庭云:https://yetingyun.blog.csdn.net/ numpy.clip:https://numpy.org/doc/stable/reference/generated/numpy.clip.html numpy.clip(a, a_min, a_max, outNone, **kwargs)下面这段示例代码使用了 Python 的 NumPy 库来实现一个简…

Linux 之 tail 命令

一、基本语法 tail [option] [file] 其中 option 是可选参数,用于定制命令的行为,file 则是要处理的目标文件名。 二、常用参数 几个常用的 option 选项: -n:显示文件的最后 n 行,默认为 10 行。-f:实…

SQLZOO:Using Null

数据表:teacher-dept teacher iddeptnamephonemobile1011Shrivell275307986 555 12341021Throd275407122 555 19201031Splint2293104Spiregrain32871052Cutflower321207996 555 6574106Deadyawn3345... dept idname1Computing2Design3Engineering... Q1 List the t…

经典的设计模式和Python示例(一)

目录 一、工厂模式(Factory Pattern) 二、单例模式(Singleton Pattern) 三、观察者模式(Observer Pattern) 一、工厂模式(Factory Pattern) 工厂模式(Factory Pattern…

uniapp引用第三方组件样式无法穿透

在通过uniapp编写小程序过程中发现,引用第三方组件库的样式无法穿透修改。微信小程序文档也给出对应的解决思路自定义组件样式穿透 组件样式隔离 默认情况下,自定义组件的样式只受到自定义组件 wxss 的影响。除非以下两种情况: 指定特殊的…

云计算的优势与未来发展

随着数字化转型的蓬勃发展,云计算作为信息技术应用的基础设施,逐渐成为企业的首选。云计算以其诸多优势和未来发展趋势,为企业带来了更高效、灵活和创新的IT解决方案,助力企业实现数字化转型和业务发展。 云计算的优势 首先&…

81.网络游戏逆向分析与漏洞攻防-移动系统分析-飞天遁地的实现与面向计算

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 如果看不懂、不知道现在做的什么,那就跟着做完看效果,代码看不懂是正常的,只要会抄就行,抄着抄着就能懂了 内容…

Oracle 修改数据库的字符集

Oracle 修改数据库的字符集 alter system enable restricted session; alter database "cata" character set ZHS16CGB231280; alter database "cata" national character set ZHS16CGB231280; alter system enable restricted session; alter database…