kafka 配置自定义序列化方式

news/2024/9/19 19:18:11/ 标签: kafka

序列化

图片

  • kafka 需要将消息内容序列化(Serializer)成字节数组才能发送到 Broken节点

  • 消费者需要将字节数组反序列化(Deserializer)为消息内容,然后消费消息。接口定义如下

public interface Serializer<T> extends Closeable {default void configure(Map<String, ?> configs, boolean isKey) {}byte[] serialize(String var1, T var2);default byte[] serialize(String topic, Headers headers, T data) {return this.serialize(topic, data);}default void close() {}
}
public interface Deserializer<T> extends Closeable {default void configure(Map<String, ?> configs, boolean isKey) {}T deserialize(String var1, byte[] var2);default T deserialize(String topic, Headers headers, byte[] data) {return this.deserialize(topic, data);}default void close() {}
}

常用序列化器

kafka 内置了许多实现如 StringSerializer、IntegerSerializer、DoubleSerializer 等。

生产者与消费者 选择的序列化与反序列化要匹配才能正常解析

自定义序列化器

自定义序列化器 只需要实现对应的 Serializer,UserDeserializer

比如有一个 User 类

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {private Long id;private String userName;
}

UserSerializer

public class UserSerializer implements Serializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic byte[] serialize(String s, User user) {return JSON.toJSONBytes(user);}@Overridepublic void close() {}
}
public class UserDeserializer implements Deserializer<User> {@Overridepublic User deserialize(String s, byte[] bytes) {return JSON.parseObject(bytes,User.class);}
}

生产者与消费者配置

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);

上面我们配置了 User 相关的序列化,但是实际上项目发送的消息内容肯定不止 User 这一种,我们需要针对不同类型指定不同的序列化方式

DelegatingByTypeSerializer

从 2.8 开始新增了DelegatingByTypeSerializer完美解决了上面问题用法如下,

    @Beanpublic ProducerFactory producerFactory() {Map<Class<?>, Serializer> delegates = new HashMap<>();delegates.put(byte[].class, new ByteArraySerializer());delegates.put(Bytes.class, new BytesSerializer());delegates.put(String.class, new StringSerializer());delegates.put(User.class, new UserSerializer());return new DefaultKafkaProducerFactory<>(produceConfigs(),new StringSerializer(), new DelegatingByTypeSerializer(delegates));}

消费者序列化可以在   @KafkaListener 指定

完整配置

确保kafka-clients版本在2.8以上

kafka 配置类

@Configuration
@EnableKafka
public class KafkaConfig {private Map<String, Object> produceConfigs() {Map<String, Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:9092");configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);return configMap;}@Beanpublic ProducerFactory producerFactory() {Map<Class<?>, Serializer> delegates = new HashMap<>();delegates.put(byte[].class, new ByteArraySerializer());delegates.put(Bytes.class, new BytesSerializer());delegates.put(String.class, new StringSerializer());delegates.put(User.class, new UserSerializer());return new DefaultKafkaProducerFactory<>(produceConfigs(),new StringSerializer(), new DelegatingByTypeSerializer(delegates));}@Beanpublic KafkaTemplate kafkaTemplate() {return new KafkaTemplate(producerFactory());}private Map<String, Object> consumerConfigs() {Map<String, Object> configMap = new HashMap<>();configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:9092");configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,IntegerDeserializer.class);configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "groupTest");return configMap;}}

消费者  @KafkaListener 注解可以指定序列化器

 @KafkaListener(topics = "test")public void processMessage(final ConsumerRecord<String,User> record) {System.out.println("processMessage:" + JSON.toJSONString(record.value()));}@KafkaListener(topics = "topic2",properties = "value-deserializer:org.apache.kafka.common.serialization.StringDeserializer")public void processMessage(String content) {System.out.println("processMessage:" + content);}

发送消息代码

for (int i = 0; i < 100; i++) {messageProduce.sendMessage(new User(Long.valueOf(i), "zhangsan" + i));messageProduce.sendMessage2("hello world" + i);
}

结果验证

图片

消息被正常消费

