kafka-消费者-指定offset消费(SpringBoot整合Kafka)

news/2024/9/19 8:39:02/ 标签: kafka, spring boot, linq, 指定offset消费

文章目录

  • 1、指定offset消费
    • 1.1、创建消费者监听器‘
    • 1.2、application.yml配置
    • 1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
    • 1.4、创建生产者发送消息
      • 1.4.1、分区0中的数据
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:

1、指定offset消费

1.1、创建消费者监听器‘

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaPartitionListener {//初始化偏移量指定后,每次重启都会从该位置消费一轮,所以一般是调式解决问题时才使用@KafkaListener(topicPartitions = {@TopicPartition(topic = "my_topic1",partitionOffsets = {@PartitionOffset(partition = "0",initialOffset = "2")})}, groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}

1.2、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本

package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class MyKafkaConfig {@Beanpublic NewTopic springTestPartitionTopic() {return TopicBuilder.name("my_topic1") //主题名称.partitions(3) //分区数量.replicas(3) //副本数量.build();}
}

在这里插入图片描述
在这里插入图片描述

1.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%3,"", "指定分区消费"+i);}}}

在这里插入图片描述
在这里插入图片描述

1.4.1、分区0中的数据

[[{"partition": 0,"offset": 0,"msg": "指定offset消费0","timespan": 1717660785962,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 1,"msg": "指定offset消费3","timespan": 1717660785974,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 2,"msg": "指定offset消费6","timespan": 1717660785975,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 3,"msg": "指定offset消费9","timespan": 1717660785975,"date": "2024-06-06 07:59:45"}]
]

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

kafka_debug__logbackxml_168">1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

kafka_178">1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

此时如果重新启动 SpringKafkaConsumerApplication 消费者还是会消费数据,重复消费

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

http://www.ppmy.cn/news/1467745.html

相关文章

PHP质量工具系列之php-depend

php-depend是一个开源的静态代码分析工具&#xff0c;它的主要功能包括&#xff1a; 代码质量分析 复杂度度量&#xff1a;计算类、方法和函数的Cyclomatic Complexity&#xff08;循环复杂度&#xff09;&#xff0c;帮助识别潜在的复杂代码段。 耦合度度量&#xff1a;分析类…

基于System-Verilog实现DE2-115开发板驱动HC_SR04超声波测距

目录 前言 一、SystemVerilog——下一代硬件设计语言 与Verilog关系 与SystemC关系 二、实验原理 2.1 传感器概述&#xff1a; 2.2 传感器引脚 2.3 传感器工作原理 2.4 整体测距原理及编写思路 三、System-Verilog文件 3.1 时钟分频 3.2 超声波测距 3.3 数码管驱动…

电脑屏幕监控软件有哪些:5款好用的电脑屏幕监控软件(宝藏篇)

什么是电脑屏幕监控软件&#xff1f; 电脑屏幕监控软件是一种专业的应用软件&#xff0c;主要用于远程或本地监控局域网&#xff08;LAN&#xff09;内其他电脑的屏幕显示和操作活动。 这类软件通常由两部分组成&#xff1a;一个是安装在监控者电脑上的主控端&#xff08;或称…

SpringCloud网关-gateway

一 什么是网关&#xff1f;为什么选择 Gateway? 网关功能如下&#xff1a; 身份认证和权限校验服务路由、负载均衡请求限流 在 Spring Cloud 中网关的实现包含两种&#xff1a; Gateway&#xff08;推荐&#xff09;&#xff1a;是基于 Spring5 中提供的 WebFlux &#xff…

Python画图(多图展示在一个平面)

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

具有 MOSFET 的电压到电流 (V-I) 转换器电路

设计说明 该单电源、低侧、V-I 转换器向可以连接到比运算放大器电源电压更高的电压的负载提供经过良好调节的电流。该 电路接受介于 0V 和 2V 之间的输入电压&#xff0c;将其转换为介于 0mA 和 100mA 之间的电流。通过将低侧电流检测电 阻 R3 上的压降反馈到运算放大器的反相…

python的最小二乘法(OLS)函数

1、作用 pandas提供了一些很方便的功能&#xff0c;比如最小二乘法(OLS)&#xff0c;可以用来计算回归方程式的各个参数。 2、Python导出的OLS模型的结果 下面是如何解读Python导出的OLS模型的结果。 1. 回归系数&#xff1a; 代表每个自变量对因变量的影响程度&#xff0c…

哪款开放式耳机佩戴最舒服?2024五款备受推崇产品分享!

​在现今耳机市场&#xff0c;开放式耳机凭借其舒适的佩戴体验和独特的不入耳设计&#xff0c;备受消费者追捧。它们不仅让你在享受音乐时&#xff0c;仍能察觉周围的声音&#xff0c;确保与人交流无障碍&#xff0c;而且有利于耳朵的卫生与健康。对于运动爱好者和耳机发烧友而…

