Milvus 到 TiDB 向量迁移实践

devtools/2024/10/22 0:58:07/

作者: caiyfc 原文来源: https://tidb.net/blog/e0035e5e

一、背景

我最近在研究使用向量数据库搭建RAG应用,并且已经使用 Milvus、Llama 3、Ollama、LangChain 搭建完成。最近通过活动获取了 TiDB Cloud Serverless 使用配额,于是打算把 Milvus 已完成的向量数据给迁移到 TiDB Cloud Serverless 中。

经过查阅相关资料,我发现向量数据迁移的工具还不支持从 Milvus 迁移到 TiDB。那就无法迁移了吗?不,虽然现有的工具不能迁移,但是我可以手动迁移。于是就有了这篇文章。

TiDB Cloud Serverless 活动地址: 【TiDB 社区福利】贡献开源代码的开发者看过来!最高可获得超 14,000 元的 TiDB Cloud Serverless 云资源额度

搭建RAG应用方法: 手把手系列 | 使用Milvus、Llama 3、Ollama、LangChain本地设置RAG应用

二、迁移方案

要做数据迁移,首先需要确定迁移方案。最简单的迁移就两个步骤:数据从源库导出、数据导入到目标库,这样就完成了数据迁移。

但是这次就不同了。RAG应用使用了 LangChain,根据调研,LangChain 在 Milvus 和在 TiDB 中创建的结构是不同的。

在 Milvus 中的 collection 名称是:LangChainCollection,结构是: image-20241016162111628

但是在 TiDB 中的table 名称是:langchain_vector,结构是:

image-20241016162229794

在 LangChain 的文档中也有说明: image-20241016162326669

那么这次数据迁移就需要多增加两个步骤了:数据整理、表结构调整。而这两个又是异构数据库,所以导出的数据格式选择较为通用的csv。

整体方案如下:

image-20241016163112988

三、Milvus 数据导出

根据 Milvus 的官方文档,没找到能直接把数据导出成 csv 文件的工具,但是我可以用 python 的 SDK 来把数据读取出来,然后存成 csv 文件。

import csv
from pymilvus import connections, Collection
​
# 连接到 Milvus
connections.connect("default", host="10.3.xx.xx", port="19530")
​
# 获取 Collection
collection = Collection("LangChainCollection")
​# 分页查询所有数据
limit = 1000
offset = 0
all_results = []
​
while True:# 传递 expr 参数,使用一个简单的条件查询所有数据results = collection.query(expr="", output_fields=["pk", "source", "page", "text", "vector"], limit=limit, offset=offset)if not results:breakall_results.extend(results)offset += limit
​
# 打开 CSV 文件,准备写入数据
with open("milvus_data.csv", "w", newline="", encoding='utf-8') as csvfile:# 定义 CSV 列名fieldnames = ["pk", "source", "page", "text", "vector"]writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
​# 写入表头writer.writeheader()
​# 写入每一条记录for result in all_results:# 解析 JSON 数据,提取字段vector_str = ','.join(map(str, result.get("vector", [])))  # 将向量数组转换为字符串writer.writerow({"pk": result.get("pk"),          # 获取主键"source": result.get("source"),  # 获取源文件"page": result.get("page"),      # 获取页码"text": result.get("text"),      # 获取文本"vector": vector_str             # 写入向量数据})
​
print(f"Total records written to CSV: {len(all_results)}")

导出的 csv 文件数据的格式为:

image-20241016165426794

四、数据整理、表结构整理

我用少量测试数据转换成向量,使用 LangChain 加载到 TiDB Cloud 中了。这样就得到了 TiDB Cloud 中的数据结构及数据格式了。

表结构为:

CREATE TABLE `langchain_vector` (
`id` varchar(36) NOT NULL,
`embedding` vector(768) NOT NULL COMMENT 'hnsw(distance=cosine)',
`document` text DEFAULT NULL,
`meta` json DEFAULT NULL,
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin

可以在 TiDB Cloud 中直接创建该表。

导出成 csv 的数据格式为(省略部分内容):

"id","embedding","document","meta","create_time","update_time"
"00a2ad02-eff5-4649-947f-820db0d24afa","[-0.08534411,0.048610855,0.018906716,0.023978366,***********-0.023846595,0.06352842,0.07482053]","— 22 —   (七)移交利用共用部位、共用设施设备经营的相关资料、\n物业服务费用和公共水电分摊费用交纳记录等资料;   (八)法律、法规员会和物业服务企业。","{\"page\": 21, \"source\": \"./湖北省物业服务和管理条例.pdf\"}","2024-10-15 08:18:16","2024-10-15 08:18:16"
​

已知 Milvus 导出的 csv 的数文件,根据对应关系,其实就是embedding 对应 vector,document 对应 text,meta 对应 page加source。这样逻辑就清晰了。根据对应关系编写数据整理的脚本:

import pandas as pd
import json
from uuid import uuid4
from datetime import datetime
​
# 读取CSV文件
input_csv = 'milvus_data.csv'  # 替换为你的CSV文件名
df = pd.read_csv(input_csv)
​
# 创建新的DataFrame
output_data = []
​
for _, row in df.iterrows():# 提取需要的字段id_value = str(uuid4())  # 生成唯一IDembedding = f"[{','.join(row['vector'].split(','))}]"  # 将vector转换为嵌入格式document = row['text']# 生成meta信息meta_dict = {"page": row['page'], "source": row['source']}meta = json.dumps(meta_dict, ensure_ascii=False)  # 首先生成正常的JSON# meta = meta.replace('"', '\\"')  # 转义双引号
​create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")update_time = create_time  # 更新时同样的时间
​# 添加到输出数据output_data.append({"id": id_value,"embedding": embedding,"document": document,"meta": meta,"create_time": create_time,"update_time": update_time})
​
# 转换为DataFrame
output_df = pd.DataFrame(output_data)
​
# 保存为CSV文件
output_csv = 'output.csv'  # 输出文件名
output_df.to_csv(output_csv, index=False, quoting=1)  # quoting=1用于确保字符串加引号
​
print(f"转换完成,已保存为 {output_csv}")
​

数据整理完成后,就可以导入数据到 TiDB Cloud 中了。

五、导入数据到 TiDB Cloud

TiDB Cloud 提供三种导入方式:

image-20241016171123690

本次使用本地上传的方式。

小于 50MiB 的 csv文件可以使用第一种上传本地文件的方式,如果文件大于 50 MiB,可以使用脚本将文件拆分为多个较小的文件再上传:

image.png

上传文件后,选择已经创建好的库和表,点击 define table

image-20241016171658449

调整好对应关系,点击 start import 即可

image-20241016171812870

更多的导入方式可以查看文档: Migration and Import Overview

六、验证结果

数据成功导入之后,就需要开始验证数据了。于是我修改了 RAG应用的代码,分别从 Milvus 和 TiDB 中读取向量数据,使用同一个问题,来让大模型返回答案,查看答案是否类似。

from langchain_community.llms import Ollama
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain import hub
from langchain.chains import RetrievalQA
from langchain.vectorstores.milvus import Milvus
from langchain_community.embeddings.jina import JinaEmbeddings
from langchain_community.vectorstores import TiDBVectorStore
import os
​
llm = Ollama(
model="llama3",
callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]
),
stop=["<|eot_id|>"],
)
​
​
embeddings = JinaEmbeddings(jina_api_key="xxxx", model_name="jina-embeddings-v2-base-zh")
​
vector_store_milvus = Milvus(embedding_function=embeddings,connection_args={"uri": "http://10.3.xx.xx:19530"},
)
​
​
TIDB_CONN_STR="mysql+pymysql://xxxx.root:password@host:4000/test?ssl_ca=/Downloads/isrgrootx1.pem&ssl_verify_cert=true&ssl_verify_identity=true"
vector_store_tidb = TiDBVectorStore(connection_string=TIDB_CONN_STR,embedding_function=embeddings,table_name="langchain_vector",
)
​
​
os.environ["LANGCHAIN_API_KEY"] = "xxxx"
query = input("\nQuery: ")
prompt = hub.pull("rlm/rag-prompt")   
​
qa_chain = RetrievalQA.from_chain_type(llm, retriever=vector_store_milvus.as_retriever(), chain_type_kwargs={"prompt": prompt}
)
print("milvus")
result = qa_chain({"query": query})
​
print("\n--------------------------------------")
print("tidb")
qa_chain = RetrievalQA.from_chain_type(llm, retriever=vector_store_tidb.as_retriever(), chain_type_kwargs={"prompt": prompt}
)
result = qa_chain({"query": query})

