ElasticSearch分页查询性能及封装实现

server/2024/9/24 8:54:46/

Es的分页方式

from+size

最基本的分页方式,类似于SQL中的Limit语法:

javascript">//查询年龄在12到32之间的前15条数据
{"query":{"bool":{"must":{"range":{"user_age":{"gte":12,"lte":32}}}}},"sort":{"user_age":{"order":"desc"}}"from":0,"size":15
}

 与Limit一样,from+size分页是通过设置from参数来指定返回结果的起始位置,而size参数来指定返回结果的数量。这里的页码在程序中需要额外进行处理一下,因为from是从0开始的,而size代表返回的条数,使用时需要对页码参数进行 :

from = (pageNo - 1) * pageSize 

 原理

 Es的查询过程如上图,即一个查询请求,在集群环境下是会被协调到各个节点中,最终落到对应索引的分片上,由每个分片进行查询,最终将数据回给主节点进行汇总。而使用from+size的分页,每个分片的处理则是这样的:

  • 搜索请求通常跨越多个分片,每个分片必须将其请求的命中内容以及任何先前页面的命中内容加载到内存中。
  • 对于翻页较深的页面或大量结果,这些操作会显著增加内存和 CPU 使用率,从而导致性能下降或节点故障。

例如,from=10000,size=10,需要将10010 条数据加载到内存,这通常意味着需要从多个分片中收集数据,然后在协调节点上进行合并和排序,然后经过后台处理后返回了最后 10条我们想要的数据。这个过程随着数据量的增加而变得更加复杂和资源密集,那也就意味着,越往后翻页(也就是深度翻页)需要加载的数据量越大,势必会越耗费 CPU + 内存资源,响应也会越慢。

性能

默认情况下,from+size 的限制是 10000,这意味着 from 参数加上 size 参数的值不能超过 10000,这是为了避免大数据量的召回导致性能低下。如果你尝试进行深度分页,超过了这个限制,Elasticsearch 会抛出错误,提示结果窗口太大。

为了解决这个问题,可以通过调整 index.max_result_window 的值来增加这个限制,但这通常不推荐,因为它会增加内存和 CPU 使用率,可能导致性能下降或节点故障

超出from+size限制报错:

javascript">"root_cause": [{"type": "illegal_argument_exception","reason": "Result window is too large, from + size must be less than or equal to: [10000] but was [10001]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting."}],"type": "search_phase_execution_exception",

scroll分页

原理

当发起一个带有 scroll 参数的搜索请求时,Elasticsearch 的分片会为这次搜索创建一个上下文,然后各分片基于快照数据进行相应的查询,每轮查询结束后,会记录一个scrollId,将这批结果以及对应的scrollId返回给客户端。

此时客户端根据返回的scrollId再次发起查询,此时Es服务端会根据该scrollId,找到所属的上下文,并基于上次查询的结果的尾段进行继续查询,相比from+size的每次查询都需要重复大数据量的召回,scroll查询有效的避免了召回操作。减少了CPU、IO的消耗。

性能

相比from+size的重复大批量数据召回消耗CPU和IO,Scroll是更友好且适合大数据量的深度查询(不受制于max_result_window),但是Scroll提高性能的代价是牺牲实时性;当开始一个 scroll session 时,Elasticsearch 会创建一个索引的快照(上下文中),这个快照代表了初始化搜索请求时的索引状态。在 scroll session 的生命周期内,即使索引发生了变化(如新增、删除或更新文档),这些变化也不会反映在后续的滚动查询中。

同时,快照、上下文维护的存在必然导致需要更多的内存来支撑

官方文档强调:不再建议使用scroll API进行深度分页。如果要分页检索超过 Top 10,000+ 结果时,推荐使用:PIT + search_after。

search_after分页

原理

search_after 是 Elasticsearch 5.0 以上版本提供的一种分页查询机制,用于解决深度分页的性能问题。它通过维护一个实时游标来避免传统 from+size 分页方式在处理大量数据时的性能损耗,也不需要像 scroll API 那样创建和维护一个历史快照,从而减少了资源的占用。

search_after进行查询时,必须指定排序字段,它使用上一次查询的最后一个文档的排序值来获取下一页数据。不同于scroll基于快照的查询,search_after是基于实时的数据,不需要维护一个很大的快照。 

