StructuredStreaming Sink

news/2024/11/27 20:38:17/

StructuredStreaming Sink

Output Modes

  • append

    默认追加模式, 将新的数据输出,只支持简单查询

  • complete

    完整模式,支持聚合和排序

  • update

    更新模式,支持聚合不支持排序,没有聚合和append一样

下面这段操作,有聚合,有排序,只能用complete

val ds = df.as[String]
val wordDs = ds.flatMap(_.split(" "))
val result = wordDs.groupBy('value).count().orderBy('count)

Sink位置

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zcv9N6Yb-1674275476338)(F:\study\大数据\spark\spark3.0\代码-笔记\Spark-day05\笔记\Spark-day05.assets\1610097909181.png)]

Memory sink

输出内存表存储在内存中

支持append和complete

应用于测试环境

// 输出
val query = result.writeStream.format("memory").queryName("result").outputMode("complete").start()while (true) {spark.sql("select * from result").show()Thread.sleep(3000)
}

完整代码

object Sink {def main(args: Array[String]): Unit = {// 创建环境val spark = SparkSession.builder().appName("Operation").master("local[*]").config("spark.sql.shuffle.partitions", "4") // 设置分区数.getOrCreate()val sc = spark.sparkContextimport spark.implicits._// 加载数据val df = spark.readStream.format("socket").option("host", "hadoop102").option("port", 9999).load()df.printSchema()// 处理数据// DSLval ds = df.as[String]val wordDs = ds.flatMap(_.split(" "))val result = wordDs.groupBy('value).count().orderBy('count)// 输出val query = result.writeStream.format("memory").queryName("result").outputMode("complete").start()while (true) {spark.sql("select * from result").show()Thread.sleep(3000)}//    query.awaitTermination()spark.stop()}}

ForeachBatch Sink

ForeachSink可以对输出记录进行任意计算,针对每一条数据

ForeachBatch Sink 针对每一批数据

object ForeachBatchSink {def main(args: Array[String]): Unit = {//TODO 0.创建环境val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]").config("spark.sql.shuffle.partitions", "4").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._//TODO 1.加载数据val df: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()df.printSchema()//TODO 2.处理数据val ds: Dataset[String] = df.as[String]val result: Dataset[Row] = ds.flatMap(_.split(" ")).groupBy('value).count().orderBy('count.desc)//TODO 3.输出结果result.writeStream.foreachBatch((ds: Dataset[Row], batchId:Long) => {//自定义输出到控制台println("-------------")println(s"batchId:${batchId}")println("-------------")ds.show()//自定义输出到MySQLds.coalesce(1).write.mode(SaveMode.Overwrite).format("jdbc").option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8").option("user", "root").option("password", "root").option("dbtable", "bigdata.words").save()}).outputMode("complete")//TODO 4.启动并等待结束.start().awaitTermination()//TODO 5.关闭资源spark.stop()}}

触发间隔

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cwImfSh6-1674275476340)(F:\study\大数据\spark\spark3.0\代码-笔记\Spark-day05\笔记\Spark-day05.assets\1610099456264.png)]

微批处理的时候,每隔批次都可以做checkpoint

连续处理时需要指定时间做checkpoint

val ds: Dataset[String] = df.as[String]val result: Dataset[Row] = ds.coalesce(1).flatMap(_.split(" ")).groupBy('value).count()result.writeStream.format("console").outputMode("complete")//触发间隔://1.默认的不写就是:尽可能快的运行微批,Default trigger (runs micro-batch as soon as it can)//2.指定0也是尽可能快的运行// .trigger(Trigger.ProcessingTime("0 seconds"))//3.指定时间间隔//.trigger(Trigger.ProcessingTime("5 seconds"))//4.触发1次//.trigger(Trigger.Once())//5.连续处理并指定Checkpoint时间间隔,实验的.trigger(Trigger.Continuous("1 second")).option("checkpointLocation", "./ckp"+System.currentTimeMillis())//TODO 4.启动并等待结束.start().awaitTermination()

http://www.ppmy.cn/news/18622.html

相关文章

【Java寒假打卡】JavaWeb-ServletContext

【Java寒假打卡】JavaWeb-ServletContext概述域对象ServletContext的配置方式ServletContext的常用方法ServletContext共享数据的方法概述 ServletContext是应用上下文对象(应用域对象)。每一个应用中只有一个ServletContext对象作用:可以配…

java spring IOC外部Bean注入

外部Bean注入也是一种Bean操作的属性注入 但这次我们要注入的是一个类对象 我们先创建spring项目 引入基本依赖 然后在src下创建两个包 gettingStarted 和 generate 这个名字可以随便取 但和我同名 可以让你们不会出现 名称不一样导致资源找不到的问题 然后在 gettingStarte…

文件操作详解-IO

目录 1.认识文件 2.文件的类型 3.java对文件的操作 针对文件系统操作 针对文件内容操作 字节流 字符流 字节流的使用 字符流的使用 4.文件IO小程序练习 示例1 示例2 1.认识文件 狭义的文件指的是硬盘上的文件和目录 广义的文件泛指计算机中的很多的软硬件资源,操…

图的最短路径

文章目录最短路径Dijkstra算法邻接表邻接矩阵Bellman-Ford算法邻接表邻接矩阵Floyd-Warshall算法邻接表邻接矩阵源代码邻接表邻接矩阵最短路径 概念 从在带权有向图G中的某一顶点出发, 找出一条通往另一顶点的最短路径, 最短也就是沿路径各边的权值总和达到最小. 分类 单源…

尚硅谷-分布式锁

分布式锁 在应用开发中,特别是web工程开发,通常都是并发编程,不是多进程就是多线程。这种场景下极易出现线程并发性安全问题,此时不得不使用锁来解决问题。在多线程高并发场景下,为了保证资源的线程安全问题&#xff…

Tkinter的Label与Button

Tkinter是Python的一个内置包,主要用于简单的界面设计,使用起来非常方便。 目录 一、创建界面 1. 具体步骤 1.1 导入tkinter包 1.2 tk.Tk()函数:创建一个主界面,并命名为root 1.3 root.title()函数:给root界面设置…

行人属性识别研究综述(二)

文章目录6 PAR(行人属性识别)算法综述6.1全局基于图像的模型6.1.1 ACN (iccvw-2015)6.1.2 DeepSAR and DeepMAR (ACPR-2015) [6]6.1.3 MTCNN (TMM-2015) [7]6.2 基于部件的模型6.2.1 Poselets (ICCV-2011)6.2.2 rad (iccv-2013)6.2.3 PANDA (cvp -2014) …

「链表」简析

前言 前言:研究一个数据结构的时候,首先讲的是增删改查。 文章目录前言一、链表简介1. 含义2. 节点组成3. 存储方式1)数据在内存中的存储方式2)单链表在内存中的存储方式3)双链表在内存中的存储方式4)循环链…