Kafka 3.x.x 入门到精通(07)——Java应用场景——SpringBoot集成

devtools/2024/9/24 7:01:46/

Kafka 3.x.x 入门到精通(07)——Java应用场景——SpringBoot集成

  • 4. Java应用场景——SpringBoot集成
    • 4.1 创建SpringBoot项目
      • 4.1.1 创建SpringBoot项目
      • 4.1.2 修改pom.xml文件
      • 4.1.3 在resources中增加application.yml文件
    • 4.2 编写功能代码
      • 4.2.1 创建配置类:SpringBootKafkaConfig
      • 4.2.2 创建生产者控制器:KafkaProducerController
      • 4.2.3 创建消费者:KafkaDataConsumer
    • 4.3 集成测试
      • 4.3.1 启动ZooKeeper
      • 4.3.2 启动Kafka
      • 4.3.3 启动应用程序
      • 4.3.4 生产数据测试

在这里插入图片描述

在这里插入图片描述

本文档参看的视频是:

  • 尚硅谷Kafka教程,2024新版kafka视频,零基础入门到实战
  • 黑马程序员Kafka视频教程,大数据企业级消息队列kafka入门到精通
  • 小朋友也可以懂的Kafka入门教程,还不快来学

本文档参看的文档是:

  • 尚硅谷官方文档,并在基础上修改 完善!非常感谢尚硅谷团队!!!!

在这之前大家可以看我以下几篇文章,循序渐进:

❤️Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(03)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(04)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(05)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(06)——对标尚硅谷Kafka教程

在这里插入图片描述

4. Java应用场景——SpringBoot集成

Spring Boot帮助您创建可以运行的、独立的、生产级的基于Spring的应用程序。您可以使用Spring Boot创建Java应用程序,这些应用程序可以通过使用java-jar或更传统的war部署启动。
我们的目标是:

  • 为所有Spring开发提供从根本上更快、广泛访问的入门体验。
  • 开箱即用,但随着要求开始偏离默认值,请迅速离开。
  • 提供大型项目(如嵌入式服务器、安全性、指标、健康检查和外部化配置)常见的一系列非功能性功能。
  • 绝对没有代码生成(当不针对原生图像时),也不需要XML配置。

在这里插入图片描述

在这里插入图片描述

4.1 创建SpringBoot项目

4.1.1 创建SpringBoot项目

在这里插入图片描述

在这里插入图片描述

4.1.2 修改pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu</groupId><artifactId>springboot-kafka</artifactId><version>0.0.1</version><name>springboot-kafka</name><description>Kafka project for Spring Boot</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-json</artifactId><version>5.8.11</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-db</artifactId><version>5.8.11</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

在这里插入图片描述

4.1.3 在resources中增加application.yml文件

spring:kafka:bootstrap-servers: localhost:9092producer:acks: allbatch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 0consumer:group-id: test#消费者组#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费# earliest:无提交记录,从头开始消费#latest:无提交记录,从最新的消息的下一条开始消费auto-offset-reset: earliestenable-auto-commit: true #是否自动提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交的频率key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 2properties:#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}session.timeout.ms: 120000#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡max.poll.interval.ms: 300000#配置控制客户端等待请求响应的最长时间。#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,#或者如果重试次数用尽,则请求失败。request.timeout.ms: 60000#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置falseallow.auto.create.topics: true#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制#仍然会返回该消息,以确保消费者可以进行#max.partition.fetch.bytes=1048576  #1Mlistener:#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediatemissing-topics-fatal: true #如果至少有一个topic不存在,true启动失败。false忽略#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-recordstype: batchconcurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲template:default-topic: "test"
server:port: 9999

4.2 编写功能代码

4.2.1 创建配置类:SpringBootKafkaConfig

package com.atguigu.springkafka.config;public class SpringBootKafkaConfig {public static final String TOPIC_TEST = "test";public static final String GROUP_ID = "test";
}

在这里插入图片描述

4.2.2 创建生产者控制器:KafkaProducerController

package com.atguigu.springkafka.controller;import com.atguigu.springkafka.config.SpringBootKafkaConfig;
import lombok.extern.slf4j.Slf4j;
import cn.hutool.json.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.*;import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@ResponseBody@PostMapping(value = "/produce", produces = "application/json")public String produce(@RequestBody Object obj) {try {String obj2String = JSONUtil.toJsonStr(obj);kafkaTemplate.send(SpringBootKafkaConfig.TOPIC_TEST, obj2String);return "success";} catch (Exception e) {e.getMessage();}return "success";}
}

