过程监督(Process Supervision)融入到 GRPO (Group Relative Policy Optimization)

news/2025/2/23 18:03:29/

下面演示如何把“过程监督(Process Supervision)”的思想融入到 GRPO (Group Relative Policy Optimization) 中,从而对每个输出的中间步骤逐一打分、计算相对优势。本文的示例代码与 grpo_train.py (来源见下文)类似,核心区别在于对“中间步骤”进行奖励评估并将其累计到对应 token,而不是只在回答最终完成时打分。

1. 背景:什么是过程监督(Process Supervision)?

LLM 的强化学习 (RL) 训练中,我们可以根据不同的“奖励时间点”来划分:

  • Outcome Supervision: 只对最后的完成输出给一个整体奖励。之前的 grpo_train.py 即是这种做法。
  • Process Supervision: 在生成完成输出的过程中,每一步或每个重要阶段都可以有一个“中间奖励(或惩罚)”,这样模型会及时地知道哪些中间推理步骤更优,从而更精细地调整策略。

对于数学推理、代码生成等场景,如果我们能获取“过程”中的正确/错误信息,那么过程监督往往会收敛得更好、更快。

2. 核心思路:对每个中间步骤打分,再把奖励“回填”到后续 token

在下方示例里,我们做了一个极简版本GRPO + 过程监督:

  1. 同一 Prompt 下采样 G 条回答:每条回答可能包含多个步骤(或多行拆开)。
  2. 把回答分成若干步骤:例如碰到某些分隔符时就视为一个step;
  3. 调用奖励函数:对每一步骤打一个分 r(step_j)
  4. 回填(token-level):对于第 j 步的奖励,要赋给从该步骤对应的起始 token下一步开始 token前的那些 tokens;这样后续的 tokens 也会“继承”该步的优劣信息。
  5. 组内做相对比较:跟 outcome supervision 类似,把 G 条回答中所有步骤的分数合在一起做 group-wise 标准化,得到相对优势。
  6. 计算损失:跟常规 PPO/GRPO 相似,不过“优势”现在是按步骤累加。

请注意,这只是一个示例,实际工程中,如何判定“一个步骤”可能需要事先插入特殊分割符,或正则去找“Step N: ”之类的标签等。


3. 示例代码

为直观起见,我们先给出一个与 GRPOTrainer 类似的 PSGRPOTrainer(Process-Supervision GRPO Trainer,请参考笔者的另一篇博客:TRL里面GRPOTrainer中grpo_train.py文件详解)示例文件。你可以把它当成 pseudo code 或 demo,因为很多实现细节(比如如何准确分割 step、如何加载 reward model)需要你在项目中自行完成。

