# declarative.py
import collections
import collections.abc

import numpy as np

import chunks
import utilities
import buffers
class DecMem(collections.abc.MutableMapping):
    """
    Declarative memory module.
    """

    def __init__(self, data=None):
        self._data = {}
        self.restricted_number_chunks = collections.Counter()  # counter for slot-value pairs, used to store strength of association
        self.unrestricted_number_chunks = collections.Counter()  # counter for chunks, used to store strength of association
        self.activations = {}
        if data is not None:
            try:
                self.update(data)
            except ValueError:
                self.update({x: 0 for x in data})
        print("DecMem", end="")
        print(self)
    def __contains__(self, elem):
        return elem in self._data

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        for elem in self._data:
            yield elem

    def __getitem__(self, key):
        return self._data[key]

    def __len__(self):
        return len(self._data)

    def __repr__(self):
        return repr(self._data)
        # return repr map(lambda a: a + '\n', self._data)
    def __setitem__(self, key, time):
        # when a new chunk is stored, update the association counters for any
        # values (or slot-value pairs) that are already being tracked
        if self.unrestricted_number_chunks and key not in self:
            for x in key:
                if utilities.splitting(x[1]).values and utilities.splitting(x[1]).values in self.unrestricted_number_chunks:
                    self.unrestricted_number_chunks.update([utilities.splitting(x[1]).values])
        if self.restricted_number_chunks and key not in self:
            for x in key:
                if utilities.splitting(x[1]).values and (x[0], utilities.splitting(x[1]).values) in self.restricted_number_chunks:
                    self.restricted_number_chunks.update([(x[0], utilities.splitting(x[1]).values)])
        if isinstance(key, chunks.Chunk):
            # presentation times are stored as a numpy array, rounded to 4 decimal places
            if isinstance(time, np.ndarray):
                self._data[key] = time
            else:
                try:
                    self._data[key] = np.array([round(float(time), 4)])
                except TypeError:
                    self._data[key] = np.array(time)
        else:
            raise utilities.ModelError("Only chunks can be added as attributes to Declarative Memory; '%s' is not a chunk" % key)
    def add_activation(self, element, activation):
        """
        Add the activation of an element.

        This raises an error if the element is not in declarative memory.
        """
        if element in self:
            self.activations[element] = activation
        else:
            raise AttributeError("The chunk %s is not in the declarative memory." % element)
    def add(self, element, time=0):
        """
        Add an element to declarative memory, or add a time to an existing element.

        element can be a single chunk or an iterable of chunks.
        """
        if isinstance(time, collections.abc.Iterable):
            try:
                new = np.concatenate((self.setdefault(element, np.array([])), np.array(time)))
                self[element] = new
            except TypeError:
                for x in element:
                    new = np.concatenate((self.setdefault(x, np.array([])), np.array(time)))
                    self[x] = new
        else:
            try:
                new = np.append(self.setdefault(element, np.array([])), time)
                self[element] = new
            except TypeError:
                for x in element:
                    new = np.append(self.setdefault(x, np.array([])), round(float(time), 4))
                    self[x] = new
    def copy(self):
        """
        Copy declarative memory.
        """
        dm = DecMem(self._data.copy())
        dm.activations = self.activations.copy()
        dm.restricted_number_chunks = self.restricted_number_chunks.copy()
        dm.unrestricted_number_chunks = self.unrestricted_number_chunks.copy()
        return dm
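
# A DecMem instance maps chunks onto numpy arrays of presentation times; those
# time stamps feed base-level learning during retrieval.  The restricted /
# unrestricted counters record how often tracked slot-value pairs (or bare
# values) appear across stored chunks, which is used for the strength of
# association in spreading activation.
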
class DecMemBuffer(buffers.Buffer):
    """
    Declarative memory buffer.
    """

    def __init__(self, decmem=None, data=None, finst=0):
        buffers.Buffer.__init__(self, decmem, data)
        self.recent = collections.deque()
        self.__finst = finst
        self.activation = None  # activation of the last retrieved element

        # parameters
        self.model_parameters = {}
        print("DecMem Buffer", end="")
        print(self)
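
    # finst ("fingers of instantiation") bounds how many recently retrieved
    # chunks are remembered in self.recent; retrieve() consults this deque when
    # extra_tests contains a "recently_retrieved" requirement.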
    @property
    def finst(self):
        """
        Finst - how many chunks are 'remembered' in the declarative memory buffer.
        """
        return self.__finst

    @finst.setter
    def finst(self, value):
        if value >= 0:
            self.__finst = value
        else:
            raise ValueError('Finst in the dm buffer must be >= 0')

    @property
    def decmem(self):
        """
        Default harvest of the retrieval buffer.
        """
        return self.dm

    @decmem.setter
    def decmem(self, value):
        try:
            self.dm = value
        except ValueError:
            raise utilities.ModelError('The default harvest set in the retrieval buffer is not a possible declarative memory')
    def add(self, elem, time=0):
        """
        Clear the current buffer and add a new chunk.
        """
        self.clear(time)
        super().add(elem)

    def clear(self, time=0):
        """
        Clear the buffer, adding the cleared chunk into declarative memory.
        """
        if self._data:
            self.dm.add(self._data.pop(), time)

    def copy(self, dm=None):
        """
        Copy the buffer, along with its declarative memory, unless dm is specified. You need to specify a new dm if two buffers share the same dm - only one of them should copy the dm in that case.
        """
        if dm is None:
            dm = self.dm
        copy_buffer = DecMemBuffer(dm, self._data.copy())
        return copy_buffer

    def test(self, state, inquiry):
        """
        Is the current state busy/free/error?
        """
        return getattr(self, state) == inquiry
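
    # In subsymbolic mode, retrieve() scores every candidate chunk with the
    # standard ACT-R activation equation
    #
    #     A = A_bll + A_sa + A_pm + inst_noise
    #
    # where A_bll is base-level activation computed from the chunk's
    # presentation times (utilities.baselevel_learning), A_sa is spreading
    # activation from chunks currently held in the buffers, A_pm is the
    # (negative) partial-matching penalty, and inst_noise is transient noise.
    # The chunk with the highest activation above retrieval_threshold is
    # retrieved, and utilities.retrieval_latency turns its activation into a
    # retrieval time (F * exp(-f * A) in standard ACT-R).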
    def retrieve(self, time, otherchunk, actrvariables, buffers, extra_tests, model_parameters):
        """
        Retrieve a chunk from declarative memory that matches otherchunk.
        """
        model_parameters = model_parameters.copy()
        model_parameters.update(self.model_parameters)

        if actrvariables is None:
            actrvariables = {}
        try:
            mod_attr_val = {x[0]: utilities.check_bound_vars(actrvariables, x[1], negative_impossible=False) for x in otherchunk.removeunused()}
        except utilities.ModelError as arg:
            raise utilities.ModelError("Retrieving the chunk '%s' is impossible; %s" % (otherchunk, arg))
        chunk_tobe_matched = chunks.Chunk(otherchunk.typename, **mod_attr_val)

        max_A = float("-inf")
        retrieved = None
        for chunk in self.dm:
            try:
                if extra_tests["recently_retrieved"] == False or extra_tests["recently_retrieved"] == 'False':
                    if self.__finst and chunk in self.recent:
                        continue
                else:
                    if self.__finst and chunk not in self.recent:
                        continue
            except KeyError:
                pass

            if model_parameters["subsymbolic"]:  # if subsymbolic, check activation
                A_pm = 0
                if model_parameters["partial_matching"]:
                    A_pm = chunk_tobe_matched.match(chunk, partialmatching=True,
                                                    mismatch_penalty=model_parameters["mismatch_penalty"])
                else:
                    if not chunk_tobe_matched <= chunk:
                        continue

                try:
                    A_bll = utilities.baselevel_learning(time, self.dm[chunk], model_parameters["baselevel_learning"],
                                                         model_parameters["decay"], self.dm.activations.get(chunk),
                                                         optimized_learning=model_parameters["optimized_learning"])  # bll
                except UnboundLocalError:
                    continue
                A_sa = utilities.spreading_activation(chunk, buffers, self.dm,
                                                      model_parameters["buffer_spreading_activation"],
                                                      model_parameters["strength_of_association"],
                                                      model_parameters["spreading_activation_restricted"],
                                                      model_parameters["association_only_from_chunks"])
                inst_noise = utilities.calculate_instantanoues_noise(model_parameters["instantaneous_noise"])
                A = A_bll + A_sa + A_pm + inst_noise  # chunk.activation is the manually specified activation, potentially used by the modeller

                if utilities.retrieval_success(A, model_parameters["retrieval_threshold"]) and max_A < A:
                    max_A = A
                    self.activation = max_A
                    retrieved = chunk
                    extra_time = utilities.retrieval_latency(A, model_parameters["latency_factor"],
                                                             model_parameters["latency_exponent"])

                    if model_parameters["activation_trace"]:
                        print("(Partially) matching chunk:", chunk)
                        print("Base level learning:", A_bll)
                        print("Spreading activation:", A_sa)
                        print("Partial matching:", A_pm)
                        print("Noise:", inst_noise)
                        print("Total activation:", A)
                        print("Time to retrieve:", extra_time)
            else:  # otherwise, just the standard rule-firing time, so no extra calculation is needed
                if chunk_tobe_matched <= chunk and self.dm[chunk][0] != time:  # the second condition ensures that a chunk created at the current time is not retrieved immediately
                    retrieved = chunk
                    extra_time = model_parameters["rule_firing"]

        if not retrieved:
            if model_parameters["subsymbolic"]:
                extra_time = utilities.retrieval_latency(model_parameters["retrieval_threshold"],
                                                         model_parameters["latency_factor"],
                                                         model_parameters["latency_exponent"])
            else:
                extra_time = model_parameters["rule_firing"]
        if self.__finst:
            self.recent.append(retrieved)
            if self.__finst < len(self.recent):
                self.recent.popleft()
        return retrieved, extra_time
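
    # retrieve1() below is a project-specific variant of retrieve(): it scans
    # an explicitly passed declarative memory (dmc) instead of self.dm, only
    # considers chunks with the same typename as the probe, leaves spreading
    # activation at 0 (the call is commented out), halves the partial-matching
    # penalty for IDENTIFY_LINKS chunks, writes the activation trace to the
    # supplied file handle, and returns (retrieved, max_A) rather than
    # (retrieved, extra_time).  The email parameter is currently unused.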
    def retrieve1(self, time, chunk_tobe_matched, model_parameters, dmc, file, email):
        """
        Retrieve a chunk from the declarative memory dmc that matches chunk_tobe_matched.
        """
        model_parameters = model_parameters.copy()
        model_parameters.update(self.model_parameters)

        max_A = float("-inf")
        retrieved = None
        for chunk in dmc:
            if chunk.typename == chunk_tobe_matched.typename:
                if model_parameters["subsymbolic"]:  # if subsymbolic, check activation
                    A_pm = 0
                    A_sa = 0
                    if model_parameters["partial_matching"]:
                        A_pm = chunk_tobe_matched.match(chunk, partialmatching=True, mismatch_penalty=model_parameters["mismatch_penalty"])
                        if chunk.typename == 'IDENTIFY_LINKS':
                            A_pm = A_pm / 2
                    else:
                        if not chunk_tobe_matched <= chunk:
                            continue

                    try:
                        A_bll = utilities.baselevel_learning(time, dmc[chunk], model_parameters["baselevel_learning"], model_parameters["decay"], dmc.activations.get(chunk), optimized_learning=model_parameters["optimized_learning"])  # bll
                    except UnboundLocalError:
                        continue
                    # A_sa = utilities.spreading_activation(chunk, buffers, dmc, model_parameters["buffer_spreading_activation"], model_parameters["strength_of_association"], model_parameters["spreading_activation_restricted"], model_parameters["association_only_from_chunks"])
                    inst_noise = utilities.calculate_instantanoues_noise(model_parameters["instantaneous_noise"])
                    A = A_bll + A_sa + A_pm + inst_noise  # chunk.activation is the manually specified activation, potentially used by the modeller

                    if utilities.retrieval_success(A, model_parameters["retrieval_threshold"]) and max_A < A:
                        max_A = A
                        self.activation = max_A
                        retrieved = chunk
                        extra_time = utilities.retrieval_latency(A, model_parameters["latency_factor"], model_parameters["latency_exponent"])

                        if model_parameters["activation_trace"]:
                            file.write("(Potentially) matching chunk: " + "\n" + chunk.__repr__() + "\n")
                            file.write("Base level learning: " + str(A_bll) + "\n")
                            file.write("Partial matching: " + str(A_pm) + "\n")
                            file.write("Noise: " + str(inst_noise) + "\n")
                            file.write("Total activation: " + str(A) + "\n")
                            # print("***************************************")
                            # print("(Partially) matching chunk:", chunk)
                            # print("Base level learning:", A_bll)
                            # print("Spreading activation", A_sa)
                            # print("Partial matching", A_pm)
                            # print("Noise:", inst_noise)
                            # print("Total activation", A)
                            # print("***************************************")
                else:  # otherwise, just the standard rule-firing time, so no extra calculation is needed
                    if chunk_tobe_matched <= chunk and dmc[chunk][0] != time:  # the second condition ensures that a chunk created at the current time is not retrieved immediately
                        retrieved = chunk
                        extra_time = model_parameters["rule_firing"]

        if not retrieved:
            if model_parameters["subsymbolic"]:
                extra_time = utilities.retrieval_latency(model_parameters["retrieval_threshold"], model_parameters["latency_factor"], model_parameters["latency_exponent"])
            else:
                extra_time = model_parameters["rule_firing"]
        if self.__finst:
            self.recent.append(retrieved)
            if self.__finst < len(self.recent):
                self.recent.popleft()
        return retrieved, max_A
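

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module.  It assumes that
    # chunks.Chunk accepts a type name plus slot-value keyword arguments, the
    # same way retrieve() builds chunk_tobe_matched above.
    dm = DecMem()
    fact = chunks.Chunk("fact", value="example")   # hypothetical chunk for illustration
    dm.add(fact, time=0)      # first presentation at time 0
    dm.add(fact, time=2.5)    # a later presentation appends another time stamp
    print(dm[fact])           # -> numpy array of presentation times, e.g. [0., 2.5]

    buffer = DecMemBuffer(dm, finst=1)
    buffer.add(fact, time=3)  # harvest any current chunk into dm, then hold `fact`
    buffer.clear(5)           # harvest `fact` back into dm with presentation time 5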