在这里插入图片描述

4.2.3 创建消费者:KafkaDataConsumer

package com.atguigu.springkafka.component;import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import com.atguigu.springkafka.config.SpringBootKafkaConfig;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Optional;@Component
@Slf4j
public class KafkaDataConsumer {@KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("data"));//ack.acknowledge();}}
}

在这里插入图片描述

4.3 集成测试

4.3.1 启动ZooKeeper

在这里插入图片描述

在这里插入图片描述

4.3.2 启动Kafka

4.3.3 启动应用程序

在这里插入图片描述

在这里插入图片描述

4.3.4 生产数据测试

可以采用任何的工具进行测试,我们这里采用postman发送POST数据

在这里插入图片描述

消费者消费数据

在这里插入图片描述

在这里插入图片描述


http://www.ppmy.cn/devtools/27214.html

相关文章

字节跳动(社招)四面算法原题

TikTok 进展 又是一期定时汇报 TikTok 进展的推文。 上周&#xff0c;美国总统拜登签署了价值 950 亿美元的一揽子对外援助法案。 该法案涉及强制字节跳动剥离旗下应用 TikTok 美国业务&#xff0c;即 针对 TikTok 非卖即禁的"强抢行为"开始进入九个月&#xff08;27…

2024深圳杯数学建模竞赛A题(东三省数学建模竞赛A题):建立火箭残骸音爆多源定位模型

更新完整代码和成品完整论文 《2024深圳杯&东三省数学建模思路代码成品论文》↓↓↓&#xff08;浏览器打开&#xff09; https://www.yuque.com/u42168770/qv6z0d/zx70edxvbv7rheu7?singleDoc# 2024深圳杯数学建模竞赛A题&#xff08;东三省数学建模竞赛A题&#xff0…

WPF控件:密码框绑定MVVM

以下是一种使用 MVVM 模式的方法&#xff1a; 首先&#xff0c;在 ViewModel 中添加一个属性来保存密码&#xff0c;我们可以使用 SecureString 类型。 // 密码变量private SecureString \_password;// 密码属性&#xff0c;用于获取和设置密码public SecureString Password{g…

从MySQL+MyCAT架构升级为分布式数据库,百丽应用OceanBase 4.2的感受分享

本文来自OceanBase的客户&#xff0c;百丽时尚的使用和测试分享 业务背景 百丽时尚集团&#xff0c;作为国内大型时尚鞋服集团&#xff0c;在中国超过300个城市设有直营门店&#xff0c;数量超过9,000家。集团构建了以消费者需求为核心的垂直一体化业务模式&#xff0c;涵盖了…

练习题(2024/5/1)

1二叉树的层平均值 简单 相关标签 相关企业 给定一个非空二叉树的根节点 root , 以数组的形式返回每一层节点的平均值。与实际答案相差 10-5 以内的答案可以被接受。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;[3.00000,14.50000…

【Cortex-M3 CMSIS内核驱动文件详解】1:文件预置宏

文章目录 一、宏定义准备1.1 前置宏定义1.2 Cortex-M3存储器映射基地址宏1.3 硬件抽象层宏 一、宏定义准备 1.1 前置宏定义 #ifdef __cplusplusextern "C" { #endif #define __CM3_CMSIS_VERSION_MAIN (0x01) …

qt嵌入并控制外部程序

一、流程 1、调用Window接口模拟鼠标&#xff0c;键盘事件 POINT point; LPPOINT lpppoint &point; GetCursorPos(lpppoint);//获取鼠标位置 SetCursorPos(point.x, point.y);//设置鼠标位置//鼠标左键按下 mouse_event(MOUSEEVENTF_LEFTDOWN | MOUSEEVENTF_LEFTUP, poi…

c++图论基础(2)

目录 图的存储方式&#xff1a; 邻接矩阵&#xff1a; 代码实现&#xff1a; 邻接表&#xff1a; 代码实现&#xff1a; 邻接矩阵邻接表对比&#xff1a; 带权图&#xff1a; 邻接矩阵存储&#xff1a; 邻接表存储(代码实现)&#xff1a; 图的存储方式&#xff1a; 邻…