Structured Streaming Sink
Output Modes
- append
  Default append mode: only rows newly added to the result table are written out. Supports only simple queries without aggregation.
- complete
  Complete mode: the whole updated result table is written on every trigger. Supports both aggregation and sorting.
- update
  Update mode: only rows that changed since the last trigger are written. Supports aggregation but not sorting; with no aggregation it behaves the same as append (see the sketch after the example below).
The query below uses both aggregation and sorting, so it can only run in complete mode:
val ds = df.as[String]
val wordDs = ds.flatMap(_.split(" "))
val result = wordDs.groupBy('value).count().orderBy('count)
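For contrast, a minimal sketch of an update-mode query, assuming the same ds as above: keeping the aggregation but dropping the orderBy makes the query acceptable to update mode.
val updateResult = ds.flatMap(_.split(" ")).groupBy('value).count()

// Aggregation without sorting is allowed in update mode;
// each trigger emits only the counts that changed.
updateResult.writeStream
  .format("console")
  .outputMode("update") // adding orderBy here would require complete mode
  .start()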
Sink Locations
Memory sink
The output is stored as an in-memory table.
Supports the append and complete output modes.
Intended for testing environments.
// Output
val query = result.writeStream
  .format("memory")
  .queryName("result")
  .outputMode("complete")
  .start()

while (true) {
  spark.sql("select * from result").show()
  Thread.sleep(3000)
}
Complete code
import org.apache.spark.sql.SparkSession

object Sink {
  def main(args: Array[String]): Unit = {
    // Create the environment
    val spark = SparkSession.builder()
      .appName("Operation")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "4") // set the number of shuffle partitions
      .getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    // Load the data
    val df = spark.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", 9999)
      .load()
    df.printSchema()

    // Process the data (DSL)
    val ds = df.as[String]
    val wordDs = ds.flatMap(_.split(" "))
    val result = wordDs.groupBy('value).count().orderBy('count)

    // Output
    val query = result.writeStream
      .format("memory")
      .queryName("result")
      .outputMode("complete")
      .start()

    while (true) {
      spark.sql("select * from result").show()
      Thread.sleep(3000)
    }
    // query.awaitTermination()
    spark.stop()
  }
}
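To try this out, open the socket first with a tool such as netcat (for example, nc -lk 9999 on hadoop102), then type space-separated words; the loop above polls the in-memory result table every 3 seconds.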
ForeachBatch Sink
Foreach sink can run arbitrary computation on the output, applied to each individual record (see the ForeachWriter sketch after the code below).
ForeachBatch sink operates on each micro-batch of data.
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

object ForeachBatchSink {
  def main(args: Array[String]): Unit = {
    // TODO 0. Create the environment
    val spark: SparkSession = SparkSession.builder()
      .appName("sparksql")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    // TODO 1. Load the data
    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()
    df.printSchema()

    // TODO 2. Process the data
    val ds: Dataset[String] = df.as[String]
    val result: Dataset[Row] = ds.flatMap(_.split(" "))
      .groupBy('value).count()
      .orderBy('count.desc)

    // TODO 3. Write the result
    result.writeStream
      .foreachBatch((batchDs: Dataset[Row], batchId: Long) => {
        // Custom output to the console
        println("-------------")
        println(s"batchId:${batchId}")
        println("-------------")
        batchDs.show()
        // Custom output to MySQL
        batchDs.coalesce(1).write.mode(SaveMode.Overwrite)
          .format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8")
          .option("user", "root")
          .option("password", "root")
          .option("dbtable", "bigdata.words")
          .save()
      })
      .outputMode("complete")
      // TODO 4. Start and await termination
      .start()
      .awaitTermination()

    // TODO 5. Release resources
    spark.stop()
  }
}
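The per-record Foreach sink mentioned above works through a ForeachWriter. A minimal sketch, reusing the result Dataset from the program above; the println in process is just a stand-in for a real external write:
import org.apache.spark.sql.{ForeachWriter, Row}

// Foreach sink: the writer is invoked once per output record.
result.writeStream
  .foreach(new ForeachWriter[Row] {
    // Called once per partition per epoch; return true to process its rows
    // (open a connection here if writing to an external system).
    override def open(partitionId: Long, epochId: Long): Boolean = true

    // Called once for every output record.
    override def process(row: Row): Unit = println(row)

    // Called when the partition finishes (or fails); release resources here.
    override def close(errorOrNull: Throwable): Unit = {}
  })
  .outputMode("complete")
  .start()
  .awaitTermination()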
Trigger Intervals
With micro-batch processing, a checkpoint can be taken after every batch.
With continuous processing, you must specify the time interval at which checkpoints are taken (a runnable continuous-mode sketch follows the code below).
import org.apache.spark.sql.streaming.Trigger

val ds: Dataset[String] = df.as[String]
val result: Dataset[Row] = ds.coalesce(1)
  .flatMap(_.split(" "))
  .groupBy('value).count()

result.writeStream
  .format("console")
  .outputMode("complete")
  // Trigger interval:
  // 1. Default (nothing specified): run each micro-batch as soon as it can
  //    (Default trigger: runs micro-batch as soon as it can)
  // 2. An interval of 0 also runs as fast as possible
  // .trigger(Trigger.ProcessingTime("0 seconds"))
  // 3. A fixed micro-batch interval
  .trigger(Trigger.ProcessingTime("5 seconds"))
  // 4. Run a single micro-batch, then stop
  // .trigger(Trigger.Once())
  // 5. Continuous processing with a checkpoint interval (experimental).
  //    Note: continuous mode supports only map-like queries, so it cannot
  //    run the aggregation above; a working example follows below.
  // .trigger(Trigger.Continuous("1 second"))
  .option("checkpointLocation", "./ckp" + System.currentTimeMillis())
  // TODO 4. Start and await termination
  .start()
  .awaitTermination()
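A minimal sketch of a query that continuous processing can actually run. It uses the built-in rate source (documented as continuous-capable) rather than the socket source, and a map-like select with no aggregation, so append mode applies; the rows-per-second setting and checkpoint path are illustrative assumptions.
import org.apache.spark.sql.streaming.Trigger

// Rate source generates (timestamp, value) rows and supports continuous mode.
val rateDf = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

// Map-like query only (select), no aggregation: valid in continuous mode.
rateDf.select('value)
  .writeStream
  .format("console")
  .outputMode("append") // append works because nothing is aggregated
  .trigger(Trigger.Continuous("1 second")) // checkpoint every second
  .option("checkpointLocation", "./ckp-continuous") // illustrative path
  .start()
  .awaitTermination()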