使用 SQL 和表格数据进行问答和 RAG(7)—将表格数据(CSV 或 Excel 文件)加载到向量数据库(ChromaDB)中

embedded/2025/1/11 23:18:10/

将表格数据(CSV 或 Excel 文件)加载到向量数据库(ChromaDB)中。这里定义的类 PrepareVectorDBFromTabularData,它的主要功能是读取表格数据文件到DataFrame中、生成嵌入向量、并将这些数据存储在向量数据库的集合中,同时对注入的数据进行验证。


代码结构与功能分析

1. 类的简介
  • 目标:将 CSV 或 Excel 文件的数据转换为向量并存储到 ChromaDB 中。
  • 主要方法
    • run_pipeline:运行整个数据处理管道。
    • _load_dataframe:加载数据文件为 Pandas DataFrame。
    • _prepare_data_for_injection:根据文件内容生成向量和相关元数据。
    • _inject_data_into_chromadb:将数据注入到 ChromaDB 中。
    • _validate_db:验证向量数据库的集合内容。

2. 初始化 (__init__)
def __init__(self, file_directory:str) -> None:self.APPCFG = LoadConfig()self.file_directory = file_directory
  • 参数
    • file_directory:待处理文件的路径。
  • 功能
    • 加载配置对象 self.APPCFG,其中包含数据库和嵌入生成器的实例。
    • 初始化文件路径 file_directory

3. 运行数据处理管道 (run_pipeline)
def run_pipeline(self):self.df, self.file_name = self._load_dataframe(file_directory=self.file_directory)self.docs, self.metadatas, self.ids, self.embeddings = self._prepare_data_for_injection(df=self.df, file_name=self.file_name)self._inject_data_into_chromadb()self._validate_db()
  • 功能
    • 加载数据文件(调用 _load_dataframe)。
    • 准备向量和相关元数据(调用 _prepare_data_for_injection)。
    • 将数据注入到 ChromaDB 集合中(调用 _inject_data_into_chromadb)。
    • 验证注入是否成功(调用 _validate_db)。

4. 加载数据文件 (_load_dataframe)
def _load_dataframe(self, file_directory: str):file_names_with_extensions = os.path.basename(file_directory)file_name, file_extension = os.path.splitext(file_names_with_extensions)if file_extension == ".csv":df = pd.read_csv(file_directory)return df, file_nameelif file_extension == ".xlsx":df = pd.read_excel(file_directory)return df, file_nameelse:raise ValueError("The selected file type is not supported")
  • 参数
    • file_directory:待加载的文件路径。
  • 功能
    • 根据文件扩展名(.csv.xlsx),将文件加载为 Pandas DataFrame。
    • 返回加载的数据和文件名(不带扩展名)。
  • 异常
    • 如果文件类型不被支持,则抛出 ValueError

5. 准备数据 (_prepare_data_for_injection)
def _prepare_data_for_injection(self, df:pd.DataFrame, file_name:str):docs = []metadatas = []ids = []embeddings = []for index, row in df.iterrows():output_str = ""for col in df.columns:output_str += f"{col}: {row[col]},\n"response = self.APPCFG.OpenAIEmbeddings.embed_documents(output_str)[0]embeddings.append(response)docs.append(output_str)metadatas.append({"source": file_name})ids.append(f"id{index}")return docs, metadatas, ids, embeddings
  • 参数
    • df:待处理的 Pandas DataFrame。
    • file_name:文件名,用于生成元数据。
  • 功能
    • 遍历 DataFrame 的每一行,将行数据格式化为字符串 output_str
    • 使用 OpenAIEmbeddings.embed_documents 为字符串生成向量。
    • 保存生成的文档、元数据、唯一 ID 和向量。
  • 返回值
    • 文档列表、元数据列表、ID 列表和向量列表。

