-
Notifications
You must be signed in to change notification settings - Fork 1
/
grokkermod.py
185 lines (155 loc) · 7.35 KB
/
grokkermod.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import concurrent.futures
import os
import pefile
import tarfile
import threading
import zstandard
from contextlib import contextmanager, closing
from urllib.request import urlopen
_tls = threading.local()
# python uses .pyd for extensions, ruby uses .so, and octave uses .oct
PE_FILE_EXTENSIONS = frozenset((".dll", ".exe", ".pyd", ".so", ".oct"))
@contextmanager
def open_zstd_supporting_tar(name, fileobj):
    """Open *fileobj* as a streaming tar archive, with ``.zst`` support.

    ``name`` is only inspected, via ``str(name)``, for a ``.zst`` suffix;
    the data itself is always read from ``fileobj``. Archives whose name
    ends in ``.zst`` are piped through a zstandard stream reader, everything
    else is left to tarfile's own transparent stream detection (``r|*``).

    Yields the opened ``tarfile.TarFile``; it is closed when the ``with``
    block using this context manager exits.
    """
    # HACK: please, Python, support zst with |* in tarfile
    # Sniffing the magic bytes instead of trusting the name would need a
    # stream wrapper, like the one tarfile already has, to "put back" the
    # bytes that were read.
    if not str(name).endswith(".zst"):
        with tarfile.open(fileobj=fileobj, mode="r|*") as archive:
            yield archive
        return

    # One decompressor is cached per thread in the module-level thread-local.
    decompressor = getattr(_tls, 'zdctx', None)
    if decompressor is None:
        decompressor = _tls.zdctx = zstandard.ZstdDecompressor()

    with decompressor.stream_reader(fileobj, closefd=False) as zstream:
        with tarfile.open(fileobj=zstream, mode="r|") as archive:
            yield archive
# Guards the temporary override of pefile.MAX_IMPORT_SYMBOLS below. The value
# is process-global, so without this two threads entering/leaving the context
# manager at overlapping times could restore the old limit while the other is
# still parsing, or leave the raised limit in place for good.
_MAX_IMPORT_SYMBOLS_LOCK = threading.RLock()


@contextmanager
def pefile_set_max_import_symbols(max_symbols: int):
    """Temporarily set ``pefile.MAX_IMPORT_SYMBOLS`` to *max_symbols*.

    pefile keeps this limit in module-global state, and we don't want to
    change it for the rest of the program, so the previous value is saved on
    entry and restored on exit (also when the body raises).

    The override is serialized with a re-entrant lock: only one thread at a
    time can be inside the ``with`` block, so the save/restore pairs of
    different threads can never interleave. Nested use from the same thread
    is allowed.
    """
    with _MAX_IMPORT_SYMBOLS_LOCK:
        previous = pefile.MAX_IMPORT_SYMBOLS
        pefile.MAX_IMPORT_SYMBOLS = max_symbols
        try:
            yield
        finally:
            pefile.MAX_IMPORT_SYMBOLS = previous
