⚙️ 如何调整重试策略以适应不同的业务需求?

embedded/2024/11/15 0:31:05/

调整 Kafka 生产者和消费者的重试策略以适应不同的业务需求,需要根据业务的特性和容错要求来进行细致的配置。以下是一些关键的调整策略:

  1. 业务重要性

    • 对于关键业务消息,可以增加重试次数,并设置较长的重试间隔,以减少消息丢失的风险。
    • 对于非关键业务消息,可以减少重试次数或不进行重试,以避免不必要的资源消耗。
  2. 消息幂等性

    • 如果业务逻辑是幂等的,即多次处理相同消息不会导致业务状态不一致,可以增加重试次数。
    • 如果业务逻辑不是幂等的,需要谨慎设置重试策略,或者实现去重逻辑。
  3. 消息时效性

    • 对于时效性要求高的消息,可以减少重试间隔,以便快速尝试重新发送。
    • 对于时效性要求不高的消息,可以增加重试间隔,减少对 Kafka 集群的压力。
  4. 系统容量和负载

    • 根据 Kafka 集群和下游系统的容量和负载情况调整重试策略,避免因重试导致的额外负载影响系统稳定性。
  5. 错误类型

    • 对于临时性错误(如网络问题),可以设置较高的重试次数和较短的重试间隔。
    • 对于永久性错误(如消息格式错误),应减少重试次数,避免无意义的重试。
  6. 死信队列(DLQ)

    • 对于重试次数用尽后仍然发送失败的消息,可以配置死信队列进行存储,以便后续分析和处理。
  7. 监控和告警

    • 实施实时监控,对重试次数、失败率等关键指标进行监控,并设置告警阈值。
  8. 业务流程控制

    • 在业务流程中实现重试逻辑,例如在业务层捕获异常并根据业务规则进行重试。
  9. 自定义重试策略

    • 实现自定义的重试策略,例如指数退避策略,以适应特定的业务场景。
  10. 事务性消息

    • 如果业务要求消息发送的原子性,可以启用事务性消息发送,确保消息要么全部发送成功,要么全部不发送。
  11. 资源限制

    • 考虑到生产者和消费者的资源限制,如内存和网络带宽,合理设置重试策略,避免资源耗尽。
  12. 反馈机制

    • 建立反馈机制,根据业务运行情况和系统性能反馈调整重试策略。

通过综合考虑上述因素,可以为不同的业务需求定制合适的重试策略,以确保 Kafka 消息系统的高效性和可靠性。

在这里插入图片描述

以下是一些代码案例,展示了如何根据不同的业务需求调整 Kafka 生产者和消费者的重试策略

在这里插入图片描述