6. 注入数据到 ChromaDB (_inject_data_into_chromadb)
def _inject_data_into_chromadb(self):chroma_client = self.APPCFG.chroma_clientexisting_collections = chroma_client.list_collections()collection_name = self.APPCFG.collection_nameexisting_collection_names = [collection.name for collection in existing_collections]if collection_name in existing_collection_names:collection = chroma_client.get_collection(name=collection_name)print(f"Retrieved existing collection: {collection_name}")else:collection = chroma_client.create_collection(name=collection_name)print(f"Created new collection: {collection_name}")collection.add(documents=self.docs,metadatas=self.metadatas,embeddings=self.embeddings,ids=self.ids)print("Data is stored in ChromaDB.")
  • 功能
    • 检查集合是否已存在。如果存在,则获取;否则,创建新集合。
    • 将文档、元数据、嵌入向量和 ID 添加到集合中。
  • 异常处理
    • 避免重复创建集合。

7. 验证数据库内容 (_validate_db)
def _validate_db(self):vectordb = self.APPCFG.chroma_client.get_collection(name=self.APPCFG.collection_name)print("Number of vectors in vectordb:", vectordb.count())
  • 功能
    • 获取集合并打印其中向量的数量,确认数据是否注入成功。

代码运行结果:
在这里插入图片描述

总结

这段代码的整体流程如下:

  1. 加载 CSV 或 Excel 文件,转换为 Pandas DataFrame。
  2. 遍历 DataFrame 的每一行,生成文档、元数据和嵌入向量。
  3. 将生成的数据注入到 ChromaDB 的集合中。
  4. 验证数据库集合中的向量数量,确保注入成功。

需要注意文件格式支持、嵌入生成器和 ChromaDB 客户端的兼容性问题。

完整代码:

import os
import pandas as pd
from utils.load_config import LoadConfig
import pandas as pdclass PrepareVectorDBFromTabularData:"""This class is designed to prepare a vector database from a CSV and XLSX file.It then loads the data into a ChromaDB collection. The process involvesreading the CSV file, generating embeddings for the content, and storing the data in the specified collection.Attributes:APPCFG: Configuration object containing settings and client instances for database and embedding generation.file_directory: Path to the CSV file that contains data to be uploaded."""def __init__(self, file_directory:str) -> None:"""Initialize the instance with the file directory and load the app config.Args:file_directory (str): The directory path of the file to be processed."""self.APPCFG = LoadConfig()self.file_directory = file_directorydef run_pipeline(self):"""Execute the entire pipeline for preparing the database from the CSV.This includes loading the data, preparing the data for injection, injectingthe data into ChromaDB, and validating the existence of the injected data."""self.df, self.file_name = self._load_dataframe(file_directory=self.file_directory)self.docs, self.metadatas, self.ids, self.embeddings = self._prepare_data_for_injection(df=self.df, file_name=self.file_name)self._inject_data_into_chromadb()self._validate_db()def _load_dataframe(self, file_directory: str):"""Load a DataFrame from the specified CSV or Excel file.Args:file_directory (str): The directory path of the file to be loaded.Returns:DataFrame, str: The loaded DataFrame and the file's base name without the extension.Raises:ValueError: If the file extension is neither CSV nor Excel."""file_names_with_extensions = os.path.basename(file_directory)print(file_names_with_extensions)file_name, file_extension = os.path.splitext(file_names_with_extensions)if file_extension == ".csv":df = pd.read_csv(file_directory)return df, file_nameelif file_extension == ".xlsx":df = pd.read_excel(file_directory)return df, file_nameelse:raise ValueError("The selected file type is not supported")def _prepare_data_for_injection(self, df:pd.DataFrame, file_name:str):"""Generate embeddings and prepare documents for data injection.Args:df (pd.DataFrame): The DataFrame containing the data to be processed.file_name (str): The base name of the file for use in metadata.Returns:list, list, list, list: Lists containing documents, metadatas, ids, and embeddings respectively."""docs = []metadatas = []ids = []embeddings = []for index, row in df.iterrows():output_str = ""# Treat each row as a separate chunkfor col in df.columns:output_str += f"{col}: {row[col]},\n"response = self.APPCFG.OpenAIEmbeddings.embed_documents(output_str)[0]embeddings.append(response)docs.append(output_str)metadatas.append({"source": file_name})ids.append(f"id{index}")return docs, metadatas, ids, embeddingsdef _inject_data_into_chromadb(self):"""Inject the prepared data into ChromaDB.Raises an error if the collection_name already exists in ChromaDB.The method prints a confirmation message upon successful data injection."""chroma_client = self.APPCFG.chroma_client# 列出所有集合的名称existing_collections = chroma_client.list_collections()collection_name = self.APPCFG.collection_name #"titanic_small"# 获取所有集合existing_collections = chroma_client.list_collections()# 提取集合名称existing_collection_names = [collection.name for collection in existing_collections]if collection_name in existing_collection_names:# 如果集合存在,获取它collection = chroma_client.get_collection(name=collection_name)print(f"Retrieved existing collection: {collection_name}")else:# 如果集合不存在,创建它collection = chroma_client.create_collection(name=collection_name)print(f"Created new collection: {collection_name}")collection.add(documents=self.docs,metadatas=self.metadatas,embeddings=self.embeddings,ids=self.ids)print("==============================")print("Data is stored in ChromaDB.")     def _validate_db(self):"""Validate the contents of the database to ensure that the data injection has been successful.Prints the number of vectors in the ChromaDB collection for confirmation."""vectordb =  self.APPCFG.chroma_client.get_collection(name=self.APPCFG.collection_name)print("==============================")print("Number of vectors in vectordb:", vectordb.count())print("==============================")

