-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
103 lines (82 loc) · 3.35 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import config
import requests
import json
import hmac
import hashlib
import ipaddress
import threading
import datetime
import azloganalytics
# Log Analytics client shared by the whole script: built from the configured
# credentials and pointed at the "shadowserver" table.
la = azloganalytics.LogAnalytics(
    config.azID,
    config.azSecret,
    "shadowserver",
)
def genHMAC(secret, request):
    """Return the hex HMAC-SHA256 signature of a JSON-serialised request.

    ``request`` is serialised with ``json.dumps`` and signed using
    ``str(secret)`` as the key; key and message are both UTF-8 encoded.
    """
    key = str(secret).encode("utf-8")
    message = json.dumps(request).encode("utf-8")
    return hmac.new(key, message, hashlib.sha256).hexdigest()
def listReports():
    """Fetch the list of reports dated yesterday from the Shadowserver API.

    Posts an HMAC-signed request to the ``reports/list`` endpoint asking for
    up to 100 reports for yesterday's date (taken from the local clock).

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: on connection failure or when
            the server does not answer within the timeout.
        ValueError: if the response body is not valid JSON.
    """
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    # The same dict is both signed and sent, so the signature covers exactly
    # the fields posted below.
    req = {
        "id": "",
        "limit": 100,
        "date": yesterday.strftime("%Y-%m-%d"),
        "apikey": f"{config.key}",
    }
    url = "https://transform.shadowserver.org/api2/reports/list"
    signature = genHMAC(secret=config.secret, request=req)
    # requests never times out by default; bound connect/read waits so an
    # unresponsive server cannot hang the script indefinitely.
    resp = requests.post(
        url,
        json=req,
        headers={"HMAC2": signature},
        timeout=(10, 120),
    )
    return resp.json()
def downloadReport(reportID):
    """Retrieve the records of one report from the Shadowserver API.

    Posts an HMAC-signed request to the ``reports/download`` endpoint with a
    limit of 10000 records.

    Args:
        reportID: Identifier of the report to download.

    Returns:
        The decoded JSON body of the response, or an empty list when the
        body cannot be parsed as JSON (the raw body is printed in that case).

    Raises:
        requests.exceptions.RequestException: on connection failure or when
            the server does not answer within the timeout.
    """
    req = {"id": reportID, "limit": 10000, "apikey": f"{config.key}"}
    url = "https://transform.shadowserver.org/api2/reports/download"
    signature = genHMAC(secret=config.secret, request=req)
    # requests never times out by default; bound connect/read waits so an
    # unresponsive server cannot hang the script indefinitely.
    resp = requests.post(
        url,
        json=req,
        headers={"HMAC2": signature},
        timeout=(10, 120),
    )
    try:
        reportData = resp.json()
    except ValueError:
        # JSON decode errors derive from ValueError; catching only that keeps
        # the best-effort fallback without swallowing KeyboardInterrupt etc.
        print(f"JSON Parsing Error: {resp.content}")
        reportData = []
    print(f"Downloaded Report: {reportID} | {len(reportData)}")
    return reportData
def initIPAM():
    """Download the IPAM subnet list into the module-level ``subnets``.

    Sends an authenticated GET to the IPAM subnets endpoint and stores the
    ``"data"`` member of the JSON response in the global ``subnets``.

    Raises:
        requests.exceptions.RequestException: on connection failure or when
            the server does not answer within the timeout.
        ValueError: if the response body is not valid JSON.
        KeyError: if the response has no ``"data"`` member.
    """
    global subnets
    url = "https://ipam.node4.co.uk/api/securityteam/sections/3/subnets/"
    authheader = {"phpipam-token": config.ipamToken}
    # requests never times out by default; bound connect/read waits so an
    # unreachable IPAM server fails fast instead of hanging the script.
    response = requests.get(url, headers=authheader, timeout=(10, 60))
    subnets = response.json()["data"]
    print("IPAM Data downloaded...")