import torch
import torch.nn.functional as F
from torch import nn
from transformers import Trainer
import mathclass PSGRPOTrainer(Trainer):"""A simple example of GRPO with Process Supervision, by inheriting from Trainer.Similar structure to `grpo_train.py`, but modifies `_prepare_inputs` to do step-level scoring."""def __init__(self, *args, reward_model=None, num_generations=4, beta=0.02, **kwargs):super().__init__(*args, **kwargs)self.reward_model = reward_model  # 过程监督用的RMself.num_generations = num_generationsself.beta = beta# 其他初始化:参考模型、max_prompt_length 等等在此省略def _prepare_inputs(self, inputs):"""1. 对每条prompt采样G条输出2. 对每条输出分步骤打分3. 计算组内相对优势(此处演示Process-level)"""device = self.args.device# Step 1: 取出 promptprompts = [example["prompt"] for example in inputs]# 用 tokenizer 编码 prompt,这里省略# prompt_ids, prompt_mask = ...# Step 2: 对每个prompt一次性生成多条回答# 下面是伪码: # completion_ids = []# for each prompt in batch:#   for g in range(self.num_generations):#       out = self.model.generate(prompt_ids, ...)#       completion_ids.append(out)# completion_ids => shape: [B * G, seq_len]# 省略: 需要类似 grpo_train.py 的 EOS masking, 参考模型 logprob 计算等# ...# ----下面只演示 “分步骤打分 + 累计奖励” 部分----# Step 3: 分割回答成若干step并打分# 这是个极简示例:假设回答里用特殊符号 "STEP:" 标识中间过程# 需要把 tokens decode -> text, split -> steps# 也可以在token层面做,但此处为直观简化# decode 到文本completions_text = ["示例回答1: STEP: 中间推理. STEP: 最终结论",  # for B*G lines"示例回答2: ...","..."]# 我们打算对每一条回答的 step 做打分# 先把 group reshape# group_size = self.num_generations# B = len(prompts)# 这里省略# 用于存储 "每条输出" 的每个step的分数# 例如 step_rewards_list[i] = [r_step1, r_step2, ...]step_rewards_list = []for completion_text in completions_text:# 分割出多个step:steps = completion_text.split("STEP:")# 每个step都要调用 reward_model 打分 (过程监督)# 这里假设 reward_model 接受字符串输入 => 返回 [分数]step_rewards = []for s in steps:# 纯演示r = self._score_one_step(s)  # 后面定义个例子函数step_rewards.append(r)step_rewards_list.append(step_rewards)# Step 4: 把 step-level reward 转成 token-level advantage# 我们需要知道 "step_j" 覆盖了哪些 token# 这里做一个简单的假设:steps间等长拆分, 真实情况需要依赖 token idx.# 先假设 each step = roughly same length# 伪码: # for each sample i in [0..B*G):#   step_boundaries[i][j] = (start_token_idx, end_token_idx)#   total_token_in_completion[i] = ...#   then for token in [start,end): advantage[token] += step_rewards[j]# 由于是示例,我们简要写成:# shape = [B*G, max_completion_length], 先都 0completion_mask = torch.ones(len(completions_text), 30, dtype=torch.int32) # 30假设step_advantages = torch.zeros_like(completion_mask, dtype=torch.float32)for i, step_rs in enumerate(step_rewards_list):# step_rs: [r1, r2, ...]# 假设回答长 30 tokens, steps = len(step_rs), step_len = 30 // len(step_rs)# 真实情况要用 tokenizer decode去找step起止num_steps = len(step_rs)if num_steps == 0:continuestep_len = 30 // num_stepsfor j, r in enumerate(step_rs):start_idx = j * step_lenend_idx = (j+1)*step_len if j < num_steps-1 else 30# 把这个step的奖励写到对应token上step_advantages[i, start_idx:end_idx] = r# 现在 step_advantages[i,t] 就是 “第i条回答在token t 处的 reward”# 但还不是 group内部标准化 => 我们得先算 group内均值/方差# 这里先把 step_advantages -> sum => single scalar?# 1) 求每条回答 sum => shape: [B*G]# (也可对step更精细, 这里演示)sums = step_advantages.sum(dim=1)# 2) 分组# suppose shape [B, G]# B = len(prompts)# 这里略做 pseudoB = 2G = self.num_generationssums_2d = sums.view(B, G)  # => [B, G]mean = sums_2d.mean(dim=1, keepdim=True)  # => [B,1]std = sums_2d.std(dim=1, keepdim=True)    # => [B,1]# broadcast回去 => [B, G]sums_norm = (sums_2d - mean) / (std + 1e-4)# 3) expand回 [B*G]adv_norm = sums_norm.view(-1)# 4) token级别 advantage: 这时我们可以把 adv_norm[i] 覆盖到 step_advantages#   以便每个token都乘到相同“组内相对值”for i in range(step_advantages.size(0)):step_advantages[i] *= adv_norm[i] / (step_advantages[i].mean() + 1e-5)# 这里做一个很粗糙的 “再归一化” 示例 => 真实情况你可用别的方法# 最终 step_advantages 中每个token有个 相对 advantage# 下面返回给外面 compute_loss 用return {"completion_ids": ...,"completion_mask": completion_mask,"step_advantages": step_advantages,# 还需要 ref_model_logps, prompt_ids, etc...}def compute_loss(self, model, inputs, return_outputs=False):"""类似grpo_train.py的compute_loss,但过程奖励是 step_advantages"""completion_ids = inputs["completion_ids"]completion_mask = inputs["completion_mask"]step_advantages = inputs["step_advantages"]# 1) compute current model log prob#    => shape (B*G, seq_len)# logps = self._get_per_token_logps(model, completion_ids,...)logps = torch.randn_like(step_advantages)  # 伪造# 2) compute KL with ref model#    => shape same as logpskl = torch.zeros_like(logps)  # demo# 3) 组装loss, #    这里也跟 grpo_train.py相似: - [ exp(logp-old_logp)*A - beta*kl ]#    但此处A换成 step_advantagesratio = torch.exp(logps - logps.detach())  # placeholderper_token_loss = ratio * step_advantages - self.beta * klper_token_loss = -per_token_loss  # maximize => negative# apply maskmasked_loss = per_token_loss * completion_maskloss = masked_loss.sum(dim=1) / (completion_mask.sum(dim=1)+1e-5)final_loss = loss.mean()return final_lossdef _score_one_step(self, step_text):"""给示例用的过程监督打分函数你可以对 step_text 做任何解析、用 reward_model forward...这里就简单假设: 越长的文本给越高reward"""return float(len(step_text)) * 0.01

