Flink系列-11、Flink DataStream的Sink

news/2024/11/30 18:53:12/

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

大数据系列文章目录

官方网址:https://flink.apache.org/

学习资料:https://flink-learning.org.cn/
在这里插入图片描述

目录

    • Flink在批处理中常见的sink
    • sink到本地集合
    • 基于文件的sink
      • StreamingFileSink
        • 文件格式
        • 桶分配逻辑
        • 滚动策略
        • 代码
    • sink到kafka
    • sink到mysql
    • sink到redis
      • RedisSink简介
      • 如何使用Redis Sink?

Flink在批处理中常见的sink

  • 基于本地集合的sink(Collection-based-sink)
    print
    printToErr
  • 基于文件的sink(File-based-sink)
  • 自定义的sink(Custom-based-sink)
  • 基于kafka的sink操作

sink到本地集合

package batch.sink;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author lwh* @date 2023/5/5* @description 到本地集合**/
public class SinkToLocalCollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6);// 打印source.print();// 带前缀打印source.print("我是前缀>>>");// 打印到stderr中source.printToErr();// 打印到stderr并带前缀source.printToErr("我是stderr前缀>>>");env.execute();}
}

基于文件的sink

通过writeAsText将数据写出。
支持本地文件和HDFS

package batch.sink;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author lwh* @date 2023/5/5* @description 基于文件的sink**/
public class SinkToLocalFileAndHDFSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Tuple3<Integer, String, Double>> source = env.fromElements(Tuple3.of(19, "潇潇", 170.50),Tuple3.of(11, "甜甜", 168.8),Tuple3.of(16, "刚刚", 178.8),Tuple3.of(19, "蛋蛋", 179.99));/*写文件可以设置为并行度为1, 避免产生出来多个文件*/// 写出到本地文件source.writeAsText("data/output/SinkToLocalFileAndHDFSDemo.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);// 写出到hdfssource.writeAsText("hdfs://node1:8020/output/SinkToLocalFileAndHDFSDemo.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);// 写出为csvsource.writeAsCsv("data/output/SinkToLocalFileAndHDFSDemo.csv",FileSystem.WriteMode.OVERWRITE,"\n",",").setParallelism(1);env.execute();}}

StreamingFileSink

通过上面的代码可以看出,writeAsText已经废弃,推荐使用StreamingFileSink
参见:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/streamfile_sink.html
这个连接器提供了一个 Sink 来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。
Streaming File Sink 会将数据写入到桶中。由于输入流可能是无界的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。
桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含一个部分文件(part file)。额外的部分文件(part file)将根据滚动策略创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。

使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。

在这里插入图片描述

由于我们用的是流任务,那么任务会一直持续进行,数据也会持续不断的写出,由于数据是源源不断的产生,那么就需要给数据设立边界,让其完成某个文件数据的写出。不然某个文件会一直处于写入状态中。
那么StreamingFileSink就是一个写出流数据的类
它会将数据分桶(分part)写出到文件中,按照指定规则(时间、文件大小等),完成某一part的写入过程。
比如:每隔1小时或者每当文件大小达到比如1GB的时候,就完成当前文件的写入,将状态标记为Finished,然后开启一个新文件继续写流数据。

数据在写出之前,在Flink内部会按照各个子任务(并行)划分数据桶,每个桶可以包含多个part文件
文件在写的过程中有3个状态:

  1. In-progress :当前文件正在写入中
  2. Pending :当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
  3. Finished :在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态

文件格式

StreamingFileSink 支持行编码格式和批量编码格式,比如 Apache Parquet 。这两种变体随附了各自的构建器,可以使用以下静态方法创建:

• Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder)
一次写入一行数据
• Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)
一次写入一批数据, 如parquet、avro

桶分配逻辑