class ProblematicImportSearcher:
    """Callable that checks whether a package imports any "problem" symbols.

    Instances are called with a package object and return that same object
    when one of the PE files inside the package archive imports a problem
    symbol, or ``None`` otherwise.

    Parameters:
        problem_dll_symbols: mapping looked up with ``entry.dll.lower()`` of
            each import-table entry. The value is a collection of symbol
            names; an empty collection means that *any* import from that DLL
            counts as a problem.
        local_mirror: optional directory; when set, the archive is read from
            ``os.path.join(local_mirror, pkg.filename)`` instead of being
            downloaded.
        artifacts: optional mapping from package name to a local archive
            path; takes precedence over ``local_mirror`` and the download.
    """

    def __init__(self, problem_dll_symbols, local_mirror=None, artifacts=None):
        super().__init__()
        self.problem_dlls = problem_dll_symbols
        self.local_mirror = local_mirror
        self.artifacts = artifacts

    def _open_package(self, pkg):
        """Return a binary file object for the archive of *pkg*.

        Lookup order: explicit artifact path, local mirror directory, then
        a download from ``pkg.db.url``.
        """
        if self.artifacts and pkg.name in self.artifacts:
            return open(self.artifacts[pkg.name], "rb")
        if self.local_mirror:
            localfile = os.path.join(self.local_mirror, pkg.filename)
            return open(localfile, "rb")
        return urlopen("{}/{}".format(pkg.db.url, pkg.filename))

    def _imports_problem_symbol(self, pe):
        """Return True if the parsed import table of *pe* hits a problem symbol."""
        # pefile does not set DIRECTORY_ENTRY_IMPORT when the image has no
        # import directory; treat that as "imports nothing" instead of
        # letting the AttributeError fail the whole package.
        for imported_dll in getattr(pe, 'DIRECTORY_ENTRY_IMPORT', ()):
            problem_symbols = self.problem_dlls.get(imported_dll.dll.lower())
            if problem_symbols is None:
                continue
            if not problem_symbols:
                # Empty collection: any use of this DLL is a problem.
                return True
            if any(imp.name in problem_symbols for imp in imported_dll.imports):
                return True
        return False

    def __call__(self, pkg):
        """Scan the archive of *pkg*; return *pkg* on a hit, else ``None``.

        Raises:
            RuntimeError: if anything other than a ``pefile.PEFormatError``
                on an individual member goes wrong; the original exception
                is attached as the cause.
        """
        try:
            # Skip opening (or downloading) the archive when the file list
            # shows no PE-like file at all.
            if not any(os.path.splitext(f)[-1] in PE_FILE_EXTENSIONS for f in pkg.files):
                return None

            # The name is only used to detect a .zst archive, so it has to
            # describe the file that is actually opened.
            filename = self.artifacts.get(pkg.name, pkg.filename) if self.artifacts else pkg.filename
            with self._open_package(pkg) as pkgfile, \
                 open_zstd_supporting_tar(filename, pkgfile) as tar:
                for entry in tar:
                    if not entry.isreg() or os.path.splitext(entry.name)[-1] not in PE_FILE_EXTENSIONS:
                        continue
                    try:
                        with tar.extractfile(entry) as infofile, \
                             closing(pefile.PE(data=infofile.read(), fast_load=True, max_symbol_exports=0x10000)) as pe:
                            # fast_load skipped the data directories; parse
                            # only the import table, with a raised symbol limit.
                            with pefile_set_max_import_symbols(0x10000):
                                pe.parse_data_directories(directories=[
                                    pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']
                                ])
                            if pe.get_warnings():
                                print(f"Warnings for {entry.name}:")
                                pe.show_warnings()
                            if self._imports_problem_symbol(pe):
                                return pkg
                    except pefile.PEFormatError:
                        # Not a valid PE image despite the extension.
                        continue
        except Exception as err:
            raise RuntimeError(f"Failed to grok package {pkg.name}") from err
        return None
def grok_dependency_tree(repo, package, package_handler):
    """Run *package_handler* over a package's reverse-dependency tree.

    ``package`` is a single package name or an iterable of names. The handler
    is submitted to a 20-worker thread pool once for each of:

    * the given package(s) and everything that transitively requires them
      (via ``compute_requiredby()``), and
    * the packages that directly makedepend on the given package(s)
      (via ``compute_rdepends('makedepends')``).

    This is a generator: for every handler call that returned something
    other than ``None`` it yields ``result.base``, in completion order.
    """
    with concurrent.futures.ThreadPoolExecutor(20) as pool:
        makedep_jobs = {}
        tree_jobs = {}
        frontier = {package} if isinstance(package, str) else set(package)

        # Check packages that immediately makedepend on the given package
        # https://github.com/jeremyd2019/package-grokker/issues/6
        for root in frontier:
            for dependent in repo.get_pkg(root).compute_rdepends('makedepends'):
                if dependent in makedep_jobs:
                    continue
                makedep_jobs[dependent] = pool.submit(package_handler, repo.get_pkg(dependent))

        # Breadth-first walk over "required by" edges, one level per pass.
        while frontier:
            upcoming = set()
            for current in frontier:
                pkg = repo.get_pkg(current)
                for dependent in pkg.compute_requiredby():
                    if dependent not in tree_jobs and dependent not in frontier:
                        upcoming.add(dependent)
                # Reuse the job already started for a makedepend package
                # rather than submitting the same package twice.
                job = makedep_jobs.pop(current, None)
                if job is None:
                    job = pool.submit(package_handler, pkg)
                tree_jobs[current] = job
            frontier = upcoming

        # Drop this function's reference to the repo; it is not needed while
        # waiting for results.
        del repo

        pending = set(tree_jobs.values()) | set(makedep_jobs.values())
        for finished in concurrent.futures.as_completed(pending):
            outcome = finished.result()
            if outcome is not None:
                yield outcome.base
