Sprint Boot教程之五十八:动态启动/停止 Kafka 监听器

server/2025/1/17 17:12:06/

Spring Boot – 动态启动/停止 Kafka 监听器

当 Spring Boot 应用程序启动时,Kafka Listener 的默认行为是开始监听某个主题。但是,有些情况下我们不想在应用程序启动后立即启动它。

动态启动或停止 Kafka Listener,我们需要三种主要方法,即在需要处理 Kafka 消息时启动/停止、使用@KafkaListener注释、使用 kafkaListenerEndpointRegistry

在本文中,我们将介绍如何动态启动或停止 Kafka 监听器。

启动/停止 Kafka 监听器的不同方法

方法一

  • 当需要处理 Kafka 消息时,启动一个应用程序。
  • 处理成功后停止应用程序。

方法 2:在注册 Kafka Listener 时,我们可以设置以下 id 属性。

@KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", containerFactory = "messageListenerFactory", autoStartup = "false")public void consumeMessage(Message message)

方法 3:自动连接KafkaListenerEndpointRegistry bean 来控制 Kafka Listener 的启动或停止。

@AutowiredKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

开始:

 public boolean startListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.start();

停止:

public boolean stopListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.stop();logger.info("{} Kafka Listener Stopped.", listenerId);

下面我们将以上述句法方法为例进行实现。

启动或停止特定 Kafka Listener的实现

创建一个类,其对象将被 Kafka 侦听器使用。

文件:Message.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {private String message;
}

配置 Kafka Listener 将使用的消费者。

文件:KakfaConsumerConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConsumerConfig {private String kafkaUrl = "localhost:9092";@Beanpublic ConsumerFactory<String, Message> messageConsumerFactory() {JsonDeserializer<Message> deserializer = new JsonDeserializer<>(Message.class, false);deserializer.setRemoveTypeHeaders(false);deserializer.addTrustedPackages("*");deserializer.setUseTypeMapperForKey(true);Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Message> messageListenerFactory() {ConcurrentKafkaListenerContainerFactory<String, Message> containerFactory = new ConcurrentKafkaListenerContainerFactory();containerFactory.setConsumerFactory(messageConsumerFactory());return containerFactory;}
}

创建一个具有必要参数的 Kafka 监听器。

  • id:此侦听器的容器唯一标识符。如果未指定,则使用自动生成的 ID。
  • groupId:仅为该监听器使用该值覆盖消费者工厂的 group.id 属性。
  • 主题:此侦听器的主题。条目可以是“主题名称”、“属性占位符键”或“表达式”。主题名称必须从表达式解析。这使用组管理,Kafka 将为组成员分配分区。
  • containerFactory:KafkaListenerContainerFactory的 bean 名称,将用于创建为该端点提供服务的消息侦听器容器。
  • autoStartup:设置为 true 或 false 以覆盖容器工厂的默认设置。默认情况下,该值设置为 true,因此,它将在我们的应用程序启动时立即开始使用消息。

文件:KafkaMessageListener.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;@Configuration
public class KafkaMessageListener {Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);@KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", containerFactory = "messageListenerFactory", autoStartup = "false")public void consumeMessage(Message message) {logger.info("Message received : -> {}", message);}
}

KafkaListenerEndpointRegistry 类可用于通过 listenerId 获取 Kafka 侦听器容器。这里我们使用了@KafkaListener注释来将 bean 方法声明为 Kafka 侦听器容器的侦听器。现在可以使用此容器启动或停止 Kafka 侦听器。

文件:KafkaListenerAutomation.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;@Component
public class KafkaListenerAutomation {private final Logger logger = LoggerFactory.getLogger(KafkaListenerAutomation.class);@AutowiredKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;public boolean startListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.start();logger.info("{} Kafka Listener Started", listenerId);return true;}public boolean stopListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.stop();logger.info("{} Kafka Listener Stopped.", listenerId);return true;}
}

