docker安装kafka,并集成springboot进行测试

news/2024/11/27 8:43:14/

大家好,今天我们开始学习kafka中间件,今天我们改变一下策略,不刷视频学习,改为实践学习,在网上找一些案例功能去做,来达到学习实践的目的。

首先,是安装相关组件。

1. docker安装安装

1.1 yum-utils软件包

yum install -y yum-utils

1.2 设置阿里云镜像


yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

1.3 安装docker

yum install docker-ce docker-ce-cli containerd.io 

1.4 启动docker

systemctl start docker

1.5 测试

docker version
docker run hello-world
docker images

至此,docker就安装完毕了。接下来就是安装zookeeper和kafka了,我这里用的是kafka2.x的版本,因此需要结合zookeeper去是使用。现在最新的kafka3.x已经可以抛弃zookeeper去单独使用了,小伙伴们有兴趣的话可以自己去动手安装实践下。

2. 安装zookeeper和kafka

2.1 docker安装zookeeper

docker pull wurstmeister/zookeeper

2.2 启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper 

2.3 docker查看zookeeper容器是否启动

docker ps

 出现以上信息,就代表zookeeper已经安装并启动成功。

2.4 安装kafka

docker pull wurstmeister/kafka

2.5 启动kafka

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=124.223.205.125:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://124.223.205.125:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" -e TZ="Asia/Shanghai" wurstmeister/kafka 

2.6 用docker ps查看kafka是否启动

出现以上信息,就代表kafka启动成功了。

下来就测试一下

3. 发送消息和消费消息

3.1 进入kafka容器

docker exec -it 容器id /bin/bashcd /opt/kafka_2.13-2.8.1/bin/

 3.2 连接生产者

./kafka-console-producer.sh --broker-list localhost:9092 --topic shopping

接下来就可以发送消息了。

 3.3 另起一个窗口,重复3.1的动作进入kafka容器,然后连接消费者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic shopping --from-beginning

这是就能就收消息了。

 到达这里,我们的kafka就安装并测试成功了。

4. 接下来我们就创建Springboot工程来连接kafka进行消息的生产和消费

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.4</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.volga</groupId><artifactId>kafka</artifactId><version>0.0.1-SNAPSHOT</version><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><!-- 阿里巴巴 fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

4.2 我们创建一个订单的实体类

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {/*** 订单id*/private long orderId;/*** 订单号*/private String orderNum;/*** 订单创建时间*/private LocalDateTime createTime;
}

4.3 创建生产者

@Component
@Slf4j
public class KafkaProvider {/*** 消息 TOPIC*/private static final String TOPIC = "shopping";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {// 构建一个订单类Order order = Order.builder().orderId(orderId).orderNum(orderNum).createTime(createTime).build();// 发送消息,订单类的 json 作为消息体ListenableFuture<SendResult<String, String>> future =kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable throwable) {log.info("生产者产生消息 失败 ## Send message fail ...");}@Overridepublic void onSuccess(SendResult<String, String> result) {log.info("生产者产生消息 成功 ## Send message success ...");}});}
}

4.4 创建消费者

@Component
@Slf4j
public class KafkaConsumer {@KafkaListener(topics = "shopping", groupId = "group_id") //这个groupId是在yml中配置的public void consumer(String message) {log.info("消费者消费信息 ## consumer message: {}", message);}
}

4.5 创建测试类

@SpringBootTest
public class SpringBootKafakaApplicationTests {@Autowiredprivate KafkaProvider kafkaProvider;@Testpublic void sendMessage() throws InterruptedException {System.out.println("是否为空??+"+kafkaProvider);// 发送 10 个消息for (int i = 0; i < 10; i++) {long orderId = i+1;String orderNum = UUID.randomUUID().toString();kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());}TimeUnit.MINUTES.sleep(1);}
}

4.6 要创建一个Application方法,不然项目会启动报错

@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class,args);}
}

4.7 配置application.yml

