【Kafka】从kafka中读取最新数据

news/2025/2/12 2:53:33/

【Kafka】从kafka中读取最新数据

  • 一、死循环无限拉取kafka数据
    • 1.1 整体框架剖析
    • 1.2 测试
  • 二、@KafkaListener注解 实现监听kafka数据
  • 三、参考资料

前情提要:我这里只是读取kafka里面的数据,生产者已经配置好且会自动监控数据库的变化来推入kafka中,所以这里不对生产者做过多的解释。

一、死循环无限拉取kafka数据

1.1 整体框架剖析

1、要想从Kafka中读取数据,就要先对消费者信息进行配置

//1、创建消费者配置信息Properties properties = new Properties();//2、给配置信息赋值//2.1 kafka集群信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");//2.2 开启自动提交offset 提交以后每次offset都在消费的最新位置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//2.3 自动提交offset延时 1秒钟提交一次properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");//2.4 key value的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//2.5 消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");

2、消费者基本配置信息完成以后,创建消费者、订阅主题、为了后面的消费

 //创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅主题consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));

3、订阅主题,就相当于已经订阅了kafka中的消息,下一步就是消费。而kafka消费消息的方式是poll拉取,我们这里对kafka中的数据进行消费,上面我们选择了自动提交offset,那么每次offset就是在上一次消费完成以后的最新位置,所以我们接下来的每次消费得到的都是最新未消费的数据!

