大数据SQL调优专题——Hive执行原理

news/2025/2/15 20:55:11/

引入

Apache Hive 是基于Hadoop的数据仓库工具,它可以使用SQL来读取、写入和管理存在分布式文件系统中的海量数据。在Hive中,HQL默认转换成MapReduce程序运行到Yarn集群中,大大降低了非Java开发者数据分析的门槛,并且Hive提供命令行工具和JDBC驱动程序,方便用户连接到Hive进行数据分析操作。

严格意义上,Hive并不属于计算引擎,而是建立在Hadoop生态之上的数据仓库管理工具。它将繁杂的MapReduce作业抽象成SQL,使得开发及维护成本大幅降低。得益于HDFS的存储和MapReduce的读写能力,Hive展现出了强大的兼容能力、数据吞吐能力和服务稳定性,时至今日依然是大数据架构中不可或缺的一部分。

Hive的核心特点

  • Hive是基于Hadoop的数仓工具,底层数据存储在HDFS中;

  • Hive提供标准SQL功能,支持SQL语法访问操作数据;

  • Hive适合OLAP数据分析场景,不适合OLTP数据处理场景,所以适合数据仓库构建;

  • HQL默认转换成MapReduce任务执行,也可以配置转换成Apache Spark、Apache Tez任务运行;

  • Hive中支持定义UDF、UDAF、UDTF函数扩展功能。

Hive的架构设计

Hive用户接口

访问Hive可以通过CLI、Beeline、JDBC/ODBC、WebUI几种方式。在Hive早期版本中可以使用Hive CLI来操作Hive,Hive CLI并发性能差、脚本执行能力有限并缺乏JDBC驱动支持,从Hive 4.x版本起废弃了Hive CLI推荐使用Beeline。Beeline是一个基于JDBC的Hive客户端,支持并发环境、复杂脚本执行、JDBC驱动等,在Hive集群内连接Hive可以使用Beeline方式。在Hive集群外,通过代码或者工具连接操作Hive时可以通过JDBC/ODBC方式。通过WebUI方式可以通过浏览器查看到Hive集群的一些信息。

HiveServer2服务

HiveServer2服务提供JDBC/ODBC接口,主要用于代理远程客户端对Hive的访问,是一种基于Thrift协议的服务。例如通过JDBC或者Beeline连接访问Hive时就需要启动HiveServer2服务,就算Beeline访问本机上的Hive服务也需要启动HiveServer2服务。

HiveServer2代理远程客户端对Hive操作时会涉及到操作HDFS数据,就会有操作权限问题,那么操作HDFS中数据的用户是启动HiveServer2的用户还是远程客户端的用户需要通过“hive.server2.enable.doAs” 参数决定,该参数默认为true,表示HiveServer2操作HDFS时的用户为远程客户端用户,如果设置为false表示操作HDFS数据的用户为启动HiveServer2的用户。

MetaStore服务

MetaStore服务负责存储和管理Hive元数据,为HiverServer2提供元数据访问接口。Hive中的元数据包括表的名字,表的列和分区及其属性,表的属性(表拥有者、是否为外部表等),表的数据所在目录等。

Hive MetaStore可以将元数据存储在mysql、derby数据库中。

Hive Driver

Driver中包含解释器(SQL Parser)、编译器(Compiler)、优化器(Optimizer),负责完成HQL查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在HDFS中,并在随后有执行器(Executor)调用MapReduce执行。

对于Hive有了一个初步认识,我们下面开始梳理Hive的执行原理。

Hive的执行原理

Hive无论采用哪种调用方式,最终都会辗转到org.apache.hadoop.hive.ql.Driver类。SQL语句在Driver类中,通过Antlr框架进行解析编译,将SQL转换成最终执行的MapReduce任务。

如果直接盲目的去看Driver类的代码,会很容易看懵逼,我们需要再往前一点。

SQLOperation

先看org.apache.hive.service.cli.operation.SQLOperation 类,它负责创建Driver对象、编译SQL、异步执行SQL。其中核心的就是 runInternal()方法,主要进行如下两个步骤:

  1. Driver对象创建并编译SQL,将SQL编译成Query Plan执行计划。
  2. 对QueryPaln 进行处理,转换成MR 任务执行。

