-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
147 lines (121 loc) · 5.68 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import json
import gzip
import os
import requests
import base64
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography import x509
from ratelimit import limits
import time
# CT log "add-chain" endpoint that submit_to_ct() POSTs certificate chains to.
url = "https://twig.ct.letsencrypt.org/2024h1/ct/v1/add-chain"
# python-dotenv, bot3, gzip, json, ratelimit
# NOTE(review): the line above looks like a dependency list; "bot3" is
# presumably boto3, and neither it nor python-dotenv is imported in the
# visible code - confirm before relying on it.
# Text file holding one hex SHA-256 chain fingerprint per line; read by
# is_chain_processed() and appended to by mark_chain_processed().
PROCESSED_CHAINS_FILE = "processed_chains.txt"
# Directory that the __main__ block lists looking for *.jsonl.gz files.
directory = "OONI-S3-Datasets/2024"
def is_chain_processed(chain):
    """Report whether a certificate chain is already listed as processed.

    The chain is fingerprinted by feeding the DER encoding of every
    certificate, in order, into one SHA-256 digest. The resulting hex string
    is looked up among the lines of PROCESSED_CHAINS_FILE.

    Returns:
        True when the fingerprint is present in the file, False when it is
        absent or when the file does not exist yet.
    """
    hasher = hashes.Hash(hashes.SHA256(), backend=default_backend())
    # Hashing the concatenation is equivalent to updating once per certificate.
    hasher.update(
        b"".join(cert.public_bytes(serialization.Encoding.DER) for cert in chain)
    )
    fingerprint = hasher.finalize().hex()

    try:
        with open(PROCESSED_CHAINS_FILE, "r") as ledger:
            known_fingerprints = ledger.read().splitlines()
    except FileNotFoundError:
        # Nothing has been recorded yet, so nothing can have been processed.
        return False
    return fingerprint in known_fingerprints
def mark_chain_processed(chain):
    """Record a certificate chain as processed.

    Computes a single SHA-256 digest over the DER encoding of each
    certificate in order and appends its hex form, followed by a newline, to
    PROCESSED_CHAINS_FILE (the file is created if it does not exist).
    """
    digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
    der_blobs = (cert.public_bytes(serialization.Encoding.DER) for cert in chain)
    for blob in der_blobs:
        digest.update(blob)
    fingerprint = digest.finalize().hex()

    with open(PROCESSED_CHAINS_FILE, "a") as ledger:
        ledger.write(f"{fingerprint}\n")
def fetch_measurement_data(file_path):
    """Yield each measurement stored in a gzipped JSONL file.

    Args:
        file_path: Path to a gzip-compressed file holding one JSON document
            per line.

    Yields:
        The decoded JSON value of every line that parses successfully.

    Blank lines are skipped. A line that is not valid UTF-8 or not valid JSON
    is reported on stdout and skipped, so a single corrupt record does not
    discard the remainder of the file. A failure to open or decompress the
    file is reported on stdout and ends the iteration; it is not raised to
    the caller.
    """
    try:
        with gzip.open(file_path, "rb") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    measurement_data = json.loads(line.decode("utf-8"))
                except (UnicodeDecodeError, json.JSONDecodeError) as e:
                    # Per-line failure: report it and keep reading the file.
                    print(f"Failed to decode line: {e}")
                    continue
                # Yield outside the inner try so only parsing is guarded.
                yield measurement_data
    except Exception as e:
        # Deliberate best-effort boundary: an unreadable or truncated archive
        # should not stop the caller from moving on to the next file.
        print(f"Failed to read file: {e}")
