colossalai — fork 0 — detached_replay_buffer.py (70 lines, 2.5 KB)
1
from typing import List
2

3
import torch
4
from coati.experience_buffer.utils import BufferItem, make_experience_batch, split_experience_batch
5
from coati.experience_maker.base import Experience
6

7
# from torch.multiprocessing import Queue
8
from ray.util.queue import Queue
9

10

11
class DetachedReplayBuffer:
    """Detached replay buffer that shares Experience across workers on the same node.

    A trainer node is expected to hold only one instance of this class.
    It is the ExperienceMakerHolder's duty to call the ``append(exp)`` method,
    remotely.

    Args:
        sample_batch_size: Batch size when sampling. Experience items won't be
            enqueued until they have formed a full batch.
        limit: Limit on the number of experience sample BATCHes kept in the
            queue. A number <= 0 means unlimited. Defaults to 0.
    """

    def __init__(self, sample_batch_size: int, limit: int = 0) -> None:
        self.sample_batch_size = sample_batch_size
        self.limit = limit
        # Ray actor-backed queue so producer/consumer processes can share batches.
        self.items = Queue(self.limit, actor_options={"num_cpus": 1})
        # Holds items until sample_batch_size of them accumulate.
        self.batch_collector: List[BufferItem] = []

    @torch.no_grad()
    def append(self, experience: Experience) -> None:
        """Split a batched Experience into items and buffer them.

        Expected to be called remotely.
        """
        items = split_experience_batch(experience)
        self.extend(items)

    @torch.no_grad()
    def extend(self, items: List[BufferItem]) -> None:
        """Buffer items; enqueue a full batch whenever enough have accumulated.

        Expected to be called remotely.
        """
        self.batch_collector.extend(items)
        while len(self.batch_collector) >= self.sample_batch_size:
            batch_items = self.batch_collector[: self.sample_batch_size]
            experience = make_experience_batch(batch_items)
            # Block if the queue is at its limit so no batch is dropped.
            self.items.put(experience, block=True)
            self.batch_collector = self.batch_collector[self.sample_batch_size :]

    def clear(self) -> None:
        """Drop all queued batches and any partially collected items."""
        self.items.shutdown()
        # BUGFIX: recreate the queue with the same actor_options as __init__
        # (the original used plain Queue(self.limit), losing the num_cpus
        # reservation). The original also executed
        # `self.worker_state = [False] * self.tp_world_size`, but neither
        # attribute is ever initialized in this class, so clear() always raised
        # AttributeError; that dead line has been removed.
        self.items = Queue(self.limit, actor_options={"num_cpus": 1})
        self.batch_collector = []

    @torch.no_grad()
    def sample(self, worker_rank=0, to_device="cpu") -> Experience:
        """Pop one batched Experience and move it to ``to_device``.

        ``worker_rank`` is accepted for interface compatibility but is unused.
        """
        ret = self._sample_and_erase()
        ret.to_device(to_device)
        return ret

    @torch.no_grad()
    def _sample_and_erase(self) -> Experience:
        # Blocks until a batch becomes available.
        ret = self.items.get(block=True)
        return ret

    def get_length(self) -> int:
        """Return the number of batches currently queued."""
        return self.items.qsize()
71

Use of cookies

We use cookies in accordance with the Privacy Policy and the Cookie Policy.

By clicking the "Accept" button, you give JSC "SberTech" consent to process your personal data for the purpose of improving our website and the GitVerse service, as well as making them more convenient to use.

You can disable the use of cookies yourself in your browser settings.