DeepSeek+RAG局域网部署

embedded/2025/3/26 1:22:47/

已经有很多平台集成RAG模式,dify,cherrystudio等,这里通过AI辅助,用DS的API实现一个简单的RAG部署。框架主要技术栈是Chroma,langchain,streamlit,答案流式输出,并且对答案加上索引。支持doc,docx,pdf,txt。

RAG

import os
import streamlit as st
import chromadb
import fitz  # PyMuPDF
import pypandoc  # DOC解析
from docx import Document
from typing import List, Dict, Any, Generator
from langchain_chroma import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.llms import BaseLLM
from pydantic import Field, BaseModel
from openai import OpenAI
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_core.outputs import LLMResult, Generation
import shutil
import re
from tenacity import retry, wait_exponential, stop_after_attempt# 每次运行会将chroma_db删除,就要重新构建知识库。
# shutil.rmtree("./chroma_db", ignore_errors=True)# 自定义支持流式输出的DeepSeek LLM类
class DeepSeekLLM(BaseLLM, BaseModel):api_key: str = Field(..., description="DeepSeek API密钥")base_url: str = Field(..., description="API基础地址")@retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(3))def _stream_call(self, prompt: str, stop: List[str] = None) -> Generator[str, None, None]:client = OpenAI(api_key=self.api_key, base_url=self.base_url)response = client.chat.completions.create(model="deepseek-reasoner",messages=[{"role": "system", "content": "你是各领域资深专家"},{"role": "user", "content": prompt},],temperature=0.3,stream=True  # 启用流式输出)for chunk in response:if chunk.choices and chunk.choices[0].delta.content:yield chunk.choices[0].delta.contentdef _call(self, prompt: str, stop: List[str] = None) -> str:return "".join(self._stream_call(prompt, stop))def _generate(self, prompts: List[str], stop: List[str] = None) -> LLMResult:generations = []for prompt in prompts:stream_output = list(self._stream_call(prompt, stop))generations.append([Generation(text="".join(stream_output))])return LLMResult(generations=generations)@propertydef _llm_type(self) -> str:return "deepseek-legal-llm"@propertydef _identifying_params(self) -> Dict[str, Any]:return {"api_key": self.api_key, "base_url": self.base_url}# 配置中文小模型(约300MB)
embeddings_bge = HuggingFaceEmbeddings(model_name="./bge-large-zh-v1.5"# model_kwargs={#     'device': 'cpu'  # 强制使用CPU,避免CUDA依赖# }
)
test_embedding = embeddings_bge.embed_query("测试")
if not test_embedding or len(test_embedding) == 0:st.error("Embedding 生成失败,请检查本地模型路径是否正确!")raise RuntimeError("Embedding 生成失败")# API配置
DEEPSEEK_API_KEY = "你的密钥"
DEEPSEEK_BASE_URL = "https://api.deepseek.com"# 初始化组件
deepseek_llm = DeepSeekLLM(api_key=DEEPSEEK_API_KEY, base_url=DEEPSEEK_BASE_URL)# 文档处理函数
@st.cache_data # 在文件上传逻辑添加缓存(避免重复处理)
def process_document(file) -> str:"""支持PDF/DOC/DOCX/TXT的解析"""if file.name.endswith(".pdf"):with fitz.open(stream=file.read()) as doc:return "\n".join([page.get_text() for page in doc])elif file.name.endswith(".docx"):return "\n".join([p.text for p in Document(file).paragraphs])elif file.name.endswith(".doc"):return pypandoc.convert_text(file.read(), 'plain', format='doc')elif file.name.endswith(".txt"):return file.read().decode()return ""# RAG处理流程
# 修改后的流式RAG处理流程
def build_streaming_retrieval_chain():vectorstore = Chroma(collection_name="legal_docs",embedding_function=embeddings_bge,persist_directory="./chroma_db")return RetrievalQA.from_chain_type(llm=deepseek_llm,chain_type="stuff",retriever=vectorstore.as_retriever(ssearch_kwargs={"k": 3,"filter": {"metadata_field": {"$gte": 0.7}}}), # 返回Top3且相似度>0.7return_source_documents=True)# Streamlit界面
st.title("DeepSeek-RAG系统")# 文件上传处理
uploaded_files = st.file_uploader("上传文件",type=["pdf", "doc", "docx", "txt"],accept_multiple_files=True)
if uploaded_files:text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)# 使用LangChain的Chroma添加文档vectorstore = Chroma(collection_name="legal_docs",embedding_function=embeddings_bge,persist_directory="./chroma_db")# if vectorstore._collection.count() == 0:#     st.warning("警告:Chroma 数据库为空,可能未成功加载任何数据")for file in uploaded_files:text = process_document(file)if not text.strip():  # 检查解析后文本是否为空st.error(f"文件 {file.name} 解析失败,跳过")continuechunks = text_splitter.split_text(text)if not chunks:  # 确保 chunks 不是空列表st.error(f"文件 {file.name} 无法进行文本切分,跳过")continuemetadatas = [{"source": file.name} for _ in chunks]  # 让每个 chunk 记录来源文件名vectorstore.add_texts(texts=chunks, metadatas=metadatas)  # 添加文本和元数据st.success(f"已成功加载{len(uploaded_files)}份文件")if query := st.text_input("请输入问题:"):# 初始化流式输出容器answer_container = st.empty()full_answer = ""source_docs = []# 创建QA链qa_chain = build_streaming_retrieval_chain()try:# 执行查询并获取流式响应result = qa_chain.invoke({"query": query})source_docs = result['source_documents']# 流式输出处理for token in deepseek_llm._stream_call(result['result']):full_answer += token# 实时更新显示(带光标效果)answer_container.markdown(f"**答案**:{full_answer}▌")# 最终显示完整答案answer_container.markdown(f"**答案**:{full_answer}")except Exception as e:st.error(f"生成中断: {str(e)}")full_answer += "\n\n(输出因错误中断)"answer_container.markdown(f"**答案**:{full_answer}")# 处理引用标注(修改后的版本)if full_answer and source_docs:doc_references = {}doc_counter = 1for doc in source_docs:source = doc.metadata.get('source', '未知来源')if source not in doc_references:doc_references[source] = str(doc_counter)doc_counter += 1# 智能分段与标注逻辑segmentation_patterns = [r'\n{2,}',r'\n(?=\d+[\.、])',r'\n(?=[•\-*□▶])',r'(?<=。|!|?)\s+(?=.)']split_regex = re.compile('|'.join(segmentation_patterns))paragraphs = [p.strip() for p in split_regex.split(full_answer) if p.strip()]formatted_paragraphs = []sorted_citation = "".join(f"[{num}]" for num insorted(doc_references.values(), key=lambda x: int(x)))for para in paragraphs:if re.search(r'[。!?]$', para) or '。' in para:formatted_para = para + f" {sorted_citation}"else:formatted_para = paraformatted_paragraphs.append(formatted_para)# 更新显示带引用的答案formatted_answer = '\n\n'.join(formatted_paragraphs)answer_container.markdown(f"**答案**:{formatted_answer}")# 显示来源文档st.subheader("依据文件")for source, num in sorted(doc_references.items(), key=lambda x: int(x[1])):st.markdown(f"[{num}] {source}")


http://www.ppmy.cn/embedded/176660.html

相关文章

《AI大模型开发笔记》企业RAG技术实战(二)

接上一篇 《AI大模型开发笔记》企业RAG技术实战(一)https://mp.csdn.net/mp_blog/creation/editor/146381354 使用llamaindex实例 https://docs.llamaindex.ai/en/stable/api_reference/ 环境配置 我们继续使用前面langchain例子的python虚环境,不用新建,激活就行 …

Vue3中router最佳封装落地

文章目录 前言一、拆分路由文件夹&#xff1f;二、main.ts中注册路由总结 前言 router在使用过程中如果我们直接在一个文件的一个数组中配置&#xff0c;最后路由越来越多会导致不易管理&#xff0c;我们可以将一个页面的路由配置在一个数组中最后统一导入&#xff0c;这样就会…

eclipse [jvm memory monitor] SHOW_MEMORY_MONITOR=true

eclipse虚拟机内存监控设置SHOW_MEMORY_MONITORtrue D:\eclipse-jee-oxygen-2-win32-x86_64\workspace\.metadata\.plugins\org.eclipse.core.runtime\.settings org.eclipse.ui.prefs (文件比较多&#xff0c;别找错了&#xff09; SHOW_MEMORY_MONITORtrue 重启 -xms 1024…

STL性能优化方法

STL&#xff08;Standard Template Library&#xff09;性能优化实战&#xff0c;涉及数据结构选择、内存管理、算法优化等多个方面。以下详细讲解STL性能优化方法&#xff0c;给出实践建议和典型场景&#xff1a; &#x1f4cc; 一、STL性能问题分析 STL性能瓶颈通常包括&…

k8s集群添加一个新GPU节点

前提 现在是已经搭建好一个GPU集群&#xff0c;需要添加一个新的节点&#xff08;3090卡&#xff09;&#xff0c;用来分担工作&#xff0c;大致可以分为以下几个部分&#xff1a; 1&#xff0c;安装GPU驱动2&#xff0c;安装docker3&#xff0c;安装cri-dockerd4&#xff0c…

操作系统导论——第13章 抽象:地址空间

一、早期系统 从内存来看&#xff0c;早期的机器并没有提供多少抽象给用户。基本上&#xff0c;机器的物理内存如图13.1所示 操作系统曾经是一组函数&#xff08;实际上是一个库&#xff09;&#xff0c;在内存中&#xff08;在本例中&#xff0c;从物理地址0开始&#xff09;&…

Postman高级功能深度解析:Mock Server与自动化监控——构建高效API测试与监控体系

引言&#xff1a;Postman在API开发中的核心价值 在数字化时代&#xff0c;API&#xff08;应用程序编程接口&#xff09;已成为系统间交互的“神经网络”&#xff0c;其质量直接影响用户体验与业务连续性。然而&#xff0c;传统API测试面临两大挑战&#xff1a; 开发阶段依赖…

网络安全威胁与防护措施(下)

8. 恶意软件&#xff08;Malware&#xff09; **恶意软件&#xff08;Malware&#xff0c;Malicious Software&#xff09;**是指旨在通过破坏、破坏或未经授权访问计算机系统、网络或设备的程序或代码。恶意软件通常用于窃取敏感信息、破坏系统、窃取资源、干扰正常操作&…