【kafka-03】springboot整合kafka以及核心参数详解

devtools/2024/9/22 15:36:10/

Kafka系列整体栏目


内容链接地址
【一】afka安装和基本核心概念https://zhenghuisheng.blog.csdn.net/article/details/142213307
【二】kafka集群搭建https://zhenghuisheng.blog.csdn.net/article/details/142253288
【三】springboot整合kafka以及核心参数详解https://zhenghuisheng.blog.csdn.net/article/details/142346016

springboot整合kafka以及核心参数详解

  • 一,springboot整合kafka以及核心参数详解
    • 1,springboot整合kafka
    • 2,kafka核心参数的讲解
      • 2.1,生产者端的参数核心讲解
        • 2.1.1,生产者端的ack机制
        • 2.2.2,重试次数retries
        • 2.2.3,发送缓冲区buffer-memory
        • 2.2.4,批量拉取batch-size
        • 2.2.5,序列化
      • 2.2,消费者端参数核心讲解
        • 2.2.1,enable-auto-commit自动提交偏移量
        • 2.2.2,auto-commit-interval自动提交偏移量时间间隔
        • 2.2.3,ack-mode手动提交偏移量
        • 2.2.4,max-poll-records 消费者拉取数据
        • 2.2.5,heartbeat-interval心跳维护
        • 2.2.6,auto-offset-reset

kafka_10">一,springboot整合kafka以及核心参数详解

前面两篇主要讲解了kafka的安装启动,以及kafka的集群的搭建,接下来这篇主要讲解springboot如何整合kafka,以及在kafka中的核心参数需要如何设置,以及设置的意义是什么

kafka_14">1,springboot整合kafka

首先需要确定kafka版本,这里选择2.8.10的版本

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.10</version>
</dependency>

接下来在yml配置文件中设置对应的参数,首先生产者和消费者都需要定义序列化,然后生产者设置默认组,发送到broker的消息确认机制等

spring:kafka:#bootstrap-servers: 175.178.75.153:9092,175.178.75.153:9093,175.178.75.153:9094bootstrap-servers: 175.178.75.153:9092producer: # 生产者retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送batch-size: 16384buffer-memory: 33554432acks: 1# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交# TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有一个条件满足时提交# COUNT_TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATE

接下来就直接编写一个controller,模拟生产者往broker中发送消息,如直接在原先的9092端口建立的 zhstest111 的主题上面发送消息,其代码如下

@RestController
@RequestMapping("/test")
public class KafkaController {private static final String TOPIC = "zhstest11";@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;//发送数据@GetMapping("/send")public AjaxResult sendMessage() {String uuid = UUID.randomUUID().toString();kafkaTemplate.send(TOPIC, "hello kafka" + uuid);return AjaxResult.success("数据发送成功");}
}

消费者的消费如下,定义一个component配置类,然后通过 KafkaListener 监听对应的topic,有数据就消费

@Component
@Slf4j
public class KafkaConfig {private static final String TOPIC = "zhstest11";//接收数据@KafkaListener(topics = TOPIC, groupId = "my-group")public void consume(String message) {log.info("接收到的数据为: " + message);}
}

kafka_104">2,kafka核心参数的讲解

2.1,生产者端的参数核心讲解

2.1.1,生产者端的ack机制

在生产者往broker中投递消息时,为了保证消息的可靠送达以及持久化机制,需要通过这个ack机制来接收到broker的应答

ack:1
  • 当ack=0时:性能最高,消息直接异步给完broker就行,不需要broker任何答复,缺点就是容易丢消息
  • 当ack=1时,性能其次,需要leader结点将数据成功写入到本地日志,但是不需要等待集群中的follower写入,如果出现leader挂掉,但是follower未及时同步,那么在follower变成leader之后,就会丢失这部分消息
  • 当ack=-1时,性能最低,但是安全,生产者端需要等待broker集群中的leader和副本都成功写入日志

ack默认设置为1,允许在极端的情况下丢失部分消息。如果是为了记录海量的日志,那么可以将ack设置为0,如果是需要相对安全的,如金融领域不能丢失订单数据等,那么就设置成-1

