-
Notifications
You must be signed in to change notification settings - Fork 1
/
problog_ngs_abducer.py
99 lines (87 loc) · 3.38 KB
/
problog_ngs_abducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from multiprocessing import Pool
import bisect
from functools import partial
import numpy as np
def expr_generate(length, opts, nums, cur_list, ret):
    """Enumerate every alternating operand/operator token string of a given size.

    Tokens are appended depth-first: even positions (0, 2, ...) are drawn from
    ``nums`` and odd positions from ``opts``.  Each time ``cur_list`` reaches
    ``length`` tokens, the tokens are concatenated and appended to ``ret``.

    Args:
        length: Number of tokens each generated expression must contain.
        opts: Candidate operator tokens (used at odd positions).
        nums: Candidate operand tokens (used at even positions).
        cur_list: Working buffer holding the tokens chosen so far.  It is
            mutated during the search and restored to its incoming content
            before the function returns.
        ret: Output list; finished expression strings are appended to it.

    Returns:
        None.  Results are delivered through ``ret``.
    """
    if len(cur_list) == length:
        ret.append("".join(cur_list))
        return

    # Even positions hold operands, odd positions hold operators.
    candidates = opts if len(cur_list) % 2 else nums
    # The buffer is restored after every recursive call, so whether the
    # previous token is a division sign is the same for all candidates.
    follows_division = bool(cur_list) and cur_list[-1] == '/'

    for token in candidates:
        # Never emit "/0": the expression could not be evaluated later.
        if follows_division and token == 0:
            continue
        cur_list.append(str(token))
        expr_generate(length, opts, nums, cur_list, ret)
        cur_list.pop()
class Abducer:
def __init__(self, min_len = 1, max_len = 7, mode = "HWF"):
self.mode = mode
if mode == "HWF":
opts = ["+", "-", "*", "/"]
nums = list(range(1, 10))
elif mode == "2ADD":
opts = ["+"]
nums = list(range(0, 10))
else:
assert mode in ["HWF", "2ADD"]
self.KB = {}
for l in range(min_len, max_len + 1, 2):
expr_list = []
expr_generate(l, opts, nums, [], expr_list)
ans_list = map(eval, expr_list)
expr_dict = {}
for ans, expr in zip(ans_list, expr_list):
expr_dict.setdefault(ans, [])
expr_dict[ans].append(expr)
ans_list = [k for k, _ in expr_dict.items()]
expr_list = [v for _, v in expr_dict.items()]
equation_list = sorted(list(zip(ans_list, expr_list)))
ans_list = [e[0] for e in equation_list]
expr_list = [e[1] for e in equation_list]
self.KB[l] = (ans_list, expr_list)
def _get_ans_set(self, expr, ans):
l = len(expr)
sub_KB = self.KB[l]
ans_list = sub_KB[0]
expr_list = sub_KB[1]
idx = bisect.bisect_left(sub_KB[0], ans)
begin = max(0, idx - 1)
end = min(idx + 2, len(sub_KB[0]))
min_err = 999999
best_set = []
for idx in range(begin, end):
err = abs(ans_list[idx] - ans)
if err < min_err:
best_set = expr_list[idx]
min_err = err
return best_set
def _abduce(self, data, max_address_num, require_more_address):
sequence, ans = data
ans_set = self._get_ans_set(sequence, ans)
hamming_dist_list = np.array([sum(c1 != c2 for c1, c2 in zip(sequence, s)) for s in ans_set])
address_num = np.min(hamming_dist_list)
idxs = np.where(hamming_dist_list<=address_num+require_more_address)[0]
strip = 1
if self.mode == "2ADD":
strip = 2
return [ans_set[idx][::strip] for idx in idxs], address_num
def abduce(self, sequences, anss, max_address_num, require_more_address = 0):
if self.mode == "2ADD":
sequences = ["+".join(s for s in sequence) for sequence in sequences]
partial__abduce = partial(self._abduce, max_address_num = max_address_num, require_more_address = require_more_address)
#pool = Pool(processes = 30)
#ret = pool.map(partial__abduce, zip(sequences, anss))
#pool.close()
#pool.join()
ret = list(map(partial__abduce, zip(sequences, anss)))
return ret
if __name__ == "__main__":
    # Smoke demo, run only when the module is executed as a script.
    #
    # HWF-mode example (disabled); note the query goes through ``abduce``:
    #   abducor = Abducer(mode="HWF")
    #   ans = abducor.abduce(["2*3", "2+3"], [5, 6], 1)
    #   print(ans)

    # Two-digit addition: each input is a pair of digits, each target is the
    # desired sum; up to 2 extra differing positions beyond the minimum are
    # accepted.
    abducor = Abducer(mode="2ADD")
    ans = abducor.abduce(["23", "23", "87"], [5, 6, 15], 1, 2)
    print(ans)