Skip to content

Commit

Permalink
Replaced external WHOIS module with faster pure-Python implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
elceef committed Aug 20, 2023
1 parent a198ffb commit 6d37ff8
Showing 1 changed file with 82 additions and 21 deletions.
103 changes: 82 additions & 21 deletions dnstwist.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import urllib.parse
import gzip
from io import BytesIO
import contextlib
from datetime import datetime

def _debug(msg):
if 'DEBUG' in os.environ:
Expand Down Expand Up @@ -103,13 +103,6 @@ def __init__(self):
def country_by_addr(self, ipaddr):
return self.reader.country(ipaddr).country.name

try:
import whois
MODULE_WHOIS = True
except ImportError as e:
_debug(e)
MODULE_WHOIS = False

try:
import ssdeep
MODULE_SSDEEP = True
Expand Down Expand Up @@ -187,6 +180,81 @@ def domain_tld(domain):
return d


class Whois():
WHOIS_IANA = 'whois.iana.org'
TIMEOUT = 2.0
WHOIS_TLD = {
'com': 'whois.verisign-grs.com',
'net': 'whois.verisign-grs.com',
'org': 'whois.pir.org',
'info': 'whois.afilias.net',
'pl': 'whois.dns.pl',
'us': 'whois.nic.us',
'co': 'whois.nic.co',
'cn': 'whois.cnnic.cn',
'ru': 'whois.tcinet.ru',
'in': 'whois.registry.in',
}

def __init__(self):
self.whois_tld = self.WHOIS_TLD

def _brute_datetime(self, s):
formats = ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%d %H:%M:%S%z', '%Y-%m-%d %H:%M', '%Y.%m.%d %H:%M',
'%Y.%m.%d %H:%M:%S', '%d.%m.%Y %H:%M:%S', '%a %b %d %Y', '%d-%b-%Y', '%Y-%m-%d')
for f in formats:
try:
dt = datetime.strptime(s, f)
return dt
except ValueError:
pass
return None

def _extract(self, response):
fields = {
'registrar': (r'[\r\n]registrar[ .]*:\s+(?:name:\s)?(?P<registrar>[^\r\n]+)', str),
'creation_date': (r'[\r\n](?:created(?: on)?|creation date|registered(?: on)?)[ .]*:\s+(?P<creation_date>[^\r\n]+)', self._brute_datetime),
}
result = {'text': response}
response_reduced = '\r\n'.join([x.strip() for x in response.splitlines() if not x.startswith('%')])
for field, (pattern, func) in fields.items():
match = re.search(pattern, response_reduced, re.IGNORECASE | re.MULTILINE)
if match:
result[field] = func(match.group(1))
else:
result[field] = None
return result

def query(self, query, server=None):
_, _, tld = domain_tld(query)
server = server or self.whois_tld.get(tld, self.WHOIS_IANA)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.TIMEOUT)
response = b''
try:
sock.connect((server, 43))
sock.send(query.encode() + b'\r\n')
while True:
buf = sock.recv(4096)
if not buf:
break
response += buf
if server and server != self.WHOIS_IANA and tld not in self.whois_tld:
self.whois_tld[tld] = server
except socket.timeout:
return ''
finally:
sock.close()
response = response.decode('utf-8', errors='ignore')
refer = re.search(r'refer:\s+(?P<server>\w[-.a-z0-9]+)', response, re.IGNORECASE | re.MULTILINE)
if refer:
return self.query(query, refer.group('server'))
return response

def whois(self, domain, server=None):
return self._extract(self.query(domain, server))


class UrlOpener():
def __init__(self, url, timeout=REQUEST_TIMEOUT_HTTP, headers={}, verify=True):
http_headers = {'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9',
Expand Down Expand Up @@ -1272,10 +1340,6 @@ def signal_handler(signal, frame):
except ValueError:
parser.error('invalid domain name: ' + args.phash_url)

if args.whois:
if not MODULE_WHOIS:
parser.error('missing whois library')

if args.geoip:
if not MODULE_GEOIP:
parser.error('missing geoip2 library or database file (check $GEOLITE2_MMDB environment variable)')
Expand Down Expand Up @@ -1412,21 +1476,18 @@ def signal_handler(signal, frame):

if args.whois:
total = sum([1 for x in domains if x.is_registered()])
whois = Whois()
for i, domain in enumerate([x for x in domains if x.is_registered()]):
p_cli(ST_CLR + '\rWHOIS: {} ({:.2%})'.format(domain['domain'], (i+1)/total))
try:
_, dom, tld = domain_tld(domain['domain'])
with open(os.devnull, 'w') as devnull, contextlib.redirect_stderr(devnull):
whoisq = whois.query('.'.join([dom, tld]), ignore_returncode=True)
wreply = whois.whois('.'.join(domain_tld(domain['domain'])[1:]))
except Exception as e:
_debug(e)
else:
if whoisq is None:
continue
if whoisq.creation_date:
domain['whois_created'] = str(whoisq.creation_date).split(' ')[0]
if whoisq.registrar:
domain['whois_registrar'] = str(whoisq.registrar)
if wreply.get('creation_date'):
domain['whois_created'] = wreply.get('creation_date').strftime('%Y-%m-%d')
if wreply.get('registrar'):
domain['whois_registrar'] = wreply.get('registrar')
p_cli('\n')

p_cli('\n')
Expand Down

0 comments on commit 6d37ff8

Please sign in to comment.