一、概述
检索增强生成(Retrieval-Augmented Generation,RAG)已成为大型语言模型(LLM)应用的重要架构,通过结合外部知识库来增强模型的回答能力,特别是在处理专业领域知识、最新信息或企业私有数据时。本报告将系统梳理使用 Elasticsearch(ES)作为向量数据库实现 RAG 系统的优缺点,与传统向量数据库及其他存储解决方案的对比,以及基于 Deepseek V3 和 Qwen2.5 大模型的实现方案。
二、Elasticsearch 作为 RAG 向量数据库的优缺点分析
1. 优点
低门槛的独立技术栈
- 一站式解决方案:ES 能够一站式完成向量生成、存储、索引和检索,通过配置即可实现大部分功能
- 成熟的生态系统:作为成熟的搜索引擎,拥有丰富的文档、社区支持和工具集
- 简化的部署和维护:相比需要部署多个组件的解决方案,ES 可以作为单一系统处理所有 RAG 相关任务
高性能和扩展性
- 分布式架构:支持百万级 QPS 和千亿级数据量
- 灵活的扩展能力:可以通过添加节点水平扩展,满足不断增长的数据需求
- 高可用性:内置的分片和复制机制确保系统的可靠性
混合检索能力
- 文本与向量的结合:同时支持传统的全文检索和向量相似性搜索
- 提高检索精度:混合检索策略能够显著提升搜索结果的准确性和多样性
- 处理短查询优势:对于短查询,传统关键词搜索可以弥补纯向量搜索的不足
丰富的数据处理能力
- 强大的文本分析:内置多种语言分析器,支持分词、同义词、停用词等处理
- 结构化和非结构化数据支持:可以同时处理结构化字段和非结构化文本
- 聚合和分析功能:提供丰富的聚合功能,可用于数据分析和可视化
多种相似度计算算法支持
- 余弦相似度(Cosine Similarity):适用于文本语义搜索,不受向量长度影响
- 点积(Dot Product):适用于推荐系统,考虑向量长度和方向
- 欧几里得距离(L2 Norm):适用于图像特征、地理位置等场景
- 脚本评分(Script Score):支持自定义脚本实现更复杂的相似度计算
- 函数评分(Function Score):允许结合衰减函数、字段值等因素调整相似度分数
与大模型的无缝集成
- 简化的集成流程:提供 API 和工具,便于与各种大模型集成
- 灵活的检索配置:可以根据不同大模型的特点调整检索策略
2. 不足
向量搜索性能限制
- 非专用架构:ES 的架构不是专为向量搜索设计的,在大规模向量搜索时性能可能不如专用向量数据库
- 延迟问题:在大规模向量集上,搜索延迟通常为毫秒级,而专用向量数据库可达微秒级
- 资源消耗较高:向量操作可能需要更多的内存和计算资源
向量功能相对有限
- 算法支持有限:支持的向量索引和搜索算法相对较少,主要是 HNSW
- 缺乏专业优化:缺少针对向量操作的专门优化,如 GPU 加速
- 向量维度限制:在处理超高维向量时可能存在效率问题
学习和配置复杂性
- 配置复杂:需要正确配置索引映射、分片策略等
- 调优难度:优化 ES 性能需要专业知识和经验
- 维护成本:需要定期维护和监控集群状态
存储效率问题
- 存储开销:存储向量数据可能需要更多空间
- 索引大小:包含向量的索引通常比纯文本索引大得多
三、Elasticsearch 向量存储与检索原理
1. 向量数据存储原理
Elasticsearch 使用 dense_vector 字段类型来存储向量数据,其工作原理如下:
基本存储结构
- 文档结构:向量被存储为文档的一个字段,与其他字段(如文本、数字等)一起构成完整的文档。
- 向量表示:向量以浮点数数组的形式存储,每个数组元素对应向量的一个维度。
- 索引映射:通过索引映射定义向量字段的属性,包括维度大小、索引方式和相似度计算方法。
存储过程简述
-
1. 向量生成:通过嵌入模型将文本、图像等内容转换为固定维度的向量。
-
2. 文档创建:创建包含向量字段的文档,将向量数据与其他元数据一起存储。
-
3. 分片分配:ES 将文档分配到不同的分片中,每个分片可以位于不同的节点上。
-
4. 磁盘存储:向量数据最终以 Lucene 索引格式存储在磁盘上,同时部分热数据会缓存在内存中。
2. 向量检索原理
Elasticsearch 主要使用 HNSW(Hierarchical Navigable Small World)算法进行向量检索,这是一种近似最近邻(ANN)搜索算法:
HNSW 算法简介
- 多层图结构:HNSW 构建一个多层的图结构,顶层包含少量节点,底层包含所有节点。
- 导航原理:搜索从顶层开始,通过"贪心"策略快速找到大致方向,然后在下层进行更精细的搜索。
- 近似搜索:通过牺牲一定的精确度来换取显著的性能提升。
检索过程
-
1. 查询向量生成:将用户查询转换为向量表示。
-
2. 分片搜索:在每个相关分片上执行向量搜索。
-
3. 相似度计算:根据配置的相似度方法(余弦、点积或 L2 范数)计算查询向量与索引向量的相似度。
-
4. 结果合并:将各分片的搜索结果合并,并按相似度排序。
-
5. 返回结果:返回最相似的文档作为搜索结果。
性能优化参数
- ef_construction:构建索引时的精度参数,值越大索引质量越高但构建越慢。
- ef_search:搜索时的精度参数,值越大搜索结果越精确但速度越慢。
- m:每个节点的最大连接数,影响图的连通性和搜索效率。
3. 多模态向量存储支持
Elasticsearch 不仅可以存储文本向量,还可以存储来自图片、音频和视频等多模态数据的向量表示:
图像向量存储
- 实现原理:使用图像嵌入模型(如 ResNet、ViT 等)将图像转换为向量表示。
- 存储方式:与文本向量相同,使用 dense_vector 字段存储图像向量。
- 应用场景:图像相似度搜索、以图搜图、视觉内容检索。
音频向量存储
- 实现原理:使用音频嵌入模型(如 Wav2Vec、CLAP 等)将音频转换为向量表示。
- 存储方式:同样使用 dense_vector 字段,维度根据音频嵌入模型而定。
- 应用场景:音乐推荐、语音搜索、音频内容分类。
视频向量存储
- 实现原理:可以通过两种方式处理:
- 将视频分解为关键帧,然后使用图像嵌入模型处理每个关键帧。
- 使用专门的视频嵌入模型(如 Video Transformers)直接生成视频向量。
- 存储方式:可以存储单个视频的整体向量,或者存储多个关键帧的向量序列。
- 应用场景:视频内容搜索、相似视频推荐、视频分类。
多模态向量存储的简单示例
# 图像向量存储示例
from PIL import Image
from transformers import AutoFeatureExtractor, AutoModel
import torch# 加载图像嵌入模型
image_model = AutoModel.from_pretrained("openai/clip-vit-base-patch32")
feature_extractor = AutoFeatureExtractor.from_pretrained("openai/clip-vit-base-patch32")# 处理图像
image = Image.open("example.jpg")
inputs = feature_extractor(images=image, return_tensors="pt")
with torch.no_grad():image_features = image_model.get_image_features(**inputs)
image_vector = image_features[0].tolist()# 存储到Elasticsearch
doc = {"title": "示例图片","description": "这是一张示例图片","image_path": "example.jpg","image_vector": image_vector # 使用dense_vector字段存储
}es.index(index="image-vectors", document=doc)
四、Elasticsearch 与其他向量存储解决方案的对比
1. 各向量存储解决方案综合对比
特性 | Elasticsearch | PGVector (PostgreSQL) | Redis Vector | MongoDB Atlas | FAISS | Milvus | Pinecone | Qdrant | Chroma |
架构类型 | 分布式搜索引擎 | 关系型数据库扩展 | 内存数据库扩展 | 文档数据库扩展 | 库/嵌入式 | 分布式向量数据库 | 托管向量数据库 | 向量数据库 | 嵌入式向量数据库 |
部署模式 | 自托管/云服务 | 自托管/云服务 | 自托管/云服务 | 自托管/云服务 | 嵌入式 | 自托管/云服务 | 仅云服务 | 自托管/云服务 | 嵌入式/自托管 |
向量索引算法 | HNSW | HNSW, IVF | HNSW | HNSW | HNSW, IVF, PQ 等多种 | HNSW, IVF, FLAT 等多种 | HNSW 变体 | HNSW | HNSW |
查询性能 | 毫秒级 | 毫秒级 | 亚毫秒级 | 毫秒级 | 微秒-毫秒级 | 微秒-毫秒级 | 毫秒级 | 毫秒级 | 毫秒级 |
扩展性 | 优秀 | 有限 | 有限 | 优秀 | 有限 | 优秀 | 优秀 | 良好 | 有限 |
混合查询 | 原生支持 | 支持 SQL+向量 | 有限支持 | 支持文档+向量 | 不支持 | 支持 | 有限支持 | 支持 | 支持 |
元数据过滤 | 强大 | 强大(SQL) | 有限 | 强大(文档查询) | 有限 | 支持 | 支持 | 支持 | 支持 |
GPU 加速 | 不支持 | 不支持 | 不支持 | 不支持 | 支持 | 支持 | 内部支持 | 不支持 | 不支持 |
存储容量 | TB-PB 级 | TB 级 | GB-TB 级 | TB-PB 级 | 受内存限制 | TB 级 | TB 级 | TB 级 | GB-TB 级 |
全文搜索 | 优秀 | 基础 | 良好 | 基础 | 不支持 | 有限 | 不支持 | 有限 | 有限 |
相似度算法 | 余弦、点积、L2 范数 | 余弦、L2 范数 | 余弦、L2 范数 | 余弦、点积、欧几里得 | 多种(余弦、内积、L2 等) | 多种(余弦、内积、L2 等) | 余弦、点积 | 余弦、点积、欧几里得 | 余弦 |
事务支持 | 有限 | 完整 ACID | 有限 | 文档级事务 | 不支持 | 有限 | 不支持 | 不支持 | 不支持 |
易用性 | 中等 | 高(SQL 熟悉度) | 高 | 高(MongoDB 用户) | 复杂 | 中等 | 简单 | 简单 | 简单 |
维护成本 | 高 | 中等 | 低 | 中等 | 低(嵌入式) | 中 | 低(托管) | 中 | 低 |
社区支持 | 强大 | 强大 | 强大 | 强大 | 活跃 | 活跃 | 有限 | 活跃 | 活跃 |
成本 | 开源,自托管 | 开源,自托管 | 开源+商业版 | 开源+商业版 | 开源,免费 | 开源+商业版 | 商业版,按量付费 | 开源+商业版 | 开源,免费 |
适用场景 | 混合搜索 | 结构化数据+向量 | 高性能、低延迟 | 半结构化数据+向量 | 高性能向量搜索 | 大规模向量管理 | 简单部署、高可用 | 中小规模应用 | 快速原型开发 |
2. 不同相似度计算方法对比
相似度方法 | 数学表达式 | 值域 | 适用场景 | 特点 | 支持的存储系统 |
余弦相似度 | cos(θ) = A·B / (‖A‖·‖B‖) | [-1, 1] | 文本语义搜索 | 不受向量长度影响,只考虑方向 | 全部 |
点积 | A·B = ∑(A_i × B_i) | 无限制 | 推荐系统 | 考虑向量长度和方向,通常需要归一化 | ES, MongoDB, FAISS, Milvus, Pinecone, Qdrant |
欧几里得距离 | ‖A-B‖ = √(∑(A_i - B_i)²) | [0, ∞) | 图像特征、地理位置 | 值越小表示越相似,需要转换为相似度 | ES, PGVector, Redis, MongoDB, FAISS, Milvus, Qdrant |
曼哈顿距离 | ∑|A_i - B_i| | [0, ∞) | 网格空间、特征差异 | 计算简单,对异常值不敏感 | FAISS, Milvus |
杰卡德相似度 | J(A,B) = |A∩B| / |A∪B| | [0, 1] | 集合比较、文档相似度 | 适用于二进制特征或集合 | ES(脚本), PGVector(SQL) |
汉明距离 | H(A,B) = ∑(A_i ⊕ B_i) | [0, dim] | 二进制特征、错误检测 | 计算二进制向量中不同位的数量 | FAISS |
3. 适用场景对比总结
解决方案 | 最适合场景 | 不适合场景 |
Elasticsearch | 需要混合搜索(文本+向量)、已有 ES 基础设施、需要复杂文本处理 | 超大规模纯向量搜索、极低延迟要求、GPU 加速需求 |
PGVector | 已有 PostgreSQL 基础设施、需要事务支持、结构化数据+向量 | 超大规模向量集、分布式部署需求 |
Redis Vector | 超低延迟需求、缓存层向量搜索、临时数据 | 持久化要求高、复杂查询、大规模数据 |
MongoDB Atlas | 半结构化数据、文档+向量混合、MongoDB 用户 | 复杂文本分析、极高性能要求 |
FAISS | 纯向量搜索、算法研究、GPU 加速、嵌入式应用 | 需要持久化、分布式部署、元数据过滤 |
Milvus | 大规模向量管理、需要丰富索引算法、混合查询 | 简单应用、资源受限环境 |
Pinecone | 快速部署、无运维需求、按需扩展 | 自托管需求、成本敏感、复杂查询 |
Qdrant | 中小规模应用、需要良好元数据过滤、开源需求 | 超大规模、GPU 加速需求 |
Chroma | 快速原型开发、简单应用、本地部署 | 企业级应用、大规模数据、高并发 |
五、基于 Elasticsearch 的 RAG 实现方案
1. 系统架构设计
基于 Elasticsearch 实现 RAG 系统的整体架构如下:
-
1. 数据处理层:负责文档的收集、清洗、分块和向量化
-
2. 存储层:使用 Elasticsearch 存储文本内容和向量表示
-
3. 检索层:实现混合检索策略,结合文本和向量搜索
-
4. 生成层:集成 Deepseek V3 或 Qwen2.5 大模型,基于检索结果生成回答
-
5. 应用层:提供用户界面和 API 接口
2. 实现步骤详解
2.1 环境准备
首先,我们需要安装必要的依赖:
# 安装必要的库
pip install elasticsearch langchain langchain-elasticsearch langchain-community langchain-openai
pip install deepseek-ai qwen-api
2.2 Elasticsearch 配置
设置 Elasticsearch 集群并创建适合 RAG 的索引,展示不同相似度计算方法:
from elasticsearch import Elasticsearch# 连接到Elasticsearch
es = Elasticsearch(hosts=["http://localhost:9200"],basic_auth=("user", "password"), # 如果有认证request_timeout=60
)# 创建索引映射,使用cosine相似度
cosine_index_mapping = {"mappings": {"properties": {"title": {"type": "text", "analyzer": "standard"},"content": {"type": "text", "analyzer": "standard"},"source": {"type": "keyword"},"content_vector": {"type": "dense_vector","dims": 1536, # 根据嵌入模型调整维度"index": True,"similarity": "cosine" # 余弦相似度}}},"settings": {"number_of_shards": 2,"number_of_replicas": 1}
}# 创建使用余弦相似度的索引
es.indices.create(index="rag-knowledge-cosine", body=cosine_index_mapping)# 创建索引映射,使用dot_product相似度
dot_product_index_mapping = {"mappings": {"properties": {"title": {"type": "text", "analyzer": "standard"},"content": {"type": "text", "analyzer": "standard"},"source": {"type": "keyword"},"content_vector": {"type": "dense_vector","dims": 1536,"index": True,"similarity": "dot_product" # 点积相似度}}},"settings": {"number_of_shards": 2,"number_of_replicas": 1}
}# 创建使用点积相似度的索引
es.indices.create(index="rag-knowledge-dot-product", body=dot_product_index_mapping)# 创建索引映射,使用l2_norm相似度
l2_norm_index_mapping = {"mappings": {"properties": {"title": {"type": "text", "analyzer": "standard"},"content": {"type": "text", "analyzer": "standard"},"source": {"type": "keyword"},"content_vector": {"type": "dense_vector","dims": 1536,"index": True,"similarity": "l2_norm" # 欧几里得距离(L2范数)}}},"settings": {"number_of_shards": 2,"number_of_replicas": 1}
}# 创建使用L2范数相似度的索引
es.indices.create(index="rag-knowledge-l2-norm", body=l2_norm_index_mapping)
2.3 文档处理与索引
实现文档的处理、分块和向量化,并分别存储到不同相似度计算方法的索引中:
import os
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings # 也可以使用其他嵌入模型# 加载文档
loader = DirectoryLoader("./documents", glob="**/*.txt", loader_cls=TextLoader)
documents = loader.load()# 文档分块
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000,chunk_overlap=200,length_function=len,
)
chunks = text_splitter.split_documents(documents)# 初始化嵌入模型
embeddings = OpenAIEmbeddings() # 可替换为其他嵌入模型# 处理文档并索引到不同的Elasticsearch索引
for i, chunk in enumerate(chunks):# 生成向量嵌入vector = embeddings.embed_query(chunk.page_content)# 准备索引文档doc = {"title": f"Chunk {i} from {chunk.metadata.get('source', 'unknown')}","content": chunk.page_content,"source": chunk.metadata.get("source", "unknown"),"content_vector": vector}# 索引文档到不同相似度计算方法的索引es.index(index="rag-knowledge-cosine", document=doc)# 对于dot_product,通常需要归一化向量es.index(index="rag-knowledge-dot-product", document=doc)# 对于l2_norm,直接使用原始向量es.index(index="rag-knowledge-l2-norm", document=doc)# 刷新索引
es.indices.refresh(index="rag-knowledge-cosine")
es.indices.refresh(index="rag-knowledge-dot-product")
es.indices.refresh(index="rag-knowledge-l2-norm")
2.4 使用 LangChain 与 Elasticsearch 实现 RAG
下面是使用 LangChain 与 Elasticsearch 结合实现 RAG 的代码,支持不同相似度计算方法:
from langchain_elasticsearch import ElasticsearchStore
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain.schema import Document# 初始化不同相似度计算方法的Elasticsearch向量存储
es_store_cosine = ElasticsearchStore(es_url="http://localhost:9200",index_name="rag-knowledge-cosine",embedding=embeddings,es_user="user", # 如果有认证es_password="password" # 如果有认证
)es_store_dot_product = ElasticsearchStore(es_url="http://localhost:9200",index_name="rag-knowledge-dot-product",embedding=embeddings,es_user="user",es_password="password"
)es_store_l2_norm = ElasticsearchStore(es_url="http://localhost:9200",index_name="rag-knowledge-l2-norm",embedding=embeddings,es_user="user",es_password="password"
)# 创建不同相似度计算方法的检索器
retriever_cosine = es_store_cosine.as_retriever(search_type="similarity",search_kwargs={"k": 5}
)retriever_dot_product = es_store_dot_product.as_retriever(search_type="similarity",search_kwargs={"k": 5}
)retriever_l2_norm = es_store_l2_norm.as_retriever(search_type="similarity",search_kwargs={"k": 5}
)# 创建多相似度混合检索器
def multi_similarity_retriever(query, top_k=5):# 从三种不同相似度方法获取结果cosine_docs = retriever_cosine.get_relevant_documents(query)dot_product_docs = retriever_dot_product.get_relevant_documents(query)l2_norm_docs = retriever_l2_norm.get_relevant_documents(query)# 合并结果all_docs = cosine_docs + dot_product_docs + l2_norm_docs# 去重unique_docs = list({doc.page_content: doc for doc in all_docs}.values())# 限制返回数量return unique_docs[:top_k]# 定义提示模板
template = """
你是一个专业的AI助手。请基于以下提供的上下文信息,回答用户的问题。
如果你无法从上下文中找到答案,请直接说"我无法从提供的信息中找到答案",不要编造信息。上下文信息:
{context}用户问题: {question}回答:
"""prompt = PromptTemplate(template=template,input_variables=["context", "question"]
)# 格式化文档函数
def format_docs(docs):return "\n\n".join([doc.page_content for doc in docs])
2.5 集成 Deepseek V3 模型
from langchain_deepseek import DeepseekChat# 初始化Deepseek模型
deepseek_llm = DeepseekChat(model_name="deepseek-ai/deepseek-v3",api_key="your_api_key",temperature=0.1
)# 构建RAG链(使用余弦相似度)
deepseek_rag_chain_cosine = ({"context": retriever_cosine | format_docs, "question": RunnablePassthrough()}| prompt| deepseek_llm| StrOutputParser()
)# 构建RAG链(使用点积相似度)
deepseek_rag_chain_dot_product = ({"context": retriever_dot_product | format_docs, "question": RunnablePassthrough()}| prompt| deepseek_llm| StrOutputParser()
)# 构建RAG链(使用L2范数相似度)
deepseek_rag_chain_l2_norm = ({"context": retriever_l2_norm | format_docs, "question": RunnablePassthrough()}| prompt| deepseek_llm| StrOutputParser()
)# 构建RAG链(使用多相似度混合检索)
def answer_with_deepseek_multi_similarity(question):# 获取多相似度混合检索结果docs = multi_similarity_retriever(question)context = format_docs(docs)# 构建RAG链chain = ({"context": RunnablePassthrough(), "question": RunnablePassthrough()}| prompt| deepseek_llm| StrOutputParser())# 生成回答return chain.invoke({"context": context, "question": question})# 示例
response = answer_with_deepseek_multi_similarity("什么是向量数据库?")
print(response)
2.6 集成 Qwen2.5 模型
from langchain_community.llms import QianfanLLMEndpoint# 初始化Qwen模型
qwen_llm = QianfanLLMEndpoint(model_name="qwen-2.5",api_key="your_api_key",secret_key="your_secret_key",temperature=0.1
)# 构建RAG链(使用余弦相似度)
qwen_rag_chain_cosine = ({"context": retriever_cosine | format_docs, "question": RunnablePassthrough()}| prompt| qwen_llm| StrOutputParser()
)# 构建RAG链(使用点积相似度)
qwen_rag_chain_dot_product = ({"context": retriever_dot_product | format_docs, "question": RunnablePassthrough()}| prompt| qwen_llm| StrOutputParser()
)# 构建RAG链(使用L2范数相似度)
qwen_rag_chain_l2_norm = ({"context": retriever_l2_norm | format_docs, "question": RunnablePassthrough()}| prompt| qwen_llm| StrOutputParser()
)# 构建RAG链(使用多相似度混合检索)
def answer_with_qwen_multi_similarity(question):# 获取多相似度混合检索结果docs = multi_similarity_retriever(question)context = format_docs(docs)# 构建RAG链chain = ({"context": RunnablePassthrough(), "question": RunnablePassthrough()}| prompt| qwen_llm| StrOutputParser())# 生成回答return chain.invoke({"context": context, "question": question})# 示例
response = answer_with_qwen_multi_similarity("Elasticsearch的主要优势是什么?")
print(response)
3. 高级功能实现
3.1 实现混合搜索策略
def enhanced_hybrid_search(query, es_client, index_name, embedding_model, similarity_type="cosine"):# 生成查询向量query_vector = embedding_model.embed_query(query)# 根据不同相似度类型构建脚本if similarity_type == "cosine":similarity_script = "cosineSimilarity(params.query_vector, 'content_vector') + 1.0"elif similarity_type == "dot_product":similarity_script = "dotProduct(params.query_vector, 'content_vector')"elif similarity_type == "l2_norm":# 对于L2范数,较小的值表示更相似,所以我们使用负值或倒数similarity_script = "1 / (1 + l2Norm(params.query_vector, 'content_vector'))"else:raise ValueError(f"不支持的相似度类型: {similarity_type}")# 构建混合查询hybrid_query = {"query": {"bool": {"should": [# 向量搜索部分{"script_score": {"query": {"match_all": {}},"script": {"source": similarity_script,"params": {"query_vector": query_vector}}}},# 文本搜索部分{"match": {"content": {"query": query,"boost": 0.5 # 调整文本搜索的权重}}}]}},"size": 5}# 执行搜索response = es_client.search(index=index_name, body=hybrid_query)# 处理结果results = []for hit in response["hits"]["hits"]:results.append({"score": hit["_score"],"title": hit["_source"]["title"],"content": hit["_source"]["content"],"source": hit["_source"]["source"]})return results
3.2 实现自适应相似度选择
def adaptive_similarity_search(query, es_client, embedding_model):# 分析查询特征,决定使用哪种相似度方法query_length = len(query.split())# 短查询(1-3个词)可能更适合关键词搜索if query_length <= 3:# 使用混合搜索,但给文本搜索更高权重results = enhanced_hybrid_search(query, es_client, "rag-knowledge-cosine", embedding_model,similarity_type="cosine")# 中等长度查询(4-10个词)elif query_length <= 10:# 使用余弦相似度,适合一般语义搜索results = enhanced_hybrid_search(query, es_client, "rag-knowledge-cosine", embedding_model,similarity_type="cosine")# 长查询(>10个词)else:# 对于长查询,点积可能更适合捕捉更多语义信息results = enhanced_hybrid_search(query, es_client, "rag-knowledge-dot-product", embedding_model,similarity_type="dot_product")return results
3.3 实现多步检索策略
def multi_step_retrieval(query, es_client, embedding_model):# 第一步:使用文本搜索获取初步结果text_query = {"query": {"match": {"content": query}},"size": 20}text_response = es_client.search(index="rag-knowledge-cosine", body=text_query)# 提取文档ID和内容candidate_docs = []for hit in text_response["hits"]["hits"]:candidate_docs.append({"id": hit["_id"],"content": hit["_source"]["content"]})# 如果文本搜索没有足够结果,添加向量搜索if len(candidate_docs) < 10:# 生成查询向量query_vector = embedding_model.embed_query(query)# 向量搜索vector_query = {"query": {"script_score": {"query": {"match_all": {}},"script": {"source": "cosineSimilarity(params.query_vector, 'content_vector') + 1.0","params": {"query_vector": query_vector}}}},"size": 20 - len(candidate_docs)}vector_response = es_client.search(index="rag-knowledge-cosine", body=vector_query)# 添加到候选文档for hit in vector_response["hits"]["hits"]:# 避免重复if not any(doc["id"] == hit["_id"] for doc in candidate_docs):candidate_docs.append({"id": hit["_id"],"content": hit["_source"]["content"]})# 第二步:对候选文档进行重新排序# 生成查询向量query_vector = embedding_model.embed_query(query)# 为每个候选文档计算相似度分数for doc in candidate_docs:# 生成文档向量doc_vector = embedding_model.embed_query(doc["content"])# 计算余弦相似度similarity = cosine_similarity([query_vector],[doc_vector])[0][0]doc["similarity_score"] = similarity# 按相似度排序ranked_docs = sorted(candidate_docs, key=lambda x: x["similarity_score"], reverse=True)# 返回前5个最相关文档return ranked_docs[:5]
3.4 实现查询扩展和改写
def query_expansion(original_query, llm):# 使用大模型扩展查询expansion_prompt = f"""请基于以下原始查询,生成3个不同的扩展查询,以便更全面地搜索相关信息。每个扩展查询应该保留原始查询的核心意图,但可以添加相关术语、同义词或上下文。原始查询: {original_query}扩展查询(每行一个):"""# 获取扩展查询response = llm.invoke(expansion_prompt)# 解析响应expanded_queries = [q.strip() for q in response.split("\n") if q.strip()]# 过滤掉可能的空行或无关行expanded_queries = [q for q in expanded_queries if len(q) > 5]# 确保至少有一个查询(原始查询)if not expanded_queries:expanded_queries = [original_query]else:# 添加原始查询expanded_queries.append(original_query)return expanded_queriesdef multi_query_retrieval(original_query, es_client, embedding_model, llm):# 扩展查询expanded_queries = query_expansion(original_query, llm)# 对每个扩展查询执行搜索all_results = []for query in expanded_queries:results = enhanced_hybrid_search(query, es_client, "rag-knowledge-cosine", embedding_model)all_results.extend(results)# 去重unique_results = []seen_contents = set()for result in all_results:if result["content"] not in seen_contents:unique_results.append(result)seen_contents.add(result["content"])# 重新排序(可以基于原始查询的相似度)query_vector = embedding_model.embed_query(original_query)# 为每个结果计算与原始查询的相似度for result in unique_results:# 假设我们可以从ES获取内容的向量content_vector_query = {"query": {"term": {"_id": result["id"]}},"_source": ["content_vector"]}vector_response = es_client.search(index="rag-knowledge-cosine", body=content_vector_query)content_vector = vector_response["hits"]["hits"][0]["_source"]["content_vector"]# 计算相似度similarity = cosine_similarity([query_vector],[content_vector])[0][0]result["final_score"] = similarity# 按最终分数排序ranked_results = sorted(unique_results, key=lambda x: x["final_score"], reverse=True)# 返回前5个最相关结果return ranked_results[:5]
六、总结与结论
1. Elasticsearch 作为 RAG 向量数据库的优势总结
-
1. 全栈解决方案:Elasticsearch 提供了从数据索引到检索的完整解决方案,无需集成多个系统。
-
2. 混合检索能力:结合传统文本搜索和向量搜索的能力是其最大优势,能够显著提高检索质量。
-
3. 成熟的生态系统:作为成熟的搜索引擎,拥有丰富的文档、工具和社区支持。
-
4. 多模态支持:能够存储和检索文本、图像、音频和视频的向量表示,支持多模态 RAG 应用。
-
5. 灵活的相似度计算:支持多种相似度计算方法(余弦、点积、L2 范数),适应不同应用场景。
2. 实施建议
-
1. 选择合适的相似度计算方法:
- 文本语义搜索:优先使用余弦相似度
- 推荐系统:考虑使用点积
- 图像或地理位置搜索:考虑使用 L2 范数
-
2. 优化索引配置:
- 根据数据量和查询频率调整分片数量
- 为频繁查询的索引配置足够的副本
- 合理设置 HNSW 参数,平衡搜索精度和性能
-
3. 实现混合检索策略:
- 结合向量搜索和关键词搜索
- 根据查询特征动态调整搜索策略
- 考虑使用查询扩展和多步检索提高召回率
- 4. 性能优化:
- 对于大规模部署,考虑使用专用节点
- 监控和调整 JVM 堆大小
- 定期优化索引
3. 未来发展方向
-
1. 与专用向量数据库的集成:
- 考虑 Elasticsearch 与专用向量数据库(如 FAISS、Milvus)的混合架构
- 利用 ES 的文本处理和混合查询能力,结合专用向量数据库的高性能向量搜索
-
2. 知识图谱增强:
- 将向量搜索与知识图谱结合,实现 GraphRAG
- 利用实体关系增强检索结果的相关性和可解释性
-
3. 多模态 RAG 应用:
- 扩展到图像、音频和视频内容的检索增强生成
- 实现跨模态检索,如以文搜图、以图搜文
- 4. 自适应检索策略:
- 开发能够根据查询特征和上下文自动选择最佳检索策略的系统
- 利用强化学习优化检索参数
4. 结论
Elasticsearch 作为 RAG 系统的向量数据库具有显著优势,特别是在需要混合检索、多模态支持和成熟生态系统的场景下。虽然在纯向量搜索性能上可能不如专用向量数据库,但其全面的功能和灵活性使其成为构建企业级 RAG 系统的有力选择。
通过合理配置和优化,结合本报告提供的实现方案,可以充分发挥 Elasticsearch 在 RAG 系统中的潜力,为大模型应用提供高质量的知识检索支持。
对于需要构建 RAG 系统的团队,建议根据具体需求和现有技术栈选择合适的向量存储解决方案,Elasticsearch 特别适合已有 ES 基础设施、需要混合检索能力以及处理多模态数据的场景。
【大模型介绍电子书】
快速揭秘DeepSeek背后的AI工作原理
要获取本书全文PDF内容,请在VX后台留言:“AI大模型基础” 或者 “大模型基础” 就会获得电子书的PDF。