Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use numbered dataflow labels on seq diagram #94

Merged
merged 1 commit into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

## New features

- Use numbered dataflow labels in sequence diagram [#94](https://github.com/izar/pytm/pull/94)
- Move authenticateDestination to base Element [#88](https://github.com/izar/pytm/pull/88)
- Assign inputs and outputs to all elements [#89](https://github.com/izar/pytm/pull/89)
- Allow detecting and/or hiding duplicate dataflows by setting `TM.onDuplicates` [#100](https://github.com/izar/pytm/pull/100)
Expand Down
92 changes: 68 additions & 24 deletions pytm/pytm.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

''' Helper functions '''

''' The base for this (descriptors instead of properties) has been shamelessly lifted from https://nbviewer.jupyter.org/urls/gist.github.com/ChrisBeaumont/5758381/raw/descriptor_writeup.ipynb
''' The base for this (descriptors instead of properties) has been
shamelessly lifted from
https://nbviewer.jupyter.org/urls/gist.github.com/ChrisBeaumont/5758381/raw/descriptor_writeup.ipynb
By Chris Beaumont
'''

Expand Down Expand Up @@ -284,7 +286,8 @@ class Threat():

id = varString("", required=True)
description = varString("")
condition = varString("", doc="a Python expression that should evaluate to a boolean True or False")
condition = varString("", doc="""a Python expression that should evaluate
to a boolean True or False""")
details = varString("")
severity = varString("")
mitigations = varString("")
Expand Down Expand Up @@ -323,7 +326,8 @@ def apply(self, target):


class Finding():
"""Represents a Finding - the element in question and a description of the finding """
"""Represents a Finding - the element in question
and a description of the finding"""

element = varElement(None, required=True, doc="Element this finding applies to")
target = varString("", doc="Name of the element this finding applies to")
Expand Down Expand Up @@ -359,7 +363,8 @@ def __init__(


class TM():
"""Describes the threat model administratively, and holds all details during a run"""
"""Describes the threat model administratively,
and holds all details during a run"""

_BagOfFlows = []
_BagOfElements = []
Expand All @@ -377,7 +382,8 @@ class TM():
mergeResponses = varBool(False, doc="Merge response edges in DFDs")
ignoreUnused = varBool(False, doc="Ignore elements not used in any Dataflow")
findings = varFindings([], doc="threats found for elements of this model")
onDuplicates = varAction(Action.NO_ACTION, doc="How to handle duplicate Dataflow with same properties, except name and notes")
onDuplicates = varAction(Action.NO_ACTION, doc="""How to handle duplicate Dataflow
with same properties, except name and notes""")

def __init__(self, name, **kwargs):
for key, value in kwargs.items():
Expand Down Expand Up @@ -415,7 +421,16 @@ def resolve(self):
for t in TM._BagOfThreats:
if not t.apply(e):
continue
f = Finding(e, t.description, t.details, t.severity, t.mitigations, t.example, t.id, t.references)
f = Finding(
e,
t.description,
t.details,
t.severity,
t.mitigations,
t.example,
t.id,
t.references,
)
findings.append(f)
elements[e].append(f)
self.findings = findings
Expand All @@ -424,12 +439,15 @@ def resolve(self):

def check(self):
if self.description is None:
raise ValueError("Every threat model should have at least a brief description of the system being modeled.")
raise ValueError("""Every threat model should have at least
a brief description of the system being modeled.""")
TM._BagOfFlows = _match_responses(_sort(TM._BagOfFlows, self.isOrdered))
self._check_duplicates(TM._BagOfFlows)
_apply_defaults(TM._BagOfFlows)
if self.ignoreUnused:
TM._BagOfElements, TM._BagOfBoundaries = _get_elements_and_boundaries(TM._BagOfFlows)
TM._BagOfElements, TM._BagOfBoundaries = _get_elements_and_boundaries(
TM._BagOfFlows
)
result = True
for e in (TM._BagOfElements):
if not e.check():
Expand Down Expand Up @@ -514,15 +532,23 @@ def seq(self):
participants = []
for e in TM._BagOfElements:
if isinstance(e, Actor):
participants.append("actor {0} as \"{1}\"".format(e._uniq_name(), e.name))
participants.append(
'actor {0} as "{1}"'.format(e._uniq_name(), e.display_name())
)
elif isinstance(e, Datastore):
participants.append("database {0} as \"{1}\"".format(e._uniq_name(), e.name))
participants.append(
'database {0} as "{1}"'.format(e._uniq_name(), e.display_name())
)
elif not isinstance(e, Dataflow) and not isinstance(e, Boundary):
participants.append("entity {0} as \"{1}\"".format(e._uniq_name(), e.name))
participants.append(
'entity {0} as "{1}"'.format(e._uniq_name(), e.display_name())
)

messages = []
for e in TM._BagOfFlows:
message = "{0} -> {1}: {2}".format(e.source._uniq_name(), e.sink._uniq_name(), e.name)
message = "{0} -> {1}: {2}".format(
e.source._uniq_name(), e.sink._uniq_name(), e.display_name()
)
note = ""
if e.note != "":
note = "\nnote left\n{}\nend note".format(e.note)
Expand Down Expand Up @@ -712,7 +738,6 @@ def inside(self, *boundaries):
return True
return False


def _attr_values(self):
klass = self.__class__
result = {}
Expand Down Expand Up @@ -755,7 +780,7 @@ def __init__(self, name, **kwargs):

def _dfd_template(self):
return """{uniq_name} [
shape = none;
shape = {shape};
fixedsize = shape;
image = "{image}";
imagescale = true;
Expand All @@ -775,9 +800,13 @@ def dfd(self, **kwargs):
uniq_name=self._uniq_name(),
label=self._label(),
color=self._color(),
shape=self._shape(),
image=os.path.join(os.path.dirname(__file__), "images", "lambda.png"),
)

def _shape(self):
return "none"


class Server(Element):
"""An entity processing data"""
Expand Down Expand Up @@ -877,7 +906,7 @@ def __init__(self, name, **kwargs):

def _dfd_template(self):
return """{uniq_name} [
shape = none;
shape = {shape};
color = {color};
fontcolor = {color};
label = <
Expand All @@ -888,6 +917,9 @@ def _dfd_template(self):
]
"""

def _shape(self):
return "none"


class Actor(Element):
"""An entity usually initiating actions"""
Expand Down Expand Up @@ -1043,9 +1075,9 @@ def _dfd_template(self):
return """subgraph cluster_{uniq_name} {{
graph [
fontsize = 10;
fontcolor = firebrick2;
fontcolor = {color};
style = dashed;
color = firebrick2;
color = {color};
label = <<i>{label}</i>>;
]

Expand All @@ -1069,19 +1101,31 @@ def dfd(self):
return self._dfd_template().format(
uniq_name=self._uniq_name(),
label=self._label(),
color=self._color(),
edges=indent("\n".join(edges), " "),
)

def _color(self):
return "firebrick2"


def get_args():
_parser = argparse.ArgumentParser()
_parser.add_argument('--debug', action='store_true', help='print debug messages')
_parser.add_argument('--dfd', action='store_true', help='output DFD')
_parser.add_argument('--report', help='output report using the named template file (sample template file is under docs/template.md)')
_parser.add_argument('--exclude', help='specify threat IDs to be ignored')
_parser.add_argument('--seq', action='store_true', help='output sequential diagram')
_parser.add_argument('--list', action='store_true', help='list all available threats')
_parser.add_argument('--describe', help='describe the properties available for a given element')
_parser.add_argument("--debug", action="store_true", help="print debug messages")
_parser.add_argument("--dfd", action="store_true", help="output DFD")
_parser.add_argument(
"--report",
help="""output report using the named template file
(sample template file is under docs/template.md)""",
)
_parser.add_argument("--exclude", help="specify threat IDs to be ignored")
_parser.add_argument("--seq", action="store_true", help="output sequential diagram")
_parser.add_argument(
"--list", action="store_true", help="list all available threats"
)
_parser.add_argument(
"--describe", help="describe the properties available for a given element"
)

_args = _parser.parse_args()
return _args
8 changes: 4 additions & 4 deletions tests/seq.plantuml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ actor actor_User_579e9aae81 as "User"
entity server_WebServer_f2eb7a3ff7 as "Web Server"
database datastore_SQLDatabase_d2006ce1bb as "SQL Database"

actor_User_579e9aae81 -> server_WebServer_f2eb7a3ff7: User enters comments (*)
actor_User_579e9aae81 -> server_WebServer_f2eb7a3ff7: (1) User enters comments (*)
note left
bbb
end note
server_WebServer_f2eb7a3ff7 -> datastore_SQLDatabase_d2006ce1bb: Insert query with comments
server_WebServer_f2eb7a3ff7 -> datastore_SQLDatabase_d2006ce1bb: (2) Insert query with comments
note left
ccc
end note
datastore_SQLDatabase_d2006ce1bb -> server_WebServer_f2eb7a3ff7: Retrieve comments
server_WebServer_f2eb7a3ff7 -> actor_User_579e9aae81: Show comments (*)
datastore_SQLDatabase_d2006ce1bb -> server_WebServer_f2eb7a3ff7: (3) Retrieve comments
server_WebServer_f2eb7a3ff7 -> actor_User_579e9aae81: (4) Show comments (*)
@enduml
7 changes: 4 additions & 3 deletions tests/test_pytmfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def test_seq(self):

TM.reset()
tm = TM("my test tm", description="aaa")
tm.isOrdered = True
internet = Boundary("Internet")
server_db = Boundary("Server/DB")
user = Actor("User", inBoundary=internet)
Expand All @@ -47,7 +48,7 @@ def test_seq(self):
Dataflow(db, web, "Retrieve comments")
Dataflow(web, user, "Show comments (*)")

tm.check()
self.assertTrue(tm.check())
output = tm.seq()

self.maxDiff = None
Expand All @@ -73,7 +74,7 @@ def test_seq_unused(self):
Dataflow(db, web, "Retrieve comments")
Dataflow(web, user, "Show comments (*)")

tm.check()
self.assertTrue(tm.check())
output = tm.seq()

self.maxDiff = None
Expand All @@ -99,7 +100,7 @@ def test_dfd(self):
Dataflow(db, web, "Retrieve comments")
Dataflow(web, user, "Show comments (*)")

tm.check()
self.assertTrue(tm.check())
output = tm.dfd()

self.maxDiff = None
Expand Down