性能

search_after 是一种无状态的分页方式,它不需要维护搜索上下文,因此不会占用额外的资源。每次请求都会根据上一次请求的最后一个文档的排序值来获取下一页数据,这样可以避免大量的内存消耗。search_after 提供了更好的实时性,因为它每次请求都会反映索引的最新状态。这意味着在查询过程中如果有数据的更新,这些变化会反映在分页结果中

场景及优缺点汇总

基于RestHighLevelClient的实现

RestHighLevelClient 是 Elasticsearch 的高级 Java 客户端,它提供了一套简单易用的 API 来与 Elasticsearch 服务器进行交互。

RestHighLevelClient位于org.elasticsearch.client包下,常用功能包括:

方法名称入参使用样例备注
createIndexclientRestHighLevelClient 实例, indexName: 索引名称createIndex(client, "my_index");创建索引 
deleteIndexclientRestHighLevelClient 实例, indexName: 索引名称deleteIndex(client, "my_index");删除索引 
indexrequestIndexRequest 对象client.index(request, RequestOptions.DEFAULT);插入数据 
getrequestGetRequest 对象client.get(request, RequestOptions.DEFAULT);根据ID获取数据 
updaterequestUpdateRequest 对象client.update(request, RequestOptions.DEFAULT);更新数据 
deleterequestDeleteRequest 对象client.delete(request, RequestOptions.DEFAULT);根据ID删除数据 
searchrequestSearchRequest 对象client.search(request, RequestOptions.DEFAULT);搜索数据 
scrollrequestSearchScrollRequest 对象client.scroll(request, RequestOptions.DEFAULT);滚动搜索 
clearScrollrequestClearScrollRequest 对象client.clearScroll(request, RequestOptions.DEFAULT);清除滚动ID 
bulkrequestBulkRequest 对象client.bulk(request, RequestOptions.DEFAULT);批量操作 
countrequestCountRequest 对象client.count(request, RequestOptions.DEFAULT);计数查询 
existsrequestGetRequest 对象client.exists(request, RequestOptions.DEFAULT);检查文档是否存在 
updateByQueryrequestUpdateByQueryRequest 对象client.updateByQuery(request, RequestOptions.DEFAULT);根据查询更新数据 
deleteByQueryrequestDeleteByQueryRequest 对象client.deleteByQuery(request, RequestOptions.DEFAULT);根据查询删除数据 

其中 BulkRequest 、GetRequest 等参数,均为ActionRequest的子类,具体使用方式可以参考下文。

其中DSL语法可配合org.elasticsearch.search.builder包中的Builder来进行构建,eg:

java"> public void test(){SearchSourceBuilder searchBody = new SearchSourceBuilder().from(0).size(10).query(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("user_name","张三")).must(QueryBuilders.termQuery("age",12))).sort("id", SortOrder.DESC).fetchSource(Arrays.asList("id","name","age").toArray(new String[0]),new String[0]).aggregation(AggregationBuilders.terms("Test").field("className").size(15));}

form+size分页

定义es数据实体类 DocBaseEntity<T>类:

java">@Data
public class DocBaseEntity<T> implements Serializable {private String _index;private String _type;private String _id;private T datas;public DocBaseEntity(SearchHit data) {this._index = data.getIndex();this._type = data.getType();this._id = data.getId();}public DocBaseEntity(JSONObject jsonHits){this._index = jsonHits.getStr("_index");this._type = jsonHits.getStr("_type");this._id = jsonHits.getStr("_id");}public T getDatas(){return datas;}}

查询返回实体类SearchResult<T>

java">@Data
public class SearchResult<T> implements Serializable {private int total;private List<DocBaseEntity<T>> source = new ArrayList<>();private JSONObject aggregations;public void addData(DocBaseEntity<T> obj){source.add(obj);}public List<T> getDatas(){return source.stream().map(DocBaseEntity::getDatas).collect(Collectors.toList());}public void addDatas(List<DocBaseEntity<T>> objs){source.addAll(objs);}public void setTotal(Object total){this.total = Integer.parseInt(String.valueOf(total));}public JSONObject toJSONObject(){return JSONUtil.parseObj(this,true);}}

