Delta lake with Java--数据增删改查

embedded/2024/12/22 2:05:18/

之前写的关于spark sql 操作delta lake表的,总觉得有点混乱,今天用Java以真实的数据来进行一次数据的CRUD操作,所涉及的数据来源于Delta lake up and running配套的 GitGitHub - benniehaelen/delta-lake-up-and-running: Companion repository for the book 'Delta Lake Up and Running'

要实现的效果是新建表,导入数据,然后对表进行增删改查操作,具体代码如下:

java">package detal.lake.java;import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;import java.text.SimpleDateFormat;
import io.delta.tables.DeltaTable;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.HashMap;public class DeltaLakeCURD {//将字符串转换成java.sql.Timestamppublic static java.sql.Timestamp strToSqlDate(String strDate, String dateFormat) {SimpleDateFormat sf = new SimpleDateFormat(dateFormat);java.util.Date date = null;try {date = sf.parse(strDate);} catch (Exception e) {e.printStackTrace();}java.sql.Timestamp dateSQL = new java.sql.Timestamp(date.getTime());return dateSQL;}public static void main(String[] args) {SparkSession spark = SparkSession.builder().master("local[*]").appName("delta_lake").config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.databricks.delta.autoCompact.enabled", "true").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate();SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String savePath="file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxi";String csvPath="D:\\bookcode\\delta-lake-up-and-running-main\\data\\YellowTaxisLargeAppend.csv";String tableName = "taxidb.YellowTaxis";spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");//定义表DeltaTable.createIfNotExists(spark).tableName(tableName).addColumn("RideId","INT").addColumn("VendorId","INT").addColumn("PickupTime","TIMESTAMP").addColumn("DropTime","TIMESTAMP").location(savePath).execute();//加载csv数据并导入delta表var df=spark.read().format("delta").table(tableName);var schema=df.schema();System.out.println(schema.simpleString());var df_for_append=spark.read().option("header","true").schema(schema).csv(csvPath);System.out.println("记录总行数:"+df_for_append.count());System.out.println("导入数据,开始时间"+  sdf.format(new Date()));df_for_append.write().format("delta").mode(SaveMode.Overwrite).saveAsTable(tableName);System.out.println("导入数据,结束时间" + sdf.format(new Date()));DeltaTable deltaTable = DeltaTable.forName(spark,tableName);//插入数据List<Row> list = new ArrayList<Row>();list.add(RowFactory.create(-1,-1,strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss"),strToSqlDate("2023-01-01 10:00:00","yyyy-MM-dd HH:mm:ss")));List<StructField> structFields = new ArrayList<>();structFields.add(DataTypes.createStructField("RideId", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("VendorId", DataTypes.IntegerType, true));structFields.add(DataTypes.createStructField("PickupTime", DataTypes.TimestampType, true));structFields.add(DataTypes.createStructField("DropTime", DataTypes.TimestampType, true));StructType structType = DataTypes.createStructType(structFields);var yellowTaxipDF=spark.createDataFrame(list,structType); //建立需要新增数据并转换成dataframeSystem.out.println("插入数据,开始时间"+  sdf.format(new Date()));yellowTaxipDF.write().format("delta").mode(SaveMode.Append).saveAsTable(tableName);System.out.println("插入数据,结束时间"+  sdf.format(new Date()));System.out.println("插入后数据");deltaTable.toDF().select("*").where("RideId=-1").show(false);//更新数据System.out.println("更新前数据");deltaTable.toDF().select("*").where("RideId=999994").show(false);System.out.println("更新数据,开始时间"+  sdf.format(new Date()));deltaTable.updateExpr("RideId = 999994",new HashMap<String, String>() {{put("VendorId", "250");}});System.out.println("更新数据,结束时间"+  sdf.format(new Date()));System.out.println("更新后数据");deltaTable.toDF().select("*").where("RideId=999994").show(false);//查询数据System.out.println("查询数据,开始时间"+  sdf.format(new Date()));var selectDf= deltaTable.toDF().select("*").where("RideId=1");selectDf.show(false);System.out.println("查询数据,结束时间" + sdf.format(new Date()));//删除数据System.out.println("删除数据,开始时间"+  sdf.format(new Date()));deltaTable.delete("RideId=1");System.out.println("删除数据,结束时间"+  sdf.format(new Date()));deltaTable.toDF().select("*").where("RideId=1").show(false);}
}

