使用Flink Operator部署Flink on k8s方案

embedded/2025/2/21 10:41:58/
1. Flink Operator 简介
  • Flink Operator 是一个 Kubernetes Operator,旨在简化 Flink 在 Kubernetes 上的部署和管理。
  • 它基于 Kubernetes 的 CRD(Custom Resource Definition)机制,通过声明式的方式管理 Flink 集群的生命周期。
  • 支持 Flink 的高可用性(HA)、自动扩缩容、作业提交与管理等功能。

2. 环境准备
  • Kubernetes 集群
    确保 Kubernetes 集群版本为 v1.21 或更高版本,并安装了 Helm(用于快速部署 Flink Operator)。
  • 存储解决方案
    • 配置好持久化存储(如 NFS、Ceph、阿里云 NAS 等),用于存储 Flink 的 checkpoint 和 savepoint 数据。
    • 确保存储路径在所有节点上可访问。
  • 网络配置
    • 确保集群内的网络通信正常,Pod 间可以通过 DNS 或 IP 相互通信。
    • 如果使用 Istio 或其他服务网格,需配置相应的流量规则。

3. 部署 Flink Operator
  • 安装 Helm
    如果尚未安装 Helm,请先按照官方文档安装 Helm 工具。
  • 添加 Flink Operator 仓库
    helm repo add flink-operator https://flink-operator.github.io/flink-operator/
    helm repo update 
    
  • 安装 Flink Operator
    helm install flink-operator flink-operator/flink-operator --namespace flink-system --create-namespace 
    
  • 验证安装
    检查 Flink Operator 是否正常运行:
    kubectl get pods -n flink-system 
    

4. 部署 Flink 集群
  • 创建 Flink Cluster 配置文件
    编写一个 YAML 文件(如 flink-cluster.yaml),定义 Flink 集群的规格:
    apiVersion: flink.apache.org/v1beta1 
    kind: FlinkCluster 
    metadata:name: example-flink-cluster namespace: flink 
    spec:image:name: flink:1.17.0 jobManager:replicas: 1 resources:requests:cpu: "1"memory: "2Gi"taskManager:replicas: 3 resources:requests:cpu: "1"memory: "2Gi"checkpointing:interval: 60000 storageDir: s3a://flink-checkpoints/stateBackend:type: rocksdb storageDir: s3a://flink-state/highAvailability:mode: zookeeper zkQuorum: "zookeeper.default.svc.cluster.local:2181"podTemplateFile: "pod-template.yaml"
    
  • 创建命名空间
    kubectl create namespace flink 
    
  • 应用配置文件
    kubectl apply -f flink-cluster.yaml -n flink 
    
  • 验证集群状态
    检查 Flink 集群是否正常运行:
    kubectl get flinkclusters -n flink 
    kubectl describe flinkclusters example-flink-cluster -n flink 
    

5. 提交 Flink 作业
  • 使用 Flink Application CRD
    创建一个 Flink Application 的 YAML 文件(如 flink-app.yaml):
    apiVersion: flink.apache.org/v1beta1 
    kind: FlinkApplication 
    metadata:name: example-flink-app namespace: flink 
    spec:clusterName: example-flink-cluster jarURI: "s3a://flink-jars/example.jar"arguments:- "--input-topic"- "my-topic"- "--output-topic"- "my-output-topic"parallelism: 3 entryClass: com.example.FlinkJob 
    
    提交作业:
    kubectl apply -f flink-app.yaml -n flink 
    
  • 通过 Web UI 提交作业
    访问 Flink 的 Web UI(JobManager 的服务地址),手动上传 JAR 文件并提交作业。
  • 使用 CLI 提交作业
    如果需要通过命令行提交作业,可以使用以下命令:
    kubectl exec -n flink -it $(kubectl get pod -n flink | grep jobmanager | awk '{print $1}') -- /opt/flink/bin/flink run -c com.example.FlinkJob s3a://flink-jars/example.jar --input-topic my-topic --output-topic my-output-topic 
    

6. 监控与维护
  • 监控 Flink 集群
    使用 Prometheus 和 Grafana 监控 Flink 集群的性能指标:
    # 配置 Prometheus 抓取 Flink 指标 
    kubectl apply -f https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/deploy/prometheus/flink-monitoring-stack.yaml 
    
    访问 Grafana UI 并导入 Flink 的仪表盘模板。
  • 查看作业状态
    使用以下命令查看 Flink 作业的状态:
    kubectl get flinkapplications -n flink 
    kubectl describe flinkapplications example-flink-app -n flink 
    
  • 日志排查
    查看 Flink 作业的日志:
    kubectl logs -n flink -l app=flink,component=jobmanager 
    

