#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Classes for python environments suitable for reinforcement learning with
the tf_agents library.
"""
import random
# import tensorflow as tf
import numpy as np
# The commented-out imports are kept because they come in handy in the console while developing
from tf_agents.environments import py_environment
# from tf_agents.environments import tf_environment
# from tf_agents.environments import tf_py_environment
from tf_agents.environments import utils
from tf_agents.specs import array_spec
# from tf_agents.environments import wrappers
from tf_agents.trajectories import time_step as ts


class PyEnv2048(py_environment.PyEnvironment):
"""
2048 as a tf_agents.environments.py_environment.PyEnvironment object
Handles all the game logic.
Can be turned into a TensorFlow environment using the TFPyEnvironment
wrapper.
Implements variable negative rewards for actions
that don't change the state of the game,
and an adjustable reward multiplier.
Setting these to 0 and 1, respectively, results in behavior
identical to the original game.
The reward multiplier is applied only to positive rewards,
not punishments.
"""

    def __init__(self, neg_reward=0, reward_multiplier=1):
"""
Initializes instance variables
"""
# Specs
self._action_spec = array_spec.BoundedArraySpec(
shape=(), dtype=np.int32, minimum=0, maximum=3, name='action')
self._observation_spec = array_spec.BoundedArraySpec(
shape=(4,4), dtype=np.int64, minimum=0, name='observation')
# Initializes the game board, a numpy.ndarray
self._state = np.zeros(shape=(4,4), dtype=np.int64)
# with two starting twos in random locations
a, b = random.sample([(x,y) for x in range(4) for y in range(4)], 2)
self._state[a[0]][a[1]] = 2
self._state[b[0]][b[1]] = 2
self._episode_ended = False # Whether the game is over or not
self._neg_reward = neg_reward
self._reward_multiplier = reward_multiplier

    def action_spec(self):
return self._action_spec

    def observation_spec(self):
return self._observation_spec

    def _reset(self):
"""
Resets the environment, restarts the game.
"""
# Grid with two initial values
self._state = np.zeros(shape=(4,4), dtype=np.int64)
a, b = random.sample([(x,y) for x in range(4) for y in range(4)], 2)
self._state[a[0]][a[1]] = 2
self._state[b[0]][b[1]] = 2
self._episode_ended = False
# Returns "restart" TimeStep with the state of the game
return ts.restart(self._state)

    def __gameover(self):
"""
Checks if the game is over
"""
# Return false if there are empty tiles left
if not self._state.all():
return False
# Checks if any tiles can be merged
for y in range(4):
for x in range(4):
                # Only checks down and right: if a tile can't be merged
                # downwards it can't be merged upwards either (and likewise
                # for right and left), since every tile is visited anyway.
                # IndexError is caught because tiles on the edge have no
                # neighbour to merge with in that direction.
try:
# Checks down
if self._state[y][x] == self._state[y+1][x]:
# If merge is possible, game is not over
return False
except IndexError:
pass
try:
# Checks right
if self._state[y][x] == self._state[y][x+1]:
return False
except IndexError:
pass
# If this point has been reached, all tiles have been checked for
# merges, and no possible merges have been found.
self._episode_ended = True
return True

    def __new_tile(self):
"""
Creates a new tile on the board.
The returns are not used for anything right now, but they could be,
so I'm leaving them in.
"""
        # Collects the indices of all empty tiles; if there are none,
        # returns False. len() is used instead of .any() because .any()
        # would be False for a single empty tile at index (0, 0).
        if len(empty := np.argwhere(self._state == 0)) == 0:
            return False
# 90% chance that the new tile is 2, otherwise 4
if random.random() < 0.9:
new = 2
else:
new = 4
# Set a random empty tile to the new value
y, x = random.choice(empty)
self._state[y][x] = new
        # A non-empty tuple is truthy, so this reads as "True" (a new tile
        # was created) while also reporting where it was placed.
        return (x, y)

    def _step(self, action):
"""
Expects action in (0, 1, 2, 3)
Accepts tf_agents.trajectories.policy_step.PolicyStep.action
from both TF and Py policies
"""
        # Reset if the episode is over. In practice PyEnvironment.step()
        # auto-resets after a terminal step, so this branch should be
        # unreachable; it is kept to mirror the tf_agents tutorials.
if self._episode_ended:
return self.reset()
# List for tiles already merged this move
# tiles should not be merged twice
merged = []
reward = 0 # Cumulative reward for all merges
moved = False # Whether the board changed this move
# Performs move based on action:
# move up
if action == 0:
# Starts at the top (0,0), moving down and right,
# Loops through all tiles
for y in range(4):
for x in range(4):
# only moves non-zero tiles
if (tile_value := self._state[y][x]) != 0:
new_y = y
# Moves the tile up as far as it can go
while new_y > 0 and self._state[new_y-1][x] == 0:
new_y -= 1
# Checks if the tile can be merged, and merges
if new_y > 0 \
and tile_value == self._state[new_y-1][x] \
and (new_y - 1, x) not in merged:
# Sets the old location to 0
self._state[y][x] = 0
# Doubles the new location (merge)
self._state[new_y-1][x] *= 2
# Appends tile to merged list
merged.append((new_y-1, x))
# Adds reward
reward += tile_value * 2
# Sets moved to True
moved = True
# If it can not be merged, just moves it
elif new_y != y:
# Sets old location to 0
self._state[y][x] = 0
# Sets new location to the value of the tile
self._state[new_y][x] = tile_value
moved = True
# move right
elif action == 1:
for y in range(4):
# Start at the far right
for x in range(3, -1, -1):
if (tile_value := self._state[y][x]) != 0:
new_x = x
while new_x < 3 and self._state[y][new_x+1] == 0:
new_x += 1
if new_x < 3 \
and tile_value == self._state[y][new_x + 1] \
and (y, new_x + 1) not in merged:
self._state[y][x] = 0
self._state[y][new_x+1] *= 2
merged.append((y, new_x+1))
reward += tile_value * 2
moved = True
elif new_x != x:
self._state[y][x] = 0
self._state[y][new_x] = tile_value
moved = True
# move down
elif action == 2:
# start at the bottom
for y in range(3, -1, -1):
for x in range(4):
if (tile_value := self._state[y][x]) != 0:
new_y = y
while new_y < 3 and self._state[new_y+1][x] == 0:
new_y += 1
if new_y < 3 \
and tile_value == self._state[new_y+1][x] \
and (new_y + 1, x) not in merged:
self._state[y][x] = 0
self._state[new_y+1][x] *= 2
merged.append((new_y+1, x))
reward += tile_value * 2
moved = True
elif new_y != y:
self._state[y][x] = 0
self._state[new_y][x] = tile_value
moved = True
# move left
elif action == 3:
for y in range(4):
# start at the far left
for x in range(4):
if (tile_value := self._state[y][x]) != 0:
new_x = x
while new_x > 0 and self._state[y][new_x-1] == 0:
new_x -= 1
if new_x > 0 \
and tile_value == self._state[y][new_x-1] \
and (y, new_x - 1) not in merged:
self._state[y][x] = 0
self._state[y][new_x - 1] *= 2
merged.append((y, new_x - 1))
reward += tile_value * 2
moved = True
elif new_x != x:
self._state[y][x] = 0
self._state[y][new_x] = tile_value
moved = True
        # If the board changed, adds a new tile and applies the reward multiplier
if moved:
self.__new_tile()
reward *= self._reward_multiplier
else:
# If not moved, applies punishment
reward = - self._neg_reward
# Check whether game has ended
if self._episode_ended or self.__gameover():
# Returns "termination" TimeStep with current state and reward
return ts.termination(self._state, reward)
        # If the game has not ended, returns a "transition" TimeStep
return ts.transition(self._state, reward)


class PyEnv2048FlatObservations(PyEnv2048):
"""
The same as PyEnv2048 but the observation has
shape (16,) instead of (4,4)
"""

    def __init__(self, neg_reward=0, reward_multiplier=1):
"""
Calls __init__ from PyEnv2048 and redefines observation spec
to reflect the new shape.
"""
super().__init__(neg_reward, reward_multiplier)
self._observation_spec = array_spec.BoundedArraySpec(
shape=(16,), dtype=np.int64, minimum=0, name='observation')

    def _step(self, action):
"""
Gets the TimeStep from PyEnv2048._step and then returns another
with the same content, but the observation array is flattened
"""
time_step = super()._step(action)
return ts.TimeStep(
step_type=time_step.step_type,
reward=time_step.reward,
discount=time_step.discount,
observation=time_step.observation.flatten())

    def _reset(self):
"""
Gets the TimeStep from PyEnv2048._reset and then returns another
with the same content, but the observation array is flattened
"""
time_step = super()._reset()
return ts.TimeStep(
step_type=time_step.step_type,
reward=time_step.reward,
discount=time_step.discount,
observation=time_step.observation.flatten())


class PyEnv2048NoBadActions(PyEnv2048):
"""
Maps bad actions (that don't change the environment) to the next
available action. Doesn't punish such actions.
"""

    def __init__(self, reward_multiplier=1):
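        # neg_reward is pinned to 1 so that a step which doesn't change the
        # board returns exactly -1, which _step below uses as a sentinel.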
super().__init__(1, reward_multiplier)

    def _step(self, action):
time_step = super()._step(action)
while time_step.reward == -1 and not time_step.is_last():
action = (action + 1) % 4
time_step = super()._step(action)
return time_step


if __name__ == "__main__":
    # Basic smoke tests: validate_py_environment raises if an environment
    # violates its own action/observation specs, so reaching the print
    # means the check passed.
    environment = PyEnv2048()
    utils.validate_py_environment(environment, episodes=5)
    print("PyEnv2048: no exceptions :)")

    environment = PyEnv2048FlatObservations()
    utils.validate_py_environment(environment, episodes=5)
    print("PyEnv2048FlatObservations: no exceptions :)")

    environment = PyEnv2048NoBadActions()
    utils.validate_py_environment(environment, episodes=5)
    print("PyEnv2048NoBadActions: no exceptions :)")
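
    # Minimal sketch (beyond the validation tests above) of the
    # TFPyEnvironment wrapping mentioned in the PyEnv2048 docstring:
    # wrap the Python environment and take a few random actions.
    # Observations and rewards come back as batched TensorFlow tensors.
    import tensorflow as tf
    from tf_agents.environments import tf_py_environment

    tf_env = tf_py_environment.TFPyEnvironment(PyEnv2048())
    time_step = tf_env.reset()
    for _ in range(5):
        # Random action in [0, 4) with a batch dimension of 1
        action = tf.random.uniform([1], minval=0, maxval=4, dtype=tf.int32)
        time_step = tf_env.step(action)
        print("reward:", float(time_step.reward.numpy()[0]))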