diff --git a/README.md b/README.md index 8e155e1..6e2987e 100644 --- a/README.md +++ b/README.md @@ -182,6 +182,29 @@ Name|From|To |Data|Protocol|Port ``` +To group findings by elements, use a more advanced, nested loop: + +```text +## Findings + +{elements:repeat:{{item.findings:if: +### {{item.name}} + +{{item.findings:repeat: +**Threat**: {{{{item.id}}}} - {{{{item.description}}}} + +**Severity**: {{{{item.severity}}}} + +**Mitigations**: {{{{item.mitigations}}}} + +**References**: {{{{item.references}}}} + +}}}}} +``` + +All items inside a loop must be escaped, doubling the braces, so `{item.name}` becomes `{{item.name}}`. +The example above uses two nested loops, so items in the inner loop must be escaped twice, that's why they're using four braces. + ## Threats database For the security practitioner, you may supply your own threats file by setting `TM.threatsFile`. It should contain entries like: diff --git a/pytm/pytm.py b/pytm/pytm.py index 8e85da0..6b8aa84 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -5,6 +5,7 @@ import random import sys import uuid +import sys from collections import defaultdict from collections.abc import Iterable from hashlib import sha224 @@ -98,6 +99,19 @@ def __set__(self, instance, value): super().__set__(instance, value) +class varFindings(var): + + def __set__(self, instance, value): + for i, e in enumerate(value): + if not isinstance(e, Finding): + raise ValueError( + "expecting a list of Findings, item number {} is a {}".format( + i, type(value) + ) + ) + super().__set__(instance, list(value)) + + def _setColor(element): if element.inScope is True: return "black" @@ -206,22 +220,21 @@ class Threat(): references = varString("") target = () - def __init__(self, json_read): - self.id = json_read['SID'] - self.description = json_read['description'] - self.condition = json_read['condition'] - self.target = json_read['target'] - self.details = json_read['details'] - self.severity = json_read['severity'] - self.mitigations = json_read['mitigations'] - self.example = json_read['example'] - self.references = json_read['references'] - - if not isinstance(self.target, str) and isinstance(self.target, Iterable): - self.target = tuple(self.target) + def __init__(self, **kwargs): + self.id = kwargs['SID'] + self.description = kwargs.get('description', '') + self.condition = kwargs.get('condition', 'True') + target = kwargs.get('target', 'Element') + if not isinstance(target, str) and isinstance(target, Iterable): + target = tuple(target) else: - self.target = (self.target,) - self.target = tuple(getattr(sys.modules[__name__], x) for x in self.target) + target = (target,) + self.target = tuple(getattr(sys.modules[__name__], x) for x in target) + self.details = kwargs.get('details', '') + self.severity = kwargs.get('severity', '') + self.mitigations = kwargs.get('mitigations', '') + self.example = kwargs.get('example', '') + self.references = kwargs.get('references', '') def __repr__(self): return "<{0}.{1}({2}) at {3}>".format( @@ -268,7 +281,6 @@ class TM(): _BagOfFlows = [] _BagOfElements = [] _BagOfThreats = [] - _BagOfFindings = [] _BagOfBoundaries = [] _threatsExcluded = [] _sf = None @@ -279,6 +291,7 @@ class TM(): doc="JSON file with custom threats") isOrdered = varBool(False, doc="Automatically order all Dataflows") mergeResponses = varBool(False, doc="Merge response edges in DFDs") + findings = varFindings([], doc="threats found for elements of this model") def __init__(self, name, **kwargs): for key, value in kwargs.items(): @@ -292,7 +305,6 @@ def reset(cls): cls._BagOfFlows = [] cls._BagOfElements = [] cls._BagOfThreats = [] - cls._BagOfFindings = [] cls._BagOfBoundaries = [] def _init_threats(self): @@ -304,14 +316,23 @@ def _add_threats(self): threats_json = json.load(threat_file) for i in threats_json: - TM._BagOfThreats.append(Threat(i)) + TM._BagOfThreats.append(Threat(**i)) def resolve(self): - for e in (TM._BagOfElements): - if e.inScope is True: - for t in TM._BagOfThreats: - if t.apply(e) is True: - TM._BagOfFindings.append(Finding(e, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references)) + findings = [] + elements = defaultdict(list) + for e in TM._BagOfElements: + if not e.inScope: + continue + for t in TM._BagOfThreats: + if not t.apply(e): + continue + f = Finding(e, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references) + findings.append(f) + elements[e].append(f) + self.findings = findings + for e, findings in elements.items(): + e.findings = findings def check(self): if self.description is None: @@ -361,7 +382,7 @@ def report(self, *args, **kwargs): with open(self._template) as file: template = file.read() - print(self._sf.format(template, tm=self, dataflows=self._BagOfFlows, threats=self._BagOfThreats, findings=self._BagOfFindings, elements=self._BagOfElements, boundaries=self._BagOfBoundaries)) + print(self._sf.format(template, tm=self, dataflows=self._BagOfFlows, threats=self._BagOfThreats, findings=self.findings, elements=self._BagOfElements, boundaries=self._BagOfBoundaries)) def process(self): self.check() @@ -400,6 +421,7 @@ class Element(): definesConnectionTimeout = varBool(False) OS = varString("") isAdmin = varBool(False) + findings = varFindings([]) def __init__(self, name, **kwargs): for key, value in kwargs.items(): diff --git a/tests/test_private_func.py b/tests/test_private_func.py index eb973c4..1aabfeb 100644 --- a/tests/test_private_func.py +++ b/tests/test_private_func.py @@ -146,17 +146,7 @@ def test_defaults(self): server_query = Dataflow(server, db, "server query") func_query = Dataflow(func, db, "func query") - default = { - "SID": "", - "description": "", - "condition": "", - "target": ["Actor", "Boundary", "Dataflow", "Datastore", "Server"], - "details": "", - "severity": "", - "mitigations": "", - "example": "", - "references": "", - } + default_target = ["Actor", "Boundary", "Dataflow", "Datastore", "Server"] testCases = [ {"target": server, "condition": "target.oneOf(Server, Datastore)"}, {"target": server, "condition": "not target.oneOf(Actor, Dataflow)"}, @@ -173,7 +163,7 @@ def test_defaults(self): {"target": user, "condition": "target.inside(Boundary)"}, ] for case in testCases: - t = Threat({**default, **{"condition": case["condition"]}}) + t = Threat(SID="", target=default_target, condition=case["condition"]) self.assertTrue( t.apply(case["target"]), "Failed to match {} against {}".format( diff --git a/tests/test_pytmfunc.py b/tests/test_pytmfunc.py index 36ca112..8f4b1d5 100644 --- a/tests/test_pytmfunc.py +++ b/tests/test_pytmfunc.py @@ -14,7 +14,7 @@ with open(os.path.abspath(os.path.join(dirname(__file__), '..')) + "/pytm/threatlib/threats.json", "r") as threat_file: - threats_json = json.load(threat_file) + threats = {t["SID"]: Threat(**t) for t in json.load(threat_file)} @contextmanager @@ -83,6 +83,39 @@ def test_dfd(self): self.maxDiff = None self.assertEqual(output, expected) + def test_resolve(self): + random.seed(0) + + TM.reset() + tm = TM("my test tm", description="aaa") + internet = Boundary("Internet") + server_db = Boundary("Server/DB") + user = Actor("User", inBoundary=internet, inScope=False) + web = Server("Web Server") + db = Datastore("SQL Database", inBoundary=server_db) + + req = Dataflow(user, web, "User enters comments (*)") + query = Dataflow(web, db, "Insert query with comments") + results = Dataflow(db, web, "Retrieve comments") + resp = Dataflow(web, user, "Show comments (*)") + + TM._BagOfThreats = [ + Threat(SID=klass, target=klass) + for klass in ["Actor", "Server", "Datastore", "Dataflow"] + ] + tm.resolve() + + self.maxDiff = None + self.assertListEqual([f.id for f in tm.findings], ['Server', 'Datastore', 'Dataflow', 'Dataflow', 'Dataflow', 'Dataflow']) + self.assertListEqual([f.id for f in user.findings], []) + self.assertListEqual([f.id for f in web.findings], ["Server"]) + self.assertListEqual([f.id for f in db.findings], ["Datastore"]) + self.assertListEqual([f.id for f in req.findings], ["Dataflow"]) + self.assertListEqual([f.id for f in query.findings], ["Dataflow"]) + self.assertListEqual([f.id for f in results.findings], ["Dataflow"]) + self.assertListEqual([f.id for f in resp.findings], ["Dataflow"]) + + class Testpytm(unittest.TestCase): # Test for all the threats in threats.py - test Threat.apply() function @@ -96,22 +129,22 @@ def test_INP01(self): process1.usesEnvironmentVariables = True process1.sanitizesInput = False process1.checksInputBounds = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP01")) - self.assertTrue(ThreatObj.apply(lambda1)) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP01"] + self.assertTrue(threat.apply(lambda1)) + self.assertTrue(threat.apply(process1)) def test_INP02(self): process1 = Process('myprocess') process1.checksInputBounds = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP02")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP02"] + self.assertTrue(threat.apply(process1)) def test_INP03(self): web = Server('Web') web.sanitizesInput = False web.encodesOutput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP03")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP03"] + self.assertTrue(threat.apply(web)) def test_CR01(self): user = Actor("User") @@ -123,17 +156,17 @@ def test_CR01(self): user_to_web.protocol = 'HTTP' user_to_web.usesVPN = False user_to_web.usesSessionTokens = True - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR01")) - self.assertTrue(ThreatObj.apply(web)) - self.assertTrue(ThreatObj.apply(user_to_web)) + threat = threats["CR01"] + self.assertTrue(threat.apply(web)) + self.assertTrue(threat.apply(user_to_web)) def test_INP04(self): web = Server("Web Server") web.validatesInput = False web.validatesHeaders = False web.protocol = 'HTTP' - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP04")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP04"] + self.assertTrue(threat.apply(web)) def test_CR02(self): user = Actor("User") @@ -147,30 +180,30 @@ def test_CR02(self): user_to_web.sanitizesInput = False user_to_web.validatesInput = False user_to_web.usesSessionTokens = True - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR02")) - self.assertTrue(ThreatObj.apply(web)) - self.assertTrue(ThreatObj.apply(user_to_web)) + threat = threats["CR02"] + self.assertTrue(threat.apply(web)) + self.assertTrue(threat.apply(user_to_web)) def test_INP05(self): web = Server("Web Server") web.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP05")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP05"] + self.assertTrue(threat.apply(web)) def test_INP06(self): web = Server("Web Server") web.protocol = 'SOAP' web.sanitizesInput = False web.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP06")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP06"] + self.assertTrue(threat.apply(web)) def test_SC01(self): process1 = Process("Process1") process1.implementsNonce = False process1.data = 'JSON' - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "SC01")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["SC01"] + self.assertTrue(threat.apply(process1)) def test_LB01(self): process1 = Process("Process1") @@ -181,26 +214,26 @@ def test_LB01(self): lambda1.implementsAPI = True lambda1.validatesInput = False lambda1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "LB01")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) + threat = threats["LB01"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) def test_AA01(self): process1 = Process("Process1") web = Server("Web Server") process1.authenticatesSource = False web.authenticatesSource = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AA01")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AA01"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(web)) def test_DS01(self): web = Server("Web Server") web.sanitizesInput = False web.validatesInput = False web.encodesOutput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DS01")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["DS01"] + self.assertTrue(threat.apply(web)) def test_DE01(self): user = Actor("User") @@ -208,8 +241,8 @@ def test_DE01(self): user_to_web = Dataflow(user, web, "User enters comments (*)") user_to_web.protocol = 'HTTP' user_to_web.isEncrypted = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DE01")) - self.assertTrue(ThreatObj.apply(user_to_web)) + threat = threats["DE01"] + self.assertTrue(threat.apply(user_to_web)) def test_DE02(self): web = Server("Web Server") @@ -218,18 +251,18 @@ def test_DE02(self): web.sanitizesInput = False process1.validatesInput = False process1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DE02")) - self.assertTrue(ThreatObj.apply(web)) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["DE02"] + self.assertTrue(threat.apply(web)) + self.assertTrue(threat.apply(process1)) def test_API01(self): process1 = Process("Process1") lambda1 = Lambda("Lambda1") process1.implementsAPI = True lambda1.implementsAPI = True - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "API01")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) + threat = threats["API01"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) def test_AC01(self): web = Server("Web Server") @@ -241,22 +274,22 @@ def test_AC01(self): process1.authorizesSource = False db.hasAccessControl = False db.authorizesSource = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC01")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(web)) - self.assertTrue(ThreatObj.apply(db)) + threat = threats["AC01"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(web)) + self.assertTrue(threat.apply(db)) def test_INP07(self): process1 = Process("Process1") process1.usesSecureFunctions = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP07")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP07"] + self.assertTrue(threat.apply(process1)) def test_AC02(self): db = Datastore("DB") db.isShared = True - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC02")) - self.assertTrue(ThreatObj.apply(db)) + threat = threats["AC02"] + self.assertTrue(threat.apply(db)) def test_DO01(self): process1 = Process("Process1") @@ -264,16 +297,16 @@ def test_DO01(self): process1.handlesResourceConsumption = False process1.isResilient = False web.handlesResourceConsumption = True - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DO01")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["DO01"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(web)) def test_HA01(self): web = Server("Web Server") web.validatesInput = False web.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "HA01")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["HA01"] + self.assertTrue(threat.apply(web)) def test_AC03(self): process1 = Process("Process1") @@ -286,9 +319,9 @@ def test_AC03(self): lambda1.implementsAuthenticationScheme = False lambda1.validatesInput = False lambda1.authorizesSource = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC03")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) + threat = threats["AC03"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) def test_DO02(self): process1 = Process("Process1") @@ -299,20 +332,20 @@ def test_DO02(self): lambda1.handlesResourceConsumption = False web.handlesResourceConsumption = False db.handlesResourceConsumption = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DO02")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) - self.assertTrue(ThreatObj.apply(web)) - self.assertTrue(ThreatObj.apply(db)) + threat = threats["DO02"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) + self.assertTrue(threat.apply(web)) + self.assertTrue(threat.apply(db)) def test_DS02(self): process1 = Process("Process1") lambda1 = Lambda("Lambda1") process1.environment = 'Production' lambda1.environment = 'Production' - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DS02")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) + threat = threats["DS02"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) def test_INP08(self): process1 = Process("Process1") @@ -324,29 +357,29 @@ def test_INP08(self): lambda1.sanitizesInput = False web.validatesInput = False web.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP08")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP08"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) + self.assertTrue(threat.apply(web)) def test_INP09(self): web = Server("Web Server") web.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP09")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP09"] + self.assertTrue(threat.apply(web)) def test_INP10(self): web = Server("Web Server") web.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP10")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP10"] + self.assertTrue(threat.apply(web)) def test_INP11(self): web = Server("Web Server") web.validatesInput = False web.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP11")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP11"] + self.assertTrue(threat.apply(web)) def test_INP12(self): process1 = Process("Process1") @@ -355,9 +388,9 @@ def test_INP12(self): process1.validatesInput = False lambda1.checksInputBounds = False lambda1.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP12")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) + threat = threats["INP12"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) def test_AC04(self): user = Actor("User") @@ -365,16 +398,16 @@ def test_AC04(self): user_to_web = Dataflow(user, web, "User enters comments (*)") user_to_web.data = 'XML' user_to_web.authorizesSource = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC04")) - self.assertTrue(ThreatObj.apply(user_to_web)) + threat = threats["AC04"] + self.assertTrue(threat.apply(user_to_web)) def test_DO03(self): user = Actor("User") web = Server("Web Server") user_to_web = Dataflow(user, web, "User enters comments (*)") user_to_web.data = 'XML' - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DO03")) - self.assertTrue(ThreatObj.apply(user_to_web)) + threat = threats["DO03"] + self.assertTrue(threat.apply(user_to_web)) def test_AC05(self): process1 = Process("Process1") @@ -383,18 +416,18 @@ def test_AC05(self): process1.authorizesSource = False web.providesIntegrity = False web.authorizesSource = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC05")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AC05"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(web)) def test_INP13(self): process1 = Process("Process1") lambda1 = Lambda("Lambda1") process1.validatesInput = False lambda1.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP13")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) + threat = threats["INP13"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) def test_INP14(self): process1 = Process("Process1") @@ -403,10 +436,10 @@ def test_INP14(self): process1.validatesInput = False lambda1.validatesInput = False web.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP14")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP14"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) + self.assertTrue(threat.apply(web)) def test_DE03(self): user = Actor("User") @@ -415,17 +448,17 @@ def test_DE03(self): user_to_web.protocol = 'HTTP' user_to_web.isEncrypted = False user_to_web.usesVPN = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DE03")) - self.assertTrue(ThreatObj.apply(user_to_web)) + threat = threats["DE03"] + self.assertTrue(threat.apply(user_to_web)) def test_CR03(self): process1 = Process("Process1") web = Server("Web Server") process1.implementsAuthenticationScheme = False web.implementsAuthenticationScheme = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR03")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["CR03"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(web)) def test_API02(self): process1 = Process("Process1") @@ -434,92 +467,92 @@ def test_API02(self): process1.validatesInput = False lambda1.implementsAPI = True lambda1.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "API02")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) + threat = threats["API02"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) def test_HA02(self): EE = ExternalEntity("EE") EE.hasPhysicalAccess = True - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "HA02")) - self.assertTrue(ThreatObj.apply(EE)) + threat = threats["HA02"] + self.assertTrue(threat.apply(EE)) def test_DS03(self): web = Server("Web Server") web.isHardened = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DS03")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["DS03"] + self.assertTrue(threat.apply(web)) def test_AC06(self): web = Server("Web Server") web.isHardened = False web.hasAccessControl = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC06")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AC06"] + self.assertTrue(threat.apply(web)) def test_HA03(self): web = Server("Web Server") web.validatesHeaders = False web.encodesOutput = False web.isHardened = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "HA03")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["HA03"] + self.assertTrue(threat.apply(web)) def test_SC02(self): web = Server("Web Server") web.validatesInput = False web.encodesOutput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "SC02")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["SC02"] + self.assertTrue(threat.apply(web)) def test_AC07(self): web = Server("Web Server") web.hasAccessControl = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC07")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AC07"] + self.assertTrue(threat.apply(web)) def test_INP15(self): web = Server("Web Server") web.protocol = 'IMAP' web.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP15")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP15"] + self.assertTrue(threat.apply(web)) def test_HA04(self): EE = ExternalEntity("ee") EE.hasPhysicalAccess = True - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "HA04")) - self.assertTrue(ThreatObj.apply(EE)) + threat = threats["HA04"] + self.assertTrue(threat.apply(EE)) def test_SC03(self): web = Server("Web Server") web.validatesInput = False web.sanitizesInput = False web.hasAccessControl = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "SC03")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["SC03"] + self.assertTrue(threat.apply(web)) def test_INP16(self): web = Server("Web Server") web.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP16")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP16"] + self.assertTrue(threat.apply(web)) def test_AA02(self): web = Server("Web Server") process1 = Process("process") web.authenticatesSource = False process1.authenticatesSource = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AA02")) - self.assertTrue(ThreatObj.apply(web)) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["AA02"] + self.assertTrue(threat.apply(web)) + self.assertTrue(threat.apply(process1)) def test_CR04(self): web = Server("Web Server") web.usesSessionTokens = True web.implementsNonce = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR04")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["CR04"] + self.assertTrue(threat.apply(web)) def test_DO04(self): user = Actor("User") @@ -527,24 +560,24 @@ def test_DO04(self): user_to_web = Dataflow(user, web, "User enters comments (*)") user_to_web.data = 'XML' user_to_web.handlesResources = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DO04")) - self.assertTrue(ThreatObj.apply(user_to_web)) + threat = threats["DO04"] + self.assertTrue(threat.apply(user_to_web)) def test_DS04(self): web = Server("Web Server") web.encodesOutput = False web.validatesInput = False web.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DS04")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["DS04"] + self.assertTrue(threat.apply(web)) def test_SC04(self): web = Server("Web Server") web.sanitizesInput = False web.validatesInput = False web.encodesOutput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "SC04")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["SC04"] + self.assertTrue(threat.apply(web)) def test_CR05(self): web = Server("Web Server") @@ -553,57 +586,57 @@ def test_CR05(self): web.usesEncryptionAlgorithm != 'AES' db.usesEncryptionAlgorithm != 'RSA' db.usesEncryptionAlgorithm != 'AES' - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR05")) - self.assertTrue(ThreatObj.apply(web)) - self.assertTrue(ThreatObj.apply(db)) + threat = threats["CR05"] + self.assertTrue(threat.apply(web)) + self.assertTrue(threat.apply(db)) def test_AC08(self): web = Server("Web Server") web.hasAccessControl = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC08")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AC08"] + self.assertTrue(threat.apply(web)) def test_DS05(self): web = Server("Web Server") web.usesCache = True - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DS05")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["DS05"] + self.assertTrue(threat.apply(web)) def test_SC05(self): web = Server("Web Server") web.providesIntegrity = False web.usesCodeSigning = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "SC05")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["SC05"] + self.assertTrue(threat.apply(web)) def test_INP17(self): web = Server("Web Server") web.validatesContentType = False web.invokesScriptFilters = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP17")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP17"] + self.assertTrue(threat.apply(web)) def test_AA03(self): web = Server("Web Server") web.providesIntegrity = False web.authenticatesSource = False web.usesStrongSessionIdentifiers = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AA03")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AA03"] + self.assertTrue(threat.apply(web)) def test_AC09(self): web = Server("Web Server") web.hasAccessControl = False web.authorizesSource = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC09")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AC09"] + self.assertTrue(threat.apply(web)) def test_INP18(self): web = Server("Web Server") web.sanitizesInput = False web.encodesOutput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP18")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP18"] + self.assertTrue(threat.apply(web)) def test_CR06(self): user = Actor("User") @@ -613,16 +646,16 @@ def test_CR06(self): user_to_web.usesVPN = False user_to_web.implementsAuthenticationScheme = False user_to_web.authorizesSource = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR06")) - self.assertTrue(ThreatObj.apply(user_to_web)) + threat = threats["CR06"] + self.assertTrue(threat.apply(user_to_web)) def test_AC10(self): web = Server("Web Server") web.usesLatestTLSversion = False web.implementsAuthenticationScheme = False web.authorizesSource = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC10")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AC10"] + self.assertTrue(threat.apply(web)) def test_CR07(self): user = Actor("User") @@ -630,16 +663,16 @@ def test_CR07(self): user_to_web = Dataflow(user, web, "User enters comments (*)") user_to_web.protocol = 'HTTP' user_to_web.data = 'XML' - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR07")) - self.assertTrue(ThreatObj.apply(user_to_web)) + threat = threats["CR07"] + self.assertTrue(threat.apply(user_to_web)) def test_AA04(self): web = Server("Web Server") web.implementsServerSideValidation = False web.providesIntegrity = False web.authorizesSource = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AA04")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AA04"] + self.assertTrue(threat.apply(web)) def test_CR08(self): user = Actor("User") @@ -647,79 +680,79 @@ def test_CR08(self): user_to_web = Dataflow(user, web, "User enters comments (*)") user_to_web.protocol = 'HTTP' user_to_web.usesLatestTLSversion = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "CR08")) - self.assertTrue(ThreatObj.apply(user_to_web)) + threat = threats["CR08"] + self.assertTrue(threat.apply(user_to_web)) def test_INP19(self): web = Server("Web Server") web.usesXMLParser = False web.disablesDTD = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP19")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP19"] + self.assertTrue(threat.apply(web)) def test_INP20(self): process1 = Process("process") process1.disablesiFrames = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP20")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP20"] + self.assertTrue(threat.apply(process1)) def test_AC11(self): web = Server("Web Server") web.usesStrongSessionIdentifiers = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC11")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AC11"] + self.assertTrue(threat.apply(web)) def test_INP21(self): web = Server("Web Server") web.usesXMLParser = False web.disablesDTD = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP21")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP21"] + self.assertTrue(threat.apply(web)) def test_INP22(self): web = Server("Web Server") web.usesXMLParser = False web.disablesDTD = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP22")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP22"] + self.assertTrue(threat.apply(web)) def test_INP23(self): process1 = Process("Process") process1.hasAccessControl = False process1.sanitizesInput = False process1.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP23")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP23"] + self.assertTrue(threat.apply(process1)) def test_DO05(self): web = Server("Web Server") web.validatesInput = False web.sanitizesInput = False web.usesXMLParser = True - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DO05")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["DO05"] + self.assertTrue(threat.apply(web)) def test_AC12(self): process1 = Process("Process") process1.hasAccessControl = False process1.implementsPOLP = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC12")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["AC12"] + self.assertTrue(threat.apply(process1)) def test_AC13(self): process1 = Process("Process") process1.hasAccessControl = False process1.implementsPOLP = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC13")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["AC13"] + self.assertTrue(threat.apply(process1)) def test_AC14(self): process1 = Process("Process") process1.implementsPOLP = False process1.usesEnvironmentVariables = False process1.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC14")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["AC14"] + self.assertTrue(threat.apply(process1)) def test_INP24(self): process1 = Process("Process") @@ -728,9 +761,9 @@ def test_INP24(self): process1.validatesInput = False lambda1.checksInputBounds = False lambda1.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP24")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) + threat = threats["INP24"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) def test_INP25(self): process1 = Process("Process") @@ -739,9 +772,9 @@ def test_INP25(self): process1.sanitizesInput = False lambda1.validatesInput = False lambda1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP25")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) + threat = threats["INP25"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) def test_INP26(self): process1 = Process("Process") @@ -750,16 +783,16 @@ def test_INP26(self): process1.sanitizesInput = False lambda1.validatesInput = False lambda1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP26")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(lambda1)) + threat = threats["INP26"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(lambda1)) def test_INP27(self): process1 = Process("Process") process1.validatesInput = False process1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP27")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP27"] + self.assertTrue(threat.apply(process1)) def test_INP28(self): web = Server("Web Server") @@ -770,9 +803,9 @@ def test_INP28(self): process1.validatesInput = False process1.sanitizesInput = False process1.encodesOutput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP28")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP28"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(web)) def test_INP29(self): web = Server("Web Server") @@ -783,153 +816,153 @@ def test_INP29(self): process1.validatesInput = False process1.sanitizesInput = False process1.encodesOutput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP29")) - self.assertTrue(ThreatObj.apply(process1)) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP29"] + self.assertTrue(threat.apply(process1)) + self.assertTrue(threat.apply(web)) def test_INP30(self): process1 = Process("Process") process1.validatesInput = False process1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP30")) - self.assertTrue(ThreatObj.apply(process1)) - + threat = threats["INP30"] + self.assertTrue(threat.apply(process1)) + def test_INP31(self): process1 = Process("Process") process1.validatesInput = False process1.sanitizesInput = False process1.usesParameterizedInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP31")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP31"] + self.assertTrue(threat.apply(process1)) def test_INP32(self): process1 = Process("Process") process1.validatesInput = False process1.sanitizesInput = False process1.encodesOutput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP32")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP32"] + self.assertTrue(threat.apply(process1)) def test_INP33(self): process1 = Process("Process") process1.validatesInput = False process1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP33")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP33"] + self.assertTrue(threat.apply(process1)) def test_INP34(self): web = Server("web") web.checksInputBounds = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP34")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP34"] + self.assertTrue(threat.apply(web)) def test_INP35(self): process1 = Process("Process") process1.validatesInput = False process1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP35")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP35"] + self.assertTrue(threat.apply(process1)) def test_DE04(self): data = Datastore("DB") data.validatesInput = False data.implementsPOLP = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "DE04")) - self.assertTrue(ThreatObj.apply(data)) + threat = threats["DE04"] + self.assertTrue(threat.apply(data)) def test_AC15(self): process1 = Process("Process") process1.implementsPOLP = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC15")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["AC15"] + self.assertTrue(threat.apply(process1)) def test_INP36(self): web = Server("web") web.implementsStrictHTTPValidation = False web.encodesHeaders = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP36")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP36"] + self.assertTrue(threat.apply(web)) def test_INP37(self): web = Server("web") web.implementsStrictHTTPValidation = False web.encodesHeaders = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP37")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["INP37"] + self.assertTrue(threat.apply(web)) def test_INP38(self): process1 = Process("Process") process1.allowsClientSideScripting = True process1.validatesInput = False process1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP38")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP38"] + self.assertTrue(threat.apply(process1)) def test_AC16(self): web = Server("web") web.usesStrongSessionIdentifiers = False web.encryptsCookies = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC16")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AC16"] + self.assertTrue(threat.apply(web)) def test_INP39(self): process1 = Process("Process") process1.allowsClientSideScripting = True process1.validatesInput = False process1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP39")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP39"] + self.assertTrue(threat.apply(process1)) def test_INP40(self): process1 = Process("Process") process1.allowsClientSideScripting = True process1.sanitizesInput = False process1.validatesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP40")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP40"] + self.assertTrue(threat.apply(process1)) def test_AC17(self): web = Server("web") web.usesStrongSessionIdentifiers = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC17")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AC17"] + self.assertTrue(threat.apply(web)) def test_AC18(self): process1 = Process("Process") process1.usesStrongSessionIdentifiers = False process1.encryptsCookies = False process1.definesConnectionTimeout = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC18")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["AC18"] + self.assertTrue(threat.apply(process1)) def test_INP41(self): process1 = Process("Process") process1.validatesInput = False process1.sanitizesInput = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "INP41")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["INP41"] + self.assertTrue(threat.apply(process1)) def test_AC19(self): web = Server("web") web.usesSessionTokens = True web.implementsNonce = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC19")) - self.assertTrue(ThreatObj.apply(web)) + threat = threats["AC19"] + self.assertTrue(threat.apply(web)) def test_AC20(self): process1 = Process("Process") process1.definesConnectionTimeout = False process1.usesMFA = False process1.encryptsSessionData = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC20")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["AC20"] + self.assertTrue(threat.apply(process1)) def test_AC21(self): process1 = Process("Process") process1.implementsCSRFToken = False process1.verifySessionIdentifiers = False - ThreatObj = Threat(next(item for item in threats_json if item["SID"] == "AC21")) - self.assertTrue(ThreatObj.apply(process1)) + threat = threats["AC21"] + self.assertTrue(threat.apply(process1)) if __name__ == '__main__':