Flink流批一体计算(11):PyFlink Tabel API之TableEnvironment

news/2024/10/31 3:27:35/

目录

概述

设置重启策略

什么是flink的重启策略(Restartstrategy)

flink的重启策略(Restartstrategy)实战

flink的4种重启策略

FixedDelayRestartstrategy(固定延时重启策略)

FailureRateRestartstrategy(故障率重启策略)

NoRestartstrategy(不重启策略)

配置State Backends 以及 Checkpointing

Checkpoint

启用和配置

选择 State backend

MemoryStateBackend

FsStateBackend

RocksDBStateBackend

State backend比较

概述

编写 Flink Python Table API 程序的第一步是创建 TableEnvironment。这是 Python Table API 作业的入口类。

get_config()返回 table config,可以通过 table config 来定义 Table API 的运行时行为。

t_env = TableEnvironment.create(Environmentsettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

设置重启策略

在TableConfig中,通过设置键值选项来配置它们。

什么是flink的重启策略(Restartstrategy)

Restartstrategy,重启策略,在遇到机器或者代码等不可预知的问题时导致 Job 或者 Task 挂掉的时候,它会根据配置的重启策略将 Job 或者受影响的 Task 拉起来重新执行,以使得作业恢复到之前正常执行状态。Flink 中的重启策略决定了是否要重启 Job 或者 Task,以及重启的次数和每次重启的时间间隔。

flink的重启策略(Restartstrategy)实战

flink的 Restartstrategy 作用是提升任务健壮性和容错性,保证任务可以实时产出数据。

设置重启策略和公司处理数据业务需求有很大的关系,根据不同的业务需求设置处理任务的不同策略。

其实遇到上面这种问题比较常见,比如有时候因为数据的问题(不合规范、为 null 等),这时在处理这些脏数据的时候可能就会遇到各种各样的异常错误,比如空指针、数组越界、数据类型转换错误等。

可能你会说只要过滤掉这种脏数据就行了,或者进行异常捕获就不会导致 Job 不断重启的问题了。

所以日常开发中我们要尽力的保证代码的健壮性,但是也要配置好 Flink Job 的 Restartstrategy(重启策略)。

flink的4种重启策略

默认的重启策略是通过Flink的flink-conf.yaml来指定的,这个配置参数restart-strategy定义了哪种策略会被采用。

如果checkpoint未启动,就会采用no restart策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用fixed-delay策略,重试Integer.MAX_VALUE次。

配置参数 restart-strategy 定义了哪个策略被使用。

固定间隔 (Fixed delay)

失败率 (Failure rate)

无重启 (No restart)

FixedDelayRestartstrategy(固定延时重启策略)

FixedDelayRestartstrategy是固定延迟重启策略,程序按照集群配置文件中或者程序中额外设置的重启次数尝试重启作业,如果尝试次数超过了给定的最大次数,程序还没有起来,则停止作业,另外还可以配置连续两次重启之间的等待时间。

在 flink-conf.yaml 中可以像下面这样配置:

restart-strategy: fixed-delay

#表示作业重启的最大次数,启用 checkpoint 的话是 Integer.MAX_VALUE,否则是 1

restart-strategy.fixed-delay.attempts: 3

#如果设置分钟可以类似 1 min,该参数表示两次重启之间的时间间隔,当程序与外部系统有连接交互时延迟重启可能会有帮助,启用 checkpoint 的话,延迟重启的时间是 10 秒,否则使用 akka.ask.timeout 的值。

restart-strategy.fixed-delay.delay: 10 s

python程序设置重启策略为 "fixed-delay":

table_env.get_config().get_configuration().set_string("restart-strategy", "fixed-delay")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.attempts", "3")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.delay", "30s")

FailureRateRestartstrategy(故障率重启策略)

FailureRateRestartstrategy 是故障率重启策略,在发生故障之后重启作业,如果固定时间间隔之内发生故障的次数超过设置的值后,作业就会失败停止,该重启策略也支持设置连续两次重启之间的等待时间。

在 flink-conf.yaml 中可以像下面这样配置:

restart-strategy: failure-rate

