Spring Boot教程之五十五:Spring Boot Kafka 消费者示例

embedded/2025/1/17 19:32:04/

Spring Boot Kafka 消费者示例

Spring Boot 是 Java 编程语言中最流行和使用最多的框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作生产就绪的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。因此,下面列出了 Spring boot 的一些主要功能。

  • 创建独立的 Spring 应用程序
  • 直接嵌入 Tomcat、Jetty 或 Undertow。
  • 提供“启动器”依赖项以简化构建配置。
  • 尽可能自动配置 Spring 和第三方库。
  • 提供可用于生产的功能,例如健康检查、指标和外部化配置。
  • 几乎不需要代码生成,也不需要 XML 配置。

Apache Kafka是一个发布-订阅消息系统。消息系统允许您在进程、应用程序和服务器之间发送消息。广义上讲,Apache Kafka 是一种可以定义和进一步处理主题(主题可能是类别)的软件。应用程序可以连接到此系统并将消息传输到主题上。消息可以包含任何类型的信息,来自您的个人博客上的任何事件,也可以是触发任何其他事件的非常简单的文本消息。在这里,我们将讨论如何使用来自 Kafka 主题的消息并使用 Spring Boot 将它们显示在控制台中,其中Kafka 是先决条件。 

例子:

先决条件:确保您已经在本地机器上安装了 Apache Kafka,因此您应该知道如何在 Windows 上安装和运行 Apache Kafka?

步骤 1:转到此链接并创建一个 Spring Boot 项目。将“ Spring for Apache Kafka ”依赖项添加到您的 Spring Boot 项目中。 

步骤2:创建一个名为KafkaConfig的配置文件。以下是KafkaConfig.java文件的代码。

  • Java

// Java Program to Illustrate Kafka Configuration

  

package com.amiya.kafka.apachekafkaconsumer.config;

  

// Importing required classes

import java.util.HashMap;

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

  

// Annotations

@EnableKafka

@Configuration

  

// Class

public class KafkaConfig {

  

    @Bean

    public ConsumerFactory<String, String> consumerFactory()

    {

  

        // Creating a Map of string-object pairs

        Map<String, Object> config = new HashMap<>();

  

        // Adding the Configuration

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                   "127.0.0.1:9092");

        config.put(ConsumerConfig.GROUP_ID_CONFIG,

                   "group_id");

        config.put(

            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

        config.put(

            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

  

        return new DefaultKafkaConsumerFactory<>(config);

    }

  

    // Creating a Listener

    public ConcurrentKafkaListenerContainerFactory

    concurrentKafkaListenerContainerFactory()

    {

        ConcurrentKafkaListenerContainerFactory<

            String, String> factory

            = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        return factory;

    }

}

步骤 3:创建名为KafkaConsumer的消费者文件

  • Java

// Java Program to Illustrate Kafka Consumer

  

package com.amiya.kafka.apachekafkaconsumer.consumer;

  

// Importing required classes

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

  

@Component

  

// Class

public class KafkaConsumer {

  

    @KafkaListener(topics = "NewTopic",

                   groupId = "group_id")

  

    // Method

    public void

    consume(String message)

    {

        // Print statement

        System.out.println("message = " + message);

    }

}

步骤 4:现在我们必须做以下事情才能使用 Spring Boot 从 Kafka 主题消费消息

  • 运行 Apache Zookeeper 服务器
  • 运行 Apache Kafka 服务器
  • 从 Kafka 主题发送消息

使用此命令运行 Apache Zookeeper 服务器

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

类似地,使用此命令运行 Apache Kafka 服务器

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

运行以下命令从 Kafka 主题发送消息

C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic

步骤 5:现在运行你的 Spring Boot 应用程序。确保已在application.properties文件中更改了端口号

server.port=8081

让我们在 ApacheKafkaConsumerApplication 文件中运行 Spring Boot 应用程序

输出:在输出中,您可以看到当您从 Kafka 主题发送消息时,它会实时显示在控制台上。 


http://www.ppmy.cn/embedded/154741.html

相关文章

Android 13 Hotseat定制化修改——001 hotseat布局方向

一.背景 由于需求是需要自定义修改Hotseat&#xff0c;所以此篇文章是记录如何自定义修改hotseat的&#xff0c;应该可以覆盖大部分场景&#xff0c;修改点有修改hotseat布局方向&#xff0c;hotseat图标数量&#xff0c;hotseat图标大小&#xff0c;hotseat布局位置&#xff0…

SpringBoot开发——Spring Boot 3.3实现多端数据一致性的实时数据同步方案

文章目录 1、基于WebSocket的即时推送2、利用Kafka实现异步数据同步3、数据库变更监听与触发小结 在数字化浪潮下&#xff0c;业务横跨Web端、移动端&#xff0c;数据实时同步成了刚需。 Spring Boot 3.3携强大方案登场&#xff0c;为多端数据一致性难题精准“破局”。 1、基于…

不触碰资金的支付网关有哪些?

在加密货币支付领域&#xff0c;资金安全始终是商家和消费者最关心的问题之一。传统的支付网关通常需要用户将资金托管给第三方平台&#xff0c;这种方式虽然方便&#xff0c;但也带来了潜在的安全风险。近年来&#xff0c;一种基于智能合约的支付网关模式逐渐兴起&#xff0c;…

《Java核心技术II》用Java连接到服务器

用Java连接到服务器 用程序访问&#xff0c;通telnet工具相同。 Socket&#xff0c;英文意思(原意是插座)&#xff1a;计算机中翻译为 套接字。 所谓套接字(Socket)&#xff0c;就是对网络中不同主机上的应用进程之间进行双向通信的端点的抽象。一个套接字就是网络上进程通信的…

Web开发(一)HTML5

Web开发&#xff08;一&#xff09;HTML5 写在前面 参考黑马程序员前端Web教程做的笔记&#xff0c;主要是想后面自己搭建网页玩。 这部分是前端HTML5CSS3移动web视频教程的HTML5部分。主要涉及到HTML的基础语法。 HTML基础 标签定义 HTML定义 HTML(HyperText Markup Lan…

【一个按钮一个LED】用STM32F030单片机实现苹果充电器的定时装置

文章目录 前言一、要实现的功能1、循环定时2、倒计时3、指示灯提示4、使用场景二、实现方法1、使用方法2、电路设计三、程序代码和成品1.定时中断子程序2.键值处理3.主函数总结前言 笔者前几年买苹果手机、IPAD配的适配器是A1443型号,这种5V1A,USB-A口、小功率的适配器,苹果…

C# 中对 Task 中的异常进行捕获

以下是在 C# 中对 Task 中的异常进行捕获的几种常见方法&#xff1a; 方法一&#xff1a;使用 try-catch 语句 你可以使用 try-catch 语句来捕获 Task 中的异常&#xff0c;尤其是当你使用 await 关键字等待任务完成时。 using System; using System.Threading.Tasks;class …

网络学习记录6

查找下一跳和流量如何通过&#xff0c;是网络路由的基本概念。下面我会尽量用通俗易懂的方式来解释这个过程。 查找下一跳 数据包的目的地&#xff1a;当一个数据包在网络中传输时&#xff0c;它的目标是一个特定的IP地址。 路由表的作用&#xff1a;路由器有一个叫做路由表的东…