forked from certifi/python-certifi
-
Notifications
You must be signed in to change notification settings - Fork 1
/
update_bundle.py
202 lines (159 loc) · 6.42 KB
/
update_bundle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import datetime
import hashlib
import io
import pathlib
import shutil
import typing
from typing import cast
import requests
import asn1crypto.pem
from asn1crypto.x509 import KeyPurposeId
from signify.authenticode.authroot import CertificateTrustList, \
CertificateTrustSubject
PACKAGE_DIR = pathlib.Path(__file__).resolve().parent / "mscerts"
# This is where we fetch and store the STL-file
AUTHROOTSTL_URL = "http://ctldl.windowsupdate.com/msdownload/update/v3/static/trustedr/en/authroot.stl"
AUTHROOTSTL_PATH = PACKAGE_DIR / "authroot.stl"
# This is where we fetch the individual certificates from
CERT_URL = "http://ctldl.windowsupdate.com/msdownload/update/v3/static/trustedr/en/{}.crt"
# Certificates are cached here
CACHE_PATH = pathlib.Path(__file__).resolve().parent / ".cache" / "certs"
CACHE_PATH.mkdir(parents=True, exist_ok=True)
# And we generate one big bundle here
BUNDLE_PATH = PACKAGE_DIR / "cacert.pem"
# We update the version in this file
VERSION_FILE = PACKAGE_DIR / "__init__.py"
# First fetch all data
def fetch_to_file(url: str, path: pathlib.Path) -> None:
"""Fetches the provided URL and writes it to the provided path."""
with requests.get(url, stream=True) as r, open(str(path), "wb") as f:
r.raise_for_status()
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
def hash_file(path: pathlib.Path) -> str:
"""Returns a simple (md5) hash of the provided path. Used to check whether
the file contents have changed.
"""
hash = hashlib.md5()
with open(path, "rb") as f:
while True:
chunk = f.read(hash.block_size)
if not chunk:
break
hash.update(chunk)
return hash.hexdigest()
def check_certificate_in_cache(
identifier: str,
cache_path: pathlib.Path = CACHE_PATH,
) -> bool:
"""Checks whether the identifier is already present in the cache."""
if not (cache_path / identifier).exists():
return False
with open(cache_path / identifier, "r") as cert_file:
content = cert_file.read()
if "-----END CERTIFICATE-----" not in content:
print(f"Invalid cached certificate, adding {identifier} again")
return False
return True
def fetch_certificate(identifier: str, cert_url: str = CERT_URL) -> None:
"""Fetches the certificate by identifier from the certificate URL,
and writes it armored to the cache directory.
"""
r = requests.get(cert_url.format(identifier))
r.raise_for_status()
with open(CACHE_PATH / identifier, "wb") as f:
f.write(asn1crypto.pem.armor("CERTIFICATE", r.content))
print(f"- Fetched certificate {identifier}")
def fetch_certificates(ctl: CertificateTrustList) -> None:
"""Fetches all certificates in the CertificateTrustList"""
for i, subject in enumerate(ctl.subjects):
print(subject.friendly_name[:-1], f"{i + 1} / {len(ctl.subjects)}")
if check_certificate_in_cache(subject.identifier.hex()):
continue
fetch_certificate(subject.identifier.hex())
def dump_certificate(
f: typing.TextIO,
subject: CertificateTrustSubject,
) -> None:
"""Dump an individual certificate to an already-open file."""
with open(CACHE_PATH / subject.identifier.hex(), "r") as cert_file:
certificate_body = cert_file.read()
f.write(f"# Subject Identifier: {subject.identifier.hex()}\n")
if subject.friendly_name:
name = subject.friendly_name.encode('ascii', 'ignore').decode()
if name != subject.friendly_name:
f.write(f"# Friendly Name (ASCII): {name}\n")
else:
f.write(f"# Friendly Name: {name}\n")
if subject.extended_key_usages:
f.write(
f"# Extended key usages: {subject.extended_key_usages}\n"
)
if subject.subject_name_md5:
f.write(f"# Subject Name MD5: {subject.subject_name_md5.hex()}\n")
if subject.disallowed_filetime:
f.write(f"# Disallowed Filetime: {subject.disallowed_filetime}\n")
if subject.root_program_chain_policies:
f.write(
"# Root Program Chain Policies: "
f"{subject.root_program_chain_policies}\n"
)
if subject.disallowed_extended_key_usages:
f.write(
"# Disallowed extended key usages: "
f"{subject.disallowed_extended_key_usages}\n"
)
if subject.not_before_filetime:
f.write(f"# Not before Filetime: {subject.not_before_filetime}\n")
if subject.not_before_extended_key_usages:
f.write(
"# Not before extended key usages: "
f"{subject.not_before_extended_key_usages}\n"
)
f.write(certificate_body)
f.write("\n")
def patch_version(filename: pathlib.Path = VERSION_FILE) -> None:
"""Changes the date-based version number in the provided file."""
cache = io.StringIO()
with open(filename, "r") as f:
for line in f:
if line.startswith("__version__"):
today = datetime.date.today()
# write it quite ugly, but this ensures that we do not have
# leading zeroes
cache.write(
f'__version__ = "{today.year}.{today.month}.{today.day}"\n'
)
else:
cache.write(line)
cache.seek(0)
with open(filename, "w") as f:
shutil.copyfileobj(cache, f)
def main() -> None:
# Calculate current hash to see if contents have changed
if AUTHROOTSTL_PATH.exists():
old_hash = hash_file(AUTHROOTSTL_PATH)
else:
old_hash = ""
# Download new file and hash
fetch_to_file(AUTHROOTSTL_URL, AUTHROOTSTL_PATH)
new_hash = hash_file(AUTHROOTSTL_PATH)
# let signify parse the CertificateTrustList for us
ctl = CertificateTrustList.from_stl_file(AUTHROOTSTL_PATH)
print(f"Fetched CTL file, there are {len(ctl.subjects)} subjects")
# fetch all certificates to cache
fetch_certificates(ctl)
print("Fetched all certificates to cache")
# dump certificates to bundle
with open(BUNDLE_PATH, "w", encoding='utf-8') as f:
for subject in ctl.subjects:
dump_certificate(f, subject)
print(f"Dumped certificates to {BUNDLE_PATH}")
# patch version if needed
if old_hash != new_hash:
patch_version()
print(f"Patched version number in {VERSION_FILE}")
else:
print("Did not patch version number because contents have not changed.")
if __name__ == '__main__':
main()