StarRocks Elasticsearch Catalog原理简析

news/2024/9/28 5:16:46/

前言

Elasticsearch不仅是强大的全文搜索引擎,在很多场景下(特别是TiDB、ShardingSphere等框架成熟之前)也被当做分布式HTAP数据库使用,在存储、更新海量数据的同时,提供高效的点查和部分聚合查询能力。StarRocks从3.1版本开始支持Elasticsearch Catalog,极大方便了ES数据的联邦查询。本文简要分析其原理,并提出一个小问题和对应的临时解决方法。

元数据获取阶段

当用户创建一个ES Catalog时,本质是创建了ElasticsearchConnector和它对应的ElasticsearchMetadata,后者持有该Catalog的全部配置信息和访问ES集群的EsRestClient。这点和2.x版本中旧有的ES外表不同,每张ES外表都会对应一个EsRestClient,会导致目标ES集群的HTTP连接数比正常偏多,ES Catalog则基本不存在这个问题。

每个ES Catalog只有一个默认数据库default_db,以下则是ES实例中的索引,在FE中称为EsTable,相当于复用了原ES外表的实现(当然ES Catalog会自动获取并推断字段,无需自己建表)。每个EsTable对象都持有一个EsMetaStateTracker用于同步元数据,其中又分为3个阶段(phase),按顺序分别为:

  • VersionPhase:通过GET /请求获取ES集群的版本号;
  • MappingPhase:通过GET /indexName/_mapping请求获取索引的Mapping信息,同时解析keyword类型字段(包括text内嵌的keyword)和存在doc_values的字段(即允许排序、聚合的字段),并存入上下文;
  • PartitionPhase:通过GET /indexName/_search_shards请求获取索引的分片信息,再通过GET /_nodes/http请求获取ES集群数据节点的地址,将分片ID和所在节点的映射关系存入EsShardPartitions容器。

FE计划阶段

ES Catalog查询对应的物理节点是EsScanNode,在生成Fragment的过程中除了维护Catalog的信息外,还会负责计算ScanRangeLocation,即每个BE节点负责请求的ES分片的对应关系,同时会尽量做colocate分配,使得BE节点和请求的ES分片所在节点是同一个(当然实际部署中这种情况不多见)。另外执行EXPLAIN语句时,会打印查询谓词翻译出来的ES DSL,如下所示。注意这个DSL只是示意作用,实际执行时BE会重新生成一次。

MySQL [default_db]> EXPLAIN SELECT id,waybillCode,orderTime FROM realtimewaybillmonitor_202409 WHERE yn <= 0 AND orderTime >= hours_sub(now(), 1) AND waybillCode LIKE 'JDX%' AND length(sku) > 3 LIMIT 1000;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                                                             |
|  OUTPUT EXPRS:13: id | 130: waybillCode | 71: orderTime                                                                                                     |
|   PARTITION: UNPARTITIONED                                                                                                                                  |
|                                                                                                                                                             |
|   RESULT SINK                                                                                                                                               |
|                                                                                                                                                             |
|   2:EXCHANGE                                                                                                                                                |
|      limit: 1000                                                                                                                                            |
|                                                                                                                                                             |
| PLAN FRAGMENT 1                                                                                                                                             |
|  OUTPUT EXPRS:                                                                                                                                              |
|   PARTITION: RANDOM                                                                                                                                         |
|                                                                                                                                                             |
|   STREAM DATA SINK                                                                                                                                          |
|     EXCHANGE ID: 02                                                                                                                                         |
|     UNPARTITIONED                                                                                                                                           |
|                                                                                                                                                             |
|   1:Project                                                                                                                                                 |
|   |  <slot 13> : 13: id                                                                                                                                     |
|   |  <slot 71> : 71: orderTime                                                                                                                              |
|   |  <slot 130> : 130: waybillCode                                                                                                                          |
|   |  limit: 1000                                                                                                                                            |
|   |                                                                                                                                                         |
|   0:EsScanNode                                                                                                                                              |
|      TABLE: realtimewaybillmonitor_202409                                                                                                                   |
|      PREDICATES: 9: yn <= 0, 71: orderTime >= '2024-09-26 15:46:17', 130: waybillCode LIKE 'JDX%', length(14: sku) > 3                                      |
|      LOCAL_PREDICATES: length(14: sku) > 3                                                                                                                  |
|      REMOTE_PREDICATES: 9: yn <= 0, 71: orderTime >= '2024-09-26 15:46:17', 130: waybillCode LIKE 'JDX%'                                                    |
|      ES_QUERY_DSL: {"bool":{"must":[{"range":{"yn":{"lte":0}}},{"range":{"orderTime":{"gte":"2024-09-26 15:46:17"}}},{"wildcard":{"waybillCode":"JDX*"}}]}} |
|      ES index/type: realtimewaybillmonitor_202409/realtimewaybillmonitor                                                                                    |
|      limit: 1000                                                                                                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+