总结

        本文主要介绍了自定义序列化方式,以及为不同类型指定不同的序列化实现方式。

        由于篇幅有限,文中只包含核心配置和重要代码,部分代码未贴出,可留言交流学习。


http://www.ppmy.cn/news/1527733.html

相关文章

【百日算法计划】:每日一题,见证成长(013)

题目 回文链表 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 输入&#xff1a;head [1,2,2,1] 输出&#xff1a;true 思路 找到中间节点反转后半部分链表前后链表顺序比…

linux驱动开发-自旋锁

自旋锁自旋锁的特点工作原理适用场景优点缺点API实现注意事项调试和分析最佳实践自旋锁在中断上下文中的使用 使用自旋锁的最佳做法 自旋锁 自旋锁是一种轻量级的锁机制&#xff0c;用于保护共享资源&#xff0c;它是多线程或多核环境中实现并发访问控制的一种方式。 自旋锁通…

TS - tsconfig.json 和 tsconfig.node.json 的关系,如何在TS 中使用 JS 不报错

目录 1&#xff0c;前言2&#xff0c;二者关系2.1&#xff0c;使用 3&#xff0c;遇到的问题3.1&#xff0c;TS 中使用 JS 1&#xff0c;前言 通过 Vite 创建的 Vue3 TS 项目&#xff0c;根目录下会有 tsconfig.json 和 tsconfig.node.json 文件&#xff0c;并且存在引用关系…

leetcode:字符串中的第一个唯一字符

#include <unordered_map> class Solution { public:int firstUniqChar(string s) {unordered_map<char, int> HashMap;string::iterator it s.begin();int i 0;//标记元素下标while (it ! s.end())//初始化哈希表{if (HashMap.count(*it) > 0)//原先hash表中…

第十章 【后端】环境准备(10.10)——Nacos

10.10 Nacos 10.10.1 本地安装 下载 下载地址:https://nacos.io/download/nacos-server/ 解压缩 进入 bin 目录 启动(standalone代表着单机模式运行,非集群模式) ./startup.cmd -m standalone注意:PowerShell 下执行 .\startup.cmd -m standalone 命令

撤回仓库的提交

结论先行 未推送提交&#xff1a;可以使用 git reset --soft HEAD~1 或 git reset --hard HEAD~1 来撤回提交&#xff0c;选择保留或丢弃修改。已推送提交&#xff1a;可以使用 git reset --soft HEAD~1 后&#xff0c;通过 git push --force 强制覆盖远程的提交记录。 问题描…

react 组件通讯

组件通讯 组件是独立且封闭的单元&#xff0c;默认情况下&#xff0c;只能使用组件自己的数据。在组件化过程中&#xff0c;我们将一个完整的功能拆分成多个组件&#xff0c;以更好的完成整个应用的功能。而在这个过程中&#xff0c;多个组件之间不可避免的要共享某些数据。为…

东方博宜 24年9月-A组(萌新)- 巧克力

题目描述 假期快要结束了&#xff0c;小 A 打算好好犒劳一下自己&#xff0c;一路小跑来到超市&#xff0c;看到货架上的各种巧克力&#xff0c;都是自己爱吃的&#xff0c;一口气抓了一堆&#xff0c;有黑巧克力&#xff0c;白巧克力&#xff0c;牛奶巧克力&#xff0c;总共 …

【Java】网络编程:TCP_IP协议详解(IP协议数据报文及如何解决IPv4不够的状况)

&#x1f308;个人主页&#xff1a;努力学编程’ ⛅个人推荐&#xff1a; c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构&#xff0c;刷题刻不容缓&#xff1a;点击一起刷题 &#x1f319;心灵鸡汤&#xff1a;总有人要赢&#xff0c;为什么不能是我呢 &#x1f354…

Java 中使用 Redis 的几种方式优缺点对比

一、为什么选择 Redis&#xff1f; 在分析 Java 中使用 Redis 的不同方式之前&#xff0c;我们需要了解为什么 Redis 在分布式应用中如此重要。以下是 Redis 在 Java 项目中常见的应用场景&#xff1a; 缓存&#xff1a;通过将热点数据缓存到 Redis&#xff0c;可以减少数据库…

