-
Notifications
You must be signed in to change notification settings - Fork 4
/
csp.py
executable file
·404 lines (353 loc) · 16 KB
/
csp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
import numpy as np
import math
import sys
import collections, util, copy
## Taken from CS221 scheduling starter code
# Upper bound on the number of iterations of the sampling loop in
# GibbsSampling.solve (the loop also stops early once the weight settles).
gibbsMaxIter = 1000
# General code for representing a weighted CSP (Constraint Satisfaction Problem).
# All variables are being referenced by their index instead of their original
# names.
class CSP(object):
    """
    A weighted CSP (Constraint Satisfaction Problem).

    Variables are registered with add_variable() and constrained through
    unary and binary factor functions. Factor values are stored as plain
    (multiplicative) weights in lookup tables, while get_delta_weight() and
    get_assignment_weight() report weights in log space: 0.0 corresponds to
    a weight of 1 and -inf to a weight of 0 (a violated hard constraint).
    """

    def __init__(self):
        # Total number of variables in the CSP.
        self.numVars = 0

        # The list of variable names in the same order as they are added. A
        # variable name can be any hashable object, for example: int, str,
        # or any tuple of hashable objects.
        self.variables = []

        # Each key K in this dictionary is a variable name.
        # values[K] is the list of domain values that variable K can take on.
        self.values = {}

        # Each entry is a unary factor table for the corresponding variable.
        # The factor table corresponds to the weight distribution of a variable
        # for all added unary factor functions. If there's no unary function
        # for a variable K, unaryFactors[K] is None.
        # E.g. if B \in ['a', 'b'] is a variable, and we added two
        # unary factor functions f1, f2 for B,
        # then unaryFactors[B]['a'] == f1('a') * f2('a')
        self.unaryFactors = {}

        # Each entry is a dictionary keyed by the name of the other variable
        # involved. The value is a binary factor table, where each table
        # stores the factor value for all possible combinations of
        # the domains of the two variables for all added binary factor
        # functions. The table is represented as a dictionary of dictionaries.
        #
        # As an example, if we only have two variables
        # A \in ['b', 'c'],  B \in ['a', 'b']
        # and we've added two binary functions f1(A,B) and f2(A,B) to the CSP,
        # then binaryFactors[A][B]['b']['a'] == f1('b','a') * f2('b','a').
        # binaryFactors[A][A] should raise a KeyError since a variable
        # shouldn't have a binary factor table with itself.
        self.binaryFactors = {}

    def add_variable(self, var, domain):
        """
        Add a new variable to the CSP.

        @param var: name of the variable (any hashable object).
        @param domain: the list of values the variable can take on.
        @raise Exception: if a variable with the same name already exists.
        """
        # self.values gains exactly one key per added variable, so a dict
        # lookup replaces a linear scan of self.variables.
        if var in self.values:
            raise Exception("Variable name already exists: %s" % str(var))

        self.numVars += 1
        self.variables.append(var)
        self.values[var] = domain
        self.unaryFactors[var] = None
        self.binaryFactors[var] = dict()

    def get_neighbor_vars(self, var):
        """
        Returns a list of variables which are neighbors of |var|, i.e. the
        variables sharing at least one binary factor with it.
        """
        # Return a real list (a snapshot) rather than a live dict view, so
        # the result matches the documented type and stays valid even if
        # factors are added while the caller iterates.
        return list(self.binaryFactors[var])

    def add_unary_factor(self, var, factorFunc):
        """
        Add a unary factor function for a variable. Its factor
        value across the domain will be *merged* with any previously added
        unary factor functions through elementwise multiplication.

        How to get unary factor value given a variable |var| and
        value |val|?
        => csp.unaryFactors[var][val]
        """
        factor = {val: float(factorFunc(val)) for val in self.values[var]}
        existing = self.unaryFactors[var]
        if existing is None:
            self.unaryFactors[var] = factor
            return

        assert len(existing) == len(factor)
        self.unaryFactors[var] = {val: existing[val] * factor[val]
                                  for val in factor}

    def add_binary_factor(self, var1, var2, factor_func):
        """
        Takes two variable names and a binary factor function
        |factor_func|, add to binaryFactors. If the two variables already
        had binaryFactors added earlier, they will be *merged* through element
        wise multiplication.

        How to get binary factor value given a variable |var1| with value |val1|
        and variable |var2| with value |val2|?
        => csp.binaryFactors[var1][var2][val1][val2]

        @raise AssertionError: if |var1| and |var2| are the same variable.
        """
        # Never shall a binary factor be added over a single variable. Raised
        # explicitly so the check is not stripped when running with -O.
        if var1 == var2:
            raise AssertionError(
                "A binary factor needs two distinct variables, got %s twice"
                % str(var1))

        domain1 = self.values[var1]
        domain2 = self.values[var2]
        # Evaluate the factor function once per value pair, then reuse the
        # results for the transposed (var2 -> var1) table.
        forward = {val1: {val2: float(factor_func(val1, val2))
                          for val2 in domain2}
                   for val1 in domain1}
        backward = {val2: {val1: forward[val1][val2] for val1 in domain1}
                    for val2 in domain2}

        self.update_binary_factor_table(var1, var2, forward)
        self.update_binary_factor_table(var2, var1, backward)

    def update_binary_factor_table(self, var1, var2, table):
        """
        Update the binary factor table for binaryFactors[var1][var2].
        If it exists, element-wise multiplications will be performed to merge
        them together; otherwise |table| is stored as is.
        """
        if var2 not in self.binaryFactors[var1]:
            self.binaryFactors[var1][var2] = table
            return

        currentTable = self.binaryFactors[var1][var2]
        for i in table:
            for j in table[i]:
                assert i in currentTable and j in currentTable[i]
                currentTable[i][j] *= table[i][j]

    def get_delta_weight(self, assignment, var, val):
        """
        Given a partial assignment, and a proposed new value for a variable,
        return the change of (log) weight after assigning the variable with
        the proposed value.

        @param assignment: A dictionary of current assignment. Unassigned variables
            do not have entries, while an assigned variable has the assigned value
            as value in dictionary. e.g. if the domain of the variable A is [5,6],
            and 6 was assigned to it, then assignment[A] == 6.
        @param var: name of an unassigned variable.
        @param val: the proposed value.

        @return w: The sum of the logs of the unary factor of |var| and of
            every binary factor between |var| and an already assigned
            variable. It is meant to be *added* to the current log weight.
            Returns -inf as soon as one of those factors is 0.
        """
        assert var not in assignment
        w = 0.0

        # A variable without unary factors has unaryFactors[var] == None.
        if self.unaryFactors[var]:
            value = self.unaryFactors[var][val]
            if value == 0:
                return -float('inf')
            w += np.log(value)

        for var2, factor in self.binaryFactors[var].items():
            if var2 not in assignment:
                continue  # Not assigned yet
            value = factor[val][assignment[var2]]
            if value == 0:
                return -float('inf')
            w += np.log(value)

        return w

    def get_assignment_weight(self, assignment):
        """
        Given a current assignment (can be partial), return its log weight.

        The assignment is rebuilt one variable at a time so that every unary
        factor and every binary factor between two assigned variables is
        counted exactly once. Returns -inf if any of those factors is 0.
        """
        w = 0.0
        partial = {}
        for var, val in assignment.items():
            delta = self.get_delta_weight(partial, var, val)
            if delta == -float('inf'):
                return delta
            w += delta
            partial[var] = val
        return w
