【大数据学习 | Spark-SQL】SparkSQL读写数据

ops/2024/11/29 8:27:32/

我们使用sparksql进行编程,编程的过程我们需要创建dataframe对象,这个对象的创建方式我们是先创建RDD然后再转换rdd变成为DataFrame对象。

但是sparksql给大家提供了多种便捷读取数据的方式。

//原始读取数据方式
sc.textFile().toRDD
sqlSc.createDataFrame(rdd,schema)
//更便捷的使用方式
sqlSc.read.text|orc|parquet|jdbc|csv|json
df.write.text|orc|parquet|jdbc|csv|json

write写出存储数据的时候也是文件夹的,而且文件夹不能存在。

  • csv是一个介于文本和excel之间的一种格式,如果是文本打开用逗号分隔的。
  • text文本普通文本,但是这个文本必须只能保存一列内容。

以上两个文本都是只有内容的,没有列的。

  • json是一种字符串结构,本质就是字符串,但是存在kv,例子 {"name":"zhangsan","age":20}

多平台解析方便,带有格式信息。

  • orc格式一个列式存储格式,hive专有的。
  • parquet列式存储,顶级项目

以上都是列式存储问题,优点(1.列式存储,检索效率高,防止冗余查询 2.带有汇总信息,查询特别快 3.带有轻量级索引,可以跳过大部分数据进行检索),他们都是二进制文件,带有格式信息。

jdbc 方式,它是一种协议,只要符合jdbc规范的服务都可以连接,mysql,oracle,hive,sparksql

整体代码:

package com.hainiu.sparkimport org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{SparkConf, SparkContext}import java.util.Propertiesobject TestMovieWithSql {def main(args: Array[String]): Unit = {//??movie???//1.id  middle=name  last=typeval conf = new SparkConf()conf.setAppName("movie")conf.setMaster("local[*]")conf.set("spark.shuffle.partitions","20")val sc = new SparkContext(conf)val sqlSc = new SQLContext(sc)import sqlSc.implicits._//deal dataval df = sc.textFile("data/movies.txt").flatMap(t => {val strs = t.split(",")val mid = strs(0)val types = strs.reverse.headval name = strs.tail.reverse.tail.reverse.mkString(" ")types.split("\\|").map((mid, name, _))}).toDF("mid", "mname", "type")df.limit(1).show()val df1 = sc.textFile("data/ratings.txt").map(t=>{val strs = t.split(",")(strs(0),strs(1),strs(2).toDouble)}).toDF("userid","mid","score")df1.limit(1).show()import org.apache.spark.sql.functions._val df11 = df.join(df1, "mid").groupBy("userid", "type").agg(count("userid").as("cnt")).withColumn("rn", row_number().over(Window.partitionBy("userid").orderBy($"cnt".desc))).where("rn = 1").select("userid", "type")val df22 = df.join(df1, "mid").groupBy("type", "mname").agg(avg("score").as("avg")).withColumn("rn", row_number().over(Window.partitionBy("type").orderBy($"avg".desc))).where("rn<4").select("type", "mname")val df33 = df11.join(df22, "type")//spark3.1.2?? spark2.x//    df33.write.csv()df33.write.format("csv").save("data/csv")//    df33.write.
//      csv("data/csv")
//    df33.write.json("data/json")//    df33.write.parquet("data/parquet")
//    df33.write.orc("data/orc")
//    val pro = new Properties()
//    pro.put("user","root")
//    pro.put("password","hainiu")
//    df33.write.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro)}
}

为了简化存储的计算方式:

package com.hainiu.sparkimport org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}object TestSink {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("test sink")conf.setMaster("local[*]")val sc = new SparkContext(conf)val sqlSc = new SQLContext(sc)import sqlSc.implicits._import org.apache.spark.sql.functions._val df = sc.textFile("data/a.txt").map(t=>{val strs = t.split(" ")(strs(0),strs(1),strs(2),strs(3))}).toDF("id","name","age","gender").withColumn("all",concat_ws(" ",$"id",$"name",$"age",$"gender")).select("all")
//    df.write.csv("data/csv")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
//      .save("data/csv")
//    df.write.parquet("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2")
//      .save("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2")
//      .save("data/json")df.write.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2").save("data/text")}
}

