-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsearch.py
516 lines (416 loc) · 17.9 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
"""
2021 Generic search module for Python 3.5+
This search module is based on the AIMA book.
http://aima.cs.berkeley.edu/
Search (Chapters 3-4)
The way to use this code is to subclass the class 'Problem' to create
your own class of problems, then create problem instances and solve them with
calls to the various search functions.
Last modified 2020-02-3 by f.maire@qut.edu.au
- revised function headers
- simplified some implementation for the sake of clarity
Abstract Base Classes for Containers
https://docs.python.org/3/library/collections.abc.html
It is recommended to use memoization to cache the heuristic values of states
The relavatn decorator is
@functools.lru_cache
See details at
https://docs.python.org/3.7/library/functools.html
Last changed on 20/03/2021 by f.maire@qut.edu.au
Extended PriorityQueue.append() to accept f_value
"""
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# UTILS
import sys
# check the Python version is at least 3.5
assert sys.version_info >= (3, 5)
import itertools
import heapq
import collections # for dequeue
import functools
def memoize(fn, slot=None, maxsize=128):
"""
Memoize fn: make it remember the computed value for any argument list.
If slot is specified, store result in that slot of first argument.
If slot is false, use lru_cache for caching the values.
"""
if slot:
def memoized_fn(obj, *args):
if hasattr(obj, slot):
return getattr(obj, slot)
else:
val = fn(obj, *args)
setattr(obj, slot, val)
return val
else:
@functools.lru_cache(maxsize=maxsize)
def memoized_fn(*args):
return fn(*args)
return memoized_fn
#______________________________________________________________________________
# Queues: Stack, FIFOQueue, PriorityQueue
# Stack and FIFOQueue are implemented as list and collection.deque
# PriorityQueue is implemented here
class Queue:
"""
Queue is an abstract class/interface. There are three types:
LIFOQueue(): A Last In First Out Queue.
FIFOQueue(): A First In First Out Queue.
PriorityQueue(order, f): Queue in sorted order (min-first).
Each type of queue supports the following methods and functions:
q.append(item) -- add an item to the queue
q.extend(items) -- equivalent to: for item in items: q.append(item)
q.pop() -- return the top item from the queue
len(q) -- number of items in q (also q.__len__())
item in q -- does q contain item?
"""
def __init__(self):
raise NotImplementedError
def extend(self, items):
for item in items: self.append(item)
def LIFOQueue():
"""
Return an empty list, suitable as a Last-In-First-Out Queue.
Last-In-First-Out Queues are also called stacks
"""
return []
class FIFOQueue(collections.deque):
"""
A First-In-First-Out Queue.
"""
def __init__(self):
collections.deque.__init__(self)
def pop(self):
return self.popleft()
# ______________________________________________________________________________
# Queues: Stack, FIFOQueue, PriorityQueue
# Stack and FIFOQueue are implemented as list and collection.deque
# PriorityQueue is implemented here
class PriorityQueue:
"""A Queue in which the minimum (or maximum) element (as determined by f and
order) is returned first.
If order is 'min', the item with minimum f(x) is
returned first; if order is 'max', then it is the item with maximum f(x).
Also supports dict-like lookup."""
def __init__(self, order='min', f=lambda x: x):
self.heap = []
if order == 'min':
self.f = f
elif order == 'max': # now item with max f(x)
self.f = lambda x: -f(x) # will be popped first
else:
raise ValueError("Order must be either 'min' or 'max'.")
self.order = order
def append(self, item, f_value=None):
"""Insert item at its correct position."""
if f_value is None:
f_value = self.f(item)
# take already into account whether min or max PQ with self.f
else:
# A value for the item has been provided
if self.order=='max':
f_value = -f_value
# if the self.order is 'min' we can keep the f_value as provided
heapq.heappush(self.heap, (f_value, item))
def extend(self, items):
"""Insert each item in items at its correct position."""
for item in items:
self.append(item)
def pop(self):
"""Pop and return the item (with min or max f(x) value)
depending on the order."""
if self.heap:
return heapq.heappop(self.heap)[1]
else:
raise Exception('Trying to pop from empty PriorityQueue.')
def __len__(self):
"""Return current capacity of PriorityQueue."""
return len(self.heap)
def __contains__(self, key):
"""Return True if the key is in PriorityQueue."""
return any([item == key for _, item in self.heap])
def __getitem__(self, key):
"""Returns the first value associated with key in PriorityQueue.
Raises KeyError if key is not present."""
for value, item in self.heap:
if item == key:
return value
raise KeyError(str(key) + " is not in the priority queue")
def __delitem__(self, key):
"""Delete the first occurrence of key."""
try:
del self.heap[[item == key for _, item in self.heap].index(True)]
except ValueError:
raise KeyError(str(key) + " is not in the priority queue")
heapq.heapify(self.heap)
#______________________________________________________________________________
class Problem(object):
"""The abstract class for a formal problem. You should subclass
this and implement the methods actions and result, and possibly
__init__, goal_test, and path_cost. Then you will create instances
of your Problem subclass and solve them with the various search functions."""
def __init__(self, initial, goal=None):
"""The constructor specifies the initial state, and possibly a goal
state, if there is a unique goal. Your subclass's constructor can add
other arguments."""
self.initial = initial
self.goal = goal
def actions(self, state):
"""Return the actions that can be executed in the given
state. The result would typically be a list, but if there are
many actions, consider yielding them one at a time in an
iterator, rather than building them all at once."""
raise NotImplementedError
def result(self, state, action):
"""Return the state that results from executing the given
action in the given state. The action must be one of
self.actions(state)."""
raise NotImplementedError
def goal_test(self, state):
"""Return True if the state is a goal. The default method compares the
state to self.goal, as specified in the constructor. Override this
method if checking against a single self.goal is not enough."""
return state == self.goal
def path_cost(self, c, state1, action, state2):
"""Return the cost of a solution path that arrives at state2 from
state1 via action, assuming cost c to get up to state1. If the problem
is such that the path doesn't matter, this function will only look at
state2. If the path does matter, it will consider c and maybe state1
and action. The default method costs 1 for every step in the path."""
return c + 1
def value(self, state):
"""For optimization problems, each state has a value. Hill-climbing
and related algorithms try to maximize this value."""
raise NotImplementedError
#______________________________________________________________________________
class Node:
"""
A node in a search tree. Contains a pointer to the parent (the node
that this is a successor of) and to the actual state for this node. Note
that if a state is arrived at by two paths, then there are two nodes with
the same state. Also includes the action that got us to this state, and
the total path_cost (also known as g) to reach the node. Other functions
may add an f and h value; see best_first_graph_search and astar_search for
an explanation of how the f and h values are handled. You will not need to
subclass this class.
"""
def __init__(self, state, parent=None, action=None, path_cost=0):
"""Create a search tree Node, derived from a parent by an action."""
self.state = state
self.parent = parent
self.action = action
self.path_cost = path_cost
self.depth = 0
if parent:
self.depth = parent.depth + 1
def __repr__(self):
return "<Node {}>".format(self.state)
def __lt__(self, node):
return self.state < node.state
def expand(self, problem):
"""List the nodes reachable in one step from this node."""
return [self.child_node(problem, action)
for action in problem.actions(self.state)]
def child_node(self, problem, action):
"""
Fig. 3.10 of AIMA textbook
Create and return a child node corresponding to 'action'
"""
next_state = problem.result(self.state, action)
return Node(next_state, # next_state is a state
self, # parent is a node
action, # from this state to next state
problem.path_cost(self.path_cost, self.state, action, next_state)
)
def solution(self):
"""Return the sequence of actions to go from the root state to this node state."""
# The root node is associated to the initial state.
# The action associated to the root node is None
return [node.action for node in self.path()[1:]]
def path(self):
"Return a list of nodes forming the path from the root to this node."
node, path_back = self, []
while node:
path_back.append(node)
node = node.parent
return list(reversed(path_back))
# We want for a queue of nodes in breadth_first_search or
# astar_search to have no duplicated states, so we treat nodes
# with the same state as equal. [Problem: this may not be what you
# want in other contexts!]
def __eq__(self, other):
return isinstance(other, Node) and self.state == other.state
def __hash__(self):
# We use the hash value of the state
# stored in the node instead of the node
# object itself to quickly search a node
# with the same state in a Hash Table
return hash(self.state)
#______________________________________________________________________________
# Uninformed Search algorithms
def tree_search(problem, frontier):
"""
Search through the successors of a problem to find a goal.
The argument frontier should be an empty queue.
Don't worry about repeated paths to a state. [Fig. 3.7]
Return
the node of the first goal state found
or None is no goal state is found
"""
assert isinstance(problem, Problem)
frontier.append(Node(problem.initial))
while frontier:
node = frontier.pop()
if problem.goal_test(node.state):
return node
frontier.extend(node.expand(problem))
return None
def graph_search(problem, frontier):
"""
Search through the successors of a problem to find a goal.
The argument frontier should be an empty queue.
If two paths reach a state, only use the first one. [Fig. 3.7]
Return
the node of the first goal state found
or None is no goal state is found
"""
assert isinstance(problem, Problem)
frontier.append(Node(problem.initial))
explored = set() # initial empty set of explored states
while frontier:
node = frontier.pop()
if problem.goal_test(node.state):
return node
explored.add(node.state)
# Python note: next line uses of a generator
frontier.extend(child for child in node.expand(problem)
if child.state not in explored
and child not in frontier)
return None
def breadth_first_tree_search(problem):
"Search the shallowest nodes in the search tree first."
return tree_search(problem, FIFOQueue())
def depth_first_tree_search(problem):
"Search the deepest nodes in the search tree first."
return tree_search(problem, LIFOQueue())
def depth_first_graph_search(problem):
"Search the deepest nodes in the search tree first."
return graph_search(problem, LIFOQueue())
def breadth_first_graph_search(problem):
"Graph search version of BFS. [Fig. 3.11]"
return graph_search(problem, FIFOQueue())
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Informed Search algorithms
def best_first_tree_search(problem, f):
"""
Search the nodes with the lowest f scores first.
You specify the function f(node) that you want to minimize; for example,
if f is a heuristic estimate to the goal, then we have greedy best
first search; if f is node.depth then we have breadth-first search.
"""
node = Node(problem.initial)
if problem.goal_test(node.state):
return node
frontier = PriorityQueue(f=f)
frontier.append(node)
while frontier:
node = frontier.pop()
if problem.goal_test(node.state):
return node
for child in node.expand(problem):
# Test whether a node with the same state exists in the frontier.
# Remember that two nodes are considered equal iff their states
# are equal. See method Node.__eq__()
if child not in frontier:
# The node child is considered "in frontier", if a node
# already in frontier has the same state.
# See PriortyQueue.__contains__()
frontier.append(child)
else:
# A node in frontier has the same state as child
# frontier[child] is the f-value of the node.
# See method PriorityQueue.__getitem__()
if f(child) < frontier[child]:
# Replace the incumbent (that is the node
# already in the frontier) with child
del frontier[child]
frontier.append(child)
return None
def best_first_graph_search(problem, f):
"""
Search the nodes with the lowest f scores first.
You specify the function f(node) that you want to minimize; for example,
if f is a heuristic estimate to the goal, then we have greedy best
first search; if f is node.depth then we have breadth-first search.
"""
node = Node(problem.initial)
if problem.goal_test(node.state):
return node
frontier = PriorityQueue(f=f)
frontier.append(node)
explored = set() # set of states
while frontier:
node = frontier.pop()
if problem.goal_test(node.state):
return node
explored.add(node.state)
for child in node.expand(problem):
if child.state not in explored and child not in frontier:
frontier.append(child)
elif child in frontier:
# frontier[child] is the f value of the
# incumbent node that shares the same state as
# the node child. Read implementation of PriorityQueue
if f(child) < frontier[child]:
del frontier[child] # delete the incumbent node
frontier.append(child) #
return None
def uniform_cost_search(problem):
"[Fig. 3.14]"
return best_first_graph_search(problem, lambda node: node.path_cost)
def depth_limited_search(problem, limit=50):
"[Fig. 3.17]"
def recursive_dls(node, problem, limit):
if problem.goal_test(node.state):
return node
elif node.depth == limit:
return 'cutoff'
else:
cutoff_occurred = False
for child in node.expand(problem):
result = recursive_dls(child, problem, limit)
if result == 'cutoff':
cutoff_occurred = True
elif result is not None:
return result
if cutoff_occurred:
return 'cutoff'
else:
return None
# Body of depth_limited_search:
return recursive_dls(Node(problem.initial), problem, limit)
def iterative_deepening_search(problem):
"[Fig. 3.18]"
for depth in itertools.count():
result = depth_limited_search(problem, depth)
if result != 'cutoff':
return result
#______________________________________________________________________________
# Informed (Heuristic) Search
greedy_best_first_graph_search = best_first_graph_search
# Greedy best-first search is accomplished by specifying f(n) = h(n).
def astar_graph_search(problem, h=None):
"""A* search is best-first graph search with f(n) = g(n)+h(n).
You need to specify the h function when you call astar_search, or
else in your Problem subclass."""
return best_first_graph_search(problem, lambda n: n.path_cost + h(n))
def astar_tree_search(problem, h=None):
"""A* search is best-first graph search with f(n) = g(n)+h(n).
You need to specify the h function when you call astar_search, or
else in your Problem subclass."""
return best_first_tree_search(problem, lambda n: n.path_cost + h(n))
#______________________________________________________________________________
#
# + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
# CODE CEMETARY
# + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +