# bipedal_walker.py
import os
import sys
import types
from copy import deepcopy
import gymnasium as gym
import numpy as np
from torch import save, load, float32, tensor
import matplotlib.pyplot as plt
from pathlib import Path
# Add project root to the python path
sys.path.append(os.path.dirname(__file__))
from controller.fc import FC
from controller.rbfn_fc import RBFN_FC
from controller.cpg_rbfn import CPG_RBFN
from controller.cpg_fc import CPG_FC
from utils.individual import Individual
from evolutionary.functions import mutate, norm_fitness_of_generation, roulette_wheel_selection, select_solutions_from_gen, resetFitness
#Get current directory
CWD = Path.cwd()
#Gym environment
ENV_TYPE = "HalfCheetah-v4"
ENV = gym.make(ENV_TYPE)
#MODEL TYPE
models = types.SimpleNamespace()
models.CPG_RBFN_MODEL = "CPG-RBFN"
models.CPG_FC_MODEL = "CPG-FC"
models.FC_MODEL = 'FC'
models.RBFN_FC_MODEL = 'RBFN-FC'
MODEL_TYPE = models.CPG_RBFN_MODEL
#MODEL_TYPE = models.CPG_FC_MODEL
#MODEL_TYPE = models.FC_MODEL
#MODEL_TYPE = models.RBFN_FC_MODEL
#Folder to save models
MODELS_PATH = f"{CWD}/models/{MODEL_TYPE}"
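#Ensure the models folder exists before any checkpoints are saved below
#(a small safeguard sketch; assumes it is acceptable to create the folder if missing)
os.makedirs(MODELS_PATH, exist_ok=True)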
#CPG-RBFN Parameters
RBFN_UNITS = 10
#FC Network
FC_INPUT_UNITS = ENV.observation_space.shape[0]
FC_HID1_UNITS = 30
FC_HID2_UNITS = 30
OUTPUT_UNITS = ENV.action_space.shape[0]
### NEUROEVOLUTION PARAMS ###
REWARDS_GOAL = 1000
GENERATIONS = 1000
GEN_SIZE = 10
ELITE_SIZE = GEN_SIZE
##########-- NEUROEVOLUTION --##########
#Run individuals in generation through environment
def run_gen(generation, rewards_goal):
    #Take each individual and make it play the game
    for individual in generation:
        #Reset the environment, get initial state
        state, _ = ENV.reset()
        #Step for at most rewards_goal steps, the reward target set for the individual
        for _ in range(rewards_goal):
            #Choose action
            action = None
            match(MODEL_TYPE):
                case models.CPG_RBFN_MODEL | models.CPG_FC_MODEL:
                    action = individual.choose_action()
                case models.FC_MODEL | models.RBFN_FC_MODEL:
                    x = np.array(state, dtype=np.float32)
                    x = tensor(x, dtype=float32)
                    action = individual.choose_action(x)
            state, reward, terminated, truncated, _ = ENV.step(action)
            individual.fitness += reward
            if terminated or truncated:
                break
        #Reset the CPG before the next individual runs
        if MODEL_TYPE == models.CPG_RBFN_MODEL or MODEL_TYPE == models.CPG_FC_MODEL:
            individual.model.cpg.reset()
#Train through neuroevolution
def neuro_evolution(gen_size: int, generations: int, rewards_goal: int, elite_size: int, elite: list[Individual] = []):
    best_per_gen = []
    best_indv = None
    #Initialize the first generation
    generation = []
    try:
        #Add elite individuals carried over from a previous run, if any
        for i in range(len(elite)):
            generation.append(elite[i])
        #Fill the rest of the generation with fresh individuals
        for _ in range(gen_size - len(elite)):
            model = None
            match(MODEL_TYPE):
                case models.CPG_RBFN_MODEL:
                    model = CPG_RBFN(RBFN_UNITS, OUTPUT_UNITS)
                case models.CPG_FC_MODEL:
                    model = CPG_FC(FC_HID1_UNITS, FC_HID2_UNITS, OUTPUT_UNITS)
                case models.FC_MODEL:
                    model = FC(FC_INPUT_UNITS, FC_HID1_UNITS, FC_HID2_UNITS, OUTPUT_UNITS)
                case models.RBFN_FC_MODEL:
                    model = RBFN_FC(FC_INPUT_UNITS, RBFN_UNITS, OUTPUT_UNITS)
            new_individual = Individual(model)
            generation.append(new_individual)
        #Iterate over generations
        for gen_count in range(generations):
            #Run each individual through the sim
            run_gen(generation, rewards_goal)
            #Get the normalized fitness of the current generation
            fitness_of_generation = norm_fitness_of_generation(generation)
            #Breed gen_size children
            children = []
            for _ in range(0, gen_size):
                #Select a parent for breeding through roulette wheel selection
                parent = generation[roulette_wheel_selection(fitness_of_generation)]
                #Mutation: perturb a fraction of the parent's parameters
                mutate_percent = 0.2
                mutations = int(parent.model.dim * mutate_percent)
                model = None
                match(MODEL_TYPE):
                    case models.CPG_RBFN_MODEL:
                        model = CPG_RBFN(RBFN_UNITS, OUTPUT_UNITS)
                    case models.CPG_FC_MODEL:
                        model = CPG_FC(FC_HID1_UNITS, FC_HID2_UNITS, OUTPUT_UNITS)
                    case models.FC_MODEL:
                        model = FC(FC_INPUT_UNITS, FC_HID1_UNITS, FC_HID2_UNITS, OUTPUT_UNITS)
                    case models.RBFN_FC_MODEL:
                        model = RBFN_FC(FC_INPUT_UNITS, RBFN_UNITS, OUTPUT_UNITS)
                child = Individual(model)
                child.model.set_params(mutate(parent.model.get_params(), mutations))
                children.append(child)
            #Run each child through the sim
            run_gen(children, rewards_goal)
            #Add the bred children to the current generation
            generation.extend(children)
            #Select the best solutions, keeping the population at gen_size
            generation = select_solutions_from_gen(generation, gen_size)
            #Print results
            print(f'Generation: {gen_count} Best Fitness: {generation[0].fitness}')
            best_per_gen.append(generation[0].fitness)
            best_indv = generation[0]
            elite = generation[0:elite_size]
            #Reset the generation's fitness before the next iteration
            resetFitness(generation)
            # if MODEL_TYPE == models.CPG_RBFN_MODEL or MODEL_TYPE == models.CPG_FC_MODEL:
            #     for i in generation:
            #         i.model.cpg.reset()
    except KeyboardInterrupt:
        #Save the current elite before exiting early
        for i in range(len(elite)):
            save(elite[i].model.state_dict(), f"{MODELS_PATH}/model{i}.pt")
        print("Saved Models")
        print(best_per_gen)
        sys.exit()
    ENV.close()
    return best_indv, elite, best_per_gen
#Run the environment with a learned model
def test_algorithm(best_nn: Individual, episodes: int = 1000):
    #Set up the test environment with rendering
    test_env = gym.make(ENV_TYPE, render_mode="human")
    #Reset the environment, get initial state
    state, _ = test_env.reset()
    total_rewards = 0
    for _ in range(episodes):
        #Choose action
        action = None
        match(MODEL_TYPE):
            case models.CPG_RBFN_MODEL | models.CPG_FC_MODEL:
                action = best_nn.choose_action()
            case models.FC_MODEL | models.RBFN_FC_MODEL:
                x = np.array(state, dtype=np.float32)
                x = tensor(x, dtype=float32)
                action = best_nn.choose_action(x)
        state, reward, terminated, _, _ = test_env.step(action)
        total_rewards += reward
        print(f"Rewards: {total_rewards}")
        if terminated:
            break
        test_env.render()
    test_env.close()
#Build a template model for the selected architecture (used by the run modes below)
model = None
match(MODEL_TYPE):
    case models.CPG_RBFN_MODEL:
        model = CPG_RBFN(RBFN_UNITS, OUTPUT_UNITS)
    case models.CPG_FC_MODEL:
        model = CPG_FC(FC_HID1_UNITS, FC_HID2_UNITS, OUTPUT_UNITS)
    case models.FC_MODEL:
        model = FC(FC_INPUT_UNITS, FC_HID1_UNITS, FC_HID2_UNITS, OUTPUT_UNITS)
    case models.RBFN_FC_MODEL:
        model = RBFN_FC(FC_INPUT_UNITS, RBFN_UNITS, OUTPUT_UNITS)
### CONTINUE NEUROEVOLUTION RUN ###
elite = []
for i in range(ELITE_SIZE):
    #Give each elite member its own copy of the model; otherwise loading the
    #next checkpoint would overwrite the weights of the previous individuals
    elite_model = deepcopy(model)
    elite_model.load_state_dict(load(f"{MODELS_PATH}/model{i}.pt"))
    best_indv = Individual(elite_model)
    best_indv.model = elite_model
    elite.append(best_indv)
best_indv, new_elite, best_per_gen = neuro_evolution(gen_size=GEN_SIZE, generations=GENERATIONS, rewards_goal=REWARDS_GOAL, elite_size=ELITE_SIZE, elite=elite)
#Save the evolved elite back to disk
for i in range(len(new_elite)):
    save(new_elite[i].model.state_dict(), f"{MODELS_PATH}/model{i}.pt")
## LOAD BEST SAVED MODEL ###
# model.load_state_dict(load(f"{MODELS_PATH}/model0.pt"))
# print(model.state_dict())
# best_indv = Individual(model)
# best_indv.model = model
# test_algorithm(best_nn=best_indv)
### FIRST NEUROEVOLUTION RUN ###
# best_indv, elite, best_per_gen = neuro_evolution(gen_size=GEN_SIZE, generations=GENERATIONS, rewards_goal=REWARDS_GOAL, elite_size=ELITE_SIZE)
# for i in range(len(elite)):
# save(elite[i].model.state_dict(), f"{MODELS_PATH}/model{i}.pt")
# print(best_per_gen)
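### PLOT LEARNING CURVE (optional) ###
#A minimal matplotlib sketch of the training curve; it assumes best_per_gen
#holds one best-fitness value per generation from whichever run above was used.
# plt.plot(best_per_gen)
# plt.xlabel("Generation")
# plt.ylabel("Best fitness")
# plt.title(f"{MODEL_TYPE} on {ENV_TYPE}")
# plt.show()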