Elasticsearch 索引数据预处理

server/2024/10/11 6:55:11/

pipeline

在文档写入 ES 之前,对数据进行预处理(ingest)工作通过定义 pipeline 和 processors 实现。

注意:数据预处理必须在 Ingest node 节点处理,ES 默认所有节点都是 Ingest node。

如果需要禁用 Ingest ,可以在 elasticsearch.yaml 配置:

node.ingest: false

pipeline Demo

创建 pipeline

# 创建名称为:pipeline_uppercase pipeline
# processors 包含一个 processor :将 message 字段的内容转换为大写
PUT _ingest/pipeline/pipeline_uppercase
{"description": "uppercase field message","processors": [{"uppercase": {"field": "message","ignore_missing": true}}]
}

写入数据

# 指定使用的 pipeline 名字
POST index_data/_doc?pipeline=pipeline_uppercase
{"name": "pipeline","message": "this is so cool!"
}

查看数据

GET index_data/_search# 结果显示
{"took" : 0,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 2,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "index_data","_type" : "_doc","_id" : "lIiXsnYBv8g5MehmL98X","_score" : 1.0,"_source" : {"name" : "pipeline","message" : "THIS IS SO COOL!"}}]}
}

模拟pipeline

GET /_ingest/pipeline/pipeline_uppercase/_simulate
{"docs": [{"_index": "index","_id": "id","_source": {"message": "this is so cool!"}},{"_index": "index","_id": "id","_source": {"message": "elasticsearch"}}]
}# 测试结果
{"docs" : [{"doc" : {"_index" : "index","_type" : "_doc","_id" : "id","_source" : {"message" : "THIS IS SO COOL!"},"_ingest" : {"timestamp" : "2020-12-30T07:44:47.1443329Z"}}},{"doc" : {"_index" : "index","_type" : "_doc","_id" : "id","_source" : {"message" : "ELASTICSEARCH"},"_ingest" : {"timestamp" : "2020-12-30T07:44:47.1443329Z"}}}]
}

内置 processor

ES 内置了大量 processor。

常用 processor :

  • convert
  • Date
  • Drop
  • Grok
  • Dissect
  • Remove
  • Rename
  • Set
  • URI

参考官方文档:

ES 还提供了 ingest 插件,需要自己安装

  • Ingest-geoip: 是地理位置处理的数据库插件,在最新的版本已经不作为插件发布,合并到geoip processor。
  • ingest-user-agent:扩展浏览器请求信息,在最新的版本已经不作为插件发布,合并到User agent processor。
  • ingest-attachment:该插件扩展ES处理文本文件的能力, 使用它可以实现对(PDF,DOC,EXCEL等)主流格式文件的文本抽取及自动导入。处理的Field必须是Base64格式的二进制编码。

使用 Ingest API

# 创建pipeline
PUT /_ingest/pipeline/<pipeline> # 查询pipeline
GET /_ingest/pipeline/<pipeline> 
GET /_ingest/pipeline# 删除pipeline
DELETE /_ingest/pipeline/<pipeline># 模拟pipeline
POST /_ingest/pipeline/<pipeline>/_simulate
GET /_ingest/pipeline/<pipeline>/_simulate
POST /_ingest/pipeline/_simulate
GET /_ingest/pipeline/_simulate

自定义 processor 插件

自定义 processor 可以仿照 ES 官方插件实现自己的 processor 处理。直接完毕直接安装插件就可以使用。

**初始化 IngestPlugin 插件 **

public class CoordinateConvertPlugin extends Plugin implements IngestPlugin {    			@Override    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {        return Collections.singletonMap(CoordinateProcessor.TYPE, new CoordinateProcessor.Factory());    }
}

IngestPlugin内我们只需要实现 getProcessors 创建 processsor 工厂方法,其中 CoordinateProcessor.TYPE 是 processor 名称不能重复。ES框架会遍历所有的 IngestPlugin,加载 Processor。

new CoordinateProcessor.Factory() 是初始化 Processor.Factory 工厂类。

Processor.Factory工厂实现

Processor.Factory主要是定义processor的命令行格式和初始化CoordinateProcessor。

public static final class Factory implements Processor.Factory {static final Set<Property> DEFAULT_PROPERTIES = EnumSet.allOf(Property.class);@Overridepublic Processor create(Map<String, Processor.Factory> processorFactories, String tag,Map<String, Object> config) throws Exception {String field = readStringProperty(TYPE, tag, config, "field");String targetField = readStringProperty(TYPE, tag, config, "target_field", "coordinate");List<String> propertyNames = readOptionalList(TYPE, tag, config, "properties");boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false);final Set<Property> properties;if (propertyNames != null) {properties = EnumSet.noneOf(Property.class);for (String fieldName : propertyNames) {try {properties.add(Property.parse(fieldName));} catch (Exception e) {throw newConfigurationException(TYPE, tag, "properties", "illegal field option [" +fieldName + "]. valid values are " + Arrays.toString(Property.values()));}}} else {properties = DEFAULT_PROPERTIES;}return new CoordinateProcessor(tag, field, targetField, properties, ignoreMissing);}}

Processor实现

