-
Notifications
You must be signed in to change notification settings - Fork 1
/
ModifiedBayes.py
173 lines (145 loc) · 10.2 KB
/
ModifiedBayes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
from modifiedNaive import ModifiedNaive
class ModifiedBayes(ModifiedNaive):
    """
    Naive Bayes variant that, after the regular ModifiedNaive training, keeps
    adjusting the model entries of the sensitive attribute until the
    discrimination score P(C+ | S+) - P(C+ | S-) is no longer positive.

    Terminology used throughout the class:
        C+ / C- - the positive / negative classification ("higher" / "lower")
        S+ / S- - the privileged / underprivileged sensitive group ("higher" / "lower")
    """

    def __init__(self):
        ModifiedNaive.__init__(self)

    def calculateDiscriminationScore(self, CHigherSHigher, CHigherSLower):
        """
        Calculates the discrimination score: the probability of a C+ classification
        within the privileged group minus the probability of a C+ classification
        within the underprivileged group.

        CHigherSHigher (float) - P(C+ | S+), as computed in modify()
        CHigherSLower (float) - P(C+ | S-), as computed in modify()
        Returns: the discrimination score
        """
        return CHigherSHigher - CHigherSLower

    def assignClassifications(self, classificationDict, CHigher, classesList):
        """
        Matches the two possible classifications with the keys "higher" and "lower"
        based on the C+ value passed into modify(), so they can be referred to later.
        The dictionary is filled in place; nothing is returned.

        classificationDict (dict) - dictionary to fill (normally empty)
        CHigher (str) - positive classification
        classesList (list) - list of possible classifications (the first two entries are used)
        """
        firstClass, secondClass = classesList[0], classesList[1]
        # Compare string forms on BOTH sides so a non-string CHigher (e.g. the
        # int 1) still matches its class label instead of silently falling
        # through to the else branch.
        if str(firstClass) == str(CHigher):
            classificationDict["higher"] = firstClass
            classificationDict["lower"] = secondClass
        else:
            # Unchanged fallback: anything that does not match the first class
            # makes the second class C+.
            classificationDict["higher"] = secondClass
            classificationDict["lower"] = firstClass

    def assignSensitivity(self, dataSet, dataFrame, sensitivityDict):
        """
        Assigns the keys "higher" and "lower" to the two sensitive attribute values
        based on which of the two has the higher count. S+ ("higher") is the
        privileged group. This is done by counts rather than by a manual parameter
        because there isn't an 'ideal' sensitive attribute category like there is
        with classifications. On a tie the second category becomes "higher".
        The dictionary is filled in place; nothing is returned.

        dataSet (DataSet) - the dataset (its protectedAttribute names the column)
        dataFrame (DataFrame) - the dataframe
        sensitivityDict (dict) - dictionary to fill (normally empty)
        """
        protected = dataSet.protectedAttribute
        categories = self.getAttributeCategories(dataFrame, protected)
        firstCategory, secondCategory = categories[0], categories[1]
        firstCount = dataFrame.loc[dataFrame[protected] == firstCategory, protected].count()
        secondCount = dataFrame.loc[dataFrame[protected] == secondCategory, protected].count()
        if firstCount > secondCount:
            sensitivityDict["higher"] = firstCategory
            sensitivityDict["lower"] = secondCategory
        else:
            sensitivityDict["higher"] = secondCategory
            sensitivityDict["lower"] = firstCategory

    def calculateNumPos(self, dataFrame, column, classDict):
        """
        Counts the elements in a column that match the classification stored in
        classDict under the key "higher" (AKA C+).

        dataFrame (DataFrame) - the dataframe
        column (str) - a column header
        classDict (dict) - a dictionary containing keys of "higher" and "lower" and values of the class labels
        Returns: the number of elements in the column that match C+
        """
        return dataFrame.loc[dataFrame[column] == classDict["higher"], column].count()

    def printCounts(self, dataSet, CHigherSLowerCount, CLowerSLowerCount, CHigherSHigherCount, CLowerSHigherCount, higherOrLowerSensitiveAttributeDict, higherOrLowerClassificationDict):
        """
        Debugging aid that can be called inside modify()'s while loop to watch how
        the counts change with each iteration. Prints the four artificial counts
        passed in, followed by the matching counts taken from the
        "Bayes Classification" column.
        """
        # NOTE(review): this reads dataSet.dataFrame, while modify() works on the
        # frame returned by classify(dataSet, "train") -- confirm this frame
        # carries the "Bayes Classification" column.
        dataFrame = dataSet.dataFrame
        print("c+s- count:", CHigherSLowerCount)
        print("c-s- count:", CLowerSLowerCount)
        print("c+s+ count:", CHigherSHigherCount)
        print("c-s+ count:", CLowerSHigherCount)
        # (label, sensitive-group key, classification key), in the original print order
        combinations = (
            ("c+s-", "lower", "higher"),
            ("c-s-", "lower", "lower"),
            ("c+s+", "higher", "higher"),
            ("c-s+", "higher", "lower"),
        )
        for label, sensitiveKey, classKey in combinations:
            print(
                f"bayes classification column {label} count: ",
                self.countIntersection(
                    dataFrame,
                    dataSet.protectedAttribute,
                    higherOrLowerSensitiveAttributeDict[sensitiveKey],
                    "Bayes Classification",
                    higherOrLowerClassificationDict[classKey],
                ),
            )

    def printProbabilities(self, CHigherSLower, CLowerSLower, CHigherSHigher, CLowerSHigher):
        """
        Space saving function for modify() that prints out the four probabilities.
        """
        print("c+s- prob:", CHigherSLower)
        print("c-s- prob:", CLowerSLower)
        print("c+s+ prob:", CHigherSHigher)
        print("c-s+ prob:", CLowerSHigher)

    def train(self, dataSet, CHigher, weightOfChange=0.01):
        """
        Trains the model with ModifiedNaive.train and then adjusts it using modify().

        dataSet (DataSet) - the dataset
        CHigher (str) - C+
        weightOfChange (float) - step size forwarded to modify(); defaults to 0.01
        """
        ModifiedNaive.train(self, dataSet, self.model)
        self.modify(dataSet, CHigher, weightOfChange)

    def modify(self, dataSet, CHigher, weightOfChange=0.01):
        """
        Classifies the training data and modifies the model until the
        discrimination score is no longer positive.

        dataSet (DataSet) - the dataset
        CHigher (str) - C+
        weightOfChange (float) - fraction of the opposite count that is moved
            between counts on every iteration; defaults to 0.01
        Raises: ValueError if weightOfChange is not positive
        """
        if weightOfChange <= 0:
            raise ValueError("weightOfChange must be positive")

        protected = dataSet.protectedAttribute
        groundTruth = dataSet.trueLabels
        # Need to know the index of the sensitive attribute in the model
        sensitiveAttributeModelIndex = dataSet.trainHeaders.index(protected)
        dataFrame = self.classify(dataSet, "train")

        # Assign dictionary values based on the CHigher parameter
        classesList = self.getAttributeCategories(dataFrame, groundTruth)
        higherOrLowerClassificationDict = {}
        self.assignClassifications(higherOrLowerClassificationDict, CHigher, classesList)

        # Assign the two sensitive attribute categories as S+ and S-
        higherOrLowerSensitiveAttributeDict = {}
        self.assignSensitivity(dataSet, dataFrame, higherOrLowerSensitiveAttributeDict)

        classHigher = higherOrLowerClassificationDict["higher"]
        classLower = higherOrLowerClassificationDict["lower"]
        groupHigher = higherOrLowerSensitiveAttributeDict["higher"]
        groupLower = higherOrLowerSensitiveAttributeDict["lower"]

        # Number of rows labelled C+ in the ground truth column (the real number from the data)
        actualNumPos = self.calculateNumPos(dataFrame, groundTruth, higherOrLowerClassificationDict)

        # Counts for C+S-, C-S+, C+S+ and C-S- based on the original ground truth column
        CHigherSLowerCount = self.countIntersection(dataFrame, protected, groupLower, groundTruth, classHigher)
        CLowerSHigherCount = self.countIntersection(dataFrame, protected, groupHigher, groundTruth, classLower)
        CHigherSHigherCount = self.countIntersection(dataFrame, protected, groupHigher, groundTruth, classHigher)
        CLowerSLowerCount = self.countIntersection(dataFrame, protected, groupLower, groundTruth, classLower)

        # Baseline probabilities used for the preliminary discrimination score.
        # Only the two C+ probabilities are needed here; the C- ones are always
        # computed inside the loop before they are written to the model.
        CHigherSLower = CHigherSLowerCount / self.countAttr(dataFrame, protected, groupLower)
        CHigherSHigher = CHigherSHigherCount / self.countAttr(dataFrame, protected, groupHigher)

        # Preliminary discrimination score -- disc = P(C+ | S+) - P(C+ | S-)
        disc = self.calculateDiscriminationScore(CHigherSHigher, CHigherSLower)

        # NOTE(review): when the step below is zero (e.g. CLowerSHigherCount == 0
        # in the first branch) the probabilities stop changing, so whether this
        # loop terminates then depends on what classify() returns -- confirm.
        while disc > 0.0:
            # numPos -- the number of rows currently classified as C+
            numPos = self.calculateNumPos(dataFrame, "Bayes Classification", higherOrLowerClassificationDict)

            # Uncomment if desired: prints the current artificial counts and the actual counts in the bayes classification column
            # self.printCounts(dataSet, CHigherSLowerCount, CLowerSLowerCount, CHigherSHigherCount, CLowerSHigherCount, higherOrLowerSensitiveAttributeDict, higherOrLowerClassificationDict)

            if numPos < actualNumPos:
                # We have more positive C+ labels we can assign:
                # slightly increase the count for C+S- and decrease the count for C-S-
                step = weightOfChange * CLowerSHigherCount
                CHigherSLowerCount = CHigherSLowerCount + step
                CLowerSLowerCount = CLowerSLowerCount - step

                # Update the probabilities; the S- group size is looked up once
                # per step on the freshly classified frame and shared by both.
                groupLowerSize = self.countAttr(dataFrame, protected, groupLower)
                CHigherSLower = CHigherSLowerCount / groupLowerSize
                CLowerSLower = CLowerSLowerCount / groupLowerSize

                # Overwrite the old probabilities in the model
                self.model[sensitiveAttributeModelIndex][groupLower][classHigher] = CHigherSLower
                self.model[sensitiveAttributeModelIndex][groupLower][classLower] = CLowerSLower
            else:
                # We have assigned more positive C+ labels than we should have:
                # slightly increase the count for C-S+ and decrease the count for C+S+
                step = weightOfChange * CHigherSLowerCount
                CLowerSHigherCount = CLowerSHigherCount + step
                CHigherSHigherCount = CHigherSHigherCount - step

                # Update the probabilities; the S+ group size is looked up once per step
                groupHigherSize = self.countAttr(dataFrame, protected, groupHigher)
                CLowerSHigher = CLowerSHigherCount / groupHigherSize
                CHigherSHigher = CHigherSHigherCount / groupHigherSize

                # Overwrite the old probabilities in the model
                self.model[sensitiveAttributeModelIndex][groupHigher][classLower] = CLowerSHigher
                self.model[sensitiveAttributeModelIndex][groupHigher][classHigher] = CHigherSHigher

            # Reclassify with the updated model and recompute the discrimination score
            dataFrame = self.classify(dataSet, "train")
            disc = self.calculateDiscriminationScore(CHigherSHigher, CHigherSLower)