class GibbsSampling():
    """
    Approximate solver for a weighted CSP based on Gibbs sampling.
    """

    def solve(self, csp):
        """
        Sample assignments of |csp| and return the best one encountered.

        Starting from a uniformly random complete assignment, every sweep
        re-samples each variable in turn, with probability proportional to
        the weight each value gets given the rest of the assignment. Sampling
        stops once the log weight changes by at most 0.001 between two
        consecutive sweeps, or after gibbsMaxIter sweeps.

        @param csp: A weighted CSP exposing |variables|, |values|,
            get_delta_weight() and get_assignment_weight().

        @return: A dictionary mapping every variable to the value it had in
            the highest-weight assignment seen after a sweep. The dictionary
            is empty if no assignment with non-zero weight was ever seen.
        """
        # Generate initial assignment at random and compute weight. An index
        # is sampled instead of the value itself so that domain values keep
        # their original Python type and may be tuples or other objects.
        currentAssignment = {}
        for var in csp.variables:
            domain = csp.values[var]
            currentAssignment[var] = domain[np.random.choice(len(domain))]
        currentWeight = csp.get_assignment_weight(currentAssignment)

        diff = 1
        iterations = 0
        optimalWeight = -float('inf')
        optimalAssignment = {}
        while diff > 0.001 and iterations < gibbsMaxIter:
            for var in csp.variables:
                # Unassign the variable, then score every candidate value
                # against the rest of the assignment (in log space).
                del currentAssignment[var]
                domain = csp.values[var]
                logWeights = np.array(
                    [csp.get_delta_weight(currentAssignment, var, val)
                     for val in domain], dtype=float)

                best = np.max(logWeights)
                if best == -float('inf'):
                    # Every value violates a hard constraint: the conditional
                    # distribution is undefined, so sample uniformly instead
                    # of dividing by a zero total.
                    probabilities = None
                else:
                    # Shifting by the maximum leaves the normalized
                    # distribution unchanged but avoids overflow/underflow.
                    weights = np.exp(logWeights - best)
                    probabilities = weights / np.sum(weights)

                index = np.random.choice(len(domain), p=probabilities)
                currentAssignment[var] = domain[index]

            newWeight = csp.get_assignment_weight(currentAssignment)
            if newWeight > optimalWeight:
                optimalWeight = newWeight
                # Snapshot the assignment: currentAssignment keeps being
                # mutated by the following sweeps.
                optimalAssignment = dict(currentAssignment)

            diff = abs(currentWeight - newWeight)
            if np.isnan(diff):
                # Both weights are -inf (infeasible assignments): that is not
                # convergence, so keep sampling.
                diff = float('inf')
            currentWeight = newWeight
            iterations += 1

        return optimalAssignment
