-
Notifications
You must be signed in to change notification settings - Fork 37
/
tiny_gp_plus.py
237 lines (210 loc) · 9.54 KB
/
tiny_gp_plus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# tiny genetic programming plus, by © moshe sipper, www.moshesipper.com
# graphic output, dynamic progress display, bloat-control option
# need to install https://pypi.org/project/graphviz/
from random import random, randint, seed
from statistics import mean
from copy import deepcopy
import matplotlib.pyplot as plt
from IPython.display import Image, display
from graphviz import Digraph, Source
POP_SIZE = 60 # population size
MIN_DEPTH = 2 # minimal initial random tree depth
MAX_DEPTH = 5 # maximal initial random tree depth
GENERATIONS = 250 # maximal number of generations to run evolution
TOURNAMENT_SIZE = 5 # size of tournament for tournament selection
XO_RATE = 0.8 # crossover rate
PROB_MUTATION = 0.2 # per-node mutation probability
BLOAT_CONTROL = False # True adds bloat control to fitness function
def add(x, y): return x + y
def sub(x, y): return x - y
def mul(x, y): return x * y
FUNCTIONS = [add, sub, mul]
TERMINALS = ['x', -2, -1, 0, 1, 2]
def target_func(x): # evolution's target
return x*x*x*x + x*x*x + x*x + x + 1
def generate_dataset(): # generate 101 data points from target_func
dataset = []
for x in range(-100,101,2):
x /= 100
dataset.append([x, target_func(x)])
return dataset
class GPTree:
def __init__(self, data = None, left = None, right = None):
self.data = data
self.left = left
self.right = right
def node_label(self): # return string label
if (self.data in FUNCTIONS):
return self.data.__name__
else:
return str(self.data)
def draw(self, dot, count): # dot & count are lists in order to pass "by reference"
node_name = str(count[0])
dot[0].node(node_name, self.node_label())
if self.left:
count[0] += 1
dot[0].edge(node_name, str(count[0]))
self.left.draw(dot, count)
if self.right:
count[0] += 1
dot[0].edge(node_name, str(count[0]))
self.right.draw(dot, count)
def draw_tree(self, fname, footer):
dot = [Digraph()]
dot[0].attr(kw='graph', label = footer)
count = [0]
self.draw(dot, count)
Source(dot[0], filename = fname + ".gv", format="png").render()
display(Image(filename = fname + ".gv.png"))
def compute_tree(self, x):
if (self.data in FUNCTIONS):
return self.data(self.left.compute_tree(x), self.right.compute_tree(x))
elif self.data == 'x': return x
else: return self.data
def random_tree(self, grow, max_depth, depth = 0): # create random tree using either grow or full method
if depth < MIN_DEPTH or (depth < max_depth and not grow):
self.data = FUNCTIONS[randint(0, len(FUNCTIONS)-1)]
elif depth >= max_depth:
self.data = TERMINALS[randint(0, len(TERMINALS)-1)]
else: # intermediate depth, grow
if random () > 0.5:
self.data = TERMINALS[randint(0, len(TERMINALS)-1)]
else:
self.data = FUNCTIONS[randint(0, len(FUNCTIONS)-1)]
if self.data in FUNCTIONS:
self.left = GPTree()
self.left.random_tree(grow, max_depth, depth = depth + 1)
self.right = GPTree()
self.right.random_tree(grow, max_depth, depth = depth + 1)
def mutation(self):
if random() < PROB_MUTATION: # mutate at this node
self.random_tree(grow = True, max_depth = 2)
elif self.left: self.left.mutation()
elif self.right: self.right.mutation()
# def depth(self):
# if self.data in TERMINALS: return 0
# l = self.left.depth() if self.left else 0
# r = self.right.depth() if self.right else 0
# return 1 + max(l, r)
def size(self): # tree size in nodes
if self.data in TERMINALS: return 1
l = self.left.size() if self.left else 0
r = self.right.size() if self.right else 0
return 1 + l + r
def build_subtree(self): # count is list in order to pass "by reference"
t = GPTree()
t.data = self.data
if self.left: t.left = self.left.build_subtree()
if self.right: t.right = self.right.build_subtree()
return t
def scan_tree(self, count, second): # note: count is list, so it's passed "by reference"
count[0] -= 1
if count[0] <= 1:
if not second: # return subtree rooted here
return self.build_subtree()
else: # glue subtree here
self.data = second.data
self.left = second.left
self.right = second.right
else:
ret = None
if self.left and count[0] > 1: ret = self.left.scan_tree(count, second)
if self.right and count[0] > 1: ret = self.right.scan_tree(count, second)
return ret
def crossover(self, other): # xo 2 trees at random nodes
if random() < XO_RATE:
second = other.scan_tree([randint(1, other.size())], None) # 2nd random subtree
self.scan_tree([randint(1, self.size())], second) # 2nd subtree "glued" inside 1st tree
# end class GPTree
def init_population(): # ramped half-and-half
pop = []
for md in range(3, MAX_DEPTH + 1):
for i in range(int(POP_SIZE/6)):
t = GPTree()
t.random_tree(grow = True, max_depth = md) # grow
pop.append(t)
for i in range(int(POP_SIZE/6)):
t = GPTree()
t.random_tree(grow = False, max_depth = md) # full
pop.append(t)
return pop
def error(individual, dataset):
return mean([abs(individual.compute_tree(ds[0]) - ds[1]) for ds in dataset])
def fitness(individual, dataset):
if BLOAT_CONTROL:
return 1 / (1 + error(individual, dataset) + 0.01*individual.size())
else:
return 1 / (1 + error(individual, dataset))
def selection(population, fitnesses): # select one individual using tournament selection
tournament = [randint(0, len(population)-1) for i in range(TOURNAMENT_SIZE)] # select tournament contenders
tournament_fitnesses = [fitnesses[tournament[i]] for i in range(TOURNAMENT_SIZE)]
return deepcopy(population[tournament[tournament_fitnesses.index(max(tournament_fitnesses))]])
def prepare_plots():
fig, axarr = plt.subplots(2, sharex=True)
fig.canvas.set_window_title('EVOLUTIONARY PROGRESS')
fig.subplots_adjust(hspace = 0.5)
axarr[0].set_title('error', fontsize=14)
axarr[1].set_title('mean size', fontsize=14)
plt.xlabel('generation', fontsize=18)
plt.ion() # interactive mode for plot
axarr[0].set_xlim(0, GENERATIONS)
axarr[0].set_ylim(0, 1) # fitness range
xdata = []
ydata = [ [], [] ]
line = [None, None]
line[0], = axarr[0].plot(xdata, ydata[0], 'b-') # 'b-' = blue line
line[1], = axarr[1].plot(xdata, ydata[1], 'r-') # 'r-' = red line
return axarr, line, xdata, ydata
def plot(axarr, line, xdata, ydata, gen, pop, errors, max_mean_size):
xdata.append(gen)
ydata[0].append(min(errors))
line[0].set_xdata(xdata)
line[0].set_ydata(ydata[0])
sizes = [ind.size() for ind in pop]
if mean(sizes) > max_mean_size[0]:
max_mean_size[0] = mean(sizes)
axarr[1].set_ylim(0, max_mean_size[0])
ydata[1].append(mean(sizes))
line[1].set_xdata(xdata)
line[1].set_ydata(ydata[1])
plt.draw()
plt.pause(0.01)
def main():
# init stuff
seed() # init internal state of random number generator
dataset = generate_dataset()
population= init_population()
best_of_run = None
best_of_run_error = 1e20
best_of_run_gen = 0
fitnesses = [fitness(ind, dataset) for ind in population]
max_mean_size = [0] # track maximal mean size for plotting
axarr, line, xdata, ydata = prepare_plots()
# go evolution!
for gen in range(GENERATIONS):
nextgen_population=[]
for i in range(POP_SIZE):
parent1 = selection(population, fitnesses)
parent2 = selection(population, fitnesses)
parent1.crossover(parent2)
parent1.mutation()
nextgen_population.append(parent1)
population=nextgen_population
fitnesses = [fitness(ind, dataset) for ind in population]
errors = [error(ind, dataset) for ind in population]
if min(errors) < best_of_run_error:
best_of_run_error = min(errors)
best_of_run_gen = gen
best_of_run = deepcopy(population[errors.index(min(errors))])
print("________________________")
best_of_run.draw_tree("best_of_run",\
"gen: " + str(gen) + ", error: " + str(round(best_of_run_error,3)))
plot(axarr, line, xdata, ydata, gen, population, errors, max_mean_size)
if best_of_run_error <= 1e-5: break
endrun = "_________________________________________________\nEND OF RUN (bloat control was "
endrun += "ON)" if BLOAT_CONTROL else "OFF)"
print(endrun)
s = "\n\nbest_of_run attained at gen " + str(best_of_run_gen) + " and has error=" + str(round(best_of_run_error,3))
best_of_run.draw_tree("best_of_run",s)
if __name__== "__main__":
main()