def exports_for_package(name, fileobj):
    """Collect the exported symbol names of every PE file in a package.

    Parameters:
        name: archive name; used to pick the decompression path and in the
            error message.
        fileobj: binary file object positioned at the start of the archive.

    Returns:
        dict mapping each PE member's path inside the archive to the set of
        ``name`` values of its export symbols (an empty set when the image
        has no export directory). Members that pefile rejects with
        ``PEFormatError`` are left out.

    Raises:
        RuntimeError: on any other failure; the original exception is
            attached as the cause.
    """
    try:
        package_exports = {}
        with open_zstd_supporting_tar(name, fileobj) as tar:
            for entry in tar:
                if not entry.isreg() or os.path.splitext(entry.name)[-1] not in PE_FILE_EXTENSIONS:
                    continue
                try:
                    with tar.extractfile(entry) as infofile, \
                         closing(pefile.PE(data=infofile.read(), fast_load=True, max_symbol_exports=0x10000)) as pe:
                        # fast_load skipped the data directories; parse only
                        # the export table.
                        pe.parse_data_directories(directories=[
                            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT']
                        ])
                        if pe.get_warnings():
                            print(f"Warnings for {entry.name}:")
                            pe.show_warnings()
                        # assume we don't need to worry about ordinal-only exports
                        export_dir = getattr(pe, 'DIRECTORY_ENTRY_EXPORT', None)
                        symbols = export_dir.symbols if export_dir is not None else ()
                        package_exports[entry.name] = {exp.name for exp in symbols}
                except pefile.PEFormatError:
                    # Not a valid PE image despite the extension.
                    continue
        return package_exports
    except Exception as err:
        raise RuntimeError(f"Failed to get DLL exports from package {name!s}") from err
def diff_package_exports(url1, url2):
    """Find exports present in the package at *url1* but gone in *url2*.

    Both archives are fetched with ``urlopen`` and their PE exports are
    compared member by member (matched on the path inside the archive).

    Returns:
        dict keyed by the lower-cased, ASCII-encoded basename of each
        affected PE file. The value is the set of export names that were
        removed, or an empty set when the file is missing from the second
        package entirely. (``ProblematicImportSearcher`` treats an empty set
        as "any import from this DLL is a problem".)

        If several members share a basename, their results are merged
        instead of the last one silently winning: removed symbols are
        unioned, and a missing file's empty-set marker takes precedence.
    """
    with urlopen(url1) as fileobj:
        exports1 = exports_for_package(os.path.basename(url1), fileobj)
    with urlopen(url2) as fileobj:
        exports2 = exports_for_package(os.path.basename(url2), fileobj)

    problem_dll_symbols = {}
    for dll, exports in exports1.items():
        key = os.path.basename(dll).encode('ascii').lower()
        if dll not in exports2:
            # Whole file removed: overrides any symbol-level result for the
            # same basename.
            problem_dll_symbols[key] = set()
            continue

        removed = exports - exports2[dll]
        if not removed:
            continue

        known = problem_dll_symbols.get(key)
        if known is None:
            problem_dll_symbols[key] = removed
        elif known:
            known |= removed
        # else: an empty set already marks this basename as fully removed;
        # adding symbols would narrow that, so leave it alone.
    return problem_dll_symbols