【Kafka】SpringBoot整合Kafka详细介绍及代码示例

ops/2024/9/25 19:18:39/

Kafka介绍

Apache Kafka是一个分布式流处理平台。它最初由LinkedIn开发,后来成为Apache软件基金会的一部分,并在开源社区中得到了广泛应用。Kafka的核心概念包括Producer、Consumer、Broker、Topic、Partition和Offset。

  • Producer:生产者,负责将数据发送到Kafka集群。
  • Consumer:消费者,从Kafka集群中读取数据。
  • Broker:Kafka服务器实例,Kafka集群通常由多个Broker组成。
  • Topic:主题,数据按主题进行分类。
  • Partition:分区,每个主题可以有多个分区,用于实现并行处理和提高吞吐量。
  • Offset:偏移量,每个消息在其分区中的唯一标识。

使用场景

Kafka适用于以下场景:

  1. 日志收集:集中收集系统日志和应用日志,通过Kafka传输到大数据处理系统。
  2. 消息队列:作为高吞吐量、低延迟的消息队列系统。
  3. 数据流处理:实时处理数据流,用于实时分析、监控和处理。
  4. 事件源架构:将所有的变更事件存储在Kafka中,实现事件溯源和回放。
  5. 流数据管道:构建数据管道,连接数据源和数据存储系统。

Spring Boot整合Kafka 

项目结构

springboot-kafka
│
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com.example.kafka
│   │   │       ├── KafkaApplication.java
│   │   │       ├── config
│   │   │       │   └── KafkaConfig.java
│   │   │       ├── producer
│   │   │       │   └── KafkaProducer.java
│   │   │       ├── consumer
│   │   │       │   └── KafkaConsumer.java
│   │   │       └── controller
│   │   │           └── KafkaController.java
│   │   └── resources
│   │       ├── application.yml
│   │       └── logback-spring.xml (可选)
│   └── test
│       └── java
│           └── com.example.kafka
│               └── KafkaApplicationTests.java
└── pom.xml

1. 创建Spring Boot项目并添加依赖

pom.xml
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. 配置Kafka

application.yml
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

3. 创建Kafka配置类

KafkaConfig.java
java">package com.example.kafka.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class KafkaConfig {@Beanpublic NewTopic myTopic() {return new NewTopic("my-topic", 1, (short) 1);}
}

4. 创建Kafka生产者

KafkaProducer.java
java">package com.example.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
}

5. 创建Kafka消费者

KafkaConsumer.java
java">package com.example.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(String message) {System.out.println("Received message: " + message);}
}

6. 创建控制器发送消息

KafkaController.java
java">package com.example.kafka.controller;import com.example.kafka.producer.KafkaProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {private final KafkaProducer kafkaProducer;public KafkaController(KafkaProducer kafkaProducer) {this.kafkaProducer = kafkaProducer;}@GetMapping("/send")public String sendMessage(@RequestParam String message) {kafkaProducer.sendMessage("my-topic", message);return "Message sent";}
}

7. 创建Spring Boot主类

KafkaApplication.java
java">package com.example.kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}
}

8. 测试应用

通过访问以下URL来发送消息:

java">http://localhost:8080/send?message=HelloKafka

9. 日志配置(可选)

为了更好地查看Kafka的日志,可以添加logback-spring.xml配置:

logback-spring.xml
<configuration><springProfile name="default"><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss} - %msg%n</pattern></encoder></appender><logger name="org.apache.kafka" level="INFO"/><root level="INFO"><appender-ref ref="STDOUT"/></root></springProfile>
</configuration>

10. 测试类(可选)

KafkaApplicationTests.java
java">package com.example.kafka;import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class KafkaApplicationTests {@Testvoid contextLoads() {}
}

至此,你已经完成了Spring Boot整合Kafka的详细配置和代码示例。你可以根据实际需求进一步扩展和修改这个基础代码。


http://www.ppmy.cn/ops/48867.html

相关文章

exfat文件系统无法NFS导出的问题

最近项目中移植了exfat-linux驱动&#xff0c;但发现exfat格式的U盘无法用exportfs命令在NFS上导出。这篇文章记录了分析、解决方法。 一、问题现象 问题描述&#xff1a;exfat驱动更新后&#xff0c;exfat格式的U盘用exportfs命令NFS导出会报错 $ exportfs -o ro,fsid0,no_ro…

Spring Boot中的RESTful API详细介绍及使用

在Spring Boot中&#xff0c;RESTful API的实现通过控制器类中的方法和特定的注解来完成。每个注解对应不同的HTTP请求方法&#xff0c;并通过处理请求参数和返回响应来实现不同的操作。 下面将详细解释RESTful API中的各个方面&#xff0c;包括GetMapping, PostMapping, PutMa…

中小企业使用CRM系统的优势有哪些

中小企业如何在竞争激烈的市场中脱颖而出&#xff1f;除了优秀的产品和服务&#xff0c;一个高效的管理工具也是必不可少的。而客户关系管理&#xff08;CRM&#xff09;系统正是这样一个能帮助企业提升客户体验、优化内部管理流程的重要工具。接下来&#xff0c;让我们一起探讨…

1586. 扫地机器人

问题描述 Mike同学在为扫地机器人设计一个在矩形区域中行走的算法,Mike是这样设计的:先把机器人放在出发点 (1,1)(1,1) 点上,机器人在每个点上都会沿用如下的规则来判断下一个该去的点是哪里。规则:优先向右,如果向右不能走(比如:右侧出了矩形或者右侧扫过了)则尝试向…

MYSQL 三、mysql基础知识 4(存储过程与函数)

MySQL从5.0版本开始支持存储过程和函数。存储过程和函数能够将复杂的SQL逻辑封装在一起&#xff0c;应用程序无须关注存储过程和函数内部复杂的SQL逻辑&#xff0c;而只需要简单地调用存储过程和函数即可。 一、存储过程概述&#xff1a; 1.1理解&#xff1a; 含义&am…

collections.defaultdict(int)

collections.defaultdict 是 Python collections 模块中的一个类&#xff0c;它提供了一种便捷的方式来创建带有默认值的字典。当你尝试访问一个不存在的键时&#xff0c;它会自动为该键创建一个默认值。这在处理计数、分类等任务时特别有用。 collections.defaultdict(int) 特…

算法:位运算题目练习

目录 常见的位运算的操作总结 ①基础位操作 ②给一个数n&#xff0c;确定它的二进制表示中的第x位是0还是1 ③将一个数n的二进制表示的第x位修改成1 ④将一个数n的二进制表示的第x位修改成0 ⑤位图的思想 ⑥提取一个数n二进制表示中最右侧的1 ⑦干掉一个数n二进制表示中…

【知识点】std::thread::detach std::lock_guard std::unique_lock

在 C11 中&#xff0c;std::thread 提供了并发编程的基础设施&#xff0c;使得我们可以创建和管理线程。std::thread 的 detach 方法是一种常用的线程管理方式&#xff0c;允许线程在后台独立运行&#xff0c;而不必与主线程同步或等待其完成。 std::thread::detach 方法 当你…