CoordinateProcessor 是AbstractProcessor 的实现,所有的 processor 都是实现 Processor 接口,ES 框架定义了抽象类AbstractProcessor,不同的 Processor 实现各自不同的处理逻辑。

public class CoordinateProcessor extends AbstractProcessor {public static final String TYPE = "location_coordinate";public CoordinateProcessor(String tag, String field, String targetField, Set<Property> properties, boolean ignoreMissing) {super(tag);this.field = field;this.targetField = targetField;this.properties = properties;this.ignoreMissing = ignoreMissing;this.mapIDRevertLonLat = new MapIDRevertLonLat();}@Overridepublic IngestDocument execute(IngestDocument ingestDocument) throws Exception {Map<String, Object> additionalFields = new HashMap<>();String input = ingestDocument.getFieldValue(field, String.class);// 此处省略...additionalFields.put("longitude", value[0]);additionalFields.put("latitude", value[1]);return ingestDocument;}@Overridepublic String getType() {return TYPE;}
}

这样我们只需要在 execute 中实现自己的 processor 处理逻辑就实现了自己的 processor 了。

效果测试

创建 pipeline,process 指定为 location_coordinate。

PUT _ingest/pipeline/coordinate
{"description": "Extract single location coordinate information","processors": [{"location_coordinate": {"field": "url","ignore_missing": true}}]
}

使用 pipeline

POST /coordinate_test/_doc?pipeline=coordinate
{"url": "x=25357&y=6538&level=15"
}

测试效果

{"_index" : "coordinate_test","_type" : "_doc","_id" : "l4i4snYBv8g5MehmA9_v","_score" : 1.0,"_source" : {"coordinate" : {"level" : 15,"latitude" : "18.17","x" : 25357,"y" : 6538,"longitude" : "98.59"},"url" : "x=25357&y=6538&level=15"}}

小结

ES的 Ingest node pipeline 功能很强大,具有强大的数据处理的能力,官方提供了丰富的 processes,用户可以灵活选择,也可以通过自定义 IngestPlugin 实现更为复杂的操作。既可以灵活的变更索引的结构和数据,又可以减少对业务代码的侵入。对数据清洗处理的任务提供了一种轻量级的解决方案。


http://www.ppmy.cn/server/130012.html

相关文章

智能EDA小白从0开始 —— DAY15 PADS

PADS&#xff08;Personal Automated Design System&#xff09;作为一款由Mentor Graphics公司推出的电子设计自动化&#xff08;EDA&#xff09;软件&#xff0c;自1986年面世以来&#xff0c;一直在电子设计领域扮演着重要角色。PADS以其模块化的设计、丰富的功能和稳定的性…

服装生产管理的数字化转型:SpringBoot框架

4 系统设计 4.1 系统结构设计 在结构设计过程中&#xff0c;首先对系统进行需求分析&#xff0c;然后进行系统初步设计&#xff0c;将系统功能模块细化&#xff0c;具体分析每一个功能模块具体应该首先哪些功能&#xff0c;最后将各个模块进行整合&#xff0c;实现系统结构的…

GitHub Copilot 使用手册(一)--配置

一、 什么是GitHub Copilot GitHub Copilot 是GitHub和OpenAI合作开发的一个人工智能工具&#xff0c;在使用Visual Studio Code、Microsoft Visual Studio、Vim、Cursor或JetBrains等IDE时可以协助用户编写代码等工作&#xff0c;实现虚拟的结对编程。 二、 GitHub Copilot …

003 Springboot操作RabbitMQ

Springboot整合RabbitMQ 文章目录 Springboot整合RabbitMQ1.pom依赖2.yml配置3.配置队列、交换机方式一&#xff1a;直接通过配置类配置bean方式二&#xff1a;消息监听通过注解配置 4.编写消息监听发送测试5.其他类型交换机配置1.FanoutExchange2.TopicExchange3.HeadersExcha…

java连接mysql查询数据(基础版,无框架)

依赖引入: <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java

SpringBoot WebSocket Jmeter压力测试 踩坑记录之URL

先说结论&#xff0c;使用Jmeter压测的时候一定要在URL上区分出哪些是sessionId &#xff0c;否则可能会出现会话重复的情况。 以下是具体内容&#xff1a; 以下面的 WebSocket URL 为例&#xff1a; ws://127.0.0.1:8005/market-ws/110/uyux3kws/websocket 这个 URL 可以…

「实战应用」如何用图表控件LightningChart可视化天气数据?(一)

LightningChart.NET完全由GPU加速&#xff0c;并且性能经过优化&#xff0c;可用于实时显示海量数据-超过10亿个数据点。 LightningChart包括广泛的2D&#xff0c;高级3D&#xff0c;Polar&#xff0c;Smith&#xff0c;3D饼/甜甜圈&#xff0c;地理地图和GIS图表以及适用于科学…

Golang | Leetcode Golang题解之第472题连接词

题目&#xff1a; 题解&#xff1a; type trie struct {children [26]*trieisEnd bool }func (root *trie) insert(word string) {node : rootfor _, ch : range word {ch - aif node.children[ch] nil {node.children[ch] &trie{}}node node.children[ch]}node.isE…