def log(target: str = None, error: Exception = None, obj: dict = None):
    """Print one JSON-encoded log record.

    The record is ``obj`` plus a ``"time"`` key (``time.time()``), a
    ``"target"`` key when ``target`` is given and an ``"error"`` key
    (``str(error)``) when ``error`` is given.  Records without an error go
    to stdout, records with an error go to stderr.

    Args:
        target: Optional target name stored under ``"target"``.
        error: Optional error; its string form is stored under ``"error"``.
        obj: Optional dict of extra fields.  A dict supplied by the caller
            is updated in place, exactly as before; when omitted, a fresh
            dict is used for every call.
    """
    # A literal ``{}`` default is created once and shared by every call, so
    # "target"/"error" written by one call used to leak into later records.
    if obj is None:
        obj = {}

    obj.update({"time": time.time()})

    if target is not None:
        obj.update({"target": target})

    if error is not None:
        obj.update({"error": str(error)})

    # Values json cannot encode natively are stringified via ``default=str``.
    print(
        json.dumps(obj, default=str),
        file=sys.stdout if error is None else sys.stderr,
    )
def getRedirectsFromReq(req, is_html_redirect=False):
    """List every hop of a request, oldest first, ending with ``req`` itself.

    Args:
        req: Response-like object exposing ``history`` (iterable of earlier
            responses), ``status_code`` and ``url``.
        is_html_redirect: When true, the first hop in the returned list is
            labelled ``"HTML"`` instead of with its status code.

    Returns:
        A list of dicts with keys ``"type"`` (status code as a string, or
        ``"HTML"``), ``"val"`` (the hop URL) and ``"https"`` (whether the
        URL uses the https scheme).
    """
    hops = list(req.history) + [req]
    res = []
    for index, hop in enumerate(hops):
        rtype = "HTML" if index == 0 and is_html_redirect else str(hop.status_code)
        res.append(
            {
                "type": rtype,
                "val": hop.url,
                # Check the scheme prefix: a substring test would also match
                # plain-http URLs that merely contain "https" in host or path.
                "https": hop.url.startswith("https://"),
            }
        )
    return res
def parseResponse(headers: dict, body: str, domain: str, url: str, status_code: int):
    """Build the result dict for one fetched security.txt candidate.

    Args:
        headers: Response headers; only ``"Content-Type"`` is read.
        body: Response body text.
        domain: Domain being checked (copied into the result).
        url: Final URL of the response (copied into the result).
        status_code: HTTP status code (copied into the result).

    Returns:
        A dict with the request metadata, ``"has_contact"`` (true when a
        line of ``body`` starts with ``contact:``, case-insensitively),
        the HTML-escaped ``"full_text"``, a normalised ``"min_text"`` and
        the recognised fields under ``"items"``.  ``"valid_https"`` and
        ``"redirects"`` are left at their defaults for the caller to fill.
    """
    has_contact = re.search(r"(?mi)^contact:", body) is not None
    content_type = headers["Content-Type"] if "Content-Type" in headers else None

    res = {
        "domain": domain,
        "url": url,
        "status_code": status_code,
        "has_contact": has_contact,
        "content_type": html.escape(content_type) if content_type is not None else "",
        "valid_https": False,
        "valid_content_type": (
            content_type is not None and content_type.startswith("text/plain")
        ),
        "full_text": "",
        "min_text": "",
        "items": {
            "Acknowledgements": [],
            "Canonical": [],
            "Contact": [],
            "Encryption": [],
            "Preferred-Languages": "",
            "Expires": "",
            "Hiring": [],
            "Policy": [],
        },
        "redirects": [],
    }

    if not has_contact:
        return res

    res["full_text"] = html.escape(body.strip())
    actual_body = str(res["full_text"])

    # If the text contains a run of hyphens (e.g. PGP armour lines), split on
    # that run and keep only the first chunk that holds a contact line.
    separator = re.search(r"(?P<hyphens>--+)", actual_body)
    if separator:
        for block in actual_body.split(separator.group("hyphens")):
            # Case-insensitive, like the has_contact detection above.
            if "contact:" in block.lower():
                actual_body = block
                break

    known_fields = (
        "Acknowledgements",
        "Acknowledgments",
        "Canonical",
        "Contact",
        "Encryption",
        "Preferred-Languages",
        "Expires",
        "Hiring",
        "Policy",
    )
    field_line = re.compile(r"^(?: +)?(?P<key>[A-Za-z\-]+)\:\s?(?P<value>.+?)$")

    min_lines = []
    for line in actual_body.split("\n"):
        line_search = field_line.search(line.strip())
        if not line_search:
            continue
        key = line_search.group("key").lower()
        val = line_search.group("value")
        for x in known_fields:
            if x.lower() not in key:
                continue
            # Both spellings are stored under the "Acknowledgements" key.
            name = "Acknowledgements" if x == "Acknowledgments" else x
            min_lines.append(f"{name}: {val}\n")
            if isinstance(res["items"][name], list):
                res["items"][name].append(val)
            else:
                # Single-valued fields keep the last value seen.
                res["items"][name] = val

    res["min_text"] = "".join(min_lines)
    return res
