SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

news/2025/1/16 11:01:01/

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

上一篇直通车

SpringBoot整合SpringCloudStream3.1+版本Kafka

实现死信队列步骤

  1. 添加死信队列配置文件,添加对应channel
  2. 通道绑定配置对应的channel位置添加重试配置

结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

配置文件

Kafka基本配置(application-mq.yml)

server:port: 7105
spring:application:name: betrice-message-queueconfig:import:- classpath:application-bindings.ymlcloud:stream:kafka:binder:brokers: localhost:9092configuration:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer-properties:enable.auto.commit: falsebinders:betrice-kafka:type: kafkaenvironment:spring.kafka:bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}

创建死信队列配置文件(application-dql.yml)

在这里插入图片描述

spring:cloud:stream:kafka:bindings:dqlTransfer-in-0:consumer:# When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>.# messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[].# By default, a failed record is sent to the same partition number in the DLQ topic as the original record.enableDlq: truedlqName: Evad05-message-dlqkeySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
#              valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerdevalueSerde: com.devilvan.pojo.Evad05MessageSerdeautoCommitOnError: trueautoCommitOffset: true

注意:这里的valueSerde使用了对象类型,需要搭配application/json使用,consumer接收到消息后会转化为json字符串

通道绑定文件添加配置(application-bindings.yml)

channel对应上方配置文件的dqlTransfer-in-0

在这里插入图片描述

spring:cloud:stream:betrice-default-binder: betrice-kafkafunction:# 声明两个channel,transfer接收生产者的消息,处理完后给sinkdefinition: transfer;sink;gather;gatherEcho;dqlTransfer;evad05DlqConsumerbindings:# 添加生产者bindiing,输出到destination对应的topicdqlTransfer-in-0:destination: Evad10binder: ${spring.cloud.stream.betrice-default-binder}group: evad05DlqConsumer # 使用死信队列必须要有groupcontent-type: application/jsonconsumer:maxAttempts: 2 # 当消息消费失败时,尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 消息消费失败后重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2,即第二次是第一次间隔时间的2倍,第三次是第二次的2倍backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s。dqlTransfer-out-0:destination: Evad10binder: ${spring.cloud.stream.betrice-default-binder}content-type: text/plain# 消费死信队列中的消息evad05DlqConsumer-in-0:destination: Evad05-message-dlqbinder: ${spring.cloud.stream.betrice-default-binder}content-type: text/plain

Controller

发送消息并将消息引入死信队列

@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {@Resource(name = "streamBridgeUtils")private StreamBridge streamBridge;@PostMapping("streamSend")public void streamSend(String topic, String message) {try {streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}}@PostMapping("streamSendDql")public void streamSendDql(String topic, String message) {try {streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}}@PostMapping("streamSendJsonDql")public void streamSendJsonDql(String topic) {try {Evad05MessageSerde message = new Evad05MessageSerde();message.setData("evad05 test dql");message.setCount(1);streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}}
}

Channel

这里使用了transfer通道,消息从Evad10(topic)传来,经过transfer()方法后抛出异常,随后进入对应的死信队列

@Configuration
public class BetriceMqSubChannel {@Beanpublic Function<String, String> dqlTransfer() {return message -> {System.out.println("transfer: " + message);throw new RuntimeException("死信队列测试!");};}@Beanpublic Consumer<String> evad05DlqConsumer() {return message -> {System.out.println("Topic: evad05 Dlq Consumer: " + message);};}
}

将自定义序列化类型转换为JSON消息

步骤

1. 通道绑定文件(application-bindings.yml)的valueSerde属性添加自定义的序列化

在这里插入图片描述

2. BetriceMqController中封装该自定义类型的对象,并作为消息发送

@PostMapping("streamSendJsonDql")
public void streamSendJsonDql(String topic) {try {Evad05MessageSerde message = new Evad05MessageSerde();message.setData("evad05 test dql");message.setCount(1);streamBridge.send(topic, message);log.info("发送消息:" + message);} catch (Exception e) {log.error("异常消息:" + e);}
}

