dqn_agent.py (forked from nuno-faria/tetris-ai)

from typing import ValuesView, List, Optional
from keras import Model
from keras.models import Sequential
from keras.layers import Dense
from collections import deque
import numpy as np
import random

# Deep Q Learning Agent + Maximin
#
# This version provides only one value per input, which indicates
# the expected score for that state. This is because the algorithm
# will try to find the best final state among the combinations of
# possible states, in contrast to the traditional approach of
# finding the best action for a particular state.

# noinspection PyMethodMayBeStatic
class DQNAgent:
    """Deep Q Learning Agent + Maximin

    Args:
        state_size (int): Size of the input domain
        mem_size (int): Size of the replay buffer
        discount (float): How important future rewards are compared to immediate ones, in [0, 1]
        epsilon (float): Initial exploration rate (probability of returning a random value)
        epsilon_min (float): Value of epsilon at which the agent stops decreasing it
        epsilon_stop_episode (int): Episode at which the agent stops decreasing the exploration variable
        n_neurons (list(int)): Number of neurons in each inner layer
        activations (list): Activations used in each inner layer, as well as in the output layer
        loss (obj): Loss function
        optimizer (obj): Optimizer used
        replay_start_size (int): Minimum number of stored memories needed before training starts
    """

    def __init__(self, state_size, mem_size=10000, discount=0.95,
                 epsilon=1, epsilon_min=0, epsilon_stop_episode=500,
                 n_neurons=(32, 32), activations=('relu', 'relu', 'linear'),
                 loss='mse', optimizer='adam', replay_start_size=None):
        # one activation per hidden layer, plus one for the output layer
        assert len(activations) == len(n_neurons) + 1

        self.state_size = state_size
        self.memory = deque(maxlen=mem_size)
        self.discount = discount
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = (self.epsilon - self.epsilon_min) / epsilon_stop_episode
        self.n_neurons = n_neurons
        self.activations = activations
        self.loss = loss
        self.optimizer = optimizer
        if not replay_start_size:
            replay_start_size = mem_size // 2
        self.replay_start_size = replay_start_size
        self.model = self._build_model()

    def _build_model(self) -> Model:
        """Builds a Keras deep neural network model"""
        model = Sequential()
        model.add(Dense(self.n_neurons[0], input_dim=self.state_size, activation=self.activations[0]))

        for i in range(1, len(self.n_neurons)):
            model.add(Dense(self.n_neurons[i], activation=self.activations[i]))

        # single output neuron: the predicted score of the given state
        model.add(Dense(1, activation=self.activations[-1]))

        model.compile(loss=self.loss, optimizer=self.optimizer)

        return model
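
    # With the constructor defaults (n_neurons=(32, 32), activations=('relu', 'relu', 'linear')),
    # the resulting stack is Dense(32, relu) -> Dense(32, relu) -> Dense(1, linear):
    # two hidden layers feeding one linear output that scores the input state.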

    def add_to_memory(self, current_state, next_state, reward, done):
        """Adds a play to the replay memory buffer"""
        self.memory.append((current_state, next_state, reward, done))

    def random_value(self):
        """Returns a random score, used during exploration"""
        return random.random()

    def predict_value(self, state: np.ndarray) -> float:
        """Predicts the score for a certain state"""
        # predict returns a batch of predictions; [0] selects the one for the single input row
        return self.model.predict(state)[0]

    def act(self, state):
        """Returns the expected score of a certain state"""
        state = np.reshape(state, [1, self.state_size])
        if random.random() <= self.epsilon:
            return self.random_value()
        else:
            return self.predict_value(state)

    def best_state(self, states: ValuesView[List[int]]) -> List[int]:
        """Returns the best state for a given collection of states"""
        if random.random() <= self.epsilon:
            return random.choice(list(states))
        else:
            max_value: Optional[float] = None
            best_state: Optional[List[int]] = None

            for state in states:
                # ask the neural network for the value of this state
                value = self.predict_value(np.reshape(state, [1, self.state_size]))
                # `is None` check: a plain truthiness test would wrongly overwrite
                # the maximum whenever the best value so far happens to be 0
                if max_value is None or value > max_value:
                    max_value = value
                    best_state = state

            return best_state

    def train(self, batch_size=32, epochs=3):
        """Trains the agent"""
        n = len(self.memory)

        if n >= self.replay_start_size and n >= batch_size:
            batch = random.sample(self.memory, batch_size)

            # Get the expected score for the next states, in batch (better performance)
            next_states = np.array([x[1] for x in batch])
            next_qs = [x[0] for x in self.model.predict(next_states)]

            x = []
            y = []

            # Build xy structure to fit the model in batch (better performance)
            for i, (state, _, reward, done) in enumerate(batch):
                if not done:
                    # Partial Q formula: reward plus the discounted value of the next state
                    new_q = reward + self.discount * next_qs[i]
                else:
                    new_q = reward

                x.append(state)
                y.append(new_q)

            # Fit the model to the given values
            self.model.fit(np.array(x), np.array(y), batch_size=batch_size, epochs=epochs, verbose=0)

            # Update the exploration variable
            if self.epsilon > self.epsilon_min:
                self.epsilon -= self.epsilon_decay
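

# ---------------------------------------------------------------------------
# Illustrative usage sketch: how this agent is typically driven in the maximin
# setup described at the top of this file, where the environment enumerates all
# reachable next states and the agent scores them and picks the best one.
# `DummyEnv` is a toy stand-in invented for this sketch; the real project pairs
# the agent with a Tetris environment instead.
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    class DummyEnv:
        """Toy stand-in environment: 4-feature states, random rewards, fixed-length episodes."""

        def __init__(self, state_size=4, episode_length=20):
            self.state_size = state_size
            self.episode_length = episode_length
            self.steps = 0

        def reset(self):
            self.steps = 0
            return [0] * self.state_size

        def get_next_states(self):
            # map each of 5 candidate actions to the state it would lead to
            return {a: list(np.random.randint(0, 10, self.state_size)) for a in range(5)}

        def play(self, action):
            self.steps += 1
            return random.random(), self.steps >= self.episode_length

    env = DummyEnv()
    agent = DQNAgent(state_size=env.state_size, mem_size=1000,
                     replay_start_size=100, epsilon_stop_episode=50)

    for episode in range(100):
        current_state = env.reset()
        done = False

        while not done:
            # score every reachable next state and pick the best one (maximin style)
            next_states = env.get_next_states()            # action -> resulting state
            best = agent.best_state(next_states.values())
            action = next(a for a, s in next_states.items() if s == best)

            reward, done = env.play(action)
            agent.add_to_memory(current_state, best, reward, done)
            current_state = best

        # learn from a random batch of stored transitions after each episode
        agent.train(batch_size=32, epochs=1)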