SpringBoot 整合 Avro 与 Kafka

ops/2025/1/23 3:17:37/

优质博文:IT-BLOG-CN

【需求】:生产者发送数据至 kafka 序列化使用 Avro,消费者通过 Avro 进行反序列化,并将数据通过 MyBatisPlus 存入数据库。

一、环境介绍

【1】Apache Avro 1.8;【2】Spring Kafka 1.2;【3】Spring Boot 1.5;【4】Maven 3.5;

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.codenotfound</groupId><artifactId>spring-kafka-avro</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-avro</name><description>Spring Kafka - Apache Avro Serializer Deserializer Example</description><url>https://www.codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.4.RELEASE</version></parent><properties><java.version>1.8</java.version><spring-kafka.version>1.2.2.RELEASE</spring-kafka.version><avro.version>1.8.2</avro.version></properties><dependencies><!-- spring-boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${spring-kafka.version}</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><version>${spring-kafka.version}</version><scope>test</scope></dependency><!-- avro --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>${avro.version}</version></dependency></dependencies><build><plugins><!-- spring-boot-maven-plugin --><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin><!-- avro-maven-plugin --><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>${avro.version}</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory><outputDirectory>${project.build.directory}/generated/avro</outputDirectory></configuration></execution></executions></plugin></plugins></build>
</project>

二、Avro 文件

【1】Avro 依赖于由使用JSON定义的原始类型组成的架构。对于此示例,我们将使用Apache Avro入门指南中的“用户”模式,如下所示。该模式存储在src / main / resources / avro下的 user.avsc文件中。我这里使用的是 electronicsPackage.avsc。namespace 指定你生成 java 类时指定的 package 路径,name 表时生成的文件。

{"namespace": "com.yd.cyber.protocol.avro","type": "record","name": "ElectronicsPackage","fields": [{"name":"package_number","type":["string","null"],"default": null},{"name":"frs_site_code","type":["string","null"],"default": null},{"name":"frs_site_code_type","type":["string","null"],"default":null},{"name":"end_allocate_code","type":["string","null"],"default": null},{"name":"code_1","type":["string","null"],"default": null},{"name":"aggregat_package_code","type":["string","null"],"default": null}]
}

【2】Avro附带了代码生成功能,该代码生成功能使我们可以根据上面定义的“用户”模式自动创建Java类。一旦生成了相关的类,就无需直接在程序中使用架构。这些类可以使用 avro-tools.jar 或项目是Maven 项目,调用 Maven Projects 进行 compile 自动生成 electronicsPackage.java 文件:如下是通过 maven 的方式

【3】这将导致生成一个 electronicsPackage.java 类,该类包含架构和许多 Builder构造 electronicsPackage对象的方法。

三、为 Kafka 主题生成 Avro消息

Kafka Byte 在其主题中存储和传输数组。但是,当我们使用 Avro对象时,我们需要在这些 Byte数组之间进行转换。在0.9.0.0版之前,Kafka Java API使用 Encoder/ Decoder接口的实现来处理转换,但是在新API中,这些已经被 Serializer/ Deserializer接口实现代替。Kafka附带了许多 内置(反)序列化器,但不包括Avro。为了解决这个问题,我们将创建一个 AvroSerializer类,该类Serializer专门为 Avro对象实现接口。然后,我们实现将 serialize() 主题名称和数据对象作为输入的方法,在本例中,该对象是扩展的 Avro对象 SpecificRecordBase。该方法将Avro对象序列化为字节数组并返回结果。这个类属于通用类,一次配置多次使用。

package com.yd.cyber.web.avro;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;/***  avro序列化类* @author zzx* @creat 2020-03-11-19:17*/
public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> arg0, boolean arg1) {}@Overridepublic byte[] serialize(String topic, T data) {if(data == null) {return null;}DatumWriter<T> writer = new SpecificDatumWriter<>(data.getSchema());ByteArrayOutputStream byteArrayOutputStream  = new ByteArrayOutputStream();BinaryEncoder binaryEncoder  = EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream , null);try {writer.write(data, binaryEncoder);binaryEncoder.flush();byteArrayOutputStream.close();}catch (IOException e) {throw new SerializationException(e.getMessage());}return byteArrayOutputStream.toByteArray();}
}

四、AvroConfig 配置类

Avro 配置信息在 AvroConfig 配置类中,现在,我们需要更改,AvroConfig 开始使用我们的自定义 Serializer实现。这是通过将“ VALUE_SERIALIZER_CLASS_CONFIG”属性设置为 AvroSerializer该类来完成的。此外,我们更改了ProducerFactory 和KafkaTemplate 通用类型,使其指定 ElectronicsPackage 而不是 String。当我们有多个序列化的时候,这个配置文件需要多次需求,添加自己需要序列化的对象。

