-
Notifications
You must be signed in to change notification settings - Fork 0
/
lookup.py
102 lines (77 loc) · 1.87 KB
/
lookup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import socket
from contextlib import contextmanager
from modules.apputils.curl import curl
RIPE_BGP_STATUS_URL = "https://stat.ripe.net/data/bgp-state/data.json?resource={}"
class QueryMethod(object):
ripe = 0
radb_whois = 1
def fetch_ripe_info(as_list):
"""
Downloading metadata from RIPE
:type as_list list[str]
:rtype list
"""
if not isinstance(as_list, (list, set)):
return None
results = []
r = curl(RIPE_BGP_STATUS_URL.format(",".join(as_list)))
if r.code == 200:
nets = r.from_json()
if not nets:
return None
for item in nets["data"]["bgp_state"]:
results.append(item["target_prefix"])
return results
class WhoisQuery(object):
def __init__(self, server="whois.radb.net", port=43):
self.__whois_server = (server, port)
def query(self, q):
"""
:type q str
"""
_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
_sock.connect(self.__whois_server)
_sock.send(str(q + "\r\n").encode())
response = b""
while True:
data = _sock.recv(4096)
if not data:
break
response += data
return response.decode()
finally:
try:
_sock.close()
except:
pass
return ""
def subnets_by_asn(self, asn):
"""
:type asn str
:rtype list[str]
"""
result = []
q = "-i origin {}".format(asn)
lines = self.query(q).split("\n")
for line in lines:
if line.startswith("route"):
result.append(line.partition(":")[2].strip())
return result
def subnets_by_asns(self, asn_list):
"""
:type asn_list list
:rtype list[str]
"""
results = []
for asn in asn_list:
results.extend(self.subnets_by_asn(asn))
return results
def nslookup(sitename):
"""
:type sitename str
"""
try:
return socket.gethostbyname_ex(sitename)[2]
except:
return []