http://www.ppmy.cn/embedded/153147.html

相关文章

Electron使用记录

Electron 参考引用 参考文档: ElectronVue3.2TypeScriptVite开发桌面端 - 掘金 (juejin.cn) 如何用Electronvuevite构建桌面端应用(一) - 掘金 (juejin.cn) Electron教程(三)如何打包 electron 程序:electron-forge 的使用教程-C…

CRTP编程模式(奇异递归模板)实现客户端的https管理模块

一、什么是 CRTP(Curiously Recurring Template Pattern)? CRTP 是 C 中的一种编程技巧,它利用模板机制来实现静态多态性。 简单来说,CRTP 是指派生类在继承基类时,将自身作为模板参数传递给基类。通过这种…

Current offset 978 for partition [topic,0] out of range; reset offset to 979

这个错误信息出现在使用Apache Kafka时,表示消费者在尝试读取某个分区的数据时,当前的偏移量(offset)已经不在可用的范围内。具体来说,消费者尝试读取的偏移量978超出了可用范围,因此Kafka自动将偏移量重置…

【ACM独立出版 - 往届 EI Scopus 检索记录 | 教育,计算机相关主题均可投稿】第六届大数据与信息化教育国际学术会议(ICBDIE 2025)

高录用 | 快EI检索 | 年度高届数重磅会议 | 高校联合支持举办 ACM 独立出版!见刊检索快速稳定!往届 EI & Scopus 检索记录 教 育,计算机相关主题均可投稿 支持单位:西北师范大学、西交利物浦大学、University of Leicester…

【Vue.js 组件化】高效组件管理与自动化实践指南

文章目录 摘要引言组件命名规范与组织结构命名规范目录组织 依赖管理工具自动化组件文档生成构建自动引入和文档生成的组件化体系代码结构自动引入组件配置使用 Storybook 展示组件文档自动生成 代码详解QA 环节总结参考资料 摘要 在现代前端开发中,组件化管理是 V…

Redis 全维度深度剖析:从基础架构到实战应用

一、Redis 简介 Redis(Remote Dictionary Server)是一款开源的、基于内存的数据结构存储系统,由 Salvatore Sanfilippo(网名:antirez)于 2009 年开发并开源。它以其高性能、丰富的数据结构以及简单易用的特…

计算机毕业设计Python机器学习农作物健康识别系统 人工智能 图像识别 机器学习 大数据毕业设计 算法

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…

551 灌溉

常规解法&#xff1a; #include<bits/stdc.h> using namespace std; int n,m,k,t; const int N105; bool a[N][N],b[N][N]; int cnt; //设置滚动数组来存贮当前和下一状态的条件 //处理传播扩散问题非常有效int main() {cin>>n>>m>>t;for(int i1;i&l…