SpringBoot 整合 Avro 与 Kafka

devtools/2024/12/26 22:42:30/

优质博文:IT-BLOG-CN

【需求】:生产者发送数据至 kafka 序列化使用 Avro,消费者通过 Avro 进行反序列化,并将数据通过 MyBatisPlus 存入数据库。

一、环境介绍

【1】Apache Avro 1.8;【2】Spring Kafka 1.2;【3】Spring Boot 1.5;【4】Maven 3.5;

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.codenotfound</groupId><artifactId>spring-kafka-avro</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-avro</name><description>Spring Kafka - Apache Avro Serializer Deserializer Example</description><url>https://www.codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.4.RELEASE</version></parent><properties><java.version>1.8</java.version><spring-kafka.version>1.2.2.RELEASE</spring-kafka.version><avro.version>1.8.2</avro.version></properties><dependencies><!-- spring-boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${spring-kafka.version}</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><version>${spring-kafka.version}</version><scope>test</scope></dependency><!-- avro --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>${avro.version}</version></dependency></dependencies><build><plugins><!-- spring-boot-maven-plugin --><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin><!-- avro-maven-plugin --><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>${avro.version}</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory><outputDirectory>${project.build.directory}/generated/avro</outputDirectory></configuration></execution></executions></plugin></plugins></build>
</project>

二、Avro 文件

【1】Avro 依赖于由使用JSON定义的原始类型组成的架构。对于此示例,我们将使用Apache Avro入门指南中的“用户”模式,如下所示。该模式存储在src / main / resources / avro下的 user.avsc文件中。我这里使用的是 electronicsPackage.avsc。namespace 指定你生成 java 类时指定的 package 路径,name 表时生成的文件。

{"namespace": "com.yd.cyber.protocol.avro","type": "record","name": "ElectronicsPackage","fields": [{"name":"package_number","type":["string","null"],"default": null},{"name":"frs_site_code","type":["string","null"],"default": null},{"name":"frs_site_code_type","type":["string","null"],"default":null},{"name":"end_allocate_code","type":["string","null"],"default": null},{"name":"code_1","type":["string","null"],"default": null},{"name":"aggregat_package_code","type":["string","null"],"default": null}]
}

【2】Avro附带了代码生成功能,该代码生成功能使我们可以根据上面定义的“用户”模式自动创建Java类。一旦生成了相关的类,就无需直接在程序中使用架构。这些类可以使用 avro-tools.jar 或项目是Maven 项目,调用 Maven Projects 进行 compile 自动生成 electronicsPackage.java 文件:如下是通过 maven 的方式

【3】这将导致生成一个 electronicsPackage.java 类,该类包含架构和许多 Builder构造 electronicsPackage对象的方法。

三、为 Kafka 主题生成 Avro消息

Kafka Byte 在其主题中存储和传输数组。但是,当我们使用 Avro对象时,我们需要在这些 Byte数组之间进行转换。在0.9.0.0版之前,Kafka Java API使用 Encoder/ Decoder接口的实现来处理转换,但是在新API中,这些已经被 Serializer/ Deserializer接口实现代替。Kafka附带了许多 内置(反)序列化器,但不包括Avro。为了解决这个问题,我们将创建一个 AvroSerializer类,该类Serializer专门为 Avro对象实现接口。然后,我们实现将 serialize() 主题名称和数据对象作为输入的方法,在本例中,该对象是扩展的 Avro对象 SpecificRecordBase。该方法将Avro对象序列化为字节数组并返回结果。这个类属于通用类,一次配置多次使用。

java">package com.yd.cyber.web.avro;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;/***  avro序列化类* @author zzx* @creat 2020-03-11-19:17*/
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> arg0, boolean arg1) {}@Overridepublic byte[] serialize(String topic, T data) {if(data == null) {return null;}DatumWriter<T> writer = new SpecificDatumWriter<>(data.getSchema());ByteArrayOutputStream byteArrayOutputStream  = new ByteArrayOutputStream();BinaryEncoder binaryEncoder  = EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream , null);try {writer.write(data, binaryEncoder);binaryEncoder.flush();byteArrayOutputStream.close();}catch (IOException e) {throw new SerializationException(e.getMessage());}return byteArrayOutputStream.toByteArray();}
}

四、AvroConfig 配置类

Avro 配置信息在 AvroConfig 配置类中,现在,我们需要更改,AvroConfig 开始使用我们的自定义 Serializer实现。这是通过将“ VALUE_SERIALIZER_CLASS_CONFIG”属性设置为 AvroSerializer该类来完成的。此外,我们更改了ProducerFactory 和KafkaTemplate 通用类型,使其指定 ElectronicsPackage 而不是 String。当我们有多个序列化的时候,这个配置文件需要多次需求,添加自己需要序列化的对象。