def x509name_tostring(x509name):
    """Render a certificate name as ``/KEY=value/KEY=value...``.

    Args:
        x509name: Object exposing ``get_components()``, which yields
            ``(key, value)`` pairs of bytes.

    Returns:
        The joined string (empty when there are no components), or
        ``None`` when ``x509name`` has no ``get_components`` attribute.
    """
    if not hasattr(x509name, "get_components"):
        return None

    # Decode as UTF-8 with replacement so a non-ASCII component cannot raise
    # UnicodeDecodeError; pure-ASCII input decodes exactly as before.
    return "".join(
        "/{0}={1}".format(
            key.decode("utf-8", errors="replace"),
            value.decode("utf-8", errors="replace"),
        )
        for key, value in x509name.get_components()
    )
def get_certificate_info(bcert):
    """Decode a DER-encoded certificate into a dict of its fields.

    Args:
        bcert: Certificate bytes in DER form (as returned by
            ``getpeercert(binary_form=True)``).

    Returns:
        A dict with ``issuer``, ``subject``, ``notBefore``, ``notAfter``,
        ``serialNumber`` and one entry per certificate extension keyed by
        its short name, or ``None`` when the certificate could not be
        loaded.  If parsing fails part-way the error is logged and whatever
        was collected so far is returned.
    """
    res = None
    try:
        fcert = ssl.DER_cert_to_PEM_cert(bcert)
        c = OpenSSL.crypto
        cert = c.load_certificate(c.FILETYPE_PEM, fcert)

        if cert is not None:
            date_format, encoding = "%Y%m%d%H%M%SZ", "ascii"

            not_before = datetime.strptime(
                cert.get_notBefore().decode(encoding), date_format
            ).replace(tzinfo=pytz.UTC)

            not_after = datetime.strptime(
                cert.get_notAfter().decode(encoding), date_format
            ).replace(tzinfo=pytz.UTC)

            new_date_format = "%Y-%m-%d %H:%M:%S %Z"

            res = {
                "issuer": x509name_tostring(cert.get_issuer()),
                "notAfter": datetime.strftime(not_after, new_date_format),
                "notBefore": datetime.strftime(not_before, new_date_format),
                "serialNumber": cert.get_serial_number(),
                "subject": x509name_tostring(cert.get_subject()),
            }

            for n in range(0, cert.get_extension_count()):
                ce = cert.get_extension(n)
                if not (ce and hasattr(ce, "get_data") and hasattr(ce, "get_short_name")):
                    continue

                short_name = ce.get_short_name().decode("utf-8")
                data = None

                if short_name == "subjectAltName":
                    # Group the "TYPE:value" entries by their type prefix.
                    data = {}
                    for isad in ce._subjectAltNameString().split(", "):
                        k, _, v = isad.partition(":")
                        data.setdefault(k, []).append(v)
                else:
                    try:
                        # Other extensions: one entry per distinct non-empty
                        # line of the extension's text form.
                        data = []
                        for isd in str(ce).split("\n"):
                            isd = isd.strip()
                            if isd and isd not in data:
                                if "URI" in isd:
                                    k, _, v = isd.partition(":")
                                    data.append({k: v})
                                else:
                                    data.append(isd)
                    except Exception:
                        # Fall back to the raw extension data as a string.
                        data = str(ce.get_data())

                res.update({short_name: data})
    except Exception as e:
        # No target or port is known inside this function; the previous
        # handler referenced undefined names and raised NameError here
        # instead of logging the real error.
        log(error=e, obj={"error_type": "get_certificate_info"})
    return res
