SpringBoot基础Kafka示例

devtools/2025/3/13 8:00:41/

这里将生产者和消费者放在一个应用中

使用的Boot3.4.3

引入Kafka依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml配置


spring:application:name: kafka-1#kafka连接地址kafka:bootstrap-servers: 127.0.0.1:9092#配置生产者producer:#消息发送失败重试次数retries: 0#一个批次可以使用内存的大小batch-size: 16384#一个批次消息数量buffer-memory: 33554432#键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer#值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializeracks: allconsumer:#是否自动提交enable-auto-commit: false#自动提交的频率auto-commit-interval: 1000#earliest	从分区的最早偏移量开始消费	需要消费所有历史消息  latest	从分区的最新偏移量开始消费,忽略历史消息	只关心新消息#none	如果没有有效的偏移量,抛出异常	严格要求偏移量必须存在#exception spring-kafka不支持auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:#用于配置消费者如何处理消息的确认  ack配置方式  这里指定由消费者手动提交偏移量#Acknowledgment.acknowledge() 方法来提交偏移量ack-mode: MANUAL_IMMEDIATEconcurrency: 4
test-1: group-1
test-2: group-2
test-3: group-3server:port: 8099

生产者示例,一般可能是一个MQTT接收消息入口

package com.hrui.kafka1.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;/*** @author hrui* @date 2025/3/10 14:56*/
@RestController
public class EventProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/sendMessage")public String sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);return "Message sent to topic '" + topic + "': " + message;}@RequestMapping("/sendMessage2")public String sendMessage2() {//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();kafkaTemplate.send(message);return "Message sent to topic";}}

消费者示例

注意:如果配置了手动提交ack,那么

主要目的不仅仅是避免重复消费,而是为了确保消息的可靠处理和偏移量(offset)的正确提交。它可以避免重复消费,但更重要的是保证消息不会丢失,并且在消息处理失败时能够重新消费。

package com.hrui.kafka1.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.jms.AcknowledgeMode;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @author hrui* @date 2025/3/10 15:57*/
@Component
public class EventConsumer {@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-1}'}")public void onMessage(ConsumerRecord<String,String> message){System.out.println("接收到消息1:"+message.value());}@KafkaListener(topics = {"ceshi"},groupId = "#{'${test-2}'}")public void onMessage(String message){System.out.println("接收到消息2:"+message);}@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,@Header(KafkaHeaders.GROUP_ID) String groupId) {try {System.out.println("接收到消息3:" + message + ", ack:" + ack + ", topic:" + topic + ", groupId:" + groupId);// 处理消息逻辑// ...} catch (Exception e) {// 处理异常,记录日志System.err.println("处理消息失败: " + e.getMessage());// 可以根据业务需求决定是否重新抛出异常}finally {// 手动提交偏移量ack.acknowledge();}}
}

生产者可选择异步或者同步发送消息

生产者发送消息有同步异步之说 那么消费者在消费消息时候 有没有同步异步之说呢???

在 Kafka 消费者中,消费消息的方式本质上是由 Kafka 的设计决定的,而不是由消费者代码显式控制的。Kafka 消费者在消费消息时,通常是以拉取(poll)的方式从 Kafka 服务器获取消息,然后处理这些消息。从这个角度来看,消费者的消费行为是同步的,因为消费者需要主动调用 poll 方法来获取消息。

然而,消费者的消息处理逻辑可以是同步异步的,具体取决于业务实现。以下是对消费者消费消息的同步和异步行为的详细分析:

 消费者的同步消费

在默认情况下,Kafka 消费者的消费行为是同步的,即:

  • 消费者通过 poll 方法从 Kafka 拉取一批消息。

  • 消费者逐条处理这些消息。

  • 每条消息处理完成后,消费者提交偏移量(offset)。

  • 消费者继续调用 poll 方法获取下一批消息。

特点:
  • 消息处理是顺序的,即一条消息处理完成后才会处理下一条消息。

  • 如果某条消息处理时间较长,会影响后续消息的处理速度。

  • 适合消息处理逻辑简单、处理时间较短的场景。