java">package com.yd.cyber.web.avro;/*** @author zzx* @creat 2020-03-11-20:23*/
@Configuration
@EnableKafka
public class AvroConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.max-request-size}")private String maxRequestSize;@Beanpublic Map<String, Object> avroProducerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);return props;}@Beanpublic ProducerFactory<String, ElectronicsPackage> elProducerFactory() {return new DefaultKafkaProducerFactory<>(avroProducerConfigs());}@Beanpublic KafkaTemplate<String, ElectronicsPackage> elKafkaTemplate() {return new KafkaTemplate<>(elProducerFactory());}
}

kafkaTemplate__210">五、通过 kafkaTemplate 发送消息

最后就是通过 Controller类调用 kafkaTemplate 的 send 方法接受一个Avro electronicsPackage对象作为输入。请注意,我们还更新了 kafkaTemplate 泛型类型。

java">package com.yd.cyber.web.controller.aggregation;import com.yd.cyber.protocol.avro.ElectronicsPackage;
import com.yd.cyber.web.vo.ElectronicsPackageVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;/*** <p>* InnoDB free: 4096 kB 前端控制器* </p>** @author zzx* @since 2020-04-19*/
@RestController
@RequestMapping("/electronicsPackageTbl")
public class ElectronicsPackageController {//日誌private static final Logger log = LoggerFactory.getLogger(ElectronicsPackageController.class);@Resourceprivate KafkaTemplate<String,ElectronicsPackage> kafkaTemplate;@GetMapping("/push")public void push(){ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO();electronicsPackageVO.setElectId(9);electronicsPackageVO.setAggregatPackageCode("9");electronicsPackageVO.setCode1("9");electronicsPackageVO.setEndAllocateCode("9");electronicsPackageVO.setFrsSiteCodeType("9");electronicsPackageVO.setFrsSiteCode("9");electronicsPackageVO.setPackageNumber("9");ElectronicsPackage electronicsPackage = new ElectronicsPackage();BeanUtils.copyProperties(electronicsPackageVO,electronicsPackage);//发送消息kafkaTemplate.send("Electronics_Package",electronicsPackage);log.info("Electronics_Package TOPIC 发送成功");}
}

六、从 Kafka主题消费 Avro消息反序列化

收到的消息需要反序列化为 Avro格式。为此,我们创建一个 AvroDeserializer 实现该 Deserializer接口的类。该 deserialize()方法将主题名称和Byte数组作为输入,然后将其解码回Avro对象。从 targetType类参数中检索需要用于解码的模式,该类参数需要作为参数传递给 AvroDeserializer构造函数。

java">package com.yd.cyber.web.avro;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.xml.bind.DatatypeConverter;/***  avro反序列化* @author fuyx* @creat 2020-03-12-15:19*/
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {//日志系统private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);protected final Class<T> targetType;public AvroDeserializer(Class<T> targetType) {this.targetType = targetType;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> arg0, boolean arg1) {}@Overridepublic T deserialize(String topic, byte[] data) {try {T result = null;if(data == null) {return null;}LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));ByteArrayInputStream in = new ByteArrayInputStream(data);DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<>(targetType.newInstance().getSchema());BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);result = (T) userDatumReader.read(null, decoder);LOGGER.debug("deserialized data='{}'", result);return result;} catch (Exception ex) {throw new SerializationException("Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);} finally {}}
}

七、反序列化的配置类

我将反序列化的配置和序列化的配置都放置在 AvroConfig 配置类中。在 AvroConfig 需要被这样更新了AvroDeserializer用作值“VALUE_DESERIALIZER_CLASS_CONFIG”属性。我们还更改了 ConsumerFactory 和 ConcurrentKafkaListenerContainerFactory通用类型,以使其指定 ElectronicsPackage 而不是 String。将 DefaultKafkaConsumerFactory 通过1个新的创造 AvroDeserializer 是需要 “User.class”作为构造函数的参数。需要使用Class<?> targetType,AvroDeserializer 以将消费 byte[]对象反序列化为适当的目标对象(在此示例中为 ElectronicsPackage 类)。

java">@Configuration
@EnableKafka
public class AvroConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.max-request-size}")private String maxRequestSize;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");return props;}@Beanpublic ConsumerFactory<String, ElectronicsPackage> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),new AvroDeserializer<>(ElectronicsPackage.class));}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}

八、消费者消费消息