3. channel(BetriceMqSubChannel)接收到该消息并反序列化

@Bean
public Consumer<String> evad05DlqConsumer() {return message -> {System.out.println("Topic: evad05 Dlq Consumer: " + JSON.parseObject(message, Evad05MessageSerde.class));};
}

4. 结果

在这里插入图片描述
在这里插入图片描述

参考网址

Kafka 消费端消费重试和死信队列 - Java小强技术博客 (javacui.com)
spring cloud stream kafka rabbit 实现死信队列_spring cloud stream kafka 死信队列_it噩梦的博客-CSDN博客


http://www.ppmy.cn/news/949293.html

相关文章

springboot整合feign实现RPC调用,并通过Hystrix实现服务降级

目录 一、服务提供者 二、服务消费者 三、测试效果 四、开启Hystrix实现服务降级 feign/openfeign和dubbo是常用的微服务RPC框架&#xff0c;由于feigin内部已经集成ribbon&#xff0c;自带了负载均衡的功能&#xff0c;当有多个同名的服务注册到注册中心时&#xff0c;会根…

ChatGPT 中文版插件来了

点击关注公众号&#xff1a;互联网架构师&#xff0c;后台回复 2T获取2TB学习资源&#xff01; 上一篇&#xff1a;Alibaba开源内网高并发编程手册.pdf 转自 | 机器之心 ChatGPT 的 Debug 功能&#xff0c;有人应用化了。 ChatGPT 这几天可谓是风头无两。作为一个问答语言模型&…

ChatGPT中文版:一款让人工智能更接近自然语言的创新模型

随着人工智能技术的发展&#xff0c;越来越多的企业和机构开始应用自然语言处理&#xff08;NLP&#xff09;技术&#xff0c;以提高文本分析、信息检索、对话系统等方面的效率和精度。而在这个领域里&#xff0c;ChatGPT模型可以说是一款非常重要的人工智能模型之一。 ChatGP…

chatGPT中文版设定冷知识

ChatGPT是一个基于自然语言处理技术的聊天机器人&#xff0c;能够根据输入文本进行智能回复。对于中文版的ChatGPT设定&#xff0c;以下是一些重要步骤&#xff1a; 选择中文预训练模型 在开始使用ChatGPT中文版之前&#xff0c;你需要选择适合中文场景的预训练模型。目前市面…

JavaScript基本操作数组方法

1、访问元素&#xff1a; 通过索引访问元素&#xff1a;使用方括号和索引来获取数组中指定位置的元素。 const array [1, 2, 3]; console.log(array[0]); // 输出: 1获取数组长度&#xff1a;使用 length 属性获取数组的长度。 const array [1, 2, 3]; console.log(array.…

flink on yarn 中的flink-conf.yaml参数

在 Flink on YARN 中,flink-conf.yaml 是 Flink 配置文件,用于配置 Flink 应用程序在 YARN 上的运行。通过修改 flink-conf.yaml 文件中的参数,你可以调整 Flink 集群的行为和性能。以下是一些常见的在 flink-conf.yaml 中设置的参数: yarn.application.name: 指定 Flink 应…

无需注册的ChatGPT来了,直接使用

火爆全网的 ChatGPT 已来 这次不用梯子&#xff0c;无需注册&#xff0c;国内入口 直接使用&#xff0c;极速体验&#xff01;&#xff01;&#xff01; 还可以直接绑定公众号对话 WOW! 未来已来&#xff0c;你不来试试? 大语言模型绝对是一场新的革命 ChatGPT 和千行百业…

ChatGPT大封号,注册功能关闭!亚洲成重灾区,网友喊话:不要登录,不要登录...

Datawhale干货 最新&#xff1a;GPT封号情况&#xff0c;来源&#xff1a;量子位 “不要登录ChatGPT&#xff01;” “暂时远离人工智能和ChatGPT概念板块高位股&#xff01;” 就在这两天&#xff0c;一些关于ChatGPT的疾呼突然在各种社交平台和群聊刷屏了。 而看到这些消息的…