Skip to content

Commit

Permalink
add build context
Browse files Browse the repository at this point in the history
  • Loading branch information
nineinchnick committed Jun 21, 2020
1 parent a459d66 commit 296b85d
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 90 deletions.
75 changes: 38 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,53 +64,54 @@ that periodically cleans the Database.

from pytm.pytm import TM, Server, Datastore, Dataflow, Boundary, Actor, Lambda

User_Web = Boundary("User/Web")
Web_DB = Boundary("Web/DB")
tm = TM("my test tm")
tm.description = "another test tm"
tm.isOrdered = True

user = Actor("User")
user.inBoundary = User_Web
with tm.build():
User_Web = Boundary("User/Web")
Web_DB = Boundary("Web/DB")

web = Server("Web Server")
web.OS = "CloudOS"
web.isHardened = True
user = Actor("User")
user.inBoundary = User_Web

db = Datastore("SQL Database (*)")
db.OS = "CentOS"
db.isHardened = False
db.inBoundary = Web_DB
db.isSql = True
db.inScope = False
web = Server("Web Server")
web.OS = "CloudOS"
web.isHardened = True

my_lambda = Lambda("cleanDBevery6hours")
my_lambda.hasAccessControl = True
my_lambda.inBoundary = Web_DB
db = Datastore("SQL Database (*)")
db.OS = "CentOS"
db.isHardened = False
db.inBoundary = Web_DB
db.isSql = True
db.inScope = False

my_lambda_to_db = Dataflow(my_lambda, db, "(λ)Periodically cleans DB")
my_lambda_to_db.protocol = "SQL"
my_lambda_to_db.dstPort = 3306
my_lambda = Lambda("cleanDBevery6hours")
my_lambda.hasAccessControl = True
my_lambda.inBoundary = Web_DB

user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.protocol = "HTTP"
user_to_web.dstPort = 80
user_to_web.data = 'Comments in HTML or Markdown'
my_lambda_to_db = Dataflow(my_lambda, db, "(λ)Periodically cleans DB")
my_lambda_to_db.protocol = "SQL"
my_lambda_to_db.dstPort = 3306

web_to_user = Dataflow(web, user, "Comments saved (*)")
web_to_user.protocol = "HTTP"
web_to_user.data = 'Ack of saving or error message, in JSON'
user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.protocol = "HTTP"
user_to_web.dstPort = 80
user_to_web.data = 'Comments in HTML or Markdown'

web_to_db = Dataflow(web, db, "Insert query with comments")
web_to_db.protocol = "MySQL"
web_to_db.dstPort = 3306
web_to_db.data = 'MySQL insert statement, all literals'
web_to_user = Dataflow(web, user, "Comments saved (*)")
web_to_user.protocol = "HTTP"
web_to_user.data = 'Ack of saving or error message, in JSON'

db_to_web = Dataflow(db, web, "Comments contents")
db_to_web.protocol = "MySQL"
db_to_web.data = 'Results of insert op'
web_to_db = Dataflow(web, db, "Insert query with comments")
web_to_db.protocol = "MySQL"
web_to_db.dstPort = 3306
web_to_db.data = 'MySQL insert statement, all literals'

db_to_web = Dataflow(db, web, "Comments contents")
db_to_web.protocol = "MySQL"
db_to_web.data = 'Results of insert op'

tm = TM("my test tm")
tm.description = "another test tm"
tm.isOrdered = True
tm.elements = [my_lambda_to_db, user_to_web, web_to_user, web_to_db, db_to_web]
tm.process()

