HiveMetastore 的架构简析

news/2024/11/12 23:33:44/

HiveMetastore 的架构简析

Hive Metastore 是 Hive 元数据管理的服务。可以把元数据存储在数据库中。对外通过 api 访问。

hive_metastorethrift_3">hive_metastore.thrift

对外提供的 Thrift 接口定义在文件 standalone-metastore/src/main/thrift/hive_metastore.thrift 中。

内容包括用到的结构体和枚举,和常量,和 rpc Service。
如分区定义如下:

struct Partition {1: list<string> values // string value is converted to appropriate partition key type2: string       dbName,3: string       tableName,4: i32          createTime,5: i32          lastAccessTime,6: StorageDescriptor   sd,7: map<string, string> parameters,8: optional PrincipalPrivilegeSet privileges,9: optional string catName
}

Service的定义了 client 和 server 的 RPC 请求。如增加分区的定义如下:

service ThriftHiveMetastore extends fb303.FacebookService
{Partition add_partition(1:Partition new_part)throws(1:InvalidObjectException o1, 2:AlreadyExistsException o2, 3:MetaException o3)
}

ThriftHiveMetastore.java

hive_metastore.thrift 编译之后生成文件 ThriftHiveMetastore.java

ThriftHiveMetastore 结构如下:

public class ThriftHiveMetastore {// Iface 定义了 Service 的所有的接口。仅列出了 add_partitionpublic interface Iface extends com.facebook.fb303.FacebookService.Iface {public Partition add_partition(Partition new_part) throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException;// omit other methods}// AsyncIface 定义了异步接口。public interface AsyncIface extends com.facebook.fb303.FacebookService .AsyncIface {public void add_partition(Partition new_part, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;// omit other methods}// Client 的实现public static class Client extends com.facebook.fb303.FacebookService.Client implements Iface {public Partition add_partition(Partition new_part) throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException{send_add_partition(new_part);return recv_add_partition();}// receiveBase 调用 result.read 方法,从 protocalpublic Partition recv_add_partition() throws InvalidObjectException, AlreadyExistsException, MetaException, org.apache.thrift.TException{add_partition_result result = new add_partition_result();receiveBase(result, "add_partition");if (result.isSetSuccess()) {return result.success;}if (result.o1 != null) {throw result.o1;}if (result.o2 != null) {throw result.o2;}if (result.o3 != null) {throw result.o3;}throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "add_partition failed: unknown result");}}// Processor 是服务端处理框架,把所有要处理的 rpc 的名称和处理的映射放到 map里,客户端请求 rpc,先输出 rpc 的名称。public static class Processor<I extends Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements org.apache.thrift.TProcessor {private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {processMap.put("add_partition", new add_partition());}}// AsyncProcessorpublic static class AsyncProcessor<I extends AsyncIface> extends com.facebook.fb303.FacebookService.AsyncProcessor<I> {}
}

IHMSHandler

IHMSHandler 是服务器端需要实现的接口。除了 ThriftHiveMetastore.Iface 外,还包括其他一些方法。

IHMSHandler extends ThriftHiveMetastore.Iface, Configurable

HMSHandler

HMSHandler 是服务器端的具体实现。创建服务时,创建 HMSHandler,多个线程调用同一个 HMSHandler 对象来处理 client 的请求。

public class HMSHandler extends FacebookBase implements IHMSHandler {@Overridepublic Partition add_partition(final Partition part)throws InvalidObjectException, AlreadyExistsException, MetaException {return add_partition_with_environment_context(part, null);}// omit other methods@Overridepublic Partition add_partition_with_environment_context(final Partition part, EnvironmentContext envContext)throws InvalidObjectException, AlreadyExistsException,MetaException {startTableFunction("add_partition",part.getCatName(), part.getDbName(), part.getTableName());Partition ret = null;Exception ex = null;try {ret = add_partition_core(getMS(), part, envContext);} catch (Exception e) {ex = e;if (e instanceof MetaException) {throw (MetaException) e;} else if (e instanceof InvalidObjectException) {throw (InvalidObjectException) e;} else if (e instanceof AlreadyExistsException) {throw (AlreadyExistsException) e;} else {throw newMetaException(e);}} finally {endFunction("add_partition", ret != null, ex, part != null ?  part.getTableName(): null);}return ret;}
}

getMS 从 ThreadLocal 里,每个线程单独的。

    @Overridepublic RawStore getMS() throws MetaException {Configuration conf = getConf();return getMSForConf(conf);}public static RawStore getMSForConf(Configuration conf) throws MetaException {RawStore ms = threadLocalMS.get();if (ms == null) {ms = newRawStoreForConf(conf);ms.verifySchema();threadLocalMS.set(ms);ms = threadLocalMS.get();}return ms;}

为什么 Handler 总是一个线程处理一个 client 的请求

如果不是一个线程处理一个 client 的请求,那么 client 先发送一个请求,然后再发送第二个请求时, RawStore ms = threadLocalMS.get(); 就有可能拿到的是其他线程的 ms。

因为 org.apache.thrift.server.TThreadPoolServer.serve方法中。为每个 socket 创建一个 client 对象,并且把 client 的所有请求有 WorkerProcess 进行处理。WorkerProcess 是一个 Runnable。最终提交到 executorService_ 中。

