springboot 配置Kafka 关闭自启动连接

news/2024/12/25 3:59:15/

这里写自定义目录标题

  • springboot 配置Kafka 关闭自启动连接
      • 方法一:使用 @ConditionalOnProperty
      • 方法二:手动管理Kafka监听器容器
      • 方法三:使用 autoStartup=false
      • 结语

springboot 配置Kafka 关闭自启动连接

在Spring Boot应用程序中,默认情况下,Kafka监听器容器会在应用程序启动时自动开始连接到Kafka broker。如果你希望禁用这种自动启动行为,可以通过配置来实现。以下是几种常见的方法:

方法一:使用 @ConditionalOnProperty

你可以使用条件注解来控制Kafka监听器容器的启动。通过设置一个属性来决定是否启用Kafka监听器。
步骤:

  1. 定义配置属性: 在你的application.yml或application.properties文件中添加一个自定义属性,用于控制Kafka监听器的启用状态。
   spring:kafka:enabled: false
  1. 使用 @ConditionalOnProperty 注解: 在你的Kafka监听器类上使用@ConditionalOnProperty注解,根据配置属性来决定是否启用该监听器。
java">   import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Component@ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true")public class MyKafkaListener {@KafkaListener(topics = "your-topic-name", groupId = "your-group-id")public void listen(String message) {System.out.println("Received Message: " + message);}}

方法二:手动管理Kafka监听器容器

另一种方法是手动管理Kafka监听器容器的生命周期,而不是依赖于Spring Boot的自动配置。
步骤:

  1. 禁用自动配置: 在你的主应用程序类或配置类上排除KafkaAutoConfiguration。
java">   import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;@SpringBootApplication(exclude = KafkaAutoConfiguration.class)public class DeviceExchangeApplication {public static void main(String[] args) {long startTime = System.currentTimeMillis();System.out.println("-----------> 数据交换链[device-exchange]启动...");SpringApplication.run(DeviceExchangeApplication.class, args);System.out.println("-----------> 数据交换链[device-exchange]启动成功,耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");}}
  1. 手动创建和管理Kafka监听器容器: 创建并管理Kafka监听器容器,以便在需要的时候手动启动它们。
java">   import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import org.springframework.kafka.listener.MessageListenerContainer;import java.util.HashMap;import java.util.Map;@Configurationpublic class KafkaConfig {@Autowiredprivate MyKafkaListener myKafkaListener;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}@Beanpublic MessageListenerContainer kafkaListenerContainer() {ConcurrentMessageListenerContainer<String, String> container =kafkaListenerContainerFactory().createContainer("your-topic-name");container.setupMessageListener(myKafkaListener::listen);return container;}}
  1. 手动启动Kafka监听器容器: 在需要的时候手动启动Kafka监听器容器。
java">   import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.kafka.listener.MessageListenerContainer;import org.springframework.stereotype.Component;@Componentpublic class KafkaStarter implements CommandLineRunner {@Autowiredprivate MessageListenerContainer kafkaListenerContainer;@Overridepublic void run(String... args) throws Exception {// 手动启动Kafka监听器容器kafkaListenerContainer.start();}}

方法三:使用 autoStartup=false

你可以在Kafka监听器容器的配置中设置autoStartup=false,这样它就不会在应用程序启动时自动启动。
步骤:

  1. 配置 autoStartup=false: 在你的Kafka监听器配置中设置autoStartup=false。
java">   import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import org.springframework.kafka.listener.MessageListenerContainer;import org.springframework.stereotype.Component;@Componentpublic class MyKafkaListener {@KafkaListener(id = "myListener", topics = "your-topic-name", autoStartup = "false")public void listen(String message) {System.out.println("Received Message: " + message);}}
  1. 手动启动Kafka监听器容器: 使用MessageListenerContainer接口的手动启动方法。
java">   import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.listener.MessageListenerContainer;import org.springframework.stereotype.Component;@Componentpublic class KafkaStarter {@Autowiredprivate MessageListenerContainer myListenerContainer;public void startKafkaListener() {myListenerContainer.start();}}

