-
Notifications
You must be signed in to change notification settings - Fork 181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue #222: fixed Findings creating spuriois Elements #223
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -242,15 +242,16 @@ def __ne__(self, other): | |
def __str__(self): | ||
return ", ".join(sorted(set(d.name for d in self))) | ||
|
||
|
||
class varControls(var): | ||
def __set__(self, instance, value): | ||
if not isinstance(value, Controls): | ||
raise ValueError( | ||
"expecting an Controls " | ||
"value, got a {}".format(type(value)) | ||
"expecting an Controls " "value, got a {}".format(type(value)) | ||
) | ||
super().__set__(instance, value) | ||
|
||
|
||
class Action(Enum): | ||
"""Action taken when validating a threat model.""" | ||
|
||
|
@@ -442,18 +443,25 @@ def _apply_defaults(flows, data): | |
e._safeset("dstPort", e.sink.port) | ||
if hasattr(e.sink.controls, "isEncrypted"): | ||
e.controls._safeset("isEncrypted", e.sink.controls.isEncrypted) | ||
e.controls._safeset("authenticatesDestination", e.source.controls.authenticatesDestination) | ||
e.controls._safeset("checksDestinationRevocation", e.source.controls.checksDestinationRevocation) | ||
e.controls._safeset( | ||
"authenticatesDestination", e.source.controls.authenticatesDestination | ||
) | ||
e.controls._safeset( | ||
"checksDestinationRevocation", e.source.controls.checksDestinationRevocation | ||
) | ||
|
||
for d in e.data: | ||
if d.isStored: | ||
if hasattr(e.sink.controls, "isEncryptedAtRest"): | ||
for d in e.data: | ||
d._safeset("isDestEncryptedAtRest", e.sink.controls.isEncryptedAtRest) | ||
d._safeset( | ||
"isDestEncryptedAtRest", e.sink.controls.isEncryptedAtRest | ||
) | ||
if hasattr(e.source, "isEncryptedAtRest"): | ||
for d in e.data: | ||
d._safeset( | ||
"isSourceEncryptedAtRest", e.source.controls.isEncryptedAtRest | ||
"isSourceEncryptedAtRest", | ||
e.source.controls.isEncryptedAtRest, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change appears to go against the convention you have for other changes in this set - from a single line _safeset() to 2-line, but here you have 3-lines. Try to be consistent. |
||
) | ||
if d.credentialsLife != Lifetime.NONE and not d.isCredentials: | ||
d._safeset("isCredentials", True) | ||
|
@@ -522,28 +530,26 @@ def _describe_classes(classes): | |
|
||
def _list_elements(): | ||
"""List all elements which can be used in a threat model with the corresponding description""" | ||
|
||
def all_subclasses(cls): | ||
"""Get all sub classes of a class""" | ||
subclasses = set(cls.__subclasses__()) | ||
return subclasses.union( | ||
(s for c in subclasses for s in all_subclasses(c))) | ||
return subclasses.union((s for c in subclasses for s in all_subclasses(c))) | ||
|
||
def print_components(cls_list): | ||
elements = sorted(cls_list, key=lambda c: c.__name__) | ||
max_len = max((len(e.__name__) for e in elements)) | ||
for sc in elements: | ||
doc = sc.__doc__ if sc.__doc__ is not None else '' | ||
print(f'{sc.__name__:<{max_len}} -- {doc}') | ||
#print all elements | ||
print('Elements:') | ||
doc = sc.__doc__ if sc.__doc__ is not None else "" | ||
print(f"{sc.__name__:<{max_len}} -- {doc}") | ||
|
||
# print all elements | ||
print("Elements:") | ||
print_components(all_subclasses(Element)) | ||
|
||
# Print Attributes | ||
print('\nAtributes:') | ||
print_components( | ||
all_subclasses(OrderedEnum) | {Data, Action, Lifetime} | ||
) | ||
|
||
print("\nAtributes:") | ||
print_components(all_subclasses(OrderedEnum) | {Data, Action, Lifetime}) | ||
|
||
|
||
def _get_elements_and_boundaries(flows): | ||
|
@@ -661,7 +667,7 @@ def __init__( | |
if args: | ||
element = args[0] | ||
else: | ||
element = kwargs.pop("element", Element("invalid")) | ||
element = kwargs.pop("element", Element("created from Finding")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In class Finding, element is set to required. If Finding can be connected to the tm rather than a specific Element, it seems this might address the problem. A dummy element is being created just to satisfy this constraint. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Furthermore, Finding also has 2 attributes on associating with Elements:
Maybe we need to remove one? |
||
|
||
self.target = element.name | ||
self.element = element | ||
|
@@ -683,6 +689,7 @@ def __init__( | |
|
||
threat_id = kwargs.get("threat_id", None) | ||
for f in element.overrides: | ||
# we override on threat_id, which is | ||
if f.threat_id != threat_id: | ||
continue | ||
for i in dir(f.__class__): | ||
|
@@ -729,7 +736,14 @@ class TM: | |
_data = [] | ||
_threatsExcluded = [] | ||
_sf = None | ||
_duplicate_ignored_attrs = "name", "note", "order", "response", "responseTo", "controls" | ||
_duplicate_ignored_attrs = ( | ||
"name", | ||
"note", | ||
"order", | ||
"response", | ||
"responseTo", | ||
"controls", | ||
) | ||
name = varString("", required=True, doc="Model name") | ||
description = varString("", required=True, doc="Model description") | ||
threatsFile = varString( | ||
|
@@ -877,15 +891,14 @@ def _check_duplicates(self, flows): | |
|
||
left_controls_attrs = left.controls._attr_values() | ||
right_controls_attrs = right.controls._attr_values() | ||
#for a in self._duplicate_ignored_attrs: | ||
# for a in self._duplicate_ignored_attrs: | ||
# del left_controls_attrs[a], right_controls_attrs[a] | ||
if left_controls_attrs != right_controls_attrs: | ||
continue | ||
if self.onDuplicates == Action.IGNORE: | ||
right._is_drawn = True | ||
continue | ||
|
||
|
||
raise ValueError( | ||
"Duplicate Dataflow found between {} and {}: " | ||
"{} is same as {}".format( | ||
|
@@ -1087,7 +1100,6 @@ def _stale(self, days): | |
print(f"Checking for code {days} days older than this model.") | ||
|
||
for e in TM._elements: | ||
|
||
for src in e.sourceFiles: | ||
try: | ||
src_mtime = datetime.fromtimestamp( | ||
|
@@ -1164,6 +1176,7 @@ def get_table(self, db, klass): | |
] | ||
return db.define_table(name, fields) | ||
|
||
|
||
class Controls: | ||
"""Controls implemented by/on and Element""" | ||
|
||
|
@@ -1256,15 +1269,13 @@ def _attr_values(self): | |
result[i] = value | ||
return result | ||
|
||
|
||
def _safeset(self, attr, value): | ||
try: | ||
setattr(self, attr, value) | ||
except ValueError: | ||
pass | ||
|
||
|
||
|
||
class Element: | ||
"""A generic element""" | ||
|
||
|
@@ -1303,7 +1314,8 @@ def __init__(self, name, **kwargs): | |
self.controls = Controls() | ||
self.uuid = uuid.UUID(int=random.getrandbits(128)) | ||
self._is_drawn = False | ||
TM._elements.append(self) | ||
if self.name != "created from Finding": | ||
TM._elements.append(self) | ||
|
||
def __repr__(self): | ||
return "<{0}.{1}({2}) at {3}>".format( | ||
|
@@ -1607,7 +1619,7 @@ class Datastore(Asset): | |
* FILE_SYSTEM - files on a file system | ||
* SQL - A SQL Database | ||
* LDAP - An LDAP Server | ||
* AWS_S3 - An S3 Bucket within AWS""" | ||
* AWS_S3 - An S3 Bucket within AWS""", | ||
) | ||
|
||
def __init__(self, name, **kwargs): | ||
|
@@ -1874,27 +1886,29 @@ def serialize(obj, nested=False): | |
result[i.lstrip("_")] = value | ||
return result | ||
|
||
|
||
def encode_element_threat_data(obj): | ||
"""Used to html encode threat data from a list of Elements""" | ||
encoded_elements = [] | ||
if (type(obj) is not list): | ||
raise ValueError("expecting a list value, got a {}".format(type(value))) | ||
if type(obj) is not list: | ||
raise ValueError("expecting a list value, got a {}".format(type(obj))) | ||
|
||
for o in obj: | ||
c = copy.deepcopy(o) | ||
for a in o._attr_values(): | ||
if (a == "findings"): | ||
encoded_findings = encode_threat_data(o.findings) | ||
c._safeset("findings", encoded_findings) | ||
c = copy.deepcopy(o) | ||
for a in o._attr_values(): | ||
if a == "findings": | ||
encoded_findings = encode_threat_data(o.findings) | ||
c._safeset("findings", encoded_findings) | ||
else: | ||
v = getattr(o, a) | ||
if (type(v) is not list or (type(v) is list and len(v) != 0)): | ||
c._safeset(a, v) | ||
encoded_elements.append(c) | ||
v = getattr(o, a) | ||
if type(v) is not list or (type(v) is list and len(v) != 0): | ||
c._safeset(a, v) | ||
|
||
encoded_elements.append(c) | ||
|
||
return encoded_elements | ||
|
||
|
||
def encode_threat_data(obj): | ||
"""Used to html encode threat data from a list of threats or findings""" | ||
encoded_threat_data = [] | ||
|
@@ -1955,7 +1969,9 @@ def get_args(): | |
"--describe", help="describe the properties available for a given element" | ||
) | ||
_parser.add_argument( | ||
"--list-elements", action="store_true", help="list all elements which can be part of a threat model" | ||
"--list-elements", | ||
action="store_true", | ||
help="list all elements which can be part of a threat model", | ||
) | ||
_parser.add_argument("--json", help="output a JSON file") | ||
_parser.add_argument( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.