q.py
import numpy as np
from policy import EpsilonGreedyPolicy, GreedyPolicy
class Q(EpsilonGreedyPolicy):
    """Tabular Q-learning agent built on an epsilon-greedy policy."""

    def __init__(self, n_states, n_actions, epsilon, gamma, alpha):
        """
        Parameters
        ----------
        n_states : int
            Number of discrete states.
        n_actions : int
            Number of discrete actions.
        epsilon : float
            Exploration rate for the epsilon-greedy policy.
        gamma : float
            Discount factor.
        alpha : float
            Learning rate.
        """
        # The EpsilonGreedyPolicy base class is assumed to set up the Q
        # table plus the choose_action/update_Q_val helpers used below.
        super().__init__(n_states, n_actions, epsilon)
        self._gamma = gamma
        self._alpha = alpha
    def update_Q(self, s, a, r, s_next):
        """One-step Q-learning update for the transition (s, a, r, s_next)."""
        Q = self.Q
        # Q-learning is off-policy: bootstrap from the greedy action in
        # s_next rather than from the epsilon-greedy action actually taken.
        best_a = np.argmax(Q[s_next])
        td_target = r + self._gamma * Q[s_next, best_a]
        td_error = td_target - Q[s, a]
        Q_sa_new = Q[s, a] + self._alpha * td_error
        self.update_Q_val(s, a, Q_sa_new)
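
# The update above is one-step Q-learning:
#   Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s_next, a') - Q(s, a))
# As a rough sketch (hypothetical values, assuming the base policy starts the
# Q table at zeros), a single update on a 2-state, 2-action problem:
#
#   agent = Q(n_states=2, n_actions=2, epsilon=0.1, gamma=0.9, alpha=0.5)
#   agent.update_Q(s=0, a=1, r=1.0, s_next=1)
#   # td_target = 1.0 + 0.9 * 0.0 = 1.0, so Q[0, 1] moves from 0.0 to 0.5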

def run_trials(env, n_states, n_actions, epsilon, gamma, alpha,
               n_trials, n_episodes, max_iter, hyperparams=None):
    """Run tabular Q-learning for n_trials trials of n_episodes episodes.

    Assumes a tabular environment exposing reset(), observe(),
    perform_action(action) -> (new_state, reward) and is_terminal(state),
    the interface used in the loop below; a gym env would need a thin
    adapter around it.
    """
    global_iter_cap = n_episodes * max_iter
    reward_per_episode = np.zeros((n_trials, n_episodes))
    reward_per_step = np.zeros((n_trials, global_iter_cap))
    # An episode is cut off once it runs past max_iter steps.
    trial_lengths = []

    for trial_idx in range(n_trials):
        # Fresh agent, and therefore a fresh Q table, for every trial.
        agent = Q(n_states, n_actions, epsilon, gamma, alpha)
        # Auxiliary model tables kept from the original sketch (not used by
        # the Q-learning update itself).
        transition_count_table = np.zeros((n_states, n_states))
        reward_value_table = np.zeros(n_states)
        global_iter_idx = 0

        for episode_idx in range(n_episodes):
            local_iter_idx = 0
            # Start the env.
            env.reset()
            state = env.observe()
            action = agent.choose_action(state)
            episode_reward_list = []

            # Loop until the episode is done or the step limit is reached.
            while local_iter_idx < max_iter:
                new_state, reward = env.perform_action(action)
                new_action = agent.choose_action(new_state)

                # Optional learning-rate schedule, e.g. a callable stored
                # under 'alpha' in hyperparams.
                if hyperparams is not None and 'alpha' in hyperparams:
                    agent._alpha = hyperparams['alpha'](
                        hyperparams, np.cbrt(global_iter_idx + 1))
                agent.update_Q(state, action, reward, new_state)

                # Store the data.
                episode_reward_list.append(reward)
                if global_iter_idx < global_iter_cap:
                    reward_per_step[trial_idx, global_iter_idx] = reward

                # Stop if at the goal, else update for the next iteration.
                if env.is_terminal(state):
                    break
                state = new_state
                action = new_action
                local_iter_idx += 1
                global_iter_idx += 1

            # Store the rewards for this episode.
            reward_per_episode[trial_idx, episode_idx] = np.sum(episode_reward_list)

        trial_lengths.append(global_iter_idx)
        reward_per_step[trial_idx, :] = np.cumsum(reward_per_step[trial_idx, :])

    # Slice off to the shortest trial for consistent visualization.
    reward_per_step = reward_per_step[:, :np.min(trial_lengths)]
    return agent.Q, reward_per_step, reward_per_episode


if __name__ == "__main__":
    # Example of using Q-learning. The loop above assumes a discrete,
    # tabular environment; gym's MountainCarContinuous-v0 has continuous
    # states and actions, so it would need to be discretized and wrapped
    # into the observe/perform_action/is_terminal interface before use.
    import gym

    env = gym.envs.make("MountainCarContinuous-v0")

    n_states = 100
    n_actions = 100
    n_trials = 1
    n_episodes = 100
    max_iter = 1000
    epsilon = 0.1  # exploration rate (placeholder value; not set in the original sketch)
    gamma = 0.80
    alpha = 0.01

    Q_table, reward_per_step, reward_per_episode = run_trials(
        env, n_states, n_actions, epsilon, gamma, alpha,
        n_trials, n_episodes, max_iter)
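
# A quick, optional way to inspect the learning curves (assumes matplotlib
# is available; shown here only as an illustration):
#
#   import matplotlib.pyplot as plt
#   plt.plot(reward_per_episode.mean(axis=0))
#   plt.xlabel("episode")
#   plt.ylabel("total reward per episode")
#   plt.show()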