案例1.spark和flink分别实现作业配置动态更新案例

news/2025/2/11 5:45:31/

目录

 目录

一、背景

二、解决

spark%20broadcast%E5%B9%BF%E6%92%AD%E5%8F%98%E9%87%8F-toc" name="tableOfContents" style="margin-left:40px">1.方法1:spark broadcast广播变量

a. 思路

b. 案例

① 需求

② 数据

③ 代码

flink%20RichSourceFunction-toc" name="tableOfContents" style="margin-left:40px">2.方法2:flink RichSourceFunction

a. 思路

b. 案例

① 需求

② 数据

③ 代码

④ 测试验证

测试1

测试2

测试3


一、背景

         在实时作业(如 Spark Streaming、Flink 等流处理作业)中,通过外部配置管理系统动态修改配置,有以下优点:

         1. 无需重启作业,实现配置热更新
好处:实时作业通常需要长时间运行,重启会导致数据丢失或处理延迟。通过外部配置动态更新,可以在作业运行时修改配置(如并行度、窗口大小、超时时间等),而无需重启作业。

         2. 灵活应对业务需求变化
好处:实时作业通常需要快速响应业务需求的变化(如规则调整、参数优化等)。通过外部配置管理,可以快速更新业务逻辑或参数,而无需重新部署代码。

        3. 集中化管理配置
好处:将配置集中存储在外部的配置管理系统(如 ZooKeeper、Consul、Nacos、数据库等),便于统一管理和维护。多个作业可以共享同一份配置,避免配置分散和重复。

        通常初始化sparkConf 作为 Spark 应用程序的配置对象,在运行时设置配置参数,但是大多数配置在 SparkContext 初始化后无法更改了。

        如果想让程序在运行时做参数的动态调整,以下有两种思路可供参考,通过代码案例可以更深一步理解。

二、解决

spark%20broadcast%E5%B9%BF%E6%92%AD%E5%8F%98%E9%87%8F" name="1.%E6%96%B9%E6%B3%951%EF%BC%9Aspark%20broadcast%E5%B9%BF%E6%92%AD%E5%8F%98%E9%87%8F">1.方法1:spark broadcast广播变量

a. 思路

        可以将数据字典或者规则放入文件当中,spark读取加载,并将这些字典配置广播发送到各个节点。

b. 案例

① 需求

        根据ip地址查找其所属,实现根据ip对应所属的规则,将该规则广播出去,为数据中的ip附加所属。将ip与所属地址的规则broadcast广播到各个executor中。


② 数据

access.log
样例:字符分隔\t 分别是 时间\tip\t网址

20161127172603 12.197.80.128 http://scala.bjut.com.cn/scala/course/8.html
20161127172605 128.44.80.128 http://java.bjut.com.cn/java/course/31.html
20161127172605 12.18.80.188 http://java.bjut.com.cn/java/course/22.html
20161127172610 197.160.85.168 http://java.bjut.com.cn/java/course/18.html

实现根据ip对应所属的规则,将该规则广播出去,为数据中的ip附加所属。实现代码如下:

③ 代码

IpLocation.scala

package com.spark.demo.broadcastimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext/**
 * 根据ip地址查找其所属
 * 将ip与所属地址的规则broadcast广播到各个executor中
 */
object IpLocation {    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ip location").setMaster("local[2]")
        val sc = new SparkContext(conf)        val ipRulesRdd = sc.textFile("src\\com\\spark\\demo\\broadcast\\ip-by-country.csv").map { line => 
            val fields = line.split(",")
            val startIpNum = fields(2)
            val endIpNum = fields(3)
            val cityName = fields(5)
            (startIpNum, endIpNum, cityName)
        }
        //全部的ip映射规则
        val ipRulesArray = ipRulesRdd.collect //将数据收集到driver,为后面广播做准备。该变量值在driver中有,worker中不存在
        val bIpRules = sc.broadcast(ipRulesArray)        //加载要处理的数据
        val ipsRdd = sc.textFile("src\\com\\spark\\demo\\broadcast\\access.log").map { line =>  
            val fields = line.split("\t")
            val ip = fields(1)
            ip
        }        val result = ipsRdd.map { ip => 
            val ipNum = IpUtil.ip2Long(ip)
            val info = IpUtil.searchContentByKey(bIpRules.value, ipNum)
            val countryName = info.split(",")(2)
            (ip, countryName)
        }        result.collect().foreach(println)
        sc.stop()
    }}

