diff --git a/tools/private_domains_checker/.gitignore b/tools/private_domains_checker/.gitignore new file mode 100644 index 000000000..40081a71b --- /dev/null +++ b/tools/private_domains_checker/.gitignore @@ -0,0 +1,3 @@ +venv +__pycache__ +data/*.csv \ No newline at end of file diff --git a/tools/private_domains_checker/PSLPrivateDomainsProcessor.py b/tools/private_domains_checker/PSLPrivateDomainsProcessor.py new file mode 100644 index 000000000..298a7492e --- /dev/null +++ b/tools/private_domains_checker/PSLPrivateDomainsProcessor.py @@ -0,0 +1,363 @@ +import datetime +import json +import time + +import pandas as pd +import requests +import whoisdomain as whois + + +def make_dns_request(domain, record_type): + """ + Makes DNS requests to both Google and Cloudflare DNS APIs. + + Args: + domain (str): The domain to query. + record_type (str): The type of DNS record to query. + + Returns: + list: A list containing the JSON responses from Google and Cloudflare. + """ + urls = [ + f"https://dns.google/resolve?name={domain}&type={record_type}", + f"https://cloudflare-dns.com/dns-query?name={domain}&type={record_type}" + ] + + headers = { + "accept": "application/dns-json" + } + + responses = [] + for url in urls: + try: + response = requests.get(url, headers=headers) + if response.status_code == 200: + json_response = response.json() + # print(f"URL: {url}, Response: {json_response}") + responses.append(json_response) + else: + # print(f"URL: {url}, Status Code: {response.status_code}") + responses.append(None) + except Exception as e: + print(f"URL: {url}, DNS Exception: {e}") + responses.append(None) + + return responses + + +def check_dns_status(domain): + """ + Checks the DNS status of a domain using Google and Cloudflare DNS APIs. + + Args: + domain (str): The domain to check. + + Returns: + str: The DNS status of the domain. 
+ """ + + def make_request(): + responses = make_dns_request(domain, "NS") + if None in responses: + return "ERROR" + + google_status = responses[0].get("Status") + cloudflare_status = responses[1].get("Status") + + print(f"Google Status: {google_status}, Cloudflare Status: {cloudflare_status}") + + if google_status == cloudflare_status: + if google_status == 3: + return "NXDOMAIN" + else: + return "ok" + else: + return "INCONSISTENT" + + for _ in range(5): + dns_status = make_request() + print(f"Attempt {_ + 1}, DNS Status: {dns_status}") + if dns_status not in ["ERROR", "INCONSISTENT"]: + return dns_status + time.sleep(1) + return "INCONSISTENT" + + +def check_psl_txt_record(domain): + """ + Checks the _psl TXT record for a domain using Google and Cloudflare DNS APIs. + + Args: + domain (str): The domain to check. + + Returns: + str: The _psl TXT record status of the domain. + """ + # Prepare the domain for the TXT check + domain = domain.lstrip('*.').lstrip('!').encode('idna').decode('ascii') + + def make_request(): + responses = make_dns_request(f"_psl.{domain}", "TXT") + if None in responses: + return "ERROR" + + google_txt = responses[0].get("Answer", []) + cloudflare_txt = responses[1].get("Answer", []) + + google_txt_records = [record.get("data", "") for record in google_txt] + cloudflare_txt_records = [record.get("data", "").strip('"') for record in cloudflare_txt] + + print( + f"_psl TXT Records (Google): {google_txt_records}, _psl TXT Records (Cloudflare): {cloudflare_txt_records}") + + if google_txt_records == cloudflare_txt_records: + for record in google_txt_records: + if "github.com/publicsuffix/list/pull/" in record: + return "valid" + return "invalid" + else: + return "INCONSISTENT" + + for _ in range(5): + psl_txt_status = make_request() + print(f"Attempt {_ + 1}, PSL TXT Status: {psl_txt_status}") + if psl_txt_status not in ["ERROR", "INCONSISTENT"]: + return psl_txt_status + time.sleep(1) + return "INCONSISTENT" + + +def get_whois_data(domain): 
+ """ + Retrieves WHOIS data for a domain using the whoisdomain package. + + Args: + domain (str): The domain to query. + + Returns: + tuple: A tuple containing WHOIS domain status, expiry date, and WHOIS status. + """ + try: + d = whois.query(domain) + whois_domain_status = d.statuses + whois_expiry = d.expiration_date + whois_status = "ok" + except Exception as e: + print(f"WHOIS Exception: {e}") + whois_domain_status = None + whois_expiry = None + whois_status = "ERROR" + return whois_domain_status, whois_expiry, whois_status + + +class PSLPrivateDomainsProcessor: + """ + A class to process PSL private section domains, check their status, and save the results. + """ + + def __init__(self): + """ + Initializes the PSLPrivateDomainsProcessor with default values and settings. + """ + self.psl_url = "https://raw.githubusercontent.com/publicsuffix/list/master/public_suffix_list.dat" + self.psl_icann_marker = "// ===BEGIN ICANN DOMAINS===" + self.psl_private_marker = "// ===BEGIN PRIVATE DOMAINS===" + self.columns = [ + "psl_entry", + "top_level_domain", + "dns_status", + "whois_status", + "whois_domain_expiry_date", + "whois_domain_status", + "psl_txt_status", + "expiry_check_status" + ] + self.df = pd.DataFrame(columns=self.columns) + self.icann_domains = set() + + def fetch_psl_data(self): + """ + Fetches the PSL data from the specified URL. + + Returns: + str: The raw PSL data. + """ + print("Fetching PSL data from URL...") + response = requests.get(self.psl_url) + psl_data = response.text + print("PSL data fetched.") + return psl_data + + def parse_domain(self, domain): + """ + Parses and normalizes a domain. + + Args: + domain (str): The domain to parse. + + Returns: + str: The normalized domain. + + Raises: + ValueError: If no valid top-level domain is found. + """ + domain = domain.lstrip('*.') # wildcards (*) + domain = domain.lstrip('!') # bangs (!) 
+
+        parts = domain.split('.')
+
+        for i in range(len(parts)):
+            candidate = '.'.join(parts[i:])
+            if candidate in self.icann_domains:
+                continue
+            elif '.'.join(parts[i + 1:]) in self.icann_domains:
+                return candidate.encode('idna').decode('ascii')
+
+        raise ValueError(f"No valid top-level domain found in the provided domain: {domain}")
+
+    def parse_psl_data(self, psl_data):
+        """
+        Parses the fetched PSL data and separates ICANN and private domains.
+
+        Args:
+            psl_data (str): The raw PSL data.
+
+        Returns:
+            tuple: A tuple containing the unparsed private domains and the parsed private
+                domains, index-aligned so they may safely be zipped together.
+        """
+        print("Parsing PSL data...")
+
+        lines = psl_data.splitlines()
+        process_icann = False
+        process_private = False
+        raw_private_domains = []
+        parsed_private_domains = []
+
+        for line in lines:
+            stripped_line = line.strip()
+            if stripped_line == self.psl_icann_marker:
+                process_icann = True
+                process_private = False
+                continue
+            elif stripped_line == self.psl_private_marker:
+                process_icann = False
+                process_private = True
+                continue
+
+            if stripped_line.startswith('//') or not stripped_line:
+                continue
+
+            if process_icann:
+                self.icann_domains.add(stripped_line)
+            elif process_private:
+                raw_private_domains.append(stripped_line)
+                parsed_private_domains.append(stripped_line)
+
+        print(f"Private domains to be processed: {len(parsed_private_domains)}\n"
+              f"ICANN domains: {len(self.icann_domains)}")
+
+        parsed_private_domains = [self.parse_domain(domain) for domain in parsed_private_domains]
+        deduped_pairs = sorted(set(zip(raw_private_domains, parsed_private_domains)))  # dedupe as pairs so zip() in process_domains stays aligned
+        raw_private_domains, parsed_private_domains = [list(t) for t in zip(*deduped_pairs)] if deduped_pairs else ([], [])
+        print("Private domains in the publicly registrable name space: ", len(parsed_private_domains))
+
+        return raw_private_domains, parsed_private_domains
+
+    def process_domains(self, raw_domains, domains):
+        """
+        Processes each domain, performing DNS, WHOIS, and _psl TXT record checks.
+
+        Args:
+            raw_domains (list): A list of unparsed domains to process.
+ domains (list): A list of domains to process. + """ + data = [] + for raw_domain, domain in zip(raw_domains, domains): + whois_domain_status, whois_expiry, whois_status = get_whois_data(domain) + dns_status = check_dns_status(domain) + psl_txt_status = check_psl_txt_record(raw_domain) + + if whois_status == "ERROR": + expiry_check_status = "ERROR" + else: + expiry_check_status = "ok" if whois_expiry and whois_expiry >= ( + datetime.datetime.utcnow() + datetime.timedelta(days=365 * 2)) else "FAIL_2Y" + + print( + f"{domain} - DNS Status: {dns_status}, Expiry: {whois_expiry}, " + f"PSL TXT Status: {psl_txt_status}, Expiry Check: {expiry_check_status}") + + data.append({ + "psl_entry": domain, + "top_level_domain": domain, + "whois_domain_status": json.dumps(whois_domain_status), + "whois_domain_expiry_date": whois_expiry, + "whois_status": whois_status, + "dns_status": dns_status, + "psl_txt_status": psl_txt_status, + "expiry_check_status": expiry_check_status + }) + + self.df = pd.DataFrame(data, columns=self.columns) + + def save_results(self): + """ + Saves all processed domain data to data/all.csv. + """ + sorted_df = self.df.sort_values(by="psl_entry") + sorted_df.to_csv("data/all.csv", index=False) + + def save_invalid_results(self): + """ + Saves domains with invalid DNS or expired WHOIS data to data/nxdomain.csv and data/expired.csv. + """ + nxdomain_df = self.df[self.df["dns_status"] != "ok"].sort_values(by="psl_entry") + nxdomain_df.to_csv("data/nxdomain.csv", index=False) + + today_str = datetime.datetime.utcnow().strftime("%Y-%m-%d") + expired_df = self.df[ + self.df["whois_domain_expiry_date"].notnull() & + (self.df["whois_domain_expiry_date"].astype(str).str[:10] < today_str) + ].sort_values(by="psl_entry") + expired_df.to_csv("data/expired.csv", index=False) + + def save_hold_results(self): + """ + Saves domains with WHOIS status containing any form of "hold" to data/hold.csv. 
+ """ + hold_df = self.df[ + self.df["whois_domain_status"].str.contains("hold", case=False, na=False) + ].sort_values(by="psl_entry") + hold_df.to_csv("data/hold.csv", index=False) + + def save_missing_psl_txt_results(self): + """ + Saves domains with invalid _psl TXT records to data/missing_psl_txt.csv. + """ + missing_psl_txt_df = self.df[self.df["psl_txt_status"] == "invalid"].sort_values(by="psl_entry") + missing_psl_txt_df.to_csv("data/missing_psl_txt.csv", index=False) + + def save_expiry_less_than_2yrs_results(self): + """ + Saves domains with WHOIS expiry date less than 2 years from now to data/expiry_less_than_2yrs.csv. + """ + expiry_less_than_2yrs_df = self.df[self.df["expiry_check_status"] == "FAIL_2Y"].sort_values(by="psl_entry") + expiry_less_than_2yrs_df.to_csv("data/expiry_less_than_2yrs.csv", index=False) + + def run(self): + """ + Executes the entire processing pipeline. + """ + psl_data = self.fetch_psl_data() + raw_domains, domains = self.parse_psl_data(psl_data) + self.process_domains(raw_domains, domains) + self.save_results() + self.save_invalid_results() + self.save_hold_results() + self.save_missing_psl_txt_results() + self.save_expiry_less_than_2yrs_results() + + +if __name__ == "__main__": + processor = PSLPrivateDomainsProcessor() + processor.run() diff --git a/tools/private_domains_checker/README.md b/tools/private_domains_checker/README.md new file mode 100644 index 000000000..f83589020 --- /dev/null +++ b/tools/private_domains_checker/README.md @@ -0,0 +1,112 @@ +# PSL Private Section Domains WHOIS Checker + +## Overview + +The `PSLPrivateDomainsProcessor` is a Python script designed to fetch data from the Public Suffix List (PSL) and check the domain status, expiry dates, and `_psl` TXT records of the private section domains. + +It performs WHOIS checks on these domains and saves the results into CSV files for manual review. 
+ +## Requirements + +- Python 3.x +- `requests` +- `pandas` +- `whoisdomain` + +You can install the required packages using pip: + +```sh +pip install -r requirements.txt +``` + +Ensure that `whois` is installed on your operating system. + +```sh +sudo apt install whois # Debian/Ubuntu +sudo yum install whois # Fedora/Centos/Rocky +``` + +## Usage + +`PSLPrivateDomainsProcessor.py`: The main script containing the `PSLPrivateDomainsProcessor` class and functions for DNS and WHOIS checks. + +Run the script using Python: + +```sh +cd private_domains_checker +mkdir data +python PSLPrivateDomainsProcessor.py +``` + +## Main Components + +### Functions + +- `make_dns_request(domain, record_type)`: Makes DNS requests to both Google and Cloudflare DNS APIs. +- `check_dns_status(domain)`: Checks the DNS status of a domain using Google and Cloudflare DNS APIs. +- `get_whois_data(domain)`: Retrieves WHOIS data for a domain using the whoisdomain package. +- `check_psl_txt_record(domain)`: Checks the `_psl` TXT record for a domain using Google and Cloudflare DNS APIs. + +### Class + +#### PSLPrivateDomainsProcessor + +- `fetch_psl_data()`: Fetches the PSL data from the specified URL. +- `parse_domain(domain)`: Parses and normalizes a domain. +- `parse_psl_data(psl_data)`: Parses the fetched PSL data and separates ICANN and private domains. +- `process_domains(raw_domains, domains)`: Processes each domain, performing DNS, WHOIS, and `_psl` TXT record checks. +- `save_results()`: Saves all processed domain data to `data/all.csv`. +- `save_invalid_results()`: Saves domains with invalid DNS or expired WHOIS data to `data/nxdomain.csv` and `data/expired.csv`. +- `save_hold_results()`: Saves domains with WHOIS status containing any form of "hold" to `data/hold.csv`. +- `save_missing_psl_txt_results()`: Saves domains with invalid `_psl` TXT records to `data/missing_psl_txt.csv`. 
+- `save_expiry_less_than_2yrs_results()`: Saves domains with WHOIS expiry date less than 2 years from now to `data/expiry_less_than_2yrs.csv`. +- `run()`: Executes the entire processing pipeline. + +## Output + +The script generates the following CSV files in the `data` directory: + +- `all.csv`: Contains all processed domain data. +- `nxdomain.csv`: Contains domains that could not be resolved (`NXDOMAIN`). +- `expired.csv`: Contains domains with expired WHOIS records. +- `hold.csv`: Contains domains with WHOIS status indicating any kind of "hold". +- `missing_psl_txt.csv`: Contains domains with invalid `_psl` TXT records. +- `expiry_less_than_2yrs.csv`: Contains domains with WHOIS expiry date less than 2 years from now. + +## Example + +An example CSV entry: + +| psl_entry | top_level_domain | dns_status | whois_status | whois_domain_expiry_date | whois_domain_status | psl_txt_status | expiry_check_status | +| -------------- | ---------------- | ---------- | ------------ | ----------------------- | ---------------------------- | -------------- | ------------------- | +| example.com | example.com | ok | ok | 2024-12-31 | "clientTransferProhibited" | "valid" | ok | + +## Publicly Registrable Namespace Determination + +The script determines the publicly registrable namespace from private domains by using the ICANN section. + +Here's how it works: + +1. **ICANN Domains Set**: ICANN domains are stored in a set for quick lookup. +2. **Domain Parsing**: For each private domain, the script splits the domain into parts. It then checks if any suffix of these parts exists in the ICANN domains set. +3. **Normalization**: The private domain is normalized to its publicly registrable form using the ICANN domains set. 
+ +Examples: + +- **Input**: PSL private domain entry `"*.example.com"` + - **Process**: + - Remove leading `'*.'`: `"example.com"` + - Check `"com"` against the ICANN domains set: Found + - **Output**: `"example.com"` + +- **Input**: PSL private domain entry `"sub.example.co.uk"` + - **Process**: + - Check `"example.co.uk"` against the ICANN domains set: Not found + - Check `"co.uk"` against the ICANN domains set: Found + - **Output**: `"example.co.uk"` + +The output is then used for checking WHOIS data. + +## License + +This tool is licensed under the MIT License. \ No newline at end of file diff --git a/tools/private_domains_checker/TestPSLPrivateDomainsProcessor.py b/tools/private_domains_checker/TestPSLPrivateDomainsProcessor.py new file mode 100644 index 000000000..c315aee6b --- /dev/null +++ b/tools/private_domains_checker/TestPSLPrivateDomainsProcessor.py @@ -0,0 +1,76 @@ +import unittest +import uuid + +from PSLPrivateDomainsProcessor import PSLPrivateDomainsProcessor, check_dns_status, get_whois_data, check_psl_txt_record + + +class TestPSLPrivateDomainsProcessor(unittest.TestCase): + + def setUp(self): + self.processor = PSLPrivateDomainsProcessor() + # Populate icann_domains for testing + self.processor.icann_domains = { + "com", "co.uk", "ac.uk", "net", "org" + } + + def test_parse_domain_icann_domain(self): + # Test cases where domains should be parsed correctly + test_cases = [ + ("*.example.com", "example.com"), + ("sub.example.com", "example.com"), + ("*.sub.example.com", "example.com"), + ("example.com", "example.com"), + ("example.co.uk", "example.co.uk"), + ("sub.example.co.uk", "example.co.uk"), + ("*.example.co.uk", "example.co.uk"), + ("*.sub.example.co.uk", "example.co.uk"), + ("abc.ac.uk", "abc.ac.uk"), + ("a.b.com", "b.com") + ] + + for domain, expected in test_cases: + with self.subTest(domain=domain): + result = self.processor.parse_domain(domain) + self.assertEqual(expected, result) + + def test_parse_domain_no_icann(self): + # Test 
case where no valid ICANN domain is found + self.processor.icann_domains.remove("com") + with self.assertRaises(ValueError): + self.processor.parse_domain("example.com") + + def test_parse_domain_edge_cases(self): + # Additional edge case testing + self.assertEqual("example.org", self.processor.parse_domain("sub.example.org")) + self.assertEqual("example.com", self.processor.parse_domain("example.com")) + self.assertEqual("example.ac.uk", self.processor.parse_domain("sub.example.ac.uk")) + + def test_parse_domain_invalid(self): + # Test invalid domains which should raise ValueError + invalid_domains = ["invalid.test", "*.invalid.test", "sub.invalid.test"] + for domain in invalid_domains: + with self.subTest(domain=domain): + with self.assertRaises(ValueError): + self.processor.parse_domain(domain) + + def test_check_dns_status(self): + # Test with a known good domain + self.assertEqual("ok", check_dns_status("mozilla.org")) + # Test with a likely non-existent domain + random_domain = "nxdomain-" + str(uuid.uuid4()) + ".edu" + self.assertEqual("NXDOMAIN", check_dns_status(random_domain)) + + def test_check_psl_txt_record(self): + # Test with a known domain having a valid _psl TXT record + self.assertEqual("valid", check_psl_txt_record("cdn.cloudflare.net")) + # Test with a domain without a _psl TXT record + random_domain = "invalid-" + str(uuid.uuid4()) + ".edu" + self.assertEqual("invalid", check_psl_txt_record(random_domain)) + + def test_get_whois_data(self): + whois_data = get_whois_data("example.com") + self.assertEqual("ok", whois_data[2]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/private_domains_checker/requirements.txt b/tools/private_domains_checker/requirements.txt new file mode 100644 index 000000000..3e8a5ef13 --- /dev/null +++ b/tools/private_domains_checker/requirements.txt @@ -0,0 +1,3 @@ +pandas +requests +whoisdomain \ No newline at end of file