简单理解:如何划分桶
桶分配逻辑定义了如何将数据结构化为基本输出目录中的子目录
行格式和批量格式都使用 DateTimeBucketAssigner 作为默认的分配器。 默认情况下,DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶,格式如下: yyyy-MM-dd–HH 。日期格式(即桶的大小)和时区都可以手动配置。
我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner 。
Flink 有两个内置的 BucketAssigners :
• DateTimeBucketAssigner :默认基于时间的分配器
• BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)

内置的不满足需求可以自定义实现BucketAssigner

滚动策略

简单理解:啥时候(按时间、按大小等)算完成1个文件的写入。
滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。
Flink 有两个内置的滚动策略:

• DefaultRollingPolicy
核心策略:

  1. 当没有正在写入的part文件的时候,不工作,
  2. 当文件达到最大桶大小的时候关闭文件完成写入 (by default 128MB), 可设置
  3. 当前写入文件写入时长超过默认间隔 (by default 60 sec), 或者 可设置
  4. 当前文件一定时间内没有写入(by default 60 sec). 可设置

• OnCheckpointRollingPolicy
核心策略:
当进行一次CheckPoint活动的时候,完成当前文件的写入(跟随检查点的节奏走)

内置的不满足需求可以自定义实现RollingPolicy

代码

package batch.sink;import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;/*** @author lwh* @date 2023/5/5* @description StreamingFileSink**/
public class StreamingFileSinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);env.setStateBackend(new FsStateBackend("file:///D:\\checkpoint"));env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// Socket SourceDataStreamSource<String> socketTextStream = env.socketTextStream("node1", 9999);// 泛型指的是处理的数据类型是什么StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path("data/output/sink3"),      // 文件写出的路径// 文件写出的序列化器和编码new SimpleStringEncoder<String>("UTF-8"))// 桶分配策略.withBucketAssigner(new BasePathBucketAssigner<String>())// 文件滚动(完成一次文件写出)的策略.withRollingPolicy(OnCheckpointRollingPolicy.build()).build();socketTextStream.addSink(sink);env.execute();}
}

sink到kafka

示例

数据写出到Kafka中
使用:FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema()); 来定义一个kafka的sink对象

开发步骤

  • 创建流处理环境
  • 设置并行度
  • 添加自定义MySql数据源
  • 转换元组数据为字符串
  • 构建FlinkKafkaProducer
  • 添加sink
  • 执行任务
package batch.sink;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.codehaus.commons.nullanalysis.Nullable;import java.util.Properties;/*** @author lwh* @date 2023/5/5* @description sink kafka**/
public class KafkaSinkDemo {public static void main(String[] args) throws Exception {// EnvStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.fromElements("Sink to Kafka Test 1","Sink to Kafka Test 2","Sink to Kafka Test 3","Sink to Kafka Test 4","Sink to Kafka Test 5","Sink to Kafka Test 6");// 使用FlinkKafkaProducer来构建一个Kafka的生产者String brokerList = "node1:9092,node2:9092,node3:9092";String topic = "kafkatopic";Properties properties = new Properties();properties.setProperty("bootstrap.servers", brokerList);// 废弃
//        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(topic,                              // topicnew MyKafkaSerializationSchema(),   // 自定义实现kafka的序列化器properties,                         // producer的configFlinkKafkaProducer.Semantic.EXACTLY_ONCE    // kafka的一致性选择);// 构建kafka的sinksource.addSink(kafkaSink);env.execute();}// 自定义构建kafka序列化public static class MyKafkaSerializationSchema implements KafkaSerializationSchema<String> {private String topic = "kafkatopic";@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {return new ProducerRecord<byte[], byte[]>(topic,element.getBytes());}/*ProducerRecord有很多构造, 我们使用的是最基础的, 给定topic并且将数据byte化即可, 即ProducerRecord(String topic, V value)别的构造还会要求传入如:- partition号,int类型, 传入啥写哪个分区, 如果不给定的话, 按照key的hash来计算, 如果没有key的话就按照轮询的方式写入kafka各个分区- key, 数据的key, 用以计算key的hash来计算数据落入哪个分区- timestamp, 给数据一个指定的时间戳, 如果不设置, 默认以当前系统时间上面3个都有默认值, 所以我们不需要设置, 有需要设置可以用其它的重载的构造函数, 如:ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)*/}
}

sink到mysql

示例
加载下列本地集合,导入MySql中

UserInfo(9, "xiaoxiao", "123456", "潇潇")

开发步骤

