kafka单条消息过大发送失败

news/2025/1/16 7:47:00/

一、背景

生产环境中使用kafka作为消息队列,生产者发送消息失败,查询报错日志,得到如下输出:

Caused by: org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 4067035 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

这个错误信息表明生产者尝试发送到Kafka的消息在序列化后大小为4067035字节(约4MB),超过了Kafka配置中的max.request.size参数的限制,该参数当前设置为1048576字节(1MB)

二、原因

排查kafkamax.request.size

排查发现最大的请求大小为10485760字节(10M)

排查springboot配置文件中max.request.size的值

项目中没有配置这个值

排查springboot默认kafka的max.request.size值

springboot中Kafka消息队列的默认max.request.size值通常是1048576字节,即1MB

好了就是这个原因了

三、修改方案

要解决这个问题,通常有以下几种选择:

1. 增加kafka本身的message.max.bytes配置

message.max.bytes=10485760 # 10MB

当然也可以为特定的主题设置这个值:

kafka-topics.sh --alter --zookeeper localhost:2181 --topic your_topic --config max.message.bytes=10485760 

2. 增加生产者max.request.size配置

如果是代码则配置如下

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("max.request.size", 10485760 ); // 10MB

如果是yml 文件则配置如下 

spring:kafka:producer:properties:max.request.size: 10485760 

3. 增加消费者的fetch.max.bytesmax.partition.fetch.bytes配置

如果是代码则配置如下

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("fetch.max.bytes", 10485760); // 10MB
props.put("max.partition.fetch.bytes", 10485760); // 10MB

如果是yml 文件则配置如下

spring:kafka:consumer:properties:fetch.max.bytes: 10485760max.partition.fetch.bytes: 10485760

4. 分割消息

如果消息非常大,可能需要考虑将其分割成较小的部分,然后在消费者端重新组装。这种方法可以避免配置过大的消息大小限制。

5.生产者压缩消息

你可以启用消息压缩来减少消息的大小。Kafka支持多种压缩算法,如gzip、snappy、lz4等。我们可以在生产者配置中启用压缩:

 如果是代码则配置如下:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "gzip");

如果是yml 文件则配置如下

spring:kafka:producer:properties:compression.type: gzip


http://www.ppmy.cn/news/1520638.html

相关文章

JSON格式化?超简单!

JSON格式化 是指将JSON(JavaScript Object Notation)数据转换成一种更易读、结构化的格式。 在以下场景中,可能需要进行JSON格式化: 调试和开发:在开发过程中,开发人员经常需要查看和解析JSON数据。通过…

基于深度学习的遥感图像分类识别系统,使用PyTorch框架实现

取5个场景 [海滩, 灌木丛, 沙漠, 森林, 草地] 划分数据集 train:val:test 7:2:1 环境依赖 pytorch1.1 or 1.0 tensorboard1.8 tensorboardX pillow 注意调低batch_size参数特别是像我这样的渣渣显卡 使用方法 只需要指…

如何快速采集淘宝商品数据?

无论是谁,如果单凭人工的方式去收集淘宝、天猫等平台的商品数据信息,工作量是巨大的,如果借助有采集软件的第三方公司操作,则可实现对大数据的轻松掌握,但是外包给第三方公司需要支付一定的费用,包含技术费…

Canvas 在 微信小程序-uni-APP 和 H5 中的使用差异

Canvas 是一个强大的绘图工具,无论是在 Web 开发还是跨平台应用开发中都有广泛应用。然而,在 uni-APP 和传统 H5 环境中使用 Canvas 时,存在一些重要的差异。本文将深入探讨这些差异,帮助开发者在不同平台上更好地使用 Canvas。 1. API 差异 H5 环境 在 H5 环境中,我们使用标…

十四、低空安全综合管理服务平台建设方案

1、系统背景 由于低空安全研究关系到国家战略安全保障,世界各国相继开展了相关法律法规建设,以及一系列的理论与技术保障研究。为了保障低空空域安全,需推动建立各省级安全管理平台,做好与企业级监控服务平台的管理衔接和数据共享,强化本区域内民用无人机的安全监管工作;…

vue大屏可视化:4k带鱼屏、4k、2k、1920*1080、笔记本 全适配方案

本方案采用的是媒体查询的方法来实现的 css媒体查询书写(可按照自己需求新增)不同尺寸下显示不同的文字大小图片大小等: // 4K 带鱼屏 media screen and (max-width: 3840px) and (max-height: 1080px) {} // 4K media screen and (max-widt…

计算机网络(八股文)

这里写目录标题 计算机网络一、网络分层模型1. TCP/IP四层架构和OSI七层架构⭐️⭐️⭐️⭐️⭐️2. 为什么网络要分层?⭐️⭐️⭐️3. 各层都有那些协议?⭐️⭐️⭐️⭐️ 二、HTTP【重要】1. http状态码?⭐️⭐️⭐️2. 从输入URL到页面展示…

基于 web教学管理系统设计与实现

3 总体设计 3.1 系统软件体系结构 系统采用B/S结构,统一管理数据库和Web服务器。在这种结构下,用户界面完全通过WWW浏览器实现,一部分事务逻辑在前端实现,但是主要事务逻辑在服务器端实现,形成所谓3-tier结构,第一…