solver.py
import argparse
import dataclasses
import functools
import math
import multiprocessing
import multiprocessing.connection
import os
import sys

import constants
import ipc
database = []
try:
    with open(os.path.join(sys.path[0], constants.DICT)) as file:
        database = tuple(file.read().split())
except FileNotFoundError:
    print(f"ERROR: {constants.DICT} not found", file=sys.stderr)
    sys.exit(constants.ERROR_FILE_NOT_FOUND)
cache_table = [0] * (26 * 5 * 3)  # one slot per combination: 26 letters * 5 possible positions * 3 colour variants
cache_dict = {}    # caches intersections of match sets, keyed by sorted tuples of hashes
cached_set = None  # the word set the per-match caches are currently valid for
@dataclasses.dataclass(frozen=True)
class MatchSet:
    hashes: tuple[int, ...]  # hashes of the sets that were intersected
    matches: set[str]        # the resulting set of intersected words

@dataclasses.dataclass(frozen=True)  # frozen makes it immutable
class MatchInfo:
    code: int  # 0, 1 or 2 for a colour
    pos: int   # the position the colour applies to
    char: str  # the letter that received that colour
def invalidate_cache():
    # Without the global declaration the original only rebound two locals
    # (and misspelled cache_dict as cache_dist), so the caches were never cleared.
    global cache_table, cache_dict
    cache_table = [0] * (26 * 5 * 3)
    cache_dict = {}
def get_match_hash(match: MatchInfo):  # converts a MatchInfo structure into a hash
    return (ord(match.char) - ord('A')) + ((match.pos * 26) if match.code != 0 else 0) + 26 * 5 * match.code
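# Worked example (illustrative letters): MatchInfo(code=2, pos=1, char='B')
# hashes to (1) + (1 * 26) + (26 * 5 * 2) = 287. For code 0 the position is
# dropped on purpose: "char not in word" filters the same set of words no
# matter where the miss occurred, so all five positions share one slot.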
def merge_hash_tuples(l1: tuple[int, ...], l2: tuple[int, ...]):
    return tuple(sorted(l1 + l2))
def filter_by_match(match: MatchInfo, db=database):
    key = get_match_hash(match)
    if cache_table[key] == 0:
        if match.code == constants.FULL_MATCH:
            cache_table[key] = {x for x in db if x[match.pos] == match.char}
        elif match.code == constants.PARTIAL_MATCH:
            cache_table[key] = {x for x in db if x[match.pos] != match.char and match.char in x}
        else:
            cache_table[key] = {x for x in db if match.char not in x}
    return cache_table[key]
def get_match_info(guess: str, answer: str):
    def letter_info(pos):
        if guess[pos] == answer[pos]:
            return constants.FULL_MATCH     # 2 - the letter is green
        elif guess[pos] in answer:
            return constants.PARTIAL_MATCH  # 1 - the letter is yellow
        else:
            return constants.NO_MATCH       # 0 - the letter is red
    return [MatchInfo(letter_info(pos), pos, guess[pos]) for pos in range(len(guess))]
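# Worked example (illustrative words): get_match_info("CRANE", "CRATE")
# produces codes [2, 2, 2, 0, 2]: C, R, A and E sit on their exact
# positions, while N does not occur anywhere in "CRATE".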
def reduce_sets(s1: MatchSet, s2: MatchSet):
    key = merge_hash_tuples(s1.hashes, s2.hashes)
    if cache_dict.get(key) is None:
        cache_dict[key] = s1.matches & s2.matches  # set intersection
    return MatchSet(key, cache_dict[key])
def get_match_space(matches: list[MatchInfo], base_set=database):
    if matches == []:
        return base_set  # nothing to filter by; honour the caller's base set
    sets: list[MatchSet] = [MatchSet((get_match_hash(m),), filter_by_match(m, base_set)) for m in matches]
    return functools.reduce(reduce_sets, sets).matches
def calculate_entropy(guess: str, prev_info: list[MatchInfo] = []):
    global cached_set
    entropy = 0
    possible_words = get_match_space(prev_info) if prev_info != [] else database
    if possible_words != cached_set:
        cached_set = possible_words
        invalidate_cache()
    for answer in possible_words:
        match = sorted(get_match_info(guess, answer), reverse=True, key=lambda x: x.code)  # start with the green letters
        match_space = get_match_space(match, possible_words)
        information = math.log2(len(possible_words) / len(match_space))  # information gained from this word
        entropy += information  # the entropy formula
    entropy /= len(possible_words)  # because the word probabilities are equal
    return entropy
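# With every remaining word assumed equally likely (probability 1/N, where
# N = len(possible_words)), the loop above computes
#     entropy = (1/N) * sum over answers of log2(N / |match_space|)
# which equals the Shannon entropy of the colour-pattern distribution:
# answers sharing a pattern share the same match_space, so grouping the
# sum by pattern gives sum_p (|S_p| / N) * log2(N / |S_p|).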
def calculate_opener():
    print("Calculating Wordle opener entropies...")
    print("Using PyPy is recommended...")
    entropy_values = [0] * len(database)
    for i in range(len(database)):
        entropy_values[i] = calculate_entropy(database[i])
        if i % 100 == 0:
            print(f"{(i + 1) / len(database) * 100}%", flush=True)
    with open("entropii.txt", mode='w') as file:  # entropies in dictionary order ("entropii" = "entropies")
        for val in zip(entropy_values, database):
            print(f"{val[1]} -> {val[0]}", file=file)
    with open("entropii_sortate.txt", mode='w') as file:  # entropies sorted best-first
        for val in sorted(zip(entropy_values, database), reverse=True):
            print(f"{val[1]} -> {val[0]}", file=file)
def w2_word(m: list[MatchInfo]):
    return ''.join(info.char for info in m)
def w2_path(m: list[MatchInfo]):
    return os.path.join("w2_cache", ''.join(str(info.code) for info in m))
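# Example cache layout: if the opener scores as colours [2, 2, 0, 1, 0]
# against the hidden word, the best second guess is stored in the file
# "w2_cache/22010", one file per possible colour pattern of the opener.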
def load_w2_cache(match: list[MatchInfo]):
    try:
        with open(w2_path(match)) as f:
            return f.read().strip()
    except FileNotFoundError:
        return None
def save_w2_cache(match: list[MatchInfo], word: str):
    os.makedirs("w2_cache", exist_ok=True)
    try:
        with open(w2_path(match), "x") as f:  # "x" mode fails if the file already exists
            f.write(word)
    except FileExistsError:
        pass  # another process already wrote this cache entry
def calculate_best_guess(prev_info: list[MatchInfo], hard_mode=False) -> str:
    if prev_info == []:
        return constants.OPENER  # the first guess is always the precomputed opener
    elif w2_word(prev_info) == constants.OPENER:
        r = load_w2_cache(prev_info)
        if r:
            return r
    db = database
    possible = list(get_match_space(prev_info))
    if len(possible) == 1:  # only one candidate left, no entropy needed
        if w2_word(prev_info) == constants.OPENER:
            save_w2_cache(prev_info, possible[0])
        return possible[0]
    if hard_mode:  # in hard mode every guess must itself be a possible answer
        db = possible
    entropy_values = [0] * len(db)
    for i in range(len(db)):
        entropy_values[i] = calculate_entropy(db[i], prev_info)
        if i % 1000 == 0:
            print(f"{round((i + 1) / len(db) * 100, 2)}%", flush=True)
    best = sorted(zip(entropy_values, db), reverse=True)[0][1]
    if w2_word(prev_info) == constants.OPENER:  # second guesses are worth caching
        save_w2_cache(prev_info, best)
    return best
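# Design note: this is a greedy one-step strategy. Each guess maximises the
# expected information of the next reply only; it does not search the full
# game tree for the guess that minimises the total number of turns.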
def parse_prev_data(msg):
    try:
        prev_data = []
        for pair in msg.split():
            for i, [char, code] in enumerate(zip(*pair.split("="))):
                prev_data.append(MatchInfo(int(code), i, char))
        return prev_data
    except Exception:
        ipc.err()
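# Expected message format, inferred from the parser above: one "WORD=CODES"
# pair per previous guess, e.g. "CRANE=22010 SLOTH=00100" (illustrative
# words) yields five MatchInfo entries per pair, pairing each letter with
# its colour digit.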
def wait_for_command():
    # TODO: wait for GUI request on PORT + 1
    pass
def gen_caches():
    max_proc = 10
    work: list[multiprocessing.Process] = []
    files_in_threads: list = []
    for word in database:
        mtc = get_match_info(constants.OPENER, word)
        p = w2_path(mtc)
        if not (os.path.exists(p) or p in files_in_threads):
            files_in_threads.append(p)
            print(f"Calculating cache {p} using word {word}.")
            work.append(multiprocessing.Process(group=None, target=calculate_best_guess, args=[mtc]))
            work[-1].start()  # fork
            if len(work) >= max_proc:
                print("Max limit reached...")
                multiprocessing.connection.wait([proc.sentinel for proc in work])  # block until a worker exits
                work = [proc for proc in work if proc.is_alive()]
                print("Making more processes...")
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--openers', action='store_true', help='calculate entropies for all possible openers')
    parser.add_argument("--w2-cache", action="store_true", help="precalculate second guess cache")
    parser.add_argument('--port', default=constants.PORT, help='port to use for IPC')
    args = parser.parse_args()
    if args.openers:
        calculate_opener()
        return
    if args.w2_cache:
        gen_caches()
        return
    ipc.set_port(args.port)
    while True:
        wait_for_command()
        ipc.write("LIST")
        prev_data = parse_prev_data(ipc.read())
        guess = calculate_best_guess(prev_data)
        ipc.write(guess)
        result = ipc.read()
        if result == "CASTIGAT":  # "CASTIGAT" means "WON" in the IPC protocol
            print("DONE")
            return
        elif result != "OK":
            ipc.err()
if __name__ == "__main__":
    main()
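# Usage, per the flags defined in main():
#   python solver.py --openers     precompute entropies for every opener
#   python solver.py --w2-cache    precompute the second-guess cache
#   python solver.py               serve guesses to the GUI over IPC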