dq_learning.py
"""Double Q-learning model to operate on the gymnasium toy-text frozen-lake environment"""
import gymnasium as gym
from gymnasium.envs.toy_text.frozen_lake import generate_random_map
import numpy as np
from learning_common import *
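# Helper functions used below (get_best_ucb_action, get_max_value_action, print_world,
# print_policy) are assumed to come from learning_common; they are not defined in this file.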


def q_learning(env, grid_size: int, world: list) -> dict[int, int]:
    """Simple implementation of double Q-learning"""
    num_actions = 4
    learning_rate = 0.2
    gamma = 0.995
    num_episodes = 30_000
    lr_step = (learning_rate - 0.01) / num_episodes  # Anneal the learning rate linearly down to 0.01
    states = list(range(0, grid_size**2))

    # Set up the value tables and the random starting policies (we need 2 for double Q-learning)
    value_table_a = {}
    value_table_b = {}
    count_table = {}  # To keep track of how many times we've visited a state-action pair
    policy = {}
    for s in states:
        for a in range(num_actions):  # One entry per action (FrozenLake has 4), not per grid row
            value_table_a[(s, a)] = 1  # Optimistic initialisation
            value_table_b[(s, a)] = 1  # Optimistic initialisation
            count_table[(s, a)] = 1
        policy[s] = np.random.choice(num_actions)

    # Training loop
    episode_count = 0
    while episode_count < num_episodes:
        if episode_count % 1000 == 0:
            print(f"Starting episode: {episode_count}")
        if (episode_count - 1) % 10000 == 0 and episode_count > 1000:
            print(f"Rendering policy at episode {episode_count}.")
            policy_test(world, policy)
        # Get the starting state and decay the learning rate for this episode
        current_state = env.reset()[0]
        learning_rate -= lr_step  # Necessary to reduce learning rate over time
        step = 1
        while True:
            # Get UCB action
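            # (Assumption: this learning_common helper picks the action that maximises the
            # combined Q estimate plus a UCB exploration bonus built from count_table, the
            # current step and the exploration constant 2; its definition is not shown here.)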
            action = get_best_ucb_action(value_table_a, current_state, count_table, step, 2, value_table_b)
            # Update the count table
            count_table[(current_state, action)] += 1
            new_state, reward, terminated, truncated, _ = env.step(action)
            done = 1 if terminated or truncated else 0
            # Update the value tables with 50% probability
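            # Double Q-learning update: the table being updated chooses the greedy next
            # action, while the other table supplies its value estimate; decoupling
            # selection from evaluation reduces the maximisation bias of plain Q-learning.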
            if np.random.rand() < 0.5:
                value_table_a[(current_state, action)] = (
                    value_table_a[(current_state, action)] + learning_rate * (
                        reward + (1 - done) * gamma * value_table_b[(new_state, get_max_value_action(value_table_a, new_state))] -
                        value_table_a[(current_state, action)]
                    ))
            else:
                value_table_b[(current_state, action)] = (
                    value_table_b[(current_state, action)] + learning_rate * (
                        reward + (1 - done) * gamma * value_table_a[(new_state, get_max_value_action(value_table_b, new_state))] -
                        value_table_b[(current_state, action)]
                    ))
            if done:
                break
            current_state = new_state
            step += 1
        episode_count += 1

    # Update the policy for use later
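    # (Assumption: passing both tables lets get_max_value_action pick the greedy action
    # from their combined estimates; the helper is defined in learning_common.)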
    for s in states:
        best_action = get_max_value_action(value_table_a, s, value_table_b)
        policy[s] = best_action
    return policy


def policy_test(world, policy):
    """Render one episode on the given world, following the learnt policy"""
    test_env = gym.make('FrozenLake-v1', desc=world, render_mode="human", is_slippery=True)
    current_state = test_env.reset()[0]
    while True:
        new_state, reward, end, trunc, _ = test_env.step(policy[current_state])
        current_state = new_state
        test_env.render()
        if end or trunc:
            break
    test_env.close()


def main(grid_size: int = 8):
    """Set up the world and try to learn"""
    world = generate_random_map(size=grid_size)
    print("Let's learn this new ice world!")
    print_world(world)
    env = gym.make('FrozenLake-v1', desc=world, is_slippery=True)
    the_policy = q_learning(env, grid_size, world)
    print("The world we learnt on is:")
    print_world(world)
    print("\nThe new policy is:")
    print_policy(the_policy, grid_size)
    env.close()
    # Let's test the new policy
    policy_test(world, the_policy)


if __name__ == "__main__":
    main(8)