签名安全规范:解决【请求对象json序列化时,时间字段被强制转换成时间戳的问题】

文章目录 引言I 签名安全规范1.1 签名生成的通用步骤1.2 签名运算(加密规则)1.3 对所有传入参数按照字段名的 ASCII 码从小到大排序(字典序)1.4 允许的请求头字段1.5 签名校验工具II 注解校验签名2.1 获取请求数据,并校验签名数据2.2 解决时间格式被强制转换成时间戳的问题…

tcpdump抓包,抓包导出.pcap文件用wireshark看

1、抓所有口的包 tcpdump -i any host 设备的ip2、抓特定口的包 tcpdump -i eth2 port 61182 -nne3、将抓到的包导出到pacb文件 tcpdump -i eth2 port 61182 -nne -s0 -w /tmp/61182.pcap -s0: Sets the snapshot length to capture the entire packet. The 0 means that tcpd…

【C++PCL】点云处理Trimmed_ICP配准

作者:迅卓科技 简介:本人从事过多项点云项目,并且负责的项目均已得到好评! 公众号:迅卓科技,一个可以让您可以学习点云的好地方 重点:每个模块都有参数如何调试的讲解,即调试某个参数对结果的影响是什么,大家有问题可以评论哈,如果文章有错误的地方,欢迎来指出错误的…

Docker从安装开始精通

从虚拟机到容器 1.环境配置的难题 软件开发最大的麻烦事之一&#xff0c;就是环境配置。用户计算机的环境都不相同&#xff0c;你怎么知道自家的软件&#xff0c;能在那些机器跑起来&#xff1f; 用户必须保证两件事&#xff1a;操作系统的设置&#xff0c;各种库和组件的安装…

瑞鑫RK3588 画中画 OSD 效果展示

这些功能本来在1126平台都实现过 但是迁移到3588平台之后 发现 API接口变化较大 主要开始的时候会比较费时间 需要找到变动接口对应的新接口 之后 就比较好操作了 经过几天的操作 已实现 效果如下

人手一份的MinewSemi模块新系列命名规则手册,值得收藏!

产品系列命名是企业塑造品牌多样化和满足消费者多样化需求的关键环节。在当前数字化时代&#xff0c;数据的管理和标识变得越来越重要&#xff0c;通过简单的字母或者数字组合&#xff0c;赋予了不一样的意义。命名规则对于确保信息传递的准确性和效率至关重要。 创新微MinewS…

Java利用Scanner实现控制台文字游戏,Java实现猜数字游戏简易文字游戏,Java实现石头剪刀布简易文字游戏

1、猜数字游戏简易文字游戏 public static void main(String[] args) {//文本扫描器Scanner scanner new Scanner(System.in);//获取本次游戏的正确数字int num new Random().nextInt(100);System.out.println("开始猜数字游戏&#xff0c;输入数字后按enter");whi…

10、有条件提前退出关键字Return From Keyword If【robot framework】

在 Robot Framework 中&#xff0c;Return From Keyword If 是一个有用的关键字&#xff0c;它允许你在特定条件下从关键字中返回。这在需要在满足某个条件时提前退出关键字的情况下特别有用。 以下是 Return From Keyword If 的语法和使用示例&#xff1a; 语法 Return From…

机关事业单位需要进行等保测评吗?一年要几次?

机关事业单位需要进行等保测评吗&#xff1f;一年要几次&#xff1f; 【回答】&#xff1a;根据规定&#xff0c;机关事业单位应当自行或者委托具有相应资质的第三方网络安全服务机构&#xff0c;对互联网政务应用网络和数据安全每年至少进行一次安全检测评估。机关事业单位每…

专转本考试的三个难点

专转本对于我们来说&#xff0c;其实真正的难点并不是考试&#xff0c;而是以下三个方面。第一个是能不能坚持到底。我们有很多同学刚开始的时候说我要专转本&#xff0c;但是到了后面会慢慢的慢慢的就放弃了。所以说最后能真正走到考场的同学&#xff0c;说实话并没有多少。第…

OpenAI新研究破解GPT-4大脑,分解1600万个特征打开“黑匣子”,Ilya 、Jan Leike也参与了!

https://cdn.openai.com/papers/sparse-autoencoders.pdf 这份论文探讨了稀疏自编码器&#xff08;SAE&#xff09;在语言模型中的应用&#xff0c;旨在从语言模型中提取可解释的特征。论文的主要贡献包括&#xff1a; 1. 训练方法&#xff1a; 提出了一种训练大型稀疏自编码…

Python基础教程 第2版 PDF下载

Python基础教程 第2版 PDF下载 在数字时代的浪潮中&#xff0c;Python已成为众多程序员和数据分析师的首选编程语言。对于初学者来说&#xff0c;一本好的Python教程无疑是开启编程之门的金钥匙。本文将为您介绍如何下载《Python基础教程 第2版》的PDF版本&#xff0c;并从四个…