消费者通过 @KafkaListener 监听对应的 Topic ,这里需要注意的是,网上直接获取对象的参数传的是对象,比如这里可能需要传入 ElectronicsPackage 类,但是我这样写的时候,error日志总说是返回序列化的问题,所以我使用 GenericRecord 对象接收,也就是我反序列化中定义的对象,是没有问题的。然后我将接收到的消息通过 mybatisplus 存入到数据库。

java">package com.zzx.cyber.web.controller.dataSource.intercompany;import com.zzx.cyber.web.service.ElectronicsPackageService;
import com.zzx.cyber.web.vo.ElectronicsPackageVO;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Controller;import javax.annotation.Resource;/*** @desc:* @author: zzx* @creatdate 2020/4/1912:21*/
@Controller
public class ElectronicsPackageConsumerController {//日志private static final Logger log  = LoggerFactory.getLogger(ElectronicsPackageConsumerController.class);//服务层@Resourceprivate ElectronicsPackageService electronicsPackageService;/*** 扫描数据测试* @param genericRecordne*/@KafkaListener(topics = {"Electronics_Package"})public void receive(GenericRecord genericRecordne) throws Exception {log.info("数据接收:electronicsPackage + "+  genericRecordne.toString());//业务处理类,mybatispuls 自动生成的类ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO();//将收的数据复制过来BeanUtils.copyProperties(genericRecordne,electronicsPackageVO);try {//落库log.info("数据入库");electronicsPackageService.save(electronicsPackageVO);} catch (Exception e) {throw new Exception("插入异常"+e);}}
}

http://www.ppmy.cn/devtools/145645.html

相关文章

VScode 查看linux 内核代码

0&#xff0c;安装c.c 1&#xff0c;查看linux 目录下的linux代码&#xff0c;安装remote ssh 2&#xff0c; 输入服务器IP 3 选择服务器为linux

2025考研加油!Jing也加油哦!

一恍惚&#xff0c;离自己考研初试过去都两年了&#xff01;研究生生活也过去一大半&#xff01;借此机会也总结一下研究生这一段生活——研究生生活&#xff08;上&#xff09; About I 昨天实验室聚餐&#xff0c;作为老生欢迎新生&#xff0c;啊啊啊&#xff0c;真的没想到…

Android笔试面试题AI答之Android基础(3)

文章目录 1.谈一谈 Android 的安全机制一、系统架构层面的安全设计二、核心安全机制三、其他安全机制与措施 2.Android 的四大组件是哪四大&#xff1f;3.Android 的四大组件都需要在清单文件中注册吗&#xff1f;4.介绍几个常用的Linux命令一、文件和目录管理二、用户和权限管…

STM32-笔记7-继电器定时开闭

1、复制02项目&#xff0c;重命名08-继电器定时开闭 打开项目工程 在\Drivers\BSP\该路径下&#xff0c;新建alarm文件夹&#xff0c;该文件夹下里面包含alarm.c和alarm.h文件 加载进该项目中 为什么这里使用的是 这个单词&#xff0c;而不是继电器&#xff08;relay&#…

TypeScript 流程控制语句

文章目录 前言一、if - else 与 else - if 条件判断二、switch 语句的使用及注意事项三、for 循环&#xff08;常规、for - in、for - of&#xff09;&#xff08;一&#xff09;常规 for 循环&#xff08;二&#xff09;for - in 循环&#xff08;三&#xff09;for - of 循环…

时间轮在 Netty , Kafka 中的设计与实现

本文基于 Netty 4.1.112.Final , Kafka 3.9.0 版本进行讨论 在业务开发的场景中&#xff0c;我们经常会遇到很多定时任务的需求。比如&#xff0c;生成业务报表&#xff0c;周期性对账&#xff0c;同步数据&#xff0c;订单支付超时处理等。针对业务场景中定时任务逻辑复杂&…

Artec Space Spider助力剑桥研究团队解码古代社会合作【沪敖3D】

挑战&#xff1a;考古学家需要一种安全的方法来呈现新出土的陶瓷容器&#xff0c;对比文物形状。 解决方案&#xff1a;Artec Space Spider, Artec Studio 效果&#xff1a;本项目是REVERSEACTION项目的一部分&#xff0c;旨在研究无国家社会中复杂的古代技术。研究团队在考古地…

Grafana服务监控与日志查询可视化

目录 Grafana 简介 使用 1. 选择日志采集工具 2. 选择日志存储系统 3. 配置日志采集工具 3.1 使用 Filebeat 采集日志 3.2 使用 Promtail 采集日志 4. 配置日志存储系统 4.1 配置 Elasticsearch 4.2 配置 Loki 5. 配置 Grafana 5.1 安装 Grafana 5.2 添加数据源 …