Kafka消息自定义序列化

server/2025/3/26 5:55:36/

文章目录

  • 1. 默认序列化
  • 2.自定义序列化
  • 3.示例
  • 4.自定义解序列化器


1. 默认序列化

在网络中发送数据都是以字节的方式,Kafka也不例外。Apache Kafka支持用户给broker发送各种类型的消息。它可以是一个字符串、一个整数、一个数组或是其他任意的对象类型。序列化器(serializer)负责在producer发送前将消息转换成字节数组;而与之相反,解序列化器(deserializer)则用于将consumer接收到的字节数组转换成相应的对象。
常见的serializer有:

  • ByteArraySerializer:本质上什么都不用做,因为已经是字节数组了。
  • ByteBufferSerializer:列化ByteBuffer。
  • BytesSerializer:序列化Kafka自定义的 Bytes 类。
  • DoubleSerializer:列化 Double 类型
  • IntegerSerializer:列化Integer 类型
  • LongSerializer:序列化Long类型。
  • StringSerializer:序列化 String 类型。

producer的序列化机制使用起来非常简单,只需要在构造producer时同时指定参数key.serializer 和 value.serializer的值即可,用户可以为消息的key和value 指定不同类型的 serializer,只要与解序列类型分别保持一致就可以。

2.自定义序列化

Kafka支持用户自定义消息序列化。若要编写一个自定义的serializer,需要完成以下3件事情。
1)定义数据对象格式。
2)创建自定义序列化类,实现 org.apache.kafka.common.serialization.Serializer 接口,在serializer方法中实现序列化逻辑。
3)在用于构造KafkaProducer 的Properties 对象中设置 key.serializer 或 value.serializer取决于是为消息key还是 value 做自定义序列化。

3.示例

下面结合一个实例来说明如何创建自定义的serializer。首先定义待序列化的数据对象。本例中使用一个简单的Java POJO对象,如下面的代码所示:

public class User {private String firstName;private String lastName;private int age;private String address;public User(String firstName, String lastName, int age, String address) {this.firstName = firstName;this.lastName = lastName;this.age = age;this.address = address;}@Overridepublic String toString() {return "User{" +"firstName='" + firstName + '\'' +", lastName='" + lastName + '\'' +", age=" + age +", address='" + address + '\'' +'}';}
}

接下来创建 serializer。本例中使用了jackson-mapper-asl包的 ObjectMapper 帮助我们直接把对象转成字节数组。为了使用该类,你需要在producer工程中增加依赖:

<dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId><version>1.9.13</version>
</dependency>

UserSerializer代码如下:

import org.apache.kafka.common.serialization.Serializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
public class UserSerializer implements Serializer<User> {private ObjectMapper objectMapper;@Overridepublic void configure(Map configs, boolean isKey) {objectMapper=new ObjectMapper();}@Overridepublic byte[] serialize(String topic, User data) {byte[] ret =null;try {if (data == null){System.out.println("Null received at serializing");return null;}ret=objectMapper.writeValueAsString(data).getBytes();} catch (IOException e) {e.printStackTrace();}return ret;}@Overridepublic void close() {}
}

指定Serializer,然后构建消息发送:

import com.exm.collectcodenew.kafka.producer.customSerializer.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");//必须指定props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//必须指定props.put("value.serializer", "com.exm.collectcodenew.kafka.producer.customSerializer.UserSerializer");//必须指定props.put("acks", "-1");props.put("retries", 3);props.put("batch.size", 323840);props.put("linger.ms", 10);props.put("buffer.memory", 33554432);props.put("max.block.ms", 3000);props.put("partitioner.class","com.exm.collectcodenew.kafka.producer.customPartitioner.AuditPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);//构建User对象User user = new User("Z","tt",18,"Beijing,China");ProducerRecord record = new ProducerRecord("topic-test",user);producer.send(record);producer.close();}
}

4.自定义解序列化器

Kafka支持用户自定义消息的deserializer。成功编写一个自定义的deserializer需要完成以下3件事情。
1)定义或复用 serializer 的数据对象格式,
2) 创建自定义 deserializer 类,令其实现 org.apache.kafka.common.serialization.Deserializer接口。在deserializer方法中实现 deserialize 逻辑。
3)在构造KafkaConsumer的Properties对象中设置key.deserializer和(或)value.deserializer为上一步的实现类。
依然使用序列化中的User 例子来实现自定义的 deserializer。代码如下。

