003 SpringBoot集成Kafka操作

server/2025/2/27 16:45:19/

4.SpringBoot集成Kafka

文章目录

  • 4.SpringBoot集成Kafka
      • 1.入门示例
      • 2.yml完整配置
      • 3.关键配置注释说明
        • 1. 生产者优化参数
        • 2. 消费者可靠性配置
        • 3. 监听器高级特性
        • 4. 安全认证配置
      • 4.配置验证方法
      • 5.不同场景配置模板
        • 场景1:高吞吐日志收集
        • 场景2:金融级事务消息
        • 场景3:跨数据中心同步
    • 5.高级配置
      • 1.事务支持
      • 2.消息重试与死信队列

来源参考的deepseek,如有侵权联系立删

1.入门示例

1.pom依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2.KafkaProducer消息生产者配置

@Component
@Slf4j
public class KafkaProducer {private HashMap map=new HashMap<>();@Autowiredprivate KafkaTemplate<Integer,String> kafkaTemplate;public void send(String topic,String msg){log.info("开始发送消息,topic:{};message:{}",topic,msg);ListenableFuture<SendResult<Integer,String>> send=kafkaTemplate.send(topic, msg);//消息确认机制send.addCallback(new ListenableFutureCallback<SendResult<Integer,String>>(){@Overridepublic void onSuccess(SendResult<Integer, String> result) {log.info("消息发送成功,topic:{};message:{}",topic,msg);}@Overridepublic void onFailure(Throwable ex) {//落库操作map.put(topic,msg);}});}
}

springboot3.x写法

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;@Service
@RequiredArgsConstructor
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;// 同步发送(阻塞等待确认)public void sendMessageSync(String topic, String key, String value) {kafkaTemplate.send(topic, key, value).whenComplete((result, ex) -> {if (ex == null) {System.out.printf("发送成功:topic=%s, partition=%d, offset=%d%n",result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());} else {System.err.println("发送失败:" + ex.getMessage());}});}// 异步发送(默认方式)public void sendMessageAsync(String topic, String message) {kafkaTemplate.send(topic, message);}
}
  • Spring Boot 2.xsend() 返回 ListenableFuture<SendResult>,支持 addCallback() 回调。
  • Spring Boot 3.xsend() 返回 CompletableFuture<SendResult>,弃用 ListenableFuture,因此需要使用 CompletableFuture 的 API(如 whenComplete)。

3.KafkaConsumer消息消费

@Component
@Slf4j
public class KafkaConsumer {private List<String> exist=new ArrayList<>();@KafkaListener(topics = {"lx"},groupId = "lx")public void consumer(ConsumerRecord<Integer,String> record){if (exist.contains(record.value())){log.error("不满足幂等校验!!!");}log.info("消息消费成功,topic:{},message:{}", record.topic(), record.value());exist.add(record.value());}
}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumerService {// 单个消息消费(手动提交偏移量)@KafkaListener(topics = "test-topic", groupId = "spring-group")public void listenMessage(String message, Acknowledgment ack) {System.out.println("收到消息:" + message);ack.acknowledge(); // 手动提交}// 批量消费(需配置 listener.type=batch)@KafkaListener(topics = "batch-topic", groupId = "spring-group")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));ack.acknowledge();}
}

4.yml配置文件

生产者配置

#kafka配置
spring:kafka:#kafka集群地址# bootstraps-server: 192.168.25.100:9092bootstrap-servers: 47.122.26.22:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后,会发送properties:linger.ms: 1# 禁止生产者触发 Topic 创建请求allow.auto.create.topics: false#代表集群中从节点都持久化后才认为发送成功acks: -1

消费者配置

spring:kafka:#kafka集群地址bootstraps-server: 192.168.25.100:9092consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000# 禁用生产者触发 Topic 元数据请求时自动创建allow.auto.create.topics: falsegroup-id: testauto-offset-reset: earliestlistener:ack-mode: manual_immediate # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch

5.实体类

@Data
public class KafkaRequest {/*** 主题*/private String topic;/*** 消息*/private String message;
}

6.消息发送

@RestController
@Slf4j
public class KafkaController {private final String topic="lx";private int temp=1;@Autowiredprivate KafkaProducer producer;/*** 下单** @param kafkaRequest* @return null*/@RequestMapping("/test01")public void test01(KafkaRequest kafkaRequest){log.info("test01测试成功!topic:{};message:{}",kafkaRequest.getTopic(), kafkaRequest.getMessage());producer.send(kafkaRequest.getTopic(), kafkaRequest.getMessage());}@RequestMapping("/test02")public void test02(KafkaRequest kafkaRequest){log.info("test02测试成功!topic:{};message:{}",topic, temp);producer.send(topic, String.valueOf(temp));temp++;}
}

