diff --git a/README.md b/README.md index 3deb156..e674bf9 100644 --- a/README.md +++ b/README.md @@ -64,53 +64,54 @@ that periodically cleans the Database. from pytm.pytm import TM, Server, Datastore, Dataflow, Boundary, Actor, Lambda -User_Web = Boundary("User/Web") -Web_DB = Boundary("Web/DB") +tm = TM("my test tm") +tm.description = "another test tm" +tm.isOrdered = True -user = Actor("User") -user.inBoundary = User_Web +with tm.build(): + User_Web = Boundary("User/Web") + Web_DB = Boundary("Web/DB") -web = Server("Web Server") -web.OS = "CloudOS" -web.isHardened = True + user = Actor("User") + user.inBoundary = User_Web -db = Datastore("SQL Database (*)") -db.OS = "CentOS" -db.isHardened = False -db.inBoundary = Web_DB -db.isSql = True -db.inScope = False + web = Server("Web Server") + web.OS = "CloudOS" + web.isHardened = True -my_lambda = Lambda("cleanDBevery6hours") -my_lambda.hasAccessControl = True -my_lambda.inBoundary = Web_DB + db = Datastore("SQL Database (*)") + db.OS = "CentOS" + db.isHardened = False + db.inBoundary = Web_DB + db.isSql = True + db.inScope = False -my_lambda_to_db = Dataflow(my_lambda, db, "(λ)Periodically cleans DB") -my_lambda_to_db.protocol = "SQL" -my_lambda_to_db.dstPort = 3306 + my_lambda = Lambda("cleanDBevery6hours") + my_lambda.hasAccessControl = True + my_lambda.inBoundary = Web_DB -user_to_web = Dataflow(user, web, "User enters comments (*)") -user_to_web.protocol = "HTTP" -user_to_web.dstPort = 80 -user_to_web.data = 'Comments in HTML or Markdown' + my_lambda_to_db = Dataflow(my_lambda, db, "(λ)Periodically cleans DB") + my_lambda_to_db.protocol = "SQL" + my_lambda_to_db.dstPort = 3306 -web_to_user = Dataflow(web, user, "Comments saved (*)") -web_to_user.protocol = "HTTP" -web_to_user.data = 'Ack of saving or error message, in JSON' + user_to_web = Dataflow(user, web, "User enters comments (*)") + user_to_web.protocol = "HTTP" + user_to_web.dstPort = 80 + user_to_web.data = 'Comments in HTML or Markdown' -web_to_db = Dataflow(web, db, "Insert query with comments") -web_to_db.protocol = "MySQL" -web_to_db.dstPort = 3306 -web_to_db.data = 'MySQL insert statement, all literals' + web_to_user = Dataflow(web, user, "Comments saved (*)") + web_to_user.protocol = "HTTP" + web_to_user.data = 'Ack of saving or error message, in JSON' -db_to_web = Dataflow(db, web, "Comments contents") -db_to_web.protocol = "MySQL" -db_to_web.data = 'Results of insert op' + web_to_db = Dataflow(web, db, "Insert query with comments") + web_to_db.protocol = "MySQL" + web_to_db.dstPort = 3306 + web_to_db.data = 'MySQL insert statement, all literals' + + db_to_web = Dataflow(db, web, "Comments contents") + db_to_web.protocol = "MySQL" + db_to_web.data = 'Results of insert op' -tm = TM("my test tm") -tm.description = "another test tm" -tm.isOrdered = True -tm.elements = [my_lambda_to_db, user_to_web, web_to_user, web_to_db, db_to_web] tm.process() ``` diff --git a/pytm/pytm.py b/pytm/pytm.py index 5124f5e..6d84a37 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -8,6 +8,7 @@ import uuid from collections import defaultdict from collections.abc import Iterable +from contextlib import contextmanager from enum import Enum from hashlib import sha224 from itertools import combinations @@ -373,6 +374,7 @@ def __str__(self): class TM(): """Describes the threat model administratively, and holds all details during a run""" + _contexts = [] _sf = None _duplicate_ignored_attrs = "name", "note", "order", "response", "responseTo" name = varString("", required=True, doc="Model name") @@ -513,7 +515,7 @@ def _check_duplicates(self, flows): raise ValueError( "Duplicate Dataflow found between {} and {}: " - "{} is same as {}".format(left.source, left.sink, left, right,) + "{} is same as {}".format(left.source, left.sink, left, right) ) def dfd(self): @@ -586,6 +588,15 @@ def process(self): [print("{} - {}".format(t.id, t.description)) for t in self._threats] sys.exit(0) + @contextmanager + def build(self): + c = [] + TM._contexts.append(c) + try: + yield c + finally: + self.elements = TM._contexts.pop() + class Element(): """A generic element""" @@ -979,6 +990,8 @@ def __init__(self, source, sink, name, **kwargs): self.source = source self.sink = sink super().__init__(name, **kwargs) + for c in TM._contexts: + c.append(self) def __set__(self, instance, value): print("Should not have gotten here.") @@ -1009,11 +1022,16 @@ def dfd(self, mergeResponses=False, **kwargs): class Boundary(Element): """Trust boundary""" - elements = varElements([]) + elements = varElements([], onSet=lambda i, v: i._init_elements()) def __init__(self, name, **kwargs): super().__init__(name, **kwargs) + def _init_elements(self): + for e in self.elements: + if e.inBoundary != self: + e.inBoundary = self + def dfd(self): if self._is_drawn: return diff --git a/tests/test_pytmfunc.py b/tests/test_pytmfunc.py index 0d960ca..65e8320 100644 --- a/tests/test_pytmfunc.py +++ b/tests/test_pytmfunc.py @@ -360,6 +360,76 @@ def test_eager_init(self): self.assertEqual(len(tm._boundaries), 3) self.assertEqual(len(tm._flows), 5) + def test_context_init(self): + tm = TM("my test tm", description="desc", isOrdered=True) + + user = Actor("User", protocol="HTTP") + web = Server( + "Web Server", + OS="Ubuntu", + protocol="HTTP", + dstPort=80, + isHardened=True, + sanitizesInput=False, + encodesOutput=True, + authorizesSource=False, + ) + db = Datastore( + "SQL Database", + OS="CentOS", + protocol="MySQL", + dstPort=3306, + isHardened=False, + isSQL=True, + ) + my_lambda = Lambda("AWS Lambda", hasAccessControl=True) + aws = Boundary("AWS VPC", elements=[my_lambda]) + internet = Boundary("Internet", elements=[user]) + backend = Boundary("Server/DB", elements=[web, db]) + + with tm.build(): + req = Dataflow( + user, + web, + "User enters comments (*)", + data="Comments in HTML or Markdown", + note="This is a simple web app.", + ) + insert = Dataflow( + web, + db, + "Insert query with comments", + data="MySQL insert statement, all literals", + note="Web server inserts user comments.", + ) + query = Dataflow( + db, + web, + "Retrieve comments", + protocol="MySQL", + data="Web server retrieves comments from DB", + ) + resp = Dataflow( + web, + user, + "Show comments (*)", + data="Web server shows comments to the end user", + ) + job = Dataflow( + my_lambda, + db, + "Lambda periodically cleans DB", + data="Lamda clears DB every 6 hours", + ) + + assets = [user, web, db, my_lambda] + boundaries = [aws, internet, backend] + elements = [req, insert, query, resp, job] + self.assertTrue(tm.check()) + self.assertListEqual(tm._elements, assets + boundaries + elements) + self.assertEqual(len(tm._boundaries), 3) + self.assertEqual(len(tm._flows), 5) + class TestThreats(unittest.TestCase): # Test for all the threats in threats.py - test Threat.apply() function diff --git a/tm.py b/tm.py index 44baa3b..e574e85 100755 --- a/tm.py +++ b/tm.py @@ -7,67 +7,67 @@ # make sure generated diagrams do not change, makes sense if they're commited random.seed(0) -internet = Boundary("Internet") -server_db = Boundary("Server/DB") -vpc = Boundary("AWS VPC") +tm = TM("my test tm") +tm.description = """This is a sample threat model of a very simple system - a web-based +comment system. The user enters comments and these are added to a database and displayed +back to the user. The thought is that it is, though simple, a complete enough example +to express meaningful threats.""" +tm.isOrdered = True +tm.mergeResponses = True -user = Actor("User") -user.inBoundary = internet +with tm.build(): + internet = Boundary("Internet") + server_db = Boundary("Server/DB") + vpc = Boundary("AWS VPC") -web = Server("Web Server") -web.OS = "Ubuntu" -web.isHardened = True -web.sanitizesInput = False -web.encodesOutput = True -web.authorizesSource = False + user = Actor("User") + user.inBoundary = internet -db = Datastore("SQL Database") -db.OS = "CentOS" -db.isHardened = False -db.inBoundary = server_db -db.isSQL = True -db.inScope = True + web = Server("Web Server") + web.OS = "Ubuntu" + web.isHardened = True + web.sanitizesInput = False + web.encodesOutput = True + web.authorizesSource = False -my_lambda = Lambda("AWS Lambda") -my_lambda.hasAccessControl = True -my_lambda.inBoundary = vpc + db = Datastore("SQL Database") + db.OS = "CentOS" + db.isHardened = False + db.inBoundary = server_db + db.isSQL = True + db.inScope = True -user_to_web = Dataflow(user, web, "User enters comments (*)") -user_to_web.protocol = "HTTP" -user_to_web.dstPort = 80 -user_to_web.data = 'Comments in HTML or Markdown' -user_to_web.note = "This is a simple web app\nthat stores and retrieves user comments." + my_lambda = Lambda("AWS Lambda") + my_lambda.hasAccessControl = True + my_lambda.inBoundary = vpc -web_to_db = Dataflow(web, db, "Insert query with comments") -web_to_db.protocol = "MySQL" -web_to_db.dstPort = 3306 -web_to_db.data = 'MySQL insert statement, all literals' -web_to_db.note = "Web server inserts user comments\ninto it's SQL query and stores them in the DB." + user_to_web = Dataflow(user, web, "User enters comments (*)") + user_to_web.protocol = "HTTP" + user_to_web.dstPort = 80 + user_to_web.data = 'Comments in HTML or Markdown' + user_to_web.note = "This is a simple web app\nthat stores and retrieves user comments." -db_to_web = Dataflow(db, web, "Retrieve comments") -db_to_web.protocol = "MySQL" -db_to_web.dstPort = 80 -db_to_web.data = 'Web server retrieves comments from DB' -db_to_web.responseTo = web_to_db + web_to_db = Dataflow(web, db, "Insert query with comments") + web_to_db.protocol = "MySQL" + web_to_db.dstPort = 3306 + web_to_db.data = 'MySQL insert statement, all literals' + web_to_db.note = "Web server inserts user comments\ninto it's SQL query and stores them in the DB." -web_to_user = Dataflow(web, user, "Show comments (*)") -web_to_user.protocol = "HTTP" -web_to_user.data = 'Web server shows comments to the end user' -web_to_user.responseTo = user_to_web + db_to_web = Dataflow(db, web, "Retrieve comments") + db_to_web.protocol = "MySQL" + db_to_web.dstPort = 80 + db_to_web.data = 'Web server retrieves comments from DB' + db_to_web.responseTo = web_to_db -my_lambda_to_db = Dataflow(my_lambda, db, "Lambda periodically cleans DB") -my_lambda_to_db.protocol = "MySQL" -my_lambda_to_db.dstPort = 3306 -my_lambda_to_db.data = "Lamda clears DB every 6 hours" + web_to_user = Dataflow(web, user, "Show comments (*)") + web_to_user.protocol = "HTTP" + web_to_user.data = 'Web server shows comments to the end user' + web_to_user.responseTo = user_to_web -tm = TM("my test tm") -tm.description = """This is a sample threat model of a very simple system - a web-based -comment system. The user enters comments and these are added to a database and displayed -back to the user. The thought is that it is, though simple, a complete enough example -to express meaningful threats.""" -tm.isOrdered = True -tm.mergeResponses = True -tm.elements = [user_to_web, web_to_db, db_to_web, web_to_user, my_lambda_to_db] + my_lambda_to_db = Dataflow(my_lambda, db, "Lambda periodically cleans DB") + my_lambda_to_db.protocol = "MySQL" + my_lambda_to_db.dstPort = 3306 + my_lambda_to_db.data = "Lamda clears DB every 6 hours" if __name__ == "__main__": tm.process()