2.2.2,重试次数retries
retries: 3

在生产者往broker投递消息时,当消息投递失败时,那么就可以设置重试,根据设置的值决定重试的次数。当然也有可能因为网络抖动的问题导致消息在响应时比较慢,生产者由于没接收到响应,但是消息时投递成功到broker的,那么可能就会投递两条,那么就可能会导致重复消费的问题,因此后期需要设置消费的幂等性的问题等。

2.2.3,发送缓冲区buffer-memory
buffer-memory: 33554432

消息发送到broker时,为了提升消息发送的效率,kafka内部将单条发送做了优化,将单条发送改成批量发送,因此设置了发送缓冲区,默认是32M大小。

2.2.4,批量拉取batch-size
batch-size: 16384

上面讲解在生产者端会有一个发送缓冲区,数据会先存储到这个发送缓冲区中,当然数据还是需要投放到broker机器上,因此需要这只这个batch-size批量的将数据从这个缓冲区中拉取到broker上面。当拉取的数据满了16kb之后,立马触发将数据投递到broker的上面。

当然也不是说只有满16kb才能去拉取数据投放到broker中,比如只有1kb数据,后台会默认多少时间去投递一次,如间隔10ms投递一次,从而保证消息投递的高可用

2.2.5,序列化

在网络传输中,需要将数据或者实体序列化成0,1这种二进制文件数据通过网络传输,那么操作系统就可以识别这种数据,因此在发送端中需要设置好响应的序列化器和反序列化器,这样才能解析服务端发送的数据

key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

2.2,消费者端参数核心讲解

2.2.1,enable-auto-commit自动提交偏移量
enable-auto-commit: false

在前面两篇文章中讲解过,不管是单体的消费者还是消费者组,当有消息被消费后都会默认的去增加partition的偏移量。这个参数默认会设置为true,默认时会增加消费者的偏移量的,如果设置成false,那么就不会每次的去修改partition的偏移量,那么消费者每次消费就相当于从头开始消费,有点类似于 –from-beginning命令了

bin/kafka-console-consumer.sh --topic zhstest11 --from-beginning --bootstrap-server localhost:9092
2.2.2,auto-commit-interval自动提交偏移量时间间隔
auto-commit-interval:5000

上面提到了自动提交偏移量这个参数,当这个参数设置成true时,那么每个消费者的偏移量都得上报到每个topic主题中,类似于kafka内部会做一个记录,记录每一个消费者记录到什么地方,哪一个off_set偏移量。当消费者重启时或者出现故障之后,都可以重正确的地方开始消费。而这个参数,就是为了将每个消费者消费了多少偏移量进行上报的功能,默认情况就是每隔5s上报一次。

自动提交虽然方便,但是假设说这5s的数据还没来的及上报成功,服务器宕机了,那么消费者可能就会丢失这5s的消费记录,在topic中找不到,因此就会导致重复消费的问题。因此在实际开发中,更加的的倾向于使用手动提交偏移量,因此上面的这个 enable-auto-commit 参数最好还是设置成false,这也是保证高可用的一种方式

2.2.3,ack-mode手动提交偏移量
#指定手动提交确认模式,使用 Acknowledgment 对象来手动确认消费
ack-mode: manual

上面说了自动提交对数据可能会有不安全性,因此更加的推荐使用手动提交,因此在消费者参数配置这个ackmode的value值为manual,那么就可以直接使用这个 Acknowledgment 对象来手动确认消费

如下面这段代码,增加了 Acknowledgment 对象,直接通过调用 acknowledge 实现手动的提交偏移量

