将 OneLake 数据索引到 Elasticsearch - 第 1 部分

news/2025/1/30 8:18:20/
aidu_pl">

作者:来自 Elastic Gustavo Llermaly

学习配置 OneLake,使用 Python 消费数据并在 Elasticsearch 中索引文档,然后运行语义搜索。

OneLake 是一款工具,可让你连接到不同的 Microsoft 数据源,例如 Power BI、Data Activator 和 Data factory 等。它支持将数据集中在 DataLakes 中,DataLakes 是支持全面数据存储、分析和处理的大容量存储库。

在本文中,我们将学习如何配置 OneLake、使用 Python 消费数据以及在 Elasticsearch 中索引文档,然后运行语义搜索。

有时,你可能希望在非结构化数据和来自不同来源和软件提供商的结构化数据中运行搜索,并使用 Kibana 创建可视化。对于这种任务,在 Elasticsearch 中索引文档作为中央存储库会变得非常有用。

在这个例子中,我们将使用一家名为 Shoestic 的虚拟公司,这是一家在线鞋店。我们在结构化文件 (CSV) 中列出了产品列表,而一些产品的数据表则采用非结构化格式 (DOCX)。这些文件存储在 OneLake 中。

你可以在此处找到包含完整示例(包括测试文档)的笔记本。

步骤

  • OneLake 初始配置
  • 使用 Python 连接到 OneLake
  • 索引文档
  • 查询

OneLake 初始配置

OneLake 架构可以总结如下:

要使用 OneLake 和 Microsoft Fabric,我们需要一个 Office 365 帐户。如果你没有,可以在此处创建一个试用帐户。

使用你的帐户登录 Microsoft Fabric。然后,创建一个名为 “ShoesticWorkspace” 的工作区。进入新创建的工作区后,创建一个 Lakehouse 并将其命名为“ShoesticDatalake”。最后一步是在 “Files” 中创建一个新文件夹。单击 “new subfolder” 并将其命名为 “ProductsData”。

完成了!我们准备开始提取数据了。

使用 Python 连接到 OneLake

配置完 OneLake 后,我们现在可以准备 Python 脚本。Azure 有处理凭据并与 OneLake 通信的库。

pip install azure-identity elasticsearch==8.14 azure-storage-file-datalake azure-cli python-docx

“azure-identity azure-storage-file-datalake” 库让我们可以与 OneLake 交互,同时 “azure-cli” 可以访问凭据并授予权限。为了读取文件内容以便稍后将其索引到 Elasticsearch,我们使用 python-docx。

在我们的本地环境中保存 Microsoft 凭据

我们将使用 “az login” 进入我们的 Microsoft 帐户并运行:

 az login --allow-no-subscriptions

标志 “ --allow-no-subscriptions”允许我们在没有有效订阅的情况下向 Microsoft Azure 进行身份验证。

此命令将打开一个浏览器窗口,你必须在其中访问你的帐户,然后选择你帐户的订阅号。

现在我们可以开始编写代码了!

创建一个名为 onelake.py 的文件并添加以下内容:

_onelake.py_