restart-strategy.failure-rate.max-failures-per-interval:

#固定时间间隔内允许的最大重启次数,默认 1

restart-strategy.failure-rate.failure-rate-interval: 5 min 

#固定时间间隔,默认 1 分钟

restart-strategy.failure-rate.delay: 10 s

#连续两次重启尝试之间的延迟时间,默认是 akka.ask.timeout

python程序设置重启策略为 "fixed-delay":

table_env.get_config().get_configuration().set_string("restart-strategy", "failure-rate")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.delay", "1s")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.failure-rate-interval", "1 min")
table_env.get_config().get_configuration().set_string("restart-strategy.failure-rate.max-failures-per-interval", "1")

NoRestartstrategy(不重启策略)

NoRestartstrategy 作业不重启策略,直接失败停止,在 flink-conf.yaml 中配置如下:

restart-strategy: none

配置State Backends 以及 Checkpointing

Checkpoint

Flink为了使 State 容错,需要有 State checkpoint(状态检查点)。

Checkpoint 允许 Flink 恢复流的 State 和处理位置,从而为程序提供与无故障执行相同的语义。

Checkpoint 使用的先决条件:

一个持久化的,能够在一定时间范围内重放记录的数据源。

例如,持久化消息队列:Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub 或文件系统:HDFS,S3,GFS,NFS,Ceph...

State 持久化存储系统,通常是分布式文件系统:HDFS,S3,GFS,NFS,Ceph...

启用和配置

Checkpoint 默认情况下是不启用的。StreamExecutionEnvironment 对象调用 enableCheckpointing(n) 启用 Checkpoint,其中n是以毫秒为单位的 Checkpoint 间隔。

Checkpoint 的配置项包括:

#设置 checkpoint 模式为 EXACTLY_ONCE

#Checkpoint 支持这两种模式:恰好一次exactly-once或至少一次at-least-once

#对于大多数应用来说,EXACTLY_ONCE是优选的。at-least-once可能在某些要求超低延迟(几毫秒)的应用程序使用。

table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")

#Checkpoint最小间隔时间,设置为5000,表示在上一个 checkpoint 完成后的至少5秒后才会启动下一个 checkpoint

table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "3min")

#Checkpoint 超时时间:在超时时间内 checkpoint 未完成,则中止正在进行的 checkpoint

table_env.get_config().get_configuration().set_string("execution.checkpointing.timeout", "10min")

#Checkpoint并发数:最多可以同时运行checkpoint的数量,当达到最大值,必须其中一个完成,才能新启一个。

table_env.get_config().get_configuration().set_string("execution.checkpointing.max-concurrent-checkpoints", "2")

选择 State backend

Checkpoint 的存储的位置取决于配置的 State backend(JobManager 内存,文件系统,数据库...)。

默认情况下,State 存储在 TaskManager 内存中,Checkpoint 存储在 JobManager 内存中。

Flink 自带了以下几种开箱即用的 state backend:

MemoryStateBackend

FsStateBackend

RocksDBStateBackend

在没有配置的情况下,系统默认使用 MemoryStateBackend

MemoryStateBackend

使用 MemoryStateBackend,在 checkpoint 中对 State 做一次快照,并在向 JobManager 发送 checkpoint 确认完成的消息中带上此快照数据,然后快照就会存储在 JobManager 的内存堆中。

FsStateBackend

FsStateBackend 需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

FsStateBackend 在 TaskManager 的内存中持有正在处理的数据。

Checkpoint 时将 state snapshot 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

FsStateBackend 默认是异步操作,以避免在写 state snapshot 时阻塞处理程序。如果要禁用异步,可以在 FsStateBackend 构造函数中设置

RocksDBStateBackend

RocksDBStateBackend 需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

RocksDBStateBackend 在 RocksDB 中持有正在处理的数据,RocksDB 在 TaskManager 的数据目录下。

Checkpoint 时将整个 RocksDB 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

RocksDBStateBackend 通常也是异步的。目前唯一支持增量 checkpoint。

