diff --git a/README.md b/README.md
index 8e155e1..1ac64dd 100644
--- a/README.md
+++ b/README.md
@@ -64,10 +64,6 @@ that periodically cleans the Database.
from pytm.pytm import TM, Server, Datastore, Dataflow, Boundary, Actor, Lambda
-tm = TM("my test tm")
-tm.description = "another test tm"
-tm.isOrdered = True
-
User_Web = Boundary("User/Web")
Web_DB = Boundary("Web/DB")
@@ -111,6 +107,10 @@ db_to_web = Dataflow(db, web, "Comments contents")
db_to_web.protocol = "MySQL"
db_to_web.data = 'Results of insert op'
+tm = TM("my test tm")
+tm.description = "another test tm"
+tm.isOrdered = True
+tm.elements = [my_lambda_to_db, user_to_web, web_to_user, web_to_db, db_to_web]
tm.process()
```
diff --git a/pytm/pytm.py b/pytm/pytm.py
index 8e85da0..f7c2586 100644
--- a/pytm/pytm.py
+++ b/pytm/pytm.py
@@ -2,13 +2,13 @@
import inspect
import json
import logging
+import os
import random
import sys
import uuid
from collections import defaultdict
from collections.abc import Iterable
from hashlib import sha224
-from os.path import dirname
from textwrap import wrap
from weakref import WeakKeyDictionary
@@ -98,6 +98,19 @@ def __set__(self, instance, value):
super().__set__(instance, value)
+class varElements(var):
+
+ def __set__(self, instance, value):
+ for i, e in enumerate(value):
+ if not isinstance(e, Element):
+ raise ValueError(
+ "expecting a list of Elements, item number {} is a {}".format(
+ type(value)
+ )
+ )
+ super().__set__(instance, list(value))
+
+
def _setColor(element):
if element.inScope is True:
return "black"
@@ -109,8 +122,8 @@ def _setLabel(element):
return "
".join(wrap(element.name, 14))
-def _sort(elements, addOrder=False):
- ordered = sorted(elements, key=lambda flow: flow.order)
+def _sort(flows, addOrder=False):
+ ordered = sorted(flows, key=lambda flow: flow.order)
if not addOrder:
return ordered
for i, flow in enumerate(ordered):
@@ -149,7 +162,7 @@ def _match_responses(flows):
return flows
-def _applyDefaults(elements):
+def _apply_defaults(elements):
for e in elements:
e._safeset("data", e.source.data)
if e.isResponse:
@@ -190,6 +203,32 @@ def _describe_classes(classes):
print()
+def _sort_elements(elements):
+ ''' sort elements by order in which they appear in dataflows '''
+ if not elements:
+ return elements
+ orders = {}
+ for e in elements:
+ if not hasattr(e, "order"):
+ continue
+ if e.source not in orders or orders[e.source] > e.order:
+ orders[e.source] = e.order
+ m = max(orders.values()) + 1
+ # sort in this order:
+ # assets in order in which they appear in dataflows
+ # dataflows in order
+ # other types not assigned to dataflows (boundaries), by name
+ return sorted(
+ elements,
+ key=lambda e: (
+ orders.get(e, m),
+ e.__class__.__name__,
+ getattr(e, "order", 0),
+ str(e),
+ ),
+ )
+
+
''' End of help functions '''
@@ -204,24 +243,23 @@ class Threat():
mitigations = varString("")
example = varString("")
references = varString("")
- target = ()
-
- def __init__(self, json_read):
- self.id = json_read['SID']
- self.description = json_read['description']
- self.condition = json_read['condition']
- self.target = json_read['target']
- self.details = json_read['details']
- self.severity = json_read['severity']
- self.mitigations = json_read['mitigations']
- self.example = json_read['example']
- self.references = json_read['references']
-
- if not isinstance(self.target, str) and isinstance(self.target, Iterable):
- self.target = tuple(self.target)
- else:
- self.target = (self.target,)
- self.target = tuple(getattr(sys.modules[__name__], x) for x in self.target)
+ target = var([])
+
+ ''' Represents a possible threat '''
+ def __init__(self, **kwargs):
+ self.id = kwargs['SID']
+ self.description = kwargs.get('description', '')
+ self.condition = kwargs.get('condition', 'True')
+ self.details = kwargs.get('details', '')
+ self.severity = kwargs.get('severity', '')
+ self.mitigations = kwargs.get('mitigations', '')
+ self.example = kwargs.get('example', '')
+ self.references = kwargs.get('references', '')
+
+ target = kwargs.get("target", "Element")
+ if isinstance(target, str) or not isinstance(target, Iterable):
+ target = [target]
+ self.target = tuple(getattr(sys.modules[__name__], x) for x in target)
def __repr__(self):
return "<{0}.{1}({2}) at {3}>".format(
@@ -250,53 +288,120 @@ class Finding():
id = varString("", required=True, doc="Threat ID")
references = varString("", required=True, doc="Threat references")
- def __init__(self, element, description, details, severity, mitigations, example, id, references):
+ def __init__(
+ self,
+ element,
+ **kwargs,
+ ):
self.target = element.name
self.element = element
- self.description = description
- self.details = details
- self.severity = severity
- self.mitigations = mitigations
- self.example = example
- self.id = id
- self.references = references
+ attrs = [
+ "description",
+ "details",
+ "severity",
+ "mitigations",
+ "example",
+ "id",
+ "references",
+ ]
+ threat = kwargs.get("threat", None)
+ if threat:
+ for a in attrs:
+ setattr(self, a, getattr(threat, a))
+ return
+
+ for a in attrs:
+ if a in kwargs:
+ setattr(self, a, kwargs.get(a))
+
+ def __repr__(self):
+ return "<{0}.{1}({2}) at {3}>".format(
+ self.__module__, type(self).__name__, self.id, hex(id(self))
+ )
+
+ def __str__(self):
+ return "{0}({1})".format(type(self).__name__, self.id)
class TM():
"""Describes the threat model administratively, and holds all details during a run"""
-
- _BagOfFlows = []
- _BagOfElements = []
- _BagOfThreats = []
- _BagOfFindings = []
- _BagOfBoundaries = []
- _threatsExcluded = []
_sf = None
name = varString("", required=True, doc="Model name")
description = varString("", required=True, doc="Model description")
- threatsFile = varString(dirname(__file__) + "/threatlib/threats.json",
- onSet=lambda i, v: i._init_threats(),
- doc="JSON file with custom threats")
- isOrdered = varBool(False, doc="Automatically order all Dataflows")
+ elements = varElements([], onSet=lambda i, v: i._init_elements())
+ threatsFile = varString(
+ os.path.dirname(__file__) + "/threatlib/threats.json",
+ onSet=lambda i, v: i._init_threats(),
+ doc="JSON file with custom threats",
+ )
+ isOrdered = varBool(
+ False,
+ onSet=lambda i, v: i._init_elements(),
+ doc="Automatically order all Dataflows",
+ )
mergeResponses = varBool(False, doc="Merge response edges in DFDs")
def __init__(self, name, **kwargs):
+ self._threats = []
+ self._findings = []
+ self._threatsExcluded = []
+ self._sf = SuperFormatter()
for key, value in kwargs.items():
setattr(self, key, value)
self.name = name
- self._sf = SuperFormatter()
self._add_threats()
- @classmethod
- def reset(cls):
- cls._BagOfFlows = []
- cls._BagOfElements = []
- cls._BagOfThreats = []
- cls._BagOfFindings = []
- cls._BagOfBoundaries = []
+ def _init_elements(self):
+ self._flows = []
+ self._boundaries = []
+ self._elements = []
+ # first find all Dataflows to retain order
+ for e in self.elements:
+ if isinstance(e, Dataflow):
+ self._flows.append(e)
+ if isinstance(e, Boundary) and e not in self._boundaries:
+ self._boundaries.append(e)
+ # elements can contain Boundaries and Dataflows so create a complete set
+ # including their elements, source and sink
+ all_elements = set(self.elements)
+ for e in self.elements:
+ try:
+ all_elements |= set(e.elements)
+ except AttributeError:
+ pass
+ try:
+ all_elements.add(e.source)
+ if e.source.inBoundary is not None:
+ all_elements.add(e.source.inBoundary)
+ all_elements.add(e.sink)
+ if e.sink.inBoundary is not None:
+ all_elements.add(e.sink.inBoundary)
+ except AttributeError:
+ pass
+
+ self._flows = _match_responses(_sort(self._flows, self.isOrdered))
+ _apply_defaults(self._flows)
+ self._elements = _sort_elements(all_elements)
+
+ # gather elements that need to be assigned to boundaries
+ boundElements = defaultdict(list)
+ for e in self._elements:
+ try:
+ if e.inBoundary is None:
+ continue
+ except AttributeError:
+ continue
+ if e.inBoundary not in self._boundaries:
+ self._boundaries.append(e.inBoundary)
+ if e not in e.inBoundary.elements:
+ boundElements[e.inBoundary].append(e)
+
+ for boundary, elements in boundElements.items():
+ # if boundary was initiated with any elements, this will throw an exception
+ boundary.elements = elements
def _init_threats(self):
- TM._BagOfThreats = []
+ self._threats = []
self._add_threats()
def _add_threats(self):
@@ -304,44 +409,48 @@ def _add_threats(self):
threats_json = json.load(threat_file)
for i in threats_json:
- TM._BagOfThreats.append(Threat(i))
+ self._threats.append(Threat(**i))
def resolve(self):
- for e in (TM._BagOfElements):
- if e.inScope is True:
- for t in TM._BagOfThreats:
- if t.apply(e) is True:
- TM._BagOfFindings.append(Finding(e, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references))
+ for e in self._elements:
+ if not e.inScope:
+ continue
+ for t in self._threats:
+ if not t.apply(e):
+ continue
+ self._findings.append(Finding(e, threat=t))
def check(self):
if self.description is None:
- raise ValueError("Every threat model should have at least a brief description of the system being modeled.")
- _applyDefaults(TM._BagOfFlows)
- for e in (TM._BagOfElements):
- e.check()
- TM._BagOfFlows = _match_responses(_sort(TM._BagOfFlows, self.isOrdered))
+ raise ValueError(
+ "Every threat model should have at least "
+ "a brief description of the system being modeled."
+ )
+ result = True
+ for e in self._elements:
+ if not e.check():
+ result = False
+ return result
def dfd(self):
print("digraph tm {\n\tgraph [\n\tfontname = Arial;\n\tfontsize = 14;\n\t]")
print("\tnode [\n\tfontname = Arial;\n\tfontsize = 14;\n\trankdir = lr;\n\t]")
print("\tedge [\n\tshape = none;\n\tfontname = Arial;\n\tfontsize = 12;\n\t]")
print('\tlabelloc = "t";\n\tfontsize = 20;\n\tnodesep = 1;\n')
- for b in TM._BagOfBoundaries:
+ for b in self._boundaries:
b.dfd()
-
if self.mergeResponses:
- for e in TM._BagOfFlows:
+ for e in self._flows:
if e.response is not None:
e.response._is_drawn = True
- for e in TM._BagOfElements:
- # Boundaries draw themselves
- if not e._is_drawn and not isinstance(e, Boundary) and e.inBoundary is None:
+ for e in self._elements:
+ if not e._is_drawn and e.inBoundary is None:
e.dfd(mergeResponses=self.mergeResponses)
print("}")
def seq(self):
print("@startuml")
- for e in TM._BagOfElements:
+ for e in self._elements:
if isinstance(e, Actor):
print("actor {0} as \"{1}\"".format(e._uniq_name(), e.name))
elif isinstance(e, Datastore):
@@ -349,7 +458,7 @@ def seq(self):
elif not isinstance(e, Dataflow) and not isinstance(e, Boundary):
print("entity {0} as \"{1}\"".format(e._uniq_name(), e.name))
- for e in TM._BagOfFlows:
+ for e in self._flows:
print("{0} -> {1}: {2}".format(e.source._uniq_name(), e.sink._uniq_name(), e.name))
if e.note != "":
print("note left\n{}\nend note".format(e.note))
@@ -357,11 +466,20 @@ def seq(self):
def report(self, *args, **kwargs):
result = get_args()
- TM._template = result.report
+ self._template = result.report
with open(self._template) as file:
template = file.read()
- print(self._sf.format(template, tm=self, dataflows=self._BagOfFlows, threats=self._BagOfThreats, findings=self._BagOfFindings, elements=self._BagOfElements, boundaries=self._BagOfBoundaries))
+ output = self._sf.format(
+ template,
+ tm=self,
+ dataflows=self._flows,
+ threats=self._threats,
+ findings=self._findings,
+ elements=self._elements,
+ boundaries=self._boundaries,
+ )
+ print(output)
def process(self):
self.check()
@@ -377,11 +495,11 @@ def process(self):
self.resolve()
self.report()
if result.exclude is not None:
- TM._threatsExcluded = result.exclude.split(",")
+ self._threatsExcluded = result.exclude.split(",")
if result.describe is not None:
_describe_classes(result.describe.split())
if result.list is True:
- [print("{} - {}".format(t.id, t.description)) for t in TM._BagOfThreats]
+ [print("{} - {}".format(t.id, t.description)) for t in self._threats]
sys.exit(0)
@@ -407,7 +525,6 @@ def __init__(self, name, **kwargs):
self.name = name
self.uuid = uuid.UUID(int=random.getrandbits(128))
self._is_drawn = False
- TM._BagOfElements.append(self)
def __repr__(self):
return "<{0}.{1}({2}) at {3}>".format(
@@ -415,7 +532,7 @@ def __repr__(self):
)
def __str__(self):
- return "{0}({1})".format(type(self).__name__, self.name)
+ return '{0}({1})'.format(type(self).__name__, self.name)
def _uniq_name(self):
''' transform name and uuid into a unique string '''
@@ -424,11 +541,8 @@ def _uniq_name(self):
return "{0}_{1}_{2}".format(type(self).__name__.lower(), name, h[:10])
def check(self):
- return True
''' makes sure it is good to go '''
- # all minimum annotations are in place
- if self.description == "" or self.name == "":
- raise ValueError("Element {} need a description and a name.".format(self.name))
+ return True
def dfd(self, **kwargs):
self._is_drawn = True
@@ -517,7 +631,7 @@ def __init__(self, name, **kwargs):
def dfd(self, **kwargs):
self._is_drawn = True
color = _setColor(self)
- pngpath = dirname(__file__) + "/images/lambda.png"
+ pngpath = os.path.dirname(__file__) + "/images/lambda.png"
label = _setLabel(self)
print('{0} [\n\tshape = none\n\tfixedsize=shape\n\timage="{2}"\n\timagescale=true\n\tcolor = {1}'.format(self._uniq_name(), color, pngpath))
print('\tlabel = <
>;'.format(label))
@@ -729,17 +843,10 @@ def __init__(self, source, sink, name, **kwargs):
self.source = source
self.sink = sink
super().__init__(name, **kwargs)
- TM._BagOfFlows.append(self)
def __set__(self, instance, value):
print("Should not have gotten here.")
- def check(self):
- ''' makes sure it is good to go '''
- # all minimum annotations are in place
- # then add itself to _BagOfFlows
- pass
-
def dfd(self, mergeResponses=False, **kwargs):
self._is_drawn = True
color = _setColor(self)
@@ -766,10 +873,10 @@ def dfd(self, mergeResponses=False, **kwargs):
class Boundary(Element):
"""Trust boundary"""
+ elements = varElements([])
+
def __init__(self, name, **kwargs):
super().__init__(name, **kwargs)
- if name not in TM._BagOfBoundaries:
- TM._BagOfBoundaries.append(self)
def dfd(self):
if self._is_drawn:
@@ -779,11 +886,12 @@ def dfd(self):
logger.debug("Now drawing boundary " + self.name)
label = self.name
print("subgraph cluster_{0} {{\n\tgraph [\n\t\tfontsize = 10;\n\t\tfontcolor = firebrick2;\n\t\tstyle = dashed;\n\t\tcolor = firebrick2;\n\t\tlabel = <{1}>;\n\t]\n".format(self._uniq_name(), label))
- for e in TM._BagOfElements:
- if e.inBoundary == self and not e._is_drawn:
- # The content to draw can include Boundary objects
- logger.debug("Now drawing content {}".format(e.name))
- e.dfd()
+ for e in self.elements:
+ if e._is_drawn:
+ continue
+ # The content to draw can include Boundary objects
+ logger.debug("Now drawing content {}".format(e.name))
+ e.dfd()
print("\n}\n")
diff --git a/tests/dfd.dot b/tests/dfd.dot
index f83a261..1c3b682 100644
--- a/tests/dfd.dot
+++ b/tests/dfd.dot
@@ -55,12 +55,6 @@ server_WebServer_f2eb7a3ff7 [
color = black
label = <>;
]
- actor_User_579e9aae81 -> server_WebServer_f2eb7a3ff7 [
- color = black;
- dir = forward;
-
- label = <>;
- ]
server_WebServer_f2eb7a3ff7 -> datastore_SQLDatabase_d2006ce1bb [
color = black;
dir = forward;
@@ -79,4 +73,10 @@ server_WebServer_f2eb7a3ff7 [
label = <>;
]
+ actor_User_579e9aae81 -> server_WebServer_f2eb7a3ff7 [
+ color = black;
+ dir = forward;
+
+ label = <>;
+ ]
}
diff --git a/tests/test_private_func.py b/tests/test_private_func.py
index eb973c4..97773f6 100644
--- a/tests/test_private_func.py
+++ b/tests/test_private_func.py
@@ -36,7 +36,7 @@ def test_kwargs(self):
def test_load_threats(self):
tm = TM("TM")
- self.assertNotEqual(len(TM._BagOfThreats), 0)
+ self.assertNotEqual(len(tm._threats), 0)
with self.assertRaises(FileNotFoundError):
tm.threatsFile = "threats.json"
@@ -44,8 +44,6 @@ def test_load_threats(self):
TM("TM", threatsFile="threats.json")
def test_responses(self):
- tm = TM("my test tm", description="aa", isOrdered=True)
-
user = Actor("User")
web = Server("Web Server")
db = Datastore("SQL Database")
@@ -57,6 +55,8 @@ def test_responses(self):
http_resp = Dataflow(web, user, "http resp")
http_resp.responseTo = http_req
+ tm = TM("my test tm", description="aa", isOrdered=True)
+ tm.elements = [http_req, insert, query, query_resp, http_resp]
tm.check()
self.assertEqual(http_req.response, http_resp)
@@ -70,7 +70,6 @@ def test_responses(self):
self.assertIs(insert.isResponse, False)
def test_defaults(self):
- tm = TM("TM")
user = Actor("User", data="HTTP")
server = Server(
"Server", port=443, protocol="HTTPS", isEncrypted=True, data="JSON"
@@ -92,6 +91,8 @@ def test_defaults(self):
req_post = Dataflow(user, server, "HTTP POST", data="JSON")
resp_post = Dataflow(server, user, "HTTP Response", isResponse=True)
+ tm = TM("TM")
+ tm.elements = [req_get, query, result, resp_get, req_post, resp_post]
tm.check()
self.assertEqual(req_get.srcPort, -1)
@@ -146,17 +147,7 @@ def test_defaults(self):
server_query = Dataflow(server, db, "server query")
func_query = Dataflow(func, db, "func query")
- default = {
- "SID": "",
- "description": "",
- "condition": "",
- "target": ["Actor", "Boundary", "Dataflow", "Datastore", "Server"],
- "details": "",
- "severity": "",
- "mitigations": "",
- "example": "",
- "references": "",
- }
+ default_target = ["Actor", "Boundary", "Dataflow", "Datastore", "Server"]
testCases = [
{"target": server, "condition": "target.oneOf(Server, Datastore)"},
{"target": server, "condition": "not target.oneOf(Actor, Dataflow)"},
@@ -173,7 +164,7 @@ def test_defaults(self):
{"target": user, "condition": "target.inside(Boundary)"},
]
for case in testCases:
- t = Threat({**default, **{"condition": case["condition"]}})
+ t = Threat(SID="", target=default_target, condition=case["condition"])
self.assertTrue(
t.apply(case["target"]),
"Failed to match {} against {}".format(
diff --git a/tests/test_pytmfunc.py b/tests/test_pytmfunc.py
index 36ca112..9b5dc76 100644
--- a/tests/test_pytmfunc.py
+++ b/tests/test_pytmfunc.py
@@ -9,12 +9,22 @@
from os.path import dirname
from io import StringIO
-from pytm import (TM, Actor, Boundary, Dataflow, Datastore, ExternalEntity,
- Lambda, Process, Server, Threat)
+from pytm import (
+ Actor,
+ Boundary,
+ Dataflow,
+ Datastore,
+ ExternalEntity,
+ Lambda,
+ Process,
+ Server,
+ Threat,
+ TM,
+)
with open(os.path.abspath(os.path.join(dirname(__file__), '..')) + "/pytm/threatlib/threats.json", "r") as threat_file:
- threats_json = json.load(threat_file)
+ threats = {t["SID"]: Threat(**t) for t in json.load(threat_file)}
@contextmanager
@@ -36,19 +46,20 @@ def test_seq(self):
with open(os.path.join(dir_path, 'seq.plantuml')) as x:
expected = x.read().strip()
- TM.reset()
- tm = TM("my test tm", description="aaa")
internet = Boundary("Internet")
server_db = Boundary("Server/DB")
user = Actor("User", inBoundary=internet)
web = Server("Web Server")
db = Datastore("SQL Database", inBoundary=server_db)
- Dataflow(user, web, "User enters comments (*)", note="bbb")
- Dataflow(web, db, "Insert query with comments", note="ccc")
- Dataflow(db, web, "Retrieve comments")
- Dataflow(web, user, "Show comments (*)")
+ flows = [
+ Dataflow(user, web, "User enters comments (*)", note="bbb"),
+ Dataflow(web, db, "Insert query with comments", note="ccc"),
+ Dataflow(db, web, "Retrieve comments"),
+ Dataflow(web, user, "Show comments (*)"),
+ ]
+ tm = TM("my test tm", description="aaa", elements=flows, isOrdered=True)
with captured_output() as (out, err):
tm.seq()
@@ -63,19 +74,20 @@ def test_dfd(self):
random.seed(0)
- TM.reset()
- tm = TM("my test tm", description="aaa")
internet = Boundary("Internet")
server_db = Boundary("Server/DB")
user = Actor("User", inBoundary=internet)
web = Server("Web Server")
db = Datastore("SQL Database", inBoundary=server_db)
- Dataflow(user, web, "User enters comments (*)")
- Dataflow(web, db, "Insert query with comments")
- Dataflow(db, web, "Retrieve comments")
- Dataflow(web, user, "Show comments (*)")
+ flows = [
+ Dataflow(user, web, "User enters comments (*)"),
+ Dataflow(web, db, "Insert query with comments"),
+ Dataflow(db, web, "Retrieve comments"),
+ Dataflow(web, user, "Show comments (*)"),
+ ]
+ tm = TM("my test tm", description="aaa", elements=flows)
with captured_output() as (out, err):
tm.dfd()
@@ -83,8 +95,145 @@ def test_dfd(self):
self.maxDiff = None
self.assertEqual(output, expected)
+ def test_lazy_init(self):
+ internet = Boundary("Internet")
+ server_db = Boundary("Server/DB")
+ vpc = Boundary("AWS VPC")
+
+ user = Actor("User")
+ user.protocol = "HTTP"
+ user.inBoundary = internet
+
+ web = Server("Web Server")
+ web.OS = "Ubuntu"
+ web.protocol = "HTTP"
+ web.dstPort = 80
+ web.isHardened = True
+ web.sanitizesInput = False
+ web.encodesOutput = True
+ web.authorizesSource = False
+
+ db = Datastore("SQL Database")
+ db.OS = "CentOS"
+ db.protocol = "MySQL"
+ db.dstPort = 3306
+ db.isHardened = False
+ db.inBoundary = server_db
+ db.isSQL = True
+ db.inScope = True
+
+ my_lambda = Lambda("AWS Lambda")
+ my_lambda.hasAccessControl = True
+ my_lambda.inBoundary = vpc
+
+ user_to_web = Dataflow(user, web, "User enters comments (*)")
+ user_to_web.data = 'Comments in HTML or Markdown'
+ user_to_web.note = "This is a simple web app."
+
+ web_to_db = Dataflow(web, db, "Insert query with comments")
+ web_to_db.data = 'MySQL insert statement, all literals'
+ web_to_db.note = "Web server inserts user comments."
+
+ db_to_web = Dataflow(db, web, "Retrieve comments")
+ db_to_web.protocol = "MySQL"
+ db_to_web.data = 'Web server retrieves comments from DB'
+
+ web_to_user = Dataflow(web, user, "Show comments (*)")
+ web_to_user.data = 'Web server shows comments to the end user'
+
+ my_lambda_to_db = Dataflow(my_lambda, db, "Lambda periodically cleans DB")
+ my_lambda_to_db.data = "Lamda clears DB every 6 hours"
+
+ tm = TM("my test tm")
+ tm.description = "desc"
+ tm.isOrdered = True
+ tm.elements = [user_to_web, web_to_db, db_to_web, web_to_user, my_lambda_to_db]
-class Testpytm(unittest.TestCase):
+ assets = [user, web, db, my_lambda]
+ dataflows = [user_to_web, web_to_db, db_to_web, web_to_user, my_lambda_to_db]
+ boundaries = [vpc, internet, server_db]
+ self.assertTrue(tm.check())
+ self.maxDiff = None
+ self.assertListEqual(tm._elements, assets + boundaries + dataflows)
+ self.assertListEqual(tm._boundaries, [internet, server_db, vpc])
+ self.assertListEqual(tm._flows, dataflows)
+
+ def test_eager_init(self):
+ user = Actor("User", protocol="HTTP")
+ web = Server(
+ "Web Server",
+ OS="Ubuntu",
+ protocol="HTTP",
+ dstPort=80,
+ isHardened=True,
+ sanitizesInput=False,
+ encodesOutput=True,
+ authorizesSource=False,
+ )
+ db = Datastore(
+ "SQL Database",
+ OS="CentOS",
+ protocol="MySQL",
+ dstPort=3306,
+ isHardened=False,
+ isSQL=True,
+ )
+ my_lambda = Lambda("AWS Lambda", hasAccessControl=True)
+
+ elements = [
+ Boundary("AWS VPC", elements=[my_lambda]),
+ Boundary("Internet", elements=[user]),
+ Boundary("Server/DB", elements=[web, db]),
+ Dataflow(
+ user,
+ web,
+ "User enters comments (*)",
+ data="Comments in HTML or Markdown",
+ note="This is a simple web app.",
+ ),
+ Dataflow(
+ web,
+ db,
+ "Insert query with comments",
+ data="MySQL insert statement, all literals",
+ note="Web server inserts user comments.",
+ ),
+ Dataflow(
+ db,
+ web,
+ "Retrieve comments",
+ protocol="MySQL",
+ data="Web server retrieves comments from DB",
+ ),
+ Dataflow(
+ web,
+ user,
+ "Show comments (*)",
+ data="Web server shows comments to the end user",
+ ),
+ Dataflow(
+ my_lambda,
+ db,
+ "Lambda periodically cleans DB",
+ data="Lamda clears DB every 6 hours",
+ ),
+ ]
+
+ tm = TM(
+ "my test tm",
+ description="desc",
+ isOrdered=True,
+ elements=elements,
+ )
+
+ assets = [user, web, db, my_lambda]
+ self.assertTrue(tm.check())
+ self.assertListEqual(tm._elements, assets + elements)
+ self.assertEqual(len(tm._boundaries), 3)
+ self.assertEqual(len(tm._flows), 5)
+
+
+class TestThreats(unittest.TestCase):
# Test for all the threats in threats.py - test Threat.apply() function
def test_INP01(self):
@@ -93,25 +242,25 @@ def test_INP01(self):
lambda1.usesEnvironmentVariables = True
lambda1.sanitizesInput = False
lambda1.checksInputBounds = False
- process1.usesEnvironmentVariables = True
- process1.sanitizesInput = False
+ process1.usesEnvironmentVariables = True
+ process1.sanitizesInput = False
process1.checksInputBounds = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP01"))
- self.assertTrue(ThreatObj.apply(lambda1))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP01"]
+ self.assertTrue(threat.apply(lambda1))
+ self.assertTrue(threat.apply(process1))
def test_INP02(self):
process1 = Process('myprocess')
process1.checksInputBounds = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP02"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP02"]
+ self.assertTrue(threat.apply(process1))
def test_INP03(self):
web = Server('Web')
web.sanitizesInput = False
web.encodesOutput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP03"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP03"]
+ self.assertTrue(threat.apply(web))
def test_CR01(self):
user = Actor("User")
@@ -123,17 +272,17 @@ def test_CR01(self):
user_to_web.protocol = 'HTTP'
user_to_web.usesVPN = False
user_to_web.usesSessionTokens = True
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR01"))
- self.assertTrue(ThreatObj.apply(web))
- self.assertTrue(ThreatObj.apply(user_to_web))
+ threat = threats["CR01"]
+ self.assertTrue(threat.apply(web))
+ self.assertTrue(threat.apply(user_to_web))
def test_INP04(self):
web = Server("Web Server")
web.validatesInput = False
web.validatesHeaders = False
web.protocol = 'HTTP'
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP04"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP04"]
+ self.assertTrue(threat.apply(web))
def test_CR02(self):
user = Actor("User")
@@ -147,30 +296,30 @@ def test_CR02(self):
user_to_web.sanitizesInput = False
user_to_web.validatesInput = False
user_to_web.usesSessionTokens = True
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR02"))
- self.assertTrue(ThreatObj.apply(web))
- self.assertTrue(ThreatObj.apply(user_to_web))
+ threat = threats["CR02"]
+ self.assertTrue(threat.apply(web))
+ self.assertTrue(threat.apply(user_to_web))
def test_INP05(self):
web = Server("Web Server")
web.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP05"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP05"]
+ self.assertTrue(threat.apply(web))
def test_INP06(self):
web = Server("Web Server")
web.protocol = 'SOAP'
web.sanitizesInput = False
web.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP06"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP06"]
+ self.assertTrue(threat.apply(web))
def test_SC01(self):
process1 = Process("Process1")
process1.implementsNonce = False
process1.data = 'JSON'
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "SC01"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["SC01"]
+ self.assertTrue(threat.apply(process1))
def test_LB01(self):
process1 = Process("Process1")
@@ -181,35 +330,35 @@ def test_LB01(self):
lambda1.implementsAPI = True
lambda1.validatesInput = False
lambda1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "LB01"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
+ threat = threats["LB01"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
def test_AA01(self):
process1 = Process("Process1")
web = Server("Web Server")
process1.authenticatesSource = False
web.authenticatesSource = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AA01"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AA01"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(web))
def test_DS01(self):
web = Server("Web Server")
web.sanitizesInput = False
web.validatesInput = False
web.encodesOutput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DS01"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["DS01"]
+ self.assertTrue(threat.apply(web))
def test_DE01(self):
user = Actor("User")
- web = Server("Web Server")
+ web = Server("Web Server")
user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.protocol = 'HTTP'
user_to_web.isEncrypted = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DE01"))
- self.assertTrue(ThreatObj.apply(user_to_web))
+ threat = threats["DE01"]
+ self.assertTrue(threat.apply(user_to_web))
def test_DE02(self):
web = Server("Web Server")
@@ -218,18 +367,18 @@ def test_DE02(self):
web.sanitizesInput = False
process1.validatesInput = False
process1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DE02"))
- self.assertTrue(ThreatObj.apply(web))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["DE02"]
+ self.assertTrue(threat.apply(web))
+ self.assertTrue(threat.apply(process1))
def test_API01(self):
process1 = Process("Process1")
lambda1 = Lambda("Lambda1")
process1.implementsAPI = True
lambda1.implementsAPI = True
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "API01"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
+ threat = threats["API01"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
def test_AC01(self):
web = Server("Web Server")
@@ -241,22 +390,22 @@ def test_AC01(self):
process1.authorizesSource = False
db.hasAccessControl = False
db.authorizesSource = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC01"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(web))
- self.assertTrue(ThreatObj.apply(db))
+ threat = threats["AC01"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(web))
+ self.assertTrue(threat.apply(db))
def test_INP07(self):
process1 = Process("Process1")
process1.usesSecureFunctions = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP07"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP07"]
+ self.assertTrue(threat.apply(process1))
def test_AC02(self):
db = Datastore("DB")
db.isShared = True
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC02"))
- self.assertTrue(ThreatObj.apply(db))
+ threat = threats["AC02"]
+ self.assertTrue(threat.apply(db))
def test_DO01(self):
process1 = Process("Process1")
@@ -264,16 +413,16 @@ def test_DO01(self):
process1.handlesResourceConsumption = False
process1.isResilient = False
web.handlesResourceConsumption = True
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DO01"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["DO01"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(web))
def test_HA01(self):
web = Server("Web Server")
web.validatesInput = False
web.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "HA01"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["HA01"]
+ self.assertTrue(threat.apply(web))
def test_AC03(self):
process1 = Process("Process1")
@@ -286,9 +435,9 @@ def test_AC03(self):
lambda1.implementsAuthenticationScheme = False
lambda1.validatesInput = False
lambda1.authorizesSource = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC03"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
+ threat = threats["AC03"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
def test_DO02(self):
process1 = Process("Process1")
@@ -299,20 +448,20 @@ def test_DO02(self):
lambda1.handlesResourceConsumption = False
web.handlesResourceConsumption = False
db.handlesResourceConsumption = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DO02"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
- self.assertTrue(ThreatObj.apply(web))
- self.assertTrue(ThreatObj.apply(db))
+ threat = threats["DO02"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
+ self.assertTrue(threat.apply(web))
+ self.assertTrue(threat.apply(db))
def test_DS02(self):
process1 = Process("Process1")
lambda1 = Lambda("Lambda1")
process1.environment = 'Production'
lambda1.environment = 'Production'
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DS02"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
+ threat = threats["DS02"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
def test_INP08(self):
process1 = Process("Process1")
@@ -324,29 +473,29 @@ def test_INP08(self):
lambda1.sanitizesInput = False
web.validatesInput = False
web.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP08"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP08"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
+ self.assertTrue(threat.apply(web))
def test_INP09(self):
- web = Server("Web Server")
+ web = Server("Web Server")
web.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP09"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP09"]
+ self.assertTrue(threat.apply(web))
def test_INP10(self):
- web = Server("Web Server")
+ web = Server("Web Server")
web.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP10"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP10"]
+ self.assertTrue(threat.apply(web))
def test_INP11(self):
- web = Server("Web Server")
+ web = Server("Web Server")
web.validatesInput = False
web.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP11"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP11"]
+ self.assertTrue(threat.apply(web))
def test_INP12(self):
process1 = Process("Process1")
@@ -355,26 +504,26 @@ def test_INP12(self):
process1.validatesInput = False
lambda1.checksInputBounds = False
lambda1.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP12"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
+ threat = threats["INP12"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
def test_AC04(self):
user = Actor("User")
- web = Server("Web Server")
+ web = Server("Web Server")
user_to_web = Dataflow(user, web, "User enters comments (*)")
- user_to_web.data = 'XML'
+ user_to_web.data = 'XML'
user_to_web.authorizesSource = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC04"))
- self.assertTrue(ThreatObj.apply(user_to_web))
+ threat = threats["AC04"]
+ self.assertTrue(threat.apply(user_to_web))
def test_DO03(self):
user = Actor("User")
- web = Server("Web Server")
+ web = Server("Web Server")
user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.data = 'XML'
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DO03"))
- self.assertTrue(ThreatObj.apply(user_to_web))
+ threat = threats["DO03"]
+ self.assertTrue(threat.apply(user_to_web))
def test_AC05(self):
process1 = Process("Process1")
@@ -383,18 +532,18 @@ def test_AC05(self):
process1.authorizesSource = False
web.providesIntegrity = False
web.authorizesSource = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC05"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AC05"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(web))
def test_INP13(self):
process1 = Process("Process1")
lambda1 = Lambda("Lambda1")
process1.validatesInput = False
lambda1.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP13"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
+ threat = threats["INP13"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
def test_INP14(self):
process1 = Process("Process1")
@@ -403,29 +552,29 @@ def test_INP14(self):
process1.validatesInput = False
lambda1.validatesInput = False
web.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP14"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP14"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
+ self.assertTrue(threat.apply(web))
def test_DE03(self):
user = Actor("User")
- web = Server("Web Server")
+ web = Server("Web Server")
user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.protocol = 'HTTP'
user_to_web.isEncrypted = False
user_to_web.usesVPN = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DE03"))
- self.assertTrue(ThreatObj.apply(user_to_web))
+ threat = threats["DE03"]
+ self.assertTrue(threat.apply(user_to_web))
def test_CR03(self):
process1 = Process("Process1")
web = Server("Web Server")
process1.implementsAuthenticationScheme = False
web.implementsAuthenticationScheme = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR03"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["CR03"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(web))
def test_API02(self):
process1 = Process("Process1")
@@ -434,117 +583,117 @@ def test_API02(self):
process1.validatesInput = False
lambda1.implementsAPI = True
lambda1.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "API02"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
+ threat = threats["API02"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
def test_HA02(self):
EE = ExternalEntity("EE")
EE.hasPhysicalAccess = True
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "HA02"))
- self.assertTrue(ThreatObj.apply(EE))
+ threat = threats["HA02"]
+ self.assertTrue(threat.apply(EE))
def test_DS03(self):
web = Server("Web Server")
web.isHardened = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DS03"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["DS03"]
+ self.assertTrue(threat.apply(web))
def test_AC06(self):
web = Server("Web Server")
web.isHardened = False
web.hasAccessControl = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC06"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AC06"]
+ self.assertTrue(threat.apply(web))
def test_HA03(self):
web = Server("Web Server")
web.validatesHeaders = False
web.encodesOutput = False
web.isHardened = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "HA03"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["HA03"]
+ self.assertTrue(threat.apply(web))
def test_SC02(self):
web = Server("Web Server")
web.validatesInput = False
web.encodesOutput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "SC02"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["SC02"]
+ self.assertTrue(threat.apply(web))
def test_AC07(self):
web = Server("Web Server")
web.hasAccessControl = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC07"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AC07"]
+ self.assertTrue(threat.apply(web))
def test_INP15(self):
web = Server("Web Server")
web.protocol = 'IMAP'
web.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP15"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP15"]
+ self.assertTrue(threat.apply(web))
def test_HA04(self):
EE = ExternalEntity("ee")
EE.hasPhysicalAccess = True
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "HA04"))
- self.assertTrue(ThreatObj.apply(EE))
+ threat = threats["HA04"]
+ self.assertTrue(threat.apply(EE))
def test_SC03(self):
web = Server("Web Server")
web.validatesInput = False
web.sanitizesInput = False
web.hasAccessControl = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "SC03"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["SC03"]
+ self.assertTrue(threat.apply(web))
def test_INP16(self):
web = Server("Web Server")
web.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP16"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP16"]
+ self.assertTrue(threat.apply(web))
def test_AA02(self):
web = Server("Web Server")
process1 = Process("process")
web.authenticatesSource = False
process1.authenticatesSource = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AA02"))
- self.assertTrue(ThreatObj.apply(web))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["AA02"]
+ self.assertTrue(threat.apply(web))
+ self.assertTrue(threat.apply(process1))
def test_CR04(self):
web = Server("Web Server")
web.usesSessionTokens = True
web.implementsNonce = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR04"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["CR04"]
+ self.assertTrue(threat.apply(web))
def test_DO04(self):
user = Actor("User")
- web = Server("Web Server")
+ web = Server("Web Server")
user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.data = 'XML'
user_to_web.handlesResources = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DO04"))
- self.assertTrue(ThreatObj.apply(user_to_web))
+ threat = threats["DO04"]
+ self.assertTrue(threat.apply(user_to_web))
def test_DS04(self):
web = Server("Web Server")
web.encodesOutput = False
web.validatesInput = False
web.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DS04"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["DS04"]
+ self.assertTrue(threat.apply(web))
def test_SC04(self):
web = Server("Web Server")
web.sanitizesInput = False
web.validatesInput = False
web.encodesOutput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "SC04"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["SC04"]
+ self.assertTrue(threat.apply(web))
def test_CR05(self):
web = Server("Web Server")
@@ -553,173 +702,173 @@ def test_CR05(self):
web.usesEncryptionAlgorithm != 'AES'
db.usesEncryptionAlgorithm != 'RSA'
db.usesEncryptionAlgorithm != 'AES'
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR05"))
- self.assertTrue(ThreatObj.apply(web))
- self.assertTrue(ThreatObj.apply(db))
+ threat = threats["CR05"]
+ self.assertTrue(threat.apply(web))
+ self.assertTrue(threat.apply(db))
def test_AC08(self):
web = Server("Web Server")
web.hasAccessControl = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC08"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AC08"]
+ self.assertTrue(threat.apply(web))
def test_DS05(self):
web = Server("Web Server")
web.usesCache = True
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DS05"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["DS05"]
+ self.assertTrue(threat.apply(web))
def test_SC05(self):
web = Server("Web Server")
web.providesIntegrity = False
web.usesCodeSigning = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "SC05"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["SC05"]
+ self.assertTrue(threat.apply(web))
def test_INP17(self):
web = Server("Web Server")
web.validatesContentType = False
web.invokesScriptFilters = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP17"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP17"]
+ self.assertTrue(threat.apply(web))
def test_AA03(self):
web = Server("Web Server")
web.providesIntegrity = False
web.authenticatesSource = False
web.usesStrongSessionIdentifiers = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AA03"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AA03"]
+ self.assertTrue(threat.apply(web))
def test_AC09(self):
web = Server("Web Server")
web.hasAccessControl = False
web.authorizesSource = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC09"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AC09"]
+ self.assertTrue(threat.apply(web))
def test_INP18(self):
web = Server("Web Server")
web.sanitizesInput = False
web.encodesOutput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP18"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP18"]
+ self.assertTrue(threat.apply(web))
def test_CR06(self):
user = Actor("User")
- web = Server("Web Server")
+ web = Server("Web Server")
user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.protocol = 'HTTP'
user_to_web.usesVPN = False
user_to_web.implementsAuthenticationScheme = False
user_to_web.authorizesSource = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR06"))
- self.assertTrue(ThreatObj.apply(user_to_web))
+ threat = threats["CR06"]
+ self.assertTrue(threat.apply(user_to_web))
def test_AC10(self):
web = Server("Web Server")
web.usesLatestTLSversion = False
web.implementsAuthenticationScheme = False
web.authorizesSource = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC10"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AC10"]
+ self.assertTrue(threat.apply(web))
def test_CR07(self):
user = Actor("User")
- web = Server("Web Server")
+ web = Server("Web Server")
user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.protocol = 'HTTP'
user_to_web.data = 'XML'
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR07"))
- self.assertTrue(ThreatObj.apply(user_to_web))
+ threat = threats["CR07"]
+ self.assertTrue(threat.apply(user_to_web))
def test_AA04(self):
web = Server("Web Server")
web.implementsServerSideValidation = False
web.providesIntegrity = False
web.authorizesSource = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AA04"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AA04"]
+ self.assertTrue(threat.apply(web))
def test_CR08(self):
user = Actor("User")
- web = Server("Web Server")
+ web = Server("Web Server")
user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.protocol = 'HTTP'
user_to_web.usesLatestTLSversion = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR08"))
- self.assertTrue(ThreatObj.apply(user_to_web))
+ threat = threats["CR08"]
+ self.assertTrue(threat.apply(user_to_web))
def test_INP19(self):
web = Server("Web Server")
web.usesXMLParser = False
web.disablesDTD = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP19"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP19"]
+ self.assertTrue(threat.apply(web))
def test_INP20(self):
process1 = Process("process")
process1.disablesiFrames = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP20"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP20"]
+ self.assertTrue(threat.apply(process1))
def test_AC11(self):
web = Server("Web Server")
web.usesStrongSessionIdentifiers = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC11"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AC11"]
+ self.assertTrue(threat.apply(web))
def test_INP21(self):
web = Server("Web Server")
web.usesXMLParser = False
web.disablesDTD = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP21"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP21"]
+ self.assertTrue(threat.apply(web))
def test_INP22(self):
web = Server("Web Server")
web.usesXMLParser = False
web.disablesDTD = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP22"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP22"]
+ self.assertTrue(threat.apply(web))
def test_INP23(self):
process1 = Process("Process")
process1.hasAccessControl = False
process1.sanitizesInput = False
process1.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP23"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP23"]
+ self.assertTrue(threat.apply(process1))
def test_DO05(self):
web = Server("Web Server")
web.validatesInput = False
web.sanitizesInput = False
web.usesXMLParser = True
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DO05"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["DO05"]
+ self.assertTrue(threat.apply(web))
def test_AC12(self):
process1 = Process("Process")
process1.hasAccessControl = False
process1.implementsPOLP = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC12"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["AC12"]
+ self.assertTrue(threat.apply(process1))
def test_AC13(self):
process1 = Process("Process")
process1.hasAccessControl = False
process1.implementsPOLP = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC13"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["AC13"]
+ self.assertTrue(threat.apply(process1))
def test_AC14(self):
process1 = Process("Process")
process1.implementsPOLP = False
process1.usesEnvironmentVariables = False
process1.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC14"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["AC14"]
+ self.assertTrue(threat.apply(process1))
def test_INP24(self):
process1 = Process("Process")
@@ -728,9 +877,9 @@ def test_INP24(self):
process1.validatesInput = False
lambda1.checksInputBounds = False
lambda1.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP24"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
+ threat = threats["INP24"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
def test_INP25(self):
process1 = Process("Process")
@@ -739,9 +888,9 @@ def test_INP25(self):
process1.sanitizesInput = False
lambda1.validatesInput = False
lambda1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP25"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
+ threat = threats["INP25"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
def test_INP26(self):
process1 = Process("Process")
@@ -750,16 +899,16 @@ def test_INP26(self):
process1.sanitizesInput = False
lambda1.validatesInput = False
lambda1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP26"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(lambda1))
+ threat = threats["INP26"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(lambda1))
def test_INP27(self):
process1 = Process("Process")
process1.validatesInput = False
process1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP27"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP27"]
+ self.assertTrue(threat.apply(process1))
def test_INP28(self):
web = Server("Web Server")
@@ -770,9 +919,9 @@ def test_INP28(self):
process1.validatesInput = False
process1.sanitizesInput = False
process1.encodesOutput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP28"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP28"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(web))
def test_INP29(self):
web = Server("Web Server")
@@ -783,153 +932,153 @@ def test_INP29(self):
process1.validatesInput = False
process1.sanitizesInput = False
process1.encodesOutput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP29"))
- self.assertTrue(ThreatObj.apply(process1))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP29"]
+ self.assertTrue(threat.apply(process1))
+ self.assertTrue(threat.apply(web))
def test_INP30(self):
process1 = Process("Process")
process1.validatesInput = False
process1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP30"))
- self.assertTrue(ThreatObj.apply(process1))
-
+ threat = threats["INP30"]
+ self.assertTrue(threat.apply(process1))
+
def test_INP31(self):
process1 = Process("Process")
process1.validatesInput = False
process1.sanitizesInput = False
process1.usesParameterizedInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP31"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP31"]
+ self.assertTrue(threat.apply(process1))
def test_INP32(self):
process1 = Process("Process")
process1.validatesInput = False
process1.sanitizesInput = False
process1.encodesOutput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP32"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP32"]
+ self.assertTrue(threat.apply(process1))
def test_INP33(self):
process1 = Process("Process")
process1.validatesInput = False
process1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP33"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP33"]
+ self.assertTrue(threat.apply(process1))
def test_INP34(self):
web = Server("web")
web.checksInputBounds = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP34"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP34"]
+ self.assertTrue(threat.apply(web))
def test_INP35(self):
process1 = Process("Process")
process1.validatesInput = False
process1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP35"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP35"]
+ self.assertTrue(threat.apply(process1))
def test_DE04(self):
data = Datastore("DB")
data.validatesInput = False
data.implementsPOLP = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DE04"))
- self.assertTrue(ThreatObj.apply(data))
+ threat = threats["DE04"]
+ self.assertTrue(threat.apply(data))
def test_AC15(self):
process1 = Process("Process")
process1.implementsPOLP = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC15"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["AC15"]
+ self.assertTrue(threat.apply(process1))
def test_INP36(self):
web = Server("web")
web.implementsStrictHTTPValidation = False
web.encodesHeaders = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP36"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP36"]
+ self.assertTrue(threat.apply(web))
def test_INP37(self):
web = Server("web")
web.implementsStrictHTTPValidation = False
web.encodesHeaders = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP37"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["INP37"]
+ self.assertTrue(threat.apply(web))
def test_INP38(self):
process1 = Process("Process")
process1.allowsClientSideScripting = True
process1.validatesInput = False
process1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP38"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP38"]
+ self.assertTrue(threat.apply(process1))
def test_AC16(self):
web = Server("web")
web.usesStrongSessionIdentifiers = False
web.encryptsCookies = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC16"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AC16"]
+ self.assertTrue(threat.apply(web))
def test_INP39(self):
process1 = Process("Process")
process1.allowsClientSideScripting = True
process1.validatesInput = False
process1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP39"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP39"]
+ self.assertTrue(threat.apply(process1))
def test_INP40(self):
process1 = Process("Process")
process1.allowsClientSideScripting = True
process1.sanitizesInput = False
process1.validatesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP40"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP40"]
+ self.assertTrue(threat.apply(process1))
def test_AC17(self):
web = Server("web")
web.usesStrongSessionIdentifiers = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC17"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AC17"]
+ self.assertTrue(threat.apply(web))
def test_AC18(self):
process1 = Process("Process")
process1.usesStrongSessionIdentifiers = False
process1.encryptsCookies = False
process1.definesConnectionTimeout = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC18"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["AC18"]
+ self.assertTrue(threat.apply(process1))
def test_INP41(self):
process1 = Process("Process")
process1.validatesInput = False
process1.sanitizesInput = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP41"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["INP41"]
+ self.assertTrue(threat.apply(process1))
def test_AC19(self):
web = Server("web")
web.usesSessionTokens = True
web.implementsNonce = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC19"))
- self.assertTrue(ThreatObj.apply(web))
+ threat = threats["AC19"]
+ self.assertTrue(threat.apply(web))
def test_AC20(self):
process1 = Process("Process")
process1.definesConnectionTimeout = False
process1.usesMFA = False
process1.encryptsSessionData = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC20"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["AC20"]
+ self.assertTrue(threat.apply(process1))
def test_AC21(self):
process1 = Process("Process")
process1.implementsCSRFToken = False
process1.verifySessionIdentifiers = False
- ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC21"))
- self.assertTrue(ThreatObj.apply(process1))
+ threat = threats["AC21"]
+ self.assertTrue(threat.apply(process1))
if __name__ == '__main__':
diff --git a/tm.py b/tm.py
index 1c1c4b5..44baa3b 100755
--- a/tm.py
+++ b/tm.py
@@ -7,11 +7,6 @@
# make sure generated diagrams do not change, makes sense if they're commited
random.seed(0)
-tm = TM("my test tm")
-tm.description = "This is a sample threat model of a very simple system - a web-based comment system. The user enters comments and these are added to a database and displayed back to the user. The thought is that it is, though simple, a complete enough example to express meaningful threats."
-tm.isOrdered = True
-tm.mergeResponses = True
-
internet = Boundary("Internet")
server_db = Boundary("Server/DB")
vpc = Boundary("AWS VPC")
@@ -65,6 +60,14 @@
my_lambda_to_db.dstPort = 3306
my_lambda_to_db.data = "Lamda clears DB every 6 hours"
+tm = TM("my test tm")
+tm.description = """This is a sample threat model of a very simple system - a web-based
+comment system. The user enters comments and these are added to a database and displayed
+back to the user. The thought is that it is, though simple, a complete enough example
+to express meaningful threats."""
+tm.isOrdered = True
+tm.mergeResponses = True
+tm.elements = [user_to_web, web_to_db, db_to_web, web_to_user, my_lambda_to_db]
if __name__ == "__main__":
tm.process()