Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres as backend for mispwarninglist #732

Merged
merged 4 commits into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions analyzers/MISPWarningLists/MISPWarningLists.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"author": "Nils Kuhnert, CERT-Bund",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/misp-warninglists-analyzer",
"version": "1.0",
"version": "2.0",
"description": "Check IoCs/Observables against MISP Warninglists to filter false positives.",
"dataTypeList": ["ip", "hash", "domain", "fqdn", "url"],
"baseConfig": "MISPWarningLists",
Expand All @@ -14,7 +14,14 @@
"description": "path to Warninglists folder",
"type": "string",
"multi": false,
"required": true
"required": false
},
{
"name": "conn",
"description": "sqlalchemy connection string",
"multi": false,
"required": false,
"type": "string"
}
]
}
185 changes: 136 additions & 49 deletions analyzers/MISPWarningLists/mispwarninglists.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@
from glob import glob
from os.path import exists

try:
import sqlalchemy as db
from tld import get_tld

USE_DB = True
except ImportError:
USE_DB = False


class MISPWarninglistsAnalyzer(Analyzer):
"""
Expand All @@ -22,92 +30,171 @@ class MISPWarninglistsAnalyzer(Analyzer):
}
```
"""

def __init__(self):
Analyzer.__init__(self)

self.data = self.get_data()
self.path = self.get_param('config.path', 'misp-warninglists')
if not exists(self.path):
self.error('Path to misp-warninglists does not exist.')
self.warninglists = self.readwarninglists()
self.path = self.get_param("config.path", "misp-warninglists")
conn = self.get_param("config.conn", None)
self.warninglists = self.readwarninglists() if not USE_DB else None
self.engine = db.create_engine(conn) if conn and USE_DB else None
if not exists(self.path) and not self.engine:
self.error("wrong configuration settings.")

def readwarninglists(self):
files = glob('{}/lists/*/*.json'.format(self.path))
files = glob("{}/lists/*/*.json".format(self.path))
listcontent = []
for file in files:
with io.open(file, 'r') as fh:
with io.open(file, "r") as fh:
content = json.loads(fh.read())
values = Extractor().check_iterable(content.get('list', []))
values = Extractor().check_iterable(content.get("list", []))
obj = {
"name": content.get('name', 'Unknown'),
"values": [value['data'] for value in values],
"dataTypes": [value['dataType'] for value in values]
"name": content.get("name", "Unknown"),
"values": [value["data"] for value in values],
"dataTypes": [value["dataType"] for value in values],
}
listcontent.append(obj)
return listcontent

def lastlocalcommit(self):
try:
with io.open('{}/.git/refs/heads/master'.format(self.path), 'r') as fh:
return fh.read().strip('\n')
with io.open("{}/.git/refs/heads/master".format(self.path), "r") as fh:
return fh.read().strip("\n")
except Exception as e:
return 'Error: could not get local commit hash ({}).'.format(e)
return "Error: could not get local commit hash ({}).".format(e)

@staticmethod
def lastremotecommit():
url = 'https://api.github.com/repos/misp/misp-warninglists/branches/master'
url = "https://api.github.com/repos/misp/misp-warninglists/branches/master"
try:
result_dict = requests.get(url).json()
return result_dict['commit']['sha']
return result_dict["commit"]["sha"]
except Exception as e:
return 'Error: could not get remote commit hash ({}).'.format(e)
return "Error: could not get remote commit hash ({}).".format(e)

def run(self):
results = []
data = self.data
if self.data_type == 'ip':

if self.data_type == "ip":
try:
data = ipaddress.ip_address(self.data)
except ValueError:
return self.error("{} is said to be an IP address but it isn't".format(self.data))
for list in self.warninglists:
if self.data_type not in list.get('dataTypes'):
continue

if self.data_type == 'ip':
for net in list.get('values', []):
try:
if data in ipaddress.ip_network(net):
results.append({"name": list.get('name')})
break
except ValueError:
# Ignoring if net is not a valid IP network since we want to compare ip addresses
pass
return self.error(
"{} is said to be an IP address but it isn't".format(self.data)
)

if not self.engine:
for list in self.warninglists:
if self.data_type not in list.get("dataTypes"):
continue

