windows本地搭建zookeeper和kafka环境

embedded/2024/10/18 11:21:06/

zookeeper_2">zookeeper

zookeeper_3">1.1 下载zookeeper

下载地址
随便进一个站点,默认是新版本,旧版本点击archives进入,选择合适的版本下载,本文使用的是3.7.2
下载时候选择apache-zookeeper-3.7.2-bin.tar.gz 格式的,编译后的,解压后直接使用
位置

在这里插入图片描述

1.2 zk启动

1.2.1 单机启动

解压后进入 /zk/conf/ 复制一份zoo_sample.cfg文件到同级目录,然后更改名称为zoo.cfg。编辑zoo.cfg,确认端口和数据目录

clientPort=2181
dataDir=D:\\xxx\\apache-zookeeper-3.7.2-bin\\data

然后进入zk/bin/目录下,运行zkServer.cmd则启动zk服务器完成

1.2.2 伪集群启动

解压后拷贝3份,同理修改配置zoo.cfg,多了server.x的配置。区别在于端口要配置不一样,因为是一台电脑
dataDir指向自己的目录的data,每个data下面创建myid文件,myid就是下面配置的1,2,3 ,对应conf文件里面的server.x
举例:
D:\env\apache-zookeeper-3.7.2-bin-A\data\myid的内容就是 1
D:\env\apache-zookeeper-3.7.2-bin-B\data\myid的内容就是 2
D:\env\apache-zookeeper-3.7.2-bin-C\data\myid的内容就是 3
在这里插入图片描述
在这里插入图片描述

dataDir=D:\\env\\apache-zookeeper-3.7.2-bin-A\\data
clientPort=2181
server.1=localhost:2887:3887
server.2=localhost:2888:3888
server.3=localhost:2889:3889
dataDir=D:\\env\\apache-zookeeper-3.7.2-bin-B\\data
clientPort=2182
server.1=localhost:2887:3887
server.2=localhost:2888:3888
server.3=localhost:2889:3889
dataDir=D:\\env\\apache-zookeeper-3.7.2-bin-C\\data
clientPort=2183
server.1=localhost:2887:3887
server.2=localhost:2888:3888
server.3=localhost:2889:3889

启动同理,到3个zk的bin目录下启动zkserver,也可以写个bat一次性启动
zk-cluster.bat

@echo off
start cmd /c "D:\\env\\apache-zookeeper-3.7.2-bin-A\\bin\\zkServer.cmd"
start cmd /c "D:\\env\\apache-zookeeper-3.7.2-bin-B\\bin\\zkServer.cmd"
start cmd /c "D:\\env\\apache-zookeeper-3.7.2-bin-C\\bin\\zkServer.cmd"

kafka_67">kafka

下载

下载地址

选择二进制文件下载,我下载的是2.13-3.71
在这里插入图片描述

下载后解压,进入config目录,修改server.properties文件
对接单zk ,这边要和上面zk的端口一样

zookeeper.connect=localhost:2181
log.dirs=D:\\env\\kafka-zk-single\\logs

对接zk集群,这边要和上面zkA、B、C的端口一样

zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
log.dirs=D:\\env\\kafka-zk-cluster\\logs

kafka_86">启动kafka

进入kakfa/bin/windows/目录,启动kafka-server-start.bat 同时指定刚才的配置文件

也可以写个bat,放在外面
在这里插入图片描述
kafka-single.bat

"D:\\env\\kafka\\bin\\windows\\kafka-server-start.bat" "D:\\env\\kafka\\config\\server.properties"

这里提醒一下,不要一个kafka一会对接zk单机,一会对接zk集群,集群id是唯一的,混着用会报错
可以再复制一个kafka出来
类似
kafka-zk-single 用来对接单zk
kafka-zk-cluster 用来对接zk集群

kafkaui_102">kafka-ui

kafka可视化监控,类似的软件很多,这里用 UI for Apache Kafka
主要是方便,一个jar包,直接启动就好了,不过要求java版本高于17? 用idea下载一个,直接指定位置启动就好了,不影响日常jdk8使用
在这里插入图片描述

目录结构

下载好后同目录创建一个配置文件 application.yml。

server:port: 9988
kafka:clusters:-name: localbootstrapServers: localhost:9092

简单解释下,详细配置看文档
一个是修改下端口,默认是8080,可能会冲突
另一个就是配置kafka的地址

然后启动!