IpUtil.scala

package com.spark.demo.broadcastimport scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMapobject IpUtil {
  /*
     * 将ip地址转换成数字long
     */
    def ip2Long(ip:String):Long = {
        val fragments = ip.split("[.]") 
        var ipNum = 0L
        for(i <- 0 until fragments.length) {
            ipNum = fragments(i).toLong | ipNum << 8L
        }
        ipNum
    }    def ip2Long2(ip:String) = {
        val fragments = ip.split("[.]")
        val ipNum = 16777216L*fragments(0).toLong+65536L*fragments(1).toLong+
                256L*fragments(2).toLong + fragments(3).toLong
        ipNum
    }
    /**
     * 将数字转换成ip地址
     */
    def long2Ip(ipNum:Long) = {
        val mask = List(0x000000FF,0x0000FF00,0x00FF0000,0xFF000000)
        val ipInfo = new StringBuffer
        var num = 0L
        for(i <- 0 until 4) {
            num = (ipNum & mask(i)) >> (i*8)
            if(i>0) ipInfo.insert(0, ".")
    

http://www.ppmy.cn/news/1571081.html

相关文章

Http和Socks的区别?

HTTP 和 SOCKS 的区别 HTTP 和 SOCKS 都是用于网络通信的协议&#xff0c;但它们在工作原理、应用场景和实现方式上有显著的区别。以下是详细的对比和说明。 一、HTTP 协议 1. 定义 HTTP&#xff08;HyperText Transfer Protocol&#xff09;是用于传输超文本数据的应用层协…

【C++】解锁<list>的正确姿势

> &#x1f343; 本系列为初阶C的内容&#xff0c;如果感兴趣&#xff0c;欢迎订阅&#x1f6a9; > &#x1f38a;个人主页:[小编的个人主页])小编的个人主页 > &#x1f380; &#x1f389;欢迎大家点赞&#x1f44d;收藏⭐文章 > ✌️ &#x1f91e; &#x1…

ZoneMinder index.php SQL注入漏洞复现(附脚本)(CVE-2024-43360)

免责申明: 本文所描述的漏洞及其复现步骤仅供网络安全研究与教育目的使用。任何人不得将本文提供的信息用于非法目的或未经授权的系统测试。作者不对任何由于使用本文信息而导致的直接或间接损害承担责任。如涉及侵权,请及时与我们联系,我们将尽快处理并删除相关内容。 0x0…

Docker容器访问外网:启动时的网络参数配置指南

在启动Docker镜像时,可以通过设置网络参数来确保容器能够访问外网。以下是几种常见的方法: 1. 使用默认的bridge网络 Docker的默认网络模式是bridge,它会创建一个虚拟网桥,将容器连接到宿主机的网络上。在大多数情况下,使用默认的bridge网络配置即可使容器访问外网。 启动…

Log4j定制JSON格式日志输出

1.前言 log4j是Java中一个强大的日志记录框架&#xff0c;通过简单的配置便可以在程序中进行日志打印与记录。关于log4j博主最近碰到一个需求&#xff0c;需要将程序运行过程中的日志按给定的json模板输出&#xff0c;本文记录一下log4j如何配置json格式的日志打印。 2.日志配…

【AI】在Ubuntu中使用docker对DeepSeek的部署与使用

这篇文章前言是我基于部署好的deepseek-r1:8b模型跑出来的 关于部署DeepSeek的前言与介绍 在当今快速发展的技术环境中&#xff0c;有效地利用机器学习工具来解决问题变得越来越重要。今天&#xff0c;我将引入一个名为DeepSeek 的工具&#xff0c;它作为一种强大的搜索引擎&a…

B+树原理详解及C语言实现

目录 B树的原理 B树的操作过程&#xff08;图形化演示&#xff09; B树的应用场景 B树与B树的对比 C语言实现及应用实例 文件结构 总结 B树的原理 B树是B树的一种变体&#xff0c;广泛应用于数据库和文件系统中。其原理和特点如下&#xff1a; 数据结构&#xff1a;B树…

练习题(2.10)

问题描述 有一个 SNS 被 NN 个用户使用&#xff0c;他们的编号从 11 到 NN。 在这个 SNS 中&#xff0c;两个用户可以成为朋友。 友谊是双向的&#xff1b;如果用户 X 是用户 Y 的朋友&#xff0c;那么用户 Y 也一定是用户 X 的朋友。 目前&#xff0c;在 SNS 上有 MM 对朋…