上面这段代码的思路和原有 grpo_train.py 类似,但在 _prepare_inputs 的部分我们做了过程监督:

  • 拆分多步completion_text.split("STEP:")
  • step 级打分_score_one_step
  • 把每个step的分数写到token区间上
  • 再做group内均值/方差归一化,得到相对优势;
  • 最后在 compute_loss,按照这些“step_advantages”来计算每个token应被鼓励还是惩罚。

你需要做的改进

  1. 步骤边界如何划分:实际中,你可能在上游 prompt 或回答中嵌入特殊标签(“Step 1: …”),或者对Chain-of-thought做额外标记;
  2. 奖励模型:可以是真实训练好的 reward model,输入 step_text + prompt,输出一个float分数;
  3. KL 与 参考模型:示例中只用 0 做占位,应换成真正 ref_model 计算 token-level logp,以做 PPO/GRPO style KL 惩罚;
  4. 组内对每一步Reward做归一化:示例里先把 step-level reward 累加成 [B*G] scalar,再做 group wise std / mean;实际你可考虑更精细化:比如先对 step-level reward 同一组内比较,然后再把它加到 token-level advantage。

4. 关键差异:Outcome vs. Process Supervision

  • Outcome Supervisionrewards[B*G],对整条输出一次性打分 -> group 内均值/方差 -> advantage。
  • Process Supervision:每个输出包含若干步骤([step1, step2, ...]),给每个step一个reward,然后映射到token级别,再进行group-wise归一化。
    • 好处:对复杂推理题,模型能“分步知道”哪些环节导致错误/正确;
    • 代价:实现更复杂,需要在生成或解析环节多做拆分和打分,reward model 也要能处理细粒度步骤。

5. 总结

  • 过程监督在理论和实践中都有助于提高训练效率、减少大模型出现错误推理的概率;对长序列任务(如数学、多段代码生成)尤其有价值。
  • 本文用一个简单的PSGRPOTrainer示例演示了“如何在 _prepare_inputs 阶段对每条回答拆分多步 -> 打分 -> 累加到 token”,再在 compute_loss 做相对优势与 KL 的计算。
  • 相比起 grpo_train.py 中的 outcome 监督,此处的关键是:“一个回答” 不再只产出一个 reward,而是对多个“step”产出多个 reward,并把它累加到每个 step 的 token 区间上
  • 若你想真正应用到生产环境,需要配合真实的“步骤拆分 / 高质量过程 reward model”,以及可靠的 KL 计算、去重、并行等机制。

希望以上分析能帮助你理解如何把过程监督融入到 GRPO 中,从而获得更细粒度的奖励信号,让大语言模型更加“稳健”地学会多步推理、代码生成等复杂任务。

GRPOdemo_254">改进的GRPO:可运行的demo

下面这段代码展示了一个可运行的示例,演示如何在“过程监督(Process Supervision)”场景下,用GRPO的思路来训练大语言模型。相比之前的简化版本,我们做了以下改进:

  1. 上游 Prompt 或回答中显式插入“Step 1: … Step N: …”标签,以便我们能够准确识别和解析“中间步骤”。
  2. 奖励模型(reward_model)可被外部传入,并且在本示例中通过 forward(prompt, step_text) 返回一个 float 分数(模拟真实训练好的模型)。
  3. KL 与 参考模型:我们定义了一个 ref_model,在计算 loss 时真正地对比“当前模型 vs. 参考模型”的对数概率,得到 KL 惩罚项。

请注意,这段示例是一个“可运行的演示”:

  • _sample_completions 中并未真正调用 model.generate,而是生成一些“带 step 标签的回答”用于说明过程监督的思路;
  • 你可以在实际项目中替换 _sample_completions 为真实的文本生成逻辑,并替换 DummyRewardModel 为你的“训练好的奖励模型”或“自定义评分函数”。