runInternal() 方法源码内容如下:

  /*** 内部运行方法,用于执行SQL操作。** @throws HiveSQLException 如果在执行过程中发生Hive SQL异常。*/public void runInternal() throws HiveSQLException {// 设置操作状态为PENDINGsetState(OperationState.PENDING);// 判断是否应该异步运行boolean runAsync = shouldRunAsync();// 判断是否应该异步编译final boolean asyncPrepare = runAsync&& HiveConf.getBoolVar(queryState.getConf(),HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_ASYNC_COMPILE);// 如果不是异步编译,则同步准备查询if (!asyncPrepare) {//创建Driver对象,编译SQL//Driver经过:SQL -> AST(抽象语法树) -> QueryBlock(查询块) -> Operator(e逻辑执行计划) -> TaskTree(物理执行计划) -> QueryPlan(查询计划)prepare(queryState);}// 如果不是异步运行,则同步运行查询if (!runAsync) {runQuery();} else {// 我们将在后台线程中传递ThreadLocals,从前台(处理程序)线程传递。// 1) ThreadLocal Hive对象需要在后台线程中设置// 2) Hive中的元数据存储客户端与正确的用户相关联。// 3) 当前UGI将在元数据存储处于嵌入式模式时被元数据存储使用Runnable work = new BackgroundWork(getCurrentUGI(), parentSession.getSessionHive(),SessionState.getPerfLogger(), SessionState.get(), asyncPrepare);try {// 如果没有可用的后台线程来运行此操作,此提交将阻塞Future<?> backgroundHandle = getParentSession().submitBackgroundOperation(work);// 设置后台操作句柄setBackgroundHandle(backgroundHandle);} catch (RejectedExecutionException rejected) {// 设置操作状态为ERRORsetState(OperationState.ERROR);// 抛出HiveSQLException异常throw new HiveSQLException("The background threadpool cannot accept" +" new task for execution, please retry the operation", rejected);}}}

1.Driver对象创建并编译SQL,将SQL编译成Query Plan执行计划

其中核心的是prepare()方法,它的源码在2.x和3.x、4.x有一些区别,不过其核心功能是没变的,主要是创建Driver对象,并编译SQL,然后通过Driver将SQL最终转换成Query Plan。

prepare()方法3.x的源码如下:

  /*** 准备执行SQL查询的操作。* 此方法负责初始化Driver,设置查询超时,编译查询语句,并处理可能的异常。** @param queryState 包含查询状态信息的对象。* @throws HiveSQLException 如果在准备过程中发生Hive SQL异常。*/public void prepare(QueryState queryState) throws HiveSQLException {// 设置操作状态为运行中setState(OperationState.RUNNING);try {// 创建Driver实例,返回的Driver对象是 ReExecDriverdriver = DriverFactory.newDriver(queryState, getParentSession().getUserName(), queryInfo);// 如果查询超时时间大于0,则启动一个定时任务来取消查询if (queryTimeout > 0) {// 创建一个单线程的定时任务执行器timeoutExecutor = new ScheduledThreadPoolExecutor(1);// 创建一个定时任务,在查询超时后取消查询Runnable timeoutTask = new Runnable() {@Overridepublic void run() {try {// 获取查询IDString queryId = queryState.getQueryId();// 记录日志,查询超时并取消执行LOG.info("Query timed out after: " + queryTimeout+ " seconds. Cancelling the execution now: " + queryId);// 取消查询SQLOperation.this.cancel(OperationState.TIMEDOUT);} catch (HiveSQLException e) {// 记录日志,取消查询时发生错误LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e);} finally {// 关闭定时任务执行器timeoutExecutor.shutdown();}}};// 安排定时任务在查询超时后执行timeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS);}// 设置查询显示信息queryInfo.setQueryDisplay(driver.getQueryDisplay());// 设置操作句柄信息,以便Thrift API用户可以使用操作句柄查找Yarn ATS中的查询信息String guid64 = Base64.encodeBase64URLSafeString(getHandle().getHandleIdentifier().toTHandleIdentifier().getGuid()).trim();driver.setOperationId(guid64);// 编译SQL查询并响应 ReExecDriver.compileAndRespond(...) -> Driver.compileAndRespond(...)response = driver.compileAndRespond(statement);// 如果响应代码不为0,则抛出异常if (0 != response.getResponseCode()) {throw toSQLException("Error while compiling statement", response);}// 设置是否有结果集setHasResultSet(driver.hasResultSet());} catch (HiveSQLException e) {// 设置操作状态为错误setState(OperationState.ERROR);// 抛出异常throw e;} catch (Throwable e) {// 设置操作状态为错误setState(OperationState.ERROR);// 抛出异常throw new HiveSQLException("Error running query: " + e.toString(), e);}}

2.x与3.x源码最核心的区别就是在创建Driver,其对应源码是:

driver = new Driver(queryState, getParentSession().getUserName());

而4.x与3.x源码最核心的区别如下:

  1. 利用 Java 8 的 Lambda 表达式特性,简化代码逻辑,提高代码的可读性和可维护性。
  2. 通过将 queryTimeout 的类型改为 long,支持了更大的超时值,避免了溢出问题。
  3. 在资源管理方面,对调度器的生命周期管理也进行了优化,不需要显式的关闭操作。

4.x对应源码是:

if (queryTimeout > 0L) {timeoutExecutor = Executors.newSingleThreadScheduledExecutor();timeoutExecutor.schedule(() -> {try {final String queryId = queryState.getQueryId();log.info("Query timed out after: {} seconds. Cancelling the execution now: {}", queryTimeout, queryId);SQLOperation.this.cancel(OperationState.TIMEDOUT);} catch (HiveSQLException e) {log.error("Error cancelling the query after timeout: {} seconds", queryTimeout, e);}return null;}, queryTimeout, TimeUnit.SECONDS);
}

DriverFactory.newDriver()方法中返回 ReExecDriver对象,该对象表示执行过程失败可重试的Driver对象,然后调用 Driver.compileAndRespond() 方法进行编译SQL。

2.对QueryPaln 进行处理,转换成MR 任务执行

BackgroundWork是一个线程,负责异步处理QueryPlan,通过submitBackgroundOperation(work)提交运行,执行到SQLOperator.BackgroundOperation.run()方法,最终调用到Driver.run() 方法。

Driver

下面我们再来Driver类,它在不同版本中也有一些差别,比如2.x版本是直接 implements CommandProcessor,而3.x和4.x版本则是implements IDriver,而IDriver 则是 extends CommandProcessor。本质是为了更好的解耦和扩展性,使得代码更加模块化,易于维护和扩展。同时,通过继承 CommandProcessor 接口,也保持了与旧版本的兼容性,确保了功能的连续性。不过其核心功能是没变的,主要包含编译、优化及执行。

执行步骤

为了方便理解,我们先梳理整个执行步骤如下:

  1. 通过Antlr解析SQL语法规则和语法解析,将SQL语法转换成AST(抽象语法树)

  2. 遍历AST(抽象语法树) 将其转化成Query Block(查询块,可以看成查询基本执行单元)

  3. 将Query Block(查询块) 转换成OperatorTree(逻辑执行计划),并进行优化。

  4. OperatorTree(逻辑执行计划)转换成TaskTree(物理执行计划,每个Task对应一个MR Job任务)

  5. TaskTree(物理执行计划)最终包装成Query Plan(查询计划)

简单总结执行流程如下:

SQL -> AST(抽象语法树) -> QueryBlock(查询块) -> Operator(逻辑执行计划) -> TaskTree(物理执行计划) -> QueryPlan(查询计划)。

下面我们再结合SQLOperation调用的Driver类里面的核心方法,来看看底层源码是如何实现的:

compileAndRespond方法

首先第一个核心方法是

response = driver.compileAndRespond(statement);

compileAndRespond()方法2.x源码如下:

    /*** 编译给定的 SQL 命令并返回一个命令处理器响应。* 此方法调用 compileInternal 方法进行实际的编译操作,并使用编译结果创建一个命令处理器响应。** @param command 要编译的 SQL 命令* @return 包含编译结果的命令处理器响应*/public CommandProcessorResponse compileAndRespond(String command) {return createProcessorResponse(compileInternal(command, false));}

3.x和4.x会有些区别,会返回以下方法的调用结果:

coreDriver.compileAndRespond(statement);

无论哪个版本,最终compileAndRespond()方法都会调用到 compileInternal()方法,我们继续看2.x版本compileInternal()方法源码如下:

    private int compileInternal(String command, boolean deferClose) {int ret;// 获取Metrics实例,如果存在则增加等待编译操作的计数器Metrics metrics = MetricsFactory.getInstance();if(metrics != null) {metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);}// 尝试获取编译锁,如果获取失败则返回编译锁超时错误码final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, command);if(compileLock == null) {return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();}try {// 如果Metrics实例存在,减少等待编译操作的计数器if(metrics != null) {metrics.decrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1);}// 进行Hive SQL编译ret = compile(command, true, deferClose);} finally {// 无论编译结果如何,最终都要释放编译锁compileLock.unlock();}// 如果编译失败,尝试释放锁并回滚事务if(ret != 0) {try {releaseLocksAndCommitOrRollback(false, null);} catch(LockException e) {// 记录释放锁时的异常信息LOG.warn("Exception in releasing locks. " + org.apache.hadoop.util.StringUtils.stringifyException(e));}}// 保存编译时的性能日志,用于WebUI展示// 执行时的性能日志由另一个线程的PerfLogger或重置后的PerfLogger完成PerfLogger perfLogger = SessionState.getPerfLogger();queryDisplay.setPerfLogStarts(QueryDisplay.Phase.COMPILATION, perfLogger.getStartTimes());queryDisplay.setPerfLogEnds(QueryDisplay.Phase.COMPILATION, perfLogger.getEndTimes());return ret;}

3.x和4.x的源码相比起来有一些区别,但是都是通过执行Driver.compile()方法,由于4.x代码这块改动较大,做了很多解耦的操作,其核心内容还是变化不大,加上目前几乎很少应用4.x版本的hive,下面我们重点看看2.x和3.x版本的compile()方法内容。

compile方法

compile()方法2.x源码如下:

/*** 编译一个新的查询,可选择重置任务ID计数器并决定是否延迟关闭。* * @param command      要编译的HiveQL查询。* @param resetTaskIds 如果为true,则重置任务ID计数器。* @param deferClose   如果为true,则在编译过程被中断时延迟关闭/销毁操作。* @return 0表示编译成功,否则返回错误代码。*/
// deferClose 表示当进程被中断时,是否应该推迟关闭/销毁操作。如果 compile 方法是在另一个方法(如 runInternal)中被调用,并且该方法会将关闭操作推迟到其内部处理,那么 deferClose 应该设置为 true。
public int compile(String command, boolean resetTaskIds, boolean deferClose) {// 获取性能日志记录器,并开始记录编译过程的性能PerfLogger perfLogger = SessionState.getPerfLogger(true);perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);// 锁定驱动状态,将驱动状态设置为编译中lDrvState.stateLock.lock();try {lDrvState.driverState = DriverState.COMPILING;} finally {lDrvState.stateLock.unlock();}// 对查询命令进行变量替换command = new VariableSubstitution(new HiveVariableSource() {@Overridepublic Map<String, String> getHiveVariable() {return SessionState.get().getHiveVariables();}}).substitute(conf, command);// 存储查询字符串String queryStr = command;try {// 对查询命令进行脱敏处理,避免记录敏感数据queryStr = HookUtils.redactLogString(conf, command);} catch(Exception e) {// 若脱敏失败,记录警告信息LOG.warn("WARNING! Query command could not be redacted." + e);}// 检查编译过程是否被中断,若中断则处理中断并返回错误代码if(isInterrupted()) {return handleInterruption("at beginning of compilation."); //indicate if need clean resource}// 如果上下文不为空且解释分析状态不为运行中,则关闭现有上下文if(ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) {// close the existing ctx etc before compiling a new query, but does not destroy drivercloseInProcess(false);}// 如果需要重置任务ID,则重置任务工厂的IDif(resetTaskIds) {TaskFactory.resetId();}// 获取查询IDString queryId = conf.getVar(HiveConf.ConfVars.HIVEQUERYID);// 保存查询信息,用于Web UI显示this.queryDisplay.setQueryStr(queryStr);this.queryDisplay.setQueryId(queryId);// 记录编译开始信息LOG.info("Compiling command(queryId=" + queryId + "): " + queryStr);// 设置查询的当前时间戳SessionState.get().setupQueryCurrentTimestamp();// 标记编译过程中是否发生错误boolean compileError = false;try {// 初始化事务管理器final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf);// 移除旧的关闭hookShutdownHookManager.removeShutdownHook(shutdownRunner);// 创建新的关闭hook,用于在JVM关闭时释放锁shutdownRunner = new Runnable() {@Overridepublic void run() {try {releaseLocksAndCommitOrRollback(false, txnManager);} catch(LockException e) {// 若释放锁时发生异常,记录警告信息LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " + e.getMessage());}}};// 添加新的关闭hookShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);// 再次检查编译过程是否被中断if(isInterrupted()) {return handleInterruption("before parsing and analysing the query");}// 如果上下文为空,则创建新的上下文if(ctx == null) {ctx = new Context(conf);}// 设置上下文的重试次数、命令和HDFS清理标志ctx.setTryCount(getTryCount());ctx.setCmd(command);ctx.setHDFSCleanup(true);/*** 把 HQL命令 翻译成一个 ASTNode Tree* 封装了 ParseDriver 对 HQL 的解析工作* ParseDriver 对 command 进行词法分析和语法解析(统称为语法分析),返回一个抽象语法树AST*/// 开始记录解析过程的性能perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);// 解析查询命令,得到抽象语法树ASTNode tree = ParseUtils.parse(command, ctx);// 结束记录解析过程的性能perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);// 加载查询hookqueryHooks = loadQueryHooks();// 如果查询hook不为空且不为空列表,则触发查询hook的编译前操作if(queryHooks != null && !queryHooks.isEmpty()) {QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();qhc.setHiveConf(conf);qhc.setCommand(command);for(QueryLifeTimeHook hook : queryHooks) {hook.beforeCompile(qhc);}}// 开始记录语义分析过程的性能perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);// 获取语义分析器BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);// 获取语义分析hookList<HiveSemanticAnalyzerHook> saHooks = getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK, HiveSemanticAnalyzerHook.class);// 刷新元数据存储缓存,确保获取最新的元数据Hive.get().getMSC().flushCache();// 进行语义分析和计划生成if(saHooks != null && !saHooks.isEmpty()) {HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();hookCtx.setConf(conf);hookCtx.setUserName(userName);hookCtx.setIpAddress(SessionState.get().getUserIpAddress());hookCtx.setCommand(command);hookCtx.setHiveOperation(queryState.getHiveOperation());// 触发语义分析hook的预分析操作for(HiveSemanticAnalyzerHook hook : saHooks) {tree = hook.preAnalyze(hookCtx, tree);}/*** sem 是一个 SemanticAnalyzer(语义分析器) 对象* 主要的工作是将 ASTNode 转化为 TaskTree,包括可能的 optimize,过程比较复杂** tree:  AST  抽象语法树   ===> TaskTree*        TaskTree : 物理执行计划**   把抽象语法树交给 SemanticAnalyzer 执行语法解析*   1、从 AST 转成 解析树*   2、通过解析树 再生成 QB 在查询快*   3、从 QB 树在生成 OperatorTree (Logical Plan)*   4、逻辑执行计划的优化*   5、OperatorTree转变成TaskTree*   6、再针对物理执行计划执行优化*   7、生成QueryPlan*/// 进行语义分析sem.analyze(tree, ctx);// 更新hook上下文hookCtx.update(sem);// 触发语义分析hook的后分析操作for(HiveSemanticAnalyzerHook hook : saHooks) {hook.postAnalyze(hookCtx, sem.getAllRootTasks());}} else {// 若没有语义分析hook,直接进行语义分析sem.analyze(tree, ctx);}// 记录查询中发现的ACID文件接收器acidSinks = sem.getAcidFileSinks();// 记录语义分析完成信息LOG.info("Semantic Analysis Completed");// 验证语义分析生成的计划是否有效sem.validate();// 检查查询中是否包含ACID操作acidInQuery = sem.hasAcidInQuery();// 结束语义分析阶段的性能日志记录perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);// 检查编译过程是否被中断,如果中断则处理中断情况并返回if(isInterrupted()) {return handleInterruption("after analyzing query.");}// 根据语义分析结果和配置信息获取查询的输出模式schema = getSchema(sem, conf);/*** 把 TaskTree 生成一个 QueryPlan* 通过  Exeuctor 提交的方法,要接受的参数就是 QueryPlan*/// 根据查询字符串、语义分析器、开始时间、查询ID、操作类型和输出模式创建查询计划plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, queryState.getHiveOperation(), schema);// 设置查询字符串到配置中conf.setQueryString(queryStr);// 设置MapReduce工作流ID到配置中conf.set("mapreduce.workflow.id", "hive_" + queryId);// 设置MapReduce工作流名称到配置中conf.set("mapreduce.workflow.name", queryStr);// 如果查询计划中包含FetchTask,则对其进行初始化if(plan.getFetchTask() != null) {plan.getFetchTask().initialize(queryState, plan, null, ctx.getOpContext());}// 进行授权检查,如果语义分析不跳过授权且开启了授权功能if(!sem.skipAuthorization() && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {try {// 开始记录授权过程的性能日志perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);// 执行授权操作doAuthorization(queryState.getHiveOperation(), sem, command);} catch(AuthorizationException authExp) {// 如果授权失败,打印错误信息并设置错误状态和返回码console.printError("Authorization failed:" + authExp.getMessage() + ". Use SHOW GRANT to " + "get" + " more details.");errorMessage = authExp.getMessage();SQLState = "42000";return 403;} finally {// 结束记录授权过程的性能日志perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);}}// 如果配置中开启了记录EXPLAIN输出的功能if(conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {// 获取查询的EXPLAIN输出String explainOutput = getExplainOutput(sem, plan, tree);if(explainOutput != null) {if(conf.getBoolVar(ConfVars.HIVE_LOG_EXPLAIN_OUTPUT)) {// 记录EXPLAIN输出到日志中LOG.info("EXPLAIN output for queryid " + queryId + " : " + explainOutput);}if(conf.isWebUiQueryInfoCacheEnabled()) {// 如果开启了Web UI查询信息缓存,将EXPLAIN计划设置到查询显示信息中queryDisplay.setExplainPlan(explainOutput);}}}// 编译成功,返回0return 0;} catch(Exception e) {// 如果编译过程中被中断,处理中断情况并返回if(isInterrupted()) {return handleInterruption("during query compilation: " + e.getMessage());}// 标记编译过程出现错误compileError = true;// 获取错误信息ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());// 构建错误消息errorMessage = "FAILED: " + e.getClass().getSimpleName();if(error != ErrorMsg.GENERIC_ERROR) {errorMessage += " [Error " + error.getErrorCode() + "]:";}// HIVE-4889if((e instanceof IllegalArgumentException) && e.getMessage() == null && e.getCause() != null) {errorMessage += " " + e.getCause().getMessage();} else {errorMessage += " " + e.getMessage();}if(error == ErrorMsg.TXNMGR_NOT_ACID) {errorMessage += ". Failed command: " + queryStr;}// 设置SQL状态码SQLState = error.getSQLState();// 记录下游错误信息downstreamError = e;// 打印错误信息和详细堆栈跟踪console.printError(errorMessage, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));// 返回错误代码return error.getErrorCode();// since it exceeds valid range of shell return values} finally {// 触发编译后的hook函数try {if(queryHooks != null && !queryHooks.isEmpty()) {QueryLifeTimeHookContext qhc = new QueryLifeTimeHookContextImpl();qhc.setHiveConf(conf);qhc.setCommand(command);for(QueryLifeTimeHook hook : queryHooks) {hook.afterCompile(qhc, compileError);}}} catch(Exception e) {// 如果触发hook函数时出现异常,记录警告信息LOG.warn("Failed when invoking query after-compilation hook.", e);}/*** 计算任务总耗时*/// 结束编译阶段的性能日志记录并计算耗时double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE) / 1000.00;// 获取编译过程中HMS调用的时间统计信息ImmutableMap<String, Long> compileHMSTimings = dumpMetaCallTimingWithoutEx("compilation");// 设置查询显示信息中的HMS时间统计信息queryDisplay.setHmsTimings(QueryDisplay.Phase.COMPILATION, compileHMSTimings);// 检查编译过程是否被中断boolean isInterrupted = isInterrupted();if(isInterrupted && !deferClose) {// 如果被中断且不延迟关闭,关闭正在进行的操作closeInProcess(true);}// 锁定驱动状态lDrvState.stateLock.lock();try {if(isInterrupted) {// 如果被中断,根据是否延迟关闭设置驱动状态lDrvState.driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR;} else {// 如果未被中断,根据编译是否出错设置驱动状态lDrvState.driverState = compileError ? DriverState.ERROR : DriverState.COMPILED;}} finally {// 解锁驱动状态lDrvState.stateLock.unlock();}if(isInterrupted) {// 如果编译过程被中断,记录中断信息LOG.info("Compiling command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");} else {// 如果编译过程未被中断,记录编译完成信息LOG.info("Completed compiling command(queryId=" + queryId + "); Time taken: " + duration + " " + "seconds");}}
}

compile()方法在3.x和4.x有一些区别,但是都有以下三个核心方法:

  1. 首先是通过ParseUtils.parse(command, ctx)将Hive SQL转换成AST(抽象语法树),即:HQL -> AST(抽象语法树)转换;
  2. 然后是通过BaseSemanticAnalyzer.analyze()方法将AST(抽象语法树)解析生成TaskTree(物理执行计划)
  3. 最后将BaseSemanticAnalyzer传入QueryPlan的构造函数,来创建QueryPlan(查询计划)

其核心正是引入篇我们提到的解析(Parsing)、校验(Validation)、优化(Optimization)和执行(Execution)

下面我们深入这几个方法看看:

parse方法

compile()方法中,首先是通过ParseUtils.parse(command, ctx)进行词法分析与解析,将Hive HQL转换成AST抽象语法树。

我们来看看parse()方法的源码:

/*** 解析 HQL。* * 此方法接收一个 Hive 查询命令和上下文对象,调用另一个重载的 parse 方法进行实际的解析操作,* 并将视图的全限定名参数设为 null。* * @param command 要解析的 Hive 查询命令* @param ctx 查询的上下文对象* @return 解析后的 AST 节点* @throws ParseException 如果解析过程中出现异常*/
public static ASTNode parse(String command, Context ctx) throws ParseException {return parse(command, ctx, null);
}

继续往里走,对应源码如下:

  /*** 解析HQL * ParseDriver对command进行词法分析和语法解析(统称为语法分析),返回一个抽象语法树AST* * @param command 要解析的Hive查询命令* @param ctx 查询上下文信息* @param viewFullyQualifiedName 视图的完全限定名称* @return 解析后的AST节点* @throws ParseException 如果解析过程中出现错误*/public static ASTNode parse(String command, Context ctx, String viewFullyQualifiedName) throws ParseException {// 创建一个ParseDriver实例用于解析命令ParseDriver pd = new ParseDriver();// 使用ParseDriver解析命令,得到AST节点ASTNode tree = pd.parse(command, ctx, viewFullyQualifiedName);// 查找根节点中第一个具有非空令牌的节点tree = findRootNonNullToken(tree);// 处理设置列引用的情况handleSetColRefs(tree);// 返回处理后的AST节点return tree;}

pd.parse()方法中,核心调用的是HiveLexer和HiveParser这两个类,它们分别负责SQL的词法分析和语法解析,我们继续看看其中源码:

/*** 解析给定的命令字符串,将其转换为抽象语法树(AST)节点。** @param command 要解析的命令字符串。* @param ctx 解析上下文,可包含配置信息和tokens重写流。* @param viewFullyQualifiedName 视图的完全限定名称,如果不是视图解析则为 null。* @return 解析后的 AST 节点。* @throws ParseException 如果解析过程中出现错误。*/public ASTNode parse(String command, Context ctx, String viewFullyQualifiedName)throws ParseException {// 如果启用了调试日志,则记录正在解析的命令if (LOG.isDebugEnabled()) {LOG.debug("Parsing command: " + command);}/***  Antlr对语法文件 HiveLexer.g 编译后自动生成的词法解析和语法解析类(HiveLexerX,HiveParser)*  文件 HiveLexer.g 定义了一些 hive 的关键字,form、where,数字的定义格式【0–9】,分隔符,比较符之类的。*  每一个关键字分支都会变成一个 token。**  HiveLexerX 是 antlr 根据词法规则文件,通过编译生成的一个代码类*  能够执行词法和语法的解析   *  最终生成一个 ASTNode*/// 创建一个不区分大小写的字符流,并使用它初始化词法分析器HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));/***  根据词法分析的结果得到tokens的,此时不只是单纯的字符串,*  而是具有特殊意义的字符串的封装,其本身是一个流。*  lexer 把 SQL 语句中的各个语法分支,都转换成底层引擎能识别的各种 Token*/// 创建一个tokens重写流,用于处理词法分析器生成的tokensTokenRewriteStream tokens = new TokenRewriteStream(lexer);// 如果提供了上下文,则根据是否为视图设置tokens重写流,并设置词法分析器的配置if (ctx != null) {if (viewFullyQualifiedName == null) {// 顶层查询ctx.setTokenRewriteStream(tokens);} else {// 这是一个视图ctx.addViewTokenRewriteStream(viewFullyQualifiedName, tokens);}lexer.setHiveConf(ctx.getConf());}// 语法解析 HiveParser是 Antlr 根据 HiveParser.g 生成的文件// 使用tokens重写流初始化语法解析器HiveParser parser = new HiveParser(tokens);// 如果提供了上下文,则设置解析器的配置if (ctx != null) {parser.setHiveConf(ctx.getConf());}// 设置解析器的树适配器,用于创建 AST 节点parser.setTreeAdaptor(adaptor);// 声明一个变量来存储解析结果HiveParser.statement_return r = null;try {/*** 转化为 ASTTree 放在 ASTNode 中的 tree 属性中。 通过 r.getTree() 获取返回。* 当前这句代码完成了从 Tok 树到 AST 的转变* 把结果放在了 HiveParser.statement_return*/// 调用解析器的 statement 方法进行解析r = parser.statement();} catch (RecognitionException e) {// 打印异常堆栈跟踪信息e.printStackTrace();// 如果解析过程中出现识别异常,则抛出解析异常throw new ParseException(parser.errors);}// 检查词法分析器和解析器是否有错误if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {// 如果没有错误,则记录解析完成的日志LOG.debug("Parse Completed");} else if (lexer.getErrors().size() != 0) {// 如果词法分析器有错误,则抛出解析异常throw new ParseException(lexer.getErrors());} else {// 如果解析器有错误,则抛出解析异常throw new ParseException(parser.errors);}// 获取解析结果的树,并将其转换为 AST 节点ASTNode tree = (ASTNode) r.getTree();// 设置 AST 节点的未知tokens边界tree.setUnknownTokenBoundaries();// 返回解析后的 AST 节点return tree;}

pd.parse()方法将sql语法转换成抽象语法树 AST,Hive中通过使用 Antlr(Another Tool for Language Recognition)进行词法分析和语法解析。

Antlr主要作用:

  • 词法分析:将输入的HiveQL查询字符串分解成一系列的Token,这些Token是语法分析的基础。Antlr生成的词法分析器(Lexer)负责将输入的HiveQL查询字符串分解成一个个Token,这些Token表示查询中的关键字、标识符、运算符等基本元素。
  • 语法解析:根据词法分析器生成的Token序列,解析HiveQL查询语句,生成AST抽象语法树。Antlr生成的语法解析器(Parser)负责读取Token序列,并根据语法规则解析这些Token,生成对应的AST抽象语法树。Token 对应 SQL中的每个关键字。

analyze方法

通过上一个步骤并获取到 ASTNode之后,需要对其进行进一步的抽象和结构化处理,以便能够更便捷地将其转换为MapReduce程序。为此将会初始化类BaseSemanticAnalyzer,并通过SemanticAnalyzerFactory确定SQL的类型,进而调用analyze()方法进行分析,其对应源码如下:

BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
sem.analyze(tree, ctx);

其中 sem 是一个 SemanticAnalyzer(语义分析器)对象,主要的工作是将 ASTNode 转化为 TaskTree(物理执行计划),包括可能的 optimize(优化),也就是前面执行步骤第2~5步做的内容。

首先看analyze()对应源码如下:

    /*** 分析给定的抽象语法树(AST)节点,并使用提供的上下文进行初始化。* * 此方法首先初始化上下文,然后初始化分析器的内部状态。* 最后,调用 `analyzeInternal` 方法对 AST 进行实际的分析。* * @param ast 要分析的抽象语法树节点。* @param ctx 分析过程中使用的上下文。* @throws SemanticException 如果在分析过程中发生语义错误。*/public void analyze(ASTNode ast, Context ctx) throws SemanticException {// 初始化上下文initCtx(ctx);// 初始化分析器的内部状态,清除部分缓存init(true);// 调用内部分析方法对 AST 进行分析analyzeInternal(ast);}

可以看到,除了进行必要的初始化之外,还会调用analyzeInternal()方法,对应源码如下:

    /*** 对抽象语法树(AST)进行内部语义分析。* 此方法为抽象方法,具体实现需在子类中完成。* 它负责对传入的AST进行详细的语义分析,以确保查询语句的合法性和正确性。** @param ast 待分析的抽象语法树节点* @throws SemanticException 如果在语义分析过程中发现错误*/public abstract void analyzeInternal(ASTNode ast) throws SemanticException;

可以看到analyzeInternal()是一个抽象方法,它有多种具体实现,通过断点查看,会发现流程是跳转到了org.apache.hadoop.hive.ql.parse.SemanticAnalyzer类,其源码注释如下:

Implementation of the semantic analyzer. It generates the query plan.
There are other specific semantic analyzers for some hive operations such as DDLSemanticAnalyzer for ddl operations.

翻译:

语义分析器的实现。它用于生成查询计划。
对于某些 Hive 操作,还有其他特定的语义分析器,例如用于 DDL 操作的 DDLSemanticAnalyzer。

这个类有点复杂,Hive优化的秘密全在于此,将AST抽象语法树解析生成TaskTree(物理执行计划)的全流程,包括逻辑执行计划、逻辑执行计划的优化、物理执行计划的切分、物理执行计划的优化、以及 MapReduce 任务的生成全部都在其中,下面我们就看看其中实现的analyzeInternal()方法源码:

  /*** 对传入的AST节点进行内部分析,生成查询计划。** @param ast 抽象语法树节点* @param pcf 计划上下文工厂* @throws SemanticException 语义分析异常*/void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticException {LOG.info("Starting Semantic Analysis");// 1. 从语法树生成解析树boolean needsTransform = needsTransform();// 改变位置别名处理的位置processPositionAlias(ast);PlannerContext plannerCtx = pcf.create();if (!genResolvedParseTree(ast, plannerCtx)) {return;}if (HiveConf.getBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY)) {for (String alias : qb.getSubqAliases()) {removeOBInSubQuery(qb.getSubqForAlias(alias));}}// 检查查询结果缓存。// 如果不需要进行掩码/过滤,则可以在生成操作符树和进行CBO之前检查缓存。// 否则,必须等到掩码/过滤步骤之后。boolean isCacheEnabled = isResultsCacheEnabled();QueryResultsCache.LookupInfo lookupInfo = null;if (isCacheEnabled && !needsTransform && queryTypeCanUseCache()) {lookupInfo = createLookupInfoForQuery(ast);if (checkResultsCache(lookupInfo)) {return;}}ASTNode astForMasking;if (isCBOExecuted() && needsTransform &&(qb.isCTAS() || qb.isView() || qb.isMaterializedView() || qb.isMultiDestQuery())) {// 如果使用CBO并且可能应用掩码/过滤策略,则创建ast的副本。// 原因是操作符树的生成可能会修改初始ast,但如果需要第二次解析,我们希望解析未修改的ast。astForMasking = (ASTNode) ParseDriver.adaptor.dupTree(ast);} else {astForMasking = ast;}// 2. 从解析树生成操作符树Operator sinkOp = genOPTree(ast, plannerCtx);boolean usesMasking = false;if (!unparseTranslator.isEnabled() &&(tableMask.isEnabled() && analyzeRewrite == null)) {// 在这里重写 * 以及掩码表ASTNode rewrittenAST = rewriteASTWithMaskAndFilter(tableMask, astForMasking, ctx.getTokenRewriteStream(),ctx, db, tabNameToTabObject, ignoredTokens);if (astForMasking != rewrittenAST) {usesMasking = true;plannerCtx = pcf.create();ctx.setSkipTableMasking(true);init(true);// 改变位置别名处理的位置processPositionAlias(rewrittenAST);genResolvedParseTree(rewrittenAST, plannerCtx);if (this instanceof CalcitePlanner) {((CalcitePlanner) this).resetCalciteConfiguration();}sinkOp = genOPTree(rewrittenAST, plannerCtx);}}// 检查查询结果缓存// 在需要进行行或列掩码/过滤的情况下,不支持缓存。// TODO: 为带有掩码/过滤的查询启用缓存if (isCacheEnabled && needsTransform && !usesMasking && queryTypeCanUseCache()) {lookupInfo = createLookupInfoForQuery(ast);if (checkResultsCache(lookupInfo)) {return;}}// 3. 推导结果集模式if (createVwDesc != null && !this.ctx.isCboSucceeded()) {resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());} else {// 如果满足以下条件,resultSchema将为null:// (1) cbo被禁用;// (2) 或者cbo启用但使用AST返回路径(无论是否成功,resultSchema都将重新初始化)// 只有在cbo启用且使用新返回路径并且成功时,resultSchema才不为null。if (resultSchema == null) {resultSchema = convertRowSchemaToResultSetSchema(opParseCtx.get(sinkOp).getRowResolver(),HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES));}}// 4. 为优化器和物理编译器生成解析上下文copyInfoToQueryProperties(queryProperties);ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps,// 使用菱形操作符简化泛型类型声明new HashSet<>(joinContext.keySet()),// 使用菱形操作符简化泛型类型声明new HashSet<>(smbMapJoinContext.keySet()),loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,queryProperties, viewProjectToTableSchema, acidFileSinks);// 在解析上下文中设置半连接提示pCtx.setSemiJoinHints(parseSemiJoinHint(getQB().getParseInfo().getHintList()));// 如果需要禁用映射连接提示,则设置pCtx.setDisableMapJoin(disableMapJoinWithHint(getQB().getParseInfo().getHintList()));// 5. 处理视图创建if (createVwDesc != null) {if (ctx.getExplainAnalyze() == AnalyzeState.RUNNING) {return;}if (!ctx.isCboSucceeded()) {saveViewDefinition();}// 此时验证创建视图语句,createVwDesc包含语义检查所需的所有信息validateCreateView();if (createVwDesc.isMaterialized()) {createVwDesc.setTablesUsed(getTablesUsed(pCtx));} else {// 由于我们只是创建视图(不执行它),因此不需要优化或转换计划(实际上,这些过程可能会干扰视图创建)。所以跳过此方法的其余部分。ctx.setResDir(null);ctx.setResFile(null);try {PlanUtils.addInputsForView(pCtx);} catch (HiveException e) {throw new SemanticException(e);}// 为创建视图语句生成谱系信息// 如果配置了LineageLoggerhook。// 添加计算谱系信息的转换。Set<String> postExecHooks = Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings().split(Strings.nullToEmpty(HiveConf.getVar(conf, HiveConf.ConfVars.POSTEXECHOOKS))));if (postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")|| postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {// 使用菱形操作符简化泛型类型声明ArrayList<Transform> transformations = new ArrayList<>();transformations.add(new HiveOpConverterPostProc());transformations.add(new Generator(postExecHooks));for (Transform t : transformations) {pCtx = t.transform(pCtx);}// 我们仅使用视图名称作为位置。queryState.getLineageState().mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp);}return;}}// 6. 如果需要,生成表访问统计信息if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) {TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx);setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());}// 7. 执行逻辑优化if (LOG.isDebugEnabled()) {LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));}// 创建一个优化器实例,并对解析上下文进行逻辑优化。Optimizer optm = new Optimizer();// 设置优化器的解析上下文optm.setPctx(pCtx);// 初始化优化器optm.initialize(conf);// 执行优化操作,并更新解析上下文pCtx = optm.optimize();// 检查优化后的解析上下文中是否包含列访问信息if (pCtx.getColumnAccessInfo() != null) {// 设置列访问信息,用于视图列授权setColumnAccessInfo(pCtx.getColumnAccessInfo());}// 如果启用了调试日志,则输出优化后的操作符树信息if (LOG.isDebugEnabled()) {LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));}// 8. Generate column access stats if required - wait until column pruning// takes place during optimization// 检查是否需要收集列访问信息用于授权或统计boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2()&& HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);if (isColumnInfoNeedForAuth|| HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {// 创建列访问分析器实例ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx);// 分析列访问信息,并更新列访问信息// view column access info is carried by this.getColumnAccessInfo().setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess(this.getColumnAccessInfo()));}// 9. Optimize Physical op tree & Translate to target execution engine (MR,// TEZ..)// 检查是否需要进行逻辑解释,如果不需要则进行物理操作树的优化和编译if (!ctx.getExplainLogical()) {// 获取任务编译器实例TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);// 初始化任务编译器compiler.init(queryState, console, db);// 编译解析上下文,生成任务和输入输出信息compiler.compile(pCtx, rootTasks, inputs, outputs);// 获取获取任务fetchTask = pCtx.getFetchTask();}//find all Acid FileSinkOperatorS// 创建查询计划后处理器实例,但该实例未被使用,后续可考虑移除QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId());// 10. Attach CTAS/Insert-Commit-hooks for Storage Handlers// 查找根任务列表中的第一个TezTaskfinal Optional<TezTask> optionalTezTask =rootTasks.stream().filter(task -> task instanceof TezTask).map(task -> (TezTask) task).findFirst();if (optionalTezTask.isPresent()) {// 获取第一个TezTask实例final TezTask tezTask = optionalTezTask.get();// 遍历根任务列表,为满足条件的DDLWork添加插入提交hook任务rootTasks.stream()// 过滤出工作类型为DDLWork的任务.filter(task -> task.getWork() instanceof DDLWork)// 将任务转换为DDLWork类型.map(task -> (DDLWork) task.getWork())// 过滤出预插入表描述不为空的DDLWork.filter(ddlWork -> ddlWork.getPreInsertTableDesc() != null)// 获取预插入表描述.map(ddlWork -> ddlWork.getPreInsertTableDesc())// 创建插入提交hook描述.map(ddlPreInsertTask -> new InsertCommitHookDesc(ddlPreInsertTask.getTable(),ddlPreInsertTask.isOverwrite()))// 为TezTask添加依赖任务.forEach(insertCommitHookDesc -> tezTask.addDependentTask(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), insertCommitHookDesc), conf)));}LOG.info("Completed plan generation");// 11. put accessed columns to readEntity// 检查是否需要收集扫描列的统计信息if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {// 将访问的列信息添加到读取实体中putAccessedColumnsToReadEntity(inputs, columnAccessInfo);}// 检查是否启用了查询结果缓存,并且查找信息不为空if (isCacheEnabled && lookupInfo != null) {// 检查查询是否可以被缓存if (queryCanBeCached()) {// 创建缓存查询信息QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo);// Specify that the results of this query can be cached.// 指定该查询的结果可以被缓存setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo));}}}

简单总结一下,首先输入的是AST抽象语法树,主要经历了以下步骤:

  1. Generate Resolved Parse tree from syntax tree 从语法树生成解析树
  2. Gen OP Tree from resolved Parse Tree 从解析树生成Gen OP树  OperatorTree
  3. Deduce Resultset Schema(selct ...... 每个字段,我给你构造成一个 Field)推导结果集模式  CBO优化
  4. Generate ParseContext for Optimizer & Physical compiler 为优化器和物理编译器生成解析上下文
  5. Take care of view creation 注意视图创建
  6. Generate table access stats if required 生成表访问统计信息(如果需要)
  7. Perform Logical optimization 执行逻辑执行计划的优化
  8. Generate column access stats if required - wait until column pruning takes place during optimization
    根据需要生成列访问统计信息-等待优化期间进行列裁剪
    sql当中写了很多的无用的字段,但是最终执行逻辑不需要这些字段,就需要列裁剪。
  9. Optimize Physical op tree & Translate to target execution engine (MR, Spark, TEZ..) 优化物理操作树并转换为目标执行引擎(MR,TEZ ..)
  10. put accessed columns to readEntity 将访问的列放入 ReadEntity(要读取的列的信息)
  11. if desired check we're not going over partition scan limits 如果需要检查,我们不会超过分区扫描限制

生成QueryPlan

这一系列操作完成后,最后就是把得到的 TaskTree 生成一个 QueryPlan,相关源码如下:

/*** 创建一个新的 QueryPlan 对象。* * @param queryStr 要执行的查询字符串* @param sem 语义分析器对象,用于对查询进行语义分析* @param startTime 驱动程序开始运行的时间,通过 perfLogger 获取* @param queryId 查询的唯一标识符* @param hiveOperation Hive 操作类型* @param schema 查询结果的输出模式*/
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, queryState.getHiveOperation(), schema);

总结

本文介绍了Hive,并通过源码梳理了Hive的执行原理,其核心正是引入篇我们提到的解析(Parsing)、校验(Validation)、优化(Optimization)和执行(Execution)

总结起来主要有以下四个步骤:

  1. 词法分析与解析
    将SQL语法转换成AST(抽象语法树) 
  2. 语义分析
    将AST进行进一步的抽象和结构化处理,通过遍历AST(抽象语法树) 将其转化成Query Block
  3. 逻辑优化
    到了第三步时,操作符树虽然已经勾勒出执行任务的先后顺序和上下游依赖,但细节还比较粗糙,例如存在重复的数据扫描、不必要的Shuffle操作等,因此还需要进行进一步优化。通过优化,Hive可以改进查询的执行计划,并生成更高效的作业图以在分布式计算框架中执行。这些优化可以提高查询的性能和效率,并减少资源开销。
  4. 物理优化
    在逻辑优化阶段结束后,输入的SQL语句也逐步转换为优化后的逻辑计划,不过此时的逻辑计划仍然不能直接执行,还需要进一步转换成可以识别并执行的MapReduce Task,首先将优化后的OperatorTree(逻辑执行计划)转换成TaskTree(物理执行计划,每个Task对应一个MR Job任务),并对物理执行计划进行一些优化,然后依次调用执行。

有朋友看了初版觉得写的不够细,私信让我迭代丰富一下,但还有一些有意思的细节,比如4.x源码的区别等,感兴趣的小伙伴可以自行深入探索一下。


http://www.ppmy.cn/news/1572342.html

相关文章

深入浅出Java反射:掌握动态编程的艺术

小程一言反射何为反射反射核心类反射的基本使用获取Class对象创建对象调用方法访问字段 示例程序应用场景优缺点分析优点缺点 注意 再深入一些反射与泛型反射与注解反射与动态代理反射与类加载器 结语 小程一言 本专栏是对Java知识点的总结。在学习Java的过程中&#xff0c;学习…

Python 识别图片和扫描PDF中的文字

目录 工具与设置 Python 识别图片中的文字 Python 识别图片中的文字及其坐标位置 Python 识别扫描PDF中的文字 注意事项 在处理扫描的PDF和图片时&#xff0c;文字信息往往无法直接编辑、搜索或复制&#xff0c;这给信息提取和分析带来了诸多不便。手动录入信息不仅耗时费…

PHP防伪溯源查询系统小程序

&#x1f512; 防伪溯源查询系统——打造全方位品牌保护新利器&#xff0c;守护每一份信任 &#x1f4f1; 这是一款专为现代品牌量身打造的防伪溯源查询系统&#xff0c;它宛如品牌的贴身保镖&#xff0c;巧妙融合了PHP与Uniapp的前沿技术&#xff0c;无缝衔接微信小程序、H5网…

Xilinx kintex-7系列 FPGA支持PCIe 3.0 吗?

Xilinx kintex-7系列资源如下图 Xilinx各系列的GT资源类型和性能 PCIe Gen1/2/3的传输速率对比 K7上面使用的高速收发器GTX最高速率为12.5GT/s&#xff0c; PCIe Gen2 每个通道的传输速率为 5 GT/s。 PCIe Gen3 每个通道的传输速率为 8 GT/s。 所以理论上硬件支持PCIe3.0&#…

机器视觉--Halcon变量的创建与赋值

一、引言 在机器视觉领域&#xff0c;Halcon 作为一款强大且功能丰富的软件库&#xff0c;为开发者提供了广泛的工具和算子来处理各种复杂的视觉任务。而变量作为程序中存储和操作数据的基本单元&#xff0c;在 Halcon 编程中起着至关重要的作用。正确地创建和赋值变量是编写高…

2025年单片机毕业设计选题物联网计算机电气电子通信类

当然&#xff0c;以下是基于物联网技术设计的20个单片机类题目&#xff0c;旨在考察学生在物联网环境下单片机应用、系统设计、数据传输与处理等方面的能力&#xff1a; 基于物联网的智能家居温度湿度控制系统设计&#xff1a;利用单片机和传感器实现室内环境的温湿度监测&…

反向代理ml

1 概念 1.1 反向代理概念 反向代理是指以代理服务器来接收客户端的请求&#xff0c;然后将请求转发给内部网络上的服务器&#xff0c;将从服务器上得到的结果返回给客户端&#xff0c;此时代理服务器对外表现为一个反向代理服务器。 对于客户端来说&#xff0c;反向代理就相当…

能源物联网数据采集网关 多协议对接解决方案

安科瑞刘鸿鹏 摘要 随着配电系统智能化需求的提升&#xff0c;现代配电物联网&#xff08;IoT&#xff09;系统对数据采集、传输、处理及远程管理能力提出了更高要求。智能网关作为连接现场设备与上层管理平台的核心枢纽&#xff0c;其性能直接影响系统的实时性、可靠性与扩展…