可见上述查询的前三个谓词都可以下推到ES,但是第四个谓词无法下推,需要将结果拉取到SR端再进行过滤。

BE执行阶段

BE接收到前述EsScanNode后,将能够下推到ES的谓词封装为EsPredicate,分为几种情况:

  • 二元谓词,且一侧需为字面量,形如yn <= 0orderTime >= hours_sub(now(), 1)(右侧可以做常量折叠)都符合条件;
  • 函数调用谓词,支持esquery()(直接透传DSL的SR内置函数)、IS NULLIS NOT NULLLIKE,其他的均无法下推。即如果把上述示例的waybillCode LIKE 'JDX%'改成starts_with(waybillCode, 'JDX') = 1,这个条件就不能下推了;
  • INNOT IN谓词,对应terms query,简单直接;
  • 复合的AND谓词,实际上是对以上三种情况的组合做分解。

下推到ES的谓词会从谓词列表中删除。接下来每个BE会分别创建ESScanReader以扫描ES数据,这里需要注意,如果不是所有谓词都下推到了ES(即谓词列表中还有剩余),那么为了保证结果准确,原始查询中的LIMIT子句也不能下推。

上一节中的查询实际生成的DSL JSON如下所示。如果无法命中doc_values,则会改用source查询。

{"query": {"bool": {"filter": [{"bool": {"should": [{"range": {"yn": {"lte": "0"}}}]}}, {"bool": {"should": [{"range": {"orderTime": {"gte": "1727336859000"}}}]}}, {"bool": {"should": [{"wildcard": {"waybillCode": "JDX*"}}]}}]}},"stored_fields": "_none_","docvalue_fields": ["waybillCode", "orderTime", "yn", "id", "sku"],"sort": ["_doc"],"size": 4096
}

正式执行查询时,又分为两种情况。

  • LIMIT子句下推到了ES,那么BE会认为这是一个"exactly-once"的查询(代码中如此),可以类比流式处理引擎中exactly-once的含义,即“只查询一次就可以了”。此时组装的搜索请求URL形如{target}/{index}/{type}/_search?terminate_after={limit}&preference=_shards:{shards}&{filter_path}
  • 若没有LIMIT子句下推到ES,则需要执行Scroll查询,分页获取结果。此时组装的搜索请求URL形如{target}/{index}/{type}/_search?scroll={keep_alive}&preference=_shards:{shards}&{filter_path}。Scroll上下文的TTL由BE参数es_scroll_keepalive设定,默认是5m

接下来ESScanReader每次请求上述URL获取一批数据,调用超时由BE参数es_http_timeout_ms设定,默认是5000(即5秒),在网络环境欠佳时,应适当调大。获取到的数据经过JSON解析,获取到doc_values或者_source,逐行填充到Chunk中(没有值的则填充默认值)。这里实际上可以优化为按列填充,代码中也有相应的TODO标记。

ES数组类型的问题

ES没有显式的数组类型,当某字段插入了多个值时,它会自然地变为数组类型,但在索引Mapping中无法直接区分该字段是否为数组。在我们的历史ES集群中,有大量ES索引含有实际为数组的字段,使用SR ES Catalog查询时则会抛出异常或只返回第一个值,影响体验。这里提出一个不优雅的临时解决方案,在Catalog参数中增加array_fields配置项,让用户创建ES Catalog时手动指定数组字段。

// Fields that should be treated as arrays when building Elasticsearch external table.                      
// Since Elasticsearch makes no distinction between scalar and array types, we should manually specify them.
// The format is: `field1,index2:field2...`                                                                 
// which means `field1` in all indices and `field2` in `index2` are arrays.                                 
@Config(key = KEY_ARRAY_FIELDS,                                                                             desc = "Fields that should be treated as arrays when building Elasticsearch external table. " +     "The format is: `field1,index2:field2,...`.",                                               defaultValue = "")                                                                                  
private String arrayFields;

然后在ElasticsearchMetadata中获取并缓存每个索引中的数组字段名。