# Importing dependencies 
import chardet 
from azure.identity import DefaultAzureCredential 
from docx import Document 
from azure.storage.filedatalake import DataLakeServiceClient # Initializing the OneLake client 
ONELAKE_ACCOUNT_NAME = "onelake" 
ONELAKE_WORKSPACE_NAME = "ShoesticWorkspace" 
# Path in format <DataLake>.Lakehouse/files/<Folder path> 
ONELAKE_DATA_PATH = "shoesticDatalake.Lakehouse/Files/ProductsData" # Microsoft token 
token_credential = DefaultAzureCredential() # OneLake services 
service_client = DataLakeServiceClient( account_url=f"https://{ONELAKE_ACCOUNT_NAME}.dfs.fabric.microsoft.com", credential=token_credential, 
) 
file_system_client = service_client.get_file_system_client(ONELAKE_WORKSPACE_NAME) 
directory_client = file_system_client.get_directory_client(ONELAKE_DATA_PATH) # OneLake functions   # Upload a file to a LakeHouse directory 
def upload_file_to_directory(directory_client, local_path, file_name): file_client = directory_client.get_file_client(file_name) with open(local_path, mode="rb") as data: file_client.upload_data(data, overwrite=True) print(f"File: {file_name} uploaded to the data lake.") # Get directory contents from your lake folder 
def list_directory_contents(file_system_client, directory_name): paths = file_system_client.get_paths(path=directory_name) for path in paths: print(path.name + "\n") # Get a file by name from your lake folder 
def get_file_by_name(file_name, directory_client): return directory_client.get_file_client(file_name) # Decode docx 
def get_docx_content(file_client): download = file_client.download_file() file_content = download.readall() temp_file_path = "temp.docx" with open(temp_file_path, "wb") as temp_file: temp_file.write(file_content) doc = Document(temp_file_path) text = [] for paragraph in doc.paragraphs: text.append(paragraph.text) return "\n".join(text) # Decode csv 
def get_csv_content(file_client): download = file_client.download_file() file_content = download.readall() result = chardet.detect(file_content) encoding = result["encoding"] return file_content.decode(encoding) 

将文件上传到 OneLake

在此示例中,我们将使用一个 CSV 文件和一些包含有关我们鞋店产品信息的 .docx 文件。虽然你可以使用 UI 上传它们,但我们将使用 Python 来完成。在此处下载文件。

我们将文件放在文件夹 /data 中,位于名为 upload_files.py 的新 Python 脚本旁边:

# upload_files.py # Importing dependencies 
from azure.identity import DefaultAzureCredential 
from azure.storage.filedatalake import DataLakeServiceClient from functions import list_directory_contents, upload_file_to_directory 
from onelake import ONELAKE_DATA_PATH, directory_client, file_system_client csv_file_name = "products.csv" 
csv_local_path = f"./data/{csv_file_name}" docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"] 
docx_local_paths = [f"./data/{file_name}" for file_name in docx_files] # Upload files to Lakehouse 
upload_file_to_directory(directory_client, csv_local_path, csv_file_name) for docx_local_path in docx_local_paths: docx_file_name = docx_local_path.split("/")[-1] upload_file_to_directory(directory_client, docx_local_path, docx_file_name) # To check that the files have been uploaded, run "list_directory_contents" function to show the contents of the /ProductsData folder in our Datalake: 
print("Upload finished, Listing files: ") 
list_directory_contents(file_system_client, ONELAKE_DATA_PATH) 

运行上传脚本:

python upload_files.py

结果应该是:

Upload finished, Listing files: 
shoesticDatalake.Lakehouse/Files/ProductsData/beach-flip-flops.docx 
shoesticDatalake.Lakehouse/Files/ProductsData/classic-loafers.docx 
shoesticDatalake.Lakehouse/Files/ProductsData/products.csv 
shoesticDatalake.Lakehouse/Files/ProductsData/sport-sneakers.docx 

现在我们已经准备好文件了,让我们开始使用 Elasticsearch 分析和搜索我们的数据!

索引文档

我们将使用 ELSER 作为向量数据库的嵌入提供程序,以便我们可以运行语义查询。

我们选择 ELSER 是因为它针对 Elasticsearch 进行了优化,在域外检索方面胜过大多数竞争对手,这意味着按原样使用模型,而无需针对你自己的数据进行微调。

配置 ELSER

首先创建推理端点:

PUT _inference/sparse_embedding/onelake-inference-endpoint 
{ "service": "elser", "service_settings": { "num_allocations": 1, "num_threads": 1 } 

在后台加载模型时,如果你以前没有使用过 ELSER,则可能会收到 502 Bad Gateway 错误。在 Kibana 中,你可以在 “Machine Learning” > “Trained Models” 中检查模型状态。等到模型部署完成后再继续执行后续步骤。

索引数据

现在,由于我们同时拥有结构化数据和非结构化数据,因此我们将在 Kibana DevTools 控制台中使用具有不同映射的两个不同索引。

对于我们的结构化销售,让我们创建以下索引:

PUT shoestic-products 
{ "mappings": { "properties": { "product_id": { "type": "keyword" }, "product_name": { "type": "text" }, "amount": { "type": "float" }, "tags": { "type": "keyword" } } } 
} 

为了索引我们的非结构化数据(产品数据表),我们将使用:

PUT shoestic-products-descriptions 
{ "mappings": { "properties": { "title": { "type": "text", "analyzer": "english" }, "super_body": { "type": "semantic_text", "inference_id": "onelake-inference-endpoint" }, "body": { "type": "text", "copy_to": "super_body" } } } 
} 

注意:使用带有 copy_to 的字段很重要,这样还可以运行全文搜索,而不仅仅是在正文字段上运行语义搜索。

读取 OneLake 文件

在开始之前,我们需要使用这些命令(使用你自己的云 ID 和 API 密钥)初始化我们的 Elasticsearch 客户端。

创建一个名为 indexing.py 的 Python 脚本并添加以下几行:

# Importing dependencies 
import csv 
from io import StringIO from onelake import directory_client 
from elasticsearch import Elasticsearch, helpers from functions import get_csv_content, get_docx_content, get_file_by_name 
from upload_files_to_onelake import csv_file_client ELASTIC_CLUSTER_ID = "your-cloud-id" 
ELASTIC_API_KEY = "your-api-key" # Elasticsearch client 
es_client = Elasticsearch( cloud_id=ELASTIC_CLUSTER_ID, api_key=ELASTIC_API_KEY, 
) docx_files = ["beach-flip-flops.docx", "classic-loafers.docx", "sport-sneakers.docx"] 
docx_local_paths = [f"./data/{file_name}" for file_name in docx_files] csv_file_client = get_file_by_name("products.csv", directory_client) 
docx_files_clients = [] for docx_file_name in docx_files: docx_files_clients.append(get_file_by_name(docx_file_name, directory_client)) # We use these functions to extract data from the files: 
csv_content = get_csv_content(csv_file_client) 
reader = csv.DictReader(StringIO(csv_content)) 
docx_contents = [] for docx_file_client in docx_files_clients: docx_contents.append(get_docx_content(docx_file_client)) print("CSV FILE CONTENT: ", csv_content) 
print("DOCX FILE CONTENT: ", docx_contents) # The CSV tags are separated by commas (,). We'll turn these tags into an array: 
rows = csv_content.splitlines() 
reader = csv.DictReader(rows) 
modified_rows = [] for row in reader: row["tags"] = row["tags"].replace('"', "").split(",") modified_rows.append(row) print(row["tags"]) # We can now index the files into Elasticsearch 
reader = modified_rows 
csv_actions = [{"_index": "shoestic-products", "_source": row} for row in reader] docx_actions = [ { "_index": "shoestic-products-descriptions", "_source": {"title": docx_file_name, "body": docx}, } for docx_file_name, docx in zip(docx_files, docx_contents) 
] helpers.bulk(es_client, csv_actions) 
print("CSV data indexed successfully.") 
helpers.bulk(es_client, docx_actions) 
print("DOCX data indexed successfully.") 

现在运行脚本:

python indexing.py

查询

在 Elasticsearch 中对文档进行索引后,我们就可以测试语义查询了。在本例中,我们将在某些产品(tag)中搜索唯一术语。我们将针对结构化数据运行关键字搜索,针对非结构化数据运行语义搜索。

1. 关键字搜索

GET shoestic-products/_search 
{ "query": { "term": { "tags": "summer" } } 
} 

结果:

"_source": { "product_id": "P-118", "product_name": "Casual Sandals", "amount": "128.22", "tags": [ "casual", "summer" ] } 

2. 语义搜索:

GET shoestic-products-descriptions/_search 
{ "_source": { "excludes": [ "*embeddings", "*chunks" ] }, "query": { "semantic": { "field": "super_body", "query": "summer" } } 
} 

*我们排除了嵌入和块只是为了便于阅读。

结果:

"hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 4.3853106, "hits": [ { "_index": "shoestic-products-descriptions", "_id": "P2Hj6JIBF7lnCNFTDQEA", "_score": 4.3853106, "_source": { "super_body": { "inference": { "inference_id": "onelake-inference-endpoint", "model_settings": { "task_type": "sparse_embedding" } } }, "title": "beach-flip-flops.docx", "body": "Ideal for warm, sunny days by the water, these lightweight essentials are water-resistant and come in bright colors, bringing a laid-back vibe to any outing in the sun." } } ] } 

如你所见,当使用关键字搜索时,我们会得到与其中一个标签的完全匹配,相反,当我们使用语义搜索时,我们会得到与描述中的含义匹配的结果,而无需完全匹配。

结论

OneLake 使使用来自不同 Microsoft 来源的数据变得更容易,然后索引这些文档 Elasticsearch 允许我们使用高级搜索工具。在第一部分中,我们学习了如何连接到 OneLake 并在 Elasticsearch 中索引文档。在第二部分中,我们将使用 Elastic 连接器框架制作更强大的解决方案。敬请期待!

想要获得 Elastic 认证?了解下一次 Elasticsearch 工程师培训的时间!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在你的本地机器上试用 Elastic。

原文:Indexing OneLake data into Elasticsearch - Part 1 - Elasticsearch Labs


http://www.ppmy.cn/news/1567842.html

相关文章

国内优秀的FPGA设计公司主要分布在哪些城市?

近年来&#xff0c;国内FPGA行业发展迅速&#xff0c;随着5G通信、人工智能、大数据等新兴技术的崛起&#xff0c;FPGA设计企业的需求也迎来了爆发式增长。很多技术人才在求职时都会考虑城市的行业分布和发展潜力。因此&#xff0c;国内优秀的FPGA设计公司主要分布在哪些城市&a…

算法随笔_31:移动零

上一篇:算法随笔_30: 去除重复字母-CSDN博客 题目描述如下: 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 示例 1: 输入: nums [0,1,…

windows远程调用shell脚本

在Windows系统中远程调用Shell脚本通常涉及到几个步骤&#xff0c;因为Windows本身并不直接支持Shell脚本&#xff08;如Bash脚本&#xff09;。但是&#xff0c;你可以通过以下几种方式来实现这一目标&#xff1a; 方法1&#xff1a;使用Cygwin或Git Bash 安装Cygwin或Git Ba…

学习std::is_base_of笔记

1、std::is_base_of简介 在现代 C 中&#xff0c;模板元编程&#xff08;Template Metaprogramming&#xff09;是一种非常强大的编程技巧&#xff0c;它让我们能够在编译期进行类型推导和约束。而 std::is_base_of 是一个重要的工具&#xff0c;可以用来检查一个类型是否是另…

Python 函数魔法书:基础、范例、避坑、测验与项目实战

Python 函数魔法书&#xff1a;基础、范例、避坑、测验与项目实战 内容简介 本系列文章是为 Python3 学习者精心设计的一套全面、实用的学习指南&#xff0c;旨在帮助读者从基础入门到项目实战&#xff0c;全面提升编程能力。文章结构由 5 个版块组成&#xff0c;内容层层递进…

使用国内镜像加速器解决 Docker Hub 拉取镜像慢或被屏蔽的问题

一、问题背景 Docker Hub 是 Docker 默认的镜像仓库&#xff0c;但由于网络限制&#xff0c;国内用户直接拉取镜像可能面临以下问题&#xff1a; 下载速度极慢&#xff08;尤其是大镜像&#xff09;。连接超时或完全被屏蔽&#xff08;部分网络环境&#xff09;。依赖国外源的…

python:斐索实验(Fizeau experiment)

斐索实验&#xff08;Fizeau experiment&#xff09;是在1851年由法国物理学家阿曼德斐索&#xff08;Armand Fizeau&#xff09;进行的一项重要实验&#xff0c;旨在测量光在移动介质中的传播速度。这项实验的结果对当时的物理理论产生了深远的影响&#xff0c;并且在后来的相…

langchain基础(二)

一、输出解析器&#xff08;Output Parser&#xff09; 作用&#xff1a;&#xff08;1&#xff09;让模型按照指定的格式输出&#xff1b; &#xff08;2&#xff09;解析模型输出&#xff0c;提取所需的信息 1、逗号分隔列表 CommaSeparatedListOutputParser&#xff1a;…