while(!this.stopped_) {try {TTransport client = this.serverTransport_.accept();WorkerProcess wp = new WorkerProcess(client);while(true) {try {this.executorService_.execute(wp);break;} catch (Throwable var13) {// omit}}} catch (TTransportException var14) {// }}
  • WorkerProcess
    WorkerProcess 也是除了设置停止标志外死循环。
do {if (eventHandler != null) {eventHandler.processContext(connectionContext, inputTransport, outputTransport);}} while(!TThreadPoolServer.this.stopped_ && processor.process(inputProtocol, outputProtocol));

processor 类型是 TUGIBasedProcessor。

  • 当客户端正常退出时。
    client 会调用 metastore 的 shutdown 方法。此方法里,清除所有的 threadlocal 对象。
	public void shutdown() {cleanupRawStore();PerfLogger.getPerfLogger(false).cleanupPerfLogMetrics();}protected static void cleanupRawStore() {try {RawStore rs = HMSHandler.getRawStore();if (rs != null) {HMSHandler.logInfo("Cleaning up thread local RawStore...");rs.shutdown();}} finally {HMSHandler handler = HMSHandler.threadLocalHMSHandler.get();if (handler != null) {handler.notifyMetaListenersOnShutDown();}HMSHandler.threadLocalHMSHandler.remove();HMSHandler.threadLocalConf.remove();HMSHandler.threadLocalModifiedConfig.remove();HMSHandler.removeRawStore();HMSHandler.logInfo("Done cleaning up thread local RawStore");}}
  • 异常退出时
    WorkerProcess 的 finally 处理不论是否当前连接调用 shutdown,都执行 eventHandler.deleteContext
finally {if (eventHandler != null) {eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
}

在 HiveMetaStore.java里定义了 eventHandler, 也调用了 cleanupRawStore,和 shutdown 方法里调用的一样。

@Override
public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {openConnections.decrementAndGet();// If the IMetaStoreClient#close was called, HMSHandler#shutdown would have already// cleaned up thread local RawStore. Otherwise, do it now.HMSHandler.cleanupRawStore();
}

threadLocal 对象的 remove 方法多次调用是没有副作用的。

其他考虑的点

在异常退出时,没有调用 PerfLogger.getPerfLogger(false).cleanupPerfLogMetrics();, 是否会内存溢出。
调用 PerfLogger.getPerfLogger(false). 当参数是 false 时,如果 ThreadLocal 里已经有,则不会创建对象。处理线程的个数是固定的。不会导致内存问题


http://www.ppmy.cn/news/1546014.html

相关文章

java-web-web后端知识小结

spring框架三大核心: IOC--控制反转 DI---依赖注入 AOP--面向切面编程 web开发技术小结 1.过滤器,JWT令牌 2.三层架构 IOC, DI AOP, 全局异常处理, 事务管理 mybatis 3.数据操作与存储 mysql 阿里云OSS(云存储) 各个技术的归属: 1.过滤器, cookie,session--javaWeb 2.JWT, 阿里…

目标检测YOLO实战应用案例100讲-基于深度学习的人眼视线检测

目录 知识储备 视觉深度的测定 基本知识 视觉检测中的关键技术 单眼感知景深 内部摄像机距离的效果 Face ID 与3D传感技术 什么是Face ID? 3D传感技术原理 主动测距法 被动测距法 基于深度学习的人眼视线检测代码 数据集读取与预处理 卷积神经网络模型构建 模型…

css | padding vs margin

前置知识 height是作用域内容(content)区域的 padding和margin用百分比的时候是怎么算的&#xff1f;父元素的宽度。注意&#xff0c;不是根据父元素相应的属性&#xff0c;就是父亲的width 自身的height是0 以下代码&#xff0c;外面盒子是100x10的&#xff0c;里面的widt…

【数据结构算法】归并排序

归并排序时间里在归并操作上的一种 归并排序&#xff08;Merge Sort&#xff09;是建立在归并操作上的一种高效排序算法。该算法是分治法&#xff08;Divide and Conquer&#xff09;的典型应用。归并排序的核心思想是将已有序的子序列合并&#xff0c;得到完全有序的序列。 …

pycharm 使用

前期配置 1、检查 Python 安装路径&#xff1a; 确保 E:\tools\Pyn392_EN_x64\python.exe 是你正确的 Python 安装路径。你可以在终端或命令提示符中运行这个命令&#xff0c;确保能正常找到Python。 E:\tools\Pyn392_EN_x64\python.exe --version2、检查 pip 是否正确安装&…

京津冀自动驾驶技术行业盛会|2025北京自动驾驶技术展会

“自动驾驶技术”已经成为全球汽车产业的焦点之一。在这个充满创新与变革的时代&#xff0c;“2025北京国际自动驾驶技术展览会”拟定于6月份在北京亦创国际会展中心盛大开幕&#xff0c;为全球自动驾驶技术领域的专业人士、企业以及爱好者们提供了一个交流与展示的平台。作为一…

FPGA实现串口升级及MultiBoot(五)通过约束脚本添加IPROG实例

本文目录索引 一个指令和三种方式通过约束脚本添加Golden位流工程MultiBoot位流工程验证example1总结代码缩略词索引: K7:Kintex 7V7:Vertex 7A7:Artix 7MB:MicroBlaze上一篇文章种总结了MultiBoot 关键技术,分为:一个指令、二种位流、三种方式、四样错误。针对以上四句话我…

股票量化实时行情接口WebSocket接入Python封装

Python做量化&#xff0c;如果是日内策略&#xff0c;需要更实时的行情数据&#xff0c;不然策略滑点太大&#xff0c;容易跑偏结果。 之前用行情网站提供的level1行情接口&#xff0c;实测平均更新延迟达到了6秒&#xff0c;超过10只股票并发请求频率过快很容易封IP。后面又尝…