@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {try {System.out.println("接收到消息:" + message.value());// 同步处理消息逻辑processMessage(message);} catch (Exception e) {System.err.println("处理消息失败: " + e.getMessage());} finally {ack.acknowledge(); // 手动提交偏移量}
}private void processMessage(ConsumerRecord<String, String> message) {// 模拟消息处理逻辑try {Thread.sleep(1000); // 假设处理一条消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

2. 消费者的异步消费

在某些场景下,消费者可能需要以异步的方式处理消息,即:

  • 消费者通过 poll 方法拉取一批消息。

  • 将每条消息提交到一个线程池或异步任务中处理。

  • 消费者继续调用 poll 方法获取下一批消息,而不等待上一条消息处理完成。

特点:
  • 消息处理是并发的,可以提高消息处理的吞吐量。

  • 需要额外的线程池或异步任务管理机制。

  • 适合消息处理逻辑复杂、处理时间较长的场景。

示例代码:

@Autowired
private ExecutorService executorService; // 注入线程池@KafkaListener(topics = {"ceshi"}, groupId = "#{'${test-3}'}")
public void onMessage(ConsumerRecord<String, String> message, Acknowledgment ack) {if (!StringUtils.hasText(message.value())) {ack.acknowledge();return;}// 提交异步任务处理消息executorService.submit(() -> {try {System.out.println("接收到消息:" + message.value());processMessage(message); // 异步处理消息} catch (Exception e) {System.err.println("处理消息失败: " + e.getMessage());} finally {ack.acknowledge(); // 手动提交偏移量}});
}private void processMessage(ConsumerRecord<String, String> message) {// 模拟消息处理逻辑try {Thread.sleep(1000); // 假设处理一条消息需要 1 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}
}

同步代码示例

@RequestMapping("/sendMessage2")public String sendMessage2(){//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);try {//阻塞等待拿结果SendResult<String, String> sendResult = send.get();System.out.println("说明消息发送成功,如果不成功会抛出异常");} catch (Exception e) {throw new RuntimeException(e);}return "Message sent to topic";}

异步注册回调的方式

 @RequestMapping("/sendMessage2")public String sendMessage2(){//通过构建器模式创建Message对象Message<String> message = MessageBuilder.withPayload("Hello, Kafka!").setHeader(KafkaHeaders.TOPIC, "ceshi").build();CompletableFuture<SendResult<String, String>> send = kafkaTemplate.send(message);//非阻塞  异步 注册回调异步通知send.thenAccept(result -> {System.out.println("消息发送成功");}).exceptionally(e->{System.out.println("发送失败");e.printStackTrace();return null;});return "Message sent to topic";}

如果需要发送的不是String类型 

那么要发送的不是String类型

KafkaTemplate<String,Object> kafkaTemplate;

一般来说可以专成JSON字符串发送

在引入spring-kafka的时候     KafkaAutoConfiguration中  配置了KafkaTemplate

Kafka<Object,Object>

如果需要用KafkaTemplate发送对象的时候

默认用的String序列化   会报错   除非将对象转为JSON字符串(一般可以这么做)

如果用对象的话   改成JsonSerializer  这样自动转JSON字符串


http://www.ppmy.cn/devtools/166716.html

相关文章

WPF基础知识41-60

动画与多媒体拓展 41. 如何实现一个复杂的组合动画&#xff0c;包含多个动画效果同时或顺序执行&#xff1f; 答案&#xff1a;可以使用 Storyboard 来组合多个动画效果。Storyboard 可以包含多个不同类型的动画&#xff08;如 DoubleAnimation、ColorAnimation 等&#xff09…

uniApp实战三:自定义插件的实现

文章目录 1.最终效果预览2.页面布局3.插件开发4.编译uni页面5.平台申请离线key6.根据证书文件查看SHA1及SHA256值7.将模块编译为aar包并生成插件8.勾选本地插件并打自定义基座包 1.最终效果预览 2.页面布局 定义一个按钮触发点击事件 <view click"testClick">…

【每日八股】Golang篇(五):垃圾回收

目录 golang 的垃圾回收&#xff1f;写屏障&#xff1f;垃圾回收的触发条件&#xff1f; golang 的垃圾回收&#xff1f; golang GC 算法使用的是无分代&#xff08;对象没有代际之分&#xff09;、不整理&#xff08;回收过程中不对对象进行移动和整理&#xff09;、并发&…

AI+API引爆数据分析:BI已成过去?

目录 1 BI的瓶颈与新时代的需求 2 AI与API如何重塑数据分析&#xff1f; 3 QuickAPI&#xff1a;AI与API的桥梁 4 数据分析的未来&#xff1a;智能、互联、即时 5 结语 在过去十年中&#xff0c;商业智能&#xff08;BI&#xff09;一直是企业数据分析的基石。仪表盘、报表…

新型神经网络KAN:准确性高且易于解释

人工神经网络&#xff08;ANN&#xff09;是现代人工智能的核心技术&#xff0c;广泛应用于聊天机器人、图像生成器等领域。然而&#xff0c;传统的神经网络由于其复杂的结构和大量的神经元连接&#xff0c;往往被视为“黑匣子”&#xff0c;难以解释其内部工作原理。近年来&am…

Uniapp实现多种文件类型上传

一、前言 在移动端开发中&#xff0c;文件上传是常见的功能需求。本文将通过Uniapp框架&#xff0c;详细讲解如何实现支持多类型文件&#xff08;图片、视频、文档等&#xff09;的上传功能&#xff0c;并解决跨平台兼容性问题&#x1f604;&#x1f604;&#x1f604;。 二、…

中国软件供应链安全技术指南|DevSecOps敏捷安全技术金字塔V3.0正式发布

2022年12月28日&#xff0c;由悬镜安全主办&#xff0c;3S-Lab软件供应链安全实验室、Linux基金会OpenChain社区、ISC、OpenSCA社区联合协办的第二届全球DevSecOps敏捷安全大会&#xff08;DSO 2022&#xff09;已通过全球直播的形式圆满举行。本届大会以“共生敏捷进化”为主题…

在线商城服务器

1、项目背景 本项目是一个基于 C语言 开发的轻量级 HTTP 服务器&#xff0c;旨在实现基本的静态文件服务和简单的动态请求处理。 核心目标&#xff1a; 支持 HTTP/1.1 协议的 GET/POST 请求解析与响应。 提供静态资源&#xff08;HTML、图片等&#xff09;的快速分发。 作为学…