package com.yd.cyber.web.avro;/*** @author zzx* @creat 2020-03-11-20:23*/
@Configuration
@EnableKafka
public class AvroConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.max-request-size}")private String maxRequestSize;@Beanpublic Map<String, Object> avroProducerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);return props;}@Beanpublic ProducerFactory<String, ElectronicsPackage> elProducerFactory() {return new DefaultKafkaProducerFactory<>(avroProducerConfigs());}@Beanpublic KafkaTemplate<String, ElectronicsPackage> elKafkaTemplate() {return new KafkaTemplate<>(elProducerFactory());}
}

kafkaTemplate__217">五、通过 kafkaTemplate 发送消息

最后就是通过 Controller类调用 kafkaTemplate 的 send 方法接受一个Avro electronicsPackage对象作为输入。请注意,我们还更新了 kafkaTemplate 泛型类型。

package com.yd.cyber.web.controller.aggregation;import com.yd.cyber.protocol.avro.ElectronicsPackage;
import com.yd.cyber.web.vo.ElectronicsPackageVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;/*** <p>* InnoDB free: 4096 kB 前端控制器* </p>** @author zzx* @since 2020-04-19*/
@RestController
@RequestMapping("/electronicsPackageTbl")
public class ElectronicsPackageController {//日誌private static final Logger log = LoggerFactory.getLogger(ElectronicsPackageController.class);@Resourceprivate KafkaTemplate<String,ElectronicsPackage> kafkaTemplate;@GetMapping("/push")public void push(){ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO();electronicsPackageVO.setElectId(9);electronicsPackageVO.setAggregatPackageCode("9");electronicsPackageVO.setCode1("9");electronicsPackageVO.setEndAllocateCode("9");electronicsPackageVO.setFrsSiteCodeType("9");electronicsPackageVO.setFrsSiteCode("9");electronicsPackageVO.setPackageNumber("9");ElectronicsPackage electronicsPackage = new ElectronicsPackage();BeanUtils.copyProperties(electronicsPackageVO,electronicsPackage);//发送消息kafkaTemplate.send("Electronics_Package",electronicsPackage);log.info("Electronics_Package TOPIC 发送成功");}
}

六、从 Kafka主题消费 Avro消息反序列化

收到的消息需要反序列化为 Avro格式。为此,我们创建一个 AvroDeserializer 实现该 Deserializer接口的类。该 deserialize()方法将主题名称和Byte数组作为输入,然后将其解码回Avro对象。从 targetType类参数中检索需要用于解码的模式,该类参数需要作为参数传递给 AvroDeserializer构造函数。

package com.yd.cyber.web.avro;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.xml.bind.DatatypeConverter;/***  avro反序列化* @author fuyx* @creat 2020-03-12-15:19*/
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {//日志系统private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);protected final Class<T> targetType;public AvroDeserializer(Class<T> targetType) {this.targetType = targetType;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> arg0, boolean arg1) {}@Overridepublic T deserialize(String topic, byte[] data) {try {T result = null;if(data == null) {return null;}LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));ByteArrayInputStream in = new ByteArrayInputStream(data);DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<>(targetType.newInstance().getSchema());BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);result = (T) userDatumReader.read(null, decoder);LOGGER.debug("deserialized data='{}'", result);return result;} catch (Exception ex) {throw new SerializationException("Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);} finally {}}
}

七、反序列化的配置类

我将反序列化的配置和序列化的配置都放置在 AvroConfig 配置类中。在 AvroConfig 需要被这样更新了AvroDeserializer用作值“VALUE_DESERIALIZER_CLASS_CONFIG”属性。我们还更改了 ConsumerFactory 和 ConcurrentKafkaListenerContainerFactory通用类型,以使其指定 ElectronicsPackage 而不是 String。将 DefaultKafkaConsumerFactory 通过1个新的创造 AvroDeserializer 是需要 “User.class”作为构造函数的参数。需要使用Class<> targetType,AvroDeserializer 以将消费 byte[]对象反序列化为适当的目标对象(在此示例中为 ElectronicsPackage 类)。

@Configuration
@EnableKafka
public class AvroConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.max-request-size}")private String maxRequestSize;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");return props;}@Beanpublic ConsumerFactory<String, ElectronicsPackage> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),new AvroDeserializer<>(ElectronicsPackage.class));}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}}

八、消费者消费消息

消费者通过 @KafkaListener 监听对应的 Topic ,这里需要注意的是,网上直接获取对象的参数传的是对象,比如这里可能需要传入 ElectronicsPackage 类,但是我这样写的时候,error日志总说是返回序列化的问题,所以我使用 GenericRecord 对象接收,也就是我反序列化中定义的对象,是没有问题的。然后我将接收到的消息通过 mybatisplus 存入到数据库。