private Map<String, Set<String>> indicesWithArrayFields;                                                     public ElasticsearchMetadata(EsRestClient esRestClient, Map<String, String> properties, String catalogName) {this.esRestClient = esRestClient;                                                                        this.properties = properties;                                                                            this.catalogName = catalogName;                                                                          this.indicesWithArrayFields = Arrays.stream(StringUtils.split(properties.get(KEY_ARRAY_FIELDS), ","))     .map(s -> StringUtils.split(s, ":"))                                                              .filter(kv -> kv.length <= 2)                                                                    .collect(                                                                                        Collectors.toMap(                                                                        kv -> kv.length == 2 ? kv[0] : "",                                               kv -> new HashSet<>(Collections.singletonList(kv.length == 2 ? kv[1] : kv[0])),  (v1, v2) -> {                                                                    v1.addAll(v2);                                                               return v1;                                                                   }                                                                                )                                                                                        );                                                                                               
}

构建EsTable时,会调用EsUtil.convertColumnSchema()方法创建ES表的Schema,将对应索引的arrayFields参数传递给它,并将数组字段重新用ArrayType包装起来即可。

public static List<Column> convertColumnSchema(EsRestClient client, String index, Set<String> arrayFields)throws AnalysisException {                                                                        List<Column> columns = new ArrayList<>();                                                             String mappings = client.getMapping(index);                                                           JSONObject properties = parseProperties(index, mappings);                                             if (null == properties) {                                                                             return columns;                                                                                   }                                                                                                     for (String columnName : properties.keySet()) {                                                       JSONObject columnAttr = (JSONObject) properties.get(columnName);                                  // default set json.                                                                              Type type = Type.JSON;                                                                            if (columnAttr.has("type")) {                                                                     type = convertType(columnAttr.get("type").toString());                                        if (arrayFields.contains(columnName)) {                                                       type = new ArrayType(type);                                                               }                                                                                             }                                                                                                 Column column = new Column(columnName, type, true);                                               columns.add(column);                                                                              }                                                                                                     return columns;                                                                                       
}

The End

大家晚安。


http://www.ppmy.cn/news/1531450.html

相关文章

​​合​​合​​信​息​​​龙​​湖​​数​​科​​一​​面​​​

1. 请尽可能详细地说明&#xff0c;Git中merge和rebase的区别和应用场景&#xff1f;Git中pull和fetch的区别和应用场景&#xff1f;Git中revert和reset的区别和应用场景&#xff1f;你的回答中不要写出示例代码。 Git中merge和rebase的区别和应用场景 merge 区别&#xff1…

前端工程记录:Vue2 typescript项目升级Vue3

由于typescript飞速发展&#xff0c;某些vue2项目也在vue3出现之前集成了typescript开发&#xff0c;例如我的个人网站&#xff0c;当时花费了不少时间。而vue3我使用一段时间后&#xff0c;在2022年左右开始投入生产&#xff0c;但是这个个站就没怎么维护了。若是想继续&#…

【LLM】 TinyAgent 构建指南

文章目录 TinyAgent 构建指南项目概述实现步骤步骤一&#xff1a;模型构建步骤二&#xff1a;工具构建步骤三&#xff1a;Agent 构建步骤四&#xff1a;运行 Agent 结论参考文献学习心得 TinyAgent 构建指南 项目地址&#xff1a;https://github.com/datawhalechina/tiny-univ…

【深度学习】05-Rnn循环神经网络-01- 自然语言处理概述/词嵌入层/循环网络/文本生成案例精讲

循环神经网络&#xff08;RNN&#xff09;主要用于自然语言处理的。 循环神经网络&#xff08;RNN&#xff09;、卷积神经网络&#xff08;CNN&#xff09;和全连接神经网络&#xff08;FCN&#xff09;是三种常见的神经网络类型&#xff0c;各自擅长处理不同类型的数据。下面…

【Go语言】深入解读Go语言中的指针,助你拨开迷雾见月明

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

虚拟机文件系统根目录上的磁盘空间不足?VMware虚拟机扩容磁盘步骤讲解

VMware虚拟机扩容磁盘步骤讲解 今天使用vmware&#xff0c;想使用Ubuntu虚拟机&#xff0c;结果出现这种情况&#xff1a; 我的环境&#xff1a; Ubuntu20.04 VMWare workstation pro 17 VMware设置 参考链接&#xff1a; https://blog.csdn.net/hktkfly6/article/details…

Sqlite_Datetime列选择三月的行

In SQLite, use the strftime function to extract components from a date/time value SELECT * FROM table WHERE strftime(%m, datemonth) 03;strftime(‘%m’, datemonth): extracts the month part from the datemonth column as a string (with leading zeros for sing…

50道渗透测试面试题,全懂绝对是高手

吉祥知识星球http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247485367&idx1&sn837891059c360ad60db7e9ac980a3321&chksmc0e47eebf793f7fdb8fcd7eed8ce29160cf79ba303b59858ba3a6660c6dac536774afb2a6330&scene21#wechat_redirect 《网安面试指南》…