@limits(calls=3, period=10)
def submit_to_ct(chain, timeout=30):
    """Submit a certificate chain to the CT log endpoint stored in ``url``.

    Each certificate is DER-encoded, base64-encoded, and sent as the
    ``chain`` array of a JSON POST body. The outcome is printed to stdout.

    The function is wrapped by ``ratelimit.limits(calls=3, period=10)``.
    NOTE(review): per the ratelimit documentation, ``limits`` raises
    ``RateLimitException`` instead of waiting when the budget is exceeded -
    confirm that callers are prepared for that.

    Args:
        chain: Iterable of certificate objects exposing
            ``public_bytes(encoding)``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        True if the log answered with HTTP 200, False for any other status
        or when the request itself failed.
    """
    payload = {
        "chain": [
            base64.b64encode(cert.public_bytes(serialization.Encoding.DER)).decode()
            for cert in chain
        ]
    }
    try:
        response = requests.post(url, json=payload, timeout=timeout)
    except requests.exceptions.RequestException as err:
        print(f"Error submitting chain: {err}")
        return False

    status = response.status_code
    if status == 200:
        try:
            body = response.json()
        except ValueError:
            # A 200 with a non-JSON body is still a successful submission.
            body = response.text
        print(f"Submission successful: {body}")
        return True

    if 400 <= status < 500:
        print(f"Client error: {status}, {response.text}")
    elif status >= 500:
        print(f"Server error: {status}, {response.text}")
    else:
        print(f"Unexpected response: {status}, {response.text}")
    return False
def extract_certificate_chains(measurement_data):
    """Extract parsed certificate chains from a web_connectivity measurement.

    Args:
        measurement_data: Decoded measurement. The code reads ``test_name``,
            ``test_keys.tls_handshakes[*].peer_certificates[*].data`` and
            treats each ``data`` value as a base64-encoded DER certificate.

    Returns:
        A list with one entry per TLS handshake that produced at least one
        parseable certificate; each entry is the list of parsed certificates
        for that handshake. Returns an empty list for other test types, for
        measurements without handshakes, and on unexpected errors.

    Null ``test_keys``, ``tls_handshakes`` or ``peer_certificates`` values are
    treated as empty. A certificate entry that is missing ``data``, is not
    valid base64, or is not valid DER is reported and skipped on its own, so
    it no longer discards chains already extracted from the same measurement.
    NOTE(review): as before, a chain with a skipped certificate is still
    returned with the remaining certificates - confirm that is intended.
    """
    try:
        if measurement_data.get("test_name") != "web_connectivity":
            return []

        test_keys = measurement_data.get("test_keys") or {}
        tls_handshakes = test_keys.get("tls_handshakes") or []
        if not tls_handshakes:
            print("No tls_handshakes found in measurement data.")
            return []

        all_certs = []
        for tls_handshake in tls_handshakes:
            # `or []` covers both a missing key and an explicit null value.
            peer_certificates = tls_handshake.get("peer_certificates") or []

            certs = []
            for cert_data in peer_certificates:
                encoded = cert_data.get("data") if isinstance(cert_data, dict) else None
                try:
                    # b64decode raises TypeError for None and binascii.Error
                    # (a ValueError subclass) for malformed input.
                    cert_bytes = base64.b64decode(encoded)
                    cert = x509.load_der_x509_certificate(cert_bytes, default_backend())
                except (TypeError, ValueError):
                    print(f"Failed to decode certificate in chain: {encoded}")
                    continue
                certs.append(cert)

            if certs:
                all_certs.append(certs)
        return all_certs
    except Exception as e:
        # Last-resort guard: measurement files are external input and a
        # malformed record must not stop the whole run.
        print(f"Error extracting certificate chain: {e}")
        return []
if __name__ == "__main__":
    # Walk every gzipped JSONL file in `directory`, extract the certificate
    # chains from each measurement and submit the ones not yet recorded.
    for filename in os.listdir(directory):
        if not filename.endswith(".jsonl.gz"):
            print(f"Skipping non-JSONL file: {filename}")
            continue

        archive_path = os.path.join(directory, filename)
        try:
            for record in fetch_measurement_data(archive_path):
                if not record:
                    print(f"No valid measurement data found in {filename}")
                    continue

                chains = extract_certificate_chains(record)
                if not chains:
                    print(f"No certificate chains found in {filename}")
                    continue

                for cert_chain in chains:
                    if is_chain_processed(cert_chain):
                        print(f"Skipping already submitted chain in {filename}")
                        continue
                    # Fixed pause between submissions to stay gentle on the log.
                    time.sleep(5)
                    submit_to_ct(cert_chain)
                    # NOTE(review): the chain is recorded as processed whether
                    # or not the submission succeeded, so failed submissions
                    # are never retried on a later run.
                    mark_chain_processed(cert_chain)
        except Exception as exc:
            # Any error abandons the rest of this file and moves to the next.
            print(f"Error processing file {filename}: {exc}")