Windows10、CentOS Stream9 环境下安装kafka_2.12-3.6.2记录

server/2024/11/14 15:27:32/

目录

  • Windows下操作
    • 1. 安装kafka [kafka_2.12-3.6.2](https://downloads.apache.org/kafka/3.6.2/kafka_2.12-3.6.2.tgz)
    • 2. 启动Zookeeper
      • 2.1 进入Kafka的config目录,修改zookeeper.properties配置文件
    • 3. 启动kafka
      • 3.1 进入Kafka的config目录,修改server.properties配置文件
    • 4.消费主题
      • 4.1 创建主题
      • 4.2 查询主题
        • 4.2.1 查询指定主题信息
      • 4.3 修改主题
      • 4.4 删除主题
    • 5. 生产数据
      • 5.1 工具操作
    • 6. 消费数据
    • 7. 源码关联(可忽略)
    • 8. Kafka集群部署
      • 8.1 安装Zookeeper
      • 8.2 安装kafka
      • 8.3 启动脚本
  • Linux下操作
    • 1. 安装zookeeper
    • 2. 安装kafka

Windows下操作


1. 安装kafka kafka_2.12-3.6.2

官网: https://kafka.apache.org/downloads

安装Java8(未来Kafka 4.X版本会完全弃用Java8)

文件目录结构如下:

binlinux系统下可执行脚本文件
bin/windowswindows系统下可执行脚本文件
config配置文件
libs依赖类库
licenses许可信息
site-docs文档
logs服务日志

Zookeeper_21">2. 启动Zookeeper

kafka_2.12-3.6.2该版本内部依赖ZooKeeper进行多节点协调调度,已内置ZooKeeper。


Kafkaconfigzookeeperproperties_26">2.1 进入Kafka的config目录,修改zookeeper.properties配置文件

dataDir=F:/Apache/kafka_2.13-3.6.2/data/zk

ZooKeeper数据存放目录

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

启动命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

在解压目录下创建快速启动脚本:zk.cmd

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

可快速启动zookeeper


3. 启动kafka

如果要指定jdk版本,在kafka-run-class.bat里面添加一行set JAVA_HOME=F:/Java/jdk/jdk-17.0.11是可行的


Kafkaconfigserverproperties_60">3.1 进入Kafka的config目录,修改server.properties配置文件

log.dirs=F:/Apache/kafka_2.13-3.6.2/data/kafka

kafka数据存放目录

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

启动命令:

kafka-server-start.bat ../../config/server.properties

在解压目录下创建快速启动脚本:kfk.cmd

call bin/windows/kafka-server-start.bat config/server.properties

4.消费主题

消息发布/订阅(Publish/Subscribe)、将不同的消息进行分类,分成不同的主题(Topic)


4.1 创建主题

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

Kafka通过kafka-topics.bat指令文件进行消息主题操作(主题查询、创建、删除等功能)

  • 调用指令创建主题时,需要传递多个参数,而且参数的前缀为两个横线。

  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。

  • –create : 表示对主题的创建操作,是个操作参数,后面无需增加参数值。

  • –topic : 主题的名称,后面接的参数值一般是见名知意的字符串名称,类似于java中的字符串类型标识符名称,也可以使用数字,只不过最后还是当成数字字符串使用。

创建命令:

kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test

4.2 查询主题

Kafka通过kafka-topics.bat文件进行消息主题操作。(主题的查询,创建,删除等功能)

  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

  • –list : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值

kafka-topics.bat --bootstrap-server localhost:9092 --list

4.2.1 查询指定主题信息
  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

  • –describe : 查看主题的详细信息

  • –topic : 查询的主题名称

kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test

4.3 修改主题

Kafka通过kafka-topics.bat文件进行消息主题操作。(主题的查询,创建,删除等功能)

  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

  • –alter : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值

  • –topic : 修改的主题名称

  • –partitions : 修改的配置参数:分区数量

kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2

4.4 删除主题

Kafka通过kafka-topics.bat文件进行消息主题操作。(主题的查询,创建,删除等功能)

  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开

  • –delete: 表示对主题的删除操作,是个操作参数,后面无需增加参数值。默认情况下,删除操作是逻辑删除,也就是说数据存储的文件依然存在,但是通过指令查询不出来。如果想要直接删除,需要在server.properties文件中设置参数delete.topic.enable=true

  • –topic : 删除的主题名称

kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete

windows系统中由于权限或进程锁定的问题,删除topic会导致kafka服务节点异常关闭。(不然直接直接删除data下的数据)


5. 生产数据

Kafka通过kafka-console-producer.bat文件进行消息生产者操作

  • 调用指令时,需要传递多个参数,而且参数的前缀为两个横线
  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

输入生产数据:

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

输入消息,回车发送到kafka服务器


5.1 工具操作

Kafka Web UI :

  • kafdrop https://github.com/obsidiandynamics/kafdrop

指定的话:java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar kafdrop-4.0.2.jar --kafka.brokerConnect=localhost:9092

访问http://localhost:9000/

  • kafka Tool(有点难用)
  • Java API
  <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version></dependency></dependencies>创建类:package cn.coisini;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.HashMap;
import java.util.Map;public class KafkaProducerTest {public static void main(String[] args) {// 配置属性集合Map<String, Object> configMap = new HashMap<>();// 配置属性:Kafka服务器集群地址configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka生产者对象,建立Kafka连接// 构造对象时,需要传递配置参数KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// 准备数据,定义泛型// 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key1", "value1");// 生产(发送)数据producer.send(record);// 关闭生产者连接producer.close();}
}

6. 消费数据

消息通过Kafka生产者客户端发送到Kafka服务器中,暂存在Kafka中,我们可通过Kafka消费者客户端对服务器指定主题的消息进行消费。

进入cmd界面

cd F:/Apache/kafka_2.13-3.6.2/bin/windows

Kafka通过kafka-console-consumer.bat文件进行消息消费者操作。

  • 调用指令时,需要传递多个参数,而且参数的前缀为两个横线。
  • –bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。
  • –topic : 主题的名称,后面接的参数值就是之前已经创建好的主题名称。其实这个参数并不是必须传递的参数,因为如果不传递这个参数的话,那么消费者会消费所有主题的消息。如果传递这个参数,那么消费者只能消费到指定主题的消息数据。
  • –from-beginning : 从第一条数据开始消费,无参数值,是一个标记参数。默认情况下,消费者客户端连接上服务器后,是不会消费到连接之前所生产的数据的。也就意味着如果生产者客户端在消费者客户端连接前已经生产了数据,那么这部分数据消费者是无法正常消费到的。所以在实际环境中,应该是先启动消费者客户端,再启动生产者客户端,保证消费数据的完整性。增加参数后,Kafka就会从第一条数据开始消费,保证消息数据的完整性。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

JavaAPi调用:

public class KafkaConsumerTest {public static void main(String[] args) {// 配置属性集合Map<String, Object> configMap = new HashMap<String, Object>();// 配置属性:Kafka集群地址configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");// 配置属性: 消费者组configMap.put("group.id", "coisini");// 配置属性: 自动提交偏移量configMap.put("enable.auto.commit", "true");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configMap);// 消费者订阅指定主题的数据consumer.subscribe(Collections.singletonList("test"));while ( true ) {// 每隔100毫秒,抓取一次数据ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(100));// 打印抓取的数据for (ConsumerRecord<String, String> record : records) {System.out.println("K = " + record.key() + ", V = " + record.value());}}}
}

7. 源码关联(可忽略)

  • 解压源码:kafka-3.6.2-src.tgz
  • 安装JDK17和Scala2.13

JDK17:https://download.oracle.com/java/17/latest/jdk-17_windows-x64_bin.exe

进入Scala官方网站https://www.scala-lang.org/下载Scala压缩包scala-2.13.12.zip。

在IDEA中安装Scala插件

在项目Platform Settings中的Global Libraries选择scala-2.13.12

  • 安装Gradle

进入Gradle官方网站https://gradle.org/releases/下载Gradle安装包.

新增系统环境GRADLE_HOME,指定gradle安装路径,并将%GRADLE_HOME%\bin添加到path中

Gradle安装及环境变量配置完成之后,打开Windows的cmd命令窗口,输入gradle –version

在解压缩目录中打开命令行,依次执行gradle idea命令

在命令行中执行gradle build --exclude-task test命令

使用IDE工具IDEA打开该项目目录


Kafka_339">8. Kafka集群部署

创建文件夹kafka-cluster

目录不能太深,最好放根目录,避免命令行执行不了。


Zookeeper_346">8.1 安装Zookeeper

使用内置ZooKeeper

  • 解压kafka_2.13-3.6.2.tgz文件,修改目录目录名为zookeeper

  • 修改config/zookeeper.properties文件

    dataDir=F:/kafka-cluster/zookeeper/data
    clientPort=2181
    
    • 数据目录,会自动创建data目录

    • ZooKeeper默认端口为2181


8.2 安装kafka

节点broker

  • 解压kafka_2.13-3.6.2.tgz文件,修改目录目录名为kafka-node-1

  • 修改config/server.properties配置文件

    broker.id=1
    listeners=PLAINTEXT://:9091
    log.dirs=F:/kafka-cluster/kafka-node-1/data
    zookeeper.connect=localhost:2181/kafka
    
    • kafka节点数字标识,集群内具有唯一性

    • 放开注释,监听器 9091为本地端口,如果冲突,需重新指定

    • 数据目录,会自动创建data目录

    • ZooKeeper软件连接地址,2181为默认的ZK端口号 /kafka 为ZK的管理节点

分别拷贝修改目录名kafka-node-2kafka-node-3

配置文件中 broker.id=1 改为 broker.id=2broker.id=3

配置文件中 端口 9091 改为 90929093

配置文件中 数据目录kafka-broker-1 改为 kafka-node-2kafka-node-3


8.3 启动脚本

  • kafka-zookeeper 目录下 创建 zk.cmd

    call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
    
  • kafka-broker-1、2、3目录下 创建 kfk.cmd

    call bin/windows/kafka-server-start.bat config/server.properties
    
  • kafka-cluster 集群目录下 创建启动 cluster.cmd 批处理文件

    cd kafka-zookeeper
    start zk.cmd
    ping 127.0.0.1 -n 10 >nul
    cd ../kafka-node-1
    start kfk.cmd
    cd ../kafka-node-2
    start kfk.cmd
    cd ../kafka-node-3
    start kfk.cmd
    
    • 创建清理和重置kafka数据的 cluster-clear.cmd 批处理文件
cd kafka-zookeeper
rd /s /q data
cd ../kafka-node-1
rd /s /q data
cd ../kafka-node-2
rd /s /q data
cd ../kafka-node-3
rd /s /q data


Linux下操作


1. 安装zookeeper

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.gz

tar -zxvf apache-zookeeper-3.7.2-bin.tar.gz

mv apache-zookeeper-3.7.2-bin zookeeper-3.7.2

mkdir zookeeper-3.7.2/zkData

echo “1” > zookeeper-3.7.2/zkData/myid

cd zookeeper-3.7.2/conf/

mv zoo_sample.cfg zoo.cfg

vi zoo.cfg

修改数据目录:

dataDir=/opt/coisini/kafka/zookeeper-3.7.2/zkData

cd …

服务命令:

bin/zkServer.sh start

bin/zkServer.sh stop

bin/zkServer.sh status


2. 安装kafka

官网: https://kafka.apache.org/downloads

如果要指定jdk版本,在kafka-run-class.sh里面添加一行set JAVA_HOME=/opt/coisini/java/jdk-17.0.12是可行的

wget https://downloads.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz

tar -zxvf kafka_2.13-3.6.2.tgz

mkdir kafka_2.13-3.6.2/kfkData

cd kafka_2.13-3.6.2/config/

进入Kafka的config目录,修改server.properties配置文件

log.dirs=/opt/coisini/kafka/kafka_2.13-3.6.2/kfkData

broker的全局唯一编号,只能是数字。

broker.id=0

broker对外暴露的IP和端口 (主机名:9092)

advertised.listeners=PLAINTEXT://coisini:9092

相关命令:

bin/kafka-server-start.sh -daemon config/server.properties

bin/kafka-server-stop.sh

开放端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent

firewall-cmd --reload


(未完结,后续有时间再写)


http://www.ppmy.cn/server/119821.html

相关文章

C语言循环学习

作为初学者&#xff0c;学习C语言中的循环结构是非常重要的&#xff0c;它们能让你轻松地重复执行代码。在C语言中&#xff0c;常用的循环结构主要有for循环和while循环。我们将从基本概念开始&#xff0c;逐步讲解如何使用这两种循环&#xff0c;并通过示例帮助你理解和练习。…

解决ubuntu22.04 gnome-terminal 无法启动的问题

22.04下面默认的python 版本是3.10. 如果你安装了3.8或其它版本&#xff0c;尽量不要去ln -s python3.8 python3修改默认python3版本&#xff0c;不然Terminal会打不开。猜测Terminal可能用到了python的_gi这个库。 可以在xterm或putty远程连上&#xff0c;输入 sudo gnome-te…

96 kHz、24bit 立体声音频ADC芯片GC5358描述

概述&#xff1a; GC5358 是一款高性能、宽采样率、立体声音频模数转换器。其采样率范围是8KHz~96KHz&#xff0c;非常适合从消费级到专业级的音频应用系统。单端模拟输入不需要外围器件。GC5358 音频有两种数据格式&#xff1a;MSB对齐和 I2S 格式&#xff0c;和各种如 DTV、D…

98-策略模式的理解

‌策略模式是一种软件设计模式&#xff0c;它定义了一系列算法&#xff0c;并将每个算法封装起来&#xff0c;使它们可以相互替换。这种模式允许算法的变化不会影响使用算法的客户端&#xff0c;通过将使用算法的责任和算法的实现分割开来&#xff0c;并委派给不同的对象对这些…

java日志框架之JUL(Logging)

文章目录 一、JUL简介1、JUL组件介绍 二、Logger快速入门三、Logger日志级别1、日志级别2、默认级别info3、原理分析4、自定义日志级别5、日志持久化&#xff08;保存到磁盘&#xff09; 三、Logger父子关系四、Logger配置文件 一、JUL简介 JUL全程Java Util Logging&#xff…

ES机制原理

它们内部是如何运行的&#xff1f; 主分片和副本分片是如何同步的&#xff1f; 创建索引的流程是什么样的&#xff1f; ES 如何将索引数据分配到不同的分片上的&#xff1f;以及这些索引数据是如何存储的&#xff1f; 为什么说 ES 是近实时搜索引擎而文档的 CRUD (创建-读取…

蓝桥杯—STM32G431RBT6(LCD的液晶显示,由原理及实践,配置及lcd函数)

目录 一、LCD的原理&#xff08;了解&#xff09; LCD属性 OLED与LCD对比 二、使用步骤 1.引入库 2.代码部分 code.c&#xff08;含详解&#xff09; code.h main.c 效果展示 开源代码&#xff1a; 一、LCD的原理&#xff08;了解&#xff09; LCD属性 LCD 是英文 “…

OpenCV class2-C#+winfrom显示控件使用窗口大小并内存管理

一.控件效果说明 二.代码声明&#xff08;已经循环读取10000次&#xff09; 全局 OpenCvSharp.Point point new OpenCvSharp.Point(0, 0); OpenCvSharp.Size size2; Mat src new Mat(); 初始化 size2 new OpenCvSharp.Size(pictureBox1.Size.Width, pictureBox1.Size.Hei…