【Elasticsearch】inference ingest pipeline

embedded/2025/1/24 5:36:10/

 Elasticsearch 的 Ingest Pipeline 功能允许你在数据索引之前对其进行预处理。通过使用 Ingest Pipeline,你可以执行各种数据转换和富化操作,包括使用机器学习模型进行推理(inference)。这在处理词嵌入、情感分析、图像识别等场景中非常有用。

 

### 使用 Inference Ingest Pipeline

 

以下是一个详细的步骤,展示如何使用 Inference Ingest Pipeline 在 Elasticsearch 中加载和使用预训练的机器学习模型来进行推理。

 

### 步骤 1: 准备机器学习模型

 

首先,你需要准备一个预训练的机器学习模型,并将其部署到 Elasticsearch 的机器学习模块中。Elasticsearch 支持多种模型格式,包括 TensorFlow、PyTorch、ONNX 等。

 

#### 示例:上传 TensorFlow 模型

 

1. **下载或训练模型**:确保你有一个 TensorFlow 模型文件(例如,`.pb` 文件)。

2. **上传模型**:使用 Elasticsearch 的机器学习 API 将模型上传到 Elasticsearch。

 

```json

PUT _ml/trained_models/my_word_embedding_model

{

  "input": {

    "field_names": ["text"]

  },

  "inference_config": {

    "natural_language_inference": {

      "results_field": "inference_results"

    }

  },

  "model": {

    "definition": {

      "path": "path/to/your/model.pb"

    }

  }

}

```

 

### 步骤 2: 创建 Ingest Pipeline

 

创建一个 Ingest Pipeline,使用刚刚上传的模型进行推理。

 

```json

PUT _ingest/pipeline/word_embedding_pipeline

{

  "description": "Pipeline to add word embeddings using a trained model",

  "processors": [

    {

      "inference": {

        "model_id": "my_word_embedding_model",

        "target_field": "embedding"

      }

    }

  ]

}

```

 

### 步骤 3: 使用 Ingest Pipeline 索引数据

 

在索引数据时,指定使用创建的 Ingest Pipeline。

 

```json

POST word_embeddings/_doc?pipeline=word_embedding_pipeline

{

  "word": "example"

}

```

 

### 示例:完整流程

 

以下是一个完整的示例,展示如何从头开始创建和使用 Inference Ingest Pipeline。

 

#### 1. 上传模型

 

```json

PUT _ml/trained_models/my_word_embedding_model

{

  "input": {

    "field_names": ["text"]

  },

  "inference_config": {

    "natural_language_inference": {

      "results_field": "inference_results"

    }

  },

  "model": {

    "definition": {

      "path": "path/to/your/model.pb"

    }

  }

}

```

 

#### 2. 创建 Ingest Pipeline

 

```json

PUT _ingest/pipeline/word_embedding_pipeline

{

  "description": "Pipeline to add word embeddings using a trained model",

  "processors": [

    {

      "inference": {

      "model_id": "my_word_embedding_model",

      "target_field": "embedding"

    }

  ]

}

```

 

#### 3. 创建索引

 

```json

PUT word_embeddings

{

  "mappings": {

    "properties": {

      "word": {

        "type": "keyword"

      },

      "embedding": {

        "type": "dense_vector",

        "dims": 100 // 根据你的词嵌入模型的维度设置

      }

    }

  }

}

```

 

#### 4. 索引数据

 

```json

POST word_embeddings/_doc?pipeline=word_embedding_pipeline

{

  "word": "example"

}

```

 

### 验证结果

 

你可以通过查询索引来验证数据是否正确索引,并且词嵌入向量是否已添加。

 

```json

GET word_embeddings/_search

{

  "query": {

    "match": {

      "word": "example"

    }

  }

}

```

 

### 注意事项

 

1. **模型路径**:确保模型文件路径正确,并且 Elasticsearch 有权限访问该路径。