里面涉及spark的TimestampType类型,如何将字符串输入到TimestampType列,找了几个小时才找到答案,具体参考了如下连接,原来直接将string转成java.sql.Timestamp即可,于是在网上找了一个方法,实现了转换,转换代码非原创,也是借鉴其他大牛的。

scala - How to create TimestampType column in spark from string - Stack Overflow

最后运行结果


http://www.ppmy.cn/embedded/31600.html

相关文章

java中对文件的基本操作

文件IO 文件IO。啥叫文件的IO&#xff1f; 他就是指&#xff1a;1.Input&#xff08;输入&#xff09;2.Output&#xff08;输出&#xff09;。 比如&#xff0c;我们的电脑可以从网络中下载文件&#xff0c;也可以通过网络上传文件等等很多的例子&#xff0c;都体现了输入和…

展开说说:Android Fragment完全解析-卷三

本文章分析了Fragment的管理器FragmentManager、事务FragmentTransaction 、以及完整的声明周期和动态加载Fragment的原理解析。 1、Fragment管理器 FragmentManager 类负责在应用的 fragment 上执行一些操作&#xff0c;如添加、移除或替换操作&#xff0c;以及将操作添加到…

【论文阅读】Tutorial on Diffusion Models for Imaging and Vision

1.The Basics: Variational Auto-Encoder 1.1 VAE Setting 自动编码器有一个输入变量x和一个潜在变量z Example. 获得图像的潜在表现并不是一件陌生的事情。回到jpeg压缩&#xff0c;使用离散余弦变换&#xff08;dct&#xff09;基φn对图像的底层图像/块进行编码。如果你给…

使用Postman对@RequestPart和HttpServletRequest组合传参方式

使用Postman对RequestPart和HttpServletRequest组合传参方式 方法代码如下&#xff1a; /*** 发布*/ApiOperation("发布")ApiImplicitParams({ApiImplicitParam(name "req", value "json格式", dataType "Map", dataTypeClass Ma…

【Python可视化】pyecharts

Echarts 是一个由百度开源的数据可视化&#xff0c;凭借着良好的交互性&#xff0c;精巧的图表设计&#xff0c;得到了众多开发者的认可。而 Python 是一门富有表达力的语言&#xff0c;很适合用于数据处理。当数据分析遇上数据可视化时&#xff0c;pyecharts 诞生了。 需要安…

C# Solidworks二次开发:枚举应用实战(第十三讲)

大家好&#xff0c;今天继续介绍我们的枚举应用系列。 下面是今天要介绍的枚举&#xff1a; &#xff08;1&#xff09;第一个为swsUserPreferenceIntegerValue_e&#xff0c;这个枚举的含义为用户偏好整数值&#xff0c;下面是官方的具体枚举值&#xff1a; MemberDescript…

RHCE shell-第一次作业

要求&#xff1a; 1、判断当前磁盘剩余空间是否有20G&#xff0c;如果小于20G&#xff0c;则将报警邮件发送给管理员&#xff0c;每天检査- 次磁盘剩余空间。 2、判断web服务是否运行(1、查看进程的方式判断该程序是否运行&#xff0c;2、通过查看端口的方式 判断该程序是否运…

Spark使用Java读取Mysql

在Apache Spark中使用Java来读取MySQL数据库中的数据&#xff0c;你需要使用JDBC&#xff08;Java Database Connectivity&#xff09;来连接MySQL&#xff0c;并且通常你会使用Spark的JdbcRDD或者DataFrameReader&#xff08;通过Spark SQL&#xff09;来读取数据。不过&#…