```
Expand Down
22 changes: 20 additions & 2 deletions pytm/pytm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import uuid
from collections import defaultdict
from collections.abc import Iterable
from contextlib import contextmanager
from enum import Enum
from hashlib import sha224
from itertools import combinations
Expand Down Expand Up @@ -373,6 +374,7 @@ def __str__(self):
class TM():
"""Describes the threat model administratively, and holds all details during a run"""

_contexts = []
_sf = None
_duplicate_ignored_attrs = "name", "note", "order", "response", "responseTo"
name = varString("", required=True, doc="Model name")
Expand Down Expand Up @@ -513,7 +515,7 @@ def _check_duplicates(self, flows):

raise ValueError(
"Duplicate Dataflow found between {} and {}: "
"{} is same as {}".format(left.source, left.sink, left, right,)
"{} is same as {}".format(left.source, left.sink, left, right)
)

def dfd(self):
Expand Down Expand Up @@ -586,6 +588,15 @@ def process(self):
[print("{} - {}".format(t.id, t.description)) for t in self._threats]
sys.exit(0)

@contextmanager
def build(self):
c = []
TM._contexts.append(c)
try:
yield c
finally:
self.elements = TM._contexts.pop()


class Element():
"""A generic element"""
Expand Down Expand Up @@ -979,6 +990,8 @@ def __init__(self, source, sink, name, **kwargs):
self.source = source
self.sink = sink
super().__init__(name, **kwargs)
for c in TM._contexts:
c.append(self)

def __set__(self, instance, value):
print("Should not have gotten here.")
Expand Down Expand Up @@ -1009,11 +1022,16 @@ def dfd(self, mergeResponses=False, **kwargs):
class Boundary(Element):
"""Trust boundary"""

elements = varElements([])
elements = varElements([], onSet=lambda i, v: i._init_elements())

def __init__(self, name, **kwargs):
super().__init__(name, **kwargs)

def _init_elements(self):
for e in self.elements:
if e.inBoundary != self:
e.inBoundary = self

def dfd(self):
if self._is_drawn:
return
Expand Down
70 changes: 70 additions & 0 deletions tests/test_pytmfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,76 @@ def test_eager_init(self):
self.assertEqual(len(tm._boundaries), 3)
self.assertEqual(len(tm._flows), 5)

def test_context_init(self):
tm = TM("my test tm", description="desc", isOrdered=True)

user = Actor("User", protocol="HTTP")
web = Server(
"Web Server",
OS="Ubuntu",
protocol="HTTP",
dstPort=80,
isHardened=True,
sanitizesInput=False,
encodesOutput=True,
authorizesSource=False,
)
db = Datastore(
"SQL Database",
OS="CentOS",
protocol="MySQL",
dstPort=3306,
isHardened=False,
isSQL=True,
)
my_lambda = Lambda("AWS Lambda", hasAccessControl=True)
aws = Boundary("AWS VPC", elements=[my_lambda])
internet = Boundary("Internet", elements=[user])
backend = Boundary("Server/DB", elements=[web, db])

with tm.build():
req = Dataflow(
user,
web,
"User enters comments (*)",
data="Comments in HTML or Markdown",
note="This is a simple web app.",
)
insert = Dataflow(
web,
db,
"Insert query with comments",
data="MySQL insert statement, all literals",
note="Web server inserts user comments.",
)
query = Dataflow(
db,
web,
"Retrieve comments",
protocol="MySQL",
data="Web server retrieves comments from DB",
)
resp = Dataflow(
web,
user,
"Show comments (*)",
data="Web server shows comments to the end user",
)
job = Dataflow(
my_lambda,
db,
"Lambda periodically cleans DB",
data="Lamda clears DB every 6 hours",
)

assets = [user, web, db, my_lambda]
boundaries = [aws, internet, backend]
elements = [req, insert, query, resp, job]
self.assertTrue(tm.check())
self.assertListEqual(tm._elements, assets + boundaries + elements)
self.assertEqual(len(tm._boundaries), 3)
self.assertEqual(len(tm._flows), 5)


class TestThreats(unittest.TestCase):
# Test for all the threats in threats.py - test Threat.apply() function
Expand Down
102 changes: 51 additions & 51 deletions tm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,67 +7,67 @@
# make sure generated diagrams do not change, makes sense if they're commited
random.seed(0)

internet = Boundary("Internet")
server_db = Boundary("Server/DB")
vpc = Boundary("AWS VPC")
tm = TM("my test tm")
tm.description = """This is a sample threat model of a very simple system - a web-based
comment system. The user enters comments and these are added to a database and displayed
back to the user. The thought is that it is, though simple, a complete enough example
to express meaningful threats."""
tm.isOrdered = True
tm.mergeResponses = True

user = Actor("User")
user.inBoundary = internet
with tm.build():
internet = Boundary("Internet")
server_db = Boundary("Server/DB")
vpc = Boundary("AWS VPC")

web = Server("Web Server")
web.OS = "Ubuntu"
web.isHardened = True
web.sanitizesInput = False
web.encodesOutput = True
web.authorizesSource = False
user = Actor("User")
user.inBoundary = internet

db = Datastore("SQL Database")
db.OS = "CentOS"
db.isHardened = False
db.inBoundary = server_db
db.isSQL = True
db.inScope = True
web = Server("Web Server")
web.OS = "Ubuntu"
web.isHardened = True
web.sanitizesInput = False
web.encodesOutput = True
web.authorizesSource = False

my_lambda = Lambda("AWS Lambda")
my_lambda.hasAccessControl = True
my_lambda.inBoundary = vpc
db = Datastore("SQL Database")
db.OS = "CentOS"
db.isHardened = False
db.inBoundary = server_db
db.isSQL = True
db.inScope = True

user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.protocol = "HTTP"
user_to_web.dstPort = 80
user_to_web.data = 'Comments in HTML or Markdown'
user_to_web.note = "This is a simple web app\nthat stores and retrieves user comments."
my_lambda = Lambda("AWS Lambda")
my_lambda.hasAccessControl = True
my_lambda.inBoundary = vpc

web_to_db = Dataflow(web, db, "Insert query with comments")
web_to_db.protocol = "MySQL"
web_to_db.dstPort = 3306
web_to_db.data = 'MySQL insert statement, all literals'
web_to_db.note = "Web server inserts user comments\ninto it's SQL query and stores them in the DB."
user_to_web = Dataflow(user, web, "User enters comments (*)")
user_to_web.protocol = "HTTP"
user_to_web.dstPort = 80
user_to_web.data = 'Comments in HTML or Markdown'
user_to_web.note = "This is a simple web app\nthat stores and retrieves user comments."

db_to_web = Dataflow(db, web, "Retrieve comments")
db_to_web.protocol = "MySQL"
db_to_web.dstPort = 80
db_to_web.data = 'Web server retrieves comments from DB'
db_to_web.responseTo = web_to_db
web_to_db = Dataflow(web, db, "Insert query with comments")
web_to_db.protocol = "MySQL"
web_to_db.dstPort = 3306
web_to_db.data = 'MySQL insert statement, all literals'
web_to_db.note = "Web server inserts user comments\ninto it's SQL query and stores them in the DB."

web_to_user = Dataflow(web, user, "Show comments (*)")
web_to_user.protocol = "HTTP"
web_to_user.data = 'Web server shows comments to the end user'
web_to_user.responseTo = user_to_web
db_to_web = Dataflow(db, web, "Retrieve comments")
db_to_web.protocol = "MySQL"
db_to_web.dstPort = 80
db_to_web.data = 'Web server retrieves comments from DB'
db_to_web.responseTo = web_to_db

my_lambda_to_db = Dataflow(my_lambda, db, "Lambda periodically cleans DB")
my_lambda_to_db.protocol = "MySQL"
my_lambda_to_db.dstPort = 3306
my_lambda_to_db.data = "Lamda clears DB every 6 hours"
web_to_user = Dataflow(web, user, "Show comments (*)")
web_to_user.protocol = "HTTP"
web_to_user.data = 'Web server shows comments to the end user'
web_to_user.responseTo = user_to_web

tm = TM("my test tm")
tm.description = """This is a sample threat model of a very simple system - a web-based
comment system. The user enters comments and these are added to a database and displayed
back to the user. The thought is that it is, though simple, a complete enough example
to express meaningful threats."""
tm.isOrdered = True
tm.mergeResponses = True
tm.elements = [user_to_web, web_to_db, db_to_web, web_to_user, my_lambda_to_db]
my_lambda_to_db = Dataflow(my_lambda, db, "Lambda periodically cleans DB")
my_lambda_to_db.protocol = "MySQL"
my_lambda_to_db.dstPort = 3306
my_lambda_to_db.data = "Lamda clears DB every 6 hours"

if __name__ == "__main__":
tm.process()

0 comments on commit 296b85d

Please sign in to comment.