package com.zzx.cyber.web.controller.dataSource.intercompany;import com.zzx.cyber.web.service.ElectronicsPackageService;
import com.zzx.cyber.web.vo.ElectronicsPackageVO;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Controller;import javax.annotation.Resource;/*** @desc:* @author: zzx* @creatdate 2020/4/1912:21*/
@Controller
public class ElectronicsPackageConsumerController {//日志private static final Logger log  = LoggerFactory.getLogger(ElectronicsPackageConsumerController.class);//服务层@Resourceprivate ElectronicsPackageService electronicsPackageService;/*** 扫描数据测试* @param genericRecordne*/@KafkaListener(topics = {"Electronics_Package"})public void receive(GenericRecord genericRecordne) throws Exception {log.info("数据接收:electronicsPackage + "+  genericRecordne.toString());//业务处理类,mybatispuls 自动生成的类ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO();//将收的数据复制过来BeanUtils.copyProperties(genericRecordne,electronicsPackageVO);try {//落库log.info("数据入库");electronicsPackageService.save(electronicsPackageVO);} catch (Exception e) {throw new Exception("插入异常"+e);}}
}

http://www.ppmy.cn/ops/152357.html

相关文章

【STM32-学习笔记-10-】BKP备份寄存器+时间戳

文章目录 BKP备份寄存器Ⅰ、BKP简介1. BKP的基本功能2. BKP的存储容量3. BKP的访问和操作4. BKP的应用场景5. BKP的控制寄存器 Ⅱ、BKP基本结构Ⅲ、BKP函数Ⅳ、BKP使用示例 时间戳一、Unix时间戳二、时间戳的转换&#xff08;time.h函数介绍&#xff09;Ⅰ、time()Ⅱ、mktime()…

MySQL日期时间函数详解

简介 本文主要讲解MySQL中的日期时间函数&#xff0c;包括&#xff1a;NOW、CURRENT_TIMESTAMP、CURDATE、CURRENT_DATE、CURTIME、CURRENT_TIME、STR_TO_DATE、DATE_FORMAT、TIME_FORMAT、DATE、TIME、YEAR、MONTH、DAY、HOUR、MINUTE、SECOND、QUARTER、YEARWEEK、WEEKDAY、…

Nginx 反向代理与负载均衡配置实践

一、引言 在当今互联网架构中&#xff0c;Nginx作为一款高性能的HTTP和反向代理服务器&#xff0c;广泛应用于各种场景&#xff0c;为众多网站和应用提供了强大的支持。它能够高效地处理大量并发请求&#xff0c;实现反向代理与负载均衡功能&#xff0c;显著提升系统的性能、可…

C#语言的学习路线

C#语言的学习路线 C#作为一种现代编程语言&#xff0c;凭借其简洁的语法、强大的功能和广泛的应用&#xff0c;得到了越来越多开发者的青睐。无论是开发桌面应用、Web应用、游戏&#xff0c;还是云服务&#xff0c;C#都有着广泛的应用场景。本文将为有志于学习C#的读者提供一条…

macOS如何进入 Application Support 目录(cd: string not in pwd: Application)

错误信息 cd: string not in pwd: Application 表示在当前目录下找不到名为 Application Support 的目录。可能的原因如下&#xff1a; 拼写错误或路径错误&#xff1a;确保你输入的目录名称正确。目录名称是区分大小写的&#xff0c;因此请确保使用正确的大小写。正确的目录名…

【Linux】Linux命令:free

目录 1、作用2、命令使用格式3、常用参数说明4、输出结果说明4.1 行字段说明4.2 列字段说明 5、示例5.1 以人类易读的方式显示内存使用情况5.2 显示内存总和行5.3 以2秒为间隔&#xff0c;持续输出内存使用情况5.4 以2秒为间隔&#xff0c;输出5次内存使用情况 1、作用 free命令…

【Red Hat8】:搭建FTP服务器

目录 一、匿名FTP访问 1、新建挂载文件 2、挂载 3、关闭防火墙 4、搭建yum源 5、安装VSFTPD 6、 打开配置文件 7、设置配置文件如下几个参数 8、重启vsftpd服务 9、进入图形化界面配置网络 10、查看IP地址 11、安装ftp服务 12、遇到拒绝连接 13、测试 二、本地…

QT:QTabWidget设置tabPosition为West时,文字向上

解决办法1&#xff08;无效&#xff09; tabWidget->setStyleSheet("QTabBar::tab { min-width: 100px; } QTabBar::tab:down { spacing: 2px; } QTabBar::tab:down { transform: rotate(270deg); }"); 解决办法2&#xff08;无效&#xff09; 写QTabBar。 pa…