import torch
import torch.nn.functional as F
from coati.models.base import Actor, Critic, RewardModel
from coati.models.generation import generate
from coati.models.utils import calc_action_log_probs, compute_reward
from transformers import PreTrainedTokenizer

from .base import Experience, ExperienceMaker


class NaiveExperienceMaker(ExperienceMaker):
12"""
13Naive experience maker.
14"""
15
16def __init__(
17self,
18actor: Actor,
19critic: Critic,
20reward_model: RewardModel,
21initial_model: Actor,
22tokenizer: PreTrainedTokenizer,
23kl_coef: float = 0.1,
24) -> None:
25super().__init__(actor, critic, reward_model, initial_model)
26self.tokenizer = tokenizer
27self.kl_coef = kl_coef
28
29@torch.no_grad()
    def make_experience(self, input_ids: torch.Tensor, **generate_kwargs) -> Experience:
        # inference only: put all four models in eval mode (disables dropout)
        self.actor.eval()
        self.critic.eval()
        self.initial_model.eval()
        self.reward_model.eval()

        # generate sequences (prompt + response) with the current actor policy
        sequences = generate(self.actor, input_ids, self.tokenizer, **generate_kwargs)

        # calculate auxiliary tensors: attention_mask is 1 on non-padding tokens
        attention_mask = None
        pad_token_id = self.tokenizer.pad_token_id
        if pad_token_id is not None:
            attention_mask = sequences.not_equal(pad_token_id).to(dtype=torch.long, device=sequences.device)

        input_len = input_ids.size(1)
        eos_token_id = self.tokenizer.eos_token_id
        if eos_token_id is None:
            # no EOS token: every generated position counts as an action
            action_mask = torch.ones_like(sequences, dtype=torch.bool)
        else:
            # left padding may be applied, only mask action: the cumsum flags
            # everything from the first EOS onward, and the shift inside F.pad
            # re-includes the EOS token itself and the prompt
            action_mask = (sequences[:, input_len:] == eos_token_id).cumsum(dim=-1) == 0
            action_mask = F.pad(action_mask, (1 + input_len, -1), value=True)  # include eos token and input
        action_mask[:, :input_len] = False
        action_mask = action_mask[:, 1:]
        action_mask = action_mask[:, -(sequences.size(1) - input_len) :]
        num_actions = action_mask.size(1)
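        # Worked example (illustration only; the token ids are made up): with
        # input_len = 2, eos_token_id = 2, and sequences = [[5, 6, 7, 8, 2, 0, 0]],
        # the cumsum trick yields [True, True, False, False, False] over the
        # generated part, the F.pad shift re-includes the EOS position, and the
        # slicing above leaves a final mask of [[True, True, True, False, False]]:
        # actions [7, 8, 2] count, the trailing padding [0, 0] does not.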

        # score the sequences with the actor, the frozen reference model,
        # the critic, and the reward model
        actor_output = self.actor(sequences, attention_mask)["logits"]
        action_log_probs = calc_action_log_probs(actor_output, sequences, num_actions)
        base_model_output = self.initial_model(sequences, attention_mask)["logits"]
        base_action_log_probs = calc_action_log_probs(base_model_output, sequences, num_actions)
        value = self.critic(sequences, attention_mask)
        r = self.reward_model(sequences, attention_mask)
        # reward model score penalized by the actor/initial-model KL divergence
        reward = compute_reward(r, self.kl_coef, action_log_probs, base_action_log_probs, action_mask=action_mask)

        # one-step advantage estimate: penalized reward minus critic value (no GAE)
        advantage = reward - value
        # TODO(ver217): maybe normalize adv
        if advantage.ndim == 1:
            advantage = advantage.unsqueeze(-1)

        return Experience(sequences, action_log_probs, value, reward, advantage, attention_mask, action_mask)
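
# ---------------------------------------------------------------------------
# Sketch (not part of the original module): how a PPO-style trainer might use
# this class. The surrounding setup (actor/critic/reward_model/initial_model,
# tokenizer, prompts) and the generate kwargs are assumptions for illustration,
# not coati defaults.
#
#   experience_maker = NaiveExperienceMaker(
#       actor, critic, reward_model, initial_model, tokenizer, kl_coef=0.1
#   )
#   prompt_ids = tokenizer(prompts, return_tensors="pt", padding=True).input_ids
#   exp = experience_maker.make_experience(prompt_ids, max_length=512, do_sample=True)
#   # exp bundles the sampled sequences, action log probs, values, rewards,
#   # advantages, and masks for one PPO update.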