springboot 配置Kafka 关闭自启动连接

server/2024/12/26 0:15:13/

这里写自定义目录标题

  • springboot 配置Kafka 关闭自启动连接
      • 方法一:使用 @ConditionalOnProperty
      • 方法二:手动管理Kafka监听器容器
      • 方法三:使用 autoStartup=false
      • 结语

springboot 配置Kafka 关闭自启动连接

在Spring Boot应用程序中,默认情况下,Kafka监听器容器会在应用程序启动时自动开始连接到Kafka broker。如果你希望禁用这种自动启动行为,可以通过配置来实现。以下是几种常见的方法:

方法一:使用 @ConditionalOnProperty

你可以使用条件注解来控制Kafka监听器容器的启动。通过设置一个属性来决定是否启用Kafka监听器。
步骤:

  1. 定义配置属性: 在你的application.yml或application.properties文件中添加一个自定义属性,用于控制Kafka监听器的启用状态。
   spring:kafka:enabled: false
  1. 使用 @ConditionalOnProperty 注解: 在你的Kafka监听器类上使用@ConditionalOnProperty注解,根据配置属性来决定是否启用该监听器。
java">   import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Component@ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true")public class MyKafkaListener {@KafkaListener(topics = "your-topic-name", groupId = "your-group-id")public void listen(String message) {System.out.println("Received Message: " + message);}}

方法二:手动管理Kafka监听器容器

另一种方法是手动管理Kafka监听器容器的生命周期,而不是依赖于Spring Boot的自动配置。
步骤:

  1. 禁用自动配置: 在你的主应用程序类或配置类上排除KafkaAutoConfiguration。
java">   import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;@SpringBootApplication(exclude = KafkaAutoConfiguration.class)public class DeviceExchangeApplication {public static void main(String[] args) {long startTime = System.currentTimeMillis();System.out.println("-----------> 数据交换链[device-exchange]启动...");SpringApplication.run(DeviceExchangeApplication.class, args);System.out.println("-----------> 数据交换链[device-exchange]启动成功,耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");}}
  1. 手动创建和管理Kafka监听器容器: 创建并管理Kafka监听器容器,以便在需要的时候手动启动它们。
java">   import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import org.springframework.kafka.listener.MessageListenerContainer;import java.util.HashMap;import java.util.Map;@Configurationpublic class KafkaConfig {@Autowiredprivate MyKafkaListener myKafkaListener;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-group-id");return props;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}@Beanpublic MessageListenerContainer kafkaListenerContainer() {ConcurrentMessageListenerContainer<String, String> container =kafkaListenerContainerFactory().createContainer("your-topic-name");container.setupMessageListener(myKafkaListener::listen);return container;}}
  1. 手动启动Kafka监听器容器: 在需要的时候手动启动Kafka监听器容器。
java">   import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.kafka.listener.MessageListenerContainer;import org.springframework.stereotype.Component;@Componentpublic class KafkaStarter implements CommandLineRunner {@Autowiredprivate MessageListenerContainer kafkaListenerContainer;@Overridepublic void run(String... args) throws Exception {// 手动启动Kafka监听器容器kafkaListenerContainer.start();}}

方法三:使用 autoStartup=false

你可以在Kafka监听器容器的配置中设置autoStartup=false,这样它就不会在应用程序启动时自动启动。
步骤:

  1. 配置 autoStartup=false: 在你的Kafka监听器配置中设置autoStartup=false。
java">   import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import org.springframework.kafka.listener.MessageListenerContainer;import org.springframework.stereotype.Component;@Componentpublic class MyKafkaListener {@KafkaListener(id = "myListener", topics = "your-topic-name", autoStartup = "false")public void listen(String message) {System.out.println("Received Message: " + message);}}
  1. 手动启动Kafka监听器容器: 使用MessageListenerContainer接口的手动启动方法。
java">   import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.listener.MessageListenerContainer;import org.springframework.stereotype.Component;@Componentpublic class KafkaStarter {@Autowiredprivate MessageListenerContainer myListenerContainer;public void startKafkaListener() {myListenerContainer.start();}}