7. 高可用性与容灾
  • 配置高可用性
    flink-cluster.yaml 中启用高可用性:
    highAvailability:mode: zookeeper zkQuorum: "zookeeper.default.svc.cluster.local:2181"storageDir: s3a://flink-ha/
    
  • 自动恢复
    Flink Operator 支持自动恢复失败的作业,确保在 Pod 重启或节点故障时作业能够快速恢复。
  • 备份与恢复
    定期备份 Flink 的 checkpoint 和 savepoint 数据,并存储到可靠的存储系统中(如 S3、HDFS)。

8. 扩展与优化
  • 水平扩展
    动态调整 TaskManager 的副本数:
    kubectl scale flinkclusters example-flink-cluster --replicas=5 -n flink 
    
  • 垂直扩展
    修改 TaskManager 的资源配额:
    taskManager:resources:requests:cpu: "2"memory: "4Gi"
    
  • 混合云部署
    将 Flink 集群部署在多云环境中,利用 Kubernetes 的 Federation 功能实现跨云负载均衡。

9. 总结

通过 Flink Operator 在 Kubernetes 上部署 Flink,可以显著简化 Flink 的运维工作,并充分利用 Kubernetes 的弹性伸缩和高可用性特性。以下是完整的部署流程总结:

  1. 安装并配置 Kubernetes 集群。
  2. 安装 Flink Operator。
  3. 创建 Flink 集群配置文件并部署。
  4. 提交 Flink 作业并通过 Web UI 或 CLI 管理。
  5. 使用 Prometheus 和 Grafana 监控集群状态。
  6. 配置高可用性和自动恢复功能。
  7. 根据业务需求动态调整资源。

通过以上步骤,可以高效地在 Kubernetes 上运行和管理 Flink 流处理应用。


http://www.ppmy.cn/embedded/164041.html

相关文章

A000目录

名称 说明文档 【001】基于SpringBootVue实现的小区物业管理系统 点击查看>> 【002】基于SpringBootthymeleaf实现的蓝天幼儿园管理系统 点击查看>> 【003】基于SpringBootVue实现的社团管理系统 点击查看>> 【004】基于SSM开发的社区论坛系统 点击查看>…

Docker Mysql 数据迁移

查看启动命令目录映射 查看容器名称 docker ps查看容器的启动命令 docker inspect mysql8.0 |grep CreateCommand -A 20如下图所示:我这边是把/var/lib/mysql 目录映射到我宿主机的/mnt/mysql/data目录下,而且我的数量比较大使用方法1的话时间比较久,所以我采用方法2 如果没…

5G-A的尔滨故事,冰雪下的科技春潮

刚刚结束的第九届亚冬会中,黑科技5G-A达成了刷屏级的效果。这也是5G-A首次大规模服务于国际大型体育赛事。 一场冰雪盛会之后,5G-A向何处去?这个黑科技的能力,将如何投放给大众消费者和企业?这是值得我们进一步思考的话…

如何选择适合自己需求的相位噪声分析仪和VCO测试仪?

选择适合自己需求的相位噪声分析仪和 VCO 测试仪,需要综合考虑多个因素,以下是一些要点: VCO测试仪-相噪分析仪-相位噪声分析仪-安铂克科技(上海)有限公司安铂克科技(上海)有限公司提供VCO测试仪,相噪分析仪,相位噪声分析仪产品,APPH系列是一…

C++17 中的 std::to_chars 和 std::from_chars:高效且安全的字符串转换工具

文章目录 1. 传统转换方法的局限性2. std::to_chars:数值到字符串的高效转换函数原型:返回值:示例代码:输出: 3. std::from_chars:字符串到数值的高效解析函数原型:返回值:示例代码&…

应急响应(linux 篇,以centos 7为例)

一、基础命令 1.查看已经登录的用户w 2.查看所有用户最近一次登录:lastlog 3.查看历史上登录的用户还有登录失败的用户 历史上所有登录成功的记录 last /var/log/wtmp 历史上所有登录失败的记录 Lastb /var/log/btmp 4.SSH登录日志 查看所有日志:…

10分钟上手DeepSeek开发:SpringBoot + Vue2快速构建AI对话系统

作者:后端小肥肠 目录 1. 前言 为什么选择DeepSeek? 本文技术栈 2. 环境准备 2.1. 后端项目初始化 2.2. 前端项目初始化 3. 后端服务开发 3.1. 配置文件 3.2. 核心服务实现 4. 前端服务开发 4.1. 聊天组件ChatWindow.vue开发 5. 效果展示及源…

如何利用AI制作PPT,轻松实现高效演示

如何利用AI制作PPT,轻松实现高效演示!在这个信息爆炸的时代,PPT已经成为了日常工作和学习中不可或缺的工具。每当我们需要汇报、展示或总结时,PPT几乎成了“必杀技”。然而制作一份精彩的PPT往往需要花费大量的时间和精力。随着人…