when-to-switch

Форк
0
/
replan_algo.py 
145 строк · 5.2 Кб
1
import numpy as np
2
import cppimport.import_hook
3
from planning.planner import planner
4
from pogema import GridConfig
5

6
from planning.astar_no_grid import AStar, INF
7

8

9
class RePlanBase:
10
    def __init__(self, use_best_move: bool = True, max_steps: int = INF, algo_source='c++', seed=None,
11
                 ignore_other_agents=False):
12

13
        self.use_best_move = use_best_move
14
        gc: GridConfig = GridConfig()
15

16
        self.actions = {tuple(gc.MOVES[i]): i for i in range(len(gc.MOVES))}
17
        self.steps = 0
18

19
        if algo_source == 'c++':
20
            self.algo_source = planner
21
        else:
22
            self.algo_source = AStar
23
        self.planner = None
24
        self.max_steps = max_steps
25
        self.previous_positions = None
26
        self.rnd = np.random.default_rng(seed)
27
        self.ignore_other_agents = ignore_other_agents
28

29
    def act(self, obs, skip_agents=None):
30
        num_agents = len(obs)
31
        if self.planner is None:
32
            self.planner = [self.algo_source(self.max_steps) for _ in range(num_agents)]
33
        if self.previous_positions is None:
34
            self.previous_positions = [[] for _ in range(num_agents)]
35
        obs_radius = len(obs[0]['obstacles']) // 2
36
        action = []
37

38
        for k in range(num_agents):
39
            self.previous_positions[k].append(obs[k]['xy'])
40
            if obs[k]['xy'] == obs[k]['target_xy']:
41
                action.append(None)
42
                continue
43
            obstacles = np.transpose(np.nonzero(obs[k]['obstacles']))
44
            if self.ignore_other_agents:
45
                other_agents = None
46
            else:
47
                other_agents = np.transpose(np.nonzero(obs[k]['agents']))
48
            self.planner[k].update_obstacles(obstacles, other_agents,
49
                                             (obs[k]['xy'][0] - obs_radius, obs[k]['xy'][1] - obs_radius))
50

51
            if skip_agents and skip_agents[k]:
52
                action.append(None)
53
                continue
54

55
            self.planner[k].update_path(obs[k]['xy'], obs[k]['target_xy'])
56
            path = self.planner[k].get_next_node(self.use_best_move)
57
            if path is not None and path[1][0] < INF:
58
                action.append(self.actions[(path[1][0] - path[0][0], path[1][1] - path[0][1])])
59
            else:
60
                action.append(None)
61
        self.steps += 1
62
        return action
63

64
    def get_path(self):
65
        results = []
66
        for idx in range(len(self.planner)):
67
            results.append(self.planner[idx].get_path(use_best_node=False))
68
        return results
69

70

71
class FixNonesWrapper:
72

73
    def __init__(self, agent):
74
        self.agent = agent
75
        self.rnd = self.agent.rnd
76
        # self.env = agent.env
77

78
    def act(self, obs, skip_agents=None):
79
        actions = self.agent.act(obs, skip_agents=skip_agents)
80
        for idx in range(len(actions)):
81
            if actions[idx] is None:
82
                actions[idx] = 0
83
        return actions
84

85

86
class NoPathSoRandomOrStayWrapper:
87

88
    def __init__(self, agent):
89
        self.agent = agent
90
        self.rnd = self.agent.rnd
91

92
    def act(self, obs, skip_agents=None):
93
        actions = self.agent.act(obs, skip_agents=skip_agents)
94
        for idx in range(len(actions)):
95
            if actions[idx] is None:
96
                if self.rnd.random() <= 0.5:
97
                    actions[idx] = 0
98
                else:
99
                    actions[idx] = self.get_random_move(obs, idx)
100
        return actions
101

102
    def get_random_move(self, obs, agent_id):
103
        deltas = GridConfig().MOVES
104
        actions = [1, 2, 3, 4]
105

106
        self.agent.rnd.shuffle(actions)
107
        for idx in actions:
108
            i = len(obs[agent_id]['obstacles']) // 2 + deltas[idx][0]
109
            j = len(obs[agent_id]['obstacles']) // 2 + deltas[idx][1]
110
            if obs[agent_id]['obstacles'][i][j] == 0:
111
                return idx
112
        return 0
113

114

115
class FixLoopsWrapper(NoPathSoRandomOrStayWrapper):
116
    def __init__(self, agent, stay_if_loop_prob=None, add_none_if_loop=False):
117
        super().__init__(agent)
118
        self.rnd = self.agent.rnd
119
        self.previous_positions = None
120
        self.stay_if_loop_prob = stay_if_loop_prob if stay_if_loop_prob else 0.5
121
        self.add_none_if_loop = add_none_if_loop
122

123
    def act(self, obs, skip_agents=None):
124
        num_agents = len(obs)
125
        if self.previous_positions is None:
126
            self.previous_positions = [[] for _ in range(num_agents)]
127

128
        actions = self.agent.act(obs, skip_agents=skip_agents)
129
        for idx in range(len(actions)):
130
            if actions[idx] is None:
131
                continue
132
            path = self.previous_positions[idx]
133
            if len(path) > 1:
134
                next_step = obs[idx]['xy']
135
                dx, dy = GridConfig().MOVES[actions[idx]]
136
                next_pos = dx + next_step[0], dy + next_step[1]
137
                if path[-1] == next_pos or path[-2] == next_pos:
138
                    if self.add_none_if_loop:
139
                        actions[idx] = None
140
                    elif next_pos == next_step:
141
                        actions[idx] = self.get_random_move(obs, idx)
142
                    elif self.rnd.random() < self.stay_if_loop_prob:
143
                        actions[idx] = 0
144
            self.previous_positions[idx].append(obs[idx]['xy'])
145
        return actions
146

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.