总结
通过上述三种方法,你可以有效地控制Kafka监听器容器的自动启动行为。选择适合你项目需求的方法来实现即可。通常情况下,使用@ConditionalOnProperty是最简单和灵活的方式。

结语

以上答案来自大模型,第二种和第三种都比较麻烦,最后采用了第一种方式在所有的消费类上加了@ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true"),启动就很快了,KafkaAdmin 和 KafkaConsumer就没有自动启动了。用kafkaTemplate发送消息还是会去连接Kafka服务器,不影响正常使用。
注意:必须是所有的消费类必须加,不然就不会起作用。
主要场景:一般线上部署环境才会去连接kafka,本地开发的时候 不一定要去连,所以想暂时关闭一下


http://www.ppmy.cn/news/1557903.html

相关文章

新版国标GB28181设备端Android版EasyGBD支持国标GB28181-2022,支持语音对讲,支持位置上报,开源在Github

经过近3个月的迭代开发&#xff0c;新版本的国标GB28181设备端EasyGBD安卓Android版终于在昨天发布到Github了&#xff0c;最新的EasyGBD支持了国标GB28181-2022版&#xff0c;还支持了语音对讲、位置上报、本地录像等功能&#xff0c;比原有GB28181-2016版的EasyGBD更加高效、…

用Python设置Excel工作表的页眉和页脚

在处理和分析数据时&#xff0c;Excel作为一款功能强大的工具&#xff0c;被广泛应用于各个领域。当涉及到打印或分享工作表时&#xff0c;为文档添加专业的页眉和页脚不仅能提升文件的视觉效果&#xff0c;还能提供必要的信息&#xff0c;例如公司标识、日期、文件名或是页码等…

微信小程序的轮播图学习报告

微信小程序轮播图学习报告 好久都没分享新内容了&#xff0c;实在惭愧惭愧。今天给大家做一个小程序轮播图的学习报告。 先给大家看一下我的项目状态&#xff1a; 很空昂&#xff01;像一个正在修行的老道&#xff0c;空的什么也没有。 但是我写了 4 个 view 容器&#xff0c;…

C05S07-Tomcat服务架设

一、Tomcat 1. Tomcat概述 Tomcat也是一个Web应用程序&#xff0c;具有三大核心功能。 Java Servlet&#xff1a;Tomcat是一个Servlet容器&#xff0c;负责管理和执行Java Servlet、服务端的Java程序&#xff0c;处理客户端的HTTP请求和响应。Java Server&#xff1a;服务端…

Unity全局雾效

1、全局雾效是什么 全局雾效&#xff08;Global Fog&#xff09;是一种视觉效果&#xff0c;用于在3D场景中模拟大气中的雾气对远处物体的遮挡 它通过在场景中加入雾的效果&#xff0c;使得距离摄像机较远的物体看起来逐渐被雾气覆盖&#xff0c;从而创造出一种朦胧、模糊的视…

MMAudio - 自动给视频配音效

MMAudio 在给定视频和/或文本输入的情况下生成同步音频。我们的关键创新是多模式联合训练&#xff0c;它允许对广泛的视听和音频文本数据集进行训练。此外&#xff0c;同步模块将生成的音频与视频帧对齐。 419 Stars 26 Forks 2 Issues NA 贡献者 MIT License Python 语言 代码…

Docker环境下MySQL数据库持久化部署全攻略

概述 在当今的软件开发领域&#xff0c;Docker容器技术已经成为应用部署和管理的新标准。它不仅简化了应用的部署流程&#xff0c;还为数据管理提供了灵活的解决方案。特别是在涉及到MySQL数据库时&#xff0c;数据持久化是一个不可忽视的重要环节。本文将分享如何在Docker中部…

websocket 局域网 webrtc 一对一 多对多 视频通话 的示例

基本介绍 WebRTC&#xff08;Web Real-Time Communications&#xff09;是一项实时通讯技术&#xff0c;它允许网络应用或者站点&#xff0c;在不借助中间媒介的情况下&#xff0c;建立浏览器之间点对点&#xff08;Peer-to-Peer&#xff09;的连接&#xff0c;实现视频流和&am…