TinyRAG
接下来我会带领大家一步一步的实现一个简单的RAG模型,这个模型是基于RAG的一个简化版本,我们称之为Tiny-RAG。Tiny-RAG是一个基于RAG的简化版本,它只包含了RAG的核心功能,即Retrieval和Generation。Tiny-RAG的目的是为了帮助大家更好的理解RAG模型的原理和实现。
OK,让我们开始吧!
1. RAG 介绍
LLM会产生误导性的 “幻觉”,依赖的信息可能过时,处理特定知识时效率不高,缺乏专业领域的深度洞察,同时在推理能力上也有所欠缺。
正是在这样的背景下,检索增强生成技术(Retrieval-Augmented Generation,RAG)应时而生,成为 AI 时代的一大趋势。
RAG 通过在语言模型生成答案之前,先从广泛的文档数据库中检索相关信息,然后利用这些信息来引导生成过程,极大地提升了内容的准确性和相关性。RAG 有效地缓解了幻觉问题,提高了知识更新的速度,并增强了内容生成的可追溯性,使得大型语言模型在实际应用中变得更加实用和可信。
RAG的基本结构有哪些呢?
- 要有一个向量化模块,用来将文档片段向量化。
- 要有一个文档加载和切分的模块,用来加载文档并切分成文档片段。
- 要有一个数据库来存放文档片段和对应的向量表示。
- 要有一个检索模块,用来根据 Query (问题)检索相关的文档片段。
- 要有一个大模型模块,用来根据检索出来的文档回答用户的问题。
那接下来,让我们梳理一下 RAG 的流程是什么样的呢?
- 索引:将文档库分割成较短的 Chunk,并通过编码器构建向量索引。
- 检索:根据问题和 chunks 的相似度检索相关文档片段。
- 生成:以检索到的上下文为条件,生成问题的回答。
2. 向量化
首先实现一个向量化的类,这是RAG架构的基础。向量化的类主要是用来将文档片段向量化,将一段文本映射为一个向量。
那首先我们要设置一个Embedding基类,这样我们再用其他的模型的时候,只需要继承这个基类,然后在此基础上进行修改即可,方便代码扩展。
class BaseEmbeddings:"""Base class for embeddings"""def __init__(self, path: str, is_api: bool) -> None:self.path = pathself.is_api = is_apidef get_embedding(self, text: str, model: str) -> List[float]:raise NotImplementedError@classmethoddef cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:"""calculate cosine similarity between two vectors"""dot_product = np.dot(vector1, vector2)magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)if not magnitude:return 0return dot_product / magnitude
观察一下BaseEmbeddings基类都有什么方法,首先有一个get_embedding方法,这个方法是用来获取文本的向量表示的,然后有一个cosine_similarity方法,这个方法是用来计算两个向量之间的余弦相似度的。其次在初始化类的时候设置了,模型的路径和是否是API模型。比如使用OpenAI的Embedding API的话就需要设置self.is_api=Ture。
继承BaseEmbeddings类的话,就只需要编写get_embedding方法即可,cosine_similarity方法会被继承下来,直接用就行。这就是编写基类的好处。
class OpenAIEmbedding(BaseEmbeddings):"""class for OpenAI embeddings"""def __init__(self, path: str = '', is_api: bool = True) -> None:super().__init__(path, is_api)if self.is_api:from openai import OpenAIself.client = OpenAI()self.client.api_key = os.getenv("OPENAI_API_KEY")self.client.base_url = os.getenv("OPENAI_BASE_URL")def get_embedding(self, text: str, model: str = "text-embedding-3-large") -> List[float]:if self.is_api:text = text.replace("\n", " ")return self.client.embeddings.create(input=[text], model=model).data[0].embeddingelse:raise NotImplementedError
3. 文档加载和切分
接下来我们来实现一个文档加载和切分的类,这个类主要是用来加载文档并切分成文档片段。
那我们都需要切分什么文档呢?这个文档可以是一篇文章,一本书,一段对话,一段代码等等。这个文档的内容可以是任何的,只要是文本就行。比如:pdf文件、md文件、txt文件等等。
def read_file_content(cls, file_path: str):# 根据文件扩展名选择读取方法if file_path.endswith('.pdf'):return cls.read_pdf(file_path)elif file_path.endswith('.md'):return cls.read_markdown(file_path)elif file_path.endswith('.txt'):return cls.read_text(file_path)else:raise ValueError("Unsupported file type")
那我们把文件内容都读取之后,还需要切分呀!那怎么切分呢,OK,接下来咱们就按 Token 的长度来切分文档。我们可以设置一个最大的 Token 长度,然后根据这个最大的 Token 长度来切分文档。这样切分出来的文档片段就是一个一个的差不多相同长度的文档片段了。
不过在切分的时候要注意,片段与片段之间最好要有一些重叠的内容,这样才能保证检索的时候能够检索到相关的文档片段。还有就是切分文档的时候最好以句子为单位,也就是按 \n
进行粗切分,这样可以基本保证句子内容是完整的。
def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):chunk_text = []curr_len = 0curr_chunk = ''lines = text.split('\n') # 假设以换行符分割文本为行for line in lines:line = line.replace(' ', '')line_len = len(enc.encode(line))if line_len > max_token_len:print('warning line_len = ', line_len)if curr_len + line_len <= max_token_len:curr_chunk += linecurr_chunk += '\n'curr_len += line_lencurr_len += 1else:chunk_text.append(curr_chunk)curr_chunk = curr_chunk[-cover_content:]+linecurr_len = line_len + cover_contentif curr_chunk:chunk_text.append(curr_chunk)return chunk_text
4. 数据库 && 向量检索
上面,我们做好了文档切分,也做好了 Embedding 模型的加载。那接下来就得设计一个向量数据库用来存放文档片段和对应的向量表示了。
还有就是也要设计一个检索模块,用来根据 Query (问题)检索相关的文档片段。OK,我们冲冲冲!
一个数据库对于最小RAG架构来说,需要实现几个功能呢?
persist:数据库持久化,本地保存
load_vector:从本地加载数据库
get_vector:获得文档的向量表示
query:根据问题检索相关的文档片段
class VectorStore:def __init__(self, document: List[str] = ['']) -> None:self.document = documentdef get_vector(self, EmbeddingModel: BaseEmbeddings) -> List[List[float]]:# 获得文档的向量表示passdef persist(self, path: str = 'storage'):# 数据库持久化,本地保存passdef load_vector(self, path: str = 'storage'):# 从本地加载数据库passdef query(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 1) -> List[str]:# 根据问题检索相关的文档片段pass
那让我们来看一下,query方法具体是怎么实现的呢?
首先先把用户提出的问题向量化,然后去数据库中检索相关的文档片段,最后返回检索到的文档片段。可以看到咱们在向量检索的时候仅使用Numpy进行加速,代码非常容易理解和修改。
主要是方便改写和大家理解,并没有使用成熟的数据库,这样可以更好的理解RAG的原理。
def query(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 1) -> List[str]:query_vector = EmbeddingModel.get_embedding(query)result = np.array([self.get_similarity(query_vector, vector)for vector in self.vectors])return np.array(self.document)[result.argsort()[-k:][::-1]].tolist()
5. 大模型模块
那就来到了最后一个模块了,大模型模块。这个模块主要是用来根据检索出来的文档回答用户的问题。
一样的,我们还是先实现一个基类,这样我们在遇到其他的自己感兴趣的模型就可以快速的扩展了。
class BaseModel:def __init__(self, path: str = '') -> None:self.path = pathdef chat(self, prompt: str, history: List[dict], content: str) -> str:passdef load_model(self):pass
BaseModel
包含了两个方法,chat
和load_model
,如果使用API模型,比如OpenAI的话,那就不需要load_model
方法,如果你要本地化运行的话,那还是会选择使用开源模型,那就需要load_model
方法发啦。
这里咱们以 InternLM2-chat-7B 模型为例
class InternLMChat(BaseModel):def __init__(self, path: str = '') -> None:super().__init__(path)self.load_model()def chat(self, prompt: str, history: List = [], content: str='') -> str:prompt = PROMPT_TEMPLATE['InternLM_PROMPT_TEMPALTE'].format(question=prompt, context=content)response, history = self.model.chat(self.tokenizer, prompt, history)return responsedef load_model(self):import torchfrom transformers import AutoTokenizer, AutoModelForCausalLMself.tokenizer = AutoTokenizer.from_pretrained(self.path, trust_remote_code=True)self.model = AutoModelForCausalLM.from_pretrained(self.path, torch_dtype=torch.float16, trust_remote_code=True).cuda()
可以用一个字典来保存所有的prompt,这样比较好维护。
PROMPT_TEMPLATE = dict(InternLM_PROMPT_TEMPALTE="""先对上下文进行内容总结,再使用上下文来回答用户的问题。如果你不知道答案,就说你不知道。总是使用中文回答。问题: {question}可参考的上下文:···{context}···如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。有用的回答:"""
)
那这样的话,我们就可以利用InternLM2模型来做RAG啦!
6. LLM Tiny-RAG Demo
那接下来,我们就来看一下Tiny-RAG的Demo吧!
from RAG.VectorBase import VectorStore
from RAG.utils import ReadFiles
from RAG.LLM import OpenAIChat, InternLMChat
from RAG.Embeddings import JinaEmbedding#建立向量数据库
docs = ReadFiles('data1').get_content(max_token_len=600, cover_content=150) # 获得data目录下的所有文件内容并分割
vector = VectorStore(docs)
embedding = JinaEmbedding(path='/mnt/workspace/autodl-tmp/jinaai/jina-embeddings-v2-base-zh') # 创建EmbeddingModel
vector.get_vector(EmbeddingModel=embedding)
vector.persist(path='storage') # 将向量和文档内容保存到storage目录下,下次再用就可以直接加载本地的数据库
vector = VectorStore()vector.load_vector('storage') # 加载本地的数据库embedding = JinaEmbedding(path='/mnt/workspace/autodl-tmp/jinaai/jina-embeddings-v2-base-zh')question = '商品信息中的人工和制费分摊权数怎么设置?'content = vector.query(question, EmbeddingModel=embedding, k=1)[0]print(content)model = InternLMChat(path='/mnt/workspace/autodl-tmp/Shanghai_AI_Laboratory/internlm2-chat-7b')
print(model.chat(question, [], content))
Tiny-LLM
此项目在于实现一个简单的大语言模型,从训练tokenizer开始,到训练模型,再到使用模型生成文本。仅使用Numpy和Pytorch即可实现一个简单的大语言模型训练,显存使用2G左右。以下为项目效果展示
训练模型所需要的资源也是很少的,仅需要一个显卡即可,显存使用2G左右。训练模型的时间也不长,仅需要几个小时即可完成。
Usage
按照以下步骤进行训练。
- 训练Tokenizer:
python train_vocab.py --download True --vocab_size 4096
- 数据预处理:
python preprocess.py
- 训练模型:
python train.py
- 使用模型生成文本:
python sample.py --prompt "One day, Lily met a Shoggoth"
Blog
Step 1: 训练Tokenizer
首先,我们需要为文本处理训练一个Tokenizer。Tokenizer的作用是将文本转换为数字序列,以便模型能够理解和处理。我们使用的数据集是 TinyStory ,它是一个由GPT-3.5和GPT-4生成的小型故事数据集,包含简短的故事,且词汇量有限。在这个任务中,我们采用字符级Tokenizer,将文本中的每个字符映射为对应的数字。通过以下命令可以下载数据集并训练Tokenizer。
python train_vocab.py --download True --vocab_size 4096
LLaMA2 的词表大小为 32,000,但由于 TinyStory 数据集较小,词汇量有限,我们将词表大小设置为 4,096。训练完成后,我们得到的 Tokenizer 能够将文本转换为数字序列,也可以将数字序列还原为文本。
def download_file(url: str, fname: str, chunk_size=1024):"""发送HTTP GET请求以流式方式获取文件"""···def download():"""执行 download_file 下载数据集"""···def train_vocab(vocab_size: int=32000, num_shards: int=20):"""vocab_size: int, 词汇表的大小,决定分词器的词汇量。num_shards: int, 用于加快词汇表训练的效率,指定要处理的分片数量。"""# 确保词汇表大小为正数assert vocab_size > 0, "Vocab size must be positive"# SentencePiece 模型的前缀路径,将用于保存分词器prefix = os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}")# 1) 将多个分片中的文本导出为单个文本文件 tiny.txttiny_file = os.path.join(DATA_CACHE_DIR, "tiny.txt")data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json")))# 创建 tiny.txt 文件并写入指定数量的分片中的文本print(f"Writing temporary file {tiny_file} with {num_shards} shards...")with open(tiny_file, "w", encoding="utf-8") as of:# 遍历前 num_shards 个分片for shard in tqdm(shard_filenames[:num_shards]):with open(shard, "r") as f:data = json.load(f) # 读取分片中的JSON数据# 遍历每个例子,将其中的故事文本写入 tiny.txt 文件for example in data:text = example["story"]text = text.strip() # 去除文本首尾的空白字符of.write(text + "\n") # 每个文本写入一行# 输出生成的 tiny.txt 文件的大小print(f"Size is: {os.path.getsize(tiny_file) / 1024 / 1024:.2f} MB")# 2) 使用 SentencePiece 训练分词器print("Will now train the vocab...")spm.SentencePieceTrainer.train(input=tiny_file, # 输入文件为之前生成的 tiny.txtmodel_prefix=prefix, # 模型前缀路径model_type="bpe", # 使用 Byte-Pair Encoding (BPE) 训练分词器vocab_size=vocab_size, # 词汇表大小self_test_sample_size=0, # 自测样本大小设置为 0input_format="text", # 输入文件格式为纯文本character_coverage=1.0, # 覆盖所有字符(包括非常见字符)num_threads=os.cpu_count(), # 使用 CPU 的线程数split_digits=True, # 拆分数字allow_whitespace_only_pieces=True, # 允许仅由空格组成的词元byte_fallback=True, # 启用字节级回退unk_surface=r" \342\201\207 ", # UNK token 表示未知字符的方式normalization_rule_name="identity" # 使用“identity”归一化规则)# 3) 可选的清理操作,询问用户是否删除临时文件 tiny.txtdec = input(f"Delete the temporary file {tiny_file}? [y/N] ")if dec.lower() == "y":os.remove(tiny_file) # 删除临时文件print(f"Deleted {tiny_file}")# 输出模型保存的路径print(f"Trained tokenizer is in {prefix}.model")print("Done.")
在本部分中,我们使用了 SentencePiece
库来训练自定义的 Tokenizer
。首先,我们需要从 TinyStory
数据集中提取文本内容,作为训练的输入数据。SentencePiece
是一种基于子词单元的分词算法,能够有效处理不同语言中的词汇碎片化问题。
训练 Tokenizer
时,SentencePiece
会自动生成两个文件:tok4096.model
和 tok4096.vocab
,其中 tok4096.model
是我们训练好的模型文件,位于 data
目录下。这个文件可以用于将文本数据转换为 Token
序列,也可以将 Token
序列还原为文本。
为了更便捷地使用这个 Tokenizer
,我们还在 tokenizer.py
文件中定义了一个 Tokenizer
类。这个类封装了 Tokenizer
的常用操作,例如文本编码和解码功能,并支持加载我们训练好的模型文件。通过这个类,我们可以轻松地将文本转换为模型可接受的数字序列,或将预测结果转化为可读的文本。
具体的代码实现和细节可以在 tokenizer.py
文件中找到,接下来我们将进一步展示如何使用该类来处理 TinyStory
数据集中的故事文本。
class Tokenizer:def __init__(self, tokenizer_model=None):"""初始化分词器。加载预训练的SentencePiece模型,并设置一些特殊的token ID。参数:tokenizer_model: str, 可选,分词器模型的路径,如果不指定则使用默认路径 TOKENIZER_MODEL。"""# 如果提供了分词器模型路径,使用该路径;否则使用默认模型路径model_path = tokenizer_model if tokenizer_model else TOKENIZER_MODEL# 确保模型文件存在assert os.path.isfile(model_path), model_path# 加载 SentencePiece 模型self.sp_model = SentencePieceProcessor(model_file=model_path)self.model_path = model_path# 获取分词器的特殊token和词汇表大小self.n_words: int = self.sp_model.vocab_size() # 词汇表大小self.bos_id: int = self.sp_model.bos_id() # 句子开头 (BOS) 的IDself.eos_id: int = self.sp_model.eos_id() # 句子结尾 (EOS) 的IDself.pad_id: int = self.sp_model.pad_id() # 填充 (PAD) 的ID# 验证分词器词汇表大小是否正确assert self.sp_model.vocab_size() == self.sp_model.get_piece_size()def encode(self, s: str, bos: bool, eos: bool) -> List[int]:"""将字符串编码为词元ID列表。可以选择是否添加句子开头 (BOS) 和句子结尾 (EOS) 标记。参数:s: str, 要编码的字符串。bos: bool, 是否在编码的词元列表前添加 BOS 标记。eos: bool, 是否在编码的词元列表末尾添加 EOS 标记。返回:List[int]: 编码后的词元ID列表。"""# 确保输入是字符串类型assert type(s) is str# 使用SentencePiece将字符串编码为词元IDt = self.sp_model.encode(s)# 如果需要BOS标记,将其添加到词元列表开头if bos:t = [self.bos_id] + t# 如果需要EOS标记,将其添加到词元列表末尾if eos:t = t + [self.eos_id]return tdef decode(self, t: List[int]) -> str:"""将词元ID列表解码为字符串。参数:t: List[int], 词元ID列表。返回:str: 解码后的字符串。s"""return self.sp_model.decode(t)
在这个 Tokenizer
类中,我们首先初始化了一些特殊的 token ID,这些特殊 tokens 在自然语言处理任务中有着重要作用,分别用于填充、处理未识别的词汇、表示句子的开头和结尾等。在模型训练和推理过程中,正确处理这些特殊 tokens 对于提升模型性能至关重要。
接着,我们定义了两个关键方法:
-
encode 方法:该方法负责将输入文本转换为 token ID 序列。通过加载预训练的 Tokenizer 模型,我们可以对文本进行分词,将其拆解为词或子词,并将其映射为相应的数字表示。这个数字序列可以被模型接受用于训练和推理。
-
decode 方法:与 encode 方法相反,decode 方法用于将 token ID 序列还原为可读的文本。它将数字序列转换回对应的 tokens,并拼接成完整的文本,从而可以对模型的输出进行解释和展示。
这些方法的定义使得我们在使用过程中,可以非常方便地在文本与数字序列之间进行转换,为模型的输入与输出提供接口。大家可以使用以下代码测试 Tokenizer
的功能,验证其是否能够正确地将文本转换为数字序列,或者将数字序列还原为文本。
Step 2: 数据预处理
在训练模型之前,首先需要对数据进行预处理。这一步的核心任务是将文本数据转换为模型能够理解的数字序列。具体来说,文本中的每个字符、单词或子词都需要被映射为一个唯一的数字 ID,这样模型才能处理这些数据。
# 定义分片处理函数
def process_shard(args, vocab_size, tokenizer_model_path):""" 处理数据分片,将其中的文本进行分词并保存为二进制文件 """···# 定义预处理函数,用于对多个数据分片进行批量处理
def pretokenize(vocab_size):""" 预处理所有的数据分片,并将分词后的数据保存为二进制文件 """···class PretokDataset(torch.utils.data.IterableDataset):"""从磁盘加载已预处理的分词数据,并将其以 PyTorch 张量的形式返回。"""def __init__(self, split, max_seq_len, vocab_size, vocab_source):"""初始化数据集。参数:split: str, 数据集的分割方式('train' 或 'test')。max_seq_len: int, 最大序列长度,用于生成输入输出序列。vocab_size: int, 词汇表的大小。vocab_source: str, 词汇表的来源('llama2' 或 'custom')。"""super().__init__()self.split = split # 数据集划分(训练集或测试集)self.max_seq_len = max_seq_len # 最大序列长度self.vocab_size = vocab_size # 词汇表大小self.vocab_source = vocab_source # 词汇表来源def __iter__(self):"""返回迭代器,按批次加载数据并生成模型输入/输出。"""# 获取DataLoader的worker信息(用于并行数据加载)worker_info = torch.utils.data.get_worker_info()worker_id = worker_info.id if worker_info else 0 # worker ID# 获取分布式训练的rank信息(用于多GPU训练)rank = dist.get_rank() if dist.is_initialized() else 0# 基于worker_id和rank生成唯一的随机数种子,确保数据在每个worker和rank之间是唯一的seed = 42 + worker_id + 1337 * rankrng = random.Random(seed)print(f"Created a PretokDataset with rng seed {seed}")# 根据词汇表来源决定数据路径if self.vocab_source == "llama2":# 如果使用 Llama 2 词汇表,.bin 文件和 .json 文件在同一目录下bin_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data")shard_filenames = sorted(glob.glob(os.path.join(bin_dir, "*.bin")))elif self.vocab_source == "custom":# 如果使用自定义词汇表,.bin 文件在 tok{N} 目录下bin_dir = os.path.join(DATA_CACHE_DIR, f"tok{self.vocab_size}")shard_filenames = sorted(glob.glob(os.path.join(bin_dir, "*.bin")))# 根据数据集划分使用不同的分片文件# 训练集使用所有分片文件,测试集只使用第一个分片shard_filenames = shard_filenames[1:] if self.split == "train" else shard_filenames[:1]assert len(shard_filenames) > 0, f"在 {bin_dir} 中未找到任何 .bin 文件"while True:# 随机打乱分片文件rng.shuffle(shard_filenames)for shard in shard_filenames:# 使用 memmap 读取文件,使得数据留在磁盘上,减少内存占用m = np.memmap(shard, dtype=np.uint16, mode="r")# 计算该分片中的批次数量num_batches = len(m) // self.max_seq_lennum_batches -= 1 # 去掉最后一个不完整的批次assert num_batches > 0, "这个分片文件太小了?请检查。"# 随机打乱批次索引ixs = list(range(num_batches))rng.shuffle(ixs)# 对每个批次生成输入 x 和目标输出 yfor ix in ixs:start = ix * self.max_seq_len # 批次起始索引end = start + self.max_seq_len + 1 # 批次结束索引# 将数据转换为 NumPy 数组并拷贝到 RAM 中chunk = torch.from_numpy((m[start:end]).astype(np.int64))# 模型输入 x 是当前批次的前 max_seq_len 个词元x = chunk[:-1]# 模型输出 y 是下一个词元y = chunk[1:]# 生成 x, y 对yield x, yclass Task:@staticmethoddef iter_batches(batch_size, device, num_workers=0, **dataset_kwargs):ds = PretokDataset(**dataset_kwargs)dl = torch.utils.data.DataLoader(ds, batch_size=batch_size, pin_memory=True, num_workers=num_workers)for x, y in dl:x = x.to(device, non_blocking=True)y = y.to(device, non_blocking=True)yield x, y
在这部分中,首先定义了 process_shard
函数,用于处理数据分片。该函数的主要功能是将文本数据分词后,转换为更高效的二进制文件格式,以便后续更快速地加载和处理数据。
接下来,我们定义了 pretokenize
函数,用于批量处理多个数据分片。通过这一函数,所有数据可以并行处理,进一步加快预处理的速度。
然后,我们设计了一个 PretokDataset
类,用于加载已预处理好的数据集。我们继承了 torch.utils.data.IterableDataset
来定义该数据集,这使得我们可以更灵活、高效地处理数据。在这个类中,核心是 __iter__
方法,它负责生成用于训练的数据批次。
最后,我们还定义了一个 Task
类,专门用于迭代数据集,并生成模型所需的输入和目标输出。这一部分的设计确保了数据流的顺畅对接,为模型训练提供了标准化的数据输入。可以通过以下代码来测试预处理后的数据集。
Step 3: 训练模型
在数据预处理完成后,我们就可以开始训练模型了。我们使用的模型是一个和LLama2结构一样的 Decoder only Transformer模型,使用Pytorch实现。相关代码在model.py
文件中。此处不再赘述,源码中有详细的中文注释,且我们在之前的文章中也有详细的介绍。
在模型这一部分可以重点看一下生成式模型是如何实现生成token的,可以查看model.py
文件中的Transforerm
类中的generate
方法。
在完成数据预处理后,我们就可以开始训练模型了。我们使用的模型是一个与 LLaMA2 结构相同的 Decoder-only Transformer 模型,采用 PyTorch 实现。具体的实现细节已经包含在 model.py
文件中,在此不再赘述。该源码中包含详细的中文注释,此外我们在之前的文章中也对模型架构进行了深入介绍。
在模型部分,建议重点关注生成式模型如何生成 token 的过程。
在 generate 方法中,我们首先获取序列中最后一个位置的 logits,然后基于这些 logits 生成新的 token。接着,生成的新 token 会被添加到序列中,模型随后会继续生成下一个 token。通过这种迭代过程,我们能够生成完整的文本。接下来,您可以使用以下命令开始训练模型。
python train.py
在 train.py
中我们定义了很多超参数,包括但不限于模型的维度,层数,学习率等等。如下所示,更多的内容大家可以在源码中查看,源码加了很详细的中文注释,相信大家可以很容易看懂。
# -----------------------------------------------------------------------------
# I/O 配置,用于定义输出目录和训练时的日志记录与评估设置
out_dir = "output" # 模型输出保存路径
eval_interval = 2000 # 评估间隔步数
log_interval = 1 # 日志记录间隔步数
eval_iters = 100 # 每次评估时迭代的步数
eval_only = False # 如果为True,脚本在第一次评估后立即退出
always_save_checkpoint = False # 如果为True,在每次评估后总是保存检查点
init_from = "scratch" # 可以选择从头开始训练('scratch')或从已有的检查点恢复('resume')# 数据配置
batch_size = 8 # 每个微批次的样本数量,如果使用梯度累积,实际批次大小将更大
max_seq_len = 256 # 最大序列长度
vocab_size = 4096 # 自定义词汇表大小# 模型配置
dim = 288 # 模型的隐藏层维度
n_layers = 8 # Transformer的层数
n_heads = 8 # 注意力头的数量
n_kv_heads = 4 # 模型分组
multiple_of = 32 # 在某些层的维度必须是该数的倍数
dropout = 0.0 # Dropout概率# AdamW优化器配置
gradient_accumulation_steps = 4 # 梯度累积步数,用于模拟更大的批次
learning_rate = 5e-4 # 最大学习率
max_iters = 100000 # 总的训练迭代次数
weight_decay = 1e-1 # 权重衰减系数
beta1 = 0.9 # AdamW优化器的β1参数
beta2 = 0.95 # AdamW优化器的β2参数
grad_clip = 1.0 # 梯度裁剪阈值,0表示不裁剪# 学习率衰减配置
decay_lr = True # 是否启用学习率衰减
warmup_iters = 1000 # 学习率预热的步数# 系统设置
device = "cuda:0" # 设备选择:'cpu','cuda','cuda:0'等
dtype = "bfloat16" # 数据类型:'float32','bfloat16','float16'
Step 4: 使用模型生成文本
在模型训练完成后,会在output
目录下生成一个ckpt.pt
文件,这个文件就是我们训练好的模型。我们可以使用以下命令生成文本。
python sample.py --prompt "One day, Lily met a Shoggoth"
定义了一个TextGenerator
类,用于生成文本。
class TextGenerator:def __init__(self, checkpoint='output/ckpt.pt', # 模型检查点路径tokenizer_model_path='tok4096.model', # 分词器模型路径seed=1337, # 随机种子,确保可重复性device=None, # 设备,优先使用 CUDA,如果没有可用的 CUDA,则使用 CPUdtype="float32"): # 数据类型,默认为 float32,可以选择 float16 或 bfloat16"""初始化 TextGenerator 类,加载模型、设置设备和分词器等。"""# 模型加载配置self.checkpoint = checkpoint # 保存的模型检查点路径self.tokenizer_model_path = tokenizer_model_path # 分词器模型文件路径self.seed = seed # 随机数种子,用于生成的可重复性self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu') # 根据硬件条件选择设备self.dtype = dtype # 模型的浮点数类型self.device_type = 'cuda' if 'cuda' in self.device else 'cpu' # 判断当前设备是否为 CUDA# 设置随机种子,确保生成的可重复性torch.manual_seed(seed) # 设置 CPU 随机种子torch.cuda.manual_seed(seed) # 设置 CUDA 随机种子torch.backends.cuda.matmul.allow_tf32 = True # 允许 CUDA 使用 TF32 精度进行矩阵乘法运算torch.backends.cudnn.allow_tf32 = True # 允许 cuDNN 使用 TF32 精度加速# 根据 dtype 选择适当的自动混合精度上下文ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[self.dtype]self.ctx = nullcontext() if self.device_type == 'cpu' else torch.amp.autocast(device_type=self.device_type, dtype=ptdtype)# 加载模型检查点文件checkpoint_dict = torch.load(self.checkpoint, map_location=self.device) # 加载模型参数gptconf = ModelArgs(**checkpoint_dict['model_args']) # 初始化模型参数self.model = Transformer(gptconf) # 实例化 Transformer 模型state_dict = checkpoint_dict['model'] # 获取模型状态字典# 去除状态字典中的不必要前缀unwanted_prefix = '_orig_mod.' # 这个前缀在保存时可能被添加,现在要去除它for k, v in list(state_dict.items()):if k.startswith(unwanted_prefix):state_dict[k[len(unwanted_prefix):]] = state_dict.pop(k) # 去除不必要的前缀# 加载模型参数到模型中self.model.load_state_dict(state_dict, strict=False)# 计算模型参数量num_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)print(f"Model has {num_params} parameters.")# 设置模型为评估模式(evaluation mode),防止训练模式下的 dropout 等操作影响结果self.model.eval()# 将模型放置到正确的设备上(GPU 或 CPU)self.model.to(self.device)# 初始化分词器self.tokenizer = Tokenizer(tokenizer_model=self.tokenizer_model_path) # 根据指定的路径加载分词器def sample(self, start="Hello!", # 生成文本的起始提示词,可以是任意字符串num_samples=3, # 生成样本的数量,默认生成 3 个样本max_new_tokens=256, # 每个样本生成的最大 token 数,默认最多生成 256 个 tokentemperature=1.0, # 控制生成的随机性,1.0 为标准,值越大越随机top_k=300): # 保留概率最高的 top_k 个 token,限制生成时的选择范围"""根据给定的起始文本生成样本。:param start: 生成文本的起始提示词:param num_samples: 要生成的文本样本数:param max_new_tokens: 每个样本生成的最大 token 数:param temperature: 控制生成的随机性,值越小生成越确定,值越大生成越随机:param top_k: 限制生成时选择的 token 范围:return: 生成的文本样本列表"""# 如果 start 是以 'FILE:' 开头,表示从文件中读取起始文本if start.startswith('FILE:'):with open(start[5:], 'r', encoding='utf-8') as f:start = f.read() # 读取文件内容作为起始文本# 将起始文本编码为 token id 序列start_ids = self.tokenizer.encode(start, bos=True, eos=False) # bos=True 表示加上句首标记,eos=False 表示不加句尾标记x = (torch.tensor(start_ids, dtype=torch.long, device=self.device)[None, ...]) # 将编码后的 token id 转为 PyTorch 张量generated_texts = [] # 用于保存生成的文本样本with torch.no_grad(): # 禁用梯度计算,提升效率with self.ctx: # 进入自动混合精度的上下文(如果是 GPU 并使用 float16 时)for k in range(num_samples): # 循环生成指定数量的样本y = self.model.generate(x, max_new_tokens, temperature=temperature, top_k=top_k) # 生成文本generated_texts.append(self.tokenizer.decode(y[0].tolist())) # 解码生成的 token 序列为可读文本return generated_texts # 返回生成的文本样本
8. 参考文献
- When Large Language Models Meet Vector Databases: A Survey
- Retrieval-Augmented Generation for Large Language Models: A Survey
- Learning to Filter Context for Retrieval-Augmented Generation
- In-Context Retrieval-Augmented Language Models