Kafka 生产者重试策略案例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomRetryProducerDemo {public static void main(String[] args) {// 配置生产者属性Properties props = new Properties();props.put("bootstrap.servers", "4.5.8.4:9092");props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());props.put("retries", 5); // 设置重试次数props.put("retry.backoff.ms", 1000); // 设置重试间隔为1秒props.put("buffer.memory", 33554432); // 设置缓冲区大小props.put("batch.size", 16384); // 设置批次大小props.put("linger.ms", 10); // 设置等待时间为10毫秒props.put("max.in.flight.requests.per.connection", 1); // 设置最大在途请求数// 创建生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 100; i++) {String key = "key-" + i;String value = "value-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);producer.send(record, (metadata, exception) -> {if (exception != null) {// 处理消息发送失败的情况System.err.println("发送消息失败:" + exception.getMessage());} else {// 处理消息发送成功的情况System.out.println("消息发送成功,偏移量:" + metadata.offset());}});}// 关闭生产者producer.close();}
}

Kafka 消费者重试策略案例

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class CustomRetryConsumerDemo {public static void main(String[] args) {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "4.5.8.4:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", StringDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());props.put("max.poll.records", 500); // 设置每次拉取的最大记录数props.put("fetch.min.bytes", 1024); // 设置最小获取1KB的数据props.put("fetch.max.wait.ms", 500); // 设置最大等待500ms// 创建消费者实例Consumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {try {// 处理消息System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 假设处理消息可能会失败if (record.value().contains("error")) {throw new RuntimeException("模拟处理消息失败");}} catch (Exception e) {// 处理消息失败,记录日志或重试System.err.println("处理消息失败:" + e.getMessage());// 可以在这里实现重试逻辑,例如将消息发送到死信队列}}// 批量提交偏移量consumer.commitSync();}}
}

死信队列(DLQ)案例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class DLQProducerDemo {public static void main(String[] args) {// 配置生产者属性Properties props = new Properties();props.put("bootstrap.servers", "4.5.8.4:9092");props.put("key.serializer", StringSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());props.put("retries", 5); // 设置重试次数props.put("retry.backoff.ms", 1000); // 设置重试间隔为1秒// 创建生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 100; i++) {String key = "key-" + i;String value = "value-" + i;ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", key, value);producer.send(record, (metadata, exception) -> {if (exception != null) {// 处理消息发送失败的情况System.err.println("发送消息失败:" + exception.getMessage());// 将失败的消息发送到死信队列ProducerRecord<String, String> dlqRecord = new ProducerRecord<>("test-topic-DLQ", key, exception.getMessage());producer.send(dlqRecord);} else {// 处理消息发送成功的情况System.out.println("消息发送成功,偏移量:" + metadata.offset());}});}// 关闭生产者producer.close();}
}

这些代码案例展示了如何根据不同的业务需求调整 Kafka
生产者和消费者的重试策略,包括设置重试次数、重试间隔、处理消息发送失败的情况以及实现死信队列(DLQ)。希望这些示例能帮助您更好地理解和应用
Kafka 的重试机制。


http://www.ppmy.cn/embedded/137629.html

相关文章

鸿蒙进阶篇-type、typeof、类

“在科技的浪潮中&#xff0c;鸿蒙操作系统宛如一颗璀璨的新星&#xff0c;引领着创新的方向。作为鸿蒙开天组&#xff0c;今天我们将一同踏上鸿蒙基础的探索之旅&#xff0c;为您揭开这一神奇系统的神秘面纱。” 各位小伙伴们我们又见面了,我就是鸿蒙开天组,下面让我们进入今…

【STM32外设系列】NRF24L01无线收发模块

NRF24L01是一款基于2.4GHz ISM频段的无线收发模块&#xff0c;它采用nRF24L01芯片&#xff0c;支持多通道通信和自动重发机制&#xff0c;非常适合用于短距离无线通信。本文将详细介绍如何使用STM32微控制器驱动NRF24L01模块&#xff0c;并通过代码实现无线数据的收发。 1. NR…

mysql 配置文件 my.cnf 增加 lower_case_table_names = 1 服务启动不了的原因

原因&#xff1a;在MySQL8.0之后的版本&#xff0c;只允许在数据库初始化时指定&#xff0c;之后不允许修改了 mysql 配置文件 my.cnf 增加 lower_case_table_names 1 服务启动不了 报错信息&#xff1a;Job for mysqld.service failed because the control process exited …

Window下PHP安装最新sg11(php5.3-php8.3)

链接: https://pan.baidu.com/s/10yyqTJdwH_oQJnQtWcwIeA 提取码: qz8y 复制这段内容后打开百度网盘手机App&#xff0c;操作更方便哦 (链接失效联系L88467872) 1.下载后解压文件&#xff0c;将对应版本的ixed.xx.win文件放进php对应的ext目录下&#xff0c;如图所示 2.修改ph…

【2024软考架构案例题】你知道 Es 的几种分词器吗?Standard、Simple、WhiteSpace、Keyword 四种分词器你知道吗?

&#x1f449;博主介绍&#xff1a; 博主从事应用安全和大数据领域&#xff0c;有8年研发经验&#xff0c;5年面试官经验&#xff0c;Java技术专家&#xff0c;WEB架构师&#xff0c;阿里云专家博主&#xff0c;华为云云享专家&#xff0c;51CTO 专家博主 ⛪️ 个人社区&#x…

记一次工作中订单幂等处理方案

1.背景 有个项目会将本公司的下单接口通过Open API暴露到外网供第三方使用&#xff0c;最近收到商务反馈第三方公司&#xff0c;在重试的过程中多次使用同一个第三方订单号下单&#xff0c;导致我们业务系统出现了重复订单。我们系统并没有做幂等校验&#xff0c;数据库也没有对…

Labelme标注数据的一些操作

文章目录 1、删除没有标注的图片2、对图片重新命名3、对标注后的数据重新命名4、两个Labelme数据文件夹合并5、其他格式的图片转jpg6、Labelme的json文件中加入imagedata7、将json中的imagedata转为图像8、修改标注类别的名字 1、删除没有标注的图片 有些图片没有标注&#xf…

HCIP-快速生成树RSTP

一、RSTP是什么 STP&#xff08;Spanning Tree Protocol &#xff09;是生成树协议的英文缩写。该协议可应用于环路网络&#xff0c;通过一定的算法实现路径冗余&#xff0c;同时将环路网络修剪成无环路的树型网络&#xff0c;从而避免报文在环路网络中的增生和无限循环。 RS…