Skip to content

Commit

Permalink
Merge pull request #86 from nineinchnick/element-findings
Browse files Browse the repository at this point in the history
assign findings to elements
  • Loading branch information
izar authored Mar 31, 2020
2 parents d6d0afe + b1e0017 commit 73fe5fe
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 269 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,29 @@ Name|From|To |Data|Protocol|Port
```

To group findings by elements, use a more advanced, nested loop:

```text
## Findings
{elements:repeat:{{item.findings:if:
### {{item.name}}
{{item.findings:repeat:
**Threat**: {{{{item.id}}}} - {{{{item.description}}}}
**Severity**: {{{{item.severity}}}}
**Mitigations**: {{{{item.mitigations}}}}
**References**: {{{{item.references}}}}
}}}}}
```

All items inside a loop must be escaped, doubling the braces, so `{item.name}` becomes `{{item.name}}`.
The example above uses two nested loops, so items in the inner loop must be escaped twice, that's why they're using four braces.

## Threats database

For the security practitioner, you may supply your own threats file by setting `TM.threatsFile`. It should contain entries like:
Expand Down
70 changes: 46 additions & 24 deletions pytm/pytm.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import random
import sys
import uuid
import sys
from collections import defaultdict
from collections.abc import Iterable
from hashlib import sha224
Expand Down Expand Up @@ -98,6 +99,19 @@ def __set__(self, instance, value):
super().__set__(instance, value)


class varFindings(var):

def __set__(self, instance, value):
for i, e in enumerate(value):
if not isinstance(e, Finding):
raise ValueError(
"expecting a list of Findings, item number {} is a {}".format(
i, type(value)
)
)
super().__set__(instance, list(value))


def _setColor(element):
if element.inScope is True:
return "black"
Expand Down Expand Up @@ -206,22 +220,21 @@ class Threat():
references = varString("")
target = ()

def __init__(self, json_read):
self.id = json_read['SID']
self.description = json_read['description']
self.condition = json_read['condition']
self.target = json_read['target']
self.details = json_read['details']
self.severity = json_read['severity']
self.mitigations = json_read['mitigations']
self.example = json_read['example']
self.references = json_read['references']

if not isinstance(self.target, str) and isinstance(self.target, Iterable):
self.target = tuple(self.target)
def __init__(self, **kwargs):
self.id = kwargs['SID']
self.description = kwargs.get('description', '')
self.condition = kwargs.get('condition', 'True')
target = kwargs.get('target', 'Element')
if not isinstance(target, str) and isinstance(target, Iterable):
target = tuple(target)
else:
self.target = (self.target,)
self.target = tuple(getattr(sys.modules[__name__], x) for x in self.target)
target = (target,)
self.target = tuple(getattr(sys.modules[__name__], x) for x in target)
self.details = kwargs.get('details', '')
self.severity = kwargs.get('severity', '')
self.mitigations = kwargs.get('mitigations', '')
self.example = kwargs.get('example', '')
self.references = kwargs.get('references', '')

def __repr__(self):
return "<{0}.{1}({2}) at {3}>".format(
Expand Down Expand Up @@ -268,7 +281,6 @@ class TM():
_BagOfFlows = []
_BagOfElements = []
_BagOfThreats = []
_BagOfFindings = []
_BagOfBoundaries = []
_threatsExcluded = []
_sf = None
Expand All @@ -279,6 +291,7 @@ class TM():
doc="JSON file with custom threats")
isOrdered = varBool(False, doc="Automatically order all Dataflows")
mergeResponses = varBool(False, doc="Merge response edges in DFDs")
findings = varFindings([], doc="threats found for elements of this model")

def __init__(self, name, **kwargs):
for key, value in kwargs.items():
Expand All @@ -292,7 +305,6 @@ def reset(cls):
cls._BagOfFlows = []
cls._BagOfElements = []
cls._BagOfThreats = []
cls._BagOfFindings = []
cls._BagOfBoundaries = []

def _init_threats(self):
Expand All @@ -304,14 +316,23 @@ def _add_threats(self):
threats_json = json.load(threat_file)

for i in threats_json:
TM._BagOfThreats.append(Threat(i))
TM._BagOfThreats.append(Threat(**i))

def resolve(self):
for e in (TM._BagOfElements):
if e.inScope is True:
for t in TM._BagOfThreats:
if t.apply(e) is True:
TM._BagOfFindings.append(Finding(e, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references))
findings = []
elements = defaultdict(list)
for e in TM._BagOfElements:
if not e.inScope:
continue
for t in TM._BagOfThreats:
if not t.apply(e):
continue
f = Finding(e, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references)
findings.append(f)
elements[e].append(f)
self.findings = findings
for e, findings in elements.items():
e.findings = findings

def check(self):
if self.description is None:
Expand Down Expand Up @@ -361,7 +382,7 @@ def report(self, *args, **kwargs):
with open(self._template) as file:
template = file.read()

print(self._sf.format(template, tm=self, dataflows=self._BagOfFlows, threats=self._BagOfThreats, findings=self._BagOfFindings, elements=self._BagOfElements, boundaries=self._BagOfBoundaries))
print(self._sf.format(template, tm=self, dataflows=self._BagOfFlows, threats=self._BagOfThreats, findings=self.findings, elements=self._BagOfElements, boundaries=self._BagOfBoundaries))

def process(self):
self.check()
Expand Down Expand Up @@ -400,6 +421,7 @@ class Element():
definesConnectionTimeout = varBool(False)
OS = varString("")
isAdmin = varBool(False)
findings = varFindings([])

def __init__(self, name, **kwargs):
for key, value in kwargs.items():
Expand Down
14 changes: 2 additions & 12 deletions tests/test_private_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,7 @@ def test_defaults(self):
server_query = Dataflow(server, db, "server query")
func_query = Dataflow(func, db, "func query")

default = {
"SID": "",
"description": "",
"condition": "",
"target": ["Actor", "Boundary", "Dataflow", "Datastore", "Server"],
"details": "",
"severity": "",
"mitigations": "",
"example": "",
"references": "",
}
default_target = ["Actor", "Boundary", "Dataflow", "Datastore", "Server"]
testCases = [
{"target": server, "condition": "target.oneOf(Server, Datastore)"},
{"target": server, "condition": "not target.oneOf(Actor, Dataflow)"},
Expand All @@ -173,7 +163,7 @@ def test_defaults(self):
{"target": user, "condition": "target.inside(Boundary)"},
]
for case in testCases:
t = Threat({**default, **{"condition": case["condition"]}})
t = Threat(SID="", target=default_target, condition=case["condition"])
self.assertTrue(
t.apply(case["target"]),
"Failed to match {} against {}".format(
Expand Down
Loading

0 comments on commit 73fe5fe

Please sign in to comment.