总结
通过上述三种方法,你可以有效地控制Kafka监听器容器的自动启动行为。选择适合你项目需求的方法来实现即可。通常情况下,使用@ConditionalOnProperty是最简单和灵活的方式。

结语

以上答案来自大模型,第二种和第三种都比较麻烦,最后采用了第一种方式在所有的消费类上加了@ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true"),启动就很快了,KafkaAdmin 和 KafkaConsumer就没有自动启动了。用kafkaTemplate发送消息还是会去连接Kafka服务器,不影响正常使用。
注意:必须是所有的消费类必须加,不然就不会起作用。
主要场景:一般线上部署环境才会去连接kafka,本地开发的时候 不一定要去连,所以想暂时关闭一下


http://www.ppmy.cn/server/152897.html

相关文章

《VQ-VAE》:Stable Diffusion设计的架构源泉

文章目录 阅读本文你可以了解到1 VQ-VAE的核心思想1.1 为什么VQ-VAE想要把图像编码成离散向量&#xff1f;1.2 VQ-VAE引入codebook(即embedding space嵌入空间&#xff09;1.3 VQ-VAE的工作过程 2 VQ-VAE实现方法2.1 VQ-VAE的编码器怎么输出离散向量。2.2 VQ-VAE怎么优化编码器…

Certifying LLM Safety against Adversarial Prompting

erase-and-check erase&#xff1a;逐一删除prompt中的词元&#xff08;token&#xff09; check&#xff1a;用安全过滤器检查生成的子序列。 如果任何子序列或输入提示本身被过滤器检测为有害&#xff0c;则将该提示标记为有害。 如图&#xff0c;对有对抗性后缀的有害pr…

「Mac畅玩鸿蒙与硬件47」UI互动应用篇24 - 虚拟音乐控制台

本篇将带你实现一个虚拟音乐控制台。用户可以通过界面控制音乐的播放、暂停、切换歌曲&#xff0c;并查看当前播放的歌曲信息。页面还支持调整音量和动态显示播放进度&#xff0c;是音乐播放器界面开发的基础功能示例。 关键词 UI互动应用音乐控制播放控制动态展示状态管理按钮…

网络协议与网络安全学习记录

SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全&#xff08;Transport Layer Security&#xff0c;TLS&#xff09;是为网络通信提供安全及数据完整性的一种安全协议。TLS与SSL在传输层对网络连接进行加密 HTTPS,代表Hyper Text Transfer Protocol Secure,将SSL/T…

redis编译安装(版本6.2.6)

redis编译安装&#xff08;版本6.2.6&#xff09; 安装 官网&#xff1a;https://redis.io下载&#xff1a;http://download.redis.io/releases中文网&#xff1a;https://www.redis.net.cn/ tar -zxvf redis-6.2.6.tar.gz -C /usr/redis [rootlocalhost redis-6.2.6]# make…

前端TypeScript学习day01-TS介绍与TS部分常用类型

(创作不易&#xff0c;感谢有你&#xff0c;你的支持&#xff0c;就是我前行的最大动力&#xff0c;如果看完对你有帮助&#xff0c;请留下您的足迹&#xff09; 目录 TypeScript 介绍 TypeScript 是什么 TypeScript 为什么要为 JS 添加类型支持&#xff1f;? TypeScript …

Pytorch | 从零构建EfficientNet对CIFAR10进行分类

Pytorch | 从零构建EfficientNet对CIFAR10进行分类 CIFAR10数据集EfficientNet设计理念网络结构性能特点应用领域发展和改进 EfficientNet结构代码详解结构代码代码详解MBConv 类初始化方法前向传播 forward 方法 EfficientNet 类初始化方法前向传播 forward 方法 训练过程和测…

latex中复制到word里面之后如何转变成word自带的公式

转自latex中复制到word里面之后如何转变成word自带的公式_latex公式转word-CSDN博客