kafka启动方式

./kafka-server-start.sh  ../config/server.properties

2.yml完整配置

spring:kafka:# 基础配置(必填项)bootstrap-servers: localhost:9092  # Kafka集群地址,多节点用逗号分隔 client-id: spring-boot-app         # 客户端标识(日志追踪用)# 生产者配置 producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer   # 键序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer  # 值序列化器acks: all                         # 消息确认机制:all表示所有副本确认(最高可靠性)retries: 5                        # 发送失败重试次数(需配合幂等性使用)batch-size: 16384                 # 批量发送缓冲区大小(单位:字节)linger-ms: 50                     # 发送延迟等待时间(毫秒,提高吞吐量)buffer-memory: 33554432           # 生产者内存缓冲区大小(默认32MB)compression-type: snappy          # 消息压缩算法(可选gzip/lz4/zstd)transaction-id-prefix: tx-        # 开启事务时需配置前缀(需配合@Transactional)# 消费者配置 consumer:group-id: app-consumer-group      # 消费者组ID(同一组共享分区)auto-offset-reset: earliest       # 无Offset时策略:earliest(从头)/latest(最新)enable-auto-commit: false         # 关闭自动提交Offset(推荐手动提交)key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500             # 单次poll最大消息数(避免OOM)fetch-max-wait-ms: 500            # 消费者等待broker返回数据的最长时间isolation-level: read_committed   # 事务消息隔离级别(read_committed/read_uncommitted)# 监听器配置(高级优化)listener:type: single                      # 监听器类型:single(单条)/batch(批量)ack-mode: manual                  # Offset提交模式:manual(手动)/batch(批量提交)concurrency: 3                    # 消费者线程数(建议等于分区数)poll-timeout: 3000                # poll方法超时时间(毫秒)# 消息重试与死信队列(容错机制)retry:topic:attempts: 3                     # 最大重试次数initial-interval: 1000          # 初始重试间隔(毫秒)multiplier: 2.0                 # 重试间隔倍数(指数退避)dead-letter-topic: dlq-${topic}   # 死信队列命名规则(自动创建)# 安全协议(企业级场景)properties:security.protocol: SASL_PLAINTEXT  # 安全协议(如PLAINTEXT/SASL_SSL)sasl.mechanism: PLAIN             # SASL认证机制ssl.truststore.location: /path/to/truststore.jks# 自定义业务配置(非Kafka标准参数)app:kafka:topics:input-topic: user-events        # 业务输入Topicoutput-topic: processed-events  # 业务输出Topic

3.关键配置注释说明

1. 生产者优化参数
参数说明推荐值
acks=all确保所有ISR副本写入成功,防止数据丢失高可靠性场景必选
compression-type=snappy减少网络带宽占用,提升吞吐量消息体>1KB时启用
transaction-id-prefix支持跨分区原子性写入(需配合@Transactional注解)金融交易类业务必配
2. 消费者可靠性配置
参数说明注意事项
enable-auto-commit=false避免消息处理失败但Offset已提交导致数据丢失需手动调用ack.acknowledge()
isolation-level=read_committed只消费已提交的事务消息需与生产者事务配置联动
3. 监听器高级特性
参数使用场景示例
type=batch批量消费(提升吞吐量)适用于日志处理等实时性要求低的场景
concurrency=3并发消费者数需与Topic分区数一致,避免资源浪费
4. 安全认证配置
spring:kafka:properties:security.protocol: SASL_SSLsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";
  • 企业级必配:生产环境需启用SSL加密+SASL认证

4.配置验证方法

  1. 启动检查:添加@ConfigurationProperties(prefix = "spring.kafka")绑定配置到Bean,通过单元测试验证注入值
  2. 日志监控:开启DEBUG日志观察生产者/消费者连接状态
   logging:level:org.springframework.kafka: DEBUG
  1. AdminClient 工具:通过编程方式检查Topic元数据
@Autowired
private KafkaAdminClient adminClient;public void checkTopic() {Map<String, TopicDescription> topics = adminClient.describeTopics("user-events");topics.values().forEach(topic -> System.out.println(topic));
}

5.不同场景配置模板