def add_dns_response(d, a=None, cname=None, txt=None):
    """Record DNS answers for domain ``d`` in the module-level cache.

    Args:
        d: Domain name used as the cache key; nothing happens when falsy.
        a: Optional A record value to add.
        cname: Optional CNAME record value to add.
        txt: Optional TXT record value to add.  A leading backslash,
            double quote or single quote causes that character to be
            stripped from both ends before the value is stored.

    Each value is stored at most once per domain, and the entry's
    ``"dns_resolve_time"`` is set to the current time on every call.
    """
    if not d:
        return

    if d not in dns_responses:
        dns_responses[d] = {
            "a_records": [],
            "cname_records": [],
            "txt_records": [],
            "time": 0,
        }
    entry = dns_responses[d]

    if a is not None and a not in entry["a_records"]:
        entry["a_records"].append(a)

    if cname is not None and cname not in entry["cname_records"]:
        entry["cname_records"].append(cname)

    if txt is not None:
        for wrapper in ("\\", '"', "'"):
            if txt.startswith(wrapper):
                txt = txt.strip(wrapper)
        # Compare after stripping: values are stored stripped, so checking
        # the raw text first appended a duplicate on every re-resolution.
        if txt not in entry["txt_records"]:
            entry["txt_records"].append(txt)

    entry["dns_resolve_time"] = time.time()

    return
def get_address_tuple(source_address):
    """Turn a host or ``(host, port)`` pair into a ``(address, port)`` tuple.

    The host is parsed with ``targetparse``.  Hostnames are resolved (A,
    CNAME and TXT lookups, stored through ``add_dns_response``) unless a
    cached entry newer than one hour exists; when A records are known, one
    is picked at random as the returned address.

    Returns:
        ``(host, port)``, or ``None`` when the host cannot be parsed or is
        a private IP literal while ``ALLOW_INTERNAL_IP`` is off.
    """
    if type(source_address) is tuple:
        host, port = source_address
    else:
        host, port = source_address, 0

    parsed = targetparse(host)
    if parsed is None:
        return None
    host = parsed.hostname

    # A port found in the host string wins; a missing port becomes 0.
    port = parsed.port or port
    if port is None:
        port = 0

    ip = parse_ip(host)
    if ip["is_ip"] and not ALLOW_INTERNAL_IP and ip["is_private"]:
        return None

    needs_lookup = not ip["is_ip"]
    if needs_lookup and host in dns_responses:
        # if the existing DNS response is less than an hour old
        if dns_responses[host]["dns_resolve_time"] >= (time.time() - 3600):
            needs_lookup = False

    if needs_lookup:
        # Each record type is looked up independently; a failure is logged
        # and does not stop the remaining lookups.
        for rdtype, field in (("A", "a"), ("CNAME", "cname"), ("TXT", "txt")):
            try:
                for answer in resolver.resolve(host, rdtype):
                    add_dns_response(host, **{field: answer.to_text()})
            except Exception as e:
                log(host, e)

    if host in dns_responses and "a_records" in dns_responses[host]:
        a_records = dns_responses[host]["a_records"]
        if len(a_records) > 0:
            host = random.choice(a_records)

    return (host, port)