定义查询接口ElasticSearchActuator

java">
public interface ElasticSearchActuator {/*** from+size 分页查询* @param indexName 索引名称* @param searchSourceBuilder 查询条件* @param pageNo 页码* @param pageSize 每页数量* @param resultObj 具体目标对象* @return SearchResult*/<T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj);}

 from+size分页实现

java">@Component
@Slf4j
public class ElasticSearchActuatorImpl implements ElasticSearchActuator {//restHighLevelClient客户端Configure相关单独编写,这里不再复述@Autowirdprivate RestHighLevelClient restHighLevelClient;private final static Integer MAX_RESULT_WINDOW = 10000;  @Overridepublic <T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj){SearchResult<T> resultMap = new SearchResult<T>();//提前规避超出长度的情况if( from+size >= MAX_RESULT_WINDOW){log.error("XXXXXXX")//其他操作return null;}//分页参数处理int from = (pageNo - 1) * pageSize;searchSourceBuilder.from(from).size(pageSize);SearchRequest searchRequest = new SearchRequest(indexName);searchRequest.source(SearchSourceBuilder );SearchResponse response = executSearch(searchRequest);if(null != response){return  createSearchResult(searchResp,resultObj);}return resultMap;}/*** 执行查询*//*** 执行查询*/private SearchResponse executSearch(SearchRequest searchRequest)     {SearchResponse searchResponse = null;try{searchResponse = restHighLevelClient.search(searchRequest,RequestOptions.DEFAULT);}catch(Exception e){//异常处理}return searchResponse;}/*** 构建目标结果* @param response 返回参数* @param resultObj 类对象* @param <T>* @return*/private <T> SearchResult<T> createSearchResult(SearchResponse response,Class<T> resultObj){SearchResult<T> resultMap = new SearchResult<>();SearchHit[] datas = response.getHits().getHits();for(SearchHit data:datas){DocBaseEntity<T> temp = new DocBaseEntity<>(data);temp.setDatas(JSONUtil.toBean(JSONUtil.parseObj(data.getSourceAsMap()),resultObj));resultMap.addData(temp);}resultMap.setTotal(response.getHits().getTotalHits().value);return resultMap;}}

scroll分页

SearchResult<T>补充scrollId值:

java">@Data
public class SearchResult<T> implements Serializable {private int total;//scrollIdprivate String scrollId;private List<DocBaseEntity<T>> source = new ArrayList<>();private JSONObject aggregations;public void addData(DocBaseEntity<T> obj){source.add(obj);}public List<T> getDatas(){return source.stream().map(DocBaseEntity::getDatas).collect(Collectors.toList());}public void addDatas(List<DocBaseEntity<T>> objs){source.addAll(objs);}public void setTotal(Object total){this.total = Integer.parseInt(String.valueOf(total));}public JSONObject toJSONObject(){return JSONUtil.parseObj(this,true);}}

继续定义查询接口ElasticSearchActuator

java">
public interface ElasticSearchActuator {/*** from+size 分页查询* @param indexName 索引名称* @param searchSourceBuilder 查询条件* @param pageNo 页码* @param pageSize 每页数量* @param resultObj 具体目标对象* @return SearchResult*/<T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj);/*** 滚动分页查询* @param indexName 索引* @param searchSourceBuilder 查询体* @param pageNo 页码* @param pageSize 每页条数* @param scrollId 滚动ID* @param resultObj 目标对象* @return SearchResult* @param <T> T*/<T> SearchResult<T> scrollSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,String scrollId,Class<T> resultObj);}

实现类:

java">@Component
@Slf4j
public class ElasticSearchActuatorImpl implements ElasticSearchActuator {//restHighLevelClient客户端Configure相关单独编写,这里不再复述@Autowirdprivate RestHighLevelClient restHighLevelClient;private final static Integer MAX_RESULT_WINDOW = 10000;  @Overridepublic <T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj){//……省略from+size查询}@Overridepublic <T> SearchResult<T> scrollSearchElasticSearchDatas(String indexName, SearchSourceBuilder searchSourceBuilder, int pageNo, int pageSize, String scrollId, Class<T> resultObj) throws IOException {SearchRequest searchRequest = new SearchRequest(indexName);searchSourceBuilder.size(pageSize);//设定scroll失效时长Scroll scroll = new Scroll(TimeValue.timeValueMinutes(3));searchRequest.scroll(scroll);SearchResponse searchResponse = null;if(StringUtils.isEmpty(scrollId)){searchResponse = executSearch(searchRequest);String tempscrollId = searchResponse.getScrollId();SearchScrollRequest searchScrollRequest = new SearchScrollRequest(tempscrollId);searchScrollRequest.scroll(scroll);for (int i = 0; i < (pageNo -1); i++) {searchResponse = scrollSearch(searchScrollRequest);}scrollId = tempscrollId;}else {SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);searchResponse = scrollSearch(searchScrollRequest);}//构建结果SearchResult<T> result = createSearchResult(searchResponse,resultObj);result.setSrcollId(scrollId);clearScrollSession(scrollId);return result;}/*** 滚动查询执行* @param searchScrollRequest* @return*/private  SearchResponse scrollSearch(SearchScrollRequest searchScrollRequest){SearchResponse searchResponse = null;try{searchResponse = restHighLevelClient.scroll(searchScrollRequest,RequestOptions.DEFAULT);}catch(Exception e){//异常处理}return searchResponse;}/*** 关闭scroll* @param scrollId* @throws IOException*/private void clearScrollSession(String scrollId) throws IOException {if (scrollId != null) {ClearScrollRequest clearScrollRequest = new ClearScrollRequest();clearScrollRequest.addScrollId(scrollId);ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);clearScrollResponse.isSucceeded();}}}

注意:

使用scroll查询,如果设置的scroll超时,scroll ID会在指定的超时时间内保持活跃,这个超时时间可以通过scroll参数设置。一旦超出这个时间限制,scroll ID将失效,但是不会自动清理。为了避免资源泄露,建议在scroll使用完毕后,显式地清理scroll上下文。

这里最好建立一层缓存记录,即每次客户发来请求后,记录当次查询的scrollId序列,然后定时的释放掉缓存中不用的序列。

search_after分页