2. **模型格式**:Elasticsearch 支持多种模型格式,确保你使用的模型格式与 Elasticsearch 兼容。

3. **性能**:Inference Ingest Pipeline 可能会影响索引性能,特别是在处理大量数据时。考虑在生产环境中进行性能测试。

 

通过以上步骤,你可以在 Elasticsearch 中使用 Inference Ingest Pipeline 对数据进行预处理,从而实现词嵌入的自动计算和存储。希望这些示例和说明能帮助你更好地理解和使用 Elasticsearch 的 Inference Ingest Pipeline 功能。

当你执行以下查询时,Elasticsearch 会返回与 `word` 字段匹配 "example" 的所有文档及其相关信息。假设你已经按照前面的步骤创建了索引并插入了数据,查询结果将包含文档的 `_id`、`_source` 等字段。

 

### 查询示例

 

```json

GET word_embeddings/_search

{

  "query": {

    "match": {

      "word": "example"

    }

  }

}

```

 

### 返回结果示例

 

假设你已经索引了一些文档,查询结果可能如下所示:

 

```json

{

  "took": 1,

  "timed_out": false,

  "_shards": {

    "total": 1,

    "successful": 1,

    "skipped": 0,

    "failed": 0

  },

  "hits": {

    "total": {

      "value": 1,

      "relation": "eq"

    },

    "max_score": 0.2876821,

    "hits": [

      {

        "_index": "word_embeddings",

        "_type": "_doc",

        "_id": "1",

        "_score": 0.2876821,

        "_source": {

          "word": "example",

          "embedding": [0.1, 0.2, ..., 0.100]

        }

      }

    ]

  }

}

```

 

### 解释

 

- **`took`**: 查询花费的时间(毫秒)。

- **`timed_out`**: 查询是否超时。

- **`_shards`**: 分片信息,包括总分片数、成功分片数、跳过分片数和失败分片数。

- **`hits`**: 匹配的文档列表。

  - **`total`**: 匹配的文档总数。

  - **`max_score`**: 最高评分。

  - **`hits`**: 匹配的文档列表,每个文档包含以下字段:

    - **`_index`**: 文档所属的索引名称。

    - **`_type`**: 文档类型(在Elasticsearch 7.x及更高版本中,类型总是`_doc`)。

    - **`_id`**: 文档的唯一标识符。

    - **`_score`**: 文档的相关性评分。

    - **`_source`**: 文档的原始内容,包括所有字段及其值。

 

### 示例解释

 

假设你索引了一个文档,其中 `word` 字段为 "example",`embedding` 字段为 `[0.1, 0.2, ..., 0.100]`,查询结果将返回这个文档的信息。

 

- **`_index`**: 文档所属的索引名称,这里是 `word_embeddings`。

- **`_type`**: 文档类型,这里是 `_doc`。

- **`_id`**: 文档的唯一标识符,假设为 `1`。

- **`_score`**: 文档的相关性评分,这里是 `0.2876821`。

- **`_source`**: 文档的原始内容,包括 `word` 和 `embedding` 字段。

 

### 进一步操作

 

如果你需要进一步处理或过滤结果,可以在查询中添加更多的条件。例如,你可以使用 `size` 参数来限制返回的文档数量,或者使用 `sort` 参数来按特定字段排序。

 

```json

GET word_embeddings/_search

{

  "query": {

    "match": {

      "word": "example"

    }

  },

  "size": 10,

  "sort": [

    { "_score": { "order": "desc" } }

  ]

}

```

 

希望这些解释和示例能帮助你更好地理解和使用 Elasticsearch 的查询结果。