def parseResponse(headers: dict, body: str, url: str, status_code: int) -> dict:
    """Build the result dict for one fetched security.txt candidate.

    Args:
        headers: Response headers; copied (stringified) into the result.
        body: Response body text.
        url: Final URL of the response (copied into the result).
        status_code: HTTP status code (copied into the result).

    Returns:
        A dict with the request metadata, ``"has_contact"`` (true when a
        line of ``body`` starts with ``contact:``, case-insensitively),
        the HTML-escaped ``"full_text"``, a normalised ``"min_text"``, the
        recognised fields under ``"items"`` and the headers under
        ``"headers"``.  ``"valid_https"`` is left ``False`` for the caller.
    """
    has_contact = re.search(r"(?mi)^contact:", body) is not None

    res = {
        "url": url,
        "status_code": status_code,
        "has_contact": has_contact,
        "valid_https": False,
        "valid_content_type": (
            "Content-Type" in headers
            and headers["Content-Type"].startswith("text/plain")
        ),
        "full_text": "",
        "min_text": "",
        "items": {
            "Acknowledgements": [],
            "Canonical": [],
            "Contact": [],
            "Encryption": [],
            "Preferred-Languages": "",
            "Expires": "",
            "Hiring": [],
            "Policy": [],
        },
        "headers": {},
    }

    # Header values are stringified; a name seen twice is joined with ";".
    for rh in headers:
        if rh in res["headers"]:
            res["headers"][rh] = f'{res["headers"][rh]};{str(headers[rh])}'
        else:
            res["headers"][rh] = str(headers[rh])

    if not has_contact:
        return res

    res["full_text"] = html.escape(body.strip())
    actual_body = str(res["full_text"])

    # If the text contains a run of hyphens (e.g. PGP armour lines), split on
    # that run and keep only the first chunk that holds a contact line.
    separator = re.search(r"(?P<hyphens>--+)", actual_body)
    if separator:
        for block in actual_body.split(separator.group("hyphens")):
            if "contact:" in block.lower():
                actual_body = block
                break

    known_fields = (
        "Acknowledgements",
        "Acknowledgments",
        "Canonical",
        "Contact",
        "Encryption",
        "Preferred-Languages",
        "Expires",
        "Hiring",
        "Policy",
    )
    field_line = re.compile(r"^(?: +)?(?P<key>[A-Za-z\-]+)\:\s?(?P<value>.+?)$")

    min_lines = []
    for line in actual_body.split("\n"):
        line_search = field_line.search(line.strip())
        if not line_search:
            continue
        key = line_search.group("key").lower()
        val = line_search.group("value")
        for x in known_fields:
            if x.lower() not in key:
                continue
            # Both spellings are stored under the "Acknowledgements" key.
            name = "Acknowledgements" if x == "Acknowledgments" else x
            min_lines.append(f"{name}: {val}\n")
            if isinstance(res["items"][name], list):
                res["items"][name].append(val)
            else:
                # Single-valued fields keep the last value seen.
                res["items"][name] = val

    res["min_text"] = "".join(min_lines)
    return res
