1、关于消息偏移量的确认
消息的确认包括自动确认和手动确认,通常采用手动确认的方式,配置项和代码块分别如下所示。这里需要注意的是,当消息1、2、3顺序到达,2偏移量确认失败,3偏移量确认成功时,2的偏移量将被覆盖,即后续将从3的偏移量开始消费,不会再次消费消息2 !!!
spring:kafka:consumer:......# 关闭自动提交偏移量enable-auto-commit: falselistener:# 拉取数据方式: single、batchtype: single# 偏移量提交方式:手动ack-mode: manual_immediate
@Component
public class MessageConsumer {@KafkaListener(topics = "topic-smy", groupId = "my_group1")public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {// 消息处理System.out.println("接收到消息, topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());if(!"0".equals(record.value())) {// 消息偏移量确认ack.acknowledge();}}
}