Skip to content

Commit

Permalink
use numbered dataflow labels on seq diagram
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Was authored and Jan Waś committed Sep 10, 2020
1 parent cf3bb77 commit f2b70df
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

## New features

- Use numbered dataflow labels in sequence diagram [#94](https://github.com/izar/pytm/pull/94)
- Move authenticateDestination to base Element [#88](https://github.com/izar/pytm/pull/88)
- Assign inputs and outputs to all elements [#89](https://github.com/izar/pytm/pull/89)
- Allow detecting and/or hiding duplicate dataflows by setting `TM.onDuplicates` [#100](https://github.com/izar/pytm/pull/100)
Expand Down
82 changes: 62 additions & 20 deletions pytm/pytm.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

''' Helper functions '''

''' The base for this (descriptors instead of properties) has been shamelessly lifted from https://nbviewer.jupyter.org/urls/gist.github.com/ChrisBeaumont/5758381/raw/descriptor_writeup.ipynb
''' The base for this (descriptors instead of properties) has been
shamelessly lifted from
https://nbviewer.jupyter.org/urls/gist.github.com/ChrisBeaumont/5758381/raw/descriptor_writeup.ipynb
By Chris Beaumont
'''

Expand Down Expand Up @@ -284,7 +286,8 @@ class Threat():

id = varString("", required=True)
description = varString("")
condition = varString("", doc="a Python expression that should evaluate to a boolean True or False")
condition = varString("", doc="""a Python expression that should evaluate
to a boolean True or False""")
details = varString("")
severity = varString("")
mitigations = varString("")
Expand Down Expand Up @@ -323,7 +326,8 @@ def apply(self, target):


class Finding():
"""Represents a Finding - the element in question and a description of the finding """
"""Represents a Finding - the element in question
and a description of the finding"""

element = varElement(None, required=True, doc="Element this finding applies to")
target = varString("", doc="Name of the element this finding applies to")
Expand Down Expand Up @@ -359,7 +363,8 @@ def __init__(


class TM():
"""Describes the threat model administratively, and holds all details during a run"""
"""Describes the threat model administratively,
and holds all details during a run"""

_BagOfFlows = []
_BagOfElements = []
Expand All @@ -377,7 +382,8 @@ class TM():
mergeResponses = varBool(False, doc="Merge response edges in DFDs")
ignoreUnused = varBool(False, doc="Ignore elements not used in any Dataflow")
findings = varFindings([], doc="threats found for elements of this model")
onDuplicates = varAction(Action.NO_ACTION, doc="How to handle duplicate Dataflow with same properties, except name and notes")
onDuplicates = varAction(Action.NO_ACTION, doc="""How to handle duplicate Dataflow
with same properties, except name and notes""")

def __init__(self, name, **kwargs):
for key, value in kwargs.items():
Expand Down Expand Up @@ -415,7 +421,16 @@ def resolve(self):
for t in TM._BagOfThreats:
if not t.apply(e):
continue
f = Finding(e, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references)
f = Finding(
e,
t.description,
t.details,
t.severity,
t.mitigations,
t.example,
t.id,
t.references,
)
findings.append(f)
elements[e].append(f)
self.findings = findings
Expand All @@ -424,12 +439,15 @@ def resolve(self):

def check(self):
if self.description is None:
raise ValueError("Every threat model should have at least a brief description of the system being modeled.")
raise ValueError("""Every threat model should have at least
a brief description of the system being modeled.""")
TM._BagOfFlows = _match_responses(_sort(TM._BagOfFlows, self.isOrdered))
self._check_duplicates(TM._BagOfFlows)
_apply_defaults(TM._BagOfFlows)
if self.ignoreUnused:
TM._BagOfElements, TM._BagOfBoundaries = _get_elements_and_boundaries(TM._BagOfFlows)
TM._BagOfElements, TM._BagOfBoundaries = _get_elements_and_boundaries(
TM._BagOfFlows
)
result = True
for e in (TM._BagOfElements):
if not e.check():
Expand Down Expand Up @@ -514,15 +532,23 @@ def seq(self):
participants = []
for e in TM._BagOfElements:
if isinstance(e, Actor):
participants.append("actor {0} as \"{1}\"".format(e._uniq_name(), e.name))
participants.append(
'actor {0} as "{1}"'.format(e._uniq_name(), e.display_name())
)
elif isinstance(e, Datastore):
participants.append("database {0} as \"{1}\"".format(e._uniq_name(), e.name))
participants.append(
'database {0} as "{1}"'.format(e._uniq_name(), e.display_name())
)
elif not isinstance(e, Dataflow) and not isinstance(e, Boundary):
participants.append("entity {0} as \"{1}\"".format(e._uniq_name(), e.name))
participants.append(
'entity {0} as "{1}"'.format(e._uniq_name(), e.display_name())
)

messages = []
for e in TM._BagOfFlows:
message = "{0} -> {1}: {2}".format(e.source._uniq_name(), e.sink._uniq_name(), e.name)
message = "{0} -> {1}: {2}".format(
e.source._uniq_name(), e.sink._uniq_name(), e.display_name()
)
note = ""
if e.note != "":
note = "\nnote left\n{}\nend note".format(e.note)
Expand Down Expand Up @@ -712,7 +738,6 @@ def inside(self, *boundaries):
return True
return False


def _attr_values(self):
klass = self.__class__
result = {}
Expand Down Expand Up @@ -778,6 +803,9 @@ def dfd(self, **kwargs):
image=os.path.join(os.path.dirname(__file__), "images", "lambda.png"),
)

def _shape(self):
return "none"


class Server(Element):
"""An entity processing data"""
Expand Down Expand Up @@ -888,6 +916,9 @@ def _dfd_template(self):
]
"""

def _shape(self):
return "none"


class Actor(Element):
"""An entity usually initiating actions"""
Expand Down Expand Up @@ -1072,16 +1103,27 @@ def dfd(self):
edges=indent("\n".join(edges), " "),
)

def _color(self):
return "firebrick2"


def get_args():
_parser = argparse.ArgumentParser()
_parser.add_argument('--debug', action='store_true', help='print debug messages')
_parser.add_argument('--dfd', action='store_true', help='output DFD')
_parser.add_argument('--report', help='output report using the named template file (sample template file is under docs/template.md)')
_parser.add_argument('--exclude', help='specify threat IDs to be ignored')
_parser.add_argument('--seq', action='store_true', help='output sequential diagram')
_parser.add_argument('--list', action='store_true', help='list all available threats')
_parser.add_argument('--describe', help='describe the properties available for a given element')
_parser.add_argument("--debug", action="store_true", help="print debug messages")
_parser.add_argument("--dfd", action="store_true", help="output DFD")
_parser.add_argument(
"--report",
help="""output report using the named template file
(sample template file is under docs/template.md)""",
)
_parser.add_argument("--exclude", help="specify threat IDs to be ignored")
_parser.add_argument("--seq", action="store_true", help="output sequential diagram")
_parser.add_argument(
"--list", action="store_true", help="list all available threats"
)
_parser.add_argument(
"--describe", help="describe the properties available for a given element"
)

_args = _parser.parse_args()
return _args
8 changes: 4 additions & 4 deletions tests/seq.plantuml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ actor actor_User_579e9aae81 as "User"
entity server_WebServer_f2eb7a3ff7 as "Web Server"
database datastore_SQLDatabase_d2006ce1bb as "SQL Database"

actor_User_579e9aae81 -> server_WebServer_f2eb7a3ff7: User enters comments (*)
actor_User_579e9aae81 -> server_WebServer_f2eb7a3ff7: (1) User enters comments (*)
note left
bbb
end note
server_WebServer_f2eb7a3ff7 -> datastore_SQLDatabase_d2006ce1bb: Insert query with comments
server_WebServer_f2eb7a3ff7 -> datastore_SQLDatabase_d2006ce1bb: (2) Insert query with comments
note left
ccc
end note
datastore_SQLDatabase_d2006ce1bb -> server_WebServer_f2eb7a3ff7: Retrieve comments
server_WebServer_f2eb7a3ff7 -> actor_User_579e9aae81: Show comments (*)
datastore_SQLDatabase_d2006ce1bb -> server_WebServer_f2eb7a3ff7: (3) Retrieve comments
server_WebServer_f2eb7a3ff7 -> actor_User_579e9aae81: (4) Show comments (*)
@enduml
7 changes: 4 additions & 3 deletions tests/test_pytmfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def test_seq(self):

TM.reset()
tm = TM("my test tm", description="aaa")
tm.isOrdered = True
internet = Boundary("Internet")
server_db = Boundary("Server/DB")
user = Actor("User", inBoundary=internet)
Expand All @@ -47,7 +48,7 @@ def test_seq(self):
Dataflow(db, web, "Retrieve comments")
Dataflow(web, user, "Show comments (*)")

tm.check()
self.assertTrue(tm.check())
output = tm.seq()

self.maxDiff = None
Expand All @@ -73,7 +74,7 @@ def test_seq_unused(self):
Dataflow(db, web, "Retrieve comments")
Dataflow(web, user, "Show comments (*)")

tm.check()
self.assertTrue(tm.check())
output = tm.seq()

self.maxDiff = None
Expand All @@ -99,7 +100,7 @@ def test_dfd(self):
Dataflow(db, web, "Retrieve comments")
Dataflow(web, user, "Show comments (*)")

tm.check()
self.assertTrue(tm.check())
output = tm.dfd()

self.maxDiff = None
Expand Down

0 comments on commit f2b70df

Please sign in to comment.