def scan(target: str) -> dict:
    """Scan one target for security contact information.

    Steps: parse the target, classify it as a domain or an IP literal,
    resolve it, read DNS-based contact records, probe for an open port
    with ``socket_check`` and finally look for an HTTP security.txt.

    Args:
        target: Raw target string (hostname, IP address or URL).

    Returns:
        A result dict.  ``"has_contact"`` is true when either the DNS
        records or the HTTP security.txt yielded a contact.  On a parse
        failure or a disallowed private IP the dict carries an ``"error"``
        message and the scan stops early.
    """
    res = {"time": time.time(), "raw_target": target, "has_contact": False}

    parsed_target = targetparse(target)
    if parsed_target is None:
        # Nothing else can be done; later steps all need a hostname.
        res["error"] = "failed to parse target"
        return res

    hostname = parsed_target.hostname
    res["target"] = hostname
    res["scan_type"] = "domain"

    ip = parse_ip(hostname)
    if ip["is_ip"]:
        res["scan_type"] = "ip"
        if not ALLOW_INTERNAL_IP and ip["is_private"]:
            res["error"] = "target is a private IP address"
            res["scan_type"] = None

    # NOTE(review): log() updates ``obj`` in place, so this call also
    # replaces res["target"] with the raw target and refreshes res["time"]
    # - confirm that is intended.
    log(target, obj=res)

    resolved_target = None

    if res["scan_type"] == "domain":
        address = get_address_tuple((hostname, parsed_target.port))
        if address is not None:
            resolved_target = address[0]
        res["domain_details"] = (
            dns_responses[hostname] if hostname in dns_responses else {}
        )
        res["dnssecuritytxt"] = get_dnssecuritytxt(hostname)

    # Read the type from the result dict; the bare name ``scan_type`` used
    # here before was never defined and raised NameError for IP targets.
    elif res["scan_type"] == "ip":
        resolved_target = hostname
        res["domain_details"] = {}
        res["dnssecuritytxt"] = get_dnssecuritytxt()

    if resolved_target is None:
        return res

    if res["dnssecuritytxt"]["matching_domain"] is not None:
        res["has_contact"] = True

    scargs = {
        "target": hostname,
        "timeout": SOCKET_TIMEOUT,
        "custom_cipher_set": CUSTOM_CIPHER_SET,
    }
    if parsed_target.port is not None:
        # An explicit port in the target restricts the probe to that port.
        scargs["ports"] = [parsed_target.port]
    scr = socket_check(**scargs)
    res.update(scr)

    if res["port"] is None:
        return res

    res["http_security_txt"] = get_http_security_txt(hostname, scr["port"])

    if (
        "has_contact" in res["http_security_txt"]
        and res["http_security_txt"]["has_contact"]
    ):
        res["has_contact"] = True

    return res
def targetparse(target: str) -> urllib.parse.ParseResult:
    """Parse a raw target string into a URL parse result.

    Surrounding whitespace is stripped, and a target without ``://`` gets
    the placeholder scheme ``null`` so that the host part is recognised.

    Args:
        target: Hostname, ``host:port`` or full URL.

    Returns:
        The ``urllib.parse.ParseResult``, or ``None`` when ``target`` is
        empty, not a string, has no hostname, or has a malformed host or
        port.
    """
    if not target or not isinstance(target, str):
        return None

    target = target.strip()
    if "://" not in target:
        target = f"null://{target}"

    try:
        o = urllib.parse.urlparse(target)
        # Reading .port validates it now: a non-numeric or out-of-range
        # port raises ValueError, which callers reading .port do not catch.
        _ = o.port
    except ValueError:
        return None

    if o.hostname is None:
        return None
    return o
def get_http_security_txt(hostname: str, port: int = None) -> dict:
    """Try the known security.txt locations in order and return the first hit.

    The https locations are tried before the http ones, and within each
    scheme ``/.well-known/security.txt`` before ``/security.txt``.

    Returns:
        The first ``getSecurityTxtFormat`` result with a truthy
        ``"has_contact"``, with a ``"type"`` key naming the location that
        matched; an empty dict when none matched.
    """
    candidates = (
        ("https_well-known", "https://{0}/.well-known/security.txt"),
        ("https_root", "https://{0}/security.txt"),
        ("http_well-known", "http://{0}/.well-known/security.txt"),
        ("http_root", "http://{0}/security.txt"),
    )

    for label, template in candidates:
        # The progress message is passed as log()'s second (error) argument.
        log(hostname, f"get_http_security_txt: trying: {label}: {template}")
        found = getSecurityTxtFormat(hostname, port, template)
        if found.get("has_contact"):
            found["type"] = label
            return found

    return {}
