colossalai
70 lines · 2.5 KB
1from typing import List
2
3import torch
4from coati.experience_buffer.utils import BufferItem, make_experience_batch, split_experience_batch
5from coati.experience_maker.base import Experience
6
7# from torch.multiprocessing import Queue
8from ray.util.queue import Queue
9
10
class DetachedReplayBuffer:
    """
    Detached replay buffer. Shares Experience across workers on the same node;
    therefore, a trainer node is expected to have only one instance.
    It is ExperienceMakerHolder's duty to call the append(exp) method, remotely.

    Args:
        sample_batch_size: Batch size when sampling. Incoming experience items are
            not enqueued until they form a complete batch of this size.
        limit: Limit on the number of experience sample BATCHes held in the queue.
            A number <= 0 means unlimited. Defaults to 0.
    """

    def __init__(self, sample_batch_size: int, limit: int = 0) -> None:
        self.sample_batch_size = sample_batch_size
        self.limit = limit
        # Ray queue backed by a dedicated actor so it can be shared across workers.
        self.items = Queue(self.limit, actor_options={"num_cpus": 1})
        # Leftover items that have not yet accumulated into a full batch.
        self.batch_collector: List[BufferItem] = []

    @torch.no_grad()
    def append(self, experience: Experience) -> None:
        """
        Split a batched Experience into individual items and buffer them.
        Expected to be called remotely.
        """
        items = split_experience_batch(experience)
        self.extend(items)

    @torch.no_grad()
    def extend(self, items: List[BufferItem]) -> None:
        """
        Buffer items, enqueueing one rebuilt Experience batch for every
        `sample_batch_size` items collected. Expected to be called remotely.
        """
        self.batch_collector.extend(items)
        while len(self.batch_collector) >= self.sample_batch_size:
            batch_items = self.batch_collector[: self.sample_batch_size]
            experience = make_experience_batch(batch_items)
            # Blocks when the queue is full (limit > 0), applying backpressure.
            self.items.put(experience, block=True)
            self.batch_collector = self.batch_collector[self.sample_batch_size :]

    def clear(self) -> None:
        """Discard all buffered experience: enqueued batches and pending items."""
        self.items.shutdown()
        # BUGFIX: recreate the queue with the same actor_options as __init__
        # (it was previously rebuilt as a plain Queue(self.limit)), and drop the
        # old `self.worker_state = [False] * self.tp_world_size` line —
        # tp_world_size was never assigned, so clear() always raised AttributeError.
        self.items = Queue(self.limit, actor_options={"num_cpus": 1})
        self.batch_collector = []

    @torch.no_grad()
    def sample(self, worker_rank=0, to_device="cpu") -> Experience:
        """
        Pop one experience batch and move it to `to_device`.

        Note: `worker_rank` is currently unused; it is kept for interface
        compatibility with remote callers.
        """
        ret = self._sample_and_erase()
        ret.to_device(to_device)
        return ret

    @torch.no_grad()
    def _sample_and_erase(self) -> Experience:
        # Blocks until at least one complete batch is available in the queue.
        ret = self.items.get(block=True)
        return ret

    def get_length(self) -> int:
        """Return the number of complete batches currently enqueued."""
        return self.items.qsize()
71