PUT _ingest/pipeline/word_embedding_pipeline
{
  "description": "Pipeline to add word embeddings using a trained model",
  "processors": [
    {
      "inference": {
      "model_id": "my_word_embedding_model",
      "target_field": "embedding"
    }
  ]
}这段是干嘛的?

这段代码用于在 Elasticsearch 中创建一个 Ingest Pipeline,该管道将在数据索引之前使用预训练的机器学习模型进行推理(inference),并将推理结果(词嵌入向量)添加到文档中。具体来说,这段代码做了以下几件事:

 

### 1. 创建 Ingest Pipeline

 

```json

PUT _ingest/pipeline/word_embedding_pipeline

{

  "description": "Pipeline to add word embeddings using a trained model",

  "processors": [

    {

      "inference": {

        "model_id": "my_word_embedding_model",

        "target_field": "embedding"

      }

    }

  ]

}

```

 

### 详细解释

 

- **`PUT _ingest/pipeline/word_embedding_pipeline`**:

  - 这是一个 HTTP PUT 请求,用于创建一个新的 Ingest Pipeline。`word_embedding_pipeline` 是管道的名称,你可以根据需要选择一个合适的名称。

 

- **`description`**:

  - 这是一个可选字段,用于描述管道的目的。在这里,描述为 "Pipeline to add word embeddings using a trained model",表示这个管道用于使用预训练模型添加词嵌入向量。

 

- **`processors`**:

  - 这是一个数组,包含一个或多个处理器(processors)。每个处理器定义了一个数据处理步骤。

 

- **`inference`**:

  - 这是一个处理器,用于执行机器学习模型的推理。它有两个主要参数:

    - **`model_id`**: 指定要使用的预训练模型的 ID。在这个例子中,模型 ID 是 `my_word_embedding_model`。

    - **`target_field`**: 指定将推理结果存储到文档中的哪个字段。在这个例子中,推理结果将存储在 `embedding` 字段中。

 

### 2. 使用 Ingest Pipeline 索引数据

 

一旦创建了 Ingest Pipeline,你可以在索引数据时指定使用该管道。这样,Elasticsearch 会在数据索引之前自动执行推理,并将结果添加到文档中。

 

```json

POST word_embeddings/_doc?pipeline=word_embedding_pipeline

{

  "word": "example"

}

```

 

### 示例流程

 

1. **上传模型**:

   - 首先,你需要将预训练的机器学习模型上传到 Elasticsearch。假设你已经上传了一个名为 `my_word_embedding_model` 的模型。

 

2. **创建 Ingest Pipeline**:

   - 使用上述代码创建一个 Ingest Pipeline,该管道将使用 `my_word_embedding_model` 模型进行推理,并将结果存储在 `embedding` 字段中。

 

3. **索引数据**:

   - 在索引数据时,指定使用创建的 Ingest Pipeline。Elasticsearch 将在数据索引之前执行推理,并将结果添加到文档中。

 

### 示例

 

假设你已经上传了一个名为 `my_word_embedding_model` 的模型,并创建了 `word_embedding_pipeline` 管道。以下是一个完整的示例:

 

#### 1. 上传模型

 

```json

PUT _ml/trained_models/my_word_embedding_model

{

  "input": {

    "field_names": ["text"]

  },

  "inference_config": {

    "natural_language_inference": {

      "results_field": "inference_results"

    }

  },

  "model": {

    "definition": {

      "path": "path/to/your/model.pb"

    }

  }

}

```

 

#### 2. 创建 Ingest Pipeline

 

```json

PUT _ingest/pipeline/word_embedding_pipeline

{

  "description": "Pipeline to add word embeddings using a trained model",

  "processors": [

    {

      "inference": {

        "model_id": "my_word_embedding_model",

        "target_field": "embedding"

      }

    }

  ]

}

```

 

#### 3. 创建索引

 

```json

PUT word_embeddings

{

  "mappings": {

    "properties": {

      "word": {

        "type": "keyword"

      },

      "embedding": {

        "type": "dense_vector",

        "dims": 100 // 根据你的词嵌入模型的维度设置

      }

    }

  }

}

```

 

#### 4. 索引数据

 

```json

POST word_embeddings/_doc?pipeline=word_embedding_pipeline

{

  "word": "example"

}

```

 

### 验证结果

 

你可以通过查询索引来验证数据是否正确索引,并且词嵌入向量是否已添加。

 

```json

GET word_embeddings/_search

{

  "query": {

    "match": {

      "word": "example"

    }

  }

}

```

 

### 返回结果示例

 

```json

{

  "took": 1,

  "timed_out": false,

  "_shards": {

    "total": 1,

    "successful": 1,

    "skipped": 0,

    "failed": 0

  },

  "hits": {

    "total": {

      "value": 1,

      "relation": "eq"

    },

    "max_score": 0.2876821,

    "hits": [

      {

        "_index": "word_embeddings",

        "_type": "_doc",

        "_id": "1",

        "_score": 0.2876821,

        "_source": {

          "word": "example",

          "embedding": [0.1, 0.2, ..., 0.100]

        }

      }

    ]

  }

}

```

 

通过以上步骤,你可以在 Elasticsearch 中使用 Ingest Pipeline 自动处理数据,将词嵌入向量添加到文档中。希望这些解释和示例能帮助你更好地理解和使用 Ingest Pipeline。如果有任何进一步的问题,请随时提问!

 


http://www.ppmy.cn/embedded/156481.html

相关文章

redis报错如何解决

错误复现 我启动我的redis,发现启动不成功。 查看错误日志 我们使用下面这个命令,来看看具体的错误原因。到底是为啥 journalctl -xe 我这个错误是因为,在我安装redis的时候,我的redis安装目录是,/usr/local/src/re…

【Golang 面试题】每日 3 题(四十三)

✍个人博客:Pandaconda-CSDN博客 📣专栏地址:http://t.csdnimg.cn/UWz06 📚专栏简介:在这个专栏中,我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话,欢迎点赞👍收藏…

王兴饭否二

关于创业 01.最一流的创业路径,注定前无古人后无来者。 02.创业不能蛮干,要等「大势至」。 03.在不同行业的创业里,资本可能是催化剂、助燃剂、或主力燃料。 04.创业公司的一个基本特征,就是不能指望进入「自动巡航」状态。 0…

leetcode刷题记录(八十四)——739. 每日温度

(一)问题描述 739. 每日温度 - 力扣(LeetCode)739. 每日温度 - 给定一个整数数组 temperatures ,表示每天的温度,返回一个数组 answer ,其中 answer[i] 是指对于第 i 天,下一个更高…

python学opencv|读取图像(四十一 )使用cv2.add()函数实现各个像素点BGR叠加

【1】引言 前序已经学习了直接在画布上使用掩模,会获得彩色图像的多种叠加效果,相关文章链接为: python学opencv|读取图像(四十)掩模:三通道图像的局部覆盖-CSDN博客 这时候如果更进一步,直接…

汇编实验·系统调用

一、实验目的: 1.掌握基于特定操作系统中调用API或者SYSTEMCALL的基本方法。 2.进一步理解高级语言中函数调用的相关规定和约定(stdcall,cdec,fastcall等) 3.IA-32架构下API参数在汇编中的实现方式和约定。 二、实验内容 1.在课程设定的VS2022社区版的汇编开发环境下,完…

web3py+flask+ganache的智能合约教育平台

最近在学习web3的接口文档,使用web3pyflaskganache写了一个简易的智能合约教育平台,语言用的是python,ganche直接使用的本地区块链网络,用web3py进行交互。 代码逻辑不难,可以私信或者到我的闲鱼号夏沫mds获取我的代码…

两台局域网电脑通过飞秋传输大文件失败的解决方案

问题描述: 局域网两台电脑之间传输大文件(超过20G),不想太复杂,就各装个飞秋。但是通过直接发送文件发现总是失败,一会就中断了。 解决方法: 主界面上有一个文件共享的按钮,通过文…