Skip to content

Commit

Permalink
threat matching methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Was committed Mar 19, 2020
1 parent 1c8bbcb commit 725b906
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 7 deletions.
72 changes: 66 additions & 6 deletions pytm/pytm.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import argparse
import inspect
import json
import logging
import random
import sys
import uuid
from collections import defaultdict
from collections.abc import Iterable
from hashlib import sha224
from os.path import dirname
from re import match
Expand Down Expand Up @@ -178,13 +181,23 @@ def __init__(self, json_read):
self.example = json_read['example']
self.references = json_read['references']

def apply(self, target):
if type(self.target) is list:
if target.__class__.__name__ not in self.target:
return None
if not isinstance(self.target, str) and isinstance(self.target, Iterable):
self.target = tuple(self.target)
else:
if target.__class__.__name__ is not self.target:
return None
self.target = (self.target,)
self.target = tuple(getattr(sys.modules[__name__], x) for x in self.target)

def __repr__(self):
return "<{0}.{1}({2}) at {3}>".format(
self.__module__, type(self).__name__, self.id, hex(id(self))
)

def __str__(self):
return "{0}({1})".format(type(self).__name__, self.id)

def apply(self, target):
if not isinstance(target, self.target):
return None
return eval(self.condition)


Expand Down Expand Up @@ -384,6 +397,53 @@ def _safeset(self, attr, value):
except ValueError:
pass

def oneOf(self, *elements):
for element in elements:
if inspect.isclass(element):
if isinstance(self, element):
return True
elif self is element:
return True
return False

def crosses(self, *boundaries):
if self.source.inBoundary is self.sink.inBoundary:
return False
for boundary in boundaries:
if inspect.isclass(boundary):
if (
(
isinstance(self.source.inBoundary, boundary)
and not isinstance(self.sink.inBoundary, boundary)
)
or (
not isinstance(self.source.inBoundary, boundary)
and isinstance(self.sink.inBoundary, boundary)
)
or self.source.inBoundary is not self.sink.inBoundary
):
return True
elif (self.source.inside(boundary) and not self.sink.inside(boundary)) or (
not self.source.inside(boundary) and self.sink.inside(boundary)
):
return True
return False

def enters(self, *boundaries):
return self.source.inBoundary is None and self.sink.inside(*boundaries)

def exits(self, *boundaries):
return self.source.inside(*boundaries) and self.sink.inBoundary is None

def inside(self, *boundaries):
for boundary in boundaries:
if inspect.isclass(boundary):
if isinstance(self.inBoundary, boundary):
return True
elif self.inBoundary is boundary:
return True
return False


class Lambda(Element):
onAWS = varBool(True)
Expand Down
53 changes: 52 additions & 1 deletion tests/test_private_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import unittest
import random

from pytm.pytm import Actor, Boundary, Dataflow, Datastore, Server, TM
from pytm.pytm import Actor, Boundary, Dataflow, Datastore, Server, TM, Threat


class TestUniqueNames(unittest.TestCase):
Expand Down Expand Up @@ -129,3 +129,54 @@ def test_defaults(self):
self.assertEqual(resp_post.isEncrypted, server.isEncrypted)
self.assertEqual(resp_post.protocol, server.protocol)
self.assertEqual(resp_post.data, server.data)


class TestMethod(unittest.TestCase):

def test_defaults(self):
internet = Boundary("Internet")
cloud = Boundary("Cloud")
user = Actor("User", inBoundary=internet)
server = Server("Server")
db = Datastore("DB", inBoundary=cloud)
func = Datastore("Lambda function", inBoundary=cloud)
request = Dataflow(user, server, "request")
response = Dataflow(server, user, "response")
user_query = Dataflow(user, db, "user query")
server_query = Dataflow(server, db, "server query")
func_query = Dataflow(func, db, "func query")

default = {
"SID": "",
"description": "",
"condition": "",
"target": ["Actor", "Boundary", "Dataflow", "Datastore", "Server"],
"details": "",
"severity": "",
"mitigations": "",
"example": "",
"references": "",
}
testCases = [
{"target": server, "condition": "target.oneOf(Server, Datastore)"},
{"target": server, "condition": "not target.oneOf(Actor, Dataflow)"},
{"target": request, "condition": "target.crosses(Boundary)"},
{"target": user_query, "condition": "target.crosses(Boundary)"},
{"target": server_query, "condition": "target.crosses(Boundary)"},
{"target": func_query, "condition": "not target.crosses(Boundary)"},
{"target": func_query, "condition": "not target.enters(Boundary)"},
{"target": func_query, "condition": "not target.exits(Boundary)"},
{"target": request, "condition": "not target.enters(Boundary)"},
{"target": request, "condition": "target.exits(Boundary)"},
{"target": response, "condition": "target.enters(Boundary)"},
{"target": response, "condition": "not target.exits(Boundary)"},
{"target": user, "condition": "target.inside(Boundary)"},
]
for case in testCases:
t = Threat({**default, **{"condition": case["condition"]}})
self.assertTrue(
t.apply(case["target"]),
"Failed to match {} against {}".format(
case["target"], case["condition"]
),
)

0 comments on commit 725b906

Please sign in to comment.