使用 API 端点,我们可以通过提供 listenerID 来启动或停止特定的 Kafka 监听器。

文件:StartOrStopListenerController.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class StartOrStopListenerController {@AutowiredKafkaListenerAutomation kafkaListenerAutomation;@GetMapping("/start")public void start(@RequestParam("id") String listenerId) {kafkaListenerAutomation.startListener(listenerId);}@GetMapping("/stop")public void stop(@RequestParam("id") String listenerId) {kafkaListenerAutomation.stopListener(listenerId);}
}

输出:

1.Kafka Listener启动:

2.Kafka Listener 收到消息:

3. Kafka Listener 停止:

最后

理想情况下,应用程序应在需要处理 Kafka 消息时启动,并在该过程完成后立即停止。限制 Kafka 侦听器以有效利用它是一种很好的做法。


http://www.ppmy.cn/server/159143.html

相关文章

Android-目前最稳定和高效的UI适配方案

谈到适配&#xff0c;首先需要介绍几个基本单位&#xff1a; 1、密度无关像素&#xff08;dp&#xff09;&#xff1a; 含义&#xff1a;density-independent pixel&#xff0c;叫dp或dip&#xff0c;与终端上的实际物理像素点无关 单位&#xff1a;dp&#xff0c;可以保证在…

基于多个边缘盒子部署的综合视频安防系统的智慧地产开源了

智慧地产视觉监控平台是一款功能强大且简单易用的实时算法视频监控系统。它的愿景是最底层打通各大芯片厂商相互间的壁垒&#xff0c;省去繁琐重复的适配流程&#xff0c;实现芯片、算法、应用的全流程组合&#xff0c;从而大大减少企业级应用约95%的开发成本。 AI是新形势下数…

HTML5+Canvas实现的鼠标跟随自定义发光线条源码

源码介绍 HTML5Canvas实现的鼠标跟随自定义发光线条特效源码非常炫酷&#xff0c;在黑色的背景中&#xff0c;鼠标滑过即产生彩色变换的发光线条效果&#xff0c;且线条周围散发出火花飞射四溅的粒子光点特效。 效果预览 源码如下 <!DOCTYPE html PUBLIC "-//W3C//D…

mysql community server社区版M2 macbook快速安装

Django玩的时候用到了mysql&#xff0c;简单整理一下这个老伙计的安装教程 1. 下载地址&#xff1a;MySQL :: Download MySQL Community Server 2. M2芯片mac的话选择第一个下载&#xff0c;按提示安装即可 3. 或者直接用这篇文章附属安装包 4. 但安装之后可能会有zsh: command…

【Golang 面试题】每日 3 题(三十一)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…

智能家居篇 一、Win10 VM虚拟机安装 Home Assistant 手把手教学

智能家居篇 一、Win10 VM虚拟机安装 Home Assistant 手把手教学 文章目录 [智能家居篇]( )一、Win10 VM虚拟机安装 Home Assistant 手把手教学 前言一.下载Vm版本的HomeAsistant安装包 二.打开Vmware选择新建虚拟机1.选择自定义高级2.选择16.x及以上3.选择稍后安装4.根据官网的…

【商城系统支付安全】

商城系统支付安全是电子商务中至关重要的一环&#xff0c;直接关系到用户的隐私和财产安全。以下是对商城系统支付安全的详细分析&#xff1a; 数据加密 使用SSL或TLS等加密协议&#xff0c;对传输的用户数据进行加密。 采用HTTPS协议&#xff0c;通过将HTTP与SSL/TLS结合&am…

java项目启动时,执行某方法

1. J2EE项目 在Servlet类中重写init()方法&#xff0c;这个方法会在Servlet实例化时调用&#xff0c;即项目启动时调用。 import javax.servlet.ServletException; import javax.servlet.http.HttpServlet;public class MyServlet extends HttpServlet {Overridepublic void …