计算机网络 第三章: 点对点协议

文章目录 点对点协议PPP概述PPP的帧格式PPP桢的透明传输PPP帧的差错检测PPP的工作状态 点对点协议PPP概述 点对点协议&#xff08;Point-to-Point Protocol&#xff0c;PPP&#xff09;是目前使用最广泛的点对点数据链路层协议。 点对点协议PPP是因特网工程任务组&#xff08…

什么是蜘蛛池?有什么作用

在网络爬虫的世界里&#xff0c;蜘蛛池&#xff08;Spider Pool&#xff09;是一个专门用于管理和维护大量爬虫的系统。它为爬虫提供了一个集中的工作环境&#xff0c;使得爬虫能够更高效、更稳定地进行数据抓取。本文将探讨蜘蛛池的概念、组成以及它在现代网络爬虫技术中的作用…

CSU18M91四电极测脂模块开发体脂秤方案

一台体脂秤通过测试体重、体脂、BMI、水分等数据并给出相应提示&#xff0c;并且许多人都将体脂检测数据作为身体健康指数衡量标准&#xff0c;辅助用户来关注身体健康&#xff0c;同时可以通过蓝牙与手机APP应用相连&#xff0c;记录日常身体变化情况&#xff0c;根据变化情况…

黑神话悟空mac可以玩吗

黑神话悟空mac上能不能玩对于苹果玩家来说很重要&#xff0c;那么黑神话悟空mac可以玩吗&#xff1f;目前是玩不了了&#xff0c;没有针对ios系统的版本&#xff0c;只能之后在云平台上找找了&#xff0c;大家可以再观望下看看。 黑神话悟空mac可以玩吗 ‌使用CrossOver‌&…

前端面试常见手写题

实现一个new操作符 //实现一个new操作符 function myNew(fn,...args){if(typeof fn ! function) {throw (fn is not a function)}//将对象的原型设置为fn的prototypelet resObject.create(fn.prototype)//使用 apply 执行构造函数 并传入参数 arguments 获取函数的返回值let r…

区块链审计 如何测试solidity的bool值占用几个字节

文章目录 艾里卡的bool类型有多大&#xff1f;代码环节 艾丽卡更精确的测试bool代码环节 bool的gas疑惑&#xff1f; 艾里卡的bool类型有多大&#xff1f; 木森和艾丽卡坐在他们的实验室里&#xff0c;面前摆着一本魔法书和一些奇怪的魔法工具。他们正在进行一项重要的研究——…

Spring Boot-静态资源管理问题

在Spring Boot中&#xff0c;静态资源管理是构建现代Web应用程序时必不可少的一部分。无论是处理静态页面、图片、CSS、JavaScript文件&#xff0c;还是一些自定义文件&#xff0c;正确管理这些资源能够提升用户体验和优化应用的性能。 1. Spring Boot中的静态资源管理概述 S…

RedisTemplate混用带来的序列化问题

最近在工作中发现一个现象&#xff0c;项目中使用了不同的 RedisTemplate 来操作redis&#xff0c;有的同事用默认的 RedisTemplate &#xff0c;有的同事用 StringRedisTemplate。这就导致了我本次遇到的问题&#xff1a; 在一次需求中&#xff0c;我需要从 redis 中取值&…

在单片机中,处于高阻态是什么状态

在单片机&#xff08;微控制器&#xff09;中&#xff0c;高阻态&#xff08;High-Z&#xff0c;High Impedance State&#xff09;是指引脚的电气特性类似于没有连接状态&#xff0c;即该引脚的电流非常小&#xff0c;几乎不对电路产生影响。具体来说&#xff0c;高阻态具有以…

爬虫逆向学习(六):补环境过某数四代

声明&#xff1a;本篇文章内容是整理并分享在学习网上各位大佬的优秀知识后的实战与踩坑记录 引用博客&#xff1a; https://blog.csdn.net/shayuchaor/article/details/103629294 https://blog.csdn.net/qq_36291294/article/details/128600583 https://blog.csdn.net/weixin_…