  • 创建流执行环境
  • 准备数据
  • 添加sink
  • 构建自定义Sink,继承自RichSinkFunction
  • 重写open方法,获取Connection和PreparedStatement
  • 重写invoke方法,执行插入操作
  • 重写close方法,关闭连接操作
  • 执行任务
package batch.sink;import entity.UserInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;/*** @author lwh* @date 2023/5/5* @description sink mysql**/
public class MysqlSinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<UserInfo> source = env.fromElements(new UserInfo(9, "xiaoxiao", "123456", "潇潇"));source.addSink(new MyMySQLSink());env.execute();}public static class MyMySQLSink extends RichSinkFunction<UserInfo> {private Connection connection = null;       // 数据库连接对象private PreparedStatement ps = null;        // ps对象/*invoke就是执行的方法, 类似自定义Source中的run*/@Overridepublic void invoke(UserInfo value, Context context) throws Exception {this.ps.setInt(1, value.getId());this.ps.setString(2, value.getUsername());this.ps.setString(3, value.getPassword());this.ps.setString(4, value.getName());this.ps.execute();}// 实例化的时候执行一次, 适合用来做连接的创建@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);String url = "jdbc:mysql://localhost:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false";Class.forName("com.mysql.jdbc.Driver");this.connection = DriverManager.getConnection(url, "root", "123456");this.ps = connection.prepareStatement("INSERT INTO user VALUES(?,?,?,?);");}// 销毁实例的时候执行一次, 适合用来释放使用的资源@Overridepublic void close() throws Exception {super.close();// 关闭资源if (this.ps != null) this.ps.close();if (this.connection != null) this.connection.close();}}}

sink到redis

通过flink操作redis其实我们可以通过传统的redis连接池Jpoools进行redis的相关操作,但是flink提供了专门操作redis的RedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink如何使用。

RedisSink简介

Redis Sink 提供用于向Redis发送数据的接口的类。接收器可以使用三种不同的方法与不同类型的Redis环境进行通信:

  • 单Redis服务器
  • Redis集群
  • Redis Sentinel

注意:本文主要介绍如何创建与单个redis服务器通信的接收器,其他模式请参考flink官网。https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

如何使用Redis Sink?

Redis Sink 核心类是RedisMapper的一个接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法,如下所示:
在这里插入图片描述

