目前已有不少平台集成了 RAG 模式，例如 Dify、Cherry Studio 等。本文在 AI 辅助下，使用 DeepSeek（下文简称 DS）的 API 实现一个简单的 RAG 部署。框架的主要技术栈是 Chroma、LangChain 和 Streamlit；答案以流式方式输出，并为答案加上引用索引。支持 doc、docx、pdf、txt 四种文件格式。
import hashlib
import os
import re
import shutil
from typing import List, Dict, Any, Generator

import chromadb
import fitz  # PyMuPDF
import pypandoc  # DOC parsing
import streamlit as st
from docx import Document
from langchain.chains import RetrievalQA
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import BaseLLM
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_core.outputs import LLMResult, Generation
from openai import OpenAI
from pydantic import Field, BaseModel
from tenacity import retry, wait_exponential, stop_after_attempt

# Each run would delete chroma_db (see the commented-out rmtree below), so the
# knowledge base would have to be rebuilt.
# shutil.rmtree("./chroma_db", ignore_errors=True)# 自定义支持流式输出的DeepSeek LLM类
class DeepSeekLLM(BaseLLM, BaseModel):
    """LangChain LLM wrapper that streams completions from a DeepSeek endpoint.

    Requests go through the OpenAI client pointed at ``base_url`` and always
    use the ``deepseek-reasoner`` model with ``stream=True``. Only the
    ``delta.content`` part of each streamed chunk is surfaced; chunks without
    content are skipped.

    Attributes:
        api_key: Credential passed to the OpenAI client.
        base_url: Base address of the API passed to the OpenAI client.
    """

    api_key: str = Field(..., description="DeepSeek API密钥")
    base_url: str = Field(..., description="API基础地址")

    # The retry policy sits on the request itself. Decorating the generator
    # (`_stream_call`) would only wrap the creation of the generator object,
    # which never raises, so a failed request would not be retried.
    # `reraise=True` keeps the original exception type for callers instead of
    # wrapping it in tenacity's RetryError.
    @retry(
        wait=wait_exponential(multiplier=1, min=4, max=10),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    def _open_stream(self, prompt: str, stop: List[str] = None):
        """Send the chat request and return the streaming response object.

        Args:
            prompt: Text sent as the user message.
            stop: Optional stop sequences; forwarded only when non-empty.

        Returns:
            The iterable of chunks returned by ``chat.completions.create``.

        Raises:
            Exception: Whatever the client raised on the last of three attempts.
        """
        client = OpenAI(api_key=self.api_key, base_url=self.base_url)
        request: Dict[str, Any] = {
            "model": "deepseek-reasoner",
            "messages": [
                {"role": "system", "content": "你是各领域资深专家"},
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.3,
            "stream": True,  # enable streaming output
        }
        if stop:
            request["stop"] = stop
        return client.chat.completions.create(**request)

    def _stream_call(self, prompt: str, stop: List[str] = None) -> Generator[str, None, None]:
        """Yield the answer text piece by piece as the API streams it.

        Errors raised while iterating an already-opened stream are not
        retried; they propagate to the caller.
        """
        for chunk in self._open_stream(prompt, stop):
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    def _call(self, prompt: str, stop: List[str] = None) -> str:
        """Return the complete answer for ``prompt`` as a single string."""
        return "".join(self._stream_call(prompt, stop))

    def _generate(self, prompts: List[str], stop: List[str] = None) -> LLMResult:
        """Answer each prompt in turn and wrap the texts in an ``LLMResult``."""
        generations = [
            [Generation(text=self._call(prompt, stop))] for prompt in prompts
        ]
        return LLMResult(generations=generations)

    @property
    def _llm_type(self) -> str:
        """Identifier string for this LLM implementation."""
        return "deepseek-legal-llm"

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Parameters that identify this LLM configuration.

        The API key is deliberately left out so the secret cannot end up
        wherever these parameters are printed, logged or used as a key.
        """
        return {"base_url": self.base_url}


# Embedding model configuration
@st.cache_resource
def _load_embeddings() -> HuggingFaceEmbeddings:
    """Build the embedding model from the local ``./bge-large-zh-v1.5`` folder.

    Cached as a resource so the model is constructed once rather than on
    every Streamlit rerun of this script.
    """
    # To force CPU and avoid a CUDA dependency, additionally pass
    # model_kwargs={'device': 'cpu'}.
    return HuggingFaceEmbeddings(model_name="./bge-large-zh-v1.5")


embeddings_bge = _load_embeddings()

# Smoke test: embed a short query and fail fast if nothing comes back.
test_embedding = embeddings_bge.embed_query("测试")
if not test_embedding:
    st.error("Embedding 生成失败,请检查本地模型路径是否正确!")
    raise RuntimeError("Embedding 生成失败")

# API configuration. Environment variables take precedence so the secret does
# not have to be written into the source; the literals are the fallbacks.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY", "你的密钥")
DEEPSEEK_BASE_URL = os.environ.get("DEEPSEEK_BASE_URL", "https://api.deepseek.com")

# Initialise components
deepseek_llm = DeepSeekLLM(api_key=DEEPSEEK_API_KEY, base_url=DEEPSEEK_BASE_URL)

# Document processing function
def _decode_text(raw: bytes) -> str:
    """Decode the bytes of a plain-text upload.

    Tries UTF-8 (dropping a BOM if present), then GB18030, and finally falls
    back to UTF-8 with undecodable bytes replaced, so decoding never raises.
    """
    for encoding in ("utf-8-sig", "gb18030"):
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError:
            continue
    return raw.decode("utf-8", errors="replace")


@st.cache_data  # cache the parsed text so the same upload is not parsed again
def process_document(file) -> str:
    """Extract plain text from an uploaded PDF, DOCX, DOC or TXT file.

    The format is chosen from the file name's extension, compared
    case-insensitively.

    Args:
        file: Object exposing ``name`` and ``read()``; for ``.docx`` it is
            handed to ``docx.Document`` directly.

    Returns:
        The extracted text, or ``""`` when the extension is unsupported or a
        ``.doc`` file could not be converted.
    """
    name = file.name.lower()
    if name.endswith(".pdf"):
        # Name the type explicitly: a byte stream carries no file name to
        # infer it from.
        with fitz.open(stream=file.read(), filetype="pdf") as doc:
            return "\n".join(page.get_text() for page in doc)
    if name.endswith(".docx"):
        return "\n".join(p.text for p in Document(file).paragraphs)
    if name.endswith(".doc"):
        # NOTE(review): pandoc's documented input formats do not appear to
        # include legacy binary .doc - confirm; this branch may always fail.
        # A failed conversion yields "" so the caller can skip the file
        # instead of the whole app crashing.
        try:
            return pypandoc.convert_text(file.read(), "plain", format="doc")
        except (RuntimeError, OSError):
            return ""
    if name.endswith(".txt"):
        return _decode_text(file.read())
    return ""


# RAG pipeline
# 修改后的流式RAG处理流程
def build_streaming_retrieval_chain(k: int = 3, score_threshold: float = None):
    """Build a ``RetrievalQA`` "stuff" chain over the persisted Chroma store.

    The retriever settings were previously passed under a misspelled keyword
    (``ssearch_kwargs``) and so were never applied. They also carried a filter
    on a ``metadata_field`` key, while the ingestion code in this file only
    writes a ``source`` key. The filter is therefore replaced by an optional
    relevance-score threshold.

    Args:
        k: Maximum number of chunks to retrieve.
        score_threshold: When given, only chunks whose relevance score reaches
            this value are returned. It is opt-in because the score scale
            depends on the distance metric of the collection.

    Returns:
        A ``RetrievalQA`` chain that also returns its source documents.
    """
    vectorstore = Chroma(
        collection_name="legal_docs",
        embedding_function=embeddings_bge,
        persist_directory="./chroma_db",
    )
    if score_threshold is None:
        retriever = vectorstore.as_retriever(search_kwargs={"k": k})
    else:
        retriever = vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"k": k, "score_threshold": score_threshold},
        )
    return RetrievalQA.from_chain_type(
        llm=deepseek_llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
    )


# Streamlit interface
# Prompt sent to the model; modelled on LangChain's default "stuff" QA prompt.
_QA_PROMPT_TEMPLATE = (
    "Use the following pieces of context to answer the question at the end. "
    "If you don't know the answer, just say that you don't know, "
    "don't try to make up an answer.\n\n"
    "{context}\n\n"
    "Question: {question}\n"
    "Helpful Answer:"
)

# Paragraph boundaries: blank lines, numbered items, bullet items, and
# whitespace following sentence-ending punctuation (full-width or ASCII).
_PARAGRAPH_SPLIT_RE = re.compile(
    "|".join(
        [
            r"\n{2,}",
            r"\n(?=\d+[\.、])",
            r"\n(?=[•\-*□▶])",
            r"(?<=[。！？!?])\s+(?=.)",
        ]
    )
)
_SENTENCE_END_RE = re.compile(r"[。！？!?]$")


def _ingest_uploaded_files(files) -> int:
    """Split, embed and store each uploaded file; return how many are loaded.

    A file already ingested in this session is counted but not embedded
    again, because Streamlit reruns the whole script on every interaction.
    Chunk ids are derived from the file name and content so the same document
    is written under the same ids each time.
    """
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    vectorstore = Chroma(
        collection_name="legal_docs",
        embedding_function=embeddings_bge,
        persist_directory="./chroma_db",
    )
    if "ingested_digests" not in st.session_state:
        st.session_state["ingested_digests"] = set()
    ingested = st.session_state["ingested_digests"]

    loaded = 0
    for file in files:
        text = process_document(file)
        if not text.strip():  # nothing could be extracted
            st.error(f"文件 {file.name} 解析失败,跳过")
            continue
        digest = hashlib.sha256(
            f"{file.name}\n{text}".encode("utf-8", "replace")
        ).hexdigest()
        if digest in ingested:
            loaded += 1
            continue
        chunks = splitter.split_text(text)
        if not chunks:  # the splitter produced nothing to store
            st.error(f"文件 {file.name} 无法进行文本切分,跳过")
            continue
        vectorstore.add_texts(
            texts=chunks,
            # every chunk records the file it came from
            metadatas=[{"source": file.name} for _ in chunks],
            ids=[f"{digest}-{index}" for index in range(len(chunks))],
        )
        ingested.add(digest)
        loaded += 1
    return loaded


def _number_sources(source_docs) -> Dict[str, int]:
    """Map each distinct ``source`` to a 1-based number in first-seen order."""
    references: Dict[str, int] = {}
    for doc in source_docs:
        source = doc.metadata.get("source", "未知来源")
        if source not in references:
            references[source] = len(references) + 1
    return references


def _annotate_with_citations(answer: str, references: Dict[str, int]) -> str:
    """Append the full citation list to every paragraph containing a sentence.

    A paragraph qualifies when it ends with sentence punctuation or contains
    a full-width full stop; other paragraphs (for example headings) are left
    untouched. Paragraphs are re-joined with blank lines.
    """
    citation = "".join(f"[{num}]" for num in sorted(references.values()))
    paragraphs = [p.strip() for p in _PARAGRAPH_SPLIT_RE.split(answer) if p.strip()]
    return "\n\n".join(
        f"{para} {citation}" if _SENTENCE_END_RE.search(para) or "。" in para else para
        for para in paragraphs
    )


st.title("DeepSeek-RAG系统")

# File upload handling
uploaded_files = st.file_uploader(
    "上传文件",
    type=["pdf", "doc", "docx", "txt"],
    accept_multiple_files=True,
)
if uploaded_files:
    loaded_count = _ingest_uploaded_files(uploaded_files)
    if loaded_count:  # report only files that were actually loaded
        st.success(f"已成功加载{loaded_count}份文件")

if query := st.text_input("请输入问题:"):
    answer_container = st.empty()  # placeholder updated as tokens arrive
    full_answer = ""
    source_docs = []

    try:
        qa_chain = build_streaming_retrieval_chain()
        # Retrieve first, then stream a single completion. Running the chain
        # and then streaming `result['result']` would send the finished answer
        # back to the model as a new prompt.
        source_docs = qa_chain.retriever.invoke(query)
        prompt = _QA_PROMPT_TEMPLATE.format(
            context="\n\n".join(doc.page_content for doc in source_docs),
            question=query,
        )
        for token in deepseek_llm._stream_call(prompt):
            full_answer += token
            # live update with a trailing cursor glyph
            answer_container.markdown(f"**答案**:{full_answer}▌")
        answer_container.markdown(f"**答案**:{full_answer}")
    except Exception as e:  # UI boundary: show the failure instead of crashing
        st.error(f"生成中断: {str(e)}")
        full_answer += "\n\n(输出因错误中断)"
        answer_container.markdown(f"**答案**:{full_answer}")

    # Citation annotation and source listing
    if full_answer and source_docs:
        doc_references = _number_sources(source_docs)
        formatted_answer = _annotate_with_citations(full_answer, doc_references)
        answer_container.markdown(f"**答案**:{formatted_answer}")

        st.subheader("依据文件")
        # dict order is first-seen order, which is also ascending number order
        for source, num in doc_references.items():
            st.markdown(f"[{num}] {source}")