1. 流式输出和非流失输出:
大模型的流式输出(Streaming Output)和非流式输出(Non-streaming Output)是指在生成文本或其他输出时,如何将结果返回给用户或下游系统。
流式输出 (Streaming Output):流式输出意味着模型生成的内容是逐步、逐块返回的,而不是等到整个生成过程完成后再一次性返回所有内容。
优点:
用户可以更早地开始看到部分结果,提高用户体验。
对于长时间生成任务,可以减少内存占用,因为不需要一次性存储完整的输出。
更适合实时应用,如在线聊天机器人,其中即时反馈是非常重要的。
应用场景:适用于需要即时响应的场景,比如对话系统、直播翻译等。
非流式输出 (Non-streaming Output):非流式输出是指模型在完成整个生成过程后,一次性返回全部生成的结果。
**优点:
实现简单,易于处理和调试。
在某些情况下,可能更适合那些需要对完整输出进行后续处理的应用。
应用场景:适用于那些不依赖即时反馈、或者需要对整个输出进行整体处理的场景,例如批量文本生成、文档摘要等。
2. 流式输出、非流失输出和vllm的同步、异步关系
对于vllm同步:无论是流式还是非流式输出,vllm的LLM函数创建的模型对象通常以同步的方式工作,处理多并发情况时只能以队列形式一个个输出。对于非流式输出,它会阻塞直到生成完成并返回结果;对于流式输出,它也可以逐步返回数据给前端,但这是假流式,因为后端以及把所有的文本都输出了,然后我们又把文本一个个传给前端。
对vllm异步:异步引擎同样可以支持流式和非流式输出,但它允许你以非阻塞的方式处理这些输出。你可以启动一个生成任务而不等待它完成,然后根据需要逐步获取流式输出,或者在任务完成后一次性获取非流式输出(也是并发状态)。这为高并发环境下的应用提供了更好的性能和灵活性。
总结:流式输出和非流式输出关注的是输出的传输方式,而AsyncLLMEngine和LLM则更多地涉及到执行模式(同步 vs 异步)。两者可以组合使用,例如,你可以使用AsyncLLMEngine来异步地处理流式输出,从而在高并发环境中获得最佳性能和用户体验。
fastapi_25">3. fastapi流式响应代码
from fastapi import FastAPI
import uvicorn
import os
import json
import time
from starlette.responses import StreamingResponse
# from fastapi.responses import StreamingResponse# 创建一个FastAPI应用程序实例
app = FastAPI()@app.post("/api")
def aaa():# StreamingResponse是一个提供的用于包装流式响应的类,必须以json字符串进行数据传递# starlette.responses和fastapi.responses中的StreamingResponse对象实现方式基本类似。return StreamingResponse(handle_post_request())def handle_post_request():for i in range(5):print(i)time.sleep(1)yield json.dumps({'type': i}) + '\n'if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("API_PORT", 7777)), workers=1)
开启上述api后,使用以下代码发送请求:
import json
import requests# url_api = "http://10.4.0.141:800h1/cat/stream"
url_api = "http://localhost:7777/api"# 此调用方法为推荐方法。也可直接request.post发送请求,见https://editor.csdn.net/md/?articleId=144560081
with requests.post(url_api, stream=True) as r:r.raise_for_status() # 检查请求是否成功print(r.iter_lines())for line in r.iter_lines():if line: # 过滤掉保持连接的空行print(json.loads(line.decode('utf-8')))
4. LLM流式输出代码(非接口型,仅后端)
普通后端的流式输出使用TextStreamer(是同步的,不适合做api的流式响应)
from modelscope import AutoModelForCausalLM, AutoTokenizer
from transformers import TextStreamer
model_name = "地址"model = AutoModelForCausalLM.from_pretrained(model_name,torch_dtype="auto",device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)prompt = "你是谁"
messages = [{"role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant."},{"role": "user", "content": prompt}
]
text = tokenizer.apply_chat_template(messages,tokenize=False,add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)# 非流输出
# generated_ids = model.generate(
# **model_inputs,
# max_new_tokens=1024
# )# generated_ids = [
# output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
# ]# response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
# print(response)# 流输出,普通后端的流式输出使用TextStreamer,这是同步的,不适合于api交互做流式响应,必须使用TextIteratorStreamer,它是异步的
streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
generated_ids = model.generate(**model_inputs,max_new_tokens=512,streamer=streamer,
)
5. LLM流式响应代码(接口型FastAPI)
API的流式输出使用TextIteratorStreamer(是异步的,适合做api的流式响应)
from fastapi import FastAPI
import uvicorn
import os
import json
from starlette.responses import StreamingResponse
# from fastapi.responses import StreamingResponse
from transformers import TextIteratorStreamer
from threading import Thread
from modelscope import AutoModelForCausalLM, AutoTokenizer# 创建一个FastAPI应用程序实例
app = FastAPI()model_name = "地址"model = AutoModelForCausalLM.from_pretrained(model_name,torch_dtype="auto",device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)prompt = "写一篇800字的作文"
messages = [{"role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant."},{"role": "user", "content": prompt}
]
text = tokenizer.apply_chat_template(messages,tokenize=False,add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)@app.post("/api")
def aaa():# StreamingResponse包装的是可迭代对象return StreamingResponse(handle_post_request())def handle_post_request():# TextIteratorStreamer为异步。skip_prompt=True, skip_special_tokens=True可以去除输出中的|im_start|等标记streamer = TextIteratorStreamer(tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True)generation_kwargs = {"max_new_tokens": 1024, # 或者任何其他生成参数"streamer": streamer,}thread = Thread(target=model.generate, kwargs={**model_inputs, **generation_kwargs})thread.start()answer = ''for new_text in streamer:answer += new_textprint(answer)yield json.dumps({'content': answer}) + '\n'if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("API_PORT", 7777)), workers=1)
前端的调用为
import json
import requests# url_api = "http://10.4.0.141:800h1/cat/stream"
url_api = "http://localhost:7777/api"with requests.post(url_api, stream=True) as r:r.raise_for_status() # 检查请求是否成功print(r.iter_lines())for line in r.iter_lines():if line: # 过滤掉保持连接的空行print(json.loads(line.decode('utf-8')))
6. 注意
非流式响应:fastapi之间的通信一般都是json对象的形式,发送请求时请求体为{},后端返回请求时的响应体也为{}。
流式响应:fastapi之间的通信一般都是json对象字符串格式的形式。使用json.dumps()将 str或者python对象(basemodel或者dict)转为json字符串格式。且流式响应一定要使用StreamingResponse对象包装