def getDescription(ip):
    """Return the description of the most specific IPAM subnet covering ``ip``.

    Args:
        ip: An IP address string (e.g. ``"10.1.2.3"``) or, when it contains
            ``"/"``, an IPv4 network in CIDR notation.

    Returns:
        The ``"description"`` of the entry in the module-level ``subnets``
        list with the longest prefix that fully contains ``ip``; the first
        such entry wins on ties. ``"not found"`` when nothing matches.

    Raises:
        ValueError: if ``ip`` cannot be parsed.
    """
    if "/" in ip:
        target = ipaddress.IPv4Network(ip)
        first, last = target.network_address, target.broadcast_address
    else:
        first = last = ipaddress.ip_address(ip)

    best = None
    best_prefix = -1
    for entry in subnets:
        try:
            network = ipaddress.ip_network(f"{entry['subnet']}/{entry['mask']}")
        except (KeyError, ValueError):
            # One unparseable IPAM entry must not break every lookup.
            continue
        if network.version != first.version:
            # Addresses of different IP versions cannot be compared.
            continue
        # Range comparison handles both a single address and a whole network;
        # ``network in network`` is always False in the ipaddress module.
        covers = network.network_address <= first and last <= network.broadcast_address
        # Compare prefix lengths numerically: as strings, "8" sorts above "24".
        if covers and network.prefixlen > best_prefix:
            best, best_prefix = entry, network.prefixlen

    if best is None:
        return "not found"
    return best["description"]
def sendRecord(scandata, scanType):
    """Annotate one scan record and send it to Log Analytics.

    The record is modified in place: ``"scan"`` is set to ``scanType`` and
    ``"subnet_description"`` to the IPAM description looked up for the
    record's ``"ip"``. It is then sent as a single-element list.
    """
    scandata["scan"] = scanType
    scandata["subnet_description"] = getDescription(scandata["ip"])
    la.sendtoAzure([scandata])
def getScanData(scan):
    """Download one report and forward every record that carries an IP.

    Reports whose type is ``"device_id"`` are skipped. For any other report
    the records are downloaded and each record with an ``"ip"`` key is sent
    on its own thread; the function returns once all of them have finished.

    Args:
        scan: A report entry providing the ``"type"``, ``"file"`` and
            ``"id"`` keys.
    """
    print("-------------")
    # Use the argument rather than a module-level ``report`` variable, so the
    # function works no matter what the caller names its loop variable.
    if scan["type"] != "device_id":
        print(f"Start Report | {scan['type']} | {scan['file']} | {scan['id']} ")
        reportContent = downloadReport(scan["id"])
        print(f"Processing Report | {scan['type']} | Records: {len(reportContent)}")
        if len(reportContent) == 10000:
            # 10000 is the download limit, so hitting it means truncation.
            print("** 10k LIMIT REACHED - Results Truncated")

        # Track only this call's threads; a shared global list would keep
        # growing and re-join already finished threads on every report.
        workers = []
        for device in reportContent:
            # Skip anything that is not a record dict with an address.
            if isinstance(device, dict) and "ip" in device:
                worker = threading.Thread(target=sendRecord, args=(device, scan["type"]))
                workers.append(worker)
                worker.start()
        for worker in workers:
            worker.join()
    print("-------------")
# Module-level thread list; kept for any code that refers to it by name.
threads = []

# IPAM enrichment is best-effort: on failure continue with no subnets, so
# lookups simply report "not found".
try:
    initIPAM()
except Exception as exc:
    # Catch Exception rather than everything, so Ctrl-C / SystemExit still
    # stop the script, and show the cause instead of hiding it.
    print("Cannot reach IPAM")
    print(f"IPAM error: {exc!r}")
    subnets = []

reportList = listReports()
print(f"Found {len(reportList)} scans")
# NOTE(review): ``report`` is a module-level name; rename it only after
# confirming that nothing else in this module reads it as a global.
for report in reportList:
    getScanData(report)