How can I stream a response from LangChain‘s OpenAI using Flask API?

devtools/2024/9/23 21:17:19/

题意:怎样在 Flask API 中使用 LangChain 的 OpenAI 模型流式传输响应

问题背景:

I am using Python Flask app for chat over data. In the console I am getting streamable response directly from the OpenAI since I can enable streming with a flag streaming=True.

我正在使用 Python Flask 应用程序进行数据聊天。在控制台中,我直接从 OpenAI 获取流式响应,因为我可以通过设置 `streaming=True` 来启用流式传输。

The problem is, that I can't "forward" the stream or "show" the stream than in my API call.

问题是,我无法在 API 调用中“转发”或“显示”这个流式响应。

Code for the processing OpenAI and chain is:

处理 OpenAI 和链的代码如下:

python">def askQuestion(self, collection_id, question):collection_name = "collection-" + str(collection_id)self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature, openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]))self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True,  output_key='answer')chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)self.chain = ConversationalRetrievalChain.from_llm(self.llm, chroma_Vectorstore.as_retriever(similarity_search_with_score=True),return_source_documents=True,verbose=VERBOSE, memory=self.memory)result = self.chain({"question": question})res_dict = {"answer": result["answer"],}res_dict["source_documents"] = []for source in result["source_documents"]:res_dict["source_documents"].append({"page_content": source.page_content,"metadata":  source.metadata})return res_dict

and the API route code:        以及 API 路由的代码:

python">@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def ask_question(collection_id):question = request.form["question"]# response_generator = document_thread.askQuestion(collection_id, question)# return jsonify(response_generator)def stream(question):completion = document_thread.askQuestion(collection_id, question)for line in completion['answer']:yield line

I am testing my endpoint with curl and I am passing flag -N to curl, so I should get the streamable response, if it is possible.

我正在使用 curl 测试我的端点,并传递了 `-N` 标志,因此如果可能的话,我应该能得到流式响应。

When I make API call first the endpoint is waiting to process the data (I can see in my terminal in VS code the streamable answer) and when finished, I get everything displayed in one go.

当我发起 API 调用时,端点首先等待处理数据(我可以在 VS Code 的终端中看到流式的回答),处理完成后,所有内容一次性显示出来。

问题解决:

With the usage of threading and callback we can have a streaming response from flask API.

通过使用 `threading` 和 `callback`,我们可以在 Flask API 中实现流式响应。

In flask API, you may create a queue to register tokens through langchain's callback.

在 Flask API 中,可以创建一个队列,通过 LangChain 的回调函数来注册令牌。

class StreamingHandler(BaseCallbackHandler):...def on_llm_new_token(self, token: str, **kwargs) -> None:self.queue.put(token)

You may get tokens from the same queue in your flask route.

你可以在 Flask 路由中从同一个队列获取令牌。

from flask import Response, stream_with_context
import threading @app.route(....):
def stream_output():q = Queue()def generate(rq: Queue):...# add your logic to prevent while loop# to run indefinitely  while( ...):yield rq.get()callback_fn = StreamingHandler(q)threading.Thread(target= askQuestion, args=(collection_id, question, callback_fn))return Response(stream_with_context(generate(q))

In your langchain's ChatOpenAI add the above custom callback StreamingHandler.

在你的 LangChain 的 `ChatOpenAI` 中添加上述自定义回调 `StreamingHandler`。

self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature, openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True, callback=[callback_fn,]
)

For reference:        参考如下

  • https://python.langchain.com/en/latest/modules/callbacks/getting_started.html#creating-a-custom-handler
  • Streaming Contents — Flask Documentation (2.3.x)


http://www.ppmy.cn/devtools/116177.html

相关文章

前端——JavaScript练习 做一个todoList

用前端制作一个todoList的表格&#xff0c;实现更新、删除、修改等功能。 涉及几个知识点&#xff1a; 设置最小高度&#xff08;宽度&#xff09;&#xff1a; .container{min-width: 350px;/* 最小宽度 最小不会小于210px */ } 去掉外轮廓 outline: none; 去除字…

CRC校验的生成多项式如何转换为移位寄存器

CRC校验的生成多项式转换为移位寄存器主要涉及到线性反馈移位寄存器&#xff08;LFSR&#xff09;的应用。LFSR是内测试电路中最基本的标准模块结构&#xff0c;它既可以用于产生伪随机测试码&#xff0c;也可以用于CRC校验码的生成。以下是详细的转换过程&#xff1a; 1. 理解…

专题·大模型安全 | 生成式人工智能的内容安全风险与应对策略

正如一枚硬币的两面&#xff0c;生成式人工智能大模型&#xff08;以下简称“生成式大模型”&#xff09;在助力内容生成的同时也潜藏风险&#xff0c;成为虚假信息传播、数据隐私泄露等问题的温床&#xff0c;加剧了认知域风险。与传统人工智能&#xff08;AI&#xff09;相比…

【软件测试】--xswitch将请求代理到测试桩

背景 在做软件测试的过程中&#xff0c;经常会遇见需要后端返回特定的响应数据&#xff0c;这个时候就需要用到测试桩&#xff0c;进行mock测试。 测试工程师在本地模拟后端返回数据时&#xff0c;需要将前端请求数据代理到本地&#xff0c;本文介绍xswitch插件代理请求到flas…

Flyway 基本概念

Flyway 基本概念详解 Flyway 是一款非常流行的数据库版本控制工具&#xff0c;专为管理数据库的变更而设计。它帮助开发者在项目开发过程中自动管理数据库的迁移与版本控制&#xff0c;确保数据库结构的变更和代码版本一致。Flyway 可以自动执行 SQL 脚本或 Java 代码来管理数…

ITOP-2 分模块安装部署itop

ITOP-2 分模块安装部署itop 一、安装PHP组件1、查看当前Linux服务器安装的PHP版本2、安装源epel&#xff0c;安装源remi&#xff0c;安装yum-config-manager3、用yum-config-manager指定remi的php7.2仓库4、安装升级php5、验证当前PHP的版本 二、部署 MySQL 服务1、设置 Repo2、…

学习使用SQL Server Management Studio (SSMS)

SQL Server Management Studio (SSMS) 是一个集成环境&#xff0c;用于管理任何SQL基础设施&#xff0c;从SQL Server到Azure SQL数据库。SSMS提供了各种工具来配置、监控和管理SQL Server的实体和组件。以下是一篇详细的使用指南&#xff0c;涵盖了SSMS的主要功能和操作。 1.…

《简单的逻辑学》

BEING LOGICAL 区分客观事实和主观观念 多数人的认知都来源于主观世界&#xff08;事实在大脑中的映射&#xff09;&#xff0c;很少有人探寻客观事实&#xff0c;所以有许多人在倡导&#xff0c;看待问题要探寻本质 多数命题的存在&#xff0c;都是有前提条件的 【黑白分明…