"""
Maze environment used in the first three modules of the course:
"Beginner to Master: Reinforcement Learning".
"""
from typing import Tuple, Dict, Optional, Iterable

import numpy as np
import gymnasium as gym
from gymnasium import spaces
from gymnasium.error import DependencyNotInstalled
import pygame
from pygame import gfxdraw


class Maze(gym.Env):
    """
    Description:
        The environment consists of a grid of (size x size) positions. The agent
        starts the episode at location (row=0, col=0) if the environment is instantiated
        without exploring starts, or at a random location (different from the goal)
        if it is. The goal is always at (row=size-1, col=size-1).

    Observation:
        Type: MultiDiscrete([size, size])
        Num    Observation       Min    Max
        0      row coordinate    0      size-1
        1      col coordinate    0      size-1

    Actions:
        Type: Discrete(4)
        Num    Action
        0      Move up
        1      Move right
        2      Move down
        3      Move left

    Reward:
        If the environment is instantiated with shaped rewards, then at each time step
        the agent receives a reward signal of:
            r = - steps_to_goal(next_state) / steps_to_goal(furthest_state)
        This ensures that the lowest possible reward is -1.0 (received at the position
        furthest from the goal) and that the reward at the goal is 0.
        If the environment is instantiated without shaped rewards, then at each time step
        the agent receives a reward of -1.0 until it reaches the goal.

    Episode termination:
        The episode terminates when the agent reaches the goal state.
    """

    def __init__(self, exploring_starts: bool = False,
                 shaped_rewards: bool = False, size: int = 5) -> None:
        """
        Initialize the environment.

        Args:
            exploring_starts: whether the agent should start each episode at a random location.
            shaped_rewards: whether the environment should shape the rewards.
            size: size of the maze. The grid will be of shape (size x size).
        """
        super().__init__()
        self.exploring_starts = exploring_starts
        self.shaped_rewards = shaped_rewards
        self.state = (size - 1, size - 1)
        self.goal = (size - 1, size - 1)
        self.maze = self._create_maze(size=size)
        self.distances = self._compute_distances(self.goal, self.maze)
        self.action_space = spaces.Discrete(n=4)
        self.action_space.action_meanings = {0: 'UP', 1: 'RIGHT', 2: 'DOWN', 3: 'LEFT'}
        self.observation_space = spaces.MultiDiscrete([size, size])
        self.screen = None
        self.agent_transform = None

    def step(self, action: int) -> Tuple[Tuple[int, int], float, bool, Dict]:
        """
        Take an action in the environment and observe the next transition.

        Args:
            action: An indicator of the action to be taken.

        Returns:
            The next transition.
        """
        reward = self.compute_reward(self.state, action)
        self.state = self._get_next_state(self.state, action)
        done = self.state == self.goal
        info = {}
        return self.state, reward, done, info

    def reset(self) -> Tuple[int, int]:
        """
        Reset the environment to execute a new episode.

        Returns: State representing the initial position of the agent.
        """
        if self.exploring_starts:
            while self.state == self.goal:
                self.state = tuple(self.observation_space.sample())
        else:
            self.state = (0, 0)
        return self.state

    def render(self, mode: str = 'human') -> Optional[np.ndarray]:
        """
        Render a state of the environment.

        Args:
            mode: one of 'human' or 'rgb_array'. 'human' is accepted only for
                compatibility; all rendering is done as RGB arrays via NumPy.

        Returns:
            A numpy.ndarray or None.
        """
        assert mode in ['human', 'rgb_array']

        screen_size = 600
        scale = screen_size / 5  # NOTE: the renderer assumes the default 5 x 5 maze.

        if self.screen is None:
            pygame.init()
            self.screen = pygame.Surface((screen_size, screen_size))

        surf = pygame.Surface((screen_size, screen_size))
        surf.fill((22, 36, 71))

        for row in range(5):
            for col in range(5):
                state = (row, col)
                for next_state in [(row + 1, col), (row - 1, col), (row, col + 1), (row, col - 1)]:
                    if next_state not in self.maze[state]:
                        # Add the geometry of the edges and walls (i.e. the boundaries between
                        # adjacent squares that are not connected).
                        row_diff, col_diff = np.subtract(next_state, state)
                        left = (col + (col_diff > 0)) * scale - 2 * (col_diff != 0)
                        right = ((col + 1) - (col_diff < 0)) * scale + 2 * (col_diff != 0)
                        top = (5 - (row + (row_diff > 0))) * scale - 2 * (row_diff != 0)
                        bottom = (5 - ((row + 1) - (row_diff < 0))) * scale + 2 * (row_diff != 0)
                        gfxdraw.filled_polygon(surf, [(left, bottom), (left, top), (right, top), (right, bottom)], (255, 255, 255))

        # Add the geometry of the goal square to the viewer.
        left, right, top, bottom = scale * 4 + 10, scale * 5 - 10, scale - 10, 10
        gfxdraw.filled_polygon(surf, [(left, bottom), (left, top), (right, top), (right, bottom)], (40, 199, 172))

        # Add the geometry of the agent to the viewer.
        agent_row = int(screen_size - scale * (self.state[0] + .5))
        agent_col = int(scale * (self.state[1] + .5))
        gfxdraw.filled_circle(surf, agent_col, agent_row, int(scale * .6 / 2), (228, 63, 90))

        surf = pygame.transform.flip(surf, False, True)
        self.screen.blit(surf, (0, 0))
        return np.transpose(
            np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2)
        )

    def close(self) -> None:
        """
        Clean up resources before shutting down the environment.

        Returns: None.
        """
        if self.screen is not None:
            pygame.display.quit()
            pygame.quit()
            self.screen = None

    def compute_reward(self, state: Tuple[int, int], action: int) -> float:
        """
        Compute the reward attained by taking action 'a' at state 's'.

        Args:
            state: the state of the agent prior to taking the action.
            action: the action taken by the agent.

        Returns:
            A float representing the reward signal received by the agent.
        """
        next_state = self._get_next_state(state, action)
        if self.shaped_rewards:
            return - (self.distances[next_state] / self.distances.max())
        return - float(state != self.goal)

    def simulate_step(self, state: Tuple[int, int], action: int):
        """
        Simulate (without taking) a step in the environment.

        Args:
            state: the state of the agent prior to taking the action.
            action: the action to simulate the step with.

        Returns:
            The next transition.
        """
        reward = self.compute_reward(state, action)
        next_state = self._get_next_state(state, action)
        done = next_state == self.goal
        info = {}
        return next_state, reward, done, info

    def _get_next_state(self, state: Tuple[int, int], action: int) -> Tuple[int, int]:
        """
        Gets the next state after the agent performs action 'a' in state 's'. If there is
        a wall in the way, the next state will be the same as the current one.

        Args:
            state: current state (before taking the action).
            action: move performed by the agent.

        Returns: a tuple representing the new state.
        """
        if action == 0:
            next_state = (state[0] - 1, state[1])
        elif action == 1:
            next_state = (state[0], state[1] + 1)
        elif action == 2:
            next_state = (state[0] + 1, state[1])
        elif action == 3:
            next_state = (state[0], state[1] - 1)
        else:
            raise ValueError("Action value not supported:", action)

        if next_state in self.maze[state]:
            return next_state
        return state

    @staticmethod
    def _create_maze(size: int) -> Dict[Tuple[int, int], Iterable[Tuple[int, int]]]:
        """
        Creates a representation of the maze as a dictionary where the keys are
        the states available to the agent and the values are lists of adjacent
        states.

        Args:
            size: number of elements of each side in the square grid.

        Returns: the adjacency list dictionary.
        """
        maze = {(row, col): [(row - 1, col), (row + 1, col), (row, col - 1), (row, col + 1)]
                for row in range(size) for col in range(size)}

        left_edges = [[(row, 0), (row, -1)] for row in range(size)]
        right_edges = [[(row, size - 1), (row, size)] for row in range(size)]
        upper_edges = [[(0, col), (-1, col)] for col in range(size)]
        lower_edges = [[(size - 1, col), (size, col)] for col in range(size)]
        walls = [
            [(1, 0), (1, 1)], [(2, 0), (2, 1)], [(3, 0), (3, 1)],
            [(1, 1), (1, 2)], [(2, 1), (2, 2)], [(3, 1), (3, 2)],
            [(3, 1), (4, 1)], [(0, 2), (1, 2)], [(1, 2), (1, 3)],
            [(2, 2), (3, 2)], [(2, 3), (3, 3)], [(2, 4), (3, 4)],
            [(4, 2), (4, 3)], [(1, 3), (1, 4)], [(2, 3), (2, 4)],
        ]
        obstacles = upper_edges + lower_edges + left_edges + right_edges + walls

        for src, dst in obstacles:
            maze[src].remove(dst)
            if dst in maze:
                maze[dst].remove(src)
        return maze

    @staticmethod
    def _compute_distances(goal: Tuple[int, int],
                           maze: Dict[Tuple[int, int], Iterable[Tuple[int, int]]]) -> np.ndarray:
        """
        Compute the distance to the goal from all other positions in the maze using
        Dijkstra's algorithm.

        Args:
            goal: A tuple representing the location of the goal in a two-dimensional grid.
            maze: A dictionary holding the adjacency lists of all locations in the
                two-dimensional grid.

        Returns: A (H x W) numpy array holding the minimum number of moves for each
            position to reach the goal.
        """
        # NOTE: like the renderer, this helper assumes the default 5 x 5 maze.
        distances = np.full((5, 5), np.inf)
        visited = set()
        distances[goal] = 0.

        while visited != set(maze):
            sorted_dst = [(v // 5, v % 5) for v in distances.argsort(axis=None)]
            closest = next(x for x in sorted_dst if x not in visited)
            visited.add(closest)

            for neighbour in maze[closest]:
                distances[neighbour] = min(distances[neighbour], distances[closest] + 1)
        return distances
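

# Illustrative sketch (not part of the original course file): with shaped rewards, the
# reward of a simulated transition is itself a heuristic for the distance to the goal,
# so a one-step greedy policy already walks to the goal. The helper name `greedy_action`
# is ours, not the course's.
def greedy_action(env: Maze, state: Tuple[int, int]) -> int:
    """Pick the action whose simulated one-step transition yields the highest reward."""
    rewards = [env.simulate_step(state, a)[1] for a in range(env.action_space.n)]
    return int(np.argmax(rewards))
# e.g.: a = greedy_action(Maze(shaped_rewards=True), (0, 0))
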
env = Maze()
env.reset()
env.render(mode='rgb_array')
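
# Illustrative usage sketch, using only the API defined above: run one episode with a
# uniformly random policy from a random start and report the return. Names such as
# `episode_env` and `total_return` are our own and only serve the example.
episode_env = Maze(exploring_starts=True, shaped_rewards=True)
state = episode_env.reset()
total_return, done = 0.0, False
while not done:
    action = episode_env.action_space.sample()          # uniformly random action in {0, 1, 2, 3}
    state, reward, done, _ = episode_env.step(action)   # (state, reward, done, info) as documented above
    total_return += reward
print(f"Random-policy episode finished with return {total_return:.2f}")
episode_env.close()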