4. Spring Boot Kafka Integration
Table of Contents
- 4. Spring Boot Kafka Integration
  - 1. Getting-Started Example
  - 2. Complete yml Configuration
  - 3. Key Configuration Notes
    - 1. Producer Tuning Parameters
    - 2. Consumer Reliability Settings
    - 3. Advanced Listener Features
    - 4. Security and Authentication Settings
  - 4. Verifying the Configuration
  - 5. Configuration Templates for Different Scenarios
    - Scenario 1: High-Throughput Log Collection
    - Scenario 2: Financial-Grade Transactional Messages
    - Scenario 3: Cross-Data-Center Synchronization
  - 6. Advanced Configuration
    - 1. Transaction Support
    - 2. Message Retry and Dead-Letter Queues
Adapted from DeepSeek-generated material; contact me for removal in case of infringement.
1. Getting-Started Example
1. pom dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. KafkaProducer: message producer
@Component
@Slf4j
public class KafkaProducer {

    // Failed messages cached by topic (stand-in for a real persistence step)
    private final Map<String, String> failedMessages = new HashMap<>();

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    public void send(String topic, String msg) {
        log.info("Sending message, topic: {}; message: {}", topic, msg);
        ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, msg);
        // Delivery acknowledgment callback
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                log.info("Message sent successfully, topic: {}; message: {}", topic, msg);
            }

            @Override
            public void onFailure(Throwable ex) {
                // Persist the failed message (e.g. to a database) for later retry
                failedMessages.put(topic, msg);
            }
        });
    }
}
Spring Boot 3.x style:
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Send with a result callback; for a truly synchronous send, call get() on the returned future
    public void sendMessageSync(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value).whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.printf("Send succeeded: topic=%s, partition=%d, offset=%d%n",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            } else {
                System.err.println("Send failed: " + ex.getMessage());
            }
        });
    }

    // Asynchronous fire-and-forget send (the default behavior)
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
- Spring Boot 2.x: send() returns a ListenableFuture<SendResult> and supports addCallback() callbacks.
- Spring Boot 3.x: send() returns a CompletableFuture<SendResult>; ListenableFuture is deprecated, so use the CompletableFuture API instead (e.g. whenComplete, or get() to block; see the sketch below).
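For cases where the caller really must block until the broker confirms delivery, a minimal Spring Boot 3.x sketch (class name, topic, and timeout are illustrative, not from the original article) is to call get() on the returned CompletableFuture:

import java.util.concurrent.TimeUnit;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

public class BlockingSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public BlockingSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendBlocking(String topic, String message) throws Exception {
        // get() blocks until the broker acknowledges (or 10 seconds pass)
        SendResult<String, String> result =
                kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
        System.out.println("Acknowledged at offset " + result.getRecordMetadata().offset());
    }
}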
3. KafkaConsumer: message consumption
@Component
@Slf4j
public class KafkaConsumer {

    private final List<String> exist = new ArrayList<>();

    @KafkaListener(topics = {"lx"}, groupId = "lx")
    public void consumer(ConsumerRecord<Integer, String> record) {
        if (exist.contains(record.value())) {
            log.error("Idempotency check failed: duplicate message, skipping!");
            return;
        }
        log.info("Message consumed, topic: {}, message: {}", record.topic(), record.value());
        exist.add(record.value());
    }
}
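The in-memory ArrayList above grows without bound and is not thread-safe once listener concurrency exceeds 1. A minimal hardening sketch (names are illustrative; production systems typically key on a business ID and back the check with Redis or a database instead):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotencyGuard {

    // Thread-safe set of already-seen message keys
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true when the key was already processed; add() is atomic
    public boolean isDuplicate(String messageKey) {
        return !seen.add(messageKey);
    }
}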
Spring Boot 3.x style:

import java.util.List;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

    // Single-record consumption (manual offset commit)
    @KafkaListener(topics = "test-topic", groupId = "spring-group")
    public void listenMessage(String message, Acknowledgment ack) {
        System.out.println("Received message: " + message);
        ack.acknowledge(); // manual commit
    }

    // Batch consumption (requires listener.type=batch)
    @KafkaListener(topics = "batch-topic", groupId = "spring-group")
    public void listenBatch(List<String> messages, Acknowledgment ack) {
        messages.forEach(msg -> System.out.println("Batch message: " + msg));
        ack.acknowledge();
    }
}
4. yml configuration files
Producer configuration:
# Kafka configuration
spring:
  kafka:
    # Kafka cluster address
    # bootstrap-servers: 192.168.25.100:9092
    bootstrap-servers: 47.122.26.22:9092
    producer:
      # Size threshold (bytes) at which a batch is sent
      batch-size: 1
      # Maximum memory available for buffering unsent records
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # A send counts as successful only after all replicas have persisted the record
      acks: -1
      properties:
        # How long to wait before sending a batch (ms)
        linger.ms: 1
        # Prevent the producer from triggering topic auto-creation
        allow.auto.create.topics: false
Consumer configuration:
spring:
  kafka:
    # Kafka cluster address
    bootstrap-servers: 192.168.25.100:9092
    consumer:
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test
      auto-offset-reset: earliest
      properties:
        session.timeout.ms: 15000
        # Prevent topic auto-creation on metadata requests
        allow.auto.create.topics: false
    listener:
      ack-mode: manual_immediate # precise control over offset commits
      concurrency: 3             # number of concurrent consumers
      type: batch
5. Entity class
@Data
public class KafkaRequest {
    /**
     * Topic
     */
    private String topic;
    /**
     * Message body
     */
    private String message;
}
6. Sending messages
@RestController
@Slf4j
public class KafkaController {

    private final String topic = "lx";
    private int temp = 1;

    @Autowired
    private KafkaProducer producer;

    /**
     * Send the topic and message supplied by the caller.
     */
    @RequestMapping("/test01")
    public void test01(KafkaRequest kafkaRequest) {
        log.info("test01 succeeded! topic: {}; message: {}", kafkaRequest.getTopic(), kafkaRequest.getMessage());
        producer.send(kafkaRequest.getTopic(), kafkaRequest.getMessage());
    }

    /**
     * Send an auto-incrementing counter to the fixed topic.
     */
    @RequestMapping("/test02")
    public void test02() {
        log.info("test02 succeeded! topic: {}; message: {}", topic, temp);
        producer.send(topic, String.valueOf(temp));
        temp++;
    }
}
Starting Kafka:
./kafka-server-start.sh ../config/server.properties
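Since the configurations above set allow.auto.create.topics: false, the test topic must exist before any message is sent. One option, sketched here with spring-kafka's TopicBuilder (topic name, partition count, and replica count are examples), is to declare it as a bean so the auto-configured KafkaAdmin creates it at startup:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    @Bean
    public NewTopic lxTopic() {
        // KafkaAdmin picks up NewTopic beans and creates missing topics on startup
        return TopicBuilder.name("lx")
                .partitions(3)
                .replicas(1)
                .build();
    }
}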
2. Complete yml Configuration
spring:
  kafka:
    # Basic settings (required)
    bootstrap-servers: localhost:9092 # Kafka cluster address; separate multiple nodes with commas
    client-id: spring-boot-app        # client identifier (useful for log tracing)

    # Producer settings
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer   # key serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value serializer
      acks: all                  # acknowledgment mode: all = every replica confirms (highest reliability)
      retries: 5                 # retries on send failure (combine with idempotence)
      batch-size: 16384          # batch buffer size (bytes)
      buffer-memory: 33554432    # producer memory buffer (default 32MB)
      compression-type: snappy   # message compression (gzip/lz4/zstd also available)
      transaction-id-prefix: tx- # prefix required when transactions are enabled (with @Transactional)
      properties:
        linger.ms: 50            # send delay (ms) to improve throughput

    # Consumer settings
    consumer:
      group-id: app-consumer-group # consumer group ID (members share partitions)
      auto-offset-reset: earliest  # policy when no offset exists: earliest (from start) / latest (newest)
      enable-auto-commit: false    # disable auto offset commit (manual commit recommended)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500           # max records per poll (avoids OOM)
      fetch-max-wait: 500             # max time (ms) the broker may wait before answering a fetch
      isolation-level: read_committed # transactional isolation level (read_committed/read_uncommitted)

    # Listener settings (advanced tuning)
    listener:
      type: single       # listener type: single (one record) / batch
      ack-mode: manual   # offset commit mode: manual / batch
      concurrency: 3     # consumer threads (ideally equal to the partition count)
      poll-timeout: 3000 # poll timeout (ms)

    # Message retry and dead-letter queue (fault tolerance)
    retry:
      topic:
        attempts: 3            # maximum retry attempts
        initial-interval: 1000 # initial retry interval (ms)
        multiplier: 2.0        # backoff multiplier (exponential backoff)
        dead-letter-topic: dlq-${topic} # dead-letter topic naming rule (auto-created)

    # Security protocol (enterprise scenarios)
    properties:
      security.protocol: SASL_PLAINTEXT # security protocol (e.g. PLAINTEXT/SASL_SSL)
      sasl.mechanism: PLAIN             # SASL mechanism
      ssl.truststore.location: /path/to/truststore.jks

# Custom business configuration (not standard Kafka parameters)
app:
  kafka:
    topics:
      input-topic: user-events       # business input topic
      output-topic: processed-events # business output topic
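The app.kafka.topics block at the end is plain application configuration, so listeners can reference it with property placeholders; a small sketch (class name is illustrative):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class InputTopicListener {

    // Resolves to "user-events" from the app.kafka.topics block above
    @KafkaListener(topics = "${app.kafka.topics.input-topic}", groupId = "app-consumer-group")
    public void onEvent(String event) {
        System.out.println("user-event: " + event);
    }
}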
3. Key Configuration Notes
1. Producer Tuning Parameters
| Parameter | Description | Recommendation |
| --- | --- | --- |
| acks=all | Waits for every ISR replica to persist the write, preventing data loss | Mandatory for high-reliability scenarios |
| compression-type=snappy | Reduces network bandwidth usage and raises throughput | Enable when message bodies exceed 1KB |
| transaction-id-prefix | Enables atomic writes across partitions (used with the @Transactional annotation) | Essential for financial/transactional workloads |
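As the table notes, retries only avoid duplicates when idempotence is enabled. A sketch of a producer factory with idempotence turned on (values are illustrative; the same settings can also go under spring.kafka.producer.properties in yml):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerFactoryConfig {

    @Bean
    public ProducerFactory<String, String> idempotentProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // broker dedupes retried batches
        props.put(ProducerConfig.ACKS_CONFIG, "all");              // idempotence requires acks=all
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        return new DefaultKafkaProducerFactory<>(props);
    }
}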
2. Consumer Reliability Settings
| Parameter | Description | Caveat |
| --- | --- | --- |
| enable-auto-commit=false | Prevents data loss when processing fails after the offset was already committed | Requires calling ack.acknowledge() manually |
| isolation-level=read_committed | Consumes only messages from committed transactions | Must be paired with the producer's transaction settings |
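A sketch of the enable-auto-commit=false pattern in a listener (topic, group, and handle() are illustrative, and nack(Duration) assumes a recent spring-kafka): commit only after processing succeeds, and negatively acknowledge on failure so the container redelivers the record:

import java.time.Duration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ReliableConsumer {

    @KafkaListener(topics = "payments", groupId = "reliable-group")
    public void onMessage(String message, Acknowledgment ack) {
        try {
            handle(message);   // business processing (assumed)
            ack.acknowledge(); // offset committed only on success
        } catch (Exception e) {
            ack.nack(Duration.ofSeconds(1)); // reseek; the record is redelivered after 1s
        }
    }

    private void handle(String message) { /* business logic */ }
}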
3. Advanced Listener Features
| Parameter | Use case | Notes |
| --- | --- | --- |
| type=batch | Batch consumption (higher throughput) | Suits latency-tolerant workloads such as log processing |
| concurrency=3 | Number of concurrent consumers | Should match the topic's partition count to avoid wasted resources |
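The same listener options can also be set programmatically on a container factory; a sketch (bean name is illustrative) equivalent to listener.type=batch with listener.concurrency=3:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class ListenerFactoryConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); // equivalent of listener.type=batch
        factory.setConcurrency(3);      // equivalent of listener.concurrency=3
        return factory;
    }
}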
4. Security and Authentication Settings
spring:
  kafka:
    properties:
      security.protocol: SASL_SSL
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";
- Enterprise essential: production environments should enable SSL encryption plus SASL authentication.
4. Verifying the Configuration
- Startup check: add @ConfigurationProperties(prefix = "spring.kafka") to bind the configuration to a bean, then verify the injected values in a unit test (see the sketch after this list).
- Log monitoring: enable DEBUG logging to observe producer/consumer connection state:
logging:
  level:
    org.springframework.kafka: DEBUG
- AdminClient tooling: inspect topic metadata programmatically (shown here via the auto-configured KafkaAdmin; its describeTopics helper returns the metadata directly):
@Autowired
private KafkaAdmin kafkaAdmin;

public void checkTopic() {
    // describeTopics (spring-kafka 2.9+) returns metadata keyed by topic name
    Map<String, TopicDescription> topics = kafkaAdmin.describeTopics("user-events");
    topics.values().forEach(System.out::println);
}
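A sketch of the startup check from the first bullet, binding the custom app.kafka.topics block instead of the full spring.kafka tree (class and property names are illustrative):

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "app.kafka.topics")
public class TopicProperties {

    private String inputTopic;  // bound from app.kafka.topics.input-topic
    private String outputTopic; // bound from app.kafka.topics.output-topic

    public String getInputTopic() { return inputTopic; }
    public void setInputTopic(String inputTopic) { this.inputTopic = inputTopic; }
    public String getOutputTopic() { return outputTopic; }
    public void setOutputTopic(String outputTopic) { this.outputTopic = outputTopic; }
}

A @SpringBootTest can then autowire TopicProperties and assert that getInputTopic() returns user-events.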
5. Configuration Templates for Different Scenarios
Scenario 1: High-Throughput Log Collection
producer:
  compression-type: lz4
  batch-size: 65536
  properties:
    linger.ms: 100
consumer:
  auto-offset-reset: latest
  enable-auto-commit: true # tolerate some data loss in exchange for performance
Scenario 2: Financial-Grade Transactional Messages
producer:
  acks: all
  retries: 10
  transaction-id-prefix: fin-tx-
consumer:
  isolation-level: read_committed
  enable-auto-commit: false
Scenario 3: Cross-Data-Center Synchronization
spring:
  kafka:
    bootstrap-servers: kafka-dc1:9092,kafka-dc2:9092
    properties:
      client.dns.lookup: use_all_dns_ips # resolve all IPs behind each hostname
      reconnect.backoff.ms: 1000         # reconnect backoff policy
6. Advanced Configuration
1. Transaction Support
// Configure the transaction manager
@Bean
public KafkaTransactionManager<String, String> transactionManager(ProducerFactory<String, String> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}

// Send within a transaction
@Transactional
public void sendWithTransaction() {
    kafkaTemplate.send("topic1", "msg1");
    kafkaTemplate.send("topic2", "msg2");
}
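An alternative to the @Transactional approach above is KafkaTemplate.executeInTransaction, which runs a local Kafka transaction without Spring's transaction proxy; a minimal sketch (class name is illustrative, and it still requires transaction-id-prefix to be configured):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class LocalTxSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public LocalTxSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendWithLocalTransaction() {
        // Both sends commit or abort together
        kafkaTemplate.executeInTransaction(ops -> {
            ops.send("topic1", "msg1");
            ops.send("topic2", "msg2");
            return null;
        });
    }
}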
2. Message Retry and Dead-Letter Queues
spring:
  kafka:
    listener:
      retry:
        max-attempts: 3
        backoff:
          initial-interval: 1000
          multiplier: 2.0
      dead-letter-topic: my-dlt-topic # dead-letter queue
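spring-kafka also offers an annotation-driven route to the same behavior: @RetryableTopic (available since 2.7) retries on dedicated retry topics with backoff and then routes failures to a dead-letter topic handled by @DltHandler. A sketch with illustrative topic and method names:

import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class RetryingConsumer {

    @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, multiplier = 2.0))
    @KafkaListener(topics = "orders", groupId = "retry-demo")
    public void listen(String message) {
        process(message); // a throwing call here triggers a retry, then the DLT
    }

    @DltHandler
    public void handleDlt(String message) {
        System.err.println("Dead-lettered: " + message);
    }

    private void process(String message) { /* business logic */ }
}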