使用 RocksDB 可以保存的状态量仅受可用磁盘空间量的限制。这也意味着可以实现的最大吞吐量更低,后台的所有读/写都必须通过序列化和反序列化来检索/存储 State,这也比使用基于堆内存的方式代价更昂贵。

State backend比较

StateBackend

in-flight

checkpoint

吞吐

推荐使用场景

MemoryStateBackend 

TM Memory

JM Memory

调试、无状态或对数据丢失或重复无要求

FsStateBackend     

TM Memory

FS/HDFS

普通状态、窗口、KV 结构

RocksDBStateBackend

RocksDB on TM

FS/HDFS

超大状态、超长窗口、大型 KV 结构

# 设置 statebackend 类型为 "rocksdb",其他可选项有 "filesystem" "jobmanager"

# 你也可以将这个属性设置为 StateBackendFactory 的完整类名

# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

table_env.get_config().get_configuration().set_string("state.backend", "rocksdb")

# 设置 RocksDB statebackend 所需要的 checkpoint 目录

table_env.get_config().get_configuration().set_string("state.checkpoints.dir", "file:///tmp/checkpoints/")


http://www.ppmy.cn/news/741971.html

相关文章

怎样取消连续包月自动续费_大会员怎么取消自动续费

大家好,我是时间财富网智能客服时间君,上述问题将由我为大家进行解答。 以哔哩哔哩为例,取消大会员自动续费的方法是: 1、首先打开并登录“哔哩哔哩”APP。 2、然后在哔哩哔哩主界面点击左上角的“用户名头像”并进入。 3、接着在…

怎么关闭苹果手机自动扣费_优酷自动续费怎么关闭

优酷自动续费怎么关闭?历经几个月我终于关掉了,真的是普天同庆不容易。还没关掉的,就按照我圈起来的去操作,我收到短信提示关闭成功,下个月就不会自动扣钱啦!对了我是苹果手机

苹果手机怎么取消优酷自动续费_优酷:优酷公众号怎么取消自动续费会员

优酷会员充值中心,优酷公众号,优酷vip会员,优酷会员,优酷VIP会员,优酷会员卡免费领取,优酷会员充值,优酷视频vip,优酷会员免费领取,优酷会员充值中心,优酷会员…

怎么取消苹果手机自动续费_手机上优酷会员怎么取消自动续费

我们经常会在手机上下载优酷观看一些视频,但是优酷很多视频是需要会员功能才能看的,因此很多小伙伴都会购买会员,但是购买会员之后,默认是开通自动续费的服务的,如果不想自动续费,避免扣费造成影响&#xf…

平台活动免费送,免费领取1个月优酷/爱奇艺/腾讯视频会员

免费领取1个月优酷/爱奇艺/腾讯视频会员,视频会员免费撸活动。 1、手机应用商店搜索下载“喵惠”APP; 2、喵惠APP上点击进入“淘特1分购”,完成1分购包邮商品下单; 3、1分购包邮商品确认收货后联系客服领取1个月视频会员充值到…

怎样取消连续包月自动续费_手机爱奇艺会员怎么取消自动续费 VIP关闭解除自动续费方法...

爱奇艺会员办理提供了自动续费的功能,意思是会员快到期的话,就会自动付费继续开通,这样的操作给了一部分用户带来了便利,无需在刻意留意到期时间进行续费,不过也有很多小伙伴觉得不好,因为可能到期就不用了…

哔哩哔哩会员如何取消自动续费?

哩哔哩怎么取消会员自动续费?开通会员之后,不少用户会选择自动续费功能,不仅价格上更加优惠而且也十分方便,如果不想用了,可以直接取消。 纯净之家-win7纯净版系统_win7 ghost 纯净版 步骤如下: 1、首先进…

经验分享:爱奇艺关闭自动续费会员的功能

分享一个和技术开发无关的经验,但是对爱奇艺会员续费不熟悉的话,这个经验还是很有作用的,尤其是自动续费这个功能,爱奇艺做的很好,很隐蔽,不刻意找的话基本找不到怎么取消自动续费会员的功能。那么,下面就来介绍一下怎么取消爱奇艺自动续费会员的功能,包括手机端和电脑…