老项目接入kafka消费信息另一种方式

news/2024/10/18 7:57:13/

前言

       这次跟大家分享kafka消费的另一种接入实现。其实原因是因为目前这个项目的框架太老了,springboot还是1.5的,直接用注解@KafkaListener无法消费的问题。我也不想调这个框架,没工时不说,万一再整出兼容性问题,那问题就大了,而且现在时间太赶了。


一、目标场景

  1. 目前是物联网设备的流水上报后,会存ES,同时经过物模型解析后,会往下游kafka推送信息。
  2. 下游系统接收kafka的设备流水,进行流水解析,解析成业务数据,做业务融合。

二、使用步骤

1.引入库

代码如下(示例):

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
<!-- spring-kafka内部依赖kafka-clients升级补偿 -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version>
</dependency>

       其实上面看起来说的是排除springboot的kafka-clients,引入自定义的kafka-clients做为升级补偿。
       编译、运行都不报错,但是使用@KafkaListener注解消费kafka信息,会报错,大致意思就是springframe版本低。应该就是低版本springboot的依赖springframe与高版本kafka-client依赖的springframe不匹配导致。
       没有去调整框架,具体就不发散了。


import com.easylinkin.emp.hngw.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.List;/*** 消费者listener** @author zhengwen**/
@Slf4j
@Component
//@Lazy
public class KafkaListenConsumer {@Resourceprivate DataTransService dataTransService;/*** 设备流水listenner** @param records 消费信息* @param ack     Ack机制*/@KafkaListener(topics = "${easylinkin.analyze.device.flow.topic.consumer}")public void deviceFlowListen(List<ConsumerRecord> records, Acknowledgment ack) {log.debug("=====设备流水deviceFlowListen消费者接收信息====");try {for (ConsumerRecord record : records) {log.debug("---开启线程解析设备流水数据:{}", record.toString());dataTransService.deviceFlowTransSave(record);}} catch (Exception e) {log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);} finally {//手动提交偏移量ack.acknowledge();}}
}

       上面就是我最初直接使用注解写的消费方法。

2.主动启动消费


import com.easylinkin.emp.hngw.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;@Slf4j
public class DeviceFlowConsumerServerStart {@Resourceprivate DataTransService dataTransService;@Value("${easylinkin.analyze.device.flow.topic.consumer}")private String topic;@Value("${spring.kafka.bootstrap-servers:localhost:9092}")private String kafkaServiceUrl;@Value("${spring.kafka.consumer.group-id}")private String groupId;@PostConstructvoid start() {log.info("设备流水消费kafka服务启动!");//配置信息Properties props = new Properties();//先自定义的设置下,再用配置里的覆盖//声明kafka的地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceUrl);//每个消费者分配独立的消费者组编号props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//如果value合法,则自动提交偏移量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//设置多久一次更新被消费消息的偏移量props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");//自动重置offsetprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerParams);//订阅消费topickafkaConsumer.subscribe(Arrays.asList(topic));startConsumer(kafkaConsumer);log.info("设备流水消费kafka服务启动完成!");}private void startConsumer(KafkaConsumer<String, String> kafkaConsumer) {new Thread(()->{while (true){try {ConsumerRecords<String,String> poll = kafkaConsumer.poll(2000);Iterable<ConsumerRecord<String,String>> records = poll.records(topic);Iterator<ConsumerRecord<String,String>> iterator = records.iterator();while (iterator.hasNext()){dataTransService.deviceFlowTransSave(iterator.next());}}catch (Exception e){log.error("消费失败",e);startConsumer(kafkaConsumer);break;}}}).start();}}

       这里设置订阅后,启用线程消费,希望是消费异常不要把这里主线程搞挂了。因为我这里消费信息,会用一个dataTransService做设备流水的进一步解析,做业务融合,可能就涉及到事物嵌套的问题。


总结

  1. 针对老项目的另一种kafka消费接入方式
  2. 老springboot是真狗,各种接入不丝滑
  3. 就写到这里,希望能帮到大家,uping!

http://www.ppmy.cn/news/1401941.html

相关文章

java Web 疫苗预约管理系统用eclipse定制开发mysql数据库BS模式java编程jdbc

一、源码特点 JSP 疫苗预约管理系统是一套完善的web设计系统&#xff0c;对理解JSP java 编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,eclipse开发&#xff0c;数据库为Mysql5.0&#xff0c;使…

二维码门楼牌管理应用平台建设:构建智慧警务新生态

文章目录 前言一、背景与意义二、平台架构与功能设计三、业务地址与标准地址的关联四、数据关联与应用场景五、总结与展望六、挑战与对策 前言 随着信息化技术的快速发展&#xff0c;二维码门楼牌管理应用平台已成为智慧城市建设的重要组成部分。本文将详细探讨如何通过该平台…

boot整合xfire

最近换了项目组&#xff0c;框架使用的boot整合的xfire&#xff0c;之前没使用过xfire&#xff0c;所以写个例子记录下&#xff0c;看 前辈的帖子 整理下 pom文件 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot…

Dockerfile:自定义镜像

Dockerfile 是一个文本文件&#xff0c;其中包含了一系列用于自动化构建Docker镜像的指令。通过编写Dockerfile&#xff0c;开发者能够明确地定义一个软件应用及其运行环境应该如何被封装进一个可移植、可重复构建的Docker镜像中。 第一步&#xff1a;在/tmp文件下新建docker…

[创业之路-102] :结构化思考:产学研人才联合创业公司的特点、优点与困境

目录 前言&#xff1a; 一、什么是产学研 1.1 什么是产学研 1.2 什么是产学研人才联合创业 二、产、学、研的区别、各自的特点 2.1 产业&#xff08;产&#xff09;特点 2.2 其次&#xff0c;学术&#xff08;学&#xff09;特点 2.3 科学研究&#xff08;研&#xff0…

TheMoon 恶意软件短时间感染 6,000 台华硕路由器以获取代理服务

文章目录 针对华硕路由器Faceless代理服务预防措施 一种名为"TheMoon"的新变种恶意软件僵尸网络已经被发现正在侵入全球88个国家数千台过时的小型办公室与家庭办公室(SOHO)路由器以及物联网设备。 "TheMoon"与“Faceless”代理服务有关联&#xff0c;该服务…

【unity】如何汉化unity Hub

相信大家下载安装unity后看着满操作栏的英文&#xff0c;英文不好的小伙伴们会一头雾水。但是没关系你要记住你要怎么高速运转的机器进入中国&#xff0c;请记住我给出的原理&#xff0c;不懂不代表不会用啊。现在我们就来把编译器给进行汉化。 第一步&#xff1a;我们打开Uni…

[Godot] 3D拾取

CollisionObject3D文档 Camera3D文档 CollisionObject3D有个信号_input_event&#xff0c;可以用于处理3D拾取。 Camera3D也有project_position用于将屏幕空间坐标投影到3D空间。 extends Node3D#是否处于选中状态 var selected : bool false #摄像机的前向量 var front : V…