Spring Boot教程之五十五:Spring Boot Kafka 消费者示例

ops/2025/1/12 11:51:08/

Spring Boot Kafka 消费者示例

Spring Boot 是 Java 编程语言中最流行和使用最多的框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作生产就绪的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。因此,下面列出了 Spring boot 的一些主要功能。

  • 创建独立的 Spring 应用程序
  • 直接嵌入 Tomcat、Jetty 或 Undertow。
  • 提供“启动器”依赖项以简化构建配置。
  • 尽可能自动配置 Spring 和第三方库。
  • 提供可用于生产的功能,例如健康检查、指标和外部化配置。
  • 几乎不需要代码生成,也不需要 XML 配置。

Apache Kafka是一个发布-订阅消息系统。消息系统允许您在进程、应用程序和服务器之间发送消息。广义上讲,Apache Kafka 是一种可以定义和进一步处理主题(主题可能是类别)的软件。应用程序可以连接到此系统并将消息传输到主题上。消息可以包含任何类型的信息,来自您的个人博客上的任何事件,也可以是触发任何其他事件的非常简单的文本消息。在这里,我们将讨论如何使用来自 Kafka 主题的消息并使用 Spring Boot 将它们显示在控制台中,其中Kafka 是先决条件。 

例子:

先决条件:确保您已经在本地机器上安装了 Apache Kafka,因此您应该知道如何在 Windows 上安装和运行 Apache Kafka?

步骤 1:转到此链接并创建一个 Spring Boot 项目。将“ Spring for Apache Kafka ”依赖项添加到您的 Spring Boot 项目中。 

步骤2:创建一个名为KafkaConfig的配置文件。以下是KafkaConfig.java文件的代码。

  • Java

// Java Program to Illustrate Kafka Configuration

  

package com.amiya.kafka.apachekafkaconsumer.config;

  

// Importing required classes

import java.util.HashMap;

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

  

// Annotations

@EnableKafka

@Configuration

  

// Class

public class KafkaConfig {

  

    @Bean

    public ConsumerFactory<String, String> consumerFactory()

    {

  

        // Creating a Map of string-object pairs

        Map<String, Object> config = new HashMap<>();

  

        // Adding the Configuration

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                   "127.0.0.1:9092");

        config.put(ConsumerConfig.GROUP_ID_CONFIG,

                   "group_id");

        config.put(

            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

        config.put(

            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

  

        return new DefaultKafkaConsumerFactory<>(config);

    }

  

    // Creating a Listener

    public ConcurrentKafkaListenerContainerFactory

    concurrentKafkaListenerContainerFactory()

    {

        ConcurrentKafkaListenerContainerFactory<

            String, String> factory

            = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        return factory;

    }

}

步骤 3:创建名为KafkaConsumer的消费者文件

  • Java

// Java Program to Illustrate Kafka Consumer

  

package com.amiya.kafka.apachekafkaconsumer.consumer;

  

// Importing required classes

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

  

@Component

  

// Class

public class KafkaConsumer {

  

    @KafkaListener(topics = "NewTopic",

                   groupId = "group_id")

  

    // Method

    public void

    consume(String message)

    {

        // Print statement

        System.out.println("message = " + message);

    }

}

步骤 4:现在我们必须做以下事情才能使用 Spring Boot 从 Kafka 主题消费消息

  • 运行 Apache Zookeeper 服务器
  • 运行 Apache Kafka 服务器
  • 从 Kafka 主题发送消息

使用此命令运行 Apache Zookeeper 服务器

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

类似地,使用此命令运行 Apache Kafka 服务器

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

运行以下命令从 Kafka 主题发送消息

C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic

步骤 5:现在运行你的 Spring Boot 应用程序。确保已在application.properties文件中更改了端口号

server.port=8081

让我们在 ApacheKafkaConsumerApplication 文件中运行 Spring Boot 应用程序

输出:在输出中,您可以看到当您从 Kafka 主题发送消息时,它会实时显示在控制台上。 


http://www.ppmy.cn/ops/149438.html

相关文章

Flutter项目开发模版,开箱即用(Plus版本)

前言 当前案例 Flutter SDK版本&#xff1a;3.22.2 本文&#xff0c;是由这两篇文章 结合产出&#xff0c;所以非常建议大家&#xff0c;先看完这两篇&#xff1a; Flutter项目开发模版&#xff1a; 主要内容&#xff1a;MVVM设计模式及内存泄漏处理&#xff0c;涉及Model、…

C++之开散列哈希表

目录 闭散列哈希表 元素的插入 元素的查找 元素的删除 上期我们学习了闭散列哈希表&#xff0c;闭散列哈希表和开散列哈希表的区别就是插入的元素在冲突时&#xff0c;应对冲突的处理方式不同&#xff0c;本期我们将详细的学习闭散列哈希表。 闭散列哈希表 闭散列哈希表图示…

C#语言的数据结构

C#语言的数据结构探讨 数据结构是计算机科学中一种用于组织、存储和管理数据的方式。有效地使用数据结构能使算法更加高效&#xff0c;并提高程序的性能。在C#语言中&#xff0c;我们可以构建和使用多种数据结构&#xff0c;以满足不同的需求。本文将介绍C#中的常用数据结构&a…

SpringBoot操作spark处理hdfs文件

SpringBoot操作spark处理hdfs文件 1、导入依赖 <!-- spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.2</version></dependency><depend…

STM32 : GPIO_TypeDef

结构体定义 (GPIO_TypeDef) 是STM32微控制器中用于描述GPIO端口寄存器的典型方式。每个GPIO端口&#xff08;如 GPIOA、GPIOB 等&#xff09;都由一组寄存器组成&#xff0c;这些寄存器控制和监控GPIO引脚的状态。 寄存器解释 CRL (Control Register Low): 低8位引脚的控制寄存…

数据库 -- 视图

1. 视图 1.1 什么是视图 视图是⼀个虚拟的表&#xff0c;它是基于⼀个或多个基本表或其他视图的查询结果集。视图本⾝不存储数据&#xff0c;⽽是通过执⾏查询来动态⽣成数据。⽤⼾可以像操作普通表⼀样使⽤视图进⾏查询、更新和管理。视图本⾝并不占⽤物理存储空间&#xff…

单片机实物成品-011 火灾监测

火灾监测&#xff08;20个版本&#xff09; 版本20&#xff1a; oled显示温湿度烟雾浓度火焰传感器天然气浓度窗户风扇水泵排气系统声光报警语音播报按键WIFI模块 ----------------------------------------------------------------------------- https://www.bilibili.com…

基于Python的音乐播放器 毕业设计-附源码73733

摘 要 本项目基于Python开发了一款简单而功能强大的音乐播放器。通过该音乐播放器&#xff0c;用户可以轻松管理自己的音乐库&#xff0c;播放喜爱的音乐&#xff0c;并享受音乐带来的愉悦体验。 首先&#xff0c;我们使用Python语言结合相关库开发了这款音乐播放器。利用Tkin…