快速掌握Elasticsearch检索之二:滚动查询(scrool)获取全量数据(golang)

devtools/2024/12/28 12:24:39/

Elasticsearch8.17.0在mac上的安装

Kibana8.17.0在mac上的安装

Elasticsearch检索方案之一:使用from+size实现分页

1、滚动查询的使用场景

滚动查询区别于上一篇文章介绍的使用from、size分页检索,最大的特点是,它能够检索超过10000条外的所有文档,可以理解为是一种全量检索的技术方案,也正是因为这种特性,使得滚动查询的代价非常高昂,检索过程消耗大量的内存,所以对于实时检索的场景,滚动查询是不适用的。

那滚动查询使用在什么场景呢?主要是应用在离线、检索全量数据,对于实时性要求不高的场景,比如一个数据平台,前台页面展示的数据用来预览,可以使用from+size分页查询,以提升检索效率以及平台的用户体验,如果还需要检索全量数据用于二次使用,那么后台离线检索全量就需要使用滚动查询以获取到全量数据,这将是一个耗费大量资源和时间的过程。

2、使用Kibana直观体验滚动查询

初始化滚动查询:

GET /new_tag_202411/_search?scroll=1m
{"size": 10,"sort":[{"doc_id":{"order": "asc"}}]
}

检索条件设置返回2条数据,按【doc_id】字段升序排列,doc_id分别为1-10的文档。

scroll=1m,表示Elasticsearch允许等待的最长时间是1分钟,如果在一分钟之内,接下来的 scroll 请求没有到达的话,那么当前请求的上下文将会失效:

 从上图返回可以看出,有一个【_scroll_id】字段,这个字段非常重要,接下来的滚动查询需要使用这个字段:

第一次滚动,返回doc_id从11开始的数据,第二次滚动时,需要使用第一次滚动返回的【_scroll_id】替换滚动请求,数据从doc_id为21的数据开始返回,之后循环这个过程,直到检索到全部数据。

注意一点,在测试过程中,我创建了多次滚动查询,发现scrool_id特别像,大家别误以为scrool_id没变,比如以下三个scrool_id,每个id只有3个字符不一样:

FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFng3akdDTWthVFZLVTE0ODhLdGdaR1EAAAAAAAAWbhZZZEloTnlyU1FGaTgxQV9QR1pXTUdR

FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFng3akdDTWthVFZLVTE0ODhLdGdaR1EAAAAAAAActhZZZEloTnlyU1FGaTgxQV9QR1pXTUdR

FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFng3akdDTWthVFZLVTE0ODhLdGdaR1EAAAAAAAAjDxZZZEloTnlyU1FGaTgxQV9QR1pXTUdR

3、代码实现滚动查询(golang)

首先是初始化一个滚动查询:

res, err := client.Search(client.Search.WithIndex("new_tag_202411"),client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),client.Search.WithScroll(time.Minute*1),
)

这行代码:

client.Search.WithScroll(time.Minute*1)

就是在设置滚动查询上下文的有效时间,其他几行很容易理解。

这几行代码执行完成后,除了能拿到检索数据,还能拿到scroll_id。之后就可以进行滚动查询:

for {docs = Documents{}res, err = client.Scroll(client.Scroll.WithScrollID(scrollId),client.Scroll.WithScroll(time.Minute),)if err != nil {fmt.Println("scroll err:", err.Error())return}err = json.NewDecoder(res.Body).Decode(&docs)if err != nil {fmt.Println("json decode err:", err)return}if len(docs.Hits.Hits) == 0 {break}fmt.Println("search count:", len(docs.Hits.Hits))scrollId = docs.ScrollID
}

这里要注意的一点是,循环滚动时,每个轮次,必须更新scrool_id为上一次滚动返回的值,如上面最后一行代码。

L17-L19行的代码,表示已经查出所有数据,本次没有数据了,同时循环结束。

4、一个必须要考虑的问题

对于滚动查询,前面也说过,会创建一个上下文,当es中存在的上下文数量超过一定限制后,将无法再次创建滚动查询,从而无法检索数据,这个【限制】es默认是500个,我们可以通过es的api查看当前系统中已经创建的上下文数量:

GET /_nodes/stats/indices/search

默认情况下,只要【open_contexts】值小于500,都能正常进行滚动查询,如果已经创建了500个上下文,就会出现问题,下面测试一下,利用代码,创建500个上下文:

 如上图,上下文已经创建500个,运行代码,再次执行滚动查询的动作:

无法查出任何数据,但是以下代码也无任何的报错:

res, err := client.Search(client.Search.WithIndex("new_tag_202411"),client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),client.Search.WithScroll(time.Minute*100),
)
if err != nil {fmt.Println("search err:", err.Error())return
}

没有走到err分支,经过调试发现,res的结构中的http状态码变了,我们加一行打印:

res, err := client.Search(client.Search.WithIndex("new_tag_202411"),client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),client.Search.WithScroll(time.Minute*100),)if err != nil {fmt.Println("search err:", err.Error())return}fmt.Println("resp code:", res.StatusCode)err = json.NewDecoder(res.Body).Decode(&docs)if err != nil {fmt.Println("decode err:", err.Error())return}

 运行结果如下:

状态码由正常值0变成了429,所以,在执行滚动查询时,我们需要加上对状态码的判断,以捕获到上下文超限的情况,否则没有检索到数据,还以为系统出bug了呢。

这个问题就是滚动查询的一个短板,系统用户量大了,发起滚动查询一旦超过500,就会影响用户检索数据,当然了,es还是有其他解决方案来进行全量的数据检索,还是那句话,下一篇文章再写。

5、所有代码

github:GitHub - liupengh3c/career 

代码位于以下文件:

https://github.com/liupengh3c/career/blob/main/elastic/scrool/main.go

代码也粘过来吧:

package mainimport ("fmt""os""strings""time""github.com/elastic/go-elasticsearch/v8"jsoniter "github.com/json-iterator/go""github.com/liupengh3c/esbuilder"
)// 最外层数据结构
type Documents struct {ScrollID string      `json:"_scroll_id"`Shards   Shards      `json:"_shards"`Hits     HitOutLayer `json:"hits"`TimedOut bool        `json:"timed_out"`Took     int         `json:"took"`
}
type Shards struct {Failed     int `json:"failed"`Skipped    int `json:"skipped"`Successful int `json:"successful"`Total      int `json:"total"`
}
type HitOutLayer struct {Hits     []Hits  `json:"hits"`MaxScore float64 `json:"max_score"`Total    Total   `json:"total"`
}
type Hits struct {ID     string         `json:"_id"`Index  string         `json:"_index"`Score  float64        `json:"_score"`Source map[string]any `json:"_source"`Type   string         `json:"_type"`
}
type Total struct {Relation string `json:"relation"`Value    int    `json:"value"`
}func main() {client, err := NewEsClient()if err != nil {fmt.Println("create client err:", err.Error())return}fmt.Println("connect success")for i := 0; i < 510; i++ {ScrollSearch(client)}
}
func NewEsClient() (*elasticsearch.Client, error) {cert, _ := os.ReadFile("/Users/liupeng/Documents/study/elasticsearch-8.17.0/config/certs/http_ca.crt")client, err := elasticsearch.NewClient(elasticsearch.Config{Username:  "elastic",Password:  "XBS=adqa799j_Aoz=A+h",Addresses: []string{"https://127.0.0.1:9200"},CACert:    cert,})if err != nil {// fmt.Println("create client err:", err.Error())return client, err}return client, nil
}func ScrollSearch(client *elasticsearch.Client) {var json = jsoniter.ConfigCompatibleWithStandardLibrarydocs := Documents{}dslQuery := esbuilder.NewDsl()boolQuery := esbuilder.NewBoolQuery()dslQuery.SetOrder(esbuilder.NewSortQuery("doc_id", "asc"))dslQuery.SetQuery(boolQuery)dslQuery.SetSize(10000)res, err := client.Search(client.Search.WithIndex("new_tag_202411"),client.Search.WithBody(strings.NewReader(dslQuery.BuildJson())),client.Search.WithScroll(time.Minute*20),)if err != nil {fmt.Println("search err:", err.Error())return}err = json.NewDecoder(res.Body).Decode(&docs)if err != nil {fmt.Println("decode err:", err.Error())return}fmt.Println("search count:", len(docs.Hits.Hits))scrollId := docs.ScrollIDfor {docs = Documents{}res, err = client.Scroll(client.Scroll.WithScrollID(scrollId),client.Scroll.WithScroll(time.Minute),)if err != nil {fmt.Println("scroll err:", err.Error())return}err = json.NewDecoder(res.Body).Decode(&docs)if err != nil {fmt.Println("decode err:", err.Error())return}defer res.Body.Close()if res.StatusCode == 429 {fmt.Println("scroll contexts is more than 500")return}if len(docs.Hits.Hits) == 0 {break}fmt.Println("search count:", len(docs.Hits.Hits))scrollId = docs.ScrollID}client.ClearScroll(client.ClearScroll.WithScrollID(scrollId),)
}


http://www.ppmy.cn/devtools/146109.html

相关文章

OCR(五)linux 环境 基于c++的 paddle ocr 编译【CPU版本 】

1. 下载 下载opencv4.10 2. 编译opencv 2.1 安装依赖库 sudo apt install -y g ++ sudo apt install -y cmake sudo apt install -y make sudo apt install -y wget sudo apt install -y unzip sudo apt-get install build-essential libgtk2.0-dev libgtk-3-devlibavcodec-…

一篇文章学会HTML

目录 页面结构 网页基本标签 图像标签 超链接标签 文本链接 图像链接 锚链接 功能链接 列表 有序列表 无序列表 自定义列表 表格 跨列/跨行 表头 媒体元素 视频 音频 网站的嵌套 表单 表单元素 文本框 单选框 多选框 按钮 下拉框 文本域和文件域 表…

mysql部署(5.7.31)

下载地址&#xff1a;MySQL :: Download MySQL Community Server (Archived Versions) 因为系统是Kylin Linux Server V10&#xff0c;底层是centos7&#xff0c;所以选择下面版本 一、删除原来安装的mysql 1.查询并删除 [rootlocalhost /]# find / -name mysql find: ‘/ru…

Windows Subsystem(副系统)

目录 Windows Subsystem for Linux (WSL)&#xff1a; Windows Subsystem for Android (WSA)&#xff1a; 其他 Windows 子系统&#xff1a; Windows 子系统&#xff08;Windows Subsystem&#xff09;是微软在 Windows 操作系统中引入的一种技术&#xff0c;它允许在 Windo…

打造高效美颜体验:直播APP美颜SDK技术原理与开发实战

随着直播行业的迅猛发展&#xff0c;越来越多的直播平台为主播提供了美颜功能&#xff0c;提升观众的观看体验。美颜SDK&#xff08;Software Development Kit&#xff0c;软件开发工具包&#xff09;作为直播APP中不可或缺的一部分&#xff0c;承担着实时优化视频画质、改善主…

电子应用设计方案75:智能家庭智能锁系统设计

智能家庭智能锁系统设计 一、引言 智能家庭智能锁系统作为家庭安全防护的重要环节&#xff0c;为用户提供了更加便捷、安全和智能化的门锁解决方案。本设计方案旨在打造一个功能强大、性能稳定且易于使用的智能锁系统。 二、系统概述 1. 系统目标 - 实现多种开锁方式&#xf…

数据仓库和数据湖 数据仓库和数据库

数据仓库和数据湖是两种不同的数据存储解决方案&#xff0c;它们在设计、用途和数据管理方式上有着显著的区别。以下是数据仓库和数据湖的主要区别&#xff1a; 1. 数据结构&#xff1a;• 数据仓库&#xff1a;通常存储结构化数据&#xff0c;这些数据经过清洗、转换和加载&a…

CV(7)--神经网络训练

前言 仅记录学习过程&#xff0c;有问题欢迎讨论 什么是神经网络&#xff1a; 神经网络是一种模拟人脑神经元工作原理的算法&#xff0c;它由多个神经元组成&#xff0c;每个神经元都接受输入&#xff0c;通过计算产生输出&#xff0c;并将输出传递给其他神经元。神经网络的…