class BacktrackingSearch():
    """
    Exhaustive backtracking solver for a weighted CSP.

    The search enumerates every assignment with non-zero weight and keeps
    track of the best one. Weights are handled in log space, as returned by
    the CSP's get_delta_weight().
    """

    def reset_results(self):
        """
        Clear all statistics and solutions from a previous search.
        """
        # Keep track of the best assignment and (log) weight found.
        self.optimalAssignment = {}
        self.optimalWeight = 0

        # Keep track of the number of optimal assignments and assignments. These
        # two values should be identical when the CSP is unweighted or only has
        # binary weights.
        self.numOptimalAssignments = 0
        self.numAssignments = 0

        # Keep track of the number of times backtrack() gets called.
        self.numOperations = 0

        # List of all solutions found.
        self.allAssignments = []

    def print_stats(self):
        """
        Prints a message summarizing the outcome of the solver.
        """
        if self.optimalAssignment:
            print ("Found %d optimal assignments with weight %f in %d operations" % \
                (self.numOptimalAssignments, self.optimalWeight, self.numOperations))
        else:
            print ("No solution was found.")

    def solve(self, csp, mcv = False, ac3 = False):
        """
        Solves the given weighted CSP using heuristics as specified in the
        parameter. Note that unlike a typical unweighted CSP where the search
        terminates when one solution is found, we want this function to find
        all possible assignments. The results are stored in the variables
        described in reset_results().

        @param csp: A weighted CSP.
        @param mcv: When enabled, Most Constrained Variable heuristics is used.
        @param ac3: When enabled, AC-3 will be used after each assignment of a
            variable is made.
        """
        # CSP to be solved.
        self.csp = csp

        # Set the search heuristics requested.
        self.mcv = mcv
        self.ac3 = ac3

        # Reset solutions from previous search.
        self.reset_results()

        # The dictionary of domains of every variable in the CSP. Each domain
        # is copied so that pruning never touches the CSP's own lists.
        self.domains = {var: list(self.csp.values[var]) for var in self.csp.variables}

        # Perform backtracking search.
        self.backtrack({}, 0, 0)

        # Print summary of solutions.
        self.print_stats()

    def backtrack(self, assignment, numAssigned, weight):
        """
        Perform the back-tracking algorithms to find all possible solutions to
        the CSP.

        @param assignment: A dictionary of current assignment. Unassigned variables
            do not have entries, while an assigned variable has the assigned value
            as value in dictionary. e.g. if the domain of the variable A is [5,6],
            and 6 was assigned to it, then assignment[A] == 6.
        @param numAssigned: Number of currently assigned variables
        @param weight: The (log) weight of the current partial assignment.
        """
        self.numOperations += 1
        assert weight > -float('inf')

        if numAssigned == self.csp.numVars:
            # A satisfiable solution has been found. Update the statistics.
            self.numAssignments += 1
            # Snapshot the assignment: |assignment| is mutated as the search
            # unwinds.
            newAssignment = {var: assignment[var] for var in self.csp.variables}
            self.allAssignments.append(newAssignment)

            if len(self.optimalAssignment) == 0 or weight >= self.optimalWeight:
                if weight == self.optimalWeight:
                    self.numOptimalAssignments += 1
                else:
                    self.numOptimalAssignments = 1
                self.optimalWeight = weight
                self.optimalAssignment = newAssignment
            return

        # Select the next variable to be assigned.
        var = self.get_unassigned_variable(assignment)
        # Get an ordering of the values.
        ordered_values = self.domains[var]

        # Continue the backtracking recursion using |var| and |ordered_values|.
        for val in ordered_values:
            deltaWeight = self.csp.get_delta_weight(assignment, var, val)
            if not deltaWeight > -float('inf'):
                # The value violates a hard constraint: prune this branch.
                continue

            assignment[var] = val
            if self.ac3:
                # Arc consistency check is enabled. Domain lists are only
                # ever rebound to new lists, never mutated in place, so a
                # shallow copy of the mapping is enough to restore them.
                savedDomains = dict(self.domains)
                # Fix the value of the selected variable so that hopefully
                # we can eliminate values for other variables.
                self.domains[var] = [val]
                # Enforce arc consistency.
                self.arc_consistency_check(var)
                self.backtrack(assignment, numAssigned + 1, weight + deltaWeight)
                # Restore the previous domains.
                self.domains = savedDomains
            else:
                # When arc consistency check is not enabled.
                self.backtrack(assignment, numAssigned + 1, weight + deltaWeight)
            del assignment[var]

    def get_unassigned_variable(self, assignment):
        """
        Given a partial assignment, return a currently unassigned variable.

        Without the MCV heuristic this is the first unassigned variable in
        insertion order. With it, this is the unassigned variable with the
        fewest domain values of non-zero weight under |assignment| (ties go
        to the variable added first).

        @param assignment: A dictionary of current assignment. Unassigned
            variables do not have entries.

        @return var: a currently unassigned variable, or None if every
            variable is already assigned.
        """
        if not self.mcv:
            # Select a variable without any heuristics.
            for var in self.csp.variables:
                if var not in assignment:
                    return var
            return None

        min_variable = None
        # Infinity works as the initial bound on every Python version
        # (sys.maxint no longer exists in Python 3).
        min_count = float('inf')
        for var in self.csp.variables:
            if var in assignment:
                continue
            count = sum(
                1 for p in self.domains[var]
                if self.csp.get_delta_weight(assignment, var, p) > -float('inf'))
            if count < min_count:
                min_count = count
                min_variable = var
        return min_variable

    def arc_consistency_check(self, var):
        """
        Perform the AC-3 algorithm. The goal is to reduce the size of the
        domain values for the unassigned variables based on arc consistency.

        Starting from |var|, every neighbor's domain is filtered down to the
        values that still have a supporting value (binary factor > 0) in the
        domain of the variable being processed. Whenever a domain shrinks,
        that neighbor is processed in turn.

        @param var: The variable whose value has just been set.
        """
        def hasSupport(var1, var2, val2):
            # True if some remaining value of var1 is compatible with
            # var2 == val2.
            table = self.csp.binaryFactors[var1][var2]
            return any(table[val1][val2] > 0 for val1 in self.domains[var1])

        queue = [var]
        while queue:
            var1 = queue.pop()
            for var2 in self.csp.get_neighbor_vars(var1):
                currentDomain = self.domains[var2]
                newDomain = [val2 for val2 in currentDomain
                             if hasSupport(var1, var2, val2)]
                if len(newDomain) != len(currentDomain):
                    # Rebind (never mutate) the domain so that saved copies
                    # of self.domains stay intact.
                    self.domains[var2] = newDomain
                    queue.append(var2)