其中 TiDB 的连接串可以直接从 TiDB Cloud 中获取:

image-20241017100552250

向 RAG 应用提问之后,查看回答,发现 Milvus 和 TiDB 的回答基本一致,说明向量迁移是成功的。还可以更进一步,比对数据条数,如果一致,那么迁移应该已经成功,没有丢失数据。

RAG 应用的执行结果如下图:

image-20241016172840498

七、总结

不同数据库之间的数据迁移,本质上是将数据转换为所有数据库都能识别的通用格式,向量数据也不例外。本次迁移与传统的关系型数据库迁移有所不同,尽管 RAG 应用使用了 LangChain,但 LangChain 针对不同的数据库,创建的表结构和数据格式是不同的,因此需要对数据和表结构进行额外的整理,才能顺利将数据迁移至目标数据库。值得庆幸的是,TiDB Cloud 提供了多种便捷的数据导入方式,使迁移过程相对简单。


http://www.ppmy.cn/devtools/127697.html

相关文章

Elasticsearch入门:增删改查详解与实用场景

引言 在我之前做社交架构设计的时候&#xff0c;我们有一项关键且必要的需求&#xff1a;需要存储并记录用户的所有聊天记录。这些记录不仅用于业务需求&#xff0c;也承担了风控审查的职责。因此&#xff0c;在架构设计中&#xff0c;我们需要考虑每天海量的聊天消息量&#…

滚雪球学Redis[6.3讲]:Redis分布式锁的实战指南:从基础到Redlock算法

全文目录&#xff1a; &#x1f389;前言&#x1f6a6;Redis分布式锁的概念与应用场景&#x1f343;1.1 什么是分布式锁&#xff1f;&#x1f342;1.2 应用场景 ⚙️使用Redis实现分布式锁&#x1f33c;2.1 基本思路&#x1f33b;2.2 示例代码&#x1f940;2.3 代码解析 &#…

JMeter之mqtt-jmeter 插件介绍

前言 mqtt-jmeter插件是JMeter中的一个第三方插件&#xff0c;用于支持MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;协议的性能测试。MQTT是一种轻量级的发布/订阅消息传输协议&#xff0c;广泛应用于物联网和传感器网络中。 一、安装插件 mqtt-jmeter项目…

ChatGLM-6B大模型 + Bert预训练模型 + RAG实现知识库信息抽取(含完整代码)

‌ 目录 RAG技术 知识库准备 检索相关文本 生成问答 完整代码 本文将基于ChatGLM-6B大模型、Bert预训练模型和RAG完整金融知识库信息抽取任务。 RAG技术 RAG技术,即检索增强生成(Retrieval-Augmented Generation),是一种结合了检索和生成技术的模型。‌ 它通…

电影评论网站开发:Spring Boot技术指南

3系统分析 3.1可行性分析 通过对本电影评论网站实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本电影评论网站采用SSM框架&#xff0c;JAVA作为开发语言&#…

Uncaught (in promise) TypeError: Cannot convert object to primitive value

使用vue3的时候报了这个错误&#xff0c;而且还同时报了一个警告 说一下我这里这个错误和警告的原因&#xff0c;是因为我把传给 第三方组件的值 也当做了 第三方组件的 ref&#xff0c;可能没太说清楚&#xff0c;所以接下来看代码&#xff0c;我这里使用的第三方组件是 vxe-t…

Django操作数据库

Django操作数据库 1、ORM框架2、ORM-创建数据库3、ORM-连接数据库4、ORM-操作表4.1、类创建表4.2、修改表结构4.2.1、删除表结构4.2.2、新增表结构 5、ORM-增删改查5.1、新增数据5.2、删除数据5.3、查询数据5.4、更新数据 1、ORM框架 Django开发操作数据库很简单&#xff0c;内…

MySQL(python开发)——(3)表数据的基本操作,增删改查

MySQL&#xff08;python开发)——&#xff08;1&#xff09;数据库概述及其MySQL介绍 MySQL&#xff08;python开发)——&#xff08;2&#xff09;数据库基本操作及数据类型 MySQL—— 表数据基本操作 一、表中插入(insert)数据——增 insert into 表名 values (值1&#…