以下示例使用了 transformers 中的 Trainer 机制,同时使用了 PyTorch 进行数值运算。你可以把这段代码保存为 ps_grpo_train.py 之类的文件,安装好 transformers >=4.26.0, torch >=1.10 后直接跑一个“假训练循环”来观察它的执行。


import torch
import torch.nn.functional as F
from torch import nn
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments
from dataclasses import dataclass, field
from typing import Optional, List, Dictdef _get_per_token_logp(model: nn.Module, input_ids: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:"""给定 model、input_ids、attention_mask,返回token级别的 log p (对数概率),形状: (batch_size, seq_len)。这是 PPO/GRPO 常见的辅助函数。"""with torch.no_grad():outputs = model(input_ids=input_ids, attention_mask=attention_mask)# outputs.logits形状 [B, L, vocab_size]logits = outputs.logits# 对最后一列logits无token对应,所以取[:, :-1, :] => logit for token0..token(L-2)# 并与input_ids[:,1..]对齐,下面做示例中的简单对齐: # 你也可以根据需要做 shift# 这里只是演示:shifted_logits = logits[:, :-1, :]shifted_input_ids = input_ids[:, 1:]# log_softmax 后 gather:log_probs_all = F.log_softmax(shifted_logits, dim=-1)# gathergathered = log_probs_all.gather(dim=-1, index=shifted_input_ids.unsqueeze(-1)).squeeze(-1)# shape [B, L-1]# 补齐成与 input_ids 同长(或加padding)pad_len = input_ids.size(1) - gathered.size(1)if pad_len > 0:pad_zeros = gathered.new_zeros(gathered.size(0), pad_len)gathered = torch.cat([gathered, pad_zeros], dim=1)return gathered  # [B, L], 末尾几个可能是0class DummyRewardModel:"""模拟训练好的奖励模型,对 (prompt, step_text) 返回一个 float 分数这里根据 step_text 的长度 & 某些关键词来进行伪打分。"""def __init__(self):passdef forward(self, prompt: str, step_text: str) -> float:# 演示:越长的 step_text,分数越高;如果包含 'mistake' 字眼就减分base_score = float(len(step_text)) * 0.01if "mistake" in step_text.lower():base_score -= 0.3return base_score@dataclass
class PSGRPOConfig:"""训练配置: 包括 GRPO 相关参数"""num_generations: int = 2  # 一次对同一个 prompt 生成多少回答beta: float = 0.02        # KL 系数max_steps: int = 2        # 假设回答中最多会出现 Step 1: ... Step 2: ... # 其他可拓展class PSGRPOTrainer(Trainer):def __init__(self,ref_model: nn.Module,reward_model: DummyRewardModel,tokenizer: AutoTokenizer,ps_config: PSGRPOConfig,*args, **kwargs):super().__init__(*args, **kwargs)self.ref_model = ref_modelself.reward_model = reward_modelself.tokenizer = tokenizerself.ps_config = ps_configdef _sample_completions(self, prompt: str) -> List[str]:"""演示:给定 prompt,模拟生成 self.ps_config.num_generations 条包含 "Step i:" 的回答。真实场景你会在这里使用 self.model.generate。"""completions = []for g in range(self.ps_config.num_generations):# 这里简单拼: # Step 1: (some text) # Step 2: ...# 并带一点随机step1 = f"Step 1: Analysis for prompt [{prompt}] part {g}."step2 = f"Step 2: Conclusion. {('mistake' if g % 2 == 0 else 'correctness')} info"completions.append(step1 + "\n" + step2)return completionsdef _process_supervision_rewards(self, prompt: str, completion: str) -> List[float]:"""给定某条回答(内含 'Step i:' 标签),拆分出多个步骤,对每一步调用 reward_model 打分。返回一个list, 对应每个step的reward."""# 以 "Step i:" 做拆分# 这里写个简单split# 真实情况可能需要更精细的正则lines = completion.split("\n")step_rewards = []for line in lines:if "Step" in line:# line示例: "Step 1: Analysis for prompt ..."step_text = line.strip()# 用 reward_modelr = self.reward_model.forward(prompt, step_text)step_rewards.append(r)return step_rewardsdef _prepare_inputs(self, inputs: List[Dict]) -> Dict[str, torch.Tensor]:"""模拟 grpo_train.py 中 _prepare_inputs 作用:1. 批量对 prompt 生成多条回答2. 对回答分步骤打分3. 分组计算 advantage4. 返回计算loss需要的数据(含ref_model对数概率)"""# inputs 里包含 promptbatch_prompts = [ex["prompt"] for ex in inputs]B = len(batch_prompts)G = self.ps_config.num_generationsall_completions = []# shape => [B*G] textfor prompt in batch_prompts:completions = self._sample_completions(prompt)  # G条all_completions.extend([(prompt, c) for c in completions])# 对每条回答做 step-level rewardstep_rewards_list = []  # len=B*G, each => list of step rewardsfor (prompt, completion) in all_completions:step_rs = self._process_supervision_rewards(prompt, completion)step_rewards_list.append(step_rs)# 假设对每条回答embedding => token IDs# 统一拼 [prompt, completion], 以便后面做 model logp# 真实情况你也可分开concat_texts = []for (p, c) in all_completions:concat_texts.append(p + "\n" + c)encoded = self.tokenizer(concat_texts, return_tensors="pt", padding=True, truncation=True,max_length=128)input_ids = encoded["input_ids"]attention_mask = encoded["attention_mask"]# 计算 “当前模型” 对 token 的 logpwith torch.no_grad():per_token_logps = _get_per_token_logp(self.model, input_ids, attention_mask)# 计算 “参考模型” 对 token 的 logp (用于 KL)with torch.no_grad():ref_token_logps = _get_per_token_logp(self.ref_model, input_ids, attention_mask)# 现在每条回答有 step_rewards_list[i] 形如 [r_step1, r_step2, ...]# 需要将这些step reward映射到 token 上# 这里只是演示: 用 sum(r_step_j) 作为回答总reward => group内做相对比较# (你也可以做更精细 token-level对step区间赋值)total_rewards = []for rs in step_rewards_list:s = sum(rs)  # sum of step-leveltotal_rewards.append(s)total_rewards = torch.tensor(total_rewards, dtype=torch.float32)# => [B*G]# Group: reshape => [B, G]total_rewards_2d = total_rewards.view(B, G)group_mean = total_rewards_2d.mean(dim=1, keepdim=True)group_std = total_rewards_2d.std(dim=1, keepdim=True)# broadcast => [B, G]group_adv = (total_rewards_2d - group_mean) / (group_std + 1e-5)# => flatten => [B*G]advantages = group_adv.view(-1)# 下面把 advantages broadcast到 token级别# 这里只是最简写法: 每个回答里所有token共享同一 advantage# shape => (B*G, seq_len)expanded_adv = advantages.unsqueeze(1).expand(-1, input_ids.size(1))# 返回 dict,给 compute_loss 用return {"input_ids": input_ids, "attention_mask": attention_mask,"per_token_logps": per_token_logps,"ref_token_logps": ref_token_logps,"advantages": expanded_adv}def compute_loss(self, model, inputs, return_outputs=False):"""类似grpo_train的compute_loss,- ratio*(advantages) - beta*KL"""input_ids = inputs["input_ids"]attention_mask = inputs["attention_mask"]current_logps = inputs["per_token_logps"]        # [B*G, seq_len]ref_logps = inputs["ref_token_logps"]            # [B*G, seq_len]advantages = inputs["advantages"]                # [B*G, seq_len]# ratio => exp(current_logps - detach(current_logps))# 这里原版 PPO/GRPO 是 compare old vs current, # 我们为了演示 => compare current with current.detach()# 真实可传 old_logpsratio = torch.exp(current_logps - current_logps.detach())# 计算KL: 这里是( reference vs current ) 或( current vs reference )?# PPO/GRPO 常见写法 = exp(ref-curr) - (ref-curr) - 1kl_diff = ref_logps - current_logpsper_token_kl = torch.exp(kl_diff) - kl_diff - 1# token-level lossper_token_loss = ratio * advantages - self.ps_config.beta * per_token_kl# 我们想 maximize => 取负per_token_loss = -per_token_loss# maskmask = attention_mask.float()masked_loss = per_token_loss * maskloss_per_seq = masked_loss.sum(dim=1) / mask.sum(dim=1).clamp_min(1e-5)final_loss = loss_per_seq.mean()return final_lossdef training_step(self, model, inputs):"""重写 Trainer 默认的 training_step,先调 _prepare_inputs,再调 compute_loss"""# 1) 根据 batch (list of dict) 做 prepareprepared = self._prepare_inputs(inputs)# 2) compute_lossloss = self.compute_loss(model, prepared)loss.backward()return loss.detach()##
# 一个简易main: 演示如何用 PSGRPOTrainer 
## def main():# 1.加载或初始化 "当前策略模型" 及 "参考模型"# 演示: 同一个初始权重model_name = "distilroberta-base"  # 用roberta做文本democurrent_model = AutoModelForCausalLM.from_pretrained("gpt2")  # GPT2 for simpler causalref_model = AutoModelForCausalLM.from_pretrained("gpt2")tokenizer = AutoTokenizer.from_pretrained("gpt2")tokenizer.pad_token = tokenizer.eos_token  # GPT2没有pad_token,我们指定 eos# 2. 准备一个RewardModelreward_model = DummyRewardModel()# 3. 准备训练超参 & Configps_config = PSGRPOConfig(num_generations=2,beta=0.01,max_steps=2)# 4. 构造个小训练数据# 只存一个字段 "prompt"train_data = [{"prompt":"How to solve x^2=4?"}, {"prompt":"What is AI safety?"}]# 伪做 2 steps epochs# Transformers的Trainer需要 "Dataset" 或list => 这里可用# 但 Trainer默认会把传入batch => dict of list. 所以, # 这里我们用 "collator" 直接返回batch list# 也可自定义 dataset# 5. 定义PSGRPOTrainertraining_args = TrainingArguments(output_dir="./outputs_ps_grpo",num_train_epochs=1,per_device_train_batch_size=2,logging_steps=1,learning_rate=1e-4)def my_data_collator(features):# features是list of {"prompt":...}# 直接原样返回 => trainer内部会把它当作batch: list of dictreturn featurestrainer = PSGRPOTrainer(model=current_model,ref_model=ref_model,reward_model=reward_model,tokenizer=tokenizer,ps_config=ps_config,args=training_args,train_dataset=train_data,  # 传入list => Trainer自动把它当datasetdata_collator=my_data_collator)# 6. 调Trainer进行"假训练"trainer.train()if __name__ == "__main__":main()

代码说明

  1. 模型加载

    • 我们使用 GPT2 作为当前策略模型 current_model 和参考模型 ref_model,以便在 compute_loss 阶段对比它们的对数概率计算 KL 惩罚。
    • 需要注意的是,如果你真的要做 PPO/GRPO,需要在更新策略模型前存一份 old_model 的快照,然后在下一个 batch 中将 old_model 用于 ratio;在本示例里,为了简化,我们把“旧策略”近似成 current_logps.detach() 来模拟 PPO/GRPO 公式中的 ratio
  2. 过程监督

    • _sample_completions 函数中,我们人为地在回答里插入了 “Step 1:” / “Step 2:” 标签,表示中间推理步骤。
    • _process_supervision_rewards 逐行查找 “Step i:” 的信息,并调用 reward_model.forward(prompt, step_text) 来拿到每个 step 的分数。
    • 在这里,我们只取了它们的和 sum(rs) 作为回答最终 reward,然后做 group 内的归一化
    • 若你想对每个 step 映射到 token 级别,你可以更加精细地记录 “Step j 对应从 token_x 到 token_y”,再给 [x..y] 的 token 分数。
  3. KL 惩罚

    • ref_token_logps = _get_per_token_logp(self.ref_model, ...) 返回参考模型对每个 token 的对数概率。
    • current_logps = _get_per_token_logp(self.model, ...) 返回当前策略对每个 token 的对数概率。
    • compute_loss 中,kl = exp(ref - current) - (ref - current) - 1 是 PPO/GRPO 常见近似写法。
    • 最终 (ratio*advantage - beta*kl) 做了合并,这就是 GRPO 的核心公式之一。
  4. 训练循环

    • Trainer 内部会把 train_dataset 分成 batch(这里 batch_size=2),并在 training_step 阶段调用 _prepare_inputs + compute_loss
    • _prepare_inputs 里做 prompt -> completions -> rewards -> advantage;
    • compute_loss 里做 PPO/GRPO formula, loss.backward()
    • 这样就完成了一次 RL 迭代。实际项目中你会迭代多 epochs / steps。

总结

通过这个示例,我们把过程监督融入到 GRPO 流程中,包括对每个 step 进行单独打分、对参考模型做 KL 惩罚,以及把分数合并为 advantage。虽然这里做了很多简化(例如 _sample_completions 是伪造的),但核心思路完全一致:

  1. 识别中间步骤Step i:标签),
  2. 对每个步骤打分(过程监督),
  3. 将分数映射到 token(或直接作为回答整体分数),
  4. 做组内比较(Group advantage)并结合 KL 来更新策略模型。

