SpringBoot集成kafka-监听器手动确认接收消息(主要为了保证业务完成后再确认接收)

server/2024/11/15 8:24:31/

SpringBoot集成kafka-监听器手动确认接收消息

  • 1、说明
  • 2、示例
    • 2.1、application.yml
    • 2.2、消费者
    • 2.3、生产者
    • 2.4、测试类
    • 2.5、测试

1、说明

kafak中默认情况下是自动确认消息接收的,也就是说先启动消费者监听程序,再启动生产者发送消息,此时消费者监听到生产者发送的消息后,程序会自动确认接收成功,偏移量会自动下移,此时再启动消费者,偏移量会从新的位置读取数据,如果本次出现异常,业务没有处理完成,那么下次启动消费者是读取不到本次的消息的,所以可以采用手动确认的配置,确保本次消费者接收到了消息,并且业务正常处理完毕了,给kafak手动反馈接收成功。

在这里插入图片描述

2、示例

在这里插入图片描述

2.1、application.yml

在这里插入图片描述

2.2、消费者

package com.power.consumer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.function.Consumer;@Component
public class EventConsumer {@KafkaListener(topics = {"${kafka.topic.name}"},groupId="${kafka.consumer.group}")public void onEvent4(String userJson,@Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,ConsumerRecord<String,String> record,Acknowledgment ack){try {User user =JSONUtils.toBean(userJson,User.class);System.out.println("读取/消费到的事件,user:"+user+",topic:"+topic+",partition:"+partition);System.out.println("读取/消费到的事件:"+record.toString());int a = 10/0;//业务确认完成,给kafka服务器反馈确认ack.acknowledge();//手动确认消息,就是告诉kafka服务器,该消息我已经接收到了,默认情况下是自动确认//手动确认后,下次启动消费者,偏移量会从新的位置开始;没有手动确认,下次启动消费者,偏移量还是从老位置开始}catch (Exception e){e.printStackTrace();}}}

2.3、生产者

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent2(){User user = User.builder().id(10001).phone("15676767676").birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("helloTopic",userJson);}}

2.4、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot02KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendEvent2(){eventProducer.sendEvent2();}}

2.5、测试

先启动消费者监听程序
再启动生产者发送消息
程序再业务中出现了异常:

在这里插入图片描述
再次启动消费者程序,因为再上次启动时出现了异常,也没有进行手动确认接收,所以本地启动消费者后依然可以读取到上次未完成业务时接收到的数据
在这里插入图片描述


http://www.ppmy.cn/server/108346.html

相关文章

依赖注入:原则、实践与Spring中的应用

在软件开发中,依赖注入(Dependency Injection,DI)是一种实现控制反转(Inversion of Control,IoC)的模式,它用于减少代码间的耦合度并提高模块化。Spring框架作为当前最流行的企业级应用开发框架之一,其核心特性之一就是依赖注入。本文将深入探讨依赖注入的概念、实践方…

OceanBase 功能解析之 Binlog Service

前言 MySQL&#xff0c;是在全球广泛应用的开源关系型数据库&#xff0c;除了其稳定性、可靠性和易用性&#xff0c;他早期推出的二进制日志功能&#xff0c;即binlog&#xff0c;也是MySQL广受欢迎的原因。 MySQL binlog&#xff0c;即二进制日志&#xff0c;是 MySQL 中用于…

香农定理简单理解

香农定理 香农定理,又称为信息论基础定理,是信息论中最具代表性和影响力的定理之一。它主要由三大定理组成,这些定理为通信系统的设计提供了重要的理论依据,并对现代通信技术的发展产生了深远影响。以下是对香农定理的简单理解: 一、香农第一定理(可变长无失真信源编码…

气势向前,豪华向上 全新BMW X3长轴距版全球首发,更大体量、更高价值、更多驾趣

X家族的中流砥柱&#xff0c;豪华中型SAV的引领者现代极简主义&#xff0c;原石切割般的型面&#xff0c;塑造纯粹的BMW X辨识度“以驾驶员为中心”和“以简释繁”&#xff0c;打造“中国专属”的精致豪华空间创新不止&#xff0c;驾趣依旧&#xff0c;探索未知的忠实伙伴 &…

一篇搞懂C++ STL 元组std::tuple

文章目录 前言什么是 std::tuple为什么要使用 std::tuplestd::tuple 的构造函数和操作函数1. 构造函数2. 操作函数 make_tuple函数std::make_tuple 的功能函数原型参数返回值使用示例std::make_tuple 的特点 示例代码总结 前言 在 C 中&#xff0c;元组&#xff08;Tuple&…

JS中【Data】详解

在JavaScript中&#xff0c;“Date” 是一个内置对象&#xff0c;专门用于处理日期和时间。Date 对象提供了多种方法&#xff0c;能够让你创建、操作和格式化日期和时间数据。下面我们将详细讲解 Date 对象的各个方面&#xff0c;包括它的创建、方法、使用场景和一些常见的坑。…

Java-List工具类

为了扩展 List 工具类,我们可以创建一个新的工具类 ListUtil,它将包含一系列静态方法,用于方 便地创建、操作和查询 List 集合。以下是一些常用的扩展方法示例: 批量添加元素:addAllElements 方法允许向列表中批量添加多个元素。 批量移除元素:removeAllElements 方法…

MySQL——多表操作(四)子查询(1)带 IN 关键字的子查询

子查询是指一个查询语句嵌套在另一个查询语句内部的查询。它可以嵌套在SELECT、SELECT、INTO 语句、INSERT…INTO 等语句中。在执行查询语句时&#xff0c;首会执行子查询中的语句&#xff0c;然后将返回的结果作为外层查询的过滤条件&#xff0c;在子查询中通可以使用 IN、EXI…