[Hands-On 08] Flink: Custom Deserialization When Consuming from Kafka


Goal

Have the records consumed from Kafka deserialize directly into our own objects, instead of arriving as raw strings that we parse afterwards.
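For example, assuming a message payload such as {"name":"Tom","age":18} (a hypothetical record used for illustration throughout this post), the source should emit it already parsed as a Student object.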

Maven POM

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.boke</groupId>
    <artifactId>Flink1.7.1</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases><enabled>false</enabled></releases>
            <snapshots><enabled>true</enabled></snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- Table environment dependencies (connectors, formats, and drivers):
             https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/configuration/overview/ -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.0-1.17</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.18</version>
        </dependency>
        <!-- Required when running in the IDE; otherwise you get the error:
             "Make sure a planner module is on the classpath" -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- Third-party dependencies -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <!-- Example:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->
        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.boke.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
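With this POM in place, running mvn clean package should produce a shaded fat jar via the maven-shade-plugin configured above. Note that the mainClass in the manifest still points at the quickstart's com.boke.DataStreamJob; change it to your actual entry point (the kafkaSource class below) before submitting the jar to a cluster.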

Core Code

package com.boke.kafka;

import com.alibaba.fastjson.JSONObject;

public class Student {
    public String name;
    public Integer age;

    public Student(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    // Parse a JSON payload such as {"name":"Tom","age":18} into a Student.
    public static Student fromJson(String s) {
        JSONObject jsonObject = JSONObject.parseObject(s);
        String name = jsonObject.getString("name");
        Integer age = jsonObject.getInteger("age");
        return new Student(name, age);
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}
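As a quick sanity check, fromJson can be exercised on its own. The payload below is a made-up sample, not from the original post:

public class StudentFromJsonDemo {
    public static void main(String[] args) {
        // Hypothetical payload; any JSON object with "name" and "age" fields works.
        Student s = Student.fromJson("{\"name\":\"Tom\",\"age\":18}");
        System.out.println(s.getName() + " / " + s.getAge()); // prints: Tom / 18
    }
}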

// The main class follows
package com.boke.kafka;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

public class kafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<Student> source = KafkaSource.<Student>builder()
                .setBootstrapServers("brokers") // placeholder for your host:port list
                .setTopics("input-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest()) // always start from the earliest offset
                // .setStartingOffsets(OffsetsInitializer.latest()) // always start from the latest offset
                // .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) // use the group's committed offset if one exists, otherwise start from the earliest
                // .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) // use the group's committed offset if one exists, otherwise start from the latest

                // .setDeserializer(KafkaRecordDeserializationSchema.of(new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema())))
                // .setValueOnlyDeserializer(new SimpleStringSchema())
                .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchema()))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print();
        env.execute("kafka custom deserialization");
    }
}
class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<Student> {

    @Override
    public boolean isEndOfStream(Student nextElement) {
        return false;
    }

    /**
     * Deserializes the Kafka record.
     *
     * @param record the Kafka record to be deserialized
     * @return the deserialized message as an object (null if the message cannot be deserialized)
     */
    @Override
    public Student deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Custom Kafka deserialization. If a record is malformed, simply return null:
        // the Javadoc states "null if the message cannot be deserialized".
        // Besides the value, the record also exposes metadata such as the topic,
        // timestamp, and partition.
        String topic = record.topic();
        long kafkaTimestamp = record.timestamp();
        int partitionNum = record.partition();
        String value = new String(record.value(), StandardCharsets.UTF_8);
        return Student.fromJson(value);
    }

    @Override
    public TypeInformation<Student> getProducedType() {
        return TypeInformation.of(new TypeHint<Student>() {});
    }
}
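If you only need the message value and no Kafka metadata, a lighter option is to implement the generic DeserializationSchema interface instead. This is a sketch not included in the original post, based on the standard Flink interface:

package com.boke.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Value-only variant: it sees just the message bytes, no topic/partition/timestamp.
class StudentValueDeserializationSchema implements DeserializationSchema<Student> {

    @Override
    public Student deserialize(byte[] message) throws IOException {
        return Student.fromJson(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(Student nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Student> getProducedType() {
        return TypeInformation.of(new TypeHint<Student>() {});
    }
}

It would then be wired into the builder with .setValueOnlyDeserializer(new StudentValueDeserializationSchema()) in place of the .setDeserializer(...) call above.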

