Kafka 入门到起飞系列 - 生产者发送消息流程解析

news/2025/2/12 20:56:15/

在这里插入图片描述

  • 生产者通过producerRecord 对象封装消息主题、消息的value(内容)、timestamp(时间戳)等

  • 生产者通过send()方法发送消息,send()方法会经过如下几步
    1. 首先将消息交给拦截器(Interceptor)处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的效果,拦截器一般将一些通用的功能加进来,通常在消息发送前,producer回调逻辑前对消息做一些定制化需求,消息头部添加消息的属性等
    2. 接下来交给序列化器(Serializer),Key的序列化器和value的序列化器,对消息的key和value进行序列化,序列化为字节数组,
    3. 然后将序列化的结果交给分区器(Partitioner),分区器有3种策略来计算消息应该属于哪个分区,

    • 在producerRecord中直接指定分区,分区器会直接将消息放到指定分区

    • 如果没有指定分区器,但是消息有key,分区器会根据消息的key计算hash值,根据主题分区数量取模,来决定将消息放到哪个分区

    • 如果没有指定分区、也没有指定key,分区器会以轮询(Round Robin)的方式给消息分配分区

      在这里插入图片描述

  • 消息经过以上拦截器->序列化器->分区器 进行加工后,会将消息放到RecordAccumulator缓冲区,对每个分区都会有一个单独的缓冲区,经过分区器计算出分区号之后,不同的消息就会分配给不同的缓冲区,缓冲区里面消息也是有序的,我们可以指定对缓冲区里的消息进行分批次,也可以指定缓冲区大小

  • 在这里插入图片描述

  • 当缓冲区中消息达到条件会按批次发送到broker对应分区上

  • broker将接收到的消息进行刷盘持久化

  • 一个消息发出去之后,服务器(broker)会返回给producer响应,producer再来判断消息是否发送成功,

  • broker返回元数据信息 - > 落盘成功 ->生产者继续发送后面消息

  • broker返回元数据信息 - >落盘失败 - 生产者设置了重试次数 -> producer 会将消息重新放入缓冲区进行排队,等待再次发送,当一个消息发送失败重试需要重发,消息是放到缓冲区队尾,

  • 生产者去缓冲区重试发送


生产者在重试消息时,消息的顺序就错了,那怎么保证消息的有序性呢?

在这里插入图片描述

针对这种情况,可以做一个配置,
参数:max.in.flight.requests.per.connection表示producer 在收到broker响应之前可以发送多少批消息,默认5,
设置此值是1,表示broker在响应之前producer不能再向同一个broker发送请求,就是我确认一批你再发下一批,这样可以保证消息有序性,对消息顺序要求不高情况可以不考虑


补充:

  • Producer 创建时,会创建一个Sender线程(IO线程)设置为守护线程

  • Producer 创建时,会创建缓冲区

  • Producer 生产消息,内部是一个异步流程,Sender线程不断轮询RecordAccumulator,满足条件后进行真正的网络IO发送消息

  • 在这里插入图片描述

  • RecordAccumulator(缓冲区) 对每一个分区都有一个缓冲区

    • 每个分区的缓冲区中消息也是有序的
    • 可以指定缓冲区中的消息按批次发送
      • 缓冲区大小达到batch.size,默认16KB
      • 在缓冲区等待时间 lingger.ms 达到上限
      • 以上两个条件满足一个即发送一批
    • 可以指定整个缓冲区的大小

批次的概念很好理解,缓冲区就像一辆公交车,有两种发车方式,一是人满了就发车,一是等5分钟就发车,不管是人满了还是到5分钟了,发车,go~
分批发送可以减少网络IO,节省带宽使用,减少网络传输的压力,提升吞吐量

  • 一个批次消息发送后,通过网络,发往Kafka指定分区,然后刷盘到broker
  • 如果Producer设置了retries参数值>0,那么允许消息发送失败进行重试,重试机制由客户端Producer内部实现
  • Broker端消息落盘成功,会返回元数据给生产者
    • 通过阻塞直接返回 (同步发送)
    • 通过回调函数返回(异步发送)

http://www.ppmy.cn/news/961667.html

相关文章

ChatGPT应用开发框架库|Next.js 框架

Next.js 是一个基于 React 的轻量级框架,它可以帮助开发者快速构建 SSR(服务器端渲染)应用程序。Next.js 提供了一系列的开箱即用的特性,例如自动代码分割、静态文件服务、CSS 模块化、预渲染等,从而使得开发者可以更加…

chatGPT能让你成为高级前端开发人员助您轻松解决 JavaScript 常见问题,提升开发效率!

您是否曾经在 JavaScript 开发过程中遇到各种疑难问题?是否希望能够快速解决这些问题并提升开发效率?现在,您可以依靠我们的顶级工具 ChantGPT,成为解决 JavaScript 常见问题的超级英雄! ChantGPT 是一款基于先进人工…

Whisper与ChatGPT联手,轻松实现音频转录文本

目录 前言 一、Whisper简介 二、Whisper可用的模型和语言 三、开源 Whisper 本地转录 3.1、安装pytube库 3.2、下载音频MP4文件 3.3、安装 Whisper 库 四、在线 Whisper API 转录 4.1、Whisper API 接口调用 4.2、使用Prompt参数优化 4.3、其它参数介绍 4.4、转录过…

近期AI成为热点话题, ChatGPT, GPT4, new bing, Bard,AI 绘画, AI 编程工具引发大量讨论,结合自身经历,聊聊本人对 AI 技术以及其今后发展的看法。

文章目录 前言 一、你人生中第一次接触到“人工智能”的概念和产品是什么?什么让你觉得“人类做的东西的确有智能”? 二、描述你在学习中碰到的最高级的 AI 是什么? 三、你听说过最近的GPT,new bing,bard&#xff…

ChatGPT横空出世,让人们看到了AI的更大创造力,聚光灯再度打到了OpenAI的身上

预告了一整年的GPT-4迟迟没来,人们猜想OpenAI是不是要跳票了,更何况他们之前的得意之作DALL-E也被开源Stable Diffusion打了个措手不及,再不来点深水炸弹业界地位危矣。 不过,就在大家以为今年OpenAI将以沉寂收场时,聊…

超详细!python小白用langchain+chatgpt搭建自己业务聊天机器人

背景 随着chatgpt的爆火,各种开源大模型以及聊天机器人开始涌现。最近公司也想训练一个具有公司业务特色的聊天机器人,类似一个客服的角色。本人是一个java开发,对python也不是很懂,顺便把这几天的摸索的心得记录下来&#xff0c…

从MVC跨越到DDD微服务架构是如何演进的

微服务架构演进 领域模型中对象的层次从内到外依次是:值对象、实体、聚合和限界上下文。 实体或值对象的简单变更,一般不会让领域模型和微服务发生大变。但聚合的重组或拆分却可以。因为聚合内业务功能内聚,能独立完成特定业务。那聚合的重组…

gurobi安装vs配置gurobi

gurobi安装&vs配置gurobi 1、注册账号并登录 2、下载gurobi optimizer 3、获取license:User Portal (gurobi.com) online course可以免ip验证。 4、GENERATE NOW会生成,打开cmd进入gurobi安装路径(如F:\gurobi1001\win64\bin>)&am…