  • getCommandDescription():设置使用的redis数据结构类型,和key的名词,通过RedisCommand设置数据结构类型
  • String getKeyFromData(T data):设置value中的键值对 key的值
  • String getValueFromData(T data);设置value中的键值对 value的值

使用RedisCommand设置数据结构类型时和redis结构对应关系。

DataTypeRedis Command[Sink]
HASHHSET
LISTRPUSH, LPUSH
SETSADD
PUBSUBPUBLISH
STRINGSET
HYPER_LOG_LOGPFADD
SORTED_SETZADD
SORTED_SETZREM

代码

package batch.sink;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;/*** @author lwh* @date 2023/5/5* @description**/
public class RedisSinkDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// SourceDataStreamSource<String> socketTextStream = env.socketTextStream("node1", 9999);// 将数据转换为Tuple2, 注意, socket 传入的数据需要是kv字符串以空格分隔SingleOutputStreamOperator<Tuple2<String, String>> map = socketTextStream.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String value) throws Exception {String[] strings = value.split(" ");return Tuple2.of(strings[0], strings[1]);}});// 构建Redis confFlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("node1").setPort(6379).build();// 基于自定义的RedisMapper实现来构建RedisSinkmap.addSink(new RedisSink<Tuple2<String, String>>(redisConf, new MyRedisMapper()));env.execute("Redis Sink Demo");}/*** 自定义实现RedisMapper接口*/public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {/*描述要写出的数据类型*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}// 设置key@Overridepublic String getKeyFromData(Tuple2<String, String> data) {return data.f0;}// 设置value@Overridepublic String getValueFromData(Tuple2<String, String> data) {return data.f1;}}}

http://www.ppmy.cn/news/58984.html

相关文章

BetaFlight统一硬件配置文件研读之serial命令

BetaFlight统一硬件配置文件研读之serial命令 1. 源由2. 代码分析3. 实例分析4. 配置情况5. 参考资料 统一硬件配置文件的设计是一种非常好的设计模式&#xff0c;可以将硬件和软件的工作进行解耦。 1. 源由 cli命令中serial是对UART串口的配置&#xff0c;通常情况下BetaFli…

DNS解析过程

域名解析&#xff08;获取域名对应的IP的过程&#xff09; 域名 维基百科解释&#xff1a; 在互联网上&#xff0c;域名是一个字符串&#xff0c;用于标识行政自治、权威或控制的领域。域名通常用于标识通过互联网提供的服务&#xff0c;例如网站、电子邮件服务等。一般来说&a…

荔枝派Zero(全志V3S)开启 SSH 实现远程连接和文件传输

文章目录 前言一、配置 buildroot二、编译 buildroot三、拷贝到 SD 卡四、测试 ssh1、修改 /etc/ssh/sshd_config 文件2、运行 /usr/sbin/sshd3、使用 SecureCRT 测试4、使用 SecureFx 测试 前言 本文将在 Buildroot 根文件系统开启 ssh 功能。 一、配置 buildroot 1、在 bui…

深入浅出PyTorch:从零开始入门人工智能

一、什么是PyTorch&#xff1f; PyTorch是由Facebook AI研究院开发的Python深度学习框架&#xff0c;是目前最流行的深度学习框架之一。它通过动态计算图的方式实现了神经网络的构建和优化&#xff0c;同时能够高效地利用GPU进行计算加速。PyTorch具有易用、灵活和高效等特点&…

13. Pod 从入门到深入理解(二)

本章讲解知识点 Pod 容器共享 VolumeConfigMapSecretDownward APIEmptyDir VolumeHostPath Volume1. Pod 容器共享 Volume 1.1. Volume 的背景及需要解决的问题 存储是必不可少的,对于服务运行产生的日志、数据,必须有一个地方进行保存,但是我们的容器每一次重启都是“恢复…

初识vue-模板

目录 模板语法 模板插值 指令 条件渲染&#xff08;v-if &#xff1b;v-show&#xff09; 列表循环&#xff08;v-for&#xff09; ref JSX&#xff08;render渲染&#xff09; 条件渲染 列表渲染 八皇后框架-背景格&#xff08;循环&#xff09; 模板语法 Vue.js使用了…

nestJS入门cli 创建项目以及集成swagger和mysql

nestJs 1. 简介 介绍 NestJS NestJS 是一个基于 TypeScript 的渐进式 Node.js 框架&#xff0c;它结合了 OOP、FP 和 FRP 的元素&#xff0c;以提供一种现代且可扩展的开发体验。NestJS 建立在 Express.js 之上&#xff0c;但是提供了更加抽象和模块化的方式来编写应用程序。…

为了做低代码平台,这些年我们对.NET的DataGridView做的那些扩展

我们的低代码开发平台从一开始决定做的时候&#xff0c;就追求未来能够支持多种类型的客户端&#xff0c;目前支持Winform&#xff0c;Web&#xff0c;H5&#xff0c;FlutterAPP&#xff0c;当然了&#xff0c;未来也有可能会随着实际的需要淘汰掉一些客户端的。 为了系统更易…