读取数据代码:

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContextimport java.util.Propertiesobject TestReadData {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("movie")conf.setMaster("local[*]")conf.set("spark.shuffle.partitions", "20")val sc = new SparkContext(conf)val sqlSc = new SQLContext(sc)
//    sqlSc.read.text("data/text").show()
//    sqlSc.read.csv("data/csv").show()
//  
//    sqlSc.read.parquet("data/parquet").show()
//    sqlSc.read.json("data/json").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2").load("data/text").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2").load("data/csv").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2").load("data/json").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2").load("data/parquet").show()sqlSc.read.orc("data/orc").show()val pro = new Properties()pro.put("user","root")pro.put("password","hainiu")sqlSc.read.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro).show()}
}

http://www.ppmy.cn/ops/137604.html

相关文章

进制的问题

蓝桥2015某题 计算数字x在进制p 下的各位数字之和 ​ int calc(int x,int p) {int res0;while(x){resx%p;//取当前位累加x/p;//去掉最低位}return res; }​

【设计模式】【行为型模式(Behavioral Patterns)】之责任链模式(Chain of Responsibility Pattern)

1. 设计模式原理说明 责任链模式&#xff08;Chain of Responsibility Pattern&#xff09; 是一种行为设计模式&#xff0c;它允许你将请求沿着处理者链进行发送。每个处理者都可以处理请求&#xff0c;或者将其传递给链上的下一个处理者。这种模式使得多个对象都有机会处理请…

【Unity】Unity编辑器扩展,替代预制体上重复拖拽赋值

今天做游戏时有个需求&#xff0c;游戏中需要给不同年份不同月份的奖牌制定不一样的非规则形状&#xff0c;其中形状为100个像素组成的不同图形&#xff0c;并且按照从1-100路径一个个解锁&#xff0c;所以需要全部手动放置。但是手动放置好后&#xff0c;发现再一个个挂到脚本…

英语知识在线学习:Spring Boot网站设计

4系统概要设计 4.1概述 本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式&#xff0c;是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示&#xff1a; 图4-1系统工作原理…

数据库(MySQL黑马)

基础篇 MySQL概述 数据库概述 数据库相关概念 主流的关系型数据库管理系统 MySQL数据库的安装与启动 下载&#xff1a;MySQL :: MySQL Community Downloads 安装步骤 MySQL―8.0.40超详细保姆级安装教程_mysql8.0.40安装教程-CSDN博客文章浏览阅读1k次。_mysql8.0.40安装教…

深入浅出剖析典型文生图产品Midjourney

2022年7月,一个小团队推出了公测的 Midjourney,打破了 AIGC 领域的大厂垄断。作为一个精调生成模型,以聊天机器人方式部署在 Discord,它创作的《太空歌剧院》作品,甚至获得了美国「数字艺术/数码摄影」竞赛单元一等奖。 这一事件展示了 AI 在绘画领域惊人的创造力,让人们…

力扣刷题TOP101:4.BM5 合并k个已排序的链表

目录&#xff1a; 目的 思路 复杂度 记忆秘诀 python代码 目的 [{1,2},{1,4,5},{6}] 合并成 {1,1,2,4,5,6} 思路 这个任务是将 k 个升序链表合并成一个升序链表。假设有一个代跑腿小哥&#xff0c;他的任务是从多个快递点取包裹并统一派送。每个快递点的包裹已经按派送距…

MongoDB 中设置登录账号密码可以通过以下步骤实现

1. 启用身份验证 默认情况下&#xff0c;MongoDB 不启用身份验证&#xff0c;需要修改配置文件启用。 打开 MongoDB 配置文件&#xff08;通常是 mongod.conf&#xff09;。 确保配置中有以下行&#xff08;或添加&#xff09;&#xff1a; security:authorization: enabled…