while (true) {//获取数据ConsumerRecords<String, String> poll = consumer.poll(100);//解析并打印for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());}}

1.2 测试

方法一:

1、创建MyConsumer1类,根据上面整体结构的剖析,添加如下代码,并进行测试。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Collections;
import java.util.Properties;/*** @author potential*/
public class MyConsumer1 {public static void main(String[] args) {//1、创建消费者配置信息Properties properties = new Properties();//2、给配置信息赋值//2.1 kafka集群信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");//2.2 开启自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//2.3 自动提交offset延时 1秒钟提交一次properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");//2.4 key value的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//2.5 消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");//创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//订阅主题consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));while (true) {//获取数据ConsumerRecords<String, String> poll = consumer.poll(100);//解析并打印for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());}}
//        //关闭连接
//        consumer.close();}
}

方法二:
2、创建MyConsumer2类,根据上面整体结构的剖析,添加如下代码,并进行测试。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;/*** @author potential*/
public class MyConsumer2 {public static void main(String[] args) {//配置必要的参数//准备一个map集合放置参数Map<String, Object> config = new HashMap<String, Object>();//bootserversconconfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");//开启自动提交offsetconfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交offset延时 1秒钟提交一次config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");//valuedeserilizerconfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class);//groupidconfig.put(ConsumerConfig.GROUP_ID_CONFIG, "base_db_app_210325");//如果找不到偏移量,设置earliest,则从最新消费开始,也就是消费者一开始最新消费的时候//一定要注意顺序,读取时候的顺序会影响config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//        //此处是把消费者的偏移量重置到生产者最顶端
//        Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
//        hashMaps.put(new TopicPartition("ticdc-paperfree-monitor", 0), new OffsetAndMetadata(129));//消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(config);
//        //放置刚刚设置的偏移量
//        consumer.commitSync(hashMaps);//先订阅后消费consumer.subscribe(Arrays.asList("ticdc-paperfree-monitor"));//        // 批量从主题的分区拉取消息
//        //final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);
//        ConsumerRecords<String, String> consumerRecords = consumer.poll(3000);while (true) {//获取数据ConsumerRecords<String, String> poll = consumer.poll(100);//解析并打印for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());}}//
//        //遍历本次从主题的分区拉取的批量消息 这里是将整个分区中的全部数据都拉出来了
//        consumerRecords.forEach(new java.util.function.Consumer<ConsumerRecord<Integer, String>>() {
//            @Override
//            public void accept(ConsumerRecord<Integer, String> consumerRecord) {
//                System.out.println(
//                        consumerRecord.topic() +"\t"
//                                +consumerRecord.offset() + "\t"
//                                +consumerRecord.key() +"\t"
//                                +consumerRecord.value()+"\t"
//                );
//            }
//        });
//        consumer.close();}
}

注意:
方式一、方式二只是写法上的不同,整体架构都是一样的,任选其一来写即可。
在这里插入图片描述至此,从kafka中读取最新数据的流程就全部结束了。

二、@KafkaListener注解 实现监听kafka数据

1、导入依赖

【我这里SpringBoot版本是2.2.13】

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.3.7.RELEASE</version></dependency>

注意:
1、springboot +2、kafka-clients +3、spring-kafka(在下图中体现为Sprig for Apache Kafka Version) 这三个 要注意版本对应。具体对应情况如下图所示:
在这里插入图片描述2、配置文件
application.yml文件中添加如下内容:

spring:kafka:consumer:bootstrap-servers: 192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092 #集群信息producer: #生产者retries: 0 #设置大于0的值,则客户端将发送失败的记录重新发送batch-size: 16384 #批量大小buffer-momory: 33554432 #生产端缓冲区大小acks: 1 #应答级别#指定消息key和消息体的解编码方式  序列化与反序列化key- key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializconsumer:group-id: base_db_app_210325enable-auto-comnit: true #是否自动提交offsetauto-offset-reset: latest #重置为分区中最新的offset(消费者分区中新产生的数据)key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializertopic-name: ticdc-paperfree-monitor #主题listener:ack-mode: manual_immediate

3、创建MyConsumer类,添加如下内容:

 @KafkaListener(id = "test1", topics = "ticdc-paperfree-monitor")//这里id是随意起的,我这里叫test1,我这里主题直接写死,取ticdc-paperfree-monitor这个主题下的数据,也可以${},动态获取主题名称,group_idpublic void listen(ConsumerRecord<String, String> record) {//从Kafka中读取到的数据System.out.println("topic:" + record.topic());System.out.println("value:" + record.value());}

4、测试
运行主启动类,会自动进行监听且在程序运行的过程中将数据输出。
在这里插入图片描述

三、参考资料

https://blog.csdn.net/m0_67391270/article/details/126505944
https://blog.csdn.net/weixin_46271129/article/details/119800649


http://www.ppmy.cn/news/383370.html

相关文章

最新版本zookeeper+dubbo-admin

zookeeper 下载地址 :https://archive.apache.org/dist/zookeeper/ 修改conf下zoo_sample.cfg - >zoo.cfgbin下启动zkServer.cmd启动成功 :binding to port 0.0.0.0/0.0.0.0:2181 问题1&#xff1a;zookeper安装 1.去官网下载apache-zookeeper-3.6.2-bin.tar.gz名字中带有…

最新豆瓣API接口

前段时间豆瓣API不能访问了, 今天军哥写了一个php项目 技术交流QQ群 277030213 豆瓣 api php项目 https://github.com/wenyanjun/douban 豆瓣 api 地址 服务器地址 http://106.55.173.177:8081/index.php/top250?page0 1.搜索 接口 http://106.55.173.177:8081/index.php/se…

【大学计算机技术】第三、四章 测试4

文章目录 选择题 选择题 在Word 2010中&#xff0c;格式刷按钮的作用是( )。 A. 复制文本 B. 复制图形 C. 复制文本和格式 D. 复制格式 正确答案&#xff1a; D 在Word 2010的“字体”对话框中&#xff0c;不可设定文字的( )。 A. 字间距 B. 字号 C. 删除线 D. 行距 正确答案&…

Java最新JDK和API下载(持续同步更新于官网)

目录 引言一、JDK 8&#xff08;LTS&#xff09;APILinux安装包&#xff08;rpm&#xff09;32位64位 压缩包&#xff08;tar.gz&#xff09;32位64位 MacOS安装包&#xff08;dmg&#xff09;64位 Windows安装包&#xff08;exe&#xff09;32位64位 二、JDK 11&#xff08;LT…

Mysql分组查询每组最新的一条数据(三种实现方法)

MySQL分组查询每组最新的一条数据 前言注意事项准备SQL错误查询错误原因 方法一方法二&#xff08;适用于自增ID和创建时间排序一致&#xff09;方法三&#xff08;适用于自增ID和创建时间排序一致&#xff09;总结MAX()函数和MIN()这一类函数和GROUP BY配合使用存在问题 前言 …

最新目标检测算法回顾2022笔记

目标检测算法回顾2022笔记[附PPT] 总目录篇章1&#xff1a;目标检测的应用与需求篇章2&#xff1a;目标检测的定义与挑战篇章3&#xff1a;目标检测损失函数的进展篇章4&#xff1a;目标检测IOU的发展历程篇章5&#xff1a;目标检测评价指标及数据集[篇章6&#xff1a; 目标检测…

Docker 最新版Version 20.10安装

1.官网 Install Docker Engine on CentOS | Docker Documentation Dockerfile完全指南文档&#xff1a; 基础镜像的选择 (FROM) — Docker Tips 0.1 documentation 2.版本说明 Docker Engine release notes | Docker Documentation 目前最新的是20.10 3.安装的方法 官网…

windows安装VMware最新版本(VMware Workstation 17.0 Pro)详细教程

专栏地址&#xff1a;嵌入式开发 专栏文章&#xff1a; 【01】windows安装VMware最新版本(VMware Workstation 17.0 Pro)详细教程 【02】VMware17虚拟机安装Ubuntu最新版本(Ubuntu22.04LTS)详细步骤 【03】Ubuntu22.04 添加samba&#xff0c;并在windows访问 的详细教程 【04】…