前言
这次跟大家分享kafka消费的另一种接入实现。其实原因是因为目前这个项目的框架太老了,springboot还是1.5的,直接用注解@KafkaListener无法消费的问题。我也不想调这个框架,没工时不说,万一再整出兼容性问题,那问题就大了,而且现在时间太赶了。
一、目标场景
- 目前是物联网设备的流水上报后,会存ES,同时经过物模型解析后,会往下游kafka推送信息。
- 下游系统接收kafka的设备流水,进行流水解析,解析成业务数据,做业务融合。
二、使用步骤
1.引入库
代码如下(示例):
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
<!-- spring-kafka内部依赖kafka-clients升级补偿 -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version>
</dependency>
其实上面看起来说的是排除springboot的kafka-clients,引入自定义的kafka-clients做为升级补偿。
编译、运行都不报错,但是使用@KafkaListener注解消费kafka信息,会报错,大致意思就是springframe版本低。应该就是低版本springboot的依赖springframe与高版本kafka-client依赖的springframe不匹配导致。
没有去调整框架,具体就不发散了。
import com.easylinkin.emp.hngw.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.List;/*** 消费者listener** @author zhengwen**/
@Slf4j
@Component
//@Lazy
public class KafkaListenConsumer {@Resourceprivate DataTransService dataTransService;/*** 设备流水listenner** @param records 消费信息* @param ack Ack机制*/@KafkaListener(topics = "${easylinkin.analyze.device.flow.topic.consumer}")public void deviceFlowListen(List<ConsumerRecord> records, Acknowledgment ack) {log.debug("=====设备流水deviceFlowListen消费者接收信息====");try {for (ConsumerRecord record : records) {log.debug("---开启线程解析设备流水数据:{}", record.toString());dataTransService.deviceFlowTransSave(record);}} catch (Exception e) {log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);} finally {//手动提交偏移量ack.acknowledge();}}
}
上面就是我最初直接使用注解写的消费方法。
2.主动启动消费
import com.easylinkin.emp.hngw.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;@Slf4j
public class DeviceFlowConsumerServerStart {@Resourceprivate DataTransService dataTransService;@Value("${easylinkin.analyze.device.flow.topic.consumer}")private String topic;@Value("${spring.kafka.bootstrap-servers:localhost:9092}")private String kafkaServiceUrl;@Value("${spring.kafka.consumer.group-id}")private String groupId;@PostConstructvoid start() {log.info("设备流水消费kafka服务启动!");//配置信息Properties props = new Properties();//先自定义的设置下,再用配置里的覆盖//声明kafka的地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceUrl);//每个消费者分配独立的消费者组编号props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//如果value合法,则自动提交偏移量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");//设置多久一次更新被消费消息的偏移量props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");//自动重置offsetprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerParams);//订阅消费topickafkaConsumer.subscribe(Arrays.asList(topic));startConsumer(kafkaConsumer);log.info("设备流水消费kafka服务启动完成!");}private void startConsumer(KafkaConsumer<String, String> kafkaConsumer) {new Thread(()->{while (true){try {ConsumerRecords<String,String> poll = kafkaConsumer.poll(2000);Iterable<ConsumerRecord<String,String>> records = poll.records(topic);Iterator<ConsumerRecord<String,String>> iterator = records.iterator();while (iterator.hasNext()){dataTransService.deviceFlowTransSave(iterator.next());}}catch (Exception e){log.error("消费失败",e);startConsumer(kafkaConsumer);break;}}}).start();}}
这里设置订阅后,启用线程消费,希望是消费异常不要把这里主线程搞挂了。因为我这里消费信息,会用一个dataTransService做设备流水的进一步解析,做业务融合,可能就涉及到事物嵌套的问题。
总结
- 针对老项目的另一种kafka消费接入方式
- 老springboot是真狗,各种接入不丝滑
- 就写到这里,希望能帮到大家,uping!