Delta lake with Java--数据增删改查

devtools/2024/9/22 18:42:46/

之前写的关于spark sql 操作delta lake表的,总觉得有点混乱,今天用Java以真实的数据来进行一次数据的CRUD操作,所涉及的数据来源于Delta lake up and running配套的 GitGitHub - benniehaelen/delta-lake-up-and-running: Companion repository for the book 'Delta Lake Up and Running'

要实现的效果是新建表,导入数据,然后对表进行增删改查操作,具体代码如下:

java">package detal.lake.java;import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;import java.text.SimpleDateFormat;
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.HashMap;public class DeltaLakeCURD {//将字符串转换成java.sql.Timestamppublic static java.sql.Timestamp strToSqlDate(String strDate, String dateFormat) {SimpleDateFormat sf = new SimpleDateFormat(dateFormat);java.util.Date date = null;try {date = sf.parse(strDate);} catch (Exception e) {e.printStackTrace();}java.sql.Timestamp dateSQL = new java.sql.Timestamp(date.getTime());return dateSQL;}public static void main(String[] args) {SparkSession spark = SparkSession.builder().master("local[*]").appName("delta_lake").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.databricks.delta.autoCompact.enabled", "true").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate();SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String savePath="file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxi";String csvPath="D:\\bookcode\\delta-lake-up-and-running-main\\data\\YellowTaxisLargeAppend.csv";String tableName = "taxidb.YellowTaxis";spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");//定义表DeltaTable.createIfNotExists(spark).tableName(tableName).addColumn("RideId","INT").addColumn("VendorId","INT").addColumn("PickupTime","TIMESTAMP").addColumn("DropTime","TIMESTAMP").location(savePath).execute();//加载csv数据并导入delta表var df=spark.read().format("delta").table(tableName);var schema=df.schema();System.out.println(schema.simpleString());var df_for_append=spark.read().option("header","true").schema(schema).csv(csvPath);System.out.println("记录总行数:"+df_for_append.count());System.out.println("导入数据,开始时间"+  sdf.format(new Date()));df_for_append.write().format("delta").mode(SaveMode.Overwrite).saveAsTable(tableName);System.out.println("导入数据,结束时间" + sdf.format(new Date()));DeltaTable deltaTable = DeltaTable.forName(spark,tableName);//插入数据List<Row> list = new ArrayList<Row>();list.add(RowFactory.create(-1,-1,strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss"),strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss")));List<StructField> structFields = new ArrayList<>();structFields.add(DataTypes.createStructField("RideId", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("VendorId", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("PickupTime", DataTypes.TimestampType, true));structFields.add(DataTypes.createStructField("DropTime", DataTypes.TimestampType, true));StructType structType = DataTypes.createStructType(structFields);var yellowTaxipDF=spark.createDataFrame(list,structType); //建立需要新增数据并转换成dataframeSystem.out.println("插入数据,开始时间"+  sdf.format(new Date()));yellowTaxipDF.write().format("delta").mode(SaveMode.Append).saveAsTable(tableName);System.out.println("插入数据,结束时间"+  sdf.format(new Date()));System.out.println("插入后数据");deltaTable.toDF().select("*").where("RideId=-1").show(false);//更新数据System.out.println("更新前数据");deltaTable.toDF().select("*").where("RideId=999994").show(false);System.out.println("更新数据,开始时间"+  sdf.format(new Date()));deltaTable.updateExpr("RideId = 999994",new HashMap<String, String>() {{put("VendorId", "250");}});System.out.println("更新数据,结束时间"+  sdf.format(new Date()));System.out.println("更新后数据");deltaTable.toDF().select("*").where("RideId=999994").show(false);//查询数据System.out.println("查询数据,开始时间"+  sdf.format(new Date()));var selectDf= deltaTable.toDF().select("*").where("RideId=1");selectDf.show(false);System.out.println("查询数据,结束时间" + sdf.format(new Date()));//删除数据System.out.println("删除数据,开始时间"+  sdf.format(new Date()));deltaTable.delete("RideId=1");System.out.println("删除数据,结束时间"+  sdf.format(new Date()));deltaTable.toDF().select("*").where("RideId=1").show(false);}
}

里面涉及spark的TimestampType类型,如何将字符串输入到TimestampType列,找了几个小时才找到答案,具体参考了如下连接,原来直接将string转成java.sql.Timestamp即可,于是在网上找了一个方法,实现了转换,转换代码非原创,也是借鉴其他大牛的。

scala - How to create TimestampType column in spark from string - Stack Overflow

最后运行结果


http://www.ppmy.cn/devtools/31458.html

相关文章

解决clickhouse 启动报错

解决clickhouse 启动报错 Error response from daemon: driver failed programming external connectivity on endpoint clickhouse-server (b42457434cebe7d8ad024d31e4fd28eae2139bb2b5046c283bea17ce4398d5b0): Error starting userland proxy: listen tcp4 0.0.0.0:8123: …

npm ERR! Invalid dependency type requested: alias解决

错误说明&#xff1a; 在使用vue通过npm进行依赖下载的时候出现&#xff1a; npm ERR! Invalid dependency type requested: alias 原因是使用的是nodejs版本比较低&#xff0c;其中附带的npm版本也比较低&#xff0c;较低npm 版本不支持使用别名&#xff08;alias&#xff0…

SpringCloudAlibaba:3.1dubbo

dubbo 概述 简介 Apache Dubbo 是一款 RPC 服务开发框架&#xff0c;用于解决微服务架构下的服务治理与通信问题 官方提供了 Java、Golang、Rust 等多语言 SDK 实现 Dubbo的开源故事 最早在2008年&#xff0c;阿里巴巴就将Dubbo捐献到开源社区&#xff0c;它很快成为了国内开源…

Docker搭建LNMP+Wordpress

目录 一.项目模拟 1.项目环境 2.服务器环境 3.任务需求 &#xff08;1&#xff09;使用 Docker 构建 LNMP 环境并运行 Wordpress 网站平台 &#xff08;2&#xff09;限制 Nginx 容器最多使用 500MB 的内存和 1G 的 Swap &#xff08;3&#xff09;限制 Mysql 容器写 /d…

利用pytorch两层线性网络对titanic数据集进行分类(kaggle)

利用pytorch两层线性网络对titanic数据集进行分类 最近在看pytorch的入门课程&#xff0c;做了一下在kaggle网站上的作业&#xff0c;用的是titanic数据集&#xff0c;因为想搭一下神经网络&#xff0c;所以数据加载部分简单的把训练集和测试集中有缺失值的列还有含有字符串的…

深度学习之基于Matlab卷积神经网络验证码识别系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景 随着互联网的发展&#xff0c;验证码作为一种常用的安全验证手段&#xff0c;被广泛应用于各种网站和…

【华为 ICT HCIA eNSP 习题汇总】——题目集19

1、&#xff08;多选&#xff09;以下选项中&#xff0c;FTP 常用文件传输类型有&#xff08;&#xff09;。 A、ASCII 码类型 B、二进制类型 C、EBCDIC 类型 D、本地类型 考点&#xff1a;应用层 解析&#xff1a;&#xff08;AB&#xff09; 文件传输协议&#xff08;FTP&…

【QT】初始QT

目录 一.背景1.GUI开发的各种技术方案2.什么是框架3.QT支持的系统4.QT的版本5.QT的优点6.QT的应用常见 二.环境搭建1.认识QTSDK中的重要工具2.使用QT Creator创建项目3.项目解释(1)main.cpp(2)widget.h(3)widget.cpp(4)widget.ui(5)Empty.pro(6)临时文件 三.初始QT1.Hello Worl…