【Elasticsearch】inference ingest pipeline

news/2025/1/24 1:25:55/

 Elasticsearch 的 Ingest Pipeline 功能允许你在数据索引之前对其进行预处理。通过使用 Ingest Pipeline,你可以执行各种数据转换和富化操作,包括使用机器学习模型进行推理(inference)。这在处理词嵌入、情感分析、图像识别等场景中非常有用。

 

### 使用 Inference Ingest Pipeline

 

以下是一个详细的步骤,展示如何使用 Inference Ingest Pipeline 在 Elasticsearch 中加载和使用预训练的机器学习模型来进行推理。

 

### 步骤 1: 准备机器学习模型

 

首先,你需要准备一个预训练的机器学习模型,并将其部署到 Elasticsearch 的机器学习模块中。Elasticsearch 支持多种模型格式,包括 TensorFlow、PyTorch、ONNX 等。

 

#### 示例:上传 TensorFlow 模型

 

1. **下载或训练模型**:确保你有一个 TensorFlow 模型文件(例如,`.pb` 文件)。

2. **上传模型**:使用 Elasticsearch 的机器学习 API 将模型上传到 Elasticsearch。

 

```json

PUT _ml/trained_models/my_word_embedding_model

{

  "input": {

    "field_names": ["text"]

  },

  "inference_config": {

    "natural_language_inference": {

      "results_field": "inference_results"

    }

  },

  "model": {

    "definition": {

      "path": "path/to/your/model.pb"

    }

  }

}

```

 

### 步骤 2: 创建 Ingest Pipeline

 

创建一个 Ingest Pipeline,使用刚刚上传的模型进行推理。

 

```json

PUT _ingest/pipeline/word_embedding_pipeline

{

  "description": "Pipeline to add word embeddings using a trained model",

  "processors": [

    {

      "inference": {

        "model_id": "my_word_embedding_model",

        "target_field": "embedding"

      }

    }

  ]

}

```

 

### 步骤 3: 使用 Ingest Pipeline 索引数据

 

在索引数据时,指定使用创建的 Ingest Pipeline。

 

```json

POST word_embeddings/_doc?pipeline=word_embedding_pipeline

{

  "word": "example"

}

```

 

### 示例:完整流程

 

以下是一个完整的示例,展示如何从头开始创建和使用 Inference Ingest Pipeline。

 

#### 1. 上传模型

 

```json

PUT _ml/trained_models/my_word_embedding_model

{

  "input": {

    "field_names": ["text"]

  },

  "inference_config": {

    "natural_language_inference": {

      "results_field": "inference_results"

    }

  },

  "model": {

    "definition": {

      "path": "path/to/your/model.pb"

    }

  }

}

```

 

#### 2. 创建 Ingest Pipeline

 

```json

PUT _ingest/pipeline/word_embedding_pipeline

{

  "description": "Pipeline to add word embeddings using a trained model",

  "processors": [

    {

      "inference": {

      "model_id": "my_word_embedding_model",

      "target_field": "embedding"

    }

  ]

}

```

 

#### 3. 创建索引

 

```json

PUT word_embeddings

{

  "mappings": {

    "properties": {

      "word": {

        "type": "keyword"

      },

      "embedding": {

        "type": "dense_vector",

        "dims": 100 // 根据你的词嵌入模型的维度设置

      }

    }

  }

}

```

 

#### 4. 索引数据

 

```json

POST word_embeddings/_doc?pipeline=word_embedding_pipeline

{

  "word": "example"

}

```

 

### 验证结果

 

你可以通过查询索引来验证数据是否正确索引,并且词嵌入向量是否已添加。

 

```json

GET word_embeddings/_search

{

  "query": {

    "match": {

      "word": "example"

    }

  }

}

```

 

### 注意事项

 

1. **模型路径**:确保模型文件路径正确,并且 Elasticsearch 有权限访问该路径。

2. **模型格式**:Elasticsearch 支持多种模型格式,确保你使用的模型格式与 Elasticsearch 兼容。