if self.data_type == "ip":
for net in list.get("values", []):
try:
if data in ipaddress.ip_network(net):
results.append({"name": list.get("name")})
break
except ValueError:
# Ignoring if net is not a valid IP network since we want to compare ip addresses
pass
else:
if data.lower() in list.get("values", []):
results.append({"name": list.get("name")})

self.report(
{
"results": results,
"mode": "json",
"is_uptodate": self.lastlocalcommit()
== self.lastremotecommit(),
}
)
else:
field = None
if self.data_type == "ip":
sql = (
"SELECT list_name, list_version, address as value FROM warninglists WHERE address >>= inet '%s'"
% data
)
elif self.data_type == "hash":
sql = (
"SELECT list_name, list_version, hash as value FROM warninglists WHERE hash='%s'"
% data
)
else:
if data.lower() in list.get('values', []):
results.append({
"name": list.get('name')
})

self.report({
"results": results,
"is_uptodate": self.lastlocalcommit() == self.lastremotecommit()
})
ext = get_tld(data, fix_protocol=True, as_object=True)
subdomain = ext.subdomain if ext.subdomain != "" else None
domain = ext.domain
tld = ext.tld
query = ext.parsed_url[2] if ext.parsed_url[2] != "" else None

if not domain or not tld:
return self.error(
"{} is not a valid url/domain/fqdn".format(self.data)
)

if query:
if subdomain and subdomain != "*":
sql = (
"SELECT list_name, list_version, concat(subdomain, '.', domain, '.', tld, query) as value FROM warninglists WHERE subdomain = '%s' and domain = '%s' and tld = '%s' and query = '%s'"
% (subdomain, domain, tld, query)
)
else:
sql = (
"SELECT list_name, list_version, concat(domain, '.', tld, query) as value FROM warninglists WHERE domain = '%s' and tld = '%s' and query = '%s'"
% (domain, tld, query)
)
elif not subdomain:
sql = (
"SELECT list_name, list_version, concat(domain, '.', tld) as value FROM warninglists WHERE subdomain is null and domain = '%s' and tld = '%s'"
% (domain, tld)
)
elif subdomain == "*":
sql = (
"SELECT list_name, list_version, concat(subdomain, '.', domain, '.', tld) as value FROM warninglists WHERE subdomain is not null and domain = '%s' and tld = '%s'"
% (domain, tld)
)
else:
sql = (
"SELECT list_name, list_version, concat(subdomain, '.', domain, '.', tld) as value FROM warninglists WHERE (subdomain = '%s' or subdomain = '*') and domain = '%s' and tld = '%s'"
% (subdomain, domain, tld)
)
values = self.engine.execute(sql)
self.engine.dispose()
if values.rowcount > 0:
for row in values:
results.append(
{
key: value
for (key, value) in zip(
["list_name", "list_version", "value"], row
)
}
)
self.report({"results": results, "mode": "db", "is_uptodate": "N/A"})

def summary(self, raw):
taxonomies = []
if len(raw['results']) > 0:
taxonomies.append(self.build_taxonomy('suspicious', 'MISP', 'Warninglists', 'Potential fp'))
if len(raw["results"]) > 0:
taxonomies.append(
self.build_taxonomy(
"suspicious", "MISP", "Warninglists", "Potential fp"
)
)
else:
taxonomies.append(self.build_taxonomy('info', 'MISP', 'Warninglists', 'No hits'))
taxonomies.append(
self.build_taxonomy("info", "MISP", "Warninglists", "No hits")
)

if not raw.get('is_uptodate', False):
taxonomies.append(self.build_taxonomy('info', 'MISP', 'Warninglists', 'Outdated'))
if raw.get("mode", None) == "json" and not raw.get("is_uptodate", False):
taxonomies.append(
self.build_taxonomy("info", "MISP", "Warninglists", "Outdated")
)

return {
"taxonomies": taxonomies
}
return {"taxonomies": taxonomies}


if __name__ == '__main__':
if __name__ == "__main__":
MISPWarninglistsAnalyzer().run()
3 changes: 3 additions & 0 deletions analyzers/MISPWarningLists/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
cortexutils
requests
ipaddress
tld
sqlalchemy
psycopg2-binary
Loading