@backoff.on_exception(
    backoff.expo,
    requests.exceptions.RequestException,
    max_time=(HTTP_RETRIES * HTTP_TIMEOUT) + 1,
    max_tries=HTTP_RETRIES,
    giveup=lambda e: e.response is not None and e.response.status_code < 500,
)
def getSecurityTxtFormat(
    hostname: str,
    port: int,
    uf: str,
    url_override: str = None,
    html_redirect: bool = False,
    max_redirects: int = 5,
) -> dict:
    """Fetch one security.txt candidate URL and parse the response.

    Args:
        hostname: Host to query.
        port: Optional port appended to the host when building the URL.
        uf: URL template with a ``{0}`` placeholder for ``host[:port]``.
        url_override: Exact URL to fetch instead of the one built from
            ``uf``; used when following an HTML/JavaScript redirect.
        html_redirect: True when this call follows such a redirect.
        max_redirects: Remaining redirect budget; decremented on entry, and
            an empty dict is returned once it reaches zero.

    Returns:
        The ``parseResponse`` result extended with ``"redirects"`` and
        ``"valid_https"``, plus ``"https_failure"`` when certificate
        verification failed and the request was repeated without it.
        Errors are logged and whatever was collected so far is returned.
    """
    res = {}
    max_redirects -= 1
    if max_redirects <= 0:
        return res

    url = (
        url_override
        if url_override is not None
        else uf.format(hostname + ("" if port is None else f":{port}"))
    )

    # NOTE(review): every exception is caught and logged below, so none
    # appears to reach the backoff decorator - confirm whether retries are
    # expected to happen.
    try:
        headers = {"User-Agent": HTTP_USER_AGENT}

        https_failure = None
        try:
            req = requests.get(url, headers=headers, verify=True, timeout=HTTP_TIMEOUT)
        except requests.exceptions.SSLError as e:
            # Record the TLS problem, then retry without verification so the
            # content can still be inspected.
            https_failure = str(e)
            res["https_failure"] = https_failure
            req = requests.get(url, headers=headers, verify=False, timeout=HTTP_TIMEOUT)

        res = parseResponse(req.headers, req.text, req.url, req.status_code)
        if https_failure is not None:
            # parseResponse returns a fresh dict; re-attach the failure so
            # it is not lost.
            res["https_failure"] = https_failure

        if not res["has_contact"] and req.text:
            redirect_res = re.search(REDIRECT_REGEX, req.text)
            if redirect_res is not None:
                possible_redirect = redirect_res.group("redirect").strip()
                if possible_redirect:
                    if possible_redirect.startswith("/"):
                        possible_redirect = "{0}://{1}{2}".format(
                            uf.split(":")[0], hostname, possible_redirect
                        )
                    if "://" in possible_redirect:
                        # Follow the discovered URL; passing the unchanged
                        # url_override only refetched the same page.
                        return getSecurityTxtFormat(
                            hostname, port, uf, possible_redirect, True, max_redirects
                        )

        res["redirects"] = getRedirectsFromReq(req, html_redirect)
        res["valid_https"] = onlyHTTPSInRedirects(res["redirects"])
    except Exception as e:
        log(hostname, e)

    return res
= "failed to parse target" + else: + res["target"] = parsed_target.hostname + + res["scan_type"] = "domain" + + ip = parse_ip(parsed_target.hostname) + if ip["is_ip"]: + res["scan_type"] = "ip" + if not ALLOW_INTERNAL_IP and ip["is_private"]: + res["error"] = "target is a private IP address" + res["scan_type"] = None + + log(target, obj=res) + + resolved_target = None + + if res["scan_type"] == "domain": + resolved_target, _ = get_address_tuple( + (parsed_target.hostname, parsed_target.port) + ) + res["domain_details"] = ( + dns_responses[parsed_target.hostname] + if parsed_target.hostname in dns_responses + else {} + ) + + res["dnssecuritytxt"] = get_dnssecuritytxt(parsed_target.hostname) + + elif scan_type == "ip": + resolved_target = parsed_target.hostname + res["domain_details"] = {} + res["dnssecuritytxt"] = get_dnssecuritytxt() + + if resolved_target is None: + return res + + if res["dnssecuritytxt"]["matching_domain"] is not None: + res["has_contact"] = True + + scargs = { + "target": parsed_target.hostname, + "timeout": SOCKET_TIMEOUT, + "custom_cipher_set": CUSTOM_CIPHER_SET, + } + if parsed_target.port is not None: + scargs["ports"] = [parsed_target.port] + scr = socket_check(**scargs) + res.update(scr) + + if res["port"] is None: + return res + + res["http_security_txt"] = get_http_security_txt( + parsed_target.hostname, scr["port"] + ) + + if ( + "has_contact" in res["http_security_txt"] + and res["http_security_txt"]["has_contact"] + ): + res["has_contact"] = True + + return res