 对SearchResult<T>补充sortId值:

java">@Data
public class SearchResult<T> implements Serializable {private int total;//scrollIdprivate String scrollId;//sortIdprivate List<Object> sortId;private List<DocBaseEntity<T>> source = new ArrayList<>();private JSONObject aggregations;public void addData(DocBaseEntity<T> obj){source.add(obj);}public List<T> getDatas(){return source.stream().map(DocBaseEntity::getDatas).collect(Collectors.toList());}public void addDatas(List<DocBaseEntity<T>> objs){source.addAll(objs);}public void setTotal(Object total){this.total = Integer.parseInt(String.valueOf(total));}public JSONObject toJSONObject(){return JSONUtil.parseObj(this,true);}}

继续定义查询接口ElasticSearchActuator

java">public interface ElasticSearchActuator {/*** from+size 分页查询* @param indexName 索引名称* @param searchSourceBuilder 查询条件* @param pageNo 页码* @param pageSize 每页数量* @param resultObj 具体目标对象* @return SearchResult*/<T> SearchResult<T> fromSizeSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,Class<T> resultObj);/*** 滚动分页查询* @param indexName 索引* @param searchSourceBuilder 查询体* @param pageNo 页码* @param pageSize 每页条数* @param scrollId 滚动ID* @param resultObj 目标对象* @return SearchResult* @param <T> T*/<T> SearchResult<T> scrollSearchElasticSearchDatas(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,String scrollId,Class<T> resultObj);/*** aftersearch分页查询* @param indexName 索引* @param searchSourceBuilder 查询体dsl* @param pageNo 页码* @param pageSize 每页条数* @param sortId 排序游标* @param resultObj 目标对象* @return SearchResult* @param <T> T*/<T> SearchResult<T> afterSearchElasticSearchData(String indexName,SearchSourceBuilder searchSourceBuilder,int pageNo,int pageSize,List<Object> sortId,Class<T> resultObj);}

实现类:

java">@Component
@Slf4j
public class ElasticSearchActuatorImpl implements ElasticSearchActuator {//……其他逻辑@Overridepublic <T> SearchResult<T> afterSearchElasticSearchData(String indexName, SearchSourceBuilder searchSourceBuilder, int pageNo, int pageSize, List<Object> sortId, Class<T> resultObj) {SearchRequest searchRequest = new SearchRequest(indexName);searchSourceBuilder.size(pageSize);if(!CollectionUtils.isEmpty(sortId)){searchSourceBuilder.searchAfter(sortId.toArray());}else {if(pageNo > 1){//如果不携带上次排序标识,且非首页,递归查询SearchResult<T> previousPage = afterSearchElasticSearchData(indexName,searchSourceBuilder,pageNo-1,pageSize,null,resultObj);searchSourceBuilder.searchAfter(previousPage.getSortId().toArray());}searchRequest.source(searchSourceBuilder);}try{SearchResponse response = executSearch(searchRequest);SearchResult<T> rest = createSearchResult(response,resultObj);SearchHit[] hits = response.getHits().getHits();if(hits.length > 0){rest.setSortId(Arrays.asList(hits[hits.length-1].getSortValues()));}return rest;}catch (Exception e){//异常处理log.error("XXXXXX");}return null;}}

http://www.ppmy.cn/server/121270.html

相关文章

C语言——自定义类型

目录 结构体 概念 结构体变量的创建和初始化 结构体的自引用 结构体的内存对齐 内存对齐存在的原因 合理设计结构体 方法一 方法二 结构体传参 结构体实现位段 什么是位段 位段的内存分配 位段的跨平台问题 注意 联合体 概念 验证 优点 小应用 什么是大小…

每日一练:二叉树的直径

543. 二叉树的直径 - 力扣&#xff08;LeetCode&#xff09; 一、题目要求 给你一棵二叉树的根节点&#xff0c;返回该树的 直径 。 二叉树的 直径 是指树中任意两个节点之间最长路径的 长度 。这条路径可能经过也可能不经过根节点 root 。 两节点之间路径的 长度 由它们之…

WinCC中归档数据片段的时间和尺寸设置

1&#xff0e;归档数据片段介绍工控人加入PLC工业自动化精英社群 1.1 概述 WinCC V6.2 开始的后台数据库采用了MS SQL Server 2005 &#xff0c;所以归档方式与V5 有所不同&#xff0c;它的运行数据存放在数据片段&#xff08;segment&#xff09;当中&#xff0c;工程师可以…

基于单片机的智能校园照明系统

由于校园用电量较大&#xff0c;本设计可以根据实际环境情况的改变&#xff0c;实现实时照明的控制。本设计以单片机芯片为控制芯片&#xff0c;热释电传感器采集教室中学生出入的信息&#xff0c;并把信息传递给单片机芯片&#xff0c;单片机芯片根据传感器传递过来的信息来控…

列表、数组排序总结:Collections.sort()、list.sort()、list.stream().sorted()、Arrays.sort()

列表类型 一.Collections.sort() Collections.sort()用于List类型的排序&#xff0c;其提供了两个重载方法&#xff1a; 1.sort(List<T> list) &#xff08;1&#xff09;List指定泛型时只能指定引用数据类型&#xff0c;也就是说无法用于基本数据类型的排序。 &am…

二、电脑入门2之常用dos命令

打开dos命令窗口 win R 常用dos命令 dir&#xff1a; 列出当前目录下的所有文件以及目录 cls &#xff1a;清理屏幕 exit&#xff1a; 关闭dos命令窗口 c:(盘字母后带冒号) 切换盘符 del&#xff1a; 删除文件 ipconfig &#xff1a; 查看IP信息 ipconfig/all &#xf…

微前端中的路由加载流程

1. 初始化基座应用 基座应用&#xff1a;基座应用是微前端架构中的主应用&#xff0c;负责管理和协调各个子应用的加载和卸载。 初始化&#xff1a;基座应用在启动时会初始化路由配置&#xff0c;注册各个子应用的路由。 2. 注册子应用 子应用需要向基座应用注册自己的路由和…

新建flask项目,配置入口文件,启动项目

pycharm新建flask项目时&#xff0c;会提供一个创建flask项目的导向&#xff0c;自动设置虚拟环境&#xff0c;并且安装flask及其依赖而vscode新建flask项目时&#xff0c;需要手动设置虚拟环境并安装flask&#xff0c;需要在终端使用pip install flask命令来安装flask及其依赖…