实际中,你可以:

  • 用真正的 _sample_completions(即 model.generate)或在线采样;
  • 用真实的“过程Reward模型”对中间步骤进行评估;
  • 对 steps -> token 做更精细的映射;
  • 保存/加载参考模型 old_model 做严格的 PPO ratio 计算。

这样就可以构建一个完整的“Process Supervision RL with GRPO”训练流水线。希望本示例能帮助你更好地落地过程监督的做法,让大模型在多步骤推理、代码生成、复杂任务中获得更好的训练与表现。

后记

2025年2月22日22点07分于上海。在GPT o1大模型辅助下完成。


http://www.ppmy.cn/news/1574471.html

相关文章

windows上vscode cmake工程搭建

安装vscode插件&#xff1a; 1.按装fastc&#xff08;主要是安装MinGW\mingw64比较方便&#xff09; 2.安装C&#xff0c;cmake&#xff0c;cmake tools插件 3.准备工作完成之后&#xff0c;按F1&#xff0c;选择cmake:Quick Start就可以创建一个cmake工程。 4.设置Cmake: G…

一个比 Nginx 还简单的 Web 服务器

企业级的 Web 服务器非常多&#xff0c;Nginx、Tomcat、Apache、IIS、FastAPI、Flask 等。今天松哥再给大家介绍一个开源的 Web 服务器&#xff0c;这款服务器具备自动 HTTPS 功能和高度可配置性&#xff0c;它的名字是&#xff1a;Caddy。 Caddy 是一个 Go 编写的 Web 服务器…