import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.Map;
public class UserDeserializer implements Deserializer {private ObjectMapper objectMapper;@Overridepublic void configure(Map configs, boolean isKey) {objectMapper = new ObjectMapper();}@Overridepublic Object deserialize(String topic, byte[] data) {User user =null;try {user=objectMapper.readValue(data,User.class);} catch (IOException e) {throw new RuntimeException(e);}finally {return user;}}@Overridepublic void close() {}
}

消费者代码:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");//必须指定props.put("group.id","test-group");//必须指定props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//必须指定props.put("value.deserializer", "com.exm.collectcodenew.kafka.producer.customSerializer.UserDeserializer");//必须指定props.put("enable.auto.commit","true");props.put("auto.commit.interval.ms","1000");props.put("auto.offset.reset","earliest");//从最早的消息开始读取KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//创建consumer实例consumer.subscribe(Arrays.asList("topic-test"));while(true){ConsumerRecords<String,String> records=consumer.poll(1000);for (ConsumerRecord<String, String> record: records){System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());}}}
}

http://www.ppmy.cn/server/177162.html

相关文章

基于Python+Ollama DeepSeek与MySQL进行数据分析探索

目录 1、前言 2、环境准备 3、本地部署 3.1、安装ollama 3.2、部署deepseek-r1模型 4、Python接入本地模型 4.1、示例代码 4.2、附加一些功能 4.3、其他的数据库 4.3.1、PostgreSQL 示例 4.3.2、SQLite 示例 5、总结 1、前言 在当今数据驱动的时代&#xff0c;数据…

开源模型应用落地-LangGraph101-多智能体协同实践(六)

一、前言 随着人工智能技术的快速发展,如何高效处理复杂任务成了 AI 系统的一大挑战。传统的线性架构在面对多轮对话和动态决策时常常显得无能为力。而 LangGraph 这种多智能体合作框架的出现,为这个问题提供了新的解决方案。 相关文章: 开源模型应用落地-LangGraph101-探索…

Java 中 PriorityQueue 的底层数据结构及相关分析

Java 中 PriorityQueue 的底层数据结构及相关分析 1. PriorityQueue 的底层数据结构 在 Java 中&#xff0c;PriorityQueue 的底层数据结构是基于堆&#xff08;Heap&#xff09;实现的二叉堆&#xff08;Binary Heap&#xff09;&#xff0c;默认使用最小堆&#xff08;Min …

MySQL-sql优化

插入数据 insert优化 批量插入 insert into tb_users values (1,cat),(2,mouse),(3,bird); 手动提交事务 因为MySQL中默认的事务提交方式为自动提交&#xff0c;所以在每次插入数据时&#xff0c;就是涉及到事务频繁地开启和关闭。所以将事务提交方式改成自动提交&#xf…

wps字符很分散

出现的问题图 alt enter开启自动换行即可解决 检查全局自动换行设置 在 WPS 表格中&#xff0c;点击 ​​“文件”​ → ​​“选项”​ → ​​“高级”​ → 确保 ​​“自动换行”​ 选项开启。 ​使用快捷键快速调整 ​启用自动换行&#xff1a;选中文本后按 ​Alt Enter…

IDEA编码实用技巧

快速注释 Ctrl / 可以将选中的代码或者光标所在的那一行变为用//注释 Ctrl Shift / 可以将选中的代码或者光标所在的那一行变为用/**/注释 移动代码 Alt Shift 上下键 移动该行代码 移动 Ctrl enter&#xff08;回车&#xff09;将光标下面的代码都往下移动一行&a…

用 pytorch 从零开始创建大语言模型(四):从零开始实现一个用于生成文本的GPT模型

从零开始创建大语言模型&#xff08;Python/pytorch &#xff09;&#xff08;四&#xff09;&#xff1a;从零开始实现一个用于生成文本的GPT模型 4 从零开始实现一个用于生成文本的GPT模型4.1 编写 L L M LLM LLM架构4.2 使用层归一化对激活值进行标准化4.3 使用GELU激活函数…

IS-IS原理与配置

一、IS-IS概述 IS-IS&#xff08;Intermediate System to Intermediate System&#xff0c;中间系统到中间系统&#xff09;是ISO&#xff08;International Organization for Standardization&#xff0c;国际标准化组织&#xff09;为它的CLNP&#xff08;ConnectionLessNet…