SprakSQL-Catalog

news/2024/12/22 0:37:25/

祝福

在这个举国同庆的时刻,我们首先献上对祖国的祝福:

第一,我们感谢您给我们和平的环境,让我们能快乐生活

第二,祝福我们国家未来的路越走越宽广,科技更发达,人民更幸福

第三,我们会紧紧跟随您的脚步,一起为美好的未来奋斗

一、概述

Catalog可以翻译为目录,意思就是用户在使用的时候可以通过它大致了解整个数据的框架和结构,比如:充当底层元存储(例如Hive元存储)的代理、管理其所属Spark会话的临时视图和函数。

二、使用

val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()import spark.implicits._
import spark.sql//列举表
spark.catalog.listTables().show()
//缓存表
spark.catalog.cacheTable("tableName")
//释放表
spark.catalog.uncacheTable("tableName")

三、源码

1、SparkSession

构建一个CatalogImpl,用户可以通过该界面创建、删除、更改或查询底层数据库、表、函数等。

@transient lazy val catalog: Catalog = new CatalogImpl(self)

2、CatalogImpl

CatalogImpl是面向用户的“目录”的内部实现

class CatalogImpl(sparkSession: SparkSession) extends Catalog {private def sessionCatalog: SessionCatalog = sparkSession.sessionState.catalog//返回此会话中的当前默认数据库override def currentDatabase: String = sessionCatalog.getCurrentDatabase//返回所有会话中可用的数据库列表override def listDatabases(): Dataset[Database] = {val databases = sessionCatalog.listDatabases().map(makeDatabase)CatalogImpl.makeDataset(databases, sparkSession)}//创建一个新的数据库private def makeDatabase(dbName: String): Database = {val metadata = sessionCatalog.getDatabaseMetadata(dbName)new Database(name = metadata.name,description = metadata.description,locationUri = CatalogUtils.URIToString(metadata.locationUri))}//返回当前数据库中的表列表。这包括所有临时表。override def listTables(): Dataset[Table] = {listTables(currentDatabase)}//返回指定数据库中的表列表。这包括所有临时表。@throws[AnalysisException]("database does not exist")override def listTables(dbName: String): Dataset[Table] = {val tables = sessionCatalog.listTables(dbName).map(makeTable)CatalogImpl.makeDataset(tables, sparkSession)}//返回给定表/视图或临时视图的表。//请注意,此函数要求该表已存在于目录中。//如果由于任何原因导致表元数据检索失败(例如,表serde类不可访问或Spark SQL不接受表类型),此函数仍将返回相应的表,而不包含描述和表类型)private def makeTable(tableIdent: TableIdentifier): Table = {val metadata = try {Some(sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent))} catch {case NonFatal(_) => None}val isTemp = sessionCatalog.isTempView(tableIdent)new Table(name = tableIdent.table,database = metadata.map(_.identifier.database).getOrElse(tableIdent.database).orNull,description = metadata.map(_.comment.orNull).orNull,tableType = if (isTemp) "TEMPORARY" else metadata.map(_.tableType.name).orNull,isTemporary = isTemp)}//返回当前数据库中注册的函数列表。这包括所有临时功能override def listFunctions(): Dataset[Function] = {listFunctions(currentDatabase)}//注册一个自定义的函数private def makeFunction(funcIdent: FunctionIdentifier): Function = {val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)new Function(name = metadata.getName,database = metadata.getDb,description = null, // for now, this is always undefinedclassName = metadata.getClassName,isTemporary = metadata.getDb == null)}//返回给定表/视图或临时视图的列列表。@throws[AnalysisException]("table does not exist")override def listColumns(tableName: String): Dataset[Column] = {val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)listColumns(tableIdent)}//........}

从CatalogImpl中我们可以看到有这样的功能:查看数据库、表、列、视图、函数列表、创建数据库、表、视图、函数、缓存表、释放表等等,其中基本都使用了SessionCatalog,下面我们详细看下它。

3、SessionCatalog

object SessionCatalog {val DEFAULT_DATABASE = "default"
}
class SessionCatalog(externalCatalogBuilder: () => ExternalCatalog,globalTempViewManagerBuilder: () => GlobalTempViewManager,functionRegistry: FunctionRegistry,tableFunctionRegistry: TableFunctionRegistry,hadoopConf: Configuration,parser: ParserInterface,functionResourceLoader: FunctionResourceLoader,cacheSize: Int = SQLConf.get.tableRelationCacheSize,cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {lazy val externalCatalog = externalCatalogBuilder()lazy val globalTempViewManager = globalTempViewManagerBuilder()//临时视图列表,从表名映射到其逻辑计划@GuardedBy("this")protected val tempViews = new mutable.HashMap[String, TemporaryViewRelation]//注意:我们在这里跟踪当前数据库,因为某些操作没有明确指定数据库(例如DROP TABLE my_TABLE)。在这些情况下,我们必须首先检查临时视图或函数是否存在,如果不存在,则对当前数据库中的相应项进行操作。@GuardedBy("this")protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)private val validNameFormat = "([\\w_]+)".r//检查给定名称是否符合Hive标准(“[a-zA-Z_0-9]+”),即此名称是否仅包含字符、数字和_。//此方法旨在具有与org.apache.hoop.hive.metastore相同的行为。MetaStoreUtils.validateName。private def validateName(name: String): Unit = {if (!validNameFormat.pattern.matcher(name).matches()) {throw QueryCompilationErrors.invalidNameForTableOrDatabaseError(name)}}//设置表名格式,同时考虑区分大小写。protected[this] def formatTableName(name: String): String = {if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)}protected[this] def formatDatabaseName(name: String): String = {if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)}//获取缓存计划def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {tableRelationCache.get(t, c)}//下面是对数据库、表、分区、函数的操作//............}

我们来整体看下它有哪些功能:

        1、数据库:此类别中的所有方法都直接与底层Catalog交互

        2、表:有两种表,临时视图和元存储表。

                     临时视图在会话之间是隔离的,不属于任何特定的数据库。

                     元存储表可以在多个会话中使用,因为它们的元数据被持久化在底层Catalog中。

                     与元存储表交互的方法:

                        createTable() 、alterTable() 、alterTableDataSchema() 、tableExists() 等

                     与临时视图交互的方法:

                        createTempView() 、createGlobalTempView() 、 alterTempViewDefinition()等

        3、分区:加载数据源表时,将自动发现表的分区

        4、函数:临时函数和元存储函数(永久UDF)

                        临时功能在会话之间隔离。元存储函数可以在多个会话中使用,

                        因为它们的元数据保存在底层Catalog中。

数据库

我们以创建数据库为例来看下内部逻辑

def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {//设置数据库名称的格式,同时考虑区分大小写val dbName = formatDatabaseName(dbDefinition.name)//校验该数据库名释放占用if (dbName == globalTempViewManager.database) {throw QueryCompilationErrors.cannotCreateDatabaseWithSameNameAsPreservedDatabaseError(globalTempViewManager.database)}//校验数据库名释放符合命名规范validateName(dbName)//调用外部的Catalog创建数据库externalCatalog.createDatabase(dbDefinition.copy(name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)),ignoreIfExists)}

元存储表

我们以创建表为例来看下内部逻辑

  //在“tableDefinition”中指定的数据库中创建一个元存储表。如果没有指定此类数据库,请在当前数据库中创建它。def createTable(tableDefinition: CatalogTable,ignoreIfExists: Boolean,validateLocation: Boolean = true): Unit = {val isExternal = tableDefinition.tableType == CatalogTableType.EXTERNALif (isExternal && tableDefinition.storage.locationUri.isEmpty) {throw QueryCompilationErrors.createExternalTableWithoutLocationError}//获取数据库名val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))//格式化表名 ,默认不打开大小写区分,但官方强烈建议打开,且设置成小写去解析val table = formatTableName(tableDefinition.identifier.table)//在数据库中标识这个表val tableIdentifier = TableIdentifier(table, Some(db))//校验表名释放符合规范validateName(table)val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined&& !tableDefinition.storage.locationUri.get.isAbsolute) {//使表的位置合格val qualifiedTableLocation =makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)tableDefinition.copy(storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),identifier = tableIdentifier)} else {tableDefinition.copy(identifier = tableIdentifier)}//会调用外部Catalog判断该数据库是否存在//externalCatalog.databaseExists(dbName)requireDbExists(db)//会调用外部Catalog判断该表是否存在//externalCatalog.tableExists(db, table)if (tableExists(newTableDefinition.identifier)) {if (!ignoreIfExists) {throw new TableAlreadyExistsException(db = db, table = table)}} else if (validateLocation) {validateTableLocation(newTableDefinition)}//还是会调用外部Catalog去创建表externalCatalog.createTable(newTableDefinition, ignoreIfExists)}

临时表

我们也以创建临时视图为例来看下内部逻辑

  //临时视图列表,从表名映射到其逻辑计划@GuardedBy("this")protected val tempViews = new mutable.HashMap[String, TemporaryViewRelation]def createTempView(name: String,viewDefinition: TemporaryViewRelation,overrideIfExists: Boolean): Unit = synchronized {//格式化表名val table = formatTableName(name)//判断之前是否创建过临时视图,且是否可以覆盖if (tempViews.contains(table) && !overrideIfExists) {throw new TempTableAlreadyExistsException(name)}//想map中放入该表和该表的临时视图信息(用于后续的分析计划)tempViews.put(table, viewDefinition)}

临时表就是在内存中维护了一个map来存储它,外部表和数据库都是通过ExternalCatalog来操作的,下面我们就看下ExternalCatalog

4、ExternalCatalog

系统目录(包括函数、分区、表和数据库)的接口

这仅用于非临时项,实现必须是线程安全的,因为它们可以在多个线程中访问。这是一个外部Catalog,因为它需要与外部系统交互。

它相应的也有对数据库、表、分区、函数的操作,来支撑上层的调用

它有个子类:HiveExternalCatalog(使用Hive的系统目录的持久实现)

HiveExternalCatalog里面也有对数据库、表、分区、函数的操作,并把这些操作转交给HiveClient来操作。

private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)extends ExternalCatalog with Logging {org.apache.hadoop.util.VersionInfo.getVersion()//用于与元数据交互的Hive客户端lazy val client: HiveClient = {HiveUtils.newClientForMetadata(conf, hadoopConf)}//下面是对数据库、表、分区、函数的操作}

四、总结:

从源码的调用,我们可以清楚的知道Catalog对临时视图、hive库表、分区、函数进行了统一管理,其中临时视图是用要给map来维护它们的关系,hive方面的实体表是委托给HiveExternalCatalog调用HiveClient来进行操作。

 


http://www.ppmy.cn/news/1533352.html

相关文章

(done) 声音信号处理基础知识(11) (Complex Numbers for Audio Signal Processing)

参考:https://www.youtube.com/watch?vDgF4m0AWCgA&t1047s 似乎是因为信号处理需要使用复数,作者花了一节课介绍复数 据油管主所说,声学信号处理中引入复数的原因是:快速完成部分计算 这里的例子是,当我们做傅里…

安装epic games错误码2738解决(安装ue错误码2738)

这个错误不好找到解决方案,尝试删除注册表以及通过电脑管家下载安装都不生效,仍然会错误2738。直到找到了这个解决方案。 1.cmd然后右键以管理员身份运行, 2.cd %windir%\syswow64进入该目录 3.reg delete “HKCU\SOFTWARE\Classes\Wow6432No…

MongoDB 聚合管道

参考: 聚合管道 - MongoDB 手册 v7.0 介绍 聚合管道由一个或多个处理文档的阶段组成: 每个阶段对输入文档执行一个操作。例如,某个阶段可以过滤文档、对文档进行分组并计算值。 从一个阶段输出的文档将传递到下一阶段。 一个聚合管道可以返回针对文档…

Spring Boot框架下的足球青训俱乐部后台开发

摘 要 随着社会经济的快速发展,人们对足球俱乐部的需求日益增加,加快了足球健身俱乐部的发展,足球俱乐部管理工作日益繁忙,传统的管理方式已经无法满足足球俱乐部管理需求,因此,为了提高足球俱乐部管理效率…

解决方案:PCA跟SVD有什么不同

文章目录 一、现象二、解决方案 一、现象 在做模型建模做降维的时候,时常回想起PCA(主成分分析)和SVD(奇异值分解),但两者有什么区别,时而会弄混,所以整理一下 二、解决方案 PCA&…

智能家居技术的前景和现状

一、智能家居行业概述 智能家居是指利用先进的科技手段,将家庭生活场景中的各种设备连接在一起,实现智能化控制和管理。随着物联网、人工智能等技术的不断进步,智能家居行业得到了迅速发展。 智能家居产品涵盖了家庭安防、智能照明、智能影…

等保测评:企业数字安全的坚实盾牌

1.1 企业数字化转型的浪潮 在当今时代,企业数字化转型的浪潮正以前所未有的速度席卷全球,据IDC预测,到2023年,全球数字化转型支出将达到惊人的2.3万亿美元。这一趋势不仅重塑了企业的运营模式,更对企业的信息安全提出…

汽车EDI:Martinrea EDI 对接

Martinrea所有供应商都必须与其建立EDI连接,并按照Martinrea的要求执行EDI交易。供应商需要承担自己实施EDI连接以及EDI映射等所有相关费用,包括ASN发货通知以及运输标签的测试。 EDI和运输标签需要在收到Martinrea发来的《Martinrea International Inc.…