线代[8]|北大丘维声教授《怎样学习线性代数?》(红色字体为博主注释)

文章目录 说明一、线性代数的内容简介二、学习线性代数的用处三、线性代数的特点四、学习线性代数的方法五、更新时间记录 说明 文章中红色字体为博主敲录完丘教授这篇文章后所加&#xff0c;刷到这篇文章的读者在首次阅读应当跳过红色字体&#xff0c;先通读一读文章全文&…

CMake管理依赖实战:多仓库的无缝集成

随着软件复杂度的增加&#xff0c;单个项目可能需要依赖多个外部库或模块。这些依赖项可能是来自不同的代码仓库&#xff0c;如ATest和BTest。为了实现高效的依赖管理&#xff0c;CMake提供了多种方式来处理这种多仓库的情况。下面我们将详细介绍几种常见的方法&#xff0c;并通…

鸿蒙初学者学习手册(HarmonyOSNext_API14)_组件截图(@ohos.arkui.componentSnapshot (组件截图) )

前言&#xff1a; 这个模块可以截取组件的图片&#xff0c;无论组件是否已加载。截图只能拍到组件本身的大小区域。 如果组件或其子组件画得超出了自己的区域&#xff0c;超出的部分不会出现在截图中。截图不会拍到与当前组件平级的&#xff08;兄弟&#xff09;组件。 模块简…

ROS2 应用:按键控制 MoveIt2 中 Panda 机械臂关节位置

视频讲解 ROS2 应用&#xff1a;按键控制 MoveIt2 中 Panda 机械臂关节位置 创建 ROS 2 包 进入工作空间的 src 目录&#xff0c;然后创建一个新的 Python 包&#xff1a; ros2 pkg create --build-type ament_python panda_joint_control --dependencies rclpy control_msgs…

OpenBMC:BmcWeb定义service

1.定义service //src\webserver_run.cppint run() {...std::shared_ptr<sdbusplus::asio::connection> systemBus std::make_shared<sdbusplus::asio::connection>(io);crow::connections::systemBus systemBus.get();auto server sdbusplus::asio::object_serv…

python组备赛笔记(基础篇)

小数输出 1、代码示例&#xff1a; print(jc,%.3f%fc) 输出格式&#xff1a; 890 86075.959 2、代码示例&#xff1a; print(f%.3f%b) 输出格式&#xff1a; 1.000 金字塔输出 1、代码示例&#xff1a; n 5 t * for i in range(1,n1):print(f{t*(2*i-1):^{2*n-1}}) fo…