@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {try {// 消费消息的逻辑log.info("Received message: " + record.value());// 手动提交偏移量acknowledgment.acknowledge();} catch (Exception e) {// 异常处理逻辑log.info("Error processing message: " + e.getMessage());}
}
2.2.4,max-poll-records 消费者拉取数据
max-poll-records:500

消费者在消费broker的数据时,也会设置默认的拉取数量,默认最多是500条。当然可以根据消费者消费的情况做一个适配和调整,消费过快的话可以调大这个参数,消费过慢的话可以调小这个参数

2.2.5,heartbeat-interval心跳维护
heartbeat-interval:1000

用于维护kafka和消费者之间的心跳问题,默认是1s,如果在指定时间内消费者没有往kafka发送心跳,那么kafka集群的协调器就会认为这个消费者已经失效。此时partition无消费者消费,那么就会触发一个消费的平衡机制,将该分区分配给其他消费者或者其他消费者组

2.2.6,auto-offset-reset
auto-offset-reset: earliest

这个参数比较有意思,和kafka的特性有关,假设有一个group1组,先消费了order订单主题的消息,此时offset的偏移量记录为1000,现在突然新增了一个group2,那么这个group2默认时不能消费到group1消费到的消息的,即使是两个不同的组,因为在默认情况下,新的组会从主题已有的offset的偏移量继续往下消费,就是说启动后能消费到后面生产者所发送的消息

因为在kafka内部,这个 auto-offset-reset 参数默认设置的是 latest,就是说只消费自己启动之后生产者发送到broker的消息,因此为了让新的group组也消费前面的消息,可以设置这个值为earliest


http://www.ppmy.cn/devtools/115526.html

相关文章

MySQL版本问题无法使用 group by xxx

mysql命令gruop by报错this is incompatible with sql_modeonly_full_group_by 在mysql 工具 搜索或者插入数据时报下面错误&#xff1a; ERROR 1055 (42000): Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column database_tl.emp.i…

CSS从入门到精通(已完结)

关注作者微信公众号&#xff0c;开启探索更多 CSS 知识的精彩之旅。在这里&#xff0c;你将收获丰富的 CSS 专业内容&#xff0c;深入了解这一网页开发语言的奥秘&#xff0c;不断拓展你的知识边界&#xff0c;提升技能水平。快来关注吧&#xff01; 微信公众号专栏地址&#x…

C++两点成一线

目录 开头程序程序的流程图程序执行的效果下一篇博客要说的东西 开头 大家好&#xff0c;我叫这是我58。 程序 #include <iostream> #include <cstring> #include <Windows.h> #define PANADD(A,B) ((A) < (B) ? 1 : -1) using namespace std; void p…

IS-ISv4/6双栈

文章目录 IS-ISv4/6双栈实验要求配置 IS-ISv4/6双栈 实验要求 配置双栈 R1、2、3、4配置 IS-ISv4 和 IS-ISv6&#xff0c;配置IPv6多拓扑 上面为Level-1类型、中间为Level-1-2、下面是Level-2类型 还有就是说ATT位置1有一定要求连接L1/2连接L1或者L2类型路由器&#xff0c;至…

how can I train a OpenAI fine tuned model with more prompts

题意&#xff1a;我如何使用更多提示来训练一个 OpenAI 微调模型&#xff1f; 问题背景&#xff1a; I fine-tuned OpenAI model with some prompts following this documentation it succeeded and created a new model in the playground. How I can retrain (fine-tune) th…

无人机之激光避障篇

无人机的激光避障技术是通过激光传感器来感知和避开周围障碍物的一种高级技术。以下是关于无人机激光避障技术的详细解析&#xff1a; 一、技术原理 激光避障技术利用激光束的直线传播和反射特性&#xff0c;通过发送激光束并接收反射回来的信号&#xff0c;来检测和计算周围障…

7--SpringBoot-后端开发、原理详解(面试高频提问点)

目录 SpringBoot原理 起步依赖 自动配置 配置优先级 Bean设置 获取Bean 第三方Bean SpringBoot原理 内容偏向于底层的原理分析 基于Spring框架进行项目的开发有两个不足的地方&#xff1a; 在pom.xml中依赖配置比较繁琐&#xff0c;在项目开发时&#xff0c;需要自己去找…

鸿蒙Harmony-Next 实现渐变跑马灯效果

最近在搞鸿蒙开发&#xff0c;有个效果是要实现文字跑马灯效果&#xff0c;便记录做了一个分享 实现步骤 1. 创建组件 首先我们创建一个自定义组件 MarqueeGradientTextView&#xff0c;并在其中定义需要的参数如文本内容、字体大小、字体颜色、渐变角度、渐变开始颜色和结束…