C:\Users\Admin\.jdks\graalvm-jdk-17.0.11\bin\java.exe -jar kafka-ui-api-v0.7.2.jar -Dspring.config.additional-location=application.yml

http://localhost:9988/

kafka_132">springboot对接kafka

加依赖、加配置、写代码

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>
spring:kafka:bootstrap-servers: localhost:9092consumer:auto-offset-reset: latestenable-auto-commit: trueauto-commit-interval: 3000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:retries: 0acks: 1key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

3种监听方式

    @KafkaListener(topics = "test", groupId = "test_group")public void receiveMessage(String message) {System.out.println("消费消息:============> " + message);}@KafkaListener(topics = "test", groupId = "test_group1", properties = {"auto.commit.interval.ms=4000"})public void receiveMessage2(ConsumerRecord<String, String> record) {System.out.println("消费record:============> " + record);}@KafkaListener(topics = "test", groupId = "test_group2")public void receiveMessage3(List<ConsumerRecord<String, String>> records) {System.out.println("消费records:============> " + records);}

启动程序。。

测试

进入web页面test的topic里面
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

代码控制台打印:

消费record:============> ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1724315931298, serialized key size = 9, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = 测试key, value = 测试值)
消费消息:============> 测试值
消费records:============> [测试值]

http://www.ppmy.cn/embedded/99358.html

相关文章

前端与rabbitmq通信

本篇如何讲述前端通过stomp.js实现向rabbitmq推送和订阅消息。 开发前准备 rabbitmq 在实现通信前&#xff0c;先需要开启rabbitmq的web stomp Plugin。这个在rabbitmq官网也有详细的说明点这里。 在rabbitmq服务器上输入如下命令即可开启&#xff1a; rabbitmq-plugins enab…

利用puppeteer将html网页生成图片

1.什么是puppeteer&#xff1f; Puppeteer是一个Node库&#xff0c;它提供了一个高级API来通过DevTools协议控制Chromium或Chrome。 可以使用Puppeteer来自动化完成浏览器的操作&#xff0c;官方给出的一些使用场景如下&#xff1a; 生成页面PDF抓取 SPA&#xff08;单页应用…

redis set(集合)类型

集合中的元素是无序的&#xff0c;集合中的元素是不能重复的 1.set add member [member ...] &#xff1a;向集合中添加元素 返回值&#xff1a;成功添加的元素个数 2.smembers key&#xff1a;获取set中的所有元素 3.sismember key member: 判断一个元素在不在set中&#x…

U盘安装Ubuntu24.04,乌邦图,UltralISO

文章目录 前言通过UltraISO&#xff0c;制作启动U盘下载镜像制作工具UltraISO(软碟通)下载ubuntu镜像文件制作启动U盘 安装ubuntu设置root密码&#xff0c;并登陆root 前言 在Ubuntu作为主流的linux系统&#xff0c;有时候使用VMware安装使用&#xff0c;总归有一定的性能损耗…

JavaScript 保留词

JavaScript 保留词 在 JavaScript 中&#xff0c;您不能把这些保留词作为变量、标记或函数名来使用&#xff1a; abstract arguments await* boolean break byte case catch char class* const continue debugger default delete do double else enum* eva…

创新实践:流媒体服务器如何推动WebRTC支持H.265及JS硬软解码(MSE硬解、WASM软解)

为了实现这一全面的解决方案&#xff0c;我们投入了近半年的时间进行调研与研发。我们的主要目标是&#xff1a;让流媒体服务器能够直接传输H.265编码的视频&#xff0c;而无需将其转码为H.264&#xff0c;从而使Chrome浏览器能够无缝解码并播放H.265视频。 值得注意的是&#…

iPhone照片怎么导入电脑?一键导入毫不费力

随着智能手机的普及&#xff0c;我们越来越依赖手机来记录生活的点点滴滴。iPhone作为其中的佼佼者&#xff0c;其高质量的摄像头为用户捕捉了无数珍贵瞬间。然而&#xff0c;随着照片数量的增多&#xff0c;手机存储空间可能会变得捉襟见肘&#xff0c;此时将照片导入电脑既能…

目标检测(Object Detection)

The essence of object detection is to get category and location through input images. 1、CV 计算机视觉概述 2、Classification of Object Detection 基于深度学习的目标检测算法主要分为两类&#xff1a;Two stage 和 One stage 。 &#xff08;1&#xff09;Tow St…