3. **性能**:Inference Ingest Pipeline 可能会影响索引性能,特别是在处理大量数据时。考虑在生产环境中进行性能测试。

 

通过以上步骤,你可以在 Elasticsearch 中使用 Inference Ingest Pipeline 对数据进行预处理,从而实现词嵌入的自动计算和存储。希望这些示例和说明能帮助你更好地理解和使用 Elasticsearch 的 Inference Ingest Pipeline 功能。

当你执行以下查询时,Elasticsearch 会返回与 `word` 字段匹配 "example" 的所有文档及其相关信息。假设你已经按照前面的步骤创建了索引并插入了数据,查询结果将包含文档的 `_id`、`_source` 等字段。

 

### 查询示例

 

```json

GET word_embeddings/_search

{

  "query": {

    "match": {

      "word": "example"

    }

  }

}

```

 

### 返回结果示例

 

假设你已经索引了一些文档,查询结果可能如下所示:

 

```json

{

  "took": 1,

  "timed_out": false,

  "_shards": {

    "total": 1,

    "successful": 1,

    "skipped": 0,

    "failed": 0

  },

  "hits": {

    "total": {

      "value": 1,

      "relation": "eq"

    },

    "max_score": 0.2876821,

    "hits": [

      {

        "_index": "word_embeddings",

        "_type": "_doc",

        "_id": "1",

        "_score": 0.2876821,

        "_source": {

          "word": "example",

          "embedding": [0.1, 0.2, ..., 0.100]

        }

      }

    ]

  }

}

```

 

### 解释

 

- **`took`**: 查询花费的时间(毫秒)。

- **`timed_out`**: 查询是否超时。

- **`_shards`**: 分片信息,包括总分片数、成功分片数、跳过分片数和失败分片数。

- **`hits`**: 匹配的文档列表。

  - **`total`**: 匹配的文档总数。

  - **`max_score`**: 最高评分。

  - **`hits`**: 匹配的文档列表,每个文档包含以下字段:

    - **`_index`**: 文档所属的索引名称。

    - **`_type`**: 文档类型(在Elasticsearch 7.x及更高版本中,类型总是`_doc`)。

    - **`_id`**: 文档的唯一标识符。

    - **`_score`**: 文档的相关性评分。

    - **`_source`**: 文档的原始内容,包括所有字段及其值。

 

### 示例解释

 

假设你索引了一个文档,其中 `word` 字段为 "example",`embedding` 字段为 `[0.1, 0.2, ..., 0.100]`,查询结果将返回这个文档的信息。

 

- **`_index`**: 文档所属的索引名称,这里是 `word_embeddings`。

- **`_type`**: 文档类型,这里是 `_doc`。

- **`_id`**: 文档的唯一标识符,假设为 `1`。

- **`_score`**: 文档的相关性评分,这里是 `0.2876821`。

- **`_source`**: 文档的原始内容,包括 `word` 和 `embedding` 字段。

 

### 进一步操作

 

如果你需要进一步处理或过滤结果,可以在查询中添加更多的条件。例如,你可以使用 `size` 参数来限制返回的文档数量,或者使用 `sort` 参数来按特定字段排序。

 

```json

GET word_embeddings/_search

{

  "query": {

    "match": {

      "word": "example"

    }

  },

  "size": 10,

  "sort": [

    { "_score": { "order": "desc" } }

  ]

}

```

 

希望这些解释和示例能帮助你更好地理解和使用 Elasticsearch 的查询结果。

