kafka 生产者拦截器

ops/2024/9/25 1:16:17/

生产者拦截器

kafka 消息发送到Broker 之前大概需要经过 生产者拦截器 、序列化器分区器等一系列处理。本文主要介绍生产者拦截器

生产者拦截器可以在消息发送之前对消息进行拦截。它可以改变消息内容,包括key , value ,topic 等任何信息

通常不推荐修改key , topic , 我们可以给消息添加一些额外信息,比如版本号,过滤一些"非法"消息等。

图片

拦截器接口介绍

public interface ProducerInterceptor<K, V> extends Configurable {ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);void onAcknowledgement(RecordMetadata var1, Exception var2);void close();
}

生产者拦截器接口包含三个方法

  • onSend 方法可以对消息进行定制化修改

  • onAcknowledgement 在消息被应答或者发送失败时执行

    该方法在IO线程执行,所以不要执行一些耗时操作,会影响消息投递速度

  • close 用于执行一些资源释放的工作

自定义生产者拦截器Demo

该生产者拦截器给消息添加个包含版本号信息的Header ,配置生产者

ProducerConfig.INTERCEPTOR_CLASSES_CONFIG 属性使其生效

    private Map<String, Object> produceConfigs() {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);configMap.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class);configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(CustomerProduceInterector.class));return configMap;}
public class CustomerProduceInterector implements ProducerInterceptor {@Overridepublic ProducerRecord onSend(ProducerRecord producerRecord) {producerRecord.headers().add(new RecordHeader("version","v1".getBytes()));return producerRecord;}@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {System.out.println("onAcknowledgement  ....");}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

结语

生产者拦截器还是比较简单,相应的还有消费者拦截器,我们平常业务也未必用到,殊途同归,在一些RPC框架比如dubbo服务提供方和消费端都有类似的拦截器,可以做一些链路追踪等等。

以上就是我个人的理解  喜欢的一起关注交流学习


http://www.ppmy.cn/ops/115548.html

相关文章

弹性负载均衡ELB 详解和设置方法

一、弹性负载均衡ELB 详解 1. 定义与概念 弹性负载均衡&#xff08;Elastic Load Balancing&#xff0c;简称ELB&#xff09;是一种将访问流量自动分发到多台云服务器的流量分发控制服务。它通过在多个后端服务器之间均衡分配请求&#xff0c;提高应用程序的可用性、可扩展性…

【Linux】环境部署kafka集群

目录 一、kafka简介 1. 主要特点 2.组件介绍 3.消息中间件的对比 二、环境准备 1.Java环境 2.Zookeeper环境 3.硬件环境集群 三、Zookeeper的集群部署 1.下载zookeeper 2.部署zookeeper集群 &#xff08;1&#xff09;node1节点服务器 &#xff08;2&#xff09;no…

如何查看Android设备的dpi

adb shell getprop ro.sf.lcd_density adb shell cat /system/build.prop > build_prop.txt shell cat system/build.prop 结果&#xff1a;参考&#xff1a; 如何查看Android设备的dpi_安卓 查看手机dpi-CSDN博客

HarmonyOS鸿蒙开发实战(5.0)网格元素拖动交换案例实践

鸿蒙HarmonyOS开发实战往期必看文章&#xff1a; HarmonyOS NEXT应用开发性能实践总结 一分钟了解”纯血版&#xff01;鸿蒙HarmonyOS Next应用开发&#xff01; 最新版&#xff01;“非常详细的” 鸿蒙HarmonyOS Next应用开发学习路线&#xff01;&#xff08;从零基础入门…

828华为云征文 | 华为云X实例的镜像管理详解

前言 随着云计算的不断普及&#xff0c;云服务器成为企业和开发者日常工作中的重要工具。为了提升工作效率和降低运维成本&#xff0c;云服务器镜像的管理尤为重要。镜像作为服务器或磁盘的模板&#xff0c;预装了操作系统、软件及配置&#xff0c;是快速部署和迁移业务的重要…

Ruby-SAML CVE-2024-45409 漏洞解决方案

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署极狐GitLab。 学习极狐GitLab 的相关资料&#xff1a; 极狐GitLab 官网极狐…

ROM和RAM的区别

ROM&#xff08;Read-Only Memory&#xff0c;只读存储器&#xff09;和RAM&#xff08;Random Access Memory&#xff0c;随机存取存储器&#xff09;是计算机系统中两种不同类型的存储技术&#xff0c;它们在功能、用途和特性上有显著的区别&#xff1a; 1. 存储数据的持久性…

Real-Time Linux 合并到内核主线

在长达 20 年之后&#xff0c;Real-Time Linux(PREEMPT_RT)合并到内核主线。 从 Linux 6.12 开始&#xff0c;所有发行版都将包含实时 Linux 代码。 这意味着 Linux 将开始运行在更多任务关键设备和工业硬件上。实时操作系统对时间限制非常严格&#xff0c;需要确保关键任务在…