Kafka的Offset(偏移量)详解

news/2025/1/8 19:02:36/

Kafka的Offset详解

  • 1、生产者Offset
  • 2、消费者Offset
    • 2.1、消费者
    • 2.2、生产者
    • 2.3、实体类对象
    • 2.4、JSON工具类
    • 2.5、项目配置文件
    • 2.6、测试类
    • 2.7、测试
    • 2.8、总结

1、生产者Offset

在这里插入图片描述
在这里插入图片描述

2、消费者Offset

在这里插入图片描述

2.1、消费者

package com.power.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {/*** topics 用于指定从哪个主题中消费消息* concurrency 用于指定有多少个消费者* @param record*/@KafkaListener(topics = {"offSetTopic"}, groupId = "offSetGroup")public void onEventA(ConsumerRecord<String, String> record) {System.out.println(Thread.currentThread().getId()+"---> 消费消息 record = " + record);}
}

2.2、生产者

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent(){for (int i = 0; i < 2; i++) {User user = User.builder().id(i).phone("1567676767"+i).birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("offSetTopic","k"+i, userJson);}}}

2.3、实体类对象

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

2.4、JSON工具类

package com.power.util;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;public class JSONUtils {private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();public static String toJSON(Object object){try {return OBJECTMAPPER.writeValueAsString(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public static <T> T toBean(String json,Class<T> clazz){try {return OBJECTMAPPER.readValue(json,clazz);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}
}

2.5、项目配置文件

spring:application:#应用名称name: spring-boot-06-kafka-offset#kafka连接地址(ip+port)kafka:bootstrap-servers: <你的kafka服务器IP>:9092#配置消费者的反序列化consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2.6、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot07KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendInterceptor(){eventProducer.sendEvent();}}

2.7、测试

  • 先启动生产者,会发送两条消息到kafka服务器

  • 再启动消费者监听,此时我们发现,启动后的消费者并不会监听到生产者已发送的两条消息

  • kafka安装目录的bin文件夹下执行命令:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group offSetGroup --describe
  • 根据命令结果:查看kafka消费者的偏移量offset,我们发现当前消费者偏移量CURRENT-OFFSET值为2 ,当前日志记录的生产者消息偏移量LOG-END-OFFSET值为2,消费者偏移量和日志记录的生产者消息偏移量差值LAG值为0 ,所以消费者查询不到生产者发送的消息。

在这里插入图片描述

  • 关闭消费者,再次使用生产者发送消息,再次执行命令查看消费者偏移量

在这里插入图片描述

  • 此时我们发现消费者偏移量为4,日志记录的偏移量为6,两者差值为2,此时启动消费者,读取到了差值为2的数据

2.8、总结

在这里插入图片描述

  • 消费者从什么地方开始消费,就看消费者的offset是多少,消费者启动后他的offset是多少。
  • 消费者offset是多少,可以通过命令查看
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group offSetGroup --describe

http://www.ppmy.cn/news/1561351.html

相关文章

Spring Boot教程之四十七:Spring Boot ——JDBC

Spring Boot – JDBC Spring Boot JDBC用于通过提供库和启动器依赖项将Spring Boot 应用程序与JDBC连接起来。Spring Boot JDBC 对正在编写的 SQL 查询具有一定程度的控制。Spring Boot JDBC 通过使用自动配置的概念自动化工作和步骤&#xff0c;简化了 Spring JDBC 的工作。J…

51c自动驾驶~合集45

我自己的原文哦~ https://blog.51cto.com/whaosoft/13020031 #运动控制和规划控制需要掌握的技术栈~ 各大垃圾家电造车厂又要开始了~~~​ 1、ROS的通信方式 李是Lyapunov的李&#xff1a;谈谈ROS的通信机制 话题通信和服务通信&#xff0c;其中话题通信是通过发布和订阅…

亚马逊云科技 re:Invent 2024 Amazon Bedrock 推出新功能,加速AI落地

文章目录 一、前言二、本次 re:Invent Amazon Bedrock 重大相关更新2.1 Amazon Bedrock 基础模型的重大更新2.2 亚马逊 Bedrock Marketplace 的发布使得可以在 Bedrock 上使用第三方模型。2.3 新的厂商新的大模型加入 Amazon Bedrock 平台2.4 Amazon Bedrock 自定义模型与相关优…

数据结构9.3 - 文件基础(C++)

目录 1 打开文件字符读写关闭文件 上图源自&#xff1a;https://blog.csdn.net/LG1259156776/article/details/47035583 1 打开文件 法 1法 2ofstream file(path);ofstream file;file.open(path); #include<bits/stdc.h> using namespace std;int main() {char path[]…

探索 ES6 Set:用法与实战

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

机器视觉——相机光源选择

工业相机光源选型 包括背光光源、同轴光源、条光光源、点光源、环形光源、面光源、激光光源、 UV光源、 红外光源 1. 背光光源 将背光源放置在检测物体下方&#xff0c;向镜头方向打光&#xff0c;物体部分由于不透光在镜头中呈现暗色阴影&#xff0c;边缘的缺陷会非常清晰。…

基于深度学习的视觉检测小项目(六) 项目的信号和变量的规划

• 关于前后端分离 当前流行的一种常见的前后端分离模式是vueflask&#xff0c;vueflask模式的前端和后端之间进行数据的传递通常是借助 API&#xff08;应用程序编程接口&#xff09;来完成的。vue通过调用后端提供的 API 来获取或提交数据。例如&#xff0c;前端可能通过发送…

黑马Java面试教程_P10_设计模式

系列博客目录 文章目录 系列博客目录前言1. 工厂方法模式1.1 概述1.2 简单工厂模式1.2.1 结构1.2.2 实现1.2.3 优缺点 1.3 工厂方法模式1.3.1 概念1.3.2 结构1.3.3 实现1.3.4 优缺点 1.4 抽象工厂模式1.4.1 概念1.4.2 结构1.4.3 实现1.4.4 优缺点1.4.5 使用场景 总结&#xff0…