spring:kafka:# 指定 kafka 地址,我这里部署在的虚拟机,开发环境是Windows,kafkahost是虚拟机的地址, 若外网地址,注意修改为外网的IP( 集群部署需用逗号分隔)bootstrap-servers: 服务器ip:9092consumer:# 指定 group_idgroup-id: group_idauto-offset-reset: earliest# 指定消息key和消息体的序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 指定消息key和消息体的序列化方式key-deserializer: org.apache.kafka.common.serialization.StringSerializervalue-deserializer: org.apache.kafka.common.serialization.StringSerializerlistener:# 在侦听器容器中运行的线程数。concurrency: 5#listner负责ack,每调用一次,就立即commitack-mode: manual_immediatemissing-topics-fatal: false

以上就创建项目成功了,我们运行测试方法,就能获取kafka中的消息了。

### 生产消息

 ### 消费消息

这里就是简单实现了kafka的消息生产和消费,后续的kafka复杂场景的实现会持续更新。

我是空谷有来人,谢谢支持。 


http://www.ppmy.cn/news/49685.html

相关文章

URL 转为QR code(二维码)

推荐一个良心的网站&#xff0c;能够免费地将url、text编码为二维码&#xff0c;而且还能设计logo、颜色等。 https://www.the-qrcode-generator.com/ 如下图&#xff1a; 可以自己定义logo、颜色&#xff1a; 还能查看扫描历史等统计信息&#xff1a; 上述所有功能都是免…

01——计算机系统基础

计算机系统基础知识 计算机系统基础一、计算机系统的基本组成1 计算机硬件系统 二、计算机的类型三、计算机的组成和工作原理1 计算机的组成2 总线的基本概念2.1 总线的定义与分类 3 系统总线3.1 系统总线的概念3.2 常见的系统总线 4 外总线5 中央处理单元&#xff08;CPU&…

【Linux】网络配置ifonfig解读

1、配置文件位置 在Linux系统中&#xff0c;IP地址的配置信息通常存储在网络接口配置文件中。不同的发行版可能会将这些文件存放在不同的位置。 以较为流行的Ubuntu和CentOS为例&#xff1a; Ubuntu系统&#xff1a;网络接口配置文件位于/etc/network/interfacesCentOS/RHEL…

【软考备战·希赛网每日一练】2023年4月24日

文章目录 一、今日成绩二、错题总结第一题第二题第三题第四题第五题 三、知识查缺 题目及解析来源&#xff1a;2023年04月24日软件设计师每日一练 一、今日成绩 二、错题总结 第一题 解析&#xff1a; 第二题 解析&#xff1a; DPI表示每英寸像素点的个数。 300DPI表示每英寸…

【图像处理】基于MATLAB的纹理度量方法进行图像分割

目录 基于MATLAB的纹理度量方法进行图像分割 基于MATLAB的纹理度量方法进行图像分割 可以使用MATLAB中的纹理度量方法来进行图像分割。其中,纹理度量是一种用于描述和区分图像中局部纹理特征的方法。下面是基于MATLAB的纹理度量方法进行图像分割的一个简单示例: 使用MATLAB中…

从原理聊JVM(一):染色标记和垃圾回收算法

1 JVM运行时内存划分 1.1 运行时数据区域 •方法区 属于共享内存区域&#xff0c;存储已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码等数据。运行时常量池&#xff0c;属于方法区的一部分&#xff0c;用于存放编译器生成的各种字面量和符号引用。 JDK1.8…

【代码随想录】刷题Day5

1.链表重复节点删除 82. 删除排序链表中的重复元素 II 前后指针实现 1.做这道题最大的感受就是&#xff1a;不要觉得开辟空间浪费&#xff0c;多用临时变量去记录。越精确越容易成功 2.首先没有节点或者一个节点直接返回 3.因为头部会出现一样元素的情况&#xff0c;以至于我不…

ActiveMQ 学习指南:适用于不同级别程序员的技术博客

摘要&#xff1a;本文将为您介绍如何学习 ActiveMQ&#xff0c;无论您是初学者、有一定经验还是高级程序员&#xff0c;都能找到适合自己的学习方法。我们将会介绍 ActiveMQ 的基本概念、环境搭建、使用方法、最佳实践以及常见问题等方面的内容。 目录&#xff1a; ActiveMQ 简…