场景1:高吞吐日志收集
producer:compression-type: lz4batch-size: 65536linger-ms: 100
consumer:auto-offset-reset: latestenable-auto-commit: true  # 允许少量数据丢失以换取性能
场景2:金融级事务消息
producer:acks: allretries: 10transaction-id-prefix: fin-tx-
consumer:isolation-level: read_committedenable-auto-commit: false
场景3:跨数据中心同步
spring:kafka:bootstrap-servers: kafka-dc1:9092,kafka-dc2:9092properties:client.dns.lookup: use_all_dns_ips  # 支持多IP解析reconnect.backoff.ms: 1000          # 断线重连策略

5.高级配置

1.事务支持

// 配置事务管理器
@Bean
public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {return new KafkaTransactionManager<>(producerFactory);
}// 使用事务发送
@Transactional
public void sendWithTransaction() {kafkaTemplate.send("topic1", "msg1");kafkaTemplate.send("topic2", "msg2");
}

2.消息重试与死信队列

spring:kafka:listener:retry:max-attempts: 3backoff:initial-interval: 1000multiplier: 2.0dead-letter-topic: my-dlt-topic # 死信队列

http://www.ppmy.cn/server/171085.html

相关文章

CentOS 下安装和配置 HTTPD 服务的详细指南

CentOS 下安装和配置 HTTPD 服务的详细指南 CentOS 下安装和配置 HTTPD 服务的详细指南1. 环境准备2. 安装 HTTPD 服务2.1 更新系统2.2 安装 HTTPD2.3 启动 HTTPD 服务2.4 检查 HTTPD 服务状态 3. 配置防火墙3.1 开放 HTTP 和 HTTPS 端口3.2 验证防火墙规则 4. 配置 HTTPD4.1 主…

【C++修炼之路】C++类与对象:面向对象编程的第一步

&#x1f3dd;️专栏&#xff1a; 【C修炼之路】 &#x1f305;主页&#xff1a; f狐o狸x “于高山之巅&#xff0c;方见大河奔涌&#xff1b;于群峰之上&#xff0c;更觉长风浩荡” 目录 一、面向过程和面向对象的初步认识 二、类的定义 三、类的访问限定符及封装 3.1 访问…

如何进行OceanBase 运维工具的部署和表性能优化

本文来自OceanBase 用户的实践分享 随着OceanBase数据库应用的日益深入&#xff0c;数据量不断攀升&#xff0c;单个表中存储数百万乃至数千万条数据的情况变得愈发普遍。因此&#xff0c;部署专门的运维工具、实施针对性的表性能优化策略&#xff0c;以及加强指标监测工作&…

1、进程和线程之间有什么区别 【高频】

进程 是 调度 和 资源分配 的最小单位&#xff0c;线程 是 执行程序 的最小单位。一个进程可以运行多个线程。 进程与线程的区别 共享信息&#xff1a; 进程间之间共享信息不方便&#xff0c;通信比较麻烦&#xff0c;需要一些特殊机制&#xff0c;如管道、有名管道、共享内存…

android studio gradle 阿里镜像

阿里云gradle镜像地址&#xff1a;macports-distfiles-gradle安装包下载_开源镜像站-阿里云macports-distfiles-gradle安装包是阿里云官方提供的开源镜像免费下载服务&#xff0c;每天下载量过亿&#xff0c;阿里巴巴开源镜像站为包含macports-distfiles-gradle安装包的几百个操…

MacOS 终端选型

MacOS终端工具选型与技术栈建议 一、核心工具对比矩阵 工具名称最新版本核心优势适用场景推荐指数引用来源iTerm25.3分屏/自动补全/多语言支持/全局搜索全栈开发/服务器运维⭐⭐⭐⭐⭐19Warp1.4AI智能补全/块编辑/现代UI/跨平台协作新手友好/团队协作⭐⭐⭐⭐39Tabby2.0多协议…

PyCharm Professional 2025 安装配置全流程指南(Windows平台)

一、软件定位与核心功能 PyCharm 2025 是 JetBrains 推出的智能 Python IDE&#xff0c;新增深度学习框架自动补全、实时性能热力图等功能1。相较于社区版&#xff0c;专业版支持&#xff1a; Web开发&#xff08;Django/Flask&#xff09;数据库工具&#xff08;PostgreSQL/…

Git原理+使用(超详细)

Git初识 当我们写项目代码时&#xff0c;需要不断的更新版本&#xff0c;那么就需要一个东西去管理这些不同版本的文件—版本控制器。 目前最主流的版本控制器就是Git。它是一个可以记录工程的每一次改动和版本迭代的管理系统&#xff0c;同时方便多人协同作业。 &#xff0…