PUT _ingest/pipeline/word_embedding_pipeline
{
  "description": "Pipeline to add word embeddings using a trained model",
  "processors": [
    {
      "inference": {
      "model_id": "my_word_embedding_model",
      "target_field": "embedding"
    }
  ]
}这段是干嘛的?

这段代码用于在 Elasticsearch 中创建一个 Ingest Pipeline,该管道将在数据索引之前使用预训练的机器学习模型进行推理(inference),并将推理结果(词嵌入向量)添加到文档中。具体来说,这段代码做了以下几件事:

 

### 1. 创建 Ingest Pipeline

 

```json

PUT _ingest/pipeline/word_embedding_pipeline

{

  "description": "Pipeline to add word embeddings using a trained model",

  "processors": [

    {

      "inference": {

        "model_id": "my_word_embedding_model",

        "target_field": "embedding"

      }

    }

  ]

}

```

 

### 详细解释

 

- **`PUT _ingest/pipeline/word_embedding_pipeline`**:

  - 这是一个 HTTP PUT 请求,用于创建一个新的 Ingest Pipeline。`word_embedding_pipeline` 是管道的名称,你可以根据需要选择一个合适的名称。

 

- **`description`**:

  - 这是一个可选字段,用于描述管道的目的。在这里,描述为 "Pipeline to add word embeddings using a trained model",表示这个管道用于使用预训练模型添加词嵌入向量。

 

- **`processors`**:

  - 这是一个数组,包含一个或多个处理器(processors)。每个处理器定义了一个数据处理步骤。

 

- **`inference`**:

  - 这是一个处理器,用于执行机器学习模型的推理。它有两个主要参数:

    - **`model_id`**: 指定要使用的预训练模型的 ID。在这个例子中,模型 ID 是 `my_word_embedding_model`。

    - **`target_field`**: 指定将推理结果存储到文档中的哪个字段。在这个例子中,推理结果将存储在 `embedding` 字段中。

 

### 2. 使用 Ingest Pipeline 索引数据

 

一旦创建了 Ingest Pipeline,你可以在索引数据时指定使用该管道。这样,Elasticsearch 会在数据索引之前自动执行推理,并将结果添加到文档中。

 

```json

POST word_embeddings/_doc?pipeline=word_embedding_pipeline

{

  "word": "example"

}

```

 

### 示例流程

 

1. **上传模型**:

   - 首先,你需要将预训练的机器学习模型上传到 Elasticsearch。假设你已经上传了一个名为 `my_word_embedding_model` 的模型。

 

2. **创建 Ingest Pipeline**:

   - 使用上述代码创建一个 Ingest Pipeline,该管道将使用 `my_word_embedding_model` 模型进行推理,并将结果存储在 `embedding` 字段中。

 

3. **索引数据**:

   - 在索引数据时,指定使用创建的 Ingest Pipeline。Elasticsearch 将在数据索引之前执行推理,并将结果添加到文档中。

 

### 示例

 

假设你已经上传了一个名为 `my_word_embedding_model` 的模型,并创建了 `word_embedding_pipeline` 管道。以下是一个完整的示例:

 

#### 1. 上传模型

 

```json

PUT _ml/trained_models/my_word_embedding_model

{

  "input": {

    "field_names": ["text"]

  },

  "inference_config": {

    "natural_language_inference": {

      "results_field": "inference_results"

    }

  },

  "model": {

    "definition": {

      "path": "path/to/your/model.pb"

    }

  }

}

```

 

#### 2. 创建 Ingest Pipeline

 

```json

PUT _ingest/pipeline/word_embedding_pipeline

{

  "description": "Pipeline to add word embeddings using a trained model",

  "processors": [

    {

      "inference": {

        "model_id": "my_word_embedding_model",

        "target_field": "embedding"

      }

    }

  ]

}

```

 

#### 3. 创建索引

 

```json

PUT word_embeddings

{

  "mappings": {

    "properties": {

      "word": {

        "type": "keyword"

      },

      "embedding": {

        "type": "dense_vector",

        "dims": 100 // 根据你的词嵌入模型的维度设置

      }

    }

  }

}

```

 

#### 4. 索引数据

 

```json

POST word_embeddings/_doc?pipeline=word_embedding_pipeline

{

  "word": "example"

}

```

 

### 验证结果

 

你可以通过查询索引来验证数据是否正确索引,并且词嵌入向量是否已添加。

 

```json

GET word_embeddings/_search

{

  "query": {

    "match": {

      "word": "example"

    }

  }

}

```

 

### 返回结果示例

 

```json

{

  "took": 1,

  "timed_out": false,

  "_shards": {

    "total": 1,

    "successful": 1,

    "skipped": 0,

    "failed": 0

  },

  "hits": {

    "total": {

      "value": 1,

      "relation": "eq"

    },

    "max_score": 0.2876821,

    "hits": [

      {

        "_index": "word_embeddings",

        "_type": "_doc",

        "_id": "1",

        "_score": 0.2876821,

        "_source": {

          "word": "example",

          "embedding": [0.1, 0.2, ..., 0.100]

        }

      }

    ]

  }

}

```

 

通过以上步骤,你可以在 Elasticsearch 中使用 Ingest Pipeline 自动处理数据,将词嵌入向量添加到文档中。希望这些解释和示例能帮助你更好地理解和使用 Ingest Pipeline。如果有任何进一步的问题,请随时提问!

 


http://www.ppmy.cn/news/1565640.html

相关文章

IP属地与定位技术:谁更精准地锁定你的位置?

在数字化时代,位置信息的重要性不言而喻。无论是社交媒体上的互动、电商平台的个性化推荐,还是紧急情况下的快速定位,都离不开对位置的准确判断。在众多定位技术中,IP属地与基于GPS等技术的定位服务是最为常见的两种。那么&#x…

使用 Hadoop 实现大数据的高效存储与查询

💖 欢迎来到我的博客! 非常高兴能在这里与您相遇。在这里,您不仅能获得有趣的技术分享,还能感受到轻松愉快的氛围。无论您是编程新手,还是资深开发者,都能在这里找到属于您的知识宝藏,学习和成长…

【MySQL】超详细MySQL常用日期格式转换函数、字符串函数、聚合函数(最新版)

文章目录 一、MySQL常用日期格式转换函数 1、查看当前日期时间2、日期函数3、日期格式转换4、字符串日期转换5、时间单位转换6、DATE_ADD(date,interval expr type) 从日期加上指定的时间间隔 [expr为正数是往后加,为负数是往前减]7、DATE_SUB(date,interval expr …

自定义BeanPostProcessor实现自动注入标注了特定注解的Bean

定义注解 Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) Retention(RetentionPolicy.RUNTIME) Documented public interface MyAnno { }定义一个配置类 Configuration public class RestConfig {MyAnnoBeanpublic PayDTO payDTO(){PayDTO payDTO …

三篇物联网漏洞挖掘综述

由于物联网设备存在硬件资源受限、硬件复杂异构, 代码、文档未公开的问题, 物联网设备的漏洞挖掘存在较大的挑战: 硬件资源受限性: 通用动态二进分析技术需要在运行程序外围实施监控分析。由于物联网设备存储资源(存储)的受限性,…

win32汇编环境,怎么得到磁盘的盘符

;运行效果 ;win32汇编环境,怎么得到磁盘的盘符 ;以下代码主要为了展示一下原理,应用GetLogicalDrives、GetLogicalDriveStrings函数、屏蔽某些二进制位、按双字节复制内容等。以下代码最多查8个盘,即返回值中的1个字节的信息 ;直接抄进RadAsm可编译运行。…

【Rust自学】14.3. 使用pub use导出方便使用的API

喜欢的话别忘了点赞、收藏加关注哦(加关注即可阅读全文),对接下来的教程有兴趣的可以关注专栏。谢谢喵!(・ω・) 14.3.1. 使用pub use导出方便使用的API 在第七章中我们介绍了mod关键字,我们使…

电脑有两张网卡,如何实现同时访问外网和内网?

要是想让一台电脑用两张网卡,既能访问外网又能访问内网,那可以通过设置网络路由还有网卡的 IP 地址来达成。 检查一下网卡的连接 得保证电脑的两张网卡分